from __future__ import annotations import argparse import json import multiprocessing as mp import sys from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import asdict from pathlib import Path import pandas as pd PACKAGE_PARENT = Path(__file__).resolve().parents[2] if str(PACKAGE_PARENT) not in sys.path: sys.path.insert(0, str(PACKAGE_PARENT)) from strategy29.backtest.window_analysis import slice_bundle from strategy32.research.adverse_regime import AdverseRegimeResearchHarness from strategy32.research.hybrid_regime import ( _build_positive_filter_plan, _curve_returns, _run_adverse_component_curve, _run_static_component_curve, load_fixed66_bundle, ) from strategy32.scripts.run_hybrid_strategy_search import ( CAP_ENGINES, CHOP_ENGINES, DIST_ENGINES, EUPHORIA_MODES, EXPANSION_MODES, OUT_JSON, WINDOWS, YEAR_PERIODS, HybridCandidate, _build_profile_cache, _compose_full_curve, _score_candidate, _segment_metrics, ) from strategy32.scripts.run_regime_filter_analysis import STRATEGIC_REGIME_PROFILES, build_strategic_regime_frame FAST_OUT_JSON = Path("/tmp/strategy32_hybrid_strategy_search_fast.json") YTD_START = pd.Timestamp("2026-01-01 00:00:00+00:00") def _build_candidate_rows(latest_bar: pd.Timestamp, profile_caches: dict[str, dict[str, object]]) -> list[dict[str, object]]: candidates = [ HybridCandidate(*combo) for combo in __import__("itertools").product( STRATEGIC_REGIME_PROFILES.keys(), EXPANSION_MODES.keys(), EUPHORIA_MODES.keys(), CAP_ENGINES, CHOP_ENGINES, DIST_ENGINES, ) ] rows: list[dict[str, object]] = [] for idx, candidate in enumerate(candidates, start=1): cache = profile_caches[candidate.regime_profile] full_curve = _compose_full_curve( latest_bar=latest_bar, timestamps=cache["timestamps"], regime_map=cache["regime_map"], component_returns=cache["component_returns"], candidate=candidate, ) window_results = { label: _segment_metrics(full_curve, latest_bar - pd.Timedelta(days=days), latest_bar) for days, label in WINDOWS } year_results = { label: _segment_metrics(full_curve, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))) for label, start, end_exclusive in YEAR_PERIODS } year_results["2026_YTD"] = _segment_metrics(full_curve, YTD_START, latest_bar) score, negative_years, mdd_violations = _score_candidate( window_results, {k: v for k, v in year_results.items() if k != "2026_YTD"}, ) row = { "candidate": asdict(candidate), "name": candidate.name, "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "windows": window_results, "years": year_results, "validation": "approx_full_curve_slice", } rows.append(row) print( f"[approx {idx:03d}/{len(candidates)}] {candidate.name} " f"score={score:.3f} neg_years={negative_years} mdd_viol={mdd_violations} " f"1y={window_results['1y']['total_return'] * 100:.2f}% " f"5y_ann={window_results['5y']['annualized_return'] * 100:.2f}%", flush=True, ) rows.sort(key=lambda row: (int(row["negative_years"]), int(row["mdd_violations"]), -float(row["score"]))) return rows def _period_specs(latest_bar: pd.Timestamp) -> list[tuple[str, str, pd.Timestamp, pd.Timestamp]]: specs: list[tuple[str, str, pd.Timestamp, pd.Timestamp]] = [] for days, label in WINDOWS: specs.append(("window", label, latest_bar - pd.Timedelta(days=days), latest_bar)) for label, start, end_exclusive in YEAR_PERIODS: specs.append(("year", label, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1)))) specs.append(("year", "2026_YTD", YTD_START, latest_bar)) return specs def _exact_period_worker(candidate_payload: dict[str, str], period_spec: tuple[str, str, str, str]) -> tuple[str, str, dict[str, float]]: kind, label, start_text, end_text = period_spec eval_start = pd.Timestamp(start_text) eval_end = pd.Timestamp(end_text) bundle, _ = load_fixed66_bundle("/tmp/strategy32_fixed66_bundle.pkl") candidate = HybridCandidate(**candidate_payload) raw_start = eval_start - pd.Timedelta(days=90) sliced = slice_bundle(bundle, raw_start, eval_end) regime_frame = build_strategic_regime_frame(sliced, eval_start, eval_end, profile=candidate.regime_profile) regime_map = dict(zip(pd.to_datetime(regime_frame["timestamp"]), regime_frame["strategic_regime"])) harness = AdverseRegimeResearchHarness(sliced, eval_end) component_returns: dict[str, pd.Series] = {} for mode_name, spec in EXPANSION_MODES.items(): filter_plan = _build_positive_filter_plan(regime_frame, "MOMENTUM_EXPANSION") if spec["guarded"] else None curve = _run_static_component_curve( sliced=sliced, latest_bar=eval_end, eval_start=eval_start, regime_map=regime_map, active_regime="MOMENTUM_EXPANSION", filter_name=str(spec["filter_name"]), filter_plan=filter_plan, ) component_returns[f"MOMENTUM_EXPANSION::{mode_name}"] = _curve_returns(curve) for mode_name, spec in EUPHORIA_MODES.items(): filter_plan = _build_positive_filter_plan(regime_frame, "EUPHORIC_BREAKOUT") if spec["guarded"] else None curve = _run_static_component_curve( sliced=sliced, latest_bar=eval_end, eval_start=eval_start, regime_map=regime_map, active_regime="EUPHORIC_BREAKOUT", filter_name=str(spec["filter_name"]), filter_plan=filter_plan, ) component_returns[f"EUPHORIC_BREAKOUT::{mode_name}"] = _curve_returns(curve) for engine_name in sorted({candidate.cap_engine, candidate.chop_engine, candidate.dist_engine}): curve = _run_adverse_component_curve( eval_start=eval_start, engine_name=engine_name, harness=harness, regime_frame=regime_frame, ) component_returns[engine_name] = _curve_returns(curve) timestamps = sorted(sliced.prices["BTC"]["timestamp"].loc[sliced.prices["BTC"]["timestamp"] >= eval_start].tolist()) curve = _compose_full_curve( latest_bar=eval_end, timestamps=timestamps, regime_map=regime_map, component_returns=component_returns, candidate=candidate, ) return kind, label, _segment_metrics(curve, eval_start, eval_end) def _exact_validate_candidate_parallel( candidate: HybridCandidate, latest_bar: pd.Timestamp, *, max_workers: int, ) -> dict[str, object]: period_specs = [ (kind, label, str(start), str(end)) for kind, label, start, end in _period_specs(latest_bar) ] window_results: dict[str, dict[str, float]] = {} year_results: dict[str, dict[str, float]] = {} ctx = mp.get_context("fork") with ProcessPoolExecutor(max_workers=min(max_workers, len(period_specs)), mp_context=ctx) as executor: future_map = { executor.submit(_exact_period_worker, asdict(candidate), period_spec): period_spec for period_spec in period_specs } for future in as_completed(future_map): kind, label, metrics = future.result() if kind == "window": window_results[label] = metrics else: year_results[label] = metrics print( f"[exact-period] {candidate.name} {label} " f"ret={metrics['total_return'] * 100:.2f}% " f"mdd={metrics['max_drawdown'] * 100:.2f}%", flush=True, ) ordered_windows = {label: window_results[label] for _, label in WINDOWS} ordered_years = {label: year_results[label] for label, _, _ in YEAR_PERIODS} ordered_years["2026_YTD"] = year_results["2026_YTD"] score, negative_years, mdd_violations = _score_candidate( ordered_windows, {k: v for k, v in ordered_years.items() if k != "2026_YTD"}, ) return { "candidate": asdict(candidate), "name": candidate.name, "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "windows": ordered_windows, "years": ordered_years, "validation": "exact_independent_periods_parallel", } def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--top-n", type=int, default=3) parser.add_argument("--exact-workers", type=int, default=6) parser.add_argument("--out", type=str, default=str(FAST_OUT_JSON)) args = parser.parse_args() _, latest_bar = load_fixed66_bundle("/tmp/strategy32_fixed66_bundle.pkl") profile_caches: dict[str, dict[str, object]] = {} ctx = mp.get_context("fork") with ProcessPoolExecutor(max_workers=min(3, len(STRATEGIC_REGIME_PROFILES)), mp_context=ctx) as executor: future_map = { executor.submit(_build_profile_cache, profile_name): profile_name for profile_name in STRATEGIC_REGIME_PROFILES } for future in as_completed(future_map): profile_name, cache = future.result() profile_caches[profile_name] = cache print(f"[cache] built {profile_name}", flush=True) rows = _build_candidate_rows(latest_bar, profile_caches) exact_top: list[dict[str, object]] = [] for row in rows[: args.top_n]: candidate = HybridCandidate(**row["candidate"]) print(f"[exact-start] {candidate.name}", flush=True) exact_top.append( _exact_validate_candidate_parallel( candidate, latest_bar, max_workers=args.exact_workers, ) ) exact_top.sort(key=lambda item: (int(item["negative_years"]), int(item["mdd_violations"]), -float(item["score"]))) payload = { "analysis": "strategy32_hybrid_strategy_search_fast", "latest_completed_bar": str(latest_bar), "candidate_count": len(rows), "summary": rows[:20], "exact_top": exact_top, } Path(args.out).write_text(json.dumps(payload, indent=2), encoding="utf-8") print(f"[exact-done] {candidate.name}", flush=True) Path(OUT_JSON).write_text( json.dumps( { "analysis": "strategy32_hybrid_strategy_search_fast_link", "source": str(Path(args.out)), }, indent=2, ), encoding="utf-8", ) if __name__ == "__main__": main()