from __future__ import annotations import json import sys from dataclasses import asdict, dataclass from pathlib import Path import pandas as pd PACKAGE_PARENT = Path(__file__).resolve().parents[2] if str(PACKAGE_PARENT) not in sys.path: sys.path.insert(0, str(PACKAGE_PARENT)) from strategy32.live.runtime import BEST_CASH_OVERLAY from strategy32.research.soft_router import build_cash_overlay_period_components, load_component_bundle from strategy32.scripts.run_current_cash_learned_blocker import ( CACHE_PATH, CURRENT_OVERHEAT_OVERRIDES, LearnedBlockerCandidate, _build_block_dataset, _build_regime_columns, _build_strategy_detail, _curve_from_returns, _metrics_for_curve, _ridge_predict, ) OUT_JSON = Path("/tmp/strategy32_current_cash_conditional_blocker.json") @dataclass(frozen=True, slots=True) class ConditionalBlockerCandidate: blocker: LearnedBlockerCandidate max_trailing_total_21: float min_choppy_score: float min_distribution_score: float min_cash_pct: float @property def name(self) -> str: return ( f"{self.blocker.name}" f"|trail<={self.max_trailing_total_21:.3f}" f"|chop>={self.min_choppy_score:.2f}" f"|dist>={self.min_distribution_score:.2f}" f"|cash>={self.min_cash_pct:.2f}" ) def _conditional_active(block: pd.Series, candidate: ConditionalBlockerCandidate) -> bool: return ( float(block["trailing_total_21"]) <= candidate.max_trailing_total_21 and ( float(block["choppy_score"]) >= candidate.min_choppy_score or float(block["distribution_score"]) >= candidate.min_distribution_score or float(block["cash_pct"]) >= candidate.min_cash_pct ) ) def _simulate_conditional_candidate( detail: pd.DataFrame, block_frame: pd.DataFrame, regime_columns: list[str], candidate: ConditionalBlockerCandidate, ) -> pd.Series: rows = detail.reset_index(drop=True) features = [ "core_score", "breadth_persist", "funding_persist", "taker_persist", "volume_accel_persist", "mean_taker_imbalance", "taker_imbalance_dispersion", "positive_taker_ratio", "mean_alt_volume_accel", "positive_volume_accel_ratio", "funding_dispersion", "basis_dispersion", "alt_return_dispersion_7d", "mean_funding_acceleration", "mean_basis_trend", "panic_score", "choppy_score", "distribution_score", "cash_pct", "invested_pct", "trailing_total_21", "trailing_total_42", "trailing_core_score_21", "trailing_breadth_21", "trailing_choppy_21", *regime_columns, ] returns: list[float] = [] idx: list[pd.Timestamp] = [] blocker = candidate.blocker for block_idx, block in block_frame.iterrows(): start_idx = int(block["block_start_index"]) end_idx = int(block["block_end_index"]) bar_block = rows.iloc[start_idx : end_idx + 1] exposure_scale = 1.0 if block_idx >= blocker.train_min_blocks: train_start = max(0, block_idx - blocker.lookback_blocks) train = block_frame.iloc[train_start:block_idx] train_x = train[features].to_numpy(dtype=float) train_y = train["block_total"].to_numpy(dtype=float) test_x = block[features].to_numpy(dtype=float) pred = _ridge_predict(train_x, train_y, test_x, blocker.ridge_alpha) if pred <= blocker.prediction_threshold and _conditional_active(block, candidate): exposure_scale = blocker.blocked_scale for row in bar_block.itertuples(index=False): returns.append(float(getattr(row, "portfolio_return")) * exposure_scale) idx.append(pd.Timestamp(getattr(row, "timestamp"))) return pd.Series(returns, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float) def _candidate_space() -> list[ConditionalBlockerCandidate]: candidates: list[ConditionalBlockerCandidate] = [] for blocked_scale in (0.0, 0.25): blocker = LearnedBlockerCandidate( block_bars=42, train_min_blocks=12, lookback_blocks=24, ridge_alpha=1.0, prediction_threshold=-0.0025, blocked_scale=blocked_scale, ) for max_trailing_total_21 in (0.0, -0.01, -0.02): for min_choppy_score in (0.20, 0.30, 0.40): for min_distribution_score in (0.10, 0.20, 0.30): for min_cash_pct in (0.20, 0.40, 0.60): candidates.append( ConditionalBlockerCandidate( blocker=blocker, max_trailing_total_21=max_trailing_total_21, min_choppy_score=min_choppy_score, min_distribution_score=min_distribution_score, min_cash_pct=min_cash_pct, ) ) return candidates def main() -> None: bundle, latest_bar = load_component_bundle(CACHE_PATH) eval_start = latest_bar - pd.Timedelta(days=1825) print("[phase] build current baseline", flush=True) components = build_cash_overlay_period_components( bundle=bundle, eval_start=eval_start, eval_end=latest_bar, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=CURRENT_OVERHEAT_OVERRIDES, ) detail = _build_strategy_detail(components) regime_columns = _build_regime_columns(detail) block_frame = _build_block_dataset(detail, 42, regime_columns) baseline_curve = _curve_from_returns(detail.set_index("timestamp")["portfolio_return"]) baseline_windows, baseline_years, baseline_score, *_ = _metrics_for_curve(baseline_curve, latest_bar) top: list[dict[str, object]] = [] candidates = _candidate_space() print(f"[phase] conditional blocker search {len(candidates)} candidates", flush=True) for idx, candidate in enumerate(candidates, start=1): sim_returns = _simulate_conditional_candidate(detail, block_frame, regime_columns, candidate) curve = _curve_from_returns(sim_returns) windows, years, score, negative_years, mdd_violations = _metrics_for_curve(curve, latest_bar) payload = { "candidate": { **asdict(candidate.blocker), "max_trailing_total_21": candidate.max_trailing_total_21, "min_choppy_score": candidate.min_choppy_score, "min_distribution_score": candidate.min_distribution_score, "min_cash_pct": candidate.min_cash_pct, }, "name": candidate.name, "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "windows": windows, "years": years, } top.append(payload) top.sort(key=lambda item: float(item["score"]), reverse=True) top = top[:10] if idx % 50 == 0 or idx == len(candidates): print(f"[search] {idx}/{len(candidates)}", flush=True) output = { "analysis": "current_cash_conditional_blocker", "latest_bar": str(latest_bar), "baseline": { "score": baseline_score, "windows": baseline_windows, "years": baseline_years, }, "top10": top, } OUT_JSON.write_text(json.dumps(output, indent=2), encoding="utf-8") print(json.dumps(top[:5], indent=2)) print(f"[saved] {OUT_JSON}", flush=True) if __name__ == "__main__": main()