# Search macro-scale specs for the relaxed-overheat cash-overlay configuration.
#
# The script loads a cached component bundle, builds the relaxed core
# components once over a five-year window, scores every candidate
# ``MacroScaleSpec`` against those components, and writes the ranked results
# to ``OUT_JSON``.  Unless the ``STRATEGY32_SEARCH_ONLY`` environment variable
# is truthy, it then re-evaluates two baselines and the best spec period by
# period and adds those results to the payload.
from __future__ import annotations

import json
import os
import sys
from dataclasses import asdict
from itertools import product
from pathlib import Path

import pandas as pd

# Make the sibling ``strategy29`` / ``strategy32`` packages importable when the
# file is executed directly as a script.
PACKAGE_PARENT = Path(__file__).resolve().parents[2]
if str(PACKAGE_PARENT) not in sys.path:
    sys.path.insert(0, str(PACKAGE_PARENT))

from strategy29.backtest.window_analysis import slice_bundle
from strategy32.live.runtime import BEST_CASH_OVERLAY, LIVE_STRATEGY_OVERRIDES
from strategy32.research.soft_router import (
    MacroScaleSpec,
    build_cash_overlay_period_components,
    compose_cash_overlay_curve,
    load_component_bundle,
    score_candidate,
    segment_metrics,
)


CACHE_PATH = "/tmp/strategy32_fixed66_bundle.pkl"
OUT_JSON = Path("/tmp/strategy32_relaxed_macro_scaling_search.json")

# Live overrides plus looser momentum/universe thresholds and the hard-filter
# settings shared with CURRENT_OVERHEAT_OVERRIDES.
RELAXED_OVERHEAT_OVERRIDES = {
    **LIVE_STRATEGY_OVERRIDES,
    "momentum_min_score": 0.58,
    "momentum_min_relative_strength": -0.03,
    "momentum_min_7d_return": 0.00,
    "universe_min_avg_dollar_volume": 75_000_000.0,
    "hard_filter_refresh_cadence": "1d",
    "hard_filter_min_history_bars": 120,
    "hard_filter_lookback_bars": 30,
    "hard_filter_min_avg_dollar_volume": 50_000_000.0,
}

# Live overrides plus the hard-filter settings only.
CURRENT_OVERHEAT_OVERRIDES = {
    **LIVE_STRATEGY_OVERRIDES,
    "hard_filter_refresh_cadence": "1d",
    "hard_filter_min_history_bars": 120,
    "hard_filter_lookback_bars": 30,
    "hard_filter_min_avg_dollar_volume": 50_000_000.0,
}

# Trailing evaluation windows as (length in days, label).
WINDOWS = (
    (365, "1y"),
    (730, "2y"),
    (1095, "3y"),
    (1460, "4y"),
    (1825, "5y"),
)

# Calendar periods as (label, inclusive start, exclusive end).
YEAR_PERIODS = (
    ("2021", pd.Timestamp("2021-03-16 04:00:00+00:00"), pd.Timestamp("2022-01-01 00:00:00+00:00")),
    ("2022", pd.Timestamp("2022-01-01 00:00:00+00:00"), pd.Timestamp("2023-01-01 00:00:00+00:00")),
    ("2023", pd.Timestamp("2023-01-01 00:00:00+00:00"), pd.Timestamp("2024-01-01 00:00:00+00:00")),
    ("2024", pd.Timestamp("2024-01-01 00:00:00+00:00"), pd.Timestamp("2025-01-01 00:00:00+00:00")),
    ("2025", pd.Timestamp("2025-01-01 00:00:00+00:00"), pd.Timestamp("2026-01-01 00:00:00+00:00")),
)
YTD_START = pd.Timestamp("2026-01-01 00:00:00+00:00")


def _clip01(value: float) -> float:
    """Return ``value`` converted to float and clamped between 0.0 and 1.0."""
    return min(max(float(value), 0.0), 1.0)


def _ramp(value: float, start: float, end: float) -> float:
    """Map ``value`` linearly from ``start`` -> 0.0 to ``end`` -> 1.0.

    Values at or below ``start`` give 0.0 and values at or above ``end`` give
    1.0.  When ``start == end`` the ramp degenerates to a step at ``end``.
    """
    if end == start:
        return 1.0 if value >= end else 0.0
    if value <= start:
        return 0.0
    if value >= end:
        return 1.0
    return (value - start) / (end - start)


def _build_macro_scale_map(sliced_bundle, *, timestamps: list[pd.Timestamp], spec: MacroScaleSpec) -> pd.Series:
    """Build a per-timestamp exposure scale from weekly BTC closes.

    Two ramps are computed on weekly data: the gap between the weekly close
    and its slow EMA, and the gap between the fast and slow EMAs.  They are
    blended with ``spec.close_weight``, rescaled into ``[spec.floor, 1.0]`` and
    forward-filled onto ``timestamps``.

    Args:
        sliced_bundle: Object whose ``prices["BTC"]`` frame has ``timestamp``
            and ``close`` columns.
        timestamps: Timestamps the scale should be aligned to.
        spec: Ramp thresholds, EMA spans, blend weight and floor.

    Returns:
        A float series indexed by ``timestamps``.  Entries still missing after
        the forward fill are set to 1.0.
    """
    btc_prices = sliced_bundle.prices["BTC"]
    closes = btc_prices.set_index("timestamp")["close"].astype(float).sort_index()
    daily = closes.resample("1D").last().dropna()
    # NOTE(review): with pandas' defaults a ``W-SUN`` bin is labelled at Sunday
    # 00:00 yet holds that Sunday's daily value (the last close of the day).
    # Forward-filling onto intraday timestamps during Sunday may therefore use
    # later bars -- confirm this matches how the exact evaluation builds its
    # macro scale before changing it here.
    weekly = daily.resample("W-SUN").last().dropna()
    fast = weekly.ewm(span=spec.fast_weeks, adjust=False).mean()
    slow = weekly.ewm(span=spec.slow_weeks, adjust=False).mean()

    close_scale = (weekly / slow - 1.0).apply(
        lambda value: _ramp(float(value), spec.close_gap_start, spec.close_gap_full)
    )
    fast_scale = (fast / slow - 1.0).apply(
        lambda value: _ramp(float(value), spec.fast_gap_start, spec.fast_gap_full)
    )
    blended = spec.close_weight * close_scale + (1.0 - spec.close_weight) * fast_scale
    macro_scale = spec.floor + (1.0 - spec.floor) * blended.clip(0.0, 1.0)

    aligned = macro_scale.reindex(pd.DatetimeIndex(timestamps, name="timestamp"), method="ffill")
    return aligned.fillna(1.0).clip(spec.floor, 1.0).astype(float)


def _candidate_specs() -> list[MacroScaleSpec]:
    """Return the grid of macro-scale specs to search.

    The grid is the cartesian product of the floors, close-gap ramps,
    fast-gap ramps and close weights below, with the floor varying slowest
    and the close weight varying fastest.
    """
    floors = (0.25, 0.35, 0.45)
    close_gaps = ((-0.08, 0.02), (-0.06, 0.02), (-0.05, 0.04))
    fast_gaps = ((-0.04, 0.01), (-0.03, 0.02))
    close_weights = (0.55, 0.65)
    return [
        MacroScaleSpec(
            floor=floor,
            close_gap_start=close_gap_start,
            close_gap_full=close_gap_full,
            fast_gap_start=fast_gap_start,
            fast_gap_full=fast_gap_full,
            close_weight=close_weight,
        )
        for floor, (close_gap_start, close_gap_full), (fast_gap_start, fast_gap_full), close_weight in product(
            floors, close_gaps, fast_gaps, close_weights
        )
    ]


def _evaluation_periods(latest_bar: pd.Timestamp) -> list[tuple[str, str, pd.Timestamp, pd.Timestamp]]:
    """Return every evaluation period as ``(kind, label, start, end)``.

    ``kind`` is ``"window"`` for the trailing windows ending at ``latest_bar``
    and ``"year"`` for the calendar periods and the 2026 year-to-date period.
    Calendar periods end one second before their exclusive end, capped at
    ``latest_bar``.
    """
    periods: list[tuple[str, str, pd.Timestamp, pd.Timestamp]] = [
        ("window", label, latest_bar - pd.Timedelta(days=days), latest_bar) for days, label in WINDOWS
    ]
    periods.extend(
        ("year", label, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1)))
        for label, start, end_exclusive in YEAR_PERIODS
    )
    periods.append(("year", "2026_YTD", YTD_START, latest_bar))
    return periods


def _score_results(
    window_results: dict[str, dict[str, float]],
    year_results: dict[str, dict[str, float]],
) -> tuple[float, int, int]:
    """Score a candidate from its window and calendar-year metrics.

    Only labels listed in ``WINDOWS`` and ``YEAR_PERIODS`` are passed to
    ``score_candidate``; the year-to-date entry is not part of the score.
    """
    return score_candidate(
        {label: window_results[label] for _, label in WINDOWS},
        {label: year_results[label] for label, _, _ in YEAR_PERIODS},
    )


def _overlay_component_kwargs() -> dict[str, object]:
    """Return the ``BEST_CASH_OVERLAY`` settings shared by every component build."""
    return {
        "profile_name": BEST_CASH_OVERLAY.regime_profile,
        "core_filter": BEST_CASH_OVERLAY.core_filter,
        "cap_engine": BEST_CASH_OVERLAY.cap_engine,
        "chop_engine": BEST_CASH_OVERLAY.chop_engine,
        "dist_engine": BEST_CASH_OVERLAY.dist_engine,
    }


def _collect_metrics(curve: pd.Series, latest_bar: pd.Timestamp) -> tuple[dict[str, dict[str, float]], dict[str, dict[str, float]], float, int, int]:
    """Compute segment metrics of one curve for every evaluation period.

    Returns:
        ``(window_results, year_results, score, negative_years,
        mdd_violations)`` where the two dicts map period labels to the output
        of ``segment_metrics`` and the last three values come from
        ``score_candidate``.
    """
    window_results: dict[str, dict[str, float]] = {}
    year_results: dict[str, dict[str, float]] = {}
    for kind, label, start, end in _evaluation_periods(latest_bar):
        bucket = window_results if kind == "window" else year_results
        bucket[label] = segment_metrics(curve, start, end)

    score, negative_years, mdd_violations = _score_results(window_results, year_results)
    return window_results, year_results, score, negative_years, mdd_violations


def _evaluate_exact_sequential(
    bundle,
    latest_bar: pd.Timestamp,
    *,
    core_overrides: dict[str, object],
    macro_scale_spec: MacroScaleSpec | None,
) -> dict[str, object]:
    """Evaluate one configuration by rebuilding components for each period.

    Unlike ``_collect_metrics``, which slices a single curve, this builds the
    cash-overlay components and composes a fresh curve separately for every
    evaluation period.

    Args:
        bundle: Component bundle handed to
            ``build_cash_overlay_period_components``.
        latest_bar: End of the trailing windows and of the open periods.
        core_overrides: Core strategy config overrides to apply.
        macro_scale_spec: Macro-scale spec to apply, or ``None`` for none.

    Returns:
        A JSON-ready dict with the candidate, overrides, spec, score fields,
        per-period metrics and the last weights row of the ``2026_YTD`` period.
    """
    window_results: dict[str, dict[str, float]] = {}
    year_results: dict[str, dict[str, float]] = {}
    latest_weights: list[dict[str, object]] = []

    for kind, label, start, end in _evaluation_periods(latest_bar):
        components = build_cash_overlay_period_components(
            bundle=bundle,
            eval_start=start,
            eval_end=end,
            **_overlay_component_kwargs(),
            core_config_overrides=core_overrides,
            macro_scale_spec=macro_scale_spec,
        )
        curve, weights = compose_cash_overlay_curve(candidate=BEST_CASH_OVERLAY, **components)
        metrics = segment_metrics(curve, start, end)
        if kind == "window":
            window_results[label] = metrics
            continue
        year_results[label] = metrics
        if label == "2026_YTD":
            # Keep only the final row, with the timestamp stringified so the
            # record can be dumped as JSON.
            last_row = weights.tail(1).assign(timestamp=lambda df: df["timestamp"].astype(str))
            latest_weights = last_row.to_dict(orient="records")

    score, negative_years, mdd_violations = _score_results(window_results, year_results)
    return {
        "candidate": asdict(BEST_CASH_OVERLAY),
        "core_overrides": core_overrides,
        "macro_scale_spec": asdict(macro_scale_spec) if macro_scale_spec is not None else None,
        "score": score,
        "negative_years": negative_years,
        "mdd_violations": mdd_violations,
        "windows": window_results,
        "years": year_results,
        "latest_weights": latest_weights,
        "validation": "exact_independent_periods_cash_overlay_sequential",
    }


def _write_payload(payload: dict[str, object]) -> None:
    """Serialize ``payload`` once, write it to ``OUT_JSON`` and echo it to stdout."""
    serialized = json.dumps(payload, indent=2)
    OUT_JSON.write_text(serialized, encoding="utf-8")
    print(serialized)
    print(f"[saved] {OUT_JSON}")


def main() -> None:
    """Run the macro-scale search and, unless disabled, the exact validation.

    Setting ``STRATEGY32_SEARCH_ONLY`` to ``1``, ``true``, ``yes`` or ``on``
    (case-insensitive) stops after the search and saves only the top rows.
    """
    bundle, latest_bar = load_component_bundle(CACHE_PATH)
    eval_start = latest_bar - pd.Timedelta(days=1825)
    # Slice one extra year before the evaluation start for the macro scale map.
    sliced = slice_bundle(bundle, eval_start - pd.Timedelta(days=365), latest_bar)

    print("[phase] build relaxed core components", flush=True)
    relaxed_components = build_cash_overlay_period_components(
        bundle=bundle,
        eval_start=eval_start,
        eval_end=latest_bar,
        **_overlay_component_kwargs(),
        core_config_overrides=RELAXED_OVERHEAT_OVERRIDES,
    )

    print("[phase] search macro specs", flush=True)
    search_rows: list[dict[str, object]] = []
    specs = _candidate_specs()
    total = len(specs)
    for idx, spec in enumerate(specs, start=1):
        macro_scale_map = _build_macro_scale_map(
            sliced,
            timestamps=relaxed_components["timestamps"][:-1],
            spec=spec,
        )
        curve, _weights = compose_cash_overlay_curve(
            candidate=BEST_CASH_OVERLAY,
            timestamps=relaxed_components["timestamps"],
            score_frame=relaxed_components["score_frame"],
            core_returns=relaxed_components["core_returns"],
            core_exposure_frame=relaxed_components["core_exposure_frame"],
            cap_returns=relaxed_components["cap_returns"],
            chop_returns=relaxed_components["chop_returns"],
            dist_returns=relaxed_components["dist_returns"],
            macro_scale_map=macro_scale_map,
        )
        windows, years, score, negative_years, mdd_violations = _collect_metrics(curve, latest_bar)
        search_rows.append(
            {
                "macro_scale_spec": asdict(spec),
                "windows": windows,
                "years": years,
                "score": score,
                "negative_years": negative_years,
                "mdd_violations": mdd_violations,
            }
        )
        if idx % 6 == 0 or idx == total:
            print(f"[search] {idx}/{len(specs)}", flush=True)

    search_rows.sort(key=lambda row: float(row["score"]), reverse=True)
    top_search = search_rows[:5]

    search_only = os.getenv("STRATEGY32_SEARCH_ONLY", "").strip().lower() in {"1", "true", "yes", "on"}
    if search_only:
        _write_payload(
            {
                "analysis": "relaxed_overheat_macro_scaling_search",
                "mode": "search_only",
                "latest_bar": str(latest_bar),
                "core_filter": "relaxed_overheat",
                "candidate": asdict(BEST_CASH_OVERLAY),
                "search_top": top_search,
            }
        )
        return

    print("[phase] exact baselines", flush=True)
    baselines = {
        "current_overheat": _evaluate_exact_sequential(
            bundle,
            latest_bar,
            core_overrides=CURRENT_OVERHEAT_OVERRIDES,
            macro_scale_spec=None,
        ),
        "relaxed_overheat": _evaluate_exact_sequential(
            bundle,
            latest_bar,
            core_overrides=RELAXED_OVERHEAT_OVERRIDES,
            macro_scale_spec=None,
        ),
    }

    best_spec = MacroScaleSpec(**top_search[0]["macro_scale_spec"])
    print(f"[phase] exact best spec {best_spec.name}", flush=True)
    best_exact = _evaluate_exact_sequential(
        bundle,
        latest_bar,
        core_overrides=RELAXED_OVERHEAT_OVERRIDES,
        macro_scale_spec=best_spec,
    )

    _write_payload(
        {
            "analysis": "relaxed_overheat_macro_scaling_search",
            "latest_bar": str(latest_bar),
            "core_filter": "relaxed_overheat",
            "candidate": asdict(BEST_CASH_OVERLAY),
            "baselines": baselines,
            "search_top": top_search,
            "best_exact": best_exact,
        }
    )


if __name__ == "__main__":
    main()