"""Grid-search a learned (walk-forward ridge) entry overlay.

Builds bar-level detail frames for two variants of the cash-overlay strategy
("current" vs "relaxed" overheat overrides), derives the extra return
available from deploying idle "current" cash at the "relaxed" exposure, and
searches a grid of `LearnedOverlayCandidate` hyper-parameters where a ridge
regression predicts, block by block, whether turning the overlay on is
worthwhile.  Results (best candidate + top 10) are written to OUT_JSON.
"""

from __future__ import annotations

import json
import sys
from dataclasses import asdict, dataclass
from pathlib import Path

import numpy as np
import pandas as pd

# Make the package importable when this file is run as a loose script
# (two directories above this file is assumed to be the repo root —
# NOTE(review): verify parents[2] matches the actual package layout).
PACKAGE_PARENT = Path(__file__).resolve().parents[2]
if str(PACKAGE_PARENT) not in sys.path:
    sys.path.insert(0, str(PACKAGE_PARENT))

from strategy32.live.runtime import BEST_CASH_OVERLAY
from strategy32.research.soft_router import build_cash_overlay_period_components, load_component_bundle, score_candidate, segment_metrics
from strategy32.scripts.run_current_relaxed_hybrid_experiment import (
    CACHE_PATH,
    CURRENT_OVERHEAT_OVERRIDES,
    RELAXED_OVERHEAT_OVERRIDES,
    WINDOWS,
    YEAR_PERIODS,
    YTD_START,
    _baseline_summary,
    _overlay_weights,
)

# Output location for the search results payload.
OUT_JSON = Path("/tmp/strategy32_current_relaxed_learned_entry_overlay.json")


@dataclass(frozen=True, slots=True)
class LearnedOverlayCandidate:
    """One hyper-parameter combination for the learned entry overlay.

    Attributes:
        block_bars: number of bars grouped into one decision block.
        train_min_blocks: minimum completed blocks before predictions start.
        lookback_blocks: max number of trailing blocks used as training data.
        ridge_alpha: L2 penalty for the ridge regression.
        prediction_threshold: predicted overlay return required to activate.
        overlay_scale: fraction of the overlay add-return actually applied.
    """

    block_bars: int
    train_min_blocks: int
    lookback_blocks: int
    ridge_alpha: float
    prediction_threshold: float
    overlay_scale: float

    @property
    def name(self) -> str:
        # Human-readable identifier used in logs / result payloads.
        return (
            f"block:{self.block_bars}"
            f"|train:{self.train_min_blocks}"
            f"|lookback:{self.lookback_blocks}"
            f"|alpha:{self.ridge_alpha:.2f}"
            f"|th:{self.prediction_threshold:.4f}"
            f"|scale:{self.overlay_scale:.2f}"
        )


def _build_strategy_detail(components: dict[str, object]) -> pd.DataFrame:
    """Flatten one strategy's period components into a per-bar DataFrame.

    Each output row is keyed by the *execution* timestamp (bar i) while all
    signal-derived fields come from the *previous* bar (i - 1), i.e. signals
    are lagged one bar before they affect returns.

    Args:
        components: bundle produced by build_cash_overlay_period_components;
            expected keys include "timestamps", "score_frame",
            "core_exposure_frame" and the four return series.

    Returns:
        DataFrame with one row per executed bar: regime/score features,
        cash/invested percentages, overlay weights and the blended
        "portfolio_return".
    """
    timestamps = list(components["timestamps"])
    score_map = components["score_frame"].set_index("timestamp").sort_index()
    cash_map = components["core_exposure_frame"].set_index("timestamp")["cash_pct"].sort_index()
    core_returns = components["core_returns"]
    cap_returns = components["cap_returns"]
    chop_returns = components["chop_returns"]
    dist_returns = components["dist_returns"]
    rows: list[dict[str, object]] = []
    for i in range(1, len(timestamps)):
        signal_ts = pd.Timestamp(timestamps[i - 1])
        execution_ts = pd.Timestamp(timestamps[i])
        # Missing score rows degrade gracefully to an empty dict (defaults below).
        score_row = score_map.loc[signal_ts].to_dict() if signal_ts in score_map.index else {}
        # Fall back to the latest known cash percentage, or fully-cash (1.0)
        # when no exposure data exists at all.
        core_cash_pct = float(cash_map.get(signal_ts, cash_map.iloc[-1] if not cash_map.empty else 1.0))
        cap_weight, chop_weight, dist_weight = _overlay_weights(BEST_CASH_OVERLAY, score_row, core_cash_pct)
        rows.append(
            {
                "timestamp": execution_ts,
                "strategic_regime": str(score_row.get("strategic_regime", "")),
                "core_score": float(score_row.get("core_score", 0.0)),
                "panic_score": float(score_row.get("panic_score", 0.0)),
                "choppy_score": float(score_row.get("choppy_score", 0.0)),
                "distribution_score": float(score_row.get("distribution_score", 0.0)),
                # `or 0.0` coerces None/NaN-ish falsy values to 0.0 as well.
                "breadth_persist": float(score_row.get("breadth_persist", 0.0) or 0.0),
                "funding_persist": float(score_row.get("funding_persist", 0.0) or 0.0),
                "taker_persist": float(score_row.get("taker_persist", 0.0) or 0.0),
                "volume_accel_persist": float(score_row.get("volume_accel_persist", 0.0) or 0.0),
                "mean_taker_imbalance": float(score_row.get("mean_taker_imbalance", 0.0) or 0.0),
                "taker_imbalance_dispersion": float(score_row.get("taker_imbalance_dispersion", 0.0) or 0.0),
                "positive_taker_ratio": float(score_row.get("positive_taker_ratio", 0.0) or 0.0),
                "mean_alt_volume_accel": float(score_row.get("mean_alt_volume_accel", 0.0) or 0.0),
                "positive_volume_accel_ratio": float(score_row.get("positive_volume_accel_ratio", 0.0) or 0.0),
                "funding_dispersion": float(score_row.get("funding_dispersion", 0.0) or 0.0),
                "basis_dispersion": float(score_row.get("basis_dispersion", 0.0) or 0.0),
                "alt_return_dispersion_7d": float(score_row.get("alt_return_dispersion_7d", 0.0) or 0.0),
                "mean_funding_acceleration": float(score_row.get("mean_funding_acceleration", 0.0) or 0.0),
                "mean_basis_trend": float(score_row.get("mean_basis_trend", 0.0) or 0.0),
                "cash_pct": core_cash_pct,
                "invested_pct": max(0.0, 1.0 - core_cash_pct),
                "cap_weight": cap_weight,
                "chop_weight": chop_weight,
                "dist_weight": dist_weight,
                # Blend of core return plus the three weighted overlay sleeves.
                "portfolio_return": (
                    float(core_returns.get(execution_ts, 0.0))
                    + cap_weight * float(cap_returns.get(execution_ts, 0.0))
                    + chop_weight * float(chop_returns.get(execution_ts, 0.0))
                    + dist_weight * float(dist_returns.get(execution_ts, 0.0))
                ),
            }
        )
    return pd.DataFrame(rows)


def _curve_from_returns(returns: pd.Series) -> pd.Series:
    """Compound a per-bar return series into a 1000-base equity curve.

    A synthetic anchor point is prepended 4 hours before the first bar
    (presumably the bar interval — TODO confirm) so the curve starts at the
    1000.0 baseline.  Equity is floored at 0.0 per bar (no negative equity).

    NOTE(review): raises IndexError if `returns` is empty.
    """
    equity = 1000.0
    vals = [equity]
    idx = [returns.index[0] - pd.Timedelta(hours=4)]
    for ts, ret in returns.items():
        equity *= max(0.0, 1.0 + float(ret))
        idx.append(pd.Timestamp(ts))
        vals.append(equity)
    return pd.Series(vals, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float)


def _metrics_for_curve(curve: pd.Series, latest_bar: pd.Timestamp) -> tuple[dict[str, dict[str, float]], dict[str, dict[str, float]], float, int, int]:
    """Score an equity curve over rolling windows and calendar years.

    Returns:
        (windows, years, score, negative_years, mdd_violations) where
        `windows`/`years` map labels to segment_metrics dicts and the last
        three values come from score_candidate.
    """
    windows = {
        label: segment_metrics(curve, latest_bar - pd.Timedelta(days=days), latest_bar)
        for days, label in WINDOWS
    }
    # Clamp each year's end to the latest bar; subtract one second so the
    # exclusive year boundary is not included.
    years = {
        label: segment_metrics(curve, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1)))
        for label, start, end_exclusive in YEAR_PERIODS
    }
    years["2026_YTD"] = segment_metrics(curve, YTD_START, latest_bar)
    # Scoring only sees the configured windows/years; the YTD extra entry is
    # reported but intentionally excluded from score_candidate's input.
    score, negative_years, mdd_violations = score_candidate(
        {label: windows[label] for _, label in WINDOWS},
        {label: years[label] for label, _, _ in YEAR_PERIODS},
    )
    return windows, years, score, negative_years, mdd_violations


def _ridge_predict(train_x: np.ndarray, train_y: np.ndarray, test_x: np.ndarray, alpha: float) -> float:
    """Fit a ridge regression on (train_x, train_y) and predict for test_x.

    Features are sanitized (NaN/inf -> 0), z-scored with the training mean and
    std, and clipped to [-8, 8]; targets are clipped to [-0.50, 0.50] to limit
    outlier influence.  An unpenalized intercept column is added.  Returns 0.0
    when there is no training data.
    """
    if len(train_x) == 0:
        return 0.0
    train_x = np.nan_to_num(train_x, nan=0.0, posinf=0.0, neginf=0.0)
    train_y = np.nan_to_num(train_y, nan=0.0, posinf=0.0, neginf=0.0)
    test_x = np.nan_to_num(test_x, nan=0.0, posinf=0.0, neginf=0.0)
    mean = train_x.mean(axis=0)
    std = train_x.std(axis=0)
    # Guard near-constant features against divide-by-zero in the z-score.
    std[std < 1e-9] = 1.0
    x_train = (train_x - mean) / std
    x_test = (test_x - mean) / std
    x_train = np.clip(x_train, -8.0, 8.0)
    x_test = np.clip(x_test, -8.0, 8.0)
    train_y = np.clip(train_y, -0.50, 0.50)
    # Prepend an intercept column / element.
    x_train = np.column_stack([np.ones(len(x_train)), x_train])
    x_test = np.concatenate([[1.0], x_test])
    penalty = np.eye(x_train.shape[1]) * alpha
    penalty[0, 0] = 0.0  # do not penalize the intercept
    lhs = x_train.T @ x_train + penalty
    rhs = x_train.T @ train_y
    try:
        beta = np.linalg.solve(lhs, rhs)
    except np.linalg.LinAlgError:
        # Singular normal equations: fall back to the pseudo-inverse.
        beta = np.linalg.pinv(lhs) @ rhs
    return float(x_test @ beta)


def _build_regime_columns(detail: pd.DataFrame) -> list[str]:
    """One-hot encode `strategic_regime` into `detail` (mutated in place).

    Returns the sorted list of the new dummy column names so downstream
    feature ordering is deterministic.
    """
    regime_dummies = pd.get_dummies(detail["strategic_regime"], prefix="regime", dtype=float)
    for column in regime_dummies.columns:
        detail[column] = regime_dummies[column]
    return sorted(regime_dummies.columns.tolist())


def _build_block_dataset(detail: pd.DataFrame, block_bars: int, regime_columns: list[str]) -> pd.DataFrame:
    """Aggregate the per-bar detail into fixed-size decision blocks.

    For every block of `block_bars` consecutive bars the function records the
    features of the block's *first* bar (the trigger, known at decision time),
    trailing statistics shifted by one bar to avoid lookahead, and the block's
    realized totals — including `overlay_total_full`, the marginal compounded
    return of always applying the overlay over the block, which later serves
    as the regression target.

    Args:
        detail: merged current/relaxed per-bar frame (from main()).
        block_bars: bars per block (the final block may be shorter).
        regime_columns: one-hot regime column names to copy onto each row.

    Returns:
        DataFrame with one row per block, including the positional
        block_start_index / block_end_index back into `detail`.
    """
    rows: list[dict[str, object]] = []
    frame = detail.copy()
    # Trailing features use shift(1) so the current bar never leaks into its
    # own trailing window.
    frame["trailing_current_42"] = frame["current_return"].shift(1).rolling(42, min_periods=6).sum()
    frame["trailing_relaxed_42"] = frame["relaxed_return"].shift(1).rolling(42, min_periods=6).sum()
    frame["trailing_diff_42"] = frame["trailing_relaxed_42"] - frame["trailing_current_42"]
    frame["trailing_core_score_21"] = frame["core_score"].shift(1).rolling(21, min_periods=6).mean()
    frame["trailing_breadth_21"] = frame["breadth_persist"].shift(1).rolling(21, min_periods=6).mean()
    frame["trailing_choppy_21"] = frame["choppy_score"].shift(1).rolling(21, min_periods=6).mean()
    # Extra deployable exposure: idle "current" cash, capped by how much more
    # the relaxed variant is invested.  NOTE(review): these two columns are
    # recomputed identically to the ones main() already placed on `detail`.
    frame["extra_raw"] = np.minimum(frame["current_cash_pct"], np.maximum(frame["relaxed_invested_pct"] - frame["current_invested_pct"], 0.0))
    # Per-unit-of-exposure relaxed return (guarded against ~zero exposure).
    relaxed_unit = np.where(frame["relaxed_invested_pct"] > 1e-9, frame["relaxed_return"] / frame["relaxed_invested_pct"], 0.0)
    frame["overlay_add_return_full"] = frame["extra_raw"] * relaxed_unit
    for start in range(0, len(frame), block_bars):
        block = frame.iloc[start : start + block_bars]
        if block.empty:
            continue
        trigger = block.iloc[0]
        current_total = float((1.0 + block["current_return"]).prod() - 1.0)
        relaxed_total = float((1.0 + block["relaxed_return"]).prod() - 1.0)
        # Marginal return of the overlay: (current + overlay) compounded,
        # relative to current compounded alone.
        overlay_total = float((1.0 + (block["current_return"] + block["overlay_add_return_full"])).prod() / (1.0 + current_total) - 1.0)
        row = {
            "timestamp": trigger["timestamp"],
            "current_total": current_total,
            "relaxed_total": relaxed_total,
            "overlay_total_full": overlay_total,
            "current_cash_pct": float(trigger["current_cash_pct"]),
            "relaxed_invested_pct": float(trigger["relaxed_invested_pct"]),
            "core_score": float(trigger["core_score"]),
            "breadth_persist": float(trigger["breadth_persist"]),
            "funding_persist": float(trigger["funding_persist"]),
            "taker_persist": float(trigger["taker_persist"]),
            "volume_accel_persist": float(trigger["volume_accel_persist"]),
            "mean_taker_imbalance": float(trigger["mean_taker_imbalance"]),
            "taker_imbalance_dispersion": float(trigger["taker_imbalance_dispersion"]),
            "positive_taker_ratio": float(trigger["positive_taker_ratio"]),
            "mean_alt_volume_accel": float(trigger["mean_alt_volume_accel"]),
            "positive_volume_accel_ratio": float(trigger["positive_volume_accel_ratio"]),
            "funding_dispersion": float(trigger["funding_dispersion"]),
            "basis_dispersion": float(trigger["basis_dispersion"]),
            "alt_return_dispersion_7d": float(trigger["alt_return_dispersion_7d"]),
            "mean_funding_acceleration": float(trigger["mean_funding_acceleration"]),
            "mean_basis_trend": float(trigger["mean_basis_trend"]),
            "panic_score": float(trigger["panic_score"]),
            "choppy_score": float(trigger["choppy_score"]),
            "distribution_score": float(trigger["distribution_score"]),
            # Rolling stats may be NaN before min_periods is met; default 0.0.
            "trailing_current_42": float(trigger["trailing_current_42"]) if pd.notna(trigger["trailing_current_42"]) else 0.0,
            "trailing_relaxed_42": float(trigger["trailing_relaxed_42"]) if pd.notna(trigger["trailing_relaxed_42"]) else 0.0,
            "trailing_diff_42": float(trigger["trailing_diff_42"]) if pd.notna(trigger["trailing_diff_42"]) else 0.0,
            "trailing_core_score_21": float(trigger["trailing_core_score_21"]) if pd.notna(trigger["trailing_core_score_21"]) else 0.0,
            "trailing_breadth_21": float(trigger["trailing_breadth_21"]) if pd.notna(trigger["trailing_breadth_21"]) else 0.0,
            "trailing_choppy_21": float(trigger["trailing_choppy_21"]) if pd.notna(trigger["trailing_choppy_21"]) else 0.0,
            # Positional indices into the (RangeIndex) detail frame.
            "block_start_index": int(start),
            "block_end_index": int(block.index[-1]),
        }
        for column in regime_columns:
            row[column] = float(trigger.get(column, 0.0))
        rows.append(row)
    return pd.DataFrame(rows)


def _feature_columns(regime_columns: list[str]) -> list[str]:
    """Return the ordered list of regression feature names (+ regime dummies)."""
    return [
        "core_score",
        "breadth_persist",
        "funding_persist",
        "taker_persist",
        "volume_accel_persist",
        "mean_taker_imbalance",
        "taker_imbalance_dispersion",
        "positive_taker_ratio",
        "mean_alt_volume_accel",
        "positive_volume_accel_ratio",
        "funding_dispersion",
        "basis_dispersion",
        "alt_return_dispersion_7d",
        "mean_funding_acceleration",
        "mean_basis_trend",
        "panic_score",
        "choppy_score",
        "distribution_score",
        "current_cash_pct",
        "relaxed_invested_pct",
        "trailing_current_42",
        "trailing_relaxed_42",
        "trailing_diff_42",
        "trailing_core_score_21",
        "trailing_breadth_21",
        "trailing_choppy_21",
        *regime_columns,
    ]


def _simulate_candidate(
    detail: pd.DataFrame,
    block_frame: pd.DataFrame,
    regime_columns: list[str],
    candidate: LearnedOverlayCandidate,
) -> pd.Series:
    """Walk-forward simulate one candidate and return its per-bar returns.

    For each block: train the ridge model on up to `lookback_blocks` strictly
    earlier blocks (never the current one — no lookahead), predict the block's
    overlay return, and switch the overlay on for the whole block when the
    prediction exceeds the candidate's threshold.  The first
    `train_min_blocks` blocks always run without the overlay.

    Returns:
        Bar-indexed Series of realized returns (current return plus, when the
        overlay is active, the scaled overlay add-return).
    """
    rows = detail.reset_index(drop=True)
    features = _feature_columns(regime_columns)
    returns: list[float] = []
    idx: list[pd.Timestamp] = []
    for block_idx, block in block_frame.iterrows():
        start_idx = int(block["block_start_index"])
        end_idx = int(block["block_end_index"])
        bar_block = rows.iloc[start_idx : end_idx + 1]
        use_overlay = False
        if block_idx >= candidate.train_min_blocks:
            train_start = max(0, block_idx - candidate.lookback_blocks)
            # iloc slice ends at block_idx, so the current block is excluded.
            train = block_frame.iloc[train_start:block_idx]
            train_x = train[features].to_numpy(dtype=float)
            train_y = train["overlay_total_full"].to_numpy(dtype=float)
            test_x = block[features].to_numpy(dtype=float)
            pred = _ridge_predict(train_x, train_y, test_x, candidate.ridge_alpha)
            use_overlay = pred > candidate.prediction_threshold
        for row in bar_block.itertuples(index=False):
            extra_add = 0.0
            if use_overlay:
                extra_add = float(getattr(row, "overlay_add_return_full")) * candidate.overlay_scale
            returns.append(float(getattr(row, "current_return")) + extra_add)
            idx.append(pd.Timestamp(getattr(row, "timestamp")))
    return pd.Series(returns, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float)


def _candidate_space() -> list[LearnedOverlayCandidate]:
    """Enumerate the full hyper-parameter grid of overlay candidates."""
    space: list[LearnedOverlayCandidate] = []
    for block_bars in (42, 84):
        for train_min_blocks in (12, 18, 24):
            for lookback_blocks in (24, 60):
                # A lookback shorter than the warm-up would never be fully used.
                if lookback_blocks < train_min_blocks:
                    continue
                for ridge_alpha in (0.5, 1.0, 5.0, 20.0):
                    for prediction_threshold in (0.0, 0.0010, 0.0025, 0.0050):
                        for overlay_scale in (0.25, 0.50, 0.75, 1.00):
                            space.append(
                                LearnedOverlayCandidate(
                                    block_bars=block_bars,
                                    train_min_blocks=train_min_blocks,
                                    lookback_blocks=lookback_blocks,
                                    ridge_alpha=ridge_alpha,
                                    prediction_threshold=prediction_threshold,
                                    overlay_scale=overlay_scale,
                                )
                            )
    return space


def main() -> None:
    """Run the full learned-entry-overlay search and persist the results.

    Pipeline: load the component bundle, build the "current" and "relaxed"
    strategy details over the last ~5 years (1825 days), merge them, derive
    overlay add-returns and regime dummies, then score every candidate in the
    grid and write the ranked results (best + top 10 + baselines) to OUT_JSON.
    """
    bundle, latest_bar = load_component_bundle(CACHE_PATH)
    eval_start = latest_bar - pd.Timedelta(days=1825)
    print("[phase] build current", flush=True)
    current = build_cash_overlay_period_components(
        bundle=bundle,
        eval_start=eval_start,
        eval_end=latest_bar,
        profile_name=BEST_CASH_OVERLAY.regime_profile,
        core_filter=BEST_CASH_OVERLAY.core_filter,
        cap_engine=BEST_CASH_OVERLAY.cap_engine,
        chop_engine=BEST_CASH_OVERLAY.chop_engine,
        dist_engine=BEST_CASH_OVERLAY.dist_engine,
        core_config_overrides=CURRENT_OVERHEAT_OVERRIDES,
    )
    print("[phase] build relaxed", flush=True)
    relaxed = build_cash_overlay_period_components(
        bundle=bundle,
        eval_start=eval_start,
        eval_end=latest_bar,
        profile_name=BEST_CASH_OVERLAY.regime_profile,
        core_filter=BEST_CASH_OVERLAY.core_filter,
        cap_engine=BEST_CASH_OVERLAY.cap_engine,
        chop_engine=BEST_CASH_OVERLAY.chop_engine,
        dist_engine=BEST_CASH_OVERLAY.dist_engine,
        core_config_overrides=RELAXED_OVERHEAT_OVERRIDES,
    )
    current_detail = _build_strategy_detail(current).rename(
        columns={
            "cash_pct": "current_cash_pct",
            "invested_pct": "current_invested_pct",
            "portfolio_return": "current_return",
        }
    )
    relaxed_detail = _build_strategy_detail(relaxed).rename(
        columns={
            "cash_pct": "relaxed_cash_pct",
            "invested_pct": "relaxed_invested_pct",
            "portfolio_return": "relaxed_return",
        }
    )
    # Inner join keeps only bars present in both variants.
    detail = current_detail.merge(
        relaxed_detail[
            [
                "timestamp",
                "relaxed_cash_pct",
                "relaxed_invested_pct",
                "relaxed_return",
            ]
        ],
        on="timestamp",
        how="inner",
    )
    # Extra deployable exposure and the per-bar overlay add-return
    # (same construction as inside _build_block_dataset).
    detail["extra_raw"] = np.minimum(detail["current_cash_pct"], np.maximum(detail["relaxed_invested_pct"] - detail["current_invested_pct"], 0.0))
    relaxed_unit = np.where(detail["relaxed_invested_pct"] > 1e-9, detail["relaxed_return"] / detail["relaxed_invested_pct"], 0.0)
    detail["overlay_add_return_full"] = detail["extra_raw"] * relaxed_unit
    regime_columns = _build_regime_columns(detail)
    candidates = _candidate_space()
    rows: list[dict[str, object]] = []
    print(f"[phase] learned search {len(candidates)} candidates", flush=True)
    # Block datasets depend only on block_bars, so cache them across candidates.
    block_cache: dict[int, pd.DataFrame] = {}
    for idx, candidate in enumerate(candidates, start=1):
        block_frame = block_cache.get(candidate.block_bars)
        if block_frame is None:
            block_frame = _build_block_dataset(detail, candidate.block_bars, regime_columns)
            block_cache[candidate.block_bars] = block_frame
        returns = _simulate_candidate(detail, block_frame, regime_columns, candidate)
        curve = _curve_from_returns(returns)
        windows, years, score, negative_years, mdd_violations = _metrics_for_curve(curve, latest_bar)
        rows.append(
            {
                "candidate": asdict(candidate),
                "name": candidate.name,
                "score": score,
                "negative_years": negative_years,
                "mdd_violations": mdd_violations,
                "windows": windows,
                "years": years,
            }
        )
        if idx % 96 == 0 or idx == len(candidates):
            print(f"[search] {idx}/{len(candidates)}", flush=True)
    # Rank candidates best-first by score.
    rows.sort(key=lambda row: float(row["score"]), reverse=True)
    best = rows[0]
    payload = {
        "analysis": "current_relaxed_learned_entry_overlay",
        "latest_bar": str(latest_bar),
        "candidate": best["candidate"],
        "score": best["score"],
        "negative_years": best["negative_years"],
        "mdd_violations": best["mdd_violations"],
        "windows": best["windows"],
        "years": best["years"],
        "baselines": _baseline_summary(),
        "search_top": rows[:10],
    }
    OUT_JSON.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    print(json.dumps(payload, indent=2))
    print(f"[saved] {OUT_JSON}", flush=True)


if __name__ == "__main__":
    main()