from __future__ import annotations import itertools import json import multiprocessing as mp import sys from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import asdict, dataclass from pathlib import Path from statistics import median import pandas as pd PACKAGE_PARENT = Path(__file__).resolve().parents[2] if str(PACKAGE_PARENT) not in sys.path: sys.path.insert(0, str(PACKAGE_PARENT)) from strategy29.backtest.metrics import max_drawdown, sharpe_ratio from strategy29.backtest.window_analysis import slice_bundle from strategy32.research.adverse_regime import AdverseRegimeResearchHarness from strategy32.research.hybrid_regime import ( _build_positive_filter_plan, _curve_returns, _run_adverse_component_curve, _run_static_component_curve, load_fixed66_bundle, ) from strategy32.scripts.run_regime_filter_analysis import STRATEGIC_REGIME_PROFILES, build_strategic_regime_frame OUT_JSON = Path("/tmp/strategy32_hybrid_strategy_search.json") WINDOWS = ( (365, "1y"), (730, "2y"), (1095, "3y"), (1460, "4y"), (1825, "5y"), ) YEAR_PERIODS = ( ("2021", pd.Timestamp("2021-03-16 04:00:00+00:00"), pd.Timestamp("2022-01-01 00:00:00+00:00")), ("2022", pd.Timestamp("2022-01-01 00:00:00+00:00"), pd.Timestamp("2023-01-01 00:00:00+00:00")), ("2023", pd.Timestamp("2023-01-01 00:00:00+00:00"), pd.Timestamp("2024-01-01 00:00:00+00:00")), ("2024", pd.Timestamp("2024-01-01 00:00:00+00:00"), pd.Timestamp("2025-01-01 00:00:00+00:00")), ("2025", pd.Timestamp("2025-01-01 00:00:00+00:00"), pd.Timestamp("2026-01-01 00:00:00+00:00")), ) EXPANSION_MODES = { "prev_static": {"filter_name": "prev_balanced", "guarded": False}, "guarded_static": {"filter_name": "guarded_positive", "guarded": False}, "guarded_switch": {"filter_name": "prev_balanced", "guarded": True}, "overheat_static": {"filter_name": "overheat_tolerant", "guarded": False}, } EUPHORIA_MODES = { "overheat_static": {"filter_name": "overheat_tolerant", "guarded": False}, "guarded_static": {"filter_name": "guarded_euphoria", "guarded": False}, "guarded_switch": {"filter_name": "overheat_tolerant", "guarded": True}, "prev_static": {"filter_name": "prev_balanced", "guarded": False}, } CAP_ENGINES = ("cap_cash", "cap_btc_rebound") CHOP_ENGINES = ("chop_cash", "chop_inverse_carry", "chop_inverse_carry_strict") DIST_ENGINES = ("dist_cash", "dist_inverse_carry_strict") @dataclass(frozen=True, slots=True) class HybridCandidate: regime_profile: str expansion_mode: str euphoria_mode: str cap_engine: str chop_engine: str dist_engine: str @property def name(self) -> str: return ( f"{self.regime_profile}" f"|exp:{self.expansion_mode}" f"|eup:{self.euphoria_mode}" f"|cap:{self.cap_engine}" f"|chop:{self.chop_engine}" f"|dist:{self.dist_engine}" ) def _annualized_return(total_return: float, days: int) -> float: if days <= 0: return 0.0 return (1.0 + total_return) ** (365.0 / days) - 1.0 def _segment_curve(curve: pd.Series, start: pd.Timestamp, end: pd.Timestamp) -> pd.Series: segment = curve.loc[(curve.index >= start) & (curve.index <= end)].copy() if segment.empty: return segment base = float(segment.iloc[0]) if base <= 0: return pd.Series(dtype=float) return segment / base * 1000.0 def _segment_metrics(curve: pd.Series, start: pd.Timestamp, end: pd.Timestamp) -> dict[str, float]: segment = _segment_curve(curve, start, end) if len(segment) < 2: return { "start": str(start), "end": str(end), "total_return": 0.0, "annualized_return": 0.0, "sharpe": 0.0, "max_drawdown": 0.0, } total_return = float(segment.iloc[-1] / segment.iloc[0] - 1.0) days = max(int((end - start) / pd.Timedelta(days=1)), 1) return { "start": str(start), "end": str(end), "total_return": total_return, "annualized_return": _annualized_return(total_return, days), "sharpe": sharpe_ratio(segment, 6), "max_drawdown": max_drawdown(segment), } def _score_candidate(window_results: dict[str, dict[str, float]], year_results: dict[str, dict[str, float]]) -> tuple[float, int, int]: year_returns = [float(metrics["total_return"]) for metrics in year_results.values()] negative_years = sum(ret < 0 for ret in year_returns) mdd_violations = sum(float(metrics["max_drawdown"]) < -0.20 for metrics in window_results.values()) score = 0.0 score += 4.0 * float(window_results["5y"]["annualized_return"]) score += 2.2 * float(window_results["1y"]["annualized_return"]) score += 1.5 * float(window_results["2y"]["annualized_return"]) score += 1.2 * float(window_results["4y"]["annualized_return"]) score += 0.8 * float(window_results["3y"]["annualized_return"]) score += 1.5 * float(window_results["5y"]["sharpe"]) score += 0.8 * float(window_results["1y"]["sharpe"]) score += 2.0 * min(year_returns) score += 1.0 * median(year_returns) score += 0.75 * sum(max(ret, 0.0) for ret in year_returns) score -= 3.0 * negative_years score -= 0.75 * mdd_violations for label in ("1y", "2y", "3y", "4y", "5y"): dd = abs(float(window_results[label]["max_drawdown"])) score -= max(0.0, dd - 0.20) * 4.0 return score, negative_years, mdd_violations def _compose_full_curve( *, latest_bar: pd.Timestamp, timestamps: list[pd.Timestamp], regime_map: dict[pd.Timestamp, str], component_returns: dict[str, pd.Series], candidate: HybridCandidate, ) -> pd.Series: equity = 1000.0 idx = [timestamps[0]] vals = [equity] for i in range(1, len(timestamps)): signal_ts = timestamps[i - 1] execution_ts = timestamps[i] regime = regime_map.get(signal_ts, "") if regime == "MOMENTUM_EXPANSION": key = f"MOMENTUM_EXPANSION::{candidate.expansion_mode}" elif regime == "EUPHORIC_BREAKOUT": key = f"EUPHORIC_BREAKOUT::{candidate.euphoria_mode}" elif regime == "CAPITULATION_STRESS": key = candidate.cap_engine elif regime == "CHOPPY_ROTATION": key = candidate.chop_engine elif regime == "DISTRIBUTION_DRIFT": key = candidate.dist_engine else: key = "" ret = float(component_returns.get(key, pd.Series(dtype=float)).get(execution_ts, 0.0)) equity *= max(0.0, 1.0 + ret) idx.append(execution_ts) vals.append(equity) return pd.Series(vals, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float) def _exact_validate_candidate( *, bundle, latest_bar: pd.Timestamp, candidate: HybridCandidate, ) -> dict[str, object]: def run_period(eval_start: pd.Timestamp, eval_end: pd.Timestamp) -> pd.Series: raw_start = eval_start - pd.Timedelta(days=90) sliced = slice_bundle(bundle, raw_start, eval_end) regime_frame = build_strategic_regime_frame(sliced, eval_start, eval_end, profile=candidate.regime_profile) regime_map = dict(zip(pd.to_datetime(regime_frame["timestamp"]), regime_frame["strategic_regime"])) harness = AdverseRegimeResearchHarness(sliced, eval_end) component_returns: dict[str, pd.Series] = {} for mode_name, spec in EXPANSION_MODES.items(): filter_plan = _build_positive_filter_plan(regime_frame, "MOMENTUM_EXPANSION") if spec["guarded"] else None curve = _run_static_component_curve( sliced=sliced, latest_bar=eval_end, eval_start=eval_start, regime_map=regime_map, active_regime="MOMENTUM_EXPANSION", filter_name=str(spec["filter_name"]), filter_plan=filter_plan, ) component_returns[f"MOMENTUM_EXPANSION::{mode_name}"] = _curve_returns(curve) for mode_name, spec in EUPHORIA_MODES.items(): filter_plan = _build_positive_filter_plan(regime_frame, "EUPHORIC_BREAKOUT") if spec["guarded"] else None curve = _run_static_component_curve( sliced=sliced, latest_bar=eval_end, eval_start=eval_start, regime_map=regime_map, active_regime="EUPHORIC_BREAKOUT", filter_name=str(spec["filter_name"]), filter_plan=filter_plan, ) component_returns[f"EUPHORIC_BREAKOUT::{mode_name}"] = _curve_returns(curve) for engine_name in {candidate.cap_engine, candidate.chop_engine, candidate.dist_engine}: curve = _run_adverse_component_curve( eval_start=eval_start, engine_name=engine_name, harness=harness, regime_frame=regime_frame, ) component_returns[engine_name] = _curve_returns(curve) timestamps = sorted(sliced.prices["BTC"]["timestamp"].loc[sliced.prices["BTC"]["timestamp"] >= eval_start].tolist()) return _compose_full_curve( latest_bar=eval_end, timestamps=timestamps, regime_map=regime_map, component_returns=component_returns, candidate=candidate, ) window_results: dict[str, dict[str, float]] = {} for days, label in WINDOWS: eval_end = latest_bar eval_start = eval_end - pd.Timedelta(days=days) curve = run_period(eval_start, eval_end) window_results[label] = _segment_metrics(curve, eval_start, eval_end) year_results: dict[str, dict[str, float]] = {} for label, start, end_exclusive in YEAR_PERIODS: eval_end = min(latest_bar, end_exclusive - pd.Timedelta(seconds=1)) curve = run_period(start, eval_end) year_results[label] = _segment_metrics(curve, start, eval_end) ytd_start = pd.Timestamp("2026-01-01 00:00:00+00:00") year_results["2026_YTD"] = _segment_metrics(run_period(ytd_start, latest_bar), ytd_start, latest_bar) score, negative_years, mdd_violations = _score_candidate(window_results, {k: v for k, v in year_results.items() if k != "2026_YTD"}) return { "candidate": asdict(candidate), "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "windows": window_results, "years": year_results, "validation": "exact_independent_periods", } def _build_profile_cache(profile_name: str) -> tuple[str, dict[str, object]]: bundle, latest_bar = load_fixed66_bundle("/tmp/strategy32_fixed66_bundle.pkl") eval_start = latest_bar - pd.Timedelta(days=1825) raw_start = eval_start - pd.Timedelta(days=90) sliced = slice_bundle(bundle, raw_start, latest_bar) regime_frame = build_strategic_regime_frame(sliced, eval_start, latest_bar, profile=profile_name) regime_map = dict(zip(pd.to_datetime(regime_frame["timestamp"]), regime_frame["strategic_regime"])) harness = AdverseRegimeResearchHarness(sliced, latest_bar) component_returns: dict[str, pd.Series] = {} for mode_name, spec in EXPANSION_MODES.items(): filter_plan = _build_positive_filter_plan(regime_frame, "MOMENTUM_EXPANSION") if spec["guarded"] else None curve = _run_static_component_curve( sliced=sliced, latest_bar=latest_bar, eval_start=eval_start, regime_map=regime_map, active_regime="MOMENTUM_EXPANSION", filter_name=str(spec["filter_name"]), filter_plan=filter_plan, ) component_returns[f"MOMENTUM_EXPANSION::{mode_name}"] = _curve_returns(curve) for mode_name, spec in EUPHORIA_MODES.items(): filter_plan = _build_positive_filter_plan(regime_frame, "EUPHORIC_BREAKOUT") if spec["guarded"] else None curve = _run_static_component_curve( sliced=sliced, latest_bar=latest_bar, eval_start=eval_start, regime_map=regime_map, active_regime="EUPHORIC_BREAKOUT", filter_name=str(spec["filter_name"]), filter_plan=filter_plan, ) component_returns[f"EUPHORIC_BREAKOUT::{mode_name}"] = _curve_returns(curve) for engine_name in sorted(set(CAP_ENGINES) | set(CHOP_ENGINES) | set(DIST_ENGINES)): curve = _run_adverse_component_curve( eval_start=eval_start, engine_name=engine_name, harness=harness, regime_frame=regime_frame, ) component_returns[engine_name] = _curve_returns(curve) timestamps = sorted(sliced.prices["BTC"]["timestamp"].loc[sliced.prices["BTC"]["timestamp"] >= eval_start].tolist()) return profile_name, { "regime_map": regime_map, "component_returns": component_returns, "timestamps": timestamps, } def _exact_validate_candidate_worker(candidate_payload: dict[str, str]) -> dict[str, object]: bundle, latest_bar = load_fixed66_bundle("/tmp/strategy32_fixed66_bundle.pkl") return _exact_validate_candidate(bundle=bundle, latest_bar=latest_bar, candidate=HybridCandidate(**candidate_payload)) def main() -> None: bundle, latest_bar = load_fixed66_bundle("/tmp/strategy32_fixed66_bundle.pkl") profile_caches: dict[str, dict[str, object]] = {} ctx = mp.get_context("fork") with ProcessPoolExecutor(max_workers=min(3, len(STRATEGIC_REGIME_PROFILES)), mp_context=ctx) as executor: future_map = {executor.submit(_build_profile_cache, profile_name): profile_name for profile_name in STRATEGIC_REGIME_PROFILES} for future in as_completed(future_map): profile_name, cache = future.result() profile_caches[profile_name] = cache print(f"[cache] built {profile_name}", flush=True) candidates = [ HybridCandidate(*combo) for combo in itertools.product( STRATEGIC_REGIME_PROFILES.keys(), EXPANSION_MODES.keys(), EUPHORIA_MODES.keys(), CAP_ENGINES, CHOP_ENGINES, DIST_ENGINES, ) ] rows: list[dict[str, object]] = [] for idx, candidate in enumerate(candidates, start=1): cache = profile_caches[candidate.regime_profile] full_curve = _compose_full_curve( latest_bar=latest_bar, timestamps=cache["timestamps"], regime_map=cache["regime_map"], component_returns=cache["component_returns"], candidate=candidate, ) window_results = { label: _segment_metrics(full_curve, latest_bar - pd.Timedelta(days=days), latest_bar) for days, label in WINDOWS } year_results = { label: _segment_metrics(full_curve, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))) for label, start, end_exclusive in YEAR_PERIODS } year_results["2026_YTD"] = _segment_metrics(full_curve, pd.Timestamp("2026-01-01 00:00:00+00:00"), latest_bar) score, negative_years, mdd_violations = _score_candidate(window_results, {k: v for k, v in year_results.items() if k != "2026_YTD"}) row = { "candidate": asdict(candidate), "name": candidate.name, "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "windows": window_results, "years": year_results, "validation": "approx_full_curve_slice", } rows.append(row) print( f"[{idx:03d}/{len(candidates)}] {candidate.name} " f"score={score:.3f} neg_years={negative_years} mdd_viol={mdd_violations} " f"1y={window_results['1y']['total_return']*100:.2f}% " f"5y_ann={window_results['5y']['annualized_return']*100:.2f}%", flush=True, ) rows.sort(key=lambda row: (int(row["negative_years"]), int(row["mdd_violations"]), -float(row["score"]))) with ProcessPoolExecutor(max_workers=min(3, len(rows[:3])), mp_context=ctx) as executor: future_map = { executor.submit(_exact_validate_candidate_worker, row["candidate"]): row["name"] for row in rows[:3] } exact_top = [] for future in as_completed(future_map): result = future.result() exact_top.append(result) print(f"[exact] validated {future_map[future]}", flush=True) exact_top.sort(key=lambda row: (int(row["negative_years"]), int(row["mdd_violations"]), -float(row["score"]))) payload = { "analysis": "strategy32_hybrid_strategy_search", "latest_completed_bar": str(latest_bar), "candidate_count": len(candidates), "regime_profiles": list(STRATEGIC_REGIME_PROFILES.keys()), "expansion_modes": list(EXPANSION_MODES.keys()), "euphoria_modes": list(EUPHORIA_MODES.keys()), "cap_engines": list(CAP_ENGINES), "chop_engines": list(CHOP_ENGINES), "dist_engines": list(DIST_ENGINES), "summary": rows[:20], "exact_top": exact_top, } OUT_JSON.write_text(json.dumps(payload, indent=2), encoding="utf-8") print("\nTop 5 approximate candidates", flush=True) for row in rows[:5]: print( row["name"], "score", round(float(row["score"]), 3), "neg_years", row["negative_years"], "mdd_viol", row["mdd_violations"], "2025", round(float(row["years"]["2025"]["total_return"]) * 100, 2), "2024", round(float(row["years"]["2024"]["total_return"]) * 100, 2), "1y", round(float(row["windows"]["1y"]["total_return"]) * 100, 2), "5y_ann", round(float(row["windows"]["5y"]["annualized_return"]) * 100, 2), ) print("\nExact top candidates", flush=True) for row in exact_top: print( HybridCandidate(**row["candidate"]).name, "score", round(float(row["score"]), 3), "neg_years", row["negative_years"], "mdd_viol", row["mdd_violations"], "2025", round(float(row["years"]["2025"]["total_return"]) * 100, 2), "2024", round(float(row["years"]["2024"]["total_return"]) * 100, 2), "1y", round(float(row["windows"]["1y"]["total_return"]) * 100, 2), "5y_ann", round(float(row["windows"]["5y"]["annualized_return"]) * 100, 2), ) print("\nwrote", OUT_JSON, flush=True) if __name__ == "__main__": main()