from __future__ import annotations import json import multiprocessing as mp import sys from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import asdict, dataclass from pathlib import Path import pandas as pd PACKAGE_PARENT = Path(__file__).resolve().parents[2] if str(PACKAGE_PARENT) not in sys.path: sys.path.insert(0, str(PACKAGE_PARENT)) from strategy32.live.runtime import BEST_CASH_OVERLAY from strategy32.research.soft_router import build_cash_overlay_period_components, load_component_bundle, score_candidate, segment_metrics from strategy32.scripts.run_current_relaxed_hybrid_experiment import ( BASELINE_PATH, CACHE_PATH, CURRENT_OVERHEAT_OVERRIDES, RELAXED_OVERHEAT_OVERRIDES, WINDOWS, YEAR_PERIODS, YTD_START, _baseline_summary, _overlay_weights, ) OUT_JSON = Path("/tmp/strategy32_current_relaxed_entry_overlay.json") @dataclass(frozen=True, slots=True) class EntryOverlayCandidate: positive_regimes: tuple[str, ...] core_score_min: float breadth_persist_min: float funding_persist_min: float panic_max: float choppy_max: float distribution_max: float current_cash_min: float relaxed_invested_min: float incremental_scale: float @property def name(self) -> str: regimes = ",".join(self.positive_regimes) return ( f"regimes:{regimes}" f"|core>={self.core_score_min:.2f}" f"|breadth>={self.breadth_persist_min:.2f}" f"|funding>={self.funding_persist_min:.2f}" f"|panic<={self.panic_max:.2f}" f"|choppy<={self.choppy_max:.2f}" f"|dist<={self.distribution_max:.2f}" f"|cash>={self.current_cash_min:.2f}" f"|relaxed>={self.relaxed_invested_min:.2f}" f"|scale:{self.incremental_scale:.2f}" ) def _pick_entry_overlay(score_row: dict[str, float], candidate: EntryOverlayCandidate) -> bool: return ( str(score_row.get("strategic_regime")) in candidate.positive_regimes and float(score_row.get("core_score", 0.0)) >= candidate.core_score_min and float(score_row.get("breadth_persist", 0.0) or 0.0) >= candidate.breadth_persist_min and float(score_row.get("funding_persist", 0.0) or 0.0) >= candidate.funding_persist_min and float(score_row.get("panic_score", 0.0)) <= candidate.panic_max and float(score_row.get("choppy_score", 0.0)) <= candidate.choppy_max and float(score_row.get("distribution_score", 0.0)) <= candidate.distribution_max ) def _compose_entry_overlay_curve( *, current_components: dict[str, object], relaxed_components: dict[str, object], candidate: EntryOverlayCandidate, ) -> tuple[pd.Series, pd.DataFrame]: timestamps = list(current_components["timestamps"]) score_map = current_components["score_frame"].set_index("timestamp").sort_index() current_cash_map = current_components["core_exposure_frame"].set_index("timestamp")["cash_pct"].sort_index() relaxed_cash_map = relaxed_components["core_exposure_frame"].set_index("timestamp")["cash_pct"].sort_index() current_core_returns = current_components["core_returns"] relaxed_core_returns = relaxed_components["core_returns"] cap_returns = current_components["cap_returns"] chop_returns = current_components["chop_returns"] dist_returns = current_components["dist_returns"] equity = 1000.0 idx = [timestamps[0]] vals = [equity] rows: list[dict[str, object]] = [] for i in range(1, len(timestamps)): signal_ts = pd.Timestamp(timestamps[i - 1]) execution_ts = pd.Timestamp(timestamps[i]) score_row = score_map.loc[signal_ts].to_dict() if signal_ts in score_map.index else {} current_cash_pct = float(current_cash_map.get(signal_ts, current_cash_map.iloc[-1] if not current_cash_map.empty else 1.0)) current_invested = max(0.0, 1.0 - current_cash_pct) relaxed_cash_pct = float(relaxed_cash_map.get(signal_ts, relaxed_cash_map.iloc[-1] if not relaxed_cash_map.empty else 1.0)) relaxed_invested = max(0.0, 1.0 - relaxed_cash_pct) add_relaxed = 0.0 use_relaxed = False if ( current_cash_pct >= candidate.current_cash_min and relaxed_invested >= candidate.relaxed_invested_min and _pick_entry_overlay(score_row, candidate) ): use_relaxed = True extra_invested = max(relaxed_invested - current_invested, 0.0) if extra_invested > 0.0: add_relaxed = min(current_cash_pct, extra_invested * candidate.incremental_scale) relaxed_unit_return = 0.0 if add_relaxed > 0.0 and relaxed_invested > 1e-9: relaxed_unit_return = float(relaxed_core_returns.get(execution_ts, 0.0)) / relaxed_invested residual_cash = max(current_cash_pct - add_relaxed, 0.0) cap_weight, chop_weight, dist_weight = _overlay_weights(BEST_CASH_OVERLAY, score_row, residual_cash) bar_ret = ( float(current_core_returns.get(execution_ts, 0.0)) + add_relaxed * relaxed_unit_return + cap_weight * float(cap_returns.get(execution_ts, 0.0)) + chop_weight * float(chop_returns.get(execution_ts, 0.0)) + dist_weight * float(dist_returns.get(execution_ts, 0.0)) ) equity *= max(0.0, 1.0 + bar_ret) idx.append(execution_ts) vals.append(equity) rows.append( { "timestamp": execution_ts, "used_relaxed_overlay": use_relaxed, "current_cash_pct": current_cash_pct, "relaxed_invested": relaxed_invested, "added_relaxed_weight": add_relaxed, "core_score": float(score_row.get("core_score", 0.0)), "panic_score": float(score_row.get("panic_score", 0.0)), "choppy_score": float(score_row.get("choppy_score", 0.0)), "distribution_score": float(score_row.get("distribution_score", 0.0)), "breadth_persist": float(score_row.get("breadth_persist", 0.0) or 0.0), "funding_persist": float(score_row.get("funding_persist", 0.0) or 0.0), "portfolio_return": bar_ret, } ) curve = pd.Series(vals, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float) return curve, pd.DataFrame(rows) def _metrics_for_curve(curve: pd.Series, latest_bar: pd.Timestamp) -> tuple[dict[str, dict[str, float]], dict[str, dict[str, float]], float, int, int]: windows: dict[str, dict[str, float]] = {} for days, label in WINDOWS: start = latest_bar - pd.Timedelta(days=days) windows[label] = segment_metrics(curve, start, latest_bar) years: dict[str, dict[str, float]] = {} for label, start, end_exclusive in YEAR_PERIODS: years[label] = segment_metrics(curve, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))) years["2026_YTD"] = segment_metrics(curve, YTD_START, latest_bar) score, negative_years, mdd_violations = score_candidate( {label: windows[label] for _, label in WINDOWS}, {label: years[label] for label, _, _ in YEAR_PERIODS}, ) return windows, years, score, negative_years, mdd_violations def _candidate_space() -> list[EntryOverlayCandidate]: space: list[EntryOverlayCandidate] = [] positive_sets = ( ("EUPHORIC_BREAKOUT",), ("MOMENTUM_EXPANSION", "EUPHORIC_BREAKOUT"), ) for positive_regimes in positive_sets: for core_score_min in (0.50, 0.60): for breadth_persist_min in (0.50, 0.55): for funding_persist_min in (0.55, 0.60): for panic_max in (0.20, 0.30): for choppy_max in (0.40, 0.50): for distribution_max in (0.30, 0.40): for current_cash_min in (0.35, 0.50, 0.65): for relaxed_invested_min in (0.15, 0.25, 0.35): for incremental_scale in (0.50, 0.75, 1.00): space.append( EntryOverlayCandidate( positive_regimes=positive_regimes, core_score_min=core_score_min, breadth_persist_min=breadth_persist_min, funding_persist_min=funding_persist_min, panic_max=panic_max, choppy_max=choppy_max, distribution_max=distribution_max, current_cash_min=current_cash_min, relaxed_invested_min=relaxed_invested_min, incremental_scale=incremental_scale, ) ) return space def _period_worker( cache_path: str, candidate_payload: dict[str, object], kind: str, label: str, start_text: str, end_text: str, ) -> tuple[str, str, dict[str, float], list[dict[str, object]]]: bundle, _ = load_component_bundle(cache_path) candidate = EntryOverlayCandidate(**candidate_payload) start = pd.Timestamp(start_text) end = pd.Timestamp(end_text) current = build_cash_overlay_period_components( bundle=bundle, eval_start=start, eval_end=end, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=CURRENT_OVERHEAT_OVERRIDES, ) relaxed = build_cash_overlay_period_components( bundle=bundle, eval_start=start, eval_end=end, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=RELAXED_OVERHEAT_OVERRIDES, ) curve, rows = _compose_entry_overlay_curve( current_components=current, relaxed_components=relaxed, candidate=candidate, ) latest_rows: list[dict[str, object]] = [] if label == "2026_YTD": latest_rows = rows.tail(5).assign(timestamp=lambda df: df["timestamp"].astype(str)).to_dict(orient="records") return kind, label, segment_metrics(curve, start, end), latest_rows def main() -> None: bundle, latest_bar = load_component_bundle(CACHE_PATH) eval_start = latest_bar - pd.Timedelta(days=1825) print("[phase] build current components", flush=True) current_components = build_cash_overlay_period_components( bundle=bundle, eval_start=eval_start, eval_end=latest_bar, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=CURRENT_OVERHEAT_OVERRIDES, ) print("[phase] build relaxed components", flush=True) relaxed_components = build_cash_overlay_period_components( bundle=bundle, eval_start=eval_start, eval_end=latest_bar, profile_name=BEST_CASH_OVERLAY.regime_profile, core_filter=BEST_CASH_OVERLAY.core_filter, cap_engine=BEST_CASH_OVERLAY.cap_engine, chop_engine=BEST_CASH_OVERLAY.chop_engine, dist_engine=BEST_CASH_OVERLAY.dist_engine, core_config_overrides=RELAXED_OVERHEAT_OVERRIDES, ) search_rows: list[dict[str, object]] = [] candidates = _candidate_space() print("[phase] search entry-overlay candidates", flush=True) for idx, candidate in enumerate(candidates, start=1): curve, rows = _compose_entry_overlay_curve( current_components=current_components, relaxed_components=relaxed_components, candidate=candidate, ) windows, years, score, negative_years, mdd_violations = _metrics_for_curve(curve, latest_bar) relaxed_share = float((rows["added_relaxed_weight"] > 0.0).mean()) if not rows.empty else 0.0 avg_added = float(rows["added_relaxed_weight"].mean()) if not rows.empty else 0.0 search_rows.append( { "candidate": asdict(candidate), "name": candidate.name, "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "relaxed_entry_share": relaxed_share, "avg_added_relaxed_weight": avg_added, "windows": windows, "years": years, } ) if idx % 96 == 0 or idx == len(candidates): print(f"[search] {idx}/{len(candidates)}", flush=True) search_rows.sort(key=lambda row: float(row["score"]), reverse=True) best_candidate = EntryOverlayCandidate(**search_rows[0]["candidate"]) print(f"[phase] exact best {best_candidate.name}", flush=True) window_results: dict[str, dict[str, float]] = {} year_results: dict[str, dict[str, float]] = {} latest_rows: list[dict[str, object]] = [] specs = [ *(("window", label, latest_bar - pd.Timedelta(days=days), latest_bar) for days, label in WINDOWS), *(("year", label, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))) for label, start, end_exclusive in YEAR_PERIODS), ("year", "2026_YTD", YTD_START, latest_bar), ] ctx = mp.get_context("fork") with ProcessPoolExecutor(max_workers=min(6, len(specs)), mp_context=ctx) as executor: future_map = { executor.submit( _period_worker, CACHE_PATH, asdict(best_candidate), kind, label, str(start), str(end), ): (kind, label) for kind, label, start, end in specs } for future in as_completed(future_map): kind, label = future_map[future] kind_result, label_result, metrics, latest = future.result() if kind_result == "window": window_results[label_result] = metrics else: year_results[label_result] = metrics if latest: latest_rows = latest print(f"[done] {label_result}", flush=True) score, negative_years, mdd_violations = score_candidate( {label: window_results[label] for _, label in WINDOWS}, {label: year_results[label] for label, _, _ in YEAR_PERIODS}, ) payload = { "analysis": "current_relaxed_entry_overlay", "latest_bar": str(latest_bar), "candidate": asdict(best_candidate), "score": score, "negative_years": negative_years, "mdd_violations": mdd_violations, "windows": {label: window_results[label] for _, label in WINDOWS}, "years": year_results, "latest_rows": latest_rows, "baselines": _baseline_summary(), "search_top": search_rows[:5], } OUT_JSON.write_text(json.dumps(payload, indent=2), encoding="utf-8") print(json.dumps(payload, indent=2)) print(f"[saved] {OUT_JSON}") if __name__ == "__main__": main()