# Exact per-period re-evaluation of the learned cash-blocker candidate.
#
# Reads the candidate stored in SOURCE_JSON, rebuilds the cash-overlay
# components separately for every evaluation period inside worker processes,
# scores the resulting metrics and writes the report to OUT_JSON.
from __future__ import annotations

import json
import multiprocessing as mp
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import asdict
from pathlib import Path

import pandas as pd

# Make the ``strategy32`` package importable when this file is run as a script.
PACKAGE_PARENT = Path(__file__).resolve().parents[2]
if str(PACKAGE_PARENT) not in sys.path:
    sys.path.insert(0, str(PACKAGE_PARENT))

from strategy32.research.soft_router import load_component_bundle, score_candidate, segment_metrics
from strategy32.scripts.run_current_cash_learned_blocker import (
    CACHE_PATH,
    CURRENT_OVERHEAT_OVERRIDES,
    LearnedBlockerCandidate,
    _build_block_dataset,
    _build_regime_columns,
    _build_strategy_detail,
    _curve_from_returns,
    _metrics_for_curve,
    _simulate_candidate,
)
from strategy32.scripts.run_current_relaxed_hybrid_experiment import WINDOWS, YEAR_PERIODS, YTD_START
from strategy32.live.runtime import BEST_CASH_OVERLAY
from strategy32.research.soft_router import build_cash_overlay_period_components

OUT_JSON = Path("/tmp/strategy32_current_cash_learned_blocker_exact.json")
SOURCE_JSON = Path("/tmp/strategy32_current_cash_learned_blocker.json")
BASELINE_JSON = Path("/tmp/strategy32_live_combo_backtest.json")


def _load_best_candidate() -> LearnedBlockerCandidate:
    """Rebuild the candidate from the ``"candidate"`` entry of ``SOURCE_JSON``."""
    raw_text = SOURCE_JSON.read_text(encoding="utf-8")
    candidate_fields = json.loads(raw_text)["candidate"]
    return LearnedBlockerCandidate(**candidate_fields)


def _exact_period_worker(
    cache_path: str,
    candidate_payload: dict[str, object],
    kind: str,
    label: str,
    start_text: str,
    end_text: str,
) -> tuple[str, str, dict[str, float]]:
    """Evaluate one period from scratch and return its segment metrics.

    Runs in a pool worker, so the period bounds arrive as strings and the
    candidate as a plain dict; both are rebuilt here.

    Args:
        cache_path: Location handed to ``load_component_bundle``.
        candidate_payload: Keyword arguments for ``LearnedBlockerCandidate``.
        kind: Caller-chosen bucket tag, echoed back unchanged.
        label: Caller-chosen period label, echoed back unchanged.
        start_text: Period start, parseable by ``pd.Timestamp``.
        end_text: Period end, parseable by ``pd.Timestamp``.

    Returns:
        ``(kind, label, metrics)`` where ``metrics`` is the result of
        ``segment_metrics`` over the simulated curve for the period.
    """
    bundle, _ = load_component_bundle(cache_path)
    blocker = LearnedBlockerCandidate(**candidate_payload)
    period_start = pd.Timestamp(start_text)
    period_end = pd.Timestamp(end_text)

    # Overlay settings come from the live configuration.
    overlay_settings = {
        "profile_name": BEST_CASH_OVERLAY.regime_profile,
        "core_filter": BEST_CASH_OVERLAY.core_filter,
        "cap_engine": BEST_CASH_OVERLAY.cap_engine,
        "chop_engine": BEST_CASH_OVERLAY.chop_engine,
        "dist_engine": BEST_CASH_OVERLAY.dist_engine,
    }
    period_components = build_cash_overlay_period_components(
        bundle=bundle,
        eval_start=period_start,
        eval_end=period_end,
        **overlay_settings,
        core_config_overrides=CURRENT_OVERHEAT_OVERRIDES,
    )

    strategy_detail = _build_strategy_detail(period_components)
    regime_cols = _build_regime_columns(strategy_detail)
    blocks = _build_block_dataset(strategy_detail, blocker.block_bars, regime_cols)
    blocked_returns = _simulate_candidate(strategy_detail, blocks, regime_cols, blocker)
    equity_curve = _curve_from_returns(blocked_returns)
    period_metrics = segment_metrics(equity_curve, period_start, period_end)
    return kind, label, period_metrics


def main() -> None:
    """Evaluate the stored candidate on every period and save the report.

    Periods are the trailing ``WINDOWS`` ending at the latest bar, each entry
    of ``YEAR_PERIODS`` (end clipped to the latest bar) and a ``2026_YTD``
    period. The YTD metrics are reported under ``"years"`` but are not passed
    to ``score_candidate``. The report is written to ``OUT_JSON`` and printed.
    """
    _unused_bundle, latest_bar = load_component_bundle(CACHE_PATH)
    candidate = _load_best_candidate()

    one_second = pd.Timedelta(seconds=1)
    period_specs: list[tuple[str, str, pd.Timestamp, pd.Timestamp]] = [
        *(
            ("window", label, latest_bar - pd.Timedelta(days=days), latest_bar)
            for days, label in WINDOWS
        ),
        *(
            # Year ends are exclusive; step back one second and never run past the data.
            ("year", label, start, min(latest_bar, end_exclusive - one_second))
            for label, start, end_exclusive in YEAR_PERIODS
        ),
        ("year", "2026_YTD", YTD_START, latest_bar),
    ]

    fork_context = mp.get_context("fork")
    candidate_payload = asdict(candidate)
    window_results: dict[str, dict[str, float]] = {}
    year_results: dict[str, dict[str, float]] = {}

    pool_size = min(6, len(period_specs))
    with ProcessPoolExecutor(max_workers=pool_size, mp_context=fork_context) as executor:
        pending = [
            executor.submit(
                _exact_period_worker,
                cache_path=CACHE_PATH,
                candidate_payload=candidate_payload,
                kind=kind,
                label=label,
                start_text=str(start),
                end_text=str(end),
            )
            for kind, label, start, end in period_specs
        ]
        for finished in as_completed(pending):
            kind, label, metrics = finished.result()
            bucket = window_results if kind == "window" else year_results
            bucket[label] = metrics

    # Re-key in the declared order; completion order of the workers is arbitrary.
    windows_in_order = {label: window_results[label] for _, label in WINDOWS}
    years_in_order = {label: year_results[label] for label, _, _ in YEAR_PERIODS}
    # The scorer gets its own mapping, separate from the one stored in the report.
    score, negative_years, mdd_violations = score_candidate(dict(windows_in_order), years_in_order)

    if BASELINE_JSON.exists():
        baseline = json.loads(BASELINE_JSON.read_text(encoding="utf-8"))
    else:
        baseline = None

    output = {
        "analysis": "current_cash_learned_blocker_exact",
        "latest_bar": str(latest_bar),
        "candidate": candidate_payload,
        "name": candidate.name,
        "score": score,
        "negative_years": negative_years,
        "mdd_violations": mdd_violations,
        "windows": windows_in_order,
        "years": year_results,
        "baseline_exact": baseline,
    }
    rendered = json.dumps(output, indent=2)
    OUT_JSON.write_text(rendered, encoding="utf-8")
    print(rendered)
    print(f"[saved] {OUT_JSON}")


if __name__ == "__main__":
    main()