from __future__ import annotations from dataclasses import asdict, dataclass import pickle from pathlib import Path import pandas as pd from strategy29.backtest.metrics import cagr, max_drawdown, sharpe_ratio from strategy29.common.constants import BTC_SYMBOL from strategy29.common.models import MarketDataBundle from strategy29.data.universe import select_tradeable_universe from strategy32.scripts.run_regime_filter_analysis import build_strategic_regime_frame @dataclass(slots=True) class AdverseRegimeEngineSpec: name: str target_regime: str family: str min_avg_dollar_volume: float = 50_000_000.0 rebalance_bars: int = 6 top_n: int = 2 transaction_cost_pct: float = 0.0015 @dataclass(slots=True) class AdverseRegimeEngineResult: name: str target_regime: str family: str total_return: float cagr: float sharpe: float max_drawdown: float active_bar_ratio: float rebalance_count: int equity_curve: pd.Series def to_payload(self) -> dict[str, object]: return { "name": self.name, "target_regime": self.target_regime, "family": self.family, "total_return": self.total_return, "cagr": self.cagr, "sharpe": self.sharpe, "max_drawdown": self.max_drawdown, "active_bar_ratio": self.active_bar_ratio, "rebalance_count": self.rebalance_count, } def default_engine_specs() -> list[AdverseRegimeEngineSpec]: return [ AdverseRegimeEngineSpec("cap_cash", "CAPITULATION_STRESS", "cash", rebalance_bars=1), AdverseRegimeEngineSpec("cap_btc_rebound", "CAPITULATION_STRESS", "btc_rebound", rebalance_bars=1), AdverseRegimeEngineSpec("cap_alt_panic_rebound", "CAPITULATION_STRESS", "alt_panic_rebound", rebalance_bars=1), AdverseRegimeEngineSpec("cap_funding_snapback_hedged", "CAPITULATION_STRESS", "funding_snapback_hedged", rebalance_bars=1), AdverseRegimeEngineSpec("chop_cash", "CHOPPY_ROTATION", "cash", rebalance_bars=1), AdverseRegimeEngineSpec("chop_pairs_mean_revert", "CHOPPY_ROTATION", "pairs_mean_revert", rebalance_bars=3), AdverseRegimeEngineSpec("chop_quality_rotation", "CHOPPY_ROTATION", "quality_rotation", rebalance_bars=3), AdverseRegimeEngineSpec("chop_carry_only", "CHOPPY_ROTATION", "carry_only", rebalance_bars=6), AdverseRegimeEngineSpec("chop_rs_spread", "CHOPPY_ROTATION", "rs_spread", rebalance_bars=6), AdverseRegimeEngineSpec("chop_btc_hedged_leader", "CHOPPY_ROTATION", "btc_hedged_leader", rebalance_bars=6, top_n=1), AdverseRegimeEngineSpec("chop_carry_strict", "CHOPPY_ROTATION", "carry_only_strict", rebalance_bars=6, top_n=1), AdverseRegimeEngineSpec("chop_inverse_carry", "CHOPPY_ROTATION", "inverse_carry", rebalance_bars=6, top_n=1), AdverseRegimeEngineSpec("chop_inverse_carry_strict", "CHOPPY_ROTATION", "inverse_carry_strict", rebalance_bars=6, top_n=1), AdverseRegimeEngineSpec("dist_cash", "DISTRIBUTION_DRIFT", "cash", rebalance_bars=1), AdverseRegimeEngineSpec("dist_btc_vs_weak_alt", "DISTRIBUTION_DRIFT", "btc_vs_weak_alt", rebalance_bars=3), AdverseRegimeEngineSpec("dist_short_rally", "DISTRIBUTION_DRIFT", "short_rally", rebalance_bars=1), AdverseRegimeEngineSpec("dist_weak_basket_short", "DISTRIBUTION_DRIFT", "weak_basket_short", rebalance_bars=3), AdverseRegimeEngineSpec("dist_relative_weakness_spread", "DISTRIBUTION_DRIFT", "relative_weakness_spread", rebalance_bars=6), AdverseRegimeEngineSpec("dist_btc_rally_short", "DISTRIBUTION_DRIFT", "btc_rally_short", rebalance_bars=1, top_n=1), AdverseRegimeEngineSpec("dist_btc_rally_short_strict", "DISTRIBUTION_DRIFT", "btc_rally_short_strict", rebalance_bars=1, top_n=1), AdverseRegimeEngineSpec("dist_weak_rally_spread", "DISTRIBUTION_DRIFT", "weak_rally_spread", rebalance_bars=3), AdverseRegimeEngineSpec("dist_inverse_carry", "DISTRIBUTION_DRIFT", "inverse_carry", rebalance_bars=6, top_n=1), AdverseRegimeEngineSpec("dist_inverse_carry_strict", "DISTRIBUTION_DRIFT", "inverse_carry_strict", rebalance_bars=6, top_n=1), ] def load_fixed66_cache(path: str | Path) -> tuple[MarketDataBundle, pd.Timestamp, list[str]]: payload = pickle.loads(Path(path).read_bytes()) return payload["bundle"], payload["latest_bar"], list(payload["accepted_symbols"]) class AdverseRegimeResearchHarness: def __init__(self, bundle: MarketDataBundle, latest_bar: pd.Timestamp): self.bundle = bundle self.latest_bar = pd.Timestamp(latest_bar) self.timestamps = sorted(bundle.prices[BTC_SYMBOL]["timestamp"].tolist()) self._regime_frame_cache: dict[pd.Timestamp, pd.DataFrame] = {} self.price_frames = { symbol: df.set_index("timestamp")[["close", "volume"]].sort_index() for symbol, df in bundle.prices.items() } self.funding_frames = { symbol: df.set_index("timestamp")[["funding_rate", "basis"]].sort_index() for symbol, df in bundle.funding.items() } def build_regime_frame(self, eval_start: pd.Timestamp) -> pd.DataFrame: eval_start = pd.Timestamp(eval_start) cached = self._regime_frame_cache.get(eval_start) if cached is not None: return cached raw_start = eval_start - pd.Timedelta(days=90) sliced = MarketDataBundle( prices={symbol: df.loc[df["timestamp"] >= raw_start].copy() for symbol, df in self.bundle.prices.items()}, funding={symbol: df.loc[df["timestamp"] >= raw_start].copy() for symbol, df in self.bundle.funding.items()}, ) frame = build_strategic_regime_frame(sliced, eval_start, self.latest_bar) self._regime_frame_cache[eval_start] = frame return frame def run_engine( self, spec: AdverseRegimeEngineSpec, *, eval_start: pd.Timestamp, initial_capital: float = 1000.0, regime_frame: pd.DataFrame | None = None, ) -> AdverseRegimeEngineResult: regime_frame = self.build_regime_frame(eval_start) if regime_frame is None else regime_frame regime_map = dict(zip(pd.to_datetime(regime_frame["timestamp"]), regime_frame["strategic_regime"])) timestamps = [ts for ts in self.timestamps if ts >= eval_start] if len(timestamps) < 3: raise ValueError("not enough timestamps for adverse regime simulation") equity = initial_capital equity_points = [pd.Timestamp(timestamps[0])] equity_values = [equity] current_weights: dict[str, float] = {} rebalance_count = 0 active_bars = 0 for i in range(1, len(timestamps)): signal_ts = timestamps[i - 1] execution_ts = timestamps[i] if current_weights: bar_ret = self._portfolio_return(current_weights, signal_ts, execution_ts) equity *= max(0.0, 1.0 + bar_ret) target_weights = current_weights regime_name = regime_map.get(signal_ts, "") if regime_name != spec.target_regime: target_weights = {} elif (i - 1) % spec.rebalance_bars == 0: target_weights = self._target_weights(spec, signal_ts) if target_weights: active_bars += 1 turnover = self._turnover(current_weights, target_weights) if turnover > 0: rebalance_count += 1 equity *= max(0.0, 1.0 - turnover * spec.transaction_cost_pct) current_weights = target_weights equity_points.append(pd.Timestamp(execution_ts)) equity_values.append(equity) equity_curve = pd.Series(equity_values, index=pd.Index(equity_points, name="timestamp"), dtype=float) return AdverseRegimeEngineResult( name=spec.name, target_regime=spec.target_regime, family=spec.family, total_return=equity_curve.iloc[-1] / equity_curve.iloc[0] - 1.0, cagr=cagr(equity_curve), sharpe=sharpe_ratio(equity_curve, 6), max_drawdown=max_drawdown(equity_curve), active_bar_ratio=active_bars / max(len(timestamps) - 1, 1), rebalance_count=rebalance_count, equity_curve=equity_curve, ) def target_weights(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: return self._target_weights(spec, pd.Timestamp(timestamp)) def _target_weights(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: if spec.family == "cash": return {} if spec.family == "btc_rebound": return self._cap_btc_rebound(timestamp) if spec.family == "alt_panic_rebound": return self._cap_alt_panic_rebound(spec, timestamp) if spec.family == "funding_snapback_hedged": return self._cap_funding_snapback_hedged(spec, timestamp) if spec.family == "pairs_mean_revert": return self._chop_pairs_mean_revert(spec, timestamp) if spec.family == "quality_rotation": return self._chop_quality_rotation(spec, timestamp) if spec.family == "carry_only": return self._carry_only(spec, timestamp) if spec.family == "rs_spread": return self._chop_rs_spread(spec, timestamp) if spec.family == "btc_hedged_leader": return self._chop_btc_hedged_leader(spec, timestamp) if spec.family == "carry_only_strict": return self._carry_only_strict(spec, timestamp) if spec.family == "inverse_carry": return self._inverse_carry(spec, timestamp, strict=False) if spec.family == "inverse_carry_strict": return self._inverse_carry(spec, timestamp, strict=True) if spec.family == "btc_vs_weak_alt": return self._dist_btc_vs_weak_alt(spec, timestamp) if spec.family == "short_rally": return self._dist_short_rally(spec, timestamp) if spec.family == "weak_basket_short": return self._dist_weak_basket_short(spec, timestamp) if spec.family == "relative_weakness_spread": return self._dist_relative_weakness_spread(spec, timestamp) if spec.family == "btc_rally_short": return self._dist_btc_rally_short(timestamp) if spec.family == "btc_rally_short_strict": return self._dist_btc_rally_short_strict(timestamp) if spec.family == "weak_rally_spread": return self._dist_weak_rally_spread(spec, timestamp) raise ValueError(f"unsupported family: {spec.family}") def _cap_btc_rebound(self, timestamp: pd.Timestamp) -> dict[str, float]: hist = self._price_hist(BTC_SYMBOL, timestamp, 24) if len(hist) < 19: return {} ret_3d = self._return_from_hist(hist, 18) ret_1b = self._return_from_hist(hist, 1) if ret_3d > -0.10 or ret_1b <= 0.0: return {} return {BTC_SYMBOL: 1.0} def _cap_alt_panic_rebound(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: candidates: list[tuple[float, str]] = [] for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): hist = self._price_hist(symbol, timestamp, 24) if len(hist) < 19: continue ret_3d = self._return_from_hist(hist, 18) ret_1b = self._return_from_hist(hist, 1) funding = self._latest_funding(symbol, timestamp) if ret_3d > -0.12 or ret_1b <= 0.0: continue score = (-ret_3d) + max(-funding, 0.0) * 200.0 + ret_1b * 2.0 candidates.append((score, symbol)) candidates.sort(reverse=True) symbols = [symbol for _, symbol in candidates[: spec.top_n]] return self._equal_weight(symbols, 1.0) def _cap_funding_snapback_hedged(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: candidates: list[tuple[float, str]] = [] for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): hist = self._price_hist(symbol, timestamp, 24) if len(hist) < 19: continue ret_3d = self._return_from_hist(hist, 18) ret_1b = self._return_from_hist(hist, 1) funding = self._latest_funding(symbol, timestamp) if funding >= 0.0 or ret_3d > -0.08 or ret_1b <= 0.0: continue score = max(-funding, 0.0) * 260.0 + (-ret_3d) * 0.6 + ret_1b candidates.append((score, symbol)) candidates.sort(reverse=True) symbols = [symbol for _, symbol in candidates[: spec.top_n]] if not symbols: return {} weights = self._equal_weight(symbols, 0.70) weights[BTC_SYMBOL] = weights.get(BTC_SYMBOL, 0.0) - 0.30 return weights def _chop_pairs_mean_revert(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: rows: list[tuple[float, float, str]] = [] for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): hist = self._price_hist(symbol, timestamp, 18) if len(hist) < 7: continue ret_1d = self._return_from_hist(hist, 6) vol = float(hist["close"].pct_change().dropna().tail(12).std(ddof=0)) if vol <= 0 or vol > 0.08: continue rows.append((ret_1d, vol, symbol)) if len(rows) < spec.top_n * 2: return {} rows.sort(key=lambda row: row[0]) longs = [symbol for _, _, symbol in rows[: spec.top_n]] shorts = [symbol for _, _, symbol in rows[-spec.top_n :]] return self._long_short_weights(longs, shorts) def _chop_quality_rotation(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: long_rows: list[tuple[float, str]] = [] short_rows: list[tuple[float, str]] = [] btc_hist = self._price_hist(BTC_SYMBOL, timestamp, 48) if len(btc_hist) < 43: return {} btc_ret_7d = self._return_from_hist(btc_hist, 42) for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): hist = self._price_hist(symbol, timestamp, 48) if len(hist) < 43: continue ret_7d = self._return_from_hist(hist, 42) ret_1d = self._return_from_hist(hist, 6) rs_7d = ret_7d - btc_ret_7d long_rows.append((rs_7d - ret_1d, symbol)) short_rows.append((-rs_7d + ret_1d, symbol)) long_rows.sort(reverse=True) short_rows.sort(reverse=True) longs = [symbol for _, symbol in long_rows[: spec.top_n]] shorts = [symbol for _, symbol in short_rows[: spec.top_n]] shorts = [symbol for symbol in shorts if symbol not in longs] return self._long_short_weights(longs, shorts[: spec.top_n]) def _carry_only(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: candidates: list[tuple[float, str]] = [] for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): if symbol not in self.funding_frames: continue f_hist = self.funding_frames[symbol].loc[:timestamp].tail(21) if len(f_hist) < 21: continue mean_funding = float(f_hist["funding_rate"].mean()) basis_vol = float(f_hist["basis"].std(ddof=0)) latest_basis = float(f_hist["basis"].iloc[-1]) expected_edge = mean_funding * 18 + max(latest_basis, 0.0) * 0.35 - 0.0030 - basis_vol * 1.5 if expected_edge <= 0: continue candidates.append((expected_edge, symbol)) candidates.sort(reverse=True) symbols = [symbol for _, symbol in candidates[: spec.top_n]] if not symbols: return {} weight = 1.0 / len(symbols) return {f"carry:{symbol}": weight for symbol in symbols} def _carry_only_strict(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: candidates: list[tuple[float, str]] = [] for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): if symbol not in self.funding_frames: continue f_hist = self.funding_frames[symbol].loc[:timestamp].tail(21) if len(f_hist) < 21: continue mean_funding = float(f_hist["funding_rate"].mean()) basis_vol = float(f_hist["basis"].std(ddof=0)) latest_basis = float(f_hist["basis"].iloc[-1]) positive_ratio = float((f_hist["funding_rate"] > 0).mean()) expected_edge = mean_funding * 18 + max(latest_basis, 0.0) * 0.35 - 0.0030 - basis_vol * 1.5 if expected_edge <= 0.004 or positive_ratio < 0.75: continue candidates.append((expected_edge, symbol)) candidates.sort(reverse=True) symbols = [symbol for _, symbol in candidates[: spec.top_n]] if not symbols: return {} return {f"carry:{symbol}": 1.0 / len(symbols) for symbol in symbols} def _inverse_carry(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp, *, strict: bool) -> dict[str, float]: candidates: list[tuple[float, str]] = [] min_edge = 0.004 if strict else 0.001 min_negative_ratio = 0.75 if strict else 0.60 for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): if symbol not in self.funding_frames: continue f_hist = self.funding_frames[symbol].loc[:timestamp].tail(21) if len(f_hist) < 21: continue mean_funding = float(f_hist["funding_rate"].mean()) negative_ratio = float((f_hist["funding_rate"] < 0).mean()) basis_vol = float(f_hist["basis"].std(ddof=0)) latest_basis = float(f_hist["basis"].iloc[-1]) expected_edge = (-mean_funding) * 18 + max(-latest_basis, 0.0) * 0.35 - 0.0030 - basis_vol * 1.5 if mean_funding >= 0 or negative_ratio < min_negative_ratio or expected_edge <= min_edge: continue candidates.append((expected_edge, symbol)) candidates.sort(reverse=True) symbols = [symbol for _, symbol in candidates[: spec.top_n]] if not symbols: return {} return {f"inverse_carry:{symbol}": 1.0 / len(symbols) for symbol in symbols} def _chop_rs_spread(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: rows: list[tuple[float, str]] = [] btc_hist = self._price_hist(BTC_SYMBOL, timestamp, 48) if len(btc_hist) < 43: return {} btc_ret_7d = self._return_from_hist(btc_hist, 42) for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): hist = self._price_hist(symbol, timestamp, 48) if len(hist) < 43: continue ret_7d = self._return_from_hist(hist, 42) ret_1d = self._return_from_hist(hist, 6) vol = float(hist["close"].pct_change().dropna().tail(12).std(ddof=0)) if vol <= 0 or vol > 0.10: continue rows.append((ret_7d - btc_ret_7d - abs(ret_1d) * 0.25, symbol)) if len(rows) < spec.top_n * 2: return {} rows.sort(reverse=True) longs = [symbol for _, symbol in rows[: spec.top_n]] shorts = [symbol for _, symbol in rows[-spec.top_n :]] return self._long_short_weights(longs, shorts) def _chop_btc_hedged_leader(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: btc_hist = self._price_hist(BTC_SYMBOL, timestamp, 48) if len(btc_hist) < 43: return {} btc_ret_7d = self._return_from_hist(btc_hist, 42) rows: list[tuple[float, str]] = [] for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): hist = self._price_hist(symbol, timestamp, 48) if len(hist) < 43: continue ret_7d = self._return_from_hist(hist, 42) ret_1d = self._return_from_hist(hist, 6) funding = self._latest_funding(symbol, timestamp) rows.append((ret_7d - btc_ret_7d - max(funding, 0.0) * 80.0 - abs(ret_1d) * 0.15, symbol)) rows.sort(reverse=True) if not rows or rows[0][0] <= 0: return {} leader = rows[0][1] return {leader: 0.70, BTC_SYMBOL: -0.30} def _dist_btc_vs_weak_alt(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: btc_hist = self._price_hist(BTC_SYMBOL, timestamp, 48) if len(btc_hist) < 43: return {} btc_ret_7d = self._return_from_hist(btc_hist, 42) weak_rows: list[tuple[float, str]] = [] for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): hist = self._price_hist(symbol, timestamp, 48) if len(hist) < 43: continue ret_7d = self._return_from_hist(hist, 42) ret_3d = self._return_from_hist(hist, 18) weak_rows.append((ret_7d - btc_ret_7d + ret_3d, symbol)) weak_rows.sort() shorts = [symbol for _, symbol in weak_rows[: spec.top_n]] if not shorts: return {} weights = {BTC_SYMBOL: 0.40} short_weight = -0.60 / len(shorts) for symbol in shorts: weights[symbol] = short_weight return weights def _dist_short_rally(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: rows: list[tuple[float, str]] = [] btc_hist = self._price_hist(BTC_SYMBOL, timestamp, 48) if len(btc_hist) < 43: return {} btc_ret_7d = self._return_from_hist(btc_hist, 42) for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): hist = self._price_hist(symbol, timestamp, 36) if len(hist) < 25: continue ret_7d = self._return_from_hist(hist, 42 if len(hist) > 42 else len(hist) - 1) ret_2b = self._return_from_hist(hist, 2) ema20 = hist["close"].ewm(span=20, adjust=False).mean().iloc[-1] close = float(hist["close"].iloc[-1]) rs = ret_7d - btc_ret_7d if rs >= -0.03 or ret_2b <= 0.0 or close >= float(ema20): continue score = -rs + ret_2b rows.append((score, symbol)) rows.sort(reverse=True) shorts = [symbol for _, symbol in rows[: spec.top_n]] return {symbol: -1.0 / len(shorts) for symbol in shorts} if shorts else {} def _dist_weak_basket_short(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: btc_hist = self._price_hist(BTC_SYMBOL, timestamp, 48) if len(btc_hist) < 43: return {} btc_ret_7d = self._return_from_hist(btc_hist, 42) rows: list[tuple[float, str]] = [] for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): hist = self._price_hist(symbol, timestamp, 48) if len(hist) < 43: continue ret_7d = self._return_from_hist(hist, 42) ret_3d = self._return_from_hist(hist, 18) rs = ret_7d - btc_ret_7d rows.append((rs + ret_3d, symbol)) rows.sort() shorts = [symbol for _, symbol in rows[: spec.top_n]] return {symbol: -1.0 / len(shorts) for symbol in shorts} if shorts else {} def _dist_relative_weakness_spread(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: btc_hist = self._price_hist(BTC_SYMBOL, timestamp, 48) if len(btc_hist) < 43: return {} btc_ret_7d = self._return_from_hist(btc_hist, 42) rows: list[tuple[float, str]] = [] for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): hist = self._price_hist(symbol, timestamp, 48) if len(hist) < 43: continue ret_7d = self._return_from_hist(hist, 42) ret_3d = self._return_from_hist(hist, 18) rows.append((ret_7d - btc_ret_7d + ret_3d * 0.25, symbol)) if len(rows) < spec.top_n * 2: return {} rows.sort(reverse=True) longs = [symbol for _, symbol in rows[: spec.top_n]] shorts = [symbol for _, symbol in rows[-spec.top_n :]] return self._long_short_weights(longs, shorts) def _dist_btc_rally_short(self, timestamp: pd.Timestamp) -> dict[str, float]: hist = self._price_hist(BTC_SYMBOL, timestamp, 36) if len(hist) < 21: return {} ret_2b = self._return_from_hist(hist, 2) ema20 = hist["close"].ewm(span=20, adjust=False).mean().iloc[-1] close = float(hist["close"].iloc[-1]) if ret_2b <= 0.0 or close >= float(ema20): return {} return {BTC_SYMBOL: -1.0} def _dist_btc_rally_short_strict(self, timestamp: pd.Timestamp) -> dict[str, float]: hist = self._price_hist(BTC_SYMBOL, timestamp, 72) if len(hist) < 43: return {} ret_2b = self._return_from_hist(hist, 2) ret_7d = self._return_from_hist(hist, 42) ema20 = hist["close"].ewm(span=20, adjust=False).mean().iloc[-1] ema50 = hist["close"].ewm(span=50, adjust=False).mean().iloc[-1] close = float(hist["close"].iloc[-1]) if ret_2b < 0.035 or ret_7d > -0.02: return {} if close >= float(ema20) or close >= float(ema50): return {} return {BTC_SYMBOL: -1.0} def _dist_weak_rally_spread(self, spec: AdverseRegimeEngineSpec, timestamp: pd.Timestamp) -> dict[str, float]: btc_hist = self._price_hist(BTC_SYMBOL, timestamp, 48) if len(btc_hist) < 43: return {} btc_ret_7d = self._return_from_hist(btc_hist, 42) strong_rows: list[tuple[float, str]] = [] weak_rows: list[tuple[float, str]] = [] for symbol in self._liquid_symbols(timestamp, spec.min_avg_dollar_volume): hist = self._price_hist(symbol, timestamp, 48) if len(hist) < 43: continue ret_7d = self._return_from_hist(hist, 42) ret_2b = self._return_from_hist(hist, 2) rs = ret_7d - btc_ret_7d strong_rows.append((rs - abs(ret_2b) * 0.1, symbol)) if ret_2b > 0: weak_rows.append((-rs + ret_2b, symbol)) strong_rows.sort(reverse=True) weak_rows.sort(reverse=True) longs = [symbol for _, symbol in strong_rows[: spec.top_n]] shorts = [symbol for _, symbol in weak_rows[: spec.top_n] if symbol not in longs] return self._long_short_weights(longs, shorts[: spec.top_n]) def _liquid_symbols(self, timestamp: pd.Timestamp, min_avg_dollar_volume: float) -> list[str]: return [ symbol for symbol in select_tradeable_universe( self.bundle.prices, timestamp, min_history_bars=120, min_avg_dollar_volume=min_avg_dollar_volume, ) if symbol != BTC_SYMBOL ] def _price_hist(self, symbol: str, timestamp: pd.Timestamp, bars: int) -> pd.DataFrame: return self.price_frames[symbol].loc[:timestamp].tail(bars).reset_index() def _return_from_hist(self, hist: pd.DataFrame, bars_back: int) -> float: if hist.empty: return 0.0 back = min(bars_back, len(hist) - 1) if back <= 0: return 0.0 prev_close = float(hist["close"].iloc[-(back + 1)]) close = float(hist["close"].iloc[-1]) if prev_close <= 0: return 0.0 return close / prev_close - 1.0 def _latest_funding(self, symbol: str, timestamp: pd.Timestamp) -> float: if symbol not in self.funding_frames: return 0.0 hist = self.funding_frames[symbol].loc[:timestamp].tail(1) if hist.empty: return 0.0 return float(hist["funding_rate"].iloc[-1]) def _portfolio_return(self, weights: dict[str, float], prev_ts: pd.Timestamp, ts: pd.Timestamp) -> float: total = 0.0 for symbol, weight in weights.items(): if symbol.startswith("carry:"): total += weight * self._carry_return(symbol.split(":", 1)[1], prev_ts, ts) elif symbol.startswith("inverse_carry:"): total += weight * self._inverse_carry_return(symbol.split(":", 1)[1], prev_ts, ts) else: total += weight * self._price_return(symbol, prev_ts, ts) return total def _price_return(self, symbol: str, prev_ts: pd.Timestamp, ts: pd.Timestamp) -> float: frame = self.price_frames[symbol] if prev_ts not in frame.index or ts not in frame.index: return 0.0 prev_close = float(frame.loc[prev_ts, "close"]) close = float(frame.loc[ts, "close"]) if prev_close <= 0: return 0.0 return close / prev_close - 1.0 def _carry_return(self, symbol: str, prev_ts: pd.Timestamp, ts: pd.Timestamp) -> float: if symbol not in self.funding_frames: return 0.0 frame = self.funding_frames[symbol] if prev_ts not in frame.index or ts not in frame.index: return 0.0 funding_rate = float(frame.loc[ts, "funding_rate"]) basis_change = float(frame.loc[ts, "basis"] - frame.loc[prev_ts, "basis"]) return funding_rate - basis_change def _inverse_carry_return(self, symbol: str, prev_ts: pd.Timestamp, ts: pd.Timestamp) -> float: if symbol not in self.funding_frames: return 0.0 frame = self.funding_frames[symbol] if prev_ts not in frame.index or ts not in frame.index: return 0.0 funding_rate = float(frame.loc[ts, "funding_rate"]) basis_change = float(frame.loc[ts, "basis"] - frame.loc[prev_ts, "basis"]) return -funding_rate + basis_change @staticmethod def _turnover(current: dict[str, float], target: dict[str, float]) -> float: symbols = set(current) | set(target) return sum(abs(target.get(symbol, 0.0) - current.get(symbol, 0.0)) for symbol in symbols) @staticmethod def _equal_weight(symbols: list[str], gross: float) -> dict[str, float]: if not symbols: return {} weight = gross / len(symbols) return {symbol: weight for symbol in symbols} @staticmethod def _long_short_weights(longs: list[str], shorts: list[str]) -> dict[str, float]: weights: dict[str, float] = {} if longs: long_weight = 0.50 / len(longs) for symbol in longs: weights[symbol] = weights.get(symbol, 0.0) + long_weight if shorts: short_weight = -0.50 / len(shorts) for symbol in shorts: weights[symbol] = weights.get(symbol, 0.0) + short_weight return {symbol: weight for symbol, weight in weights.items() if abs(weight) > 1e-9} def run_adverse_regime_search( *, cache_path: str | Path, eval_days: int = 1825, initial_capital: float = 1000.0, ) -> dict[str, object]: bundle, latest_bar, accepted_symbols = load_fixed66_cache(cache_path) harness = AdverseRegimeResearchHarness(bundle, latest_bar) eval_start = pd.Timestamp(latest_bar) - pd.Timedelta(days=eval_days) rows: list[dict[str, object]] = [] by_regime: dict[str, list[dict[str, object]]] = {} for spec in default_engine_specs(): result = harness.run_engine(spec, eval_start=eval_start, initial_capital=initial_capital) payload = result.to_payload() print( spec.target_regime, spec.name, f"ret={float(payload['total_return']) * 100:.2f}%", f"sharpe={float(payload['sharpe']):.2f}", f"mdd={float(payload['max_drawdown']) * 100:.2f}%", flush=True, ) rows.append(payload) by_regime.setdefault(spec.target_regime, []).append(payload) for regime_rows in by_regime.values(): regime_rows.sort(key=lambda row: (float(row["total_return"]), float(row["sharpe"]), -abs(float(row["max_drawdown"]))), reverse=True) return { "analysis": "adverse_regime_engine_search", "latest_completed_bar": str(latest_bar), "accepted_symbols": accepted_symbols, "eval_days": eval_days, "initial_capital": initial_capital, "results": rows, "by_regime": by_regime, }