from __future__ import annotations import json import sys from dataclasses import asdict, dataclass from pathlib import Path import pandas as pd PACKAGE_PARENT = Path(__file__).resolve().parents[2] if str(PACKAGE_PARENT) not in sys.path: sys.path.insert(0, str(PACKAGE_PARENT)) from strategy32.live.runtime import BEST_CASH_OVERLAY from strategy32.research.soft_router import build_cash_overlay_period_components, load_component_bundle from strategy32.scripts.run_current_cash_learned_blocker import ( CACHE_PATH, CURRENT_OVERHEAT_OVERRIDES, LearnedBlockerCandidate, _build_block_dataset, _build_regime_columns, _build_strategy_detail, _curve_from_returns, _metrics_for_curve, _ridge_predict, ) OUT_JSON = Path("/tmp/strategy32_current_cash_guarded_blocker.json") @dataclass(frozen=True, slots=True) class GuardedBlockerCandidate: blocker: LearnedBlockerCandidate guard_regime_mode: str min_core_score: float min_breadth_persist: float min_funding_persist: float max_choppy_score: float max_distribution_score: float max_panic_score: float @property def name(self) -> str: return ( f"{self.blocker.name}" f"|guard:{self.guard_regime_mode}" f"|core:{self.min_core_score:.2f}" f"|breadth:{self.min_breadth_persist:.2f}" f"|funding:{self.min_funding_persist:.2f}" f"|chop<={self.max_choppy_score:.2f}" f"|dist<={self.max_distribution_score:.2f}" f"|panic<={self.max_panic_score:.2f}" ) def _guard_active(block: pd.Series, candidate: GuardedBlockerCandidate) -> bool: regime = str(block.get("strategic_regime", "")) if candidate.guard_regime_mode == "momentum_only": regime_ok = regime == "MOMENTUM_EXPANSION" elif candidate.guard_regime_mode == "momentum_or_euphoric": regime_ok = regime in {"MOMENTUM_EXPANSION", "EUPHORIC_BREAKOUT"} else: regime_ok = False return ( regime_ok and float(block["core_score"]) >= candidate.min_core_score and float(block["breadth_persist"]) >= candidate.min_breadth_persist and float(block["funding_persist"]) >= candidate.min_funding_persist and float(block["choppy_score"]) <= candidate.max_choppy_score and float(block["distribution_score"]) <= candidate.max_distribution_score and float(block["panic_score"]) <= candidate.max_panic_score ) def _simulate_guarded_candidate( detail: pd.DataFrame, block_frame: pd.DataFrame, regime_columns: list[str], candidate: GuardedBlockerCandidate, ) -> pd.Series: rows = detail.reset_index(drop=True) features = [ "core_score", "breadth_persist", "funding_persist", "taker_persist", "volume_accel_persist", "mean_taker_imbalance", "taker_imbalance_dispersion", "positive_taker_ratio", "mean_alt_volume_accel", "positive_volume_accel_ratio", "funding_dispersion", "basis_dispersion", "alt_return_dispersion_7d", "mean_funding_acceleration", "mean_basis_trend", "panic_score", "choppy_score", "distribution_score", "cash_pct", "invested_pct", "trailing_total_21", "trailing_total_42", "trailing_core_score_21", "trailing_breadth_21", "trailing_choppy_21", *regime_columns, ] returns: list[float] = [] idx: list[pd.Timestamp] = [] blocker = candidate.blocker for block_idx, block in block_frame.iterrows(): start_idx = int(block["block_start_index"]) end_idx = int(block["block_end_index"]) bar_block = rows.iloc[start_idx : end_idx + 1] exposure_scale = 1.0 if block_idx >= blocker.train_min_blocks: train_start = max(0, block_idx - blocker.lookback_blocks) train = block_frame.iloc[train_start:block_idx] train_x = train[features].to_numpy(dtype=float) train_y = train["block_total"].to_numpy(dtype=float) test_x = block[features].to_numpy(dtype=float) pred = _ridge_predict(train_x, train_y, test_x, blocker.ridge_alpha) if pred <= blocker.prediction_threshold: exposure_scale = blocker.blocked_scale if _guard_active(block, candidate): exposure_scale = 1.0 for row in bar_block.itertuples(index=False): returns.append(float(getattr(row, "portfolio_return")) * exposure_scale) idx.append(pd.Timestamp(getattr(row, "timestamp"))) return pd.Series(returns, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float) def _candidate_space() -> list[GuardedBlockerCandidate]: candidates: list[GuardedBlockerCandidate] = [] for blocked_scale in (0.0, 0.25): blocker = LearnedBlockerCandidate( block_bars=42, train_min_blocks=12, lookback_blocks=24, ridge_alpha=1.0, prediction_threshold=-0.0025, blocked_scale=blocked_scale, ) for guard_regime_mode in ("momentum_only", "momentum_or_euphoric"): for min_core_score in (0.55, 0.60, 0.65): for min_breadth_persist in (0.45, 0.50, 0.55): for min_funding_persist in (0.50, 0.55, 0.60): for max_choppy_score in (0.35, 0.40): for max_distribution_score in (0.25, 0.30): candidates.append( GuardedBlockerCandidate( blocker=blocker, guard_regime_mode=guard_regime_mode, min_core_score=min_core_score, min_breadth_persist=min_breadth_persist, min_funding_persist=min_funding_persist, max_choppy_score=max_choppy_score, max_distribution_score=max_distribution_score, max_panic_score=0.20, ) ) return candidates def main() -> None: bundle, latest_bar = load_component_bundle(CACHE_PATH) eval_start = latest_bar - pd.Timedelta(days=1825) print("[phase] build current baseline", flush=True) components = build_cash_overlay_period_components( bundle=bundle, eval_start=eval_start, eval_end=latest_bar, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=CURRENT_OVERHEAT_OVERRIDES, ) detail = _build_strategy_detail(components) regime_columns = _build_regime_columns(detail) block_frame = _build_block_dataset(detail, 42, regime_columns) baseline_curve = _curve_from_returns(detail.set_index("timestamp")["portfolio_return"]) baseline_windows, baseline_years, baseline_score, *_ = _metrics_for_curve(baseline_curve, latest_bar) top: list[dict[str, object]] = [] candidates = _candidate_space() print(f"[phase] guarded blocker search {len(candidates)} candidates", flush=True) for idx, candidate in enumerate(candidates, start=1): sim_returns = _simulate_guarded_candidate(detail, block_frame, regime_columns, candidate) curve = _curve_from_returns(sim_returns) windows, years, score, negative_years, mdd_violations = _metrics_for_curve(curve, latest_bar) payload = { "candidate": { **asdict(candidate.blocker), "guard_regime_mode": candidate.guard_regime_mode, "min_core_score": candidate.min_core_score, "min_breadth_persist": candidate.min_breadth_persist, "min_funding_persist": candidate.min_funding_persist, "max_choppy_score": candidate.max_choppy_score, "max_distribution_score": candidate.max_distribution_score, "max_panic_score": candidate.max_panic_score, }, "name": candidate.name, "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "windows": windows, "years": years, } top.append(payload) top.sort(key=lambda item: float(item["score"]), reverse=True) top = top[:10] if idx % 50 == 0 or idx == len(candidates): print(f"[search] {idx}/{len(candidates)}", flush=True) output = { "analysis": "current_cash_guarded_blocker", "latest_bar": str(latest_bar), "baseline": { "score": baseline_score, "windows": baseline_windows, "years": baseline_years, }, "top10": top, } OUT_JSON.write_text(json.dumps(output, indent=2), encoding="utf-8") print(json.dumps(top[:5], indent=2)) print(f"[saved] {OUT_JSON}", flush=True) if __name__ == "__main__": main()