# strategy32/scripts/run_current_relaxed_hybrid_experiment.py
# (377 lines, 16 KiB, Python)
# Hybrid core-switching experiment: route bar-by-bar between the "current" and
# "relaxed" overheat core configurations based on regime-score gates, with the
# BEST_CASH_OVERLAY cash sleeves layered on top.
from __future__ import annotations
import json
import sys
from dataclasses import asdict, dataclass
from pathlib import Path
import pandas as pd

# Make the repository root (two levels above this script) importable when the
# script is executed directly rather than as a module.
PACKAGE_PARENT = Path(__file__).resolve().parents[2]
if str(PACKAGE_PARENT) not in sys.path:
    sys.path.insert(0, str(PACKAGE_PARENT))

from strategy32.live.runtime import BEST_CASH_OVERLAY, LIVE_STRATEGY_OVERRIDES
from strategy32.research.soft_router import (
    build_cash_overlay_period_components,
    load_component_bundle,
    score_candidate,
    segment_metrics,
)

# Pickled component bundle produced by earlier research runs.
CACHE_PATH = "/tmp/strategy32_fixed66_bundle.pkl"
# Baseline per-variant results written by the core-filter comparison script.
BASELINE_PATH = Path("/tmp/strategy32_recent_core_filter_comparison.json")
# Destination for this experiment's JSON payload.
OUT_JSON = Path("/tmp/strategy32_current_relaxed_hybrid_experiment.json")

# Trailing evaluation windows as (days, label) pairs.
WINDOWS = (
    (365, "1y"),
    (730, "2y"),
    (1095, "3y"),
    (1460, "4y"),
    (1825, "5y"),
)

# Calendar-year periods as (label, start inclusive, end exclusive).
# NOTE(review): 2021 starts mid-March — presumably the start of available
# bundle data; confirm against the cached bundle.
YEAR_PERIODS = (
    ("2021", pd.Timestamp("2021-03-16 04:00:00+00:00"), pd.Timestamp("2022-01-01 00:00:00+00:00")),
    ("2022", pd.Timestamp("2022-01-01 00:00:00+00:00"), pd.Timestamp("2023-01-01 00:00:00+00:00")),
    ("2023", pd.Timestamp("2023-01-01 00:00:00+00:00"), pd.Timestamp("2024-01-01 00:00:00+00:00")),
    ("2024", pd.Timestamp("2024-01-01 00:00:00+00:00"), pd.Timestamp("2025-01-01 00:00:00+00:00")),
    ("2025", pd.Timestamp("2025-01-01 00:00:00+00:00"), pd.Timestamp("2026-01-01 00:00:00+00:00")),
)
# Start of the current (2026) year-to-date segment.
YTD_START = pd.Timestamp("2026-01-01 00:00:00+00:00")

# "Current" core: the live overrides plus a daily-refreshed hard liquidity filter.
CURRENT_OVERHEAT_OVERRIDES = {
    **LIVE_STRATEGY_OVERRIDES,
    "hard_filter_refresh_cadence": "1d",
    "hard_filter_min_history_bars": 120,
    "hard_filter_lookback_bars": 30,
    "hard_filter_min_avg_dollar_volume": 50_000_000.0,
}
# "Relaxed" core: same hard filter, but with explicit momentum gates and a
# universe dollar-volume floor overriding the live settings.
RELAXED_OVERHEAT_OVERRIDES = {
    **LIVE_STRATEGY_OVERRIDES,
    "momentum_min_score": 0.58,
    "momentum_min_relative_strength": -0.03,
    "momentum_min_7d_return": 0.00,
    "universe_min_avg_dollar_volume": 75_000_000.0,
    "hard_filter_refresh_cadence": "1d",
    "hard_filter_min_history_bars": 120,
    "hard_filter_lookback_bars": 30,
    "hard_filter_min_avg_dollar_volume": 50_000_000.0,
}
@dataclass(frozen=True, slots=True)
class HybridSwitchCandidate:
positive_regimes: tuple[str, ...]
core_score_min: float
breadth_persist_min: float
funding_persist_min: float
panic_max: float
choppy_max: float
distribution_max: float
@property
def name(self) -> str:
regimes = ",".join(self.positive_regimes)
return (
f"regimes:{regimes}"
f"|core>={self.core_score_min:.2f}"
f"|breadth>={self.breadth_persist_min:.2f}"
f"|funding>={self.funding_persist_min:.2f}"
f"|panic<={self.panic_max:.2f}"
f"|choppy<={self.choppy_max:.2f}"
f"|dist<={self.distribution_max:.2f}"
)
def _clip01(value: float) -> float:
return min(max(float(value), 0.0), 1.0)
def _overlay_weights(candidate, score_row: dict[str, float], core_cash_pct: float) -> tuple[float, float, float]:
core_score = float(score_row.get("core_score", 0.0))
panic_score = float(score_row.get("panic_score", 0.0))
choppy_score = float(score_row.get("choppy_score", 0.0))
distribution_score = float(score_row.get("distribution_score", 0.0))
cap_signal = _clip01((panic_score - candidate.cap_threshold) / max(1.0 - candidate.cap_threshold, 1e-9))
chop_signal = _clip01((choppy_score - candidate.chop_threshold) / max(1.0 - candidate.chop_threshold, 1e-9))
dist_signal = _clip01((distribution_score - candidate.dist_threshold) / max(1.0 - candidate.dist_threshold, 1e-9))
if core_score > candidate.core_block_threshold:
chop_signal *= 0.25
dist_signal *= 0.35
cap_weight = float(core_cash_pct) * candidate.cap_cash_weight * cap_signal
chop_weight = float(core_cash_pct) * candidate.chop_cash_weight * chop_signal
dist_weight = float(core_cash_pct) * candidate.dist_cash_weight * dist_signal
overlay_total = cap_weight + chop_weight + dist_weight
if overlay_total > core_cash_pct and overlay_total > 0.0:
scale = float(core_cash_pct) / overlay_total
cap_weight *= scale
chop_weight *= scale
dist_weight *= scale
return cap_weight, chop_weight, dist_weight
def _pick_relaxed(score_row: dict[str, float], candidate: HybridSwitchCandidate) -> bool:
return (
str(score_row.get("strategic_regime")) in candidate.positive_regimes
and float(score_row.get("core_score", 0.0)) >= candidate.core_score_min
and float(score_row.get("breadth_persist", 0.0) or 0.0) >= candidate.breadth_persist_min
and float(score_row.get("funding_persist", 0.0) or 0.0) >= candidate.funding_persist_min
and float(score_row.get("panic_score", 0.0)) <= candidate.panic_max
and float(score_row.get("choppy_score", 0.0)) <= candidate.choppy_max
and float(score_row.get("distribution_score", 0.0)) <= candidate.distribution_max
)
def _compose_hybrid_curve(
    *,
    current_components: dict[str, object],
    relaxed_components: dict[str, object],
    switch_candidate: HybridSwitchCandidate,
) -> tuple[pd.Series, pd.DataFrame]:
    """Simulate an equity curve that switches between the two cores bar by bar.

    For each bar, the previous bar's scores (signal) select which core's
    returns and cash level apply on the current bar (execution); the cash
    overlay weights are then carved out of that core's cash. Returns the
    equity curve (starting at 1000.0) and a per-bar diagnostics DataFrame.
    """
    timestamps = list(current_components["timestamps"])
    # Timestamp-indexed lookups so the loop below does O(1) access per bar.
    score_map = current_components["score_frame"].set_index("timestamp").sort_index()
    current_cash_map = current_components["core_exposure_frame"].set_index("timestamp")["cash_pct"].sort_index()
    relaxed_cash_map = relaxed_components["core_exposure_frame"].set_index("timestamp")["cash_pct"].sort_index()
    current_core_returns = current_components["core_returns"]
    relaxed_core_returns = relaxed_components["core_returns"]
    # Overlay sleeve returns always come from the "current" components.
    cap_returns = current_components["cap_returns"]
    chop_returns = current_components["chop_returns"]
    dist_returns = current_components["dist_returns"]
    equity = 1000.0
    idx = [timestamps[0]]
    vals = [equity]
    rows: list[dict[str, object]] = []
    for i in range(1, len(timestamps)):
        # Decide on bar i-1 (signal), apply returns on bar i (execution) —
        # this one-bar lag avoids look-ahead bias.
        signal_ts = pd.Timestamp(timestamps[i - 1])
        execution_ts = pd.Timestamp(timestamps[i])
        score_row = score_map.loc[signal_ts].to_dict() if signal_ts in score_map.index else {}
        use_relaxed = _pick_relaxed(score_row, switch_candidate)
        active_name = "relaxed_overheat" if use_relaxed else "current_overheat"
        core_returns = relaxed_core_returns if use_relaxed else current_core_returns
        cash_map = relaxed_cash_map if use_relaxed else current_cash_map
        # Fall back to the last known cash level (or fully cash, 1.0) when
        # the signal bar is missing from the exposure frame.
        core_cash_pct = float(cash_map.get(signal_ts, cash_map.iloc[-1] if not cash_map.empty else 1.0))
        cap_weight, chop_weight, dist_weight = _overlay_weights(BEST_CASH_OVERLAY, score_row, core_cash_pct)
        # Bar return = active core return plus weighted overlay sleeve returns.
        bar_ret = (
            float(core_returns.get(execution_ts, 0.0))
            + cap_weight * float(cap_returns.get(execution_ts, 0.0))
            + chop_weight * float(chop_returns.get(execution_ts, 0.0))
            + dist_weight * float(dist_returns.get(execution_ts, 0.0))
        )
        # Floor the multiplier at 0 so a catastrophic bar cannot send equity negative.
        equity *= max(0.0, 1.0 + bar_ret)
        idx.append(execution_ts)
        vals.append(equity)
        rows.append(
            {
                "timestamp": execution_ts,
                "active_core": active_name,
                "core_cash_pct": core_cash_pct,
                "core_score": float(score_row.get("core_score", 0.0)),
                "panic_score": float(score_row.get("panic_score", 0.0)),
                "choppy_score": float(score_row.get("choppy_score", 0.0)),
                "distribution_score": float(score_row.get("distribution_score", 0.0)),
                "breadth_persist": float(score_row.get("breadth_persist", 0.0) or 0.0),
                "funding_persist": float(score_row.get("funding_persist", 0.0) or 0.0),
                "portfolio_return": bar_ret,
            }
        )
    curve = pd.Series(vals, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float)
    return curve, pd.DataFrame(rows)
def _metrics_for_curve(curve: pd.Series, latest_bar: pd.Timestamp) -> tuple[dict[str, dict[str, float]], dict[str, dict[str, float]], float, int, int]:
    """Score one composed curve across trailing windows and calendar years.

    Returns ``(windows, years, score, negative_years, mdd_violations)``
    where the last three come from the shared score_candidate helper.
    """
    windows = {
        label: segment_metrics(curve, latest_bar - pd.Timedelta(days=days), latest_bar)
        for days, label in WINDOWS
    }
    # Year segments are end-exclusive; clamp to the latest available bar.
    years = {
        label: segment_metrics(curve, start, min(latest_bar, stop - pd.Timedelta(seconds=1)))
        for label, start, stop in YEAR_PERIODS
    }
    years["2026_YTD"] = segment_metrics(curve, YTD_START, latest_bar)
    # score_candidate only sees the canonical window/year labels (not YTD).
    score, negative_years, mdd_violations = score_candidate(
        {label: windows[label] for _, label in WINDOWS},
        {label: years[label] for label, _, _ in YEAR_PERIODS},
    )
    return windows, years, score, negative_years, mdd_violations
def _candidate_space() -> list[HybridSwitchCandidate]:
    """Enumerate the full grid of switch-threshold candidates.

    Replaces seven levels of nested ``for`` loops with a single
    ``itertools.product`` over the per-dimension grids
    (2 * 3 * 3 * 3 * 2 * 2 * 2 = 432 candidates). ``product`` varies the
    rightmost factor fastest, so the enumeration order is identical to the
    original nested loops.
    """
    from itertools import product  # local import keeps top-level imports untouched

    positive_sets = (
        ("EUPHORIC_BREAKOUT",),
        ("MOMENTUM_EXPANSION", "EUPHORIC_BREAKOUT"),
    )
    grid = product(
        positive_sets,
        (0.50, 0.55, 0.60),   # core_score_min
        (0.50, 0.55, 0.60),   # breadth_persist_min
        (0.55, 0.60, 0.65),   # funding_persist_min
        (0.20, 0.30),         # panic_max
        (0.40, 0.50),         # choppy_max
        (0.30, 0.40),         # distribution_max
    )
    return [
        HybridSwitchCandidate(
            positive_regimes=positive_regimes,
            core_score_min=core_score_min,
            breadth_persist_min=breadth_persist_min,
            funding_persist_min=funding_persist_min,
            panic_max=panic_max,
            choppy_max=choppy_max,
            distribution_max=distribution_max,
        )
        for (
            positive_regimes,
            core_score_min,
            breadth_persist_min,
            funding_persist_min,
            panic_max,
            choppy_max,
            distribution_max,
        ) in grid
    ]
def _baseline_summary() -> dict[str, object]:
    """Load the per-variant baseline results from the comparison JSON on disk."""
    variants = json.loads(BASELINE_PATH.read_text(encoding="utf-8"))["variants"]
    return {
        name: variants[name]["results"]
        for name in ("current_overheat", "relaxed_overheat")
    }
def _evaluate_exact_candidate(bundle, latest_bar: pd.Timestamp, switch_candidate: HybridSwitchCandidate) -> dict[str, object]:
    """Re-evaluate one switch candidate with per-period component builds.

    The grid search reuses a single 5y component build; this exact pass
    rebuilds components for every window/year period so each segment's
    metrics reflect period-local state. Returns a JSON-ready summary with
    the candidate definition, score, per-period metrics, and the last few
    diagnostic rows of the 2026 YTD run.
    """

    def _components(start: pd.Timestamp, end: pd.Timestamp, overrides: dict[str, object]):
        # Both cores share every argument except the config overrides;
        # centralizing the call removes a 10-line duplicated block.
        return build_cash_overlay_period_components(
            bundle=bundle,
            eval_start=start,
            eval_end=end,
            profile_name=BEST_CASH_OVERLAY.regime_profile,
            core_filter=BEST_CASH_OVERLAY.core_filter,
            cap_engine=BEST_CASH_OVERLAY.cap_engine,
            chop_engine=BEST_CASH_OVERLAY.chop_engine,
            dist_engine=BEST_CASH_OVERLAY.dist_engine,
            core_config_overrides=overrides,
        )

    windows: dict[str, dict[str, float]] = {}
    years: dict[str, dict[str, float]] = {}
    latest_rows: list[dict[str, object]] = []
    # Period list mirrors _metrics_for_curve: trailing windows, calendar
    # years (end-exclusive, clamped to the latest bar), then 2026 YTD.
    periods = [
        *(("window", label, latest_bar - pd.Timedelta(days=days), latest_bar) for days, label in WINDOWS),
        *(("year", label, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))) for label, start, end_exclusive in YEAR_PERIODS),
        ("year", "2026_YTD", YTD_START, latest_bar),
    ]
    for kind, label, start, end in periods:
        curve, rows = _compose_hybrid_curve(
            current_components=_components(start, end, CURRENT_OVERHEAT_OVERRIDES),
            relaxed_components=_components(start, end, RELAXED_OVERHEAT_OVERRIDES),
            switch_candidate=switch_candidate,
        )
        metrics = segment_metrics(curve, start, end)
        if kind == "window":
            windows[label] = metrics
        else:
            years[label] = metrics
            if label == "2026_YTD":
                # Keep the final three diagnostic rows (with stringified
                # timestamps) for inspection in the output payload.
                latest_rows = rows.tail(3).assign(timestamp=lambda df: df["timestamp"].astype(str)).to_dict(orient="records")
    score, negative_years, mdd_violations = score_candidate(
        {label: windows[label] for _, label in WINDOWS},
        {label: years[label] for label, _, _ in YEAR_PERIODS},
    )
    return {
        "candidate": asdict(switch_candidate),
        "name": switch_candidate.name,
        "score": score,
        "negative_years": negative_years,
        "mdd_violations": mdd_violations,
        "windows": windows,
        "years": years,
        "latest_rows": latest_rows,
    }
def main() -> None:
    """Run the hybrid switch experiment end to end.

    Phases: load the cached bundle, build 5y components for both cores once,
    grid-search switch candidates on those shared components, re-evaluate the
    best candidate exactly (per period), then persist and print the payload.
    """
    bundle, latest_bar = load_component_bundle(CACHE_PATH)
    eval_start = latest_bar - pd.Timedelta(days=1825)  # 5y search horizon
    print("[phase] build current components", flush=True)
    current_components = build_cash_overlay_period_components(
        bundle=bundle,
        eval_start=eval_start,
        eval_end=latest_bar,
        profile_name=BEST_CASH_OVERLAY.regime_profile,
        core_filter=BEST_CASH_OVERLAY.core_filter,
        cap_engine=BEST_CASH_OVERLAY.cap_engine,
        chop_engine=BEST_CASH_OVERLAY.chop_engine,
        dist_engine=BEST_CASH_OVERLAY.dist_engine,
        core_config_overrides=CURRENT_OVERHEAT_OVERRIDES,
    )
    print("[phase] build relaxed components", flush=True)
    relaxed_components = build_cash_overlay_period_components(
        bundle=bundle,
        eval_start=eval_start,
        eval_end=latest_bar,
        profile_name=BEST_CASH_OVERLAY.regime_profile,
        core_filter=BEST_CASH_OVERLAY.core_filter,
        cap_engine=BEST_CASH_OVERLAY.cap_engine,
        chop_engine=BEST_CASH_OVERLAY.chop_engine,
        dist_engine=BEST_CASH_OVERLAY.dist_engine,
        core_config_overrides=RELAXED_OVERHEAT_OVERRIDES,
    )
    search_rows: list[dict[str, object]] = []
    candidates = _candidate_space()
    print("[phase] search switch candidates", flush=True)
    for idx, candidate in enumerate(candidates, start=1):
        curve, rows = _compose_hybrid_curve(
            current_components=current_components,
            relaxed_components=relaxed_components,
            switch_candidate=candidate,
        )
        windows, years, score, negative_years, mdd_violations = _metrics_for_curve(curve, latest_bar)
        # Fraction of bars spent in the relaxed core (diagnostic only).
        relaxed_share = float((rows["active_core"] == "relaxed_overheat").mean()) if not rows.empty else 0.0
        search_rows.append(
            {
                "candidate": asdict(candidate),
                "name": candidate.name,
                "score": score,
                "negative_years": negative_years,
                "mdd_violations": mdd_violations,
                "relaxed_share": relaxed_share,
                "windows": windows,
                "years": years,
            }
        )
        if idx % 36 == 0 or idx == len(candidates):
            # Progress heartbeat every 36 candidates and on the final one.
            print(f"[search] {idx}/{len(candidates)}", flush=True)
    # Highest score first; the winner gets an exact per-period re-run.
    search_rows.sort(key=lambda row: float(row["score"]), reverse=True)
    best_search = search_rows[0]
    print(f"[phase] exact best {best_search['name']}", flush=True)
    best_exact = _evaluate_exact_candidate(
        bundle,
        latest_bar,
        HybridSwitchCandidate(**best_search["candidate"]),
    )
    payload = {
        "analysis": "current_relaxed_hybrid_experiment",
        "latest_bar": str(latest_bar),
        "candidate": asdict(BEST_CASH_OVERLAY),
        "baselines": _baseline_summary(),
        "search_top": search_rows[:5],
        "best_exact": best_exact,
    }
    OUT_JSON.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    print(json.dumps(payload, indent=2))
    print(f"[saved] {OUT_JSON}")


if __name__ == "__main__":
    main()