from __future__ import annotations import json import math import os import sys from dataclasses import asdict, dataclass from pathlib import Path import numpy as np import pandas as pd PACKAGE_PARENT = Path(__file__).resolve().parents[2] if str(PACKAGE_PARENT) not in sys.path: sys.path.insert(0, str(PACKAGE_PARENT)) from strategy32.live.runtime import BEST_CASH_OVERLAY from strategy32.research.soft_router import build_cash_overlay_period_components, load_component_bundle from strategy32.scripts.run_current_cash_learned_blocker import ( CACHE_PATH, CURRENT_OVERHEAT_OVERRIDES, LearnedBlockerCandidate, _build_block_dataset, _build_regime_columns, _build_strategy_detail, _curve_from_returns, _metrics_for_curve, _simulate_candidate, ) OUT_JSON = Path("/tmp/strategy32_current_cash_risk_overlay_search.json") @dataclass(frozen=True, slots=True) class RiskOverlayCandidate: base_name: str vol_lookback_bars: int vol_target_mult: float min_scale: float max_scale: float dd_lookback_bars: int dd_cut: float dd_scale: float @property def name(self) -> str: return ( f"{self.base_name}" f"|vol:{self.vol_lookback_bars}" f"|vm:{self.vol_target_mult:.2f}" f"|min:{self.min_scale:.2f}" f"|max:{self.max_scale:.2f}" f"|ddlb:{self.dd_lookback_bars}" f"|ddcut:{self.dd_cut:.3f}" f"|ddscale:{self.dd_scale:.2f}" ) def _segment_score(windows: dict[str, dict[str, float]], years: dict[str, dict[str, float]]) -> tuple[float, int, int]: negative_years = sum(1 for year in ("2021", "2022", "2023", "2024", "2025") if years[year]["total_return"] < 0.0) mdd_violations = sum(1 for label in ("1y", "2y", "3y", "4y", "5y") if windows[label]["max_drawdown"] < -0.20) score = 0.0 score += 4.0 * windows["1y"]["total_return"] score += 2.0 * windows["2y"]["annualized_return"] score += 1.5 * windows["3y"]["annualized_return"] score += 2.5 * windows["5y"]["annualized_return"] score += 0.75 * years["2025"]["total_return"] score += 0.50 * years["2024"]["total_return"] score += 0.20 * years["2026_YTD"]["total_return"] score += 0.25 * max(0.0, -0.15 - windows["5y"]["max_drawdown"]) score -= 1.5 * negative_years score -= 0.4 * mdd_violations return score, negative_years, mdd_violations def _compute_metrics(returns: pd.Series, latest_bar: pd.Timestamp) -> tuple[dict[str, dict[str, float]], dict[str, dict[str, float]], float, int, int]: curve = _curve_from_returns(returns) windows, years, _, _, _ = _metrics_for_curve(curve, latest_bar) score, negative_years, mdd_violations = _segment_score(windows, years) return windows, years, score, negative_years, mdd_violations def _safe_ratio(num: float, den: float, fallback: float) -> float: if not math.isfinite(num) or not math.isfinite(den) or den <= 1e-12: return fallback return num / den def _apply_risk_overlay(returns: pd.Series, candidate: RiskOverlayCandidate) -> tuple[pd.Series, pd.DataFrame]: idx = returns.index realized_vol = returns.shift(1).rolling(candidate.vol_lookback_bars, min_periods=max(6, candidate.vol_lookback_bars // 3)).std() anchor_vol = realized_vol.shift(1).expanding(min_periods=max(6, candidate.vol_lookback_bars // 2)).median() scaled: list[float] = [] equities: list[float] = [] scales: list[float] = [] dd_series: list[float] = [] equity = 1000.0 history: list[float] = [equity] for ts, base_ret in returns.items(): current_vol = float(realized_vol.get(ts, np.nan)) target_vol = float(anchor_vol.get(ts, np.nan)) vol_scale = _safe_ratio(target_vol * candidate.vol_target_mult, current_vol, candidate.max_scale) vol_scale = min(candidate.max_scale, max(candidate.min_scale, vol_scale)) dd_lookback = max(2, candidate.dd_lookback_bars) peak = max(history[-dd_lookback:]) current_dd = (equity / peak) - 1.0 if peak > 0.0 else 0.0 dd_scale = candidate.dd_scale if current_dd <= -candidate.dd_cut else 1.0 scale = min(vol_scale, dd_scale) scaled_ret = float(base_ret) * scale equity *= max(0.0, 1.0 + scaled_ret) history.append(equity) scaled.append(scaled_ret) equities.append(equity) scales.append(scale) dd_series.append(current_dd) frame = pd.DataFrame( { "timestamp": idx, "base_return": returns.values, "overlay_return": scaled, "scale": scales, "equity": equities, "drawdown": dd_series, "realized_vol": realized_vol.reindex(idx).values, "anchor_vol": anchor_vol.reindex(idx).values, } ) return pd.Series(scaled, index=idx, dtype=float), frame def _base_return_map(latest_bar: pd.Timestamp) -> tuple[dict[str, pd.Series], dict[str, object]]: bundle, _ = load_component_bundle(CACHE_PATH) eval_start = latest_bar - pd.Timedelta(days=1825) components = build_cash_overlay_period_components( bundle=bundle, eval_start=eval_start, eval_end=latest_bar, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=CURRENT_OVERHEAT_OVERRIDES, ) detail = _build_strategy_detail(components) regime_columns = _build_regime_columns(detail) baseline_returns = detail.set_index("timestamp")["portfolio_return"].astype(float) blocker_025 = LearnedBlockerCandidate(42, 12, 24, 1.0, -0.0025, 0.25) blocker_050 = LearnedBlockerCandidate(42, 12, 24, 1.0, -0.0025, 0.50) block_frame = _build_block_dataset(detail, blocker_025.block_bars, regime_columns) blocker_025_returns = _simulate_candidate(detail, block_frame, regime_columns, blocker_025) blocker_050_returns = _simulate_candidate(detail, block_frame, regime_columns, blocker_050) return { "baseline": baseline_returns, "blocker_025": blocker_025_returns, "blocker_050": blocker_050_returns, }, { "detail": detail, "regime_columns": regime_columns, } def _candidate_space() -> list[RiskOverlayCandidate]: space: list[RiskOverlayCandidate] = [] grid_mode = os.getenv("STRATEGY32_RISK_OVERLAY_GRID", "frontier").strip().lower() if grid_mode == "wide": base_names = ("baseline", "blocker_025", "blocker_050") vol_lookbacks = (21, 42, 84) vol_targets = (0.90, 1.00, 1.10) min_scales = (0.50, 0.75) max_scales = (1.00, 1.10) dd_lookbacks = (42, 84) dd_cuts = (0.08, 0.12, 0.16) dd_scales = (0.25, 0.50, 0.75) else: base_names = ("baseline", "blocker_050") vol_lookbacks = (42,) vol_targets = (0.95, 1.00, 1.05) min_scales = (0.50, 0.75) max_scales = (1.00,) dd_lookbacks = (42,) dd_cuts = (0.08, 0.12) dd_scales = (0.50, 0.75) for base_name in base_names: for vol_lookback_bars in vol_lookbacks: for vol_target_mult in vol_targets: for min_scale in min_scales: for max_scale in max_scales: if max_scale < min_scale: continue for dd_lookback_bars in dd_lookbacks: for dd_cut in dd_cuts: for dd_scale in dd_scales: space.append( RiskOverlayCandidate( base_name=base_name, vol_lookback_bars=vol_lookback_bars, vol_target_mult=vol_target_mult, min_scale=min_scale, max_scale=max_scale, dd_lookback_bars=dd_lookback_bars, dd_cut=dd_cut, dd_scale=dd_scale, ) ) return space def main() -> None: bundle, latest_bar = load_component_bundle(CACHE_PATH) base_returns_map, _ = _base_return_map(latest_bar) baseline_windows, baseline_years, baseline_score, baseline_negative_years, baseline_mdd_violations = _compute_metrics( base_returns_map["baseline"], latest_bar ) blocker025_windows, blocker025_years, blocker025_score, blocker025_negative_years, blocker025_mdd_violations = _compute_metrics( base_returns_map["blocker_025"], latest_bar ) blocker050_windows, blocker050_years, blocker050_score, blocker050_negative_years, blocker050_mdd_violations = _compute_metrics( base_returns_map["blocker_050"], latest_bar ) space = _candidate_space() top: list[dict[str, object]] = [] for idx, candidate in enumerate(space, start=1): overlay_returns, overlay_frame = _apply_risk_overlay(base_returns_map[candidate.base_name], candidate) windows, years, score, negative_years, mdd_violations = _compute_metrics(overlay_returns, latest_bar) payload = { "candidate": asdict(candidate), "name": candidate.name, "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "windows": windows, "years": years, "mean_scale": float(overlay_frame["scale"].mean()), "min_scale": float(overlay_frame["scale"].min()), "max_scale": float(overlay_frame["scale"].max()), } top.append(payload) top.sort(key=lambda item: float(item["score"]), reverse=True) top = top[:15] if idx % 150 == 0 or idx == len(space): print(f"[search] {idx}/{len(space)}", flush=True) output = { "analysis": "current_cash_risk_overlay_search", "latest_bar": str(latest_bar), "baseline": { "score": baseline_score, "negative_years": baseline_negative_years, "mdd_violations": baseline_mdd_violations, "windows": baseline_windows, "years": baseline_years, }, "blocker_025": { "score": blocker025_score, "negative_years": blocker025_negative_years, "mdd_violations": blocker025_mdd_violations, "windows": blocker025_windows, "years": blocker025_years, }, "blocker_050": { "score": blocker050_score, "negative_years": blocker050_negative_years, "mdd_violations": blocker050_mdd_violations, "windows": blocker050_windows, "years": blocker050_years, }, "top15": top, } OUT_JSON.write_text(json.dumps(output, indent=2), encoding="utf-8") print(json.dumps(top[:5], indent=2)) print(f"[saved] {OUT_JSON}", flush=True) if __name__ == "__main__": main()