# Files
# strategy32/scripts/run_hybrid_strategy_search.py
#
# 459 lines
# 18 KiB
# Python
from __future__ import annotations
import itertools
import json
import multiprocessing as mp
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import asdict, dataclass
from pathlib import Path
from statistics import median
import pandas as pd
PACKAGE_PARENT = Path(__file__).resolve().parents[2]
if str(PACKAGE_PARENT) not in sys.path:
sys.path.insert(0, str(PACKAGE_PARENT))
from strategy29.backtest.metrics import max_drawdown, sharpe_ratio
from strategy29.backtest.window_analysis import slice_bundle
from strategy32.research.adverse_regime import AdverseRegimeResearchHarness
from strategy32.research.hybrid_regime import (
_build_positive_filter_plan,
_curve_returns,
_run_adverse_component_curve,
_run_static_component_curve,
load_fixed66_bundle,
)
from strategy32.scripts.run_regime_filter_analysis import STRATEGIC_REGIME_PROFILES, build_strategic_regime_frame
# Destination for the JSON payload written by main().
OUT_JSON = Path("/tmp/strategy32_hybrid_strategy_search.json")
# Trailing evaluation windows as (days, label) pairs, anchored at the latest bar.
WINDOWS = (
    (365, "1y"),
    (730, "2y"),
    (1095, "3y"),
    (1460, "4y"),
    (1825, "5y"),
)
# Calendar-year evaluation periods: (label, inclusive start, exclusive end).
# 2021 starts mid-March — presumably the first available bar in the fixed
# bundle; TODO confirm against the bundle's earliest timestamp.
YEAR_PERIODS = (
    ("2021", pd.Timestamp("2021-03-16 04:00:00+00:00"), pd.Timestamp("2022-01-01 00:00:00+00:00")),
    ("2022", pd.Timestamp("2022-01-01 00:00:00+00:00"), pd.Timestamp("2023-01-01 00:00:00+00:00")),
    ("2023", pd.Timestamp("2023-01-01 00:00:00+00:00"), pd.Timestamp("2024-01-01 00:00:00+00:00")),
    ("2024", pd.Timestamp("2024-01-01 00:00:00+00:00"), pd.Timestamp("2025-01-01 00:00:00+00:00")),
    ("2025", pd.Timestamp("2025-01-01 00:00:00+00:00"), pd.Timestamp("2026-01-01 00:00:00+00:00")),
)
# Candidate component configurations for the MOMENTUM_EXPANSION regime.
# "guarded": True means a per-regime positive-filter plan is built and passed
# to the component curve instead of relying on the static filter alone.
EXPANSION_MODES = {
    "prev_static": {"filter_name": "prev_balanced", "guarded": False},
    "guarded_static": {"filter_name": "guarded_positive", "guarded": False},
    "guarded_switch": {"filter_name": "prev_balanced", "guarded": True},
    "overheat_static": {"filter_name": "overheat_tolerant", "guarded": False},
}
# Candidate component configurations for the EUPHORIC_BREAKOUT regime.
EUPHORIA_MODES = {
    "overheat_static": {"filter_name": "overheat_tolerant", "guarded": False},
    "guarded_static": {"filter_name": "guarded_euphoria", "guarded": False},
    "guarded_switch": {"filter_name": "overheat_tolerant", "guarded": True},
    "prev_static": {"filter_name": "prev_balanced", "guarded": False},
}
# Engine choices for the three adverse regimes (one axis per regime).
CAP_ENGINES = ("cap_cash", "cap_btc_rebound")
CHOP_ENGINES = ("chop_cash", "chop_inverse_carry", "chop_inverse_carry_strict")
DIST_ENGINES = ("dist_cash", "dist_inverse_carry_strict")
@dataclass(frozen=True, slots=True)
class HybridCandidate:
regime_profile: str
expansion_mode: str
euphoria_mode: str
cap_engine: str
chop_engine: str
dist_engine: str
@property
def name(self) -> str:
return (
f"{self.regime_profile}"
f"|exp:{self.expansion_mode}"
f"|eup:{self.euphoria_mode}"
f"|cap:{self.cap_engine}"
f"|chop:{self.chop_engine}"
f"|dist:{self.dist_engine}"
)
def _annualized_return(total_return: float, days: int) -> float:
if days <= 0:
return 0.0
return (1.0 + total_return) ** (365.0 / days) - 1.0
def _segment_curve(curve: pd.Series, start: pd.Timestamp, end: pd.Timestamp) -> pd.Series:
segment = curve.loc[(curve.index >= start) & (curve.index <= end)].copy()
if segment.empty:
return segment
base = float(segment.iloc[0])
if base <= 0:
return pd.Series(dtype=float)
return segment / base * 1000.0
def _segment_metrics(curve: pd.Series, start: pd.Timestamp, end: pd.Timestamp) -> dict[str, float]:
    """Return/risk statistics for *curve* restricted to [start, end].

    Produces total return, annualized return, Sharpe ratio, and max drawdown
    for the rebased segment; all-zero metrics when fewer than two points fall
    inside the range.
    """
    segment = _segment_curve(curve, start, end)
    metrics: dict[str, float] = {"start": str(start), "end": str(end)}
    if len(segment) < 2:
        metrics.update(total_return=0.0, annualized_return=0.0, sharpe=0.0, max_drawdown=0.0)
        return metrics
    total_return = float(segment.iloc[-1] / segment.iloc[0] - 1.0)
    # Horizon in whole days, floored at 1 to keep annualization finite.
    elapsed_days = max(int((end - start) / pd.Timedelta(days=1)), 1)
    metrics.update(
        total_return=total_return,
        annualized_return=_annualized_return(total_return, elapsed_days),
        sharpe=sharpe_ratio(segment, 6),
        max_drawdown=max_drawdown(segment),
    )
    return metrics
def _score_candidate(window_results: dict[str, dict[str, float]], year_results: dict[str, dict[str, float]]) -> tuple[float, int, int]:
year_returns = [float(metrics["total_return"]) for metrics in year_results.values()]
negative_years = sum(ret < 0 for ret in year_returns)
mdd_violations = sum(float(metrics["max_drawdown"]) < -0.20 for metrics in window_results.values())
score = 0.0
score += 4.0 * float(window_results["5y"]["annualized_return"])
score += 2.2 * float(window_results["1y"]["annualized_return"])
score += 1.5 * float(window_results["2y"]["annualized_return"])
score += 1.2 * float(window_results["4y"]["annualized_return"])
score += 0.8 * float(window_results["3y"]["annualized_return"])
score += 1.5 * float(window_results["5y"]["sharpe"])
score += 0.8 * float(window_results["1y"]["sharpe"])
score += 2.0 * min(year_returns)
score += 1.0 * median(year_returns)
score += 0.75 * sum(max(ret, 0.0) for ret in year_returns)
score -= 3.0 * negative_years
score -= 0.75 * mdd_violations
for label in ("1y", "2y", "3y", "4y", "5y"):
dd = abs(float(window_results[label]["max_drawdown"]))
score -= max(0.0, dd - 0.20) * 4.0
return score, negative_years, mdd_violations
def _compose_full_curve(
*,
latest_bar: pd.Timestamp,
timestamps: list[pd.Timestamp],
regime_map: dict[pd.Timestamp, str],
component_returns: dict[str, pd.Series],
candidate: HybridCandidate,
) -> pd.Series:
equity = 1000.0
idx = [timestamps[0]]
vals = [equity]
for i in range(1, len(timestamps)):
signal_ts = timestamps[i - 1]
execution_ts = timestamps[i]
regime = regime_map.get(signal_ts, "")
if regime == "MOMENTUM_EXPANSION":
key = f"MOMENTUM_EXPANSION::{candidate.expansion_mode}"
elif regime == "EUPHORIC_BREAKOUT":
key = f"EUPHORIC_BREAKOUT::{candidate.euphoria_mode}"
elif regime == "CAPITULATION_STRESS":
key = candidate.cap_engine
elif regime == "CHOPPY_ROTATION":
key = candidate.chop_engine
elif regime == "DISTRIBUTION_DRIFT":
key = candidate.dist_engine
else:
key = ""
ret = float(component_returns.get(key, pd.Series(dtype=float)).get(execution_ts, 0.0))
equity *= max(0.0, 1.0 + ret)
idx.append(execution_ts)
vals.append(equity)
return pd.Series(vals, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float)
def _exact_validate_candidate(
    *,
    bundle,
    latest_bar: pd.Timestamp,
    candidate: HybridCandidate,
) -> dict[str, object]:
    """Exactly re-score *candidate* by rebuilding everything per period.

    Unlike the approximate pass in main() (which slices a single full 5y
    curve), every window and year here gets its own bundle slice, regime
    frame, harness, and component curves, so period boundaries share no state.
    Returns the same row shape as the approximate pass, tagged
    "exact_independent_periods".
    """
    def run_period(eval_start: pd.Timestamp, eval_end: pd.Timestamp) -> pd.Series:
        # 90-day warmup before eval_start so indicators are primed at the
        # first evaluated bar.
        raw_start = eval_start - pd.Timedelta(days=90)
        sliced = slice_bundle(bundle, raw_start, eval_end)
        regime_frame = build_strategic_regime_frame(sliced, eval_start, eval_end, profile=candidate.regime_profile)
        # signal timestamp -> strategic regime label
        regime_map = dict(zip(pd.to_datetime(regime_frame["timestamp"]), regime_frame["strategic_regime"]))
        harness = AdverseRegimeResearchHarness(sliced, eval_end)
        component_returns: dict[str, pd.Series] = {}
        # Positive-regime components: one curve per expansion mode.
        for mode_name, spec in EXPANSION_MODES.items():
            filter_plan = _build_positive_filter_plan(regime_frame, "MOMENTUM_EXPANSION") if spec["guarded"] else None
            curve = _run_static_component_curve(
                sliced=sliced,
                latest_bar=eval_end,
                eval_start=eval_start,
                regime_map=regime_map,
                active_regime="MOMENTUM_EXPANSION",
                filter_name=str(spec["filter_name"]),
                filter_plan=filter_plan,
            )
            component_returns[f"MOMENTUM_EXPANSION::{mode_name}"] = _curve_returns(curve)
        # Positive-regime components: one curve per euphoria mode.
        for mode_name, spec in EUPHORIA_MODES.items():
            filter_plan = _build_positive_filter_plan(regime_frame, "EUPHORIC_BREAKOUT") if spec["guarded"] else None
            curve = _run_static_component_curve(
                sliced=sliced,
                latest_bar=eval_end,
                eval_start=eval_start,
                regime_map=regime_map,
                active_regime="EUPHORIC_BREAKOUT",
                filter_name=str(spec["filter_name"]),
                filter_plan=filter_plan,
            )
            component_returns[f"EUPHORIC_BREAKOUT::{mode_name}"] = _curve_returns(curve)
        # Only the adverse engines this candidate actually selects (deduped).
        for engine_name in {candidate.cap_engine, candidate.chop_engine, candidate.dist_engine}:
            curve = _run_adverse_component_curve(
                eval_start=eval_start,
                engine_name=engine_name,
                harness=harness,
                regime_frame=regime_frame,
            )
            component_returns[engine_name] = _curve_returns(curve)
        # BTC bar timestamps inside the window define the composition grid.
        timestamps = sorted(sliced.prices["BTC"]["timestamp"].loc[sliced.prices["BTC"]["timestamp"] >= eval_start].tolist())
        return _compose_full_curve(
            latest_bar=eval_end,
            timestamps=timestamps,
            regime_map=regime_map,
            component_returns=component_returns,
            candidate=candidate,
        )
    # Trailing windows, each rebuilt independently.
    window_results: dict[str, dict[str, float]] = {}
    for days, label in WINDOWS:
        eval_end = latest_bar
        eval_start = eval_end - pd.Timedelta(days=days)
        curve = run_period(eval_start, eval_end)
        window_results[label] = _segment_metrics(curve, eval_start, eval_end)
    # Calendar years, with the exclusive end clamped to the last completed bar.
    year_results: dict[str, dict[str, float]] = {}
    for label, start, end_exclusive in YEAR_PERIODS:
        eval_end = min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))
        curve = run_period(start, eval_end)
        year_results[label] = _segment_metrics(curve, start, eval_end)
    ytd_start = pd.Timestamp("2026-01-01 00:00:00+00:00")
    year_results["2026_YTD"] = _segment_metrics(run_period(ytd_start, latest_bar), ytd_start, latest_bar)
    # Partial 2026 is reported but excluded from the score.
    score, negative_years, mdd_violations = _score_candidate(window_results, {k: v for k, v in year_results.items() if k != "2026_YTD"})
    return {
        "candidate": asdict(candidate),
        "score": score,
        "negative_years": negative_years,
        "mdd_violations": mdd_violations,
        "windows": window_results,
        "years": year_results,
        "validation": "exact_independent_periods",
    }
def _build_profile_cache(profile_name: str) -> tuple[str, dict[str, object]]:
    """Precompute the shared 5y inputs for one strategic regime profile.

    Runs in a worker process. Returns ``(profile_name, cache)`` where the
    cache holds the regime map, every component return stream (all modes and
    all adverse engines), and the evaluation-grid timestamps, so candidate
    scoring in main() only has to compose curves from cached pieces.
    """
    bundle, latest_bar = load_fixed66_bundle("/tmp/strategy32_fixed66_bundle.pkl")
    # 5y evaluation window plus a 90-day warmup so indicators are primed.
    eval_start = latest_bar - pd.Timedelta(days=1825)
    raw_start = eval_start - pd.Timedelta(days=90)
    sliced = slice_bundle(bundle, raw_start, latest_bar)
    regime_frame = build_strategic_regime_frame(sliced, eval_start, latest_bar, profile=profile_name)
    # signal timestamp -> strategic regime label
    regime_map = dict(zip(pd.to_datetime(regime_frame["timestamp"]), regime_frame["strategic_regime"]))
    harness = AdverseRegimeResearchHarness(sliced, latest_bar)
    component_returns: dict[str, pd.Series] = {}
    # Positive-regime components: one curve per expansion mode.
    for mode_name, spec in EXPANSION_MODES.items():
        filter_plan = _build_positive_filter_plan(regime_frame, "MOMENTUM_EXPANSION") if spec["guarded"] else None
        curve = _run_static_component_curve(
            sliced=sliced,
            latest_bar=latest_bar,
            eval_start=eval_start,
            regime_map=regime_map,
            active_regime="MOMENTUM_EXPANSION",
            filter_name=str(spec["filter_name"]),
            filter_plan=filter_plan,
        )
        component_returns[f"MOMENTUM_EXPANSION::{mode_name}"] = _curve_returns(curve)
    # Positive-regime components: one curve per euphoria mode.
    for mode_name, spec in EUPHORIA_MODES.items():
        filter_plan = _build_positive_filter_plan(regime_frame, "EUPHORIC_BREAKOUT") if spec["guarded"] else None
        curve = _run_static_component_curve(
            sliced=sliced,
            latest_bar=latest_bar,
            eval_start=eval_start,
            regime_map=regime_map,
            active_regime="EUPHORIC_BREAKOUT",
            filter_name=str(spec["filter_name"]),
            filter_plan=filter_plan,
        )
        component_returns[f"EUPHORIC_BREAKOUT::{mode_name}"] = _curve_returns(curve)
    # Every adverse engine is cached once (deduped, sorted for determinism)
    # so any candidate combination can be composed later.
    for engine_name in sorted(set(CAP_ENGINES) | set(CHOP_ENGINES) | set(DIST_ENGINES)):
        curve = _run_adverse_component_curve(
            eval_start=eval_start,
            engine_name=engine_name,
            harness=harness,
            regime_frame=regime_frame,
        )
        component_returns[engine_name] = _curve_returns(curve)
    # BTC bar timestamps inside the window define the composition grid.
    timestamps = sorted(sliced.prices["BTC"]["timestamp"].loc[sliced.prices["BTC"]["timestamp"] >= eval_start].tolist())
    return profile_name, {
        "regime_map": regime_map,
        "component_returns": component_returns,
        "timestamps": timestamps,
    }
def _exact_validate_candidate_worker(candidate_payload: dict[str, str]) -> dict[str, object]:
    """Process-pool entry point: reload the bundle and exact-validate one candidate.

    The candidate arrives as a plain dict (picklable) and is rehydrated here.
    """
    bundle, latest_bar = load_fixed66_bundle("/tmp/strategy32_fixed66_bundle.pkl")
    candidate = HybridCandidate(**candidate_payload)
    return _exact_validate_candidate(bundle=bundle, latest_bar=latest_bar, candidate=candidate)
def main() -> None:
    """Grid-search hybrid regime strategies and exact-validate the top three.

    Stage 1: build per-profile caches of 5y component curves in parallel.
    Stage 2: score every candidate combination cheaply by composing and
    slicing a single full curve from the cache.
    Stage 3: re-validate the best three candidates with fully independent
    per-period backtests, then write the JSON payload to OUT_JSON.
    """
    bundle, latest_bar = load_fixed66_bundle("/tmp/strategy32_fixed66_bundle.pkl")
    profile_caches: dict[str, dict[str, object]] = {}
    # "fork" lets workers inherit module state cheaply; POSIX-only.
    ctx = mp.get_context("fork")
    with ProcessPoolExecutor(max_workers=min(3, len(STRATEGIC_REGIME_PROFILES)), mp_context=ctx) as executor:
        future_map = {executor.submit(_build_profile_cache, profile_name): profile_name for profile_name in STRATEGIC_REGIME_PROFILES}
        for future in as_completed(future_map):
            profile_name, cache = future.result()
            profile_caches[profile_name] = cache
            print(f"[cache] built {profile_name}", flush=True)
    # Full cartesian product of all configuration axes.
    candidates = [
        HybridCandidate(*combo)
        for combo in itertools.product(
            STRATEGIC_REGIME_PROFILES.keys(),
            EXPANSION_MODES.keys(),
            EUPHORIA_MODES.keys(),
            CAP_ENGINES,
            CHOP_ENGINES,
            DIST_ENGINES,
        )
    ]
    rows: list[dict[str, object]] = []
    for idx, candidate in enumerate(candidates, start=1):
        cache = profile_caches[candidate.regime_profile]
        full_curve = _compose_full_curve(
            latest_bar=latest_bar,
            timestamps=cache["timestamps"],
            regime_map=cache["regime_map"],
            component_returns=cache["component_returns"],
            candidate=candidate,
        )
        # Approximate metrics: slices of one full 5y curve rather than
        # independently rebuilt periods.
        window_results = {
            label: _segment_metrics(full_curve, latest_bar - pd.Timedelta(days=days), latest_bar)
            for days, label in WINDOWS
        }
        year_results = {
            label: _segment_metrics(full_curve, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1)))
            for label, start, end_exclusive in YEAR_PERIODS
        }
        year_results["2026_YTD"] = _segment_metrics(full_curve, pd.Timestamp("2026-01-01 00:00:00+00:00"), latest_bar)
        # Partial 2026 is reported but excluded from the score.
        score, negative_years, mdd_violations = _score_candidate(window_results, {k: v for k, v in year_results.items() if k != "2026_YTD"})
        row = {
            "candidate": asdict(candidate),
            "name": candidate.name,
            "score": score,
            "negative_years": negative_years,
            "mdd_violations": mdd_violations,
            "windows": window_results,
            "years": year_results,
            "validation": "approx_full_curve_slice",
        }
        rows.append(row)
        print(
            f"[{idx:03d}/{len(candidates)}] {candidate.name} "
            f"score={score:.3f} neg_years={negative_years} mdd_viol={mdd_violations} "
            f"1y={window_results['1y']['total_return']*100:.2f}% "
            f"5y_ann={window_results['5y']['annualized_return']*100:.2f}%",
            flush=True,
        )
    # Rank: fewest negative years, then fewest drawdown breaches, then score.
    rows.sort(key=lambda row: (int(row["negative_years"]), int(row["mdd_violations"]), -float(row["score"])))
    # Exact re-validation of the top three approximate winners, in parallel.
    with ProcessPoolExecutor(max_workers=min(3, len(rows[:3])), mp_context=ctx) as executor:
        future_map = {
            executor.submit(_exact_validate_candidate_worker, row["candidate"]): row["name"]
            for row in rows[:3]
        }
        exact_top = []
        for future in as_completed(future_map):
            result = future.result()
            exact_top.append(result)
            print(f"[exact] validated {future_map[future]}", flush=True)
    exact_top.sort(key=lambda row: (int(row["negative_years"]), int(row["mdd_violations"]), -float(row["score"])))
    payload = {
        "analysis": "strategy32_hybrid_strategy_search",
        "latest_completed_bar": str(latest_bar),
        "candidate_count": len(candidates),
        "regime_profiles": list(STRATEGIC_REGIME_PROFILES.keys()),
        "expansion_modes": list(EXPANSION_MODES.keys()),
        "euphoria_modes": list(EUPHORIA_MODES.keys()),
        "cap_engines": list(CAP_ENGINES),
        "chop_engines": list(CHOP_ENGINES),
        "dist_engines": list(DIST_ENGINES),
        "summary": rows[:20],
        "exact_top": exact_top,
    }
    OUT_JSON.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    # Console summary of the approximate leaderboard.
    print("\nTop 5 approximate candidates", flush=True)
    for row in rows[:5]:
        print(
            row["name"],
            "score",
            round(float(row["score"]), 3),
            "neg_years",
            row["negative_years"],
            "mdd_viol",
            row["mdd_violations"],
            "2025",
            round(float(row["years"]["2025"]["total_return"]) * 100, 2),
            "2024",
            round(float(row["years"]["2024"]["total_return"]) * 100, 2),
            "1y",
            round(float(row["windows"]["1y"]["total_return"]) * 100, 2),
            "5y_ann",
            round(float(row["windows"]["5y"]["annualized_return"]) * 100, 2),
        )
    # Console summary of the exact-validated leaderboard.
    print("\nExact top candidates", flush=True)
    for row in exact_top:
        print(
            HybridCandidate(**row["candidate"]).name,
            "score",
            round(float(row["score"]), 3),
            "neg_years",
            row["negative_years"],
            "mdd_viol",
            row["mdd_violations"],
            "2025",
            round(float(row["years"]["2025"]["total_return"]) * 100, 2),
            "2024",
            round(float(row["years"]["2024"]["total_return"]) * 100, 2),
            "1y",
            round(float(row["windows"]["1y"]["total_return"]) * 100, 2),
            "5y_ann",
            round(float(row["windows"]["5y"]["annualized_return"]) * 100, 2),
        )
    print("\nwrote", OUT_JSON, flush=True)
# Script entry point.
if __name__ == "__main__":
    main()