from __future__ import annotations import json import sys from dataclasses import asdict from pathlib import Path import pandas as pd PACKAGE_PARENT = Path(__file__).resolve().parents[2] if str(PACKAGE_PARENT) not in sys.path: sys.path.insert(0, str(PACKAGE_PARENT)) from strategy32.live.runtime import BEST_CASH_OVERLAY from strategy32.research.soft_router import ( build_cash_overlay_period_components, load_component_bundle, score_candidate, segment_metrics, ) from strategy32.scripts.run_current_cash_learned_blocker import ( CACHE_PATH, CURRENT_OVERHEAT_OVERRIDES, LearnedBlockerCandidate, _build_block_dataset, _build_regime_columns, _build_strategy_detail, _curve_from_returns, _simulate_candidate, ) from strategy32.scripts.run_current_relaxed_hybrid_experiment import WINDOWS, YEAR_PERIODS, YTD_START OUT_JSON = Path("/tmp/strategy32_current_cash_blocker_exact_plateau.json") BASELINE_JSON = Path("/tmp/strategy32_live_combo_backtest.json") def _candidate_space() -> list[LearnedBlockerCandidate]: candidates: list[LearnedBlockerCandidate] = [] for block_bars in (21, 42): for train_min_blocks in (8, 12, 18): for ridge_alpha in (0.5, 1.0, 5.0, 20.0): for prediction_threshold in (-0.0050, -0.0025, 0.0): for blocked_scale in (0.0, 0.25, 0.50): candidates.append( LearnedBlockerCandidate( block_bars=block_bars, train_min_blocks=train_min_blocks, lookback_blocks=24, ridge_alpha=ridge_alpha, prediction_threshold=prediction_threshold, blocked_scale=blocked_scale, ) ) return candidates def main() -> None: bundle, latest_bar = load_component_bundle(CACHE_PATH) period_specs: list[tuple[str, str, pd.Timestamp, pd.Timestamp]] = [] for days, label in WINDOWS: period_specs.append(("window", label, latest_bar - pd.Timedelta(days=days), latest_bar)) for label, start, end_exclusive in YEAR_PERIODS: period_specs.append(("year", label, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1)))) period_specs.append(("year", "2026_YTD", YTD_START, latest_bar)) period_cache: dict[str, dict[str, object]] = {} for kind, label, start, end in period_specs: components = build_cash_overlay_period_components( bundle=bundle, eval_start=start, eval_end=end, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=CURRENT_OVERHEAT_OVERRIDES, ) detail = _build_strategy_detail(components) regime_columns = _build_regime_columns(detail) period_cache[label] = { "kind": kind, "start": start, "end": end, "detail": detail, "regime_columns": regime_columns, "block_frames": {}, } print(f"[periods] cached {len(period_cache)} periods", flush=True) candidates = _candidate_space() print(f"[candidates] {len(candidates)} exact candidates", flush=True) rows: list[dict[str, object]] = [] for idx, candidate in enumerate(candidates, start=1): windows: dict[str, dict[str, float]] = {} years: dict[str, dict[str, float]] = {} for label, cached in period_cache.items(): detail = cached["detail"] regime_columns = cached["regime_columns"] block_frames = cached["block_frames"] if candidate.block_bars not in block_frames: block_frames[candidate.block_bars] = _build_block_dataset(detail, candidate.block_bars, regime_columns) block_frame = block_frames[candidate.block_bars] simulated_returns = _simulate_candidate(detail, block_frame, regime_columns, candidate) curve = _curve_from_returns(simulated_returns) metrics = segment_metrics(curve, cached["start"], cached["end"]) if cached["kind"] == "window": windows[label] = metrics else: years[label] = metrics score, negative_years, mdd_violations = score_candidate( {label: windows[label] for _, label in WINDOWS}, {label: years[label] for label, _, _ in YEAR_PERIODS}, ) rows.append( { "candidate": asdict(candidate), "name": candidate.name, "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "windows": windows, "years": years, } ) if idx % 24 == 0 or idx == len(candidates): print(f"[progress] {idx}/{len(candidates)}", flush=True) rows.sort(key=lambda item: float(item["score"]), reverse=True) output = { "analysis": "current_cash_blocker_exact_plateau", "latest_bar": str(latest_bar), "all_count": len(rows), "top10": rows[:10], "baseline_exact": json.loads(BASELINE_JSON.read_text(encoding="utf-8")) if BASELINE_JSON.exists() else None, } OUT_JSON.write_text(json.dumps(output, indent=2), encoding="utf-8") print(json.dumps(output["top10"], indent=2)) print(f"[saved] {OUT_JSON}", flush=True) if __name__ == "__main__": main()