from __future__ import annotations import json import sys from pathlib import Path import pandas as pd PACKAGE_PARENT = Path(__file__).resolve().parents[2] if str(PACKAGE_PARENT) not in sys.path: sys.path.insert(0, str(PACKAGE_PARENT)) from strategy32.live.runtime import BEST_CASH_OVERLAY from strategy32.research.soft_router import build_cash_overlay_period_components, load_component_bundle, segment_metrics from strategy32.scripts.run_current_relaxed_hybrid_experiment import ( CACHE_PATH, CURRENT_OVERHEAT_OVERRIDES, RELAXED_OVERHEAT_OVERRIDES, WINDOWS, YEAR_PERIODS, YTD_START, _overlay_weights, ) OUT_JSON = Path("/tmp/strategy32_current_relaxed_oracle_analysis.json") def _build_strategy_detail(components: dict[str, object]) -> pd.DataFrame: timestamps = list(components["timestamps"]) score_map = components["score_frame"].set_index("timestamp").sort_index() cash_map = components["core_exposure_frame"].set_index("timestamp")["cash_pct"].sort_index() core_returns = components["core_returns"] cap_returns = components["cap_returns"] chop_returns = components["chop_returns"] dist_returns = components["dist_returns"] rows: list[dict[str, object]] = [] for i in range(1, len(timestamps)): signal_ts = pd.Timestamp(timestamps[i - 1]) execution_ts = pd.Timestamp(timestamps[i]) score_row = score_map.loc[signal_ts].to_dict() if signal_ts in score_map.index else {} core_cash_pct = float(cash_map.get(signal_ts, cash_map.iloc[-1] if not cash_map.empty else 1.0)) cap_weight, chop_weight, dist_weight = _overlay_weights(BEST_CASH_OVERLAY, score_row, core_cash_pct) core_ret = float(core_returns.get(execution_ts, 0.0)) cap_ret = float(cap_returns.get(execution_ts, 0.0)) chop_ret = float(chop_returns.get(execution_ts, 0.0)) dist_ret = float(dist_returns.get(execution_ts, 0.0)) portfolio_return = ( core_ret + cap_weight * cap_ret + chop_weight * chop_ret + dist_weight * dist_ret ) rows.append( { "timestamp": execution_ts, "strategic_regime": str(score_row.get("strategic_regime", "")), "core_score": float(score_row.get("core_score", 0.0)), "panic_score": float(score_row.get("panic_score", 0.0)), "choppy_score": float(score_row.get("choppy_score", 0.0)), "distribution_score": float(score_row.get("distribution_score", 0.0)), "breadth_persist": float(score_row.get("breadth_persist", 0.0) or 0.0), "funding_persist": float(score_row.get("funding_persist", 0.0) or 0.0), "core_cash_pct": core_cash_pct, "cap_weight": cap_weight, "chop_weight": chop_weight, "dist_weight": dist_weight, "portfolio_return": portfolio_return, } ) return pd.DataFrame(rows) def _curve_from_returns(returns: pd.Series) -> pd.Series: equity = 1000.0 vals = [equity] idx = [returns.index[0] - pd.Timedelta(hours=4)] for ts, ret in returns.items(): equity *= max(0.0, 1.0 + float(ret)) idx.append(pd.Timestamp(ts)) vals.append(equity) return pd.Series(vals, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float) def _window_metrics(curve: pd.Series, latest_bar: pd.Timestamp) -> dict[str, dict[str, float]]: return { label: segment_metrics(curve, latest_bar - pd.Timedelta(days=days), latest_bar) for days, label in WINDOWS } def _year_metrics(curve: pd.Series, latest_bar: pd.Timestamp) -> dict[str, dict[str, float]]: years = { label: segment_metrics(curve, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))) for label, start, end_exclusive in YEAR_PERIODS } years["2026_YTD"] = segment_metrics(curve, YTD_START, latest_bar) return years def _regime_summary(detail: pd.DataFrame) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] for regime, chunk in detail.groupby("strategic_regime", dropna=False): rows.append( { "strategic_regime": regime or "", "bars": int(len(chunk)), "share": float(len(chunk) / len(detail)) if len(detail) else 0.0, "current_avg_return": float(chunk["current_return"].mean()), "relaxed_avg_return": float(chunk["relaxed_return"].mean()), "oracle_avg_return": float(chunk["oracle_bar_return"].mean()), "relaxed_win_rate": float((chunk["relaxed_return"] > chunk["current_return"]).mean()), "avg_diff_relaxed_minus_current": float((chunk["relaxed_return"] - chunk["current_return"]).mean()), } ) return sorted(rows, key=lambda row: row["share"], reverse=True) def _winner_feature_summary(detail: pd.DataFrame, winner: str) -> dict[str, object]: if winner == "relaxed": mask = detail["relaxed_return"] > detail["current_return"] else: mask = detail["current_return"] >= detail["relaxed_return"] chunk = detail.loc[mask].copy() if chunk.empty: return {"bars": 0} return { "bars": int(len(chunk)), "share": float(len(chunk) / len(detail)) if len(detail) else 0.0, "avg_core_score": float(chunk["core_score"].mean()), "avg_breadth_persist": float(chunk["breadth_persist"].mean()), "avg_funding_persist": float(chunk["funding_persist"].mean()), "avg_panic_score": float(chunk["panic_score"].mean()), "avg_choppy_score": float(chunk["choppy_score"].mean()), "avg_distribution_score": float(chunk["distribution_score"].mean()), "avg_current_cash_pct": float(chunk["current_cash_pct"].mean()), "avg_relaxed_cash_pct": float(chunk["relaxed_cash_pct"].mean()), "top_regimes": ( chunk["strategic_regime"].value_counts(normalize=True).head(5).rename_axis("regime").reset_index(name="share").to_dict(orient="records") ), } def _oracle_block_returns(detail: pd.DataFrame, block_bars: int) -> pd.Series: rows: list[float] = [] idx: list[pd.Timestamp] = [] bar_count = len(detail) for start in range(0, bar_count, block_bars): end = min(start + block_bars, bar_count) chunk = detail.iloc[start:end] current_total = float((1.0 + chunk["current_return"]).prod() - 1.0) relaxed_total = float((1.0 + chunk["relaxed_return"]).prod() - 1.0) winner = "relaxed" if relaxed_total > current_total else "current" source_col = "relaxed_return" if winner == "relaxed" else "current_return" rows.extend(chunk[source_col].tolist()) idx.extend(chunk["timestamp"].tolist()) return pd.Series(rows, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float) def main() -> None: bundle, latest_bar = load_component_bundle(CACHE_PATH) eval_start = latest_bar - pd.Timedelta(days=1825) print("[phase] build current", flush=True) current = build_cash_overlay_period_components( bundle=bundle, eval_start=eval_start, eval_end=latest_bar, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=CURRENT_OVERHEAT_OVERRIDES, ) print("[phase] build relaxed", flush=True) relaxed = build_cash_overlay_period_components( bundle=bundle, eval_start=eval_start, eval_end=latest_bar, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=RELAXED_OVERHEAT_OVERRIDES, ) current_detail = _build_strategy_detail(current).rename( columns={ "core_cash_pct": "current_cash_pct", "portfolio_return": "current_return", } ) relaxed_detail = _build_strategy_detail(relaxed).rename( columns={ "core_cash_pct": "relaxed_cash_pct", "portfolio_return": "relaxed_return", } ) merged = current_detail.merge( relaxed_detail[ [ "timestamp", "relaxed_cash_pct", "relaxed_return", ] ], on="timestamp", how="inner", ) merged["oracle_bar_return"] = merged[["current_return", "relaxed_return"]].max(axis=1) merged["winner"] = merged.apply( lambda row: "relaxed" if row["relaxed_return"] > row["current_return"] else "current", axis=1, ) current_curve = _curve_from_returns(merged.set_index("timestamp")["current_return"]) relaxed_curve = _curve_from_returns(merged.set_index("timestamp")["relaxed_return"]) oracle_bar_curve = _curve_from_returns(merged.set_index("timestamp")["oracle_bar_return"]) oracle_7d_curve = _curve_from_returns(_oracle_block_returns(merged, block_bars=42)) oracle_30d_curve = _curve_from_returns(_oracle_block_returns(merged, block_bars=180)) payload = { "analysis": "current_relaxed_oracle", "latest_bar": str(latest_bar), "bar_count": int(len(merged)), "relaxed_win_rate": float((merged["winner"] == "relaxed").mean()), "curves": { "current": { "windows": _window_metrics(current_curve, latest_bar), "years": _year_metrics(current_curve, latest_bar), }, "relaxed": { "windows": _window_metrics(relaxed_curve, latest_bar), "years": _year_metrics(relaxed_curve, latest_bar), }, "oracle_bar": { "windows": _window_metrics(oracle_bar_curve, latest_bar), "years": _year_metrics(oracle_bar_curve, latest_bar), }, "oracle_7d": { "windows": _window_metrics(oracle_7d_curve, latest_bar), "years": _year_metrics(oracle_7d_curve, latest_bar), }, "oracle_30d": { "windows": _window_metrics(oracle_30d_curve, latest_bar), "years": _year_metrics(oracle_30d_curve, latest_bar), }, }, "regime_summary": _regime_summary(merged), "winner_feature_summary": { "relaxed": _winner_feature_summary(merged, "relaxed"), "current": _winner_feature_summary(merged, "current"), }, "latest_rows": merged.tail(10).assign(timestamp=lambda df: df["timestamp"].astype(str)).to_dict(orient="records"), } OUT_JSON.write_text(json.dumps(payload, indent=2), encoding="utf-8") print(json.dumps(payload, indent=2)) print(f"[saved] {OUT_JSON}", flush=True) if __name__ == "__main__": main()