from __future__ import annotations import json import sys from dataclasses import asdict, dataclass from pathlib import Path import pandas as pd PACKAGE_PARENT = Path(__file__).resolve().parents[2] if str(PACKAGE_PARENT) not in sys.path: sys.path.insert(0, str(PACKAGE_PARENT)) from strategy32.live.runtime import BEST_CASH_OVERLAY, LIVE_STRATEGY_OVERRIDES from strategy32.research.soft_router import ( build_cash_overlay_period_components, load_component_bundle, score_candidate, segment_metrics, ) CACHE_PATH = "/tmp/strategy32_fixed66_bundle.pkl" BASELINE_PATH = Path("/tmp/strategy32_recent_core_filter_comparison.json") OUT_JSON = Path("/tmp/strategy32_current_relaxed_hybrid_experiment.json") WINDOWS = ( (365, "1y"), (730, "2y"), (1095, "3y"), (1460, "4y"), (1825, "5y"), ) YEAR_PERIODS = ( ("2021", pd.Timestamp("2021-03-16 04:00:00+00:00"), pd.Timestamp("2022-01-01 00:00:00+00:00")), ("2022", pd.Timestamp("2022-01-01 00:00:00+00:00"), pd.Timestamp("2023-01-01 00:00:00+00:00")), ("2023", pd.Timestamp("2023-01-01 00:00:00+00:00"), pd.Timestamp("2024-01-01 00:00:00+00:00")), ("2024", pd.Timestamp("2024-01-01 00:00:00+00:00"), pd.Timestamp("2025-01-01 00:00:00+00:00")), ("2025", pd.Timestamp("2025-01-01 00:00:00+00:00"), pd.Timestamp("2026-01-01 00:00:00+00:00")), ) YTD_START = pd.Timestamp("2026-01-01 00:00:00+00:00") CURRENT_OVERHEAT_OVERRIDES = { **LIVE_STRATEGY_OVERRIDES, "hard_filter_refresh_cadence": "1d", "hard_filter_min_history_bars": 120, "hard_filter_lookback_bars": 30, "hard_filter_min_avg_dollar_volume": 50_000_000.0, } RELAXED_OVERHEAT_OVERRIDES = { **LIVE_STRATEGY_OVERRIDES, "momentum_min_score": 0.58, "momentum_min_relative_strength": -0.03, "momentum_min_7d_return": 0.00, "universe_min_avg_dollar_volume": 75_000_000.0, "hard_filter_refresh_cadence": "1d", "hard_filter_min_history_bars": 120, "hard_filter_lookback_bars": 30, "hard_filter_min_avg_dollar_volume": 50_000_000.0, } @dataclass(frozen=True, slots=True) class HybridSwitchCandidate: positive_regimes: tuple[str, ...] core_score_min: float breadth_persist_min: float funding_persist_min: float panic_max: float choppy_max: float distribution_max: float @property def name(self) -> str: regimes = ",".join(self.positive_regimes) return ( f"regimes:{regimes}" f"|core>={self.core_score_min:.2f}" f"|breadth>={self.breadth_persist_min:.2f}" f"|funding>={self.funding_persist_min:.2f}" f"|panic<={self.panic_max:.2f}" f"|choppy<={self.choppy_max:.2f}" f"|dist<={self.distribution_max:.2f}" ) def _clip01(value: float) -> float: return min(max(float(value), 0.0), 1.0) def _overlay_weights(candidate, score_row: dict[str, float], core_cash_pct: float) -> tuple[float, float, float]: core_score = float(score_row.get("core_score", 0.0)) panic_score = float(score_row.get("panic_score", 0.0)) choppy_score = float(score_row.get("choppy_score", 0.0)) distribution_score = float(score_row.get("distribution_score", 0.0)) cap_signal = _clip01((panic_score - candidate.cap_threshold) / max(1.0 - candidate.cap_threshold, 1e-9)) chop_signal = _clip01((choppy_score - candidate.chop_threshold) / max(1.0 - candidate.chop_threshold, 1e-9)) dist_signal = _clip01((distribution_score - candidate.dist_threshold) / max(1.0 - candidate.dist_threshold, 1e-9)) if core_score > candidate.core_block_threshold: chop_signal *= 0.25 dist_signal *= 0.35 cap_weight = float(core_cash_pct) * candidate.cap_cash_weight * cap_signal chop_weight = float(core_cash_pct) * candidate.chop_cash_weight * chop_signal dist_weight = float(core_cash_pct) * candidate.dist_cash_weight * dist_signal overlay_total = cap_weight + chop_weight + dist_weight if overlay_total > core_cash_pct and overlay_total > 0.0: scale = float(core_cash_pct) / overlay_total cap_weight *= scale chop_weight *= scale dist_weight *= scale return cap_weight, chop_weight, dist_weight def _pick_relaxed(score_row: dict[str, float], candidate: HybridSwitchCandidate) -> bool: return ( str(score_row.get("strategic_regime")) in candidate.positive_regimes and float(score_row.get("core_score", 0.0)) >= candidate.core_score_min and float(score_row.get("breadth_persist", 0.0) or 0.0) >= candidate.breadth_persist_min and float(score_row.get("funding_persist", 0.0) or 0.0) >= candidate.funding_persist_min and float(score_row.get("panic_score", 0.0)) <= candidate.panic_max and float(score_row.get("choppy_score", 0.0)) <= candidate.choppy_max and float(score_row.get("distribution_score", 0.0)) <= candidate.distribution_max ) def _compose_hybrid_curve( *, current_components: dict[str, object], relaxed_components: dict[str, object], switch_candidate: HybridSwitchCandidate, ) -> tuple[pd.Series, pd.DataFrame]: timestamps = list(current_components["timestamps"]) score_map = current_components["score_frame"].set_index("timestamp").sort_index() current_cash_map = current_components["core_exposure_frame"].set_index("timestamp")["cash_pct"].sort_index() relaxed_cash_map = relaxed_components["core_exposure_frame"].set_index("timestamp")["cash_pct"].sort_index() current_core_returns = current_components["core_returns"] relaxed_core_returns = relaxed_components["core_returns"] cap_returns = current_components["cap_returns"] chop_returns = current_components["chop_returns"] dist_returns = current_components["dist_returns"] equity = 1000.0 idx = [timestamps[0]] vals = [equity] rows: list[dict[str, object]] = [] for i in range(1, len(timestamps)): signal_ts = pd.Timestamp(timestamps[i - 1]) execution_ts = pd.Timestamp(timestamps[i]) score_row = score_map.loc[signal_ts].to_dict() if signal_ts in score_map.index else {} use_relaxed = _pick_relaxed(score_row, switch_candidate) active_name = "relaxed_overheat" if use_relaxed else "current_overheat" core_returns = relaxed_core_returns if use_relaxed else current_core_returns cash_map = relaxed_cash_map if use_relaxed else current_cash_map core_cash_pct = float(cash_map.get(signal_ts, cash_map.iloc[-1] if not cash_map.empty else 1.0)) cap_weight, chop_weight, dist_weight = _overlay_weights(BEST_CASH_OVERLAY, score_row, core_cash_pct) bar_ret = ( float(core_returns.get(execution_ts, 0.0)) + cap_weight * float(cap_returns.get(execution_ts, 0.0)) + chop_weight * float(chop_returns.get(execution_ts, 0.0)) + dist_weight * float(dist_returns.get(execution_ts, 0.0)) ) equity *= max(0.0, 1.0 + bar_ret) idx.append(execution_ts) vals.append(equity) rows.append( { "timestamp": execution_ts, "active_core": active_name, "core_cash_pct": core_cash_pct, "core_score": float(score_row.get("core_score", 0.0)), "panic_score": float(score_row.get("panic_score", 0.0)), "choppy_score": float(score_row.get("choppy_score", 0.0)), "distribution_score": float(score_row.get("distribution_score", 0.0)), "breadth_persist": float(score_row.get("breadth_persist", 0.0) or 0.0), "funding_persist": float(score_row.get("funding_persist", 0.0) or 0.0), "portfolio_return": bar_ret, } ) curve = pd.Series(vals, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float) return curve, pd.DataFrame(rows) def _metrics_for_curve(curve: pd.Series, latest_bar: pd.Timestamp) -> tuple[dict[str, dict[str, float]], dict[str, dict[str, float]], float, int, int]: windows: dict[str, dict[str, float]] = {} for days, label in WINDOWS: start = latest_bar - pd.Timedelta(days=days) windows[label] = segment_metrics(curve, start, latest_bar) years: dict[str, dict[str, float]] = {} for label, start, end_exclusive in YEAR_PERIODS: years[label] = segment_metrics(curve, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))) years["2026_YTD"] = segment_metrics(curve, YTD_START, latest_bar) score, negative_years, mdd_violations = score_candidate( {label: windows[label] for _, label in WINDOWS}, {label: years[label] for label, _, _ in YEAR_PERIODS}, ) return windows, years, score, negative_years, mdd_violations def _candidate_space() -> list[HybridSwitchCandidate]: space: list[HybridSwitchCandidate] = [] positive_sets = ( ("EUPHORIC_BREAKOUT",), ("MOMENTUM_EXPANSION", "EUPHORIC_BREAKOUT"), ) for positive_regimes in positive_sets: for core_score_min in (0.50, 0.55, 0.60): for breadth_persist_min in (0.50, 0.55, 0.60): for funding_persist_min in (0.55, 0.60, 0.65): for panic_max in (0.20, 0.30): for choppy_max in (0.40, 0.50): for distribution_max in (0.30, 0.40): space.append( HybridSwitchCandidate( positive_regimes=positive_regimes, core_score_min=core_score_min, breadth_persist_min=breadth_persist_min, funding_persist_min=funding_persist_min, panic_max=panic_max, choppy_max=choppy_max, distribution_max=distribution_max, ) ) return space def _baseline_summary() -> dict[str, object]: payload = json.loads(BASELINE_PATH.read_text(encoding="utf-8")) variants = payload["variants"] result: dict[str, object] = {} for name in ("current_overheat", "relaxed_overheat"): result[name] = variants[name]["results"] return result def _evaluate_exact_candidate(bundle, latest_bar: pd.Timestamp, switch_candidate: HybridSwitchCandidate) -> dict[str, object]: windows: dict[str, dict[str, float]] = {} years: dict[str, dict[str, float]] = {} latest_rows: list[dict[str, object]] = [] periods = [ *(("window", label, latest_bar - pd.Timedelta(days=days), latest_bar) for days, label in WINDOWS), *(("year", label, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))) for label, start, end_exclusive in YEAR_PERIODS), ("year", "2026_YTD", YTD_START, latest_bar), ] for kind, label, start, end in periods: current = build_cash_overlay_period_components( bundle=bundle, eval_start=start, eval_end=end, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=CURRENT_OVERHEAT_OVERRIDES, ) relaxed = build_cash_overlay_period_components( bundle=bundle, eval_start=start, eval_end=end, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=RELAXED_OVERHEAT_OVERRIDES, ) curve, rows = _compose_hybrid_curve( current_components=current, relaxed_components=relaxed, switch_candidate=switch_candidate, ) metrics = segment_metrics(curve, start, end) if kind == "window": windows[label] = metrics else: years[label] = metrics if label == "2026_YTD": latest_rows = rows.tail(3).assign(timestamp=lambda df: df["timestamp"].astype(str)).to_dict(orient="records") score, negative_years, mdd_violations = score_candidate( {label: windows[label] for _, label in WINDOWS}, {label: years[label] for label, _, _ in YEAR_PERIODS}, ) return { "candidate": asdict(switch_candidate), "name": switch_candidate.name, "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "windows": windows, "years": years, "latest_rows": latest_rows, } def main() -> None: bundle, latest_bar = load_component_bundle(CACHE_PATH) eval_start = latest_bar - pd.Timedelta(days=1825) print("[phase] build current components", flush=True) current_components = build_cash_overlay_period_components( bundle=bundle, eval_start=eval_start, eval_end=latest_bar, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=CURRENT_OVERHEAT_OVERRIDES, ) print("[phase] build relaxed components", flush=True) relaxed_components = build_cash_overlay_period_components( bundle=bundle, eval_start=eval_start, eval_end=latest_bar, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=RELAXED_OVERHEAT_OVERRIDES, ) search_rows: list[dict[str, object]] = [] candidates = _candidate_space() print("[phase] search switch candidates", flush=True) for idx, candidate in enumerate(candidates, start=1): curve, rows = _compose_hybrid_curve( current_components=current_components, relaxed_components=relaxed_components, switch_candidate=candidate, ) windows, years, score, negative_years, mdd_violations = _metrics_for_curve(curve, latest_bar) relaxed_share = float((rows["active_core"] == "relaxed_overheat").mean()) if not rows.empty else 0.0 search_rows.append( { "candidate": asdict(candidate), "name": candidate.name, "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "relaxed_share": relaxed_share, "windows": windows, "years": years, } ) if idx % 36 == 0 or idx == len(candidates): print(f"[search] {idx}/{len(candidates)}", flush=True) search_rows.sort(key=lambda row: float(row["score"]), reverse=True) best_search = search_rows[0] print(f"[phase] exact best {best_search['name']}", flush=True) best_exact = _evaluate_exact_candidate( bundle, latest_bar, HybridSwitchCandidate(**best_search["candidate"]), ) payload = { "analysis": "current_relaxed_hybrid_experiment", "latest_bar": str(latest_bar), "candidate": asdict(BEST_CASH_OVERLAY), "baselines": _baseline_summary(), "search_top": search_rows[:5], "best_exact": best_exact, } OUT_JSON.write_text(json.dumps(payload, indent=2), encoding="utf-8") print(json.dumps(payload, indent=2)) print(f"[saved] {OUT_JSON}") if __name__ == "__main__": main()