Files
strategy32/research/hybrid_regime.py

327 lines
12 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
import json
import pickle
from pathlib import Path
import pandas as pd
from strategy29.backtest.metrics import cagr, max_drawdown, sharpe_ratio
from strategy29.backtest.window_analysis import slice_bundle
from strategy29.common.models import AllocationDecision, BacktestResult, MarketDataBundle
from strategy32.backtest.simulator import Strategy32Backtester, Strategy32MomentumCarryBacktester, build_engine_config
from strategy32.config import PROFILE_V7_DEFAULT, build_strategy32_config
from strategy32.research.adverse_regime import AdverseRegimeResearchHarness, default_engine_specs
from strategy32.scripts.run_regime_filter_analysis import build_strategic_regime_frame
# Named bundles of strategy-config overrides. Each inner key is the name of an
# attribute that gets set on the strategy config; each value is the override.
STATIC_FILTERS: dict[str, dict[str, float]] = {
    # Baseline filter used for MOMENTUM_EXPANSION.
    "prev_balanced": {
        "universe_min_avg_dollar_volume": 50_000_000.0,
        "momentum_min_score": 0.60,
        "momentum_min_relative_strength": 0.00,
        "momentum_min_7d_return": 0.00,
        "max_pairwise_correlation": 0.70,
        "carry_min_expected_edge": 0.0,
    },
    # Same thresholds as "prev_balanced" plus additional caps.
    "guarded_positive": {
        "universe_min_avg_dollar_volume": 50_000_000.0,
        "momentum_min_score": 0.60,
        "momentum_min_relative_strength": 0.00,
        "momentum_min_7d_return": 0.00,
        "momentum_max_7d_return": 0.35,
        "momentum_min_positive_bar_ratio": 0.52,
        "momentum_max_short_volatility": 0.075,
        "momentum_max_beta": 2.50,
        "momentum_max_latest_funding_rate": 0.00045,
        "max_pairwise_correlation": 0.70,
        "carry_min_expected_edge": 0.0,
    },
    # Baseline filter used for EUPHORIC_BREAKOUT.
    "overheat_tolerant": {
        "universe_min_avg_dollar_volume": 100_000_000.0,
        "momentum_min_score": 0.60,
        "momentum_min_relative_strength": -0.02,
        "momentum_min_7d_return": 0.02,
        "max_pairwise_correlation": 0.78,
        "carry_min_expected_edge": 0.0,
    },
    # Tighter variant of "overheat_tolerant" with additional caps.
    "guarded_euphoria": {
        "universe_min_avg_dollar_volume": 100_000_000.0,
        "momentum_min_score": 0.62,
        "momentum_min_relative_strength": -0.01,
        "momentum_min_7d_return": 0.02,
        "momentum_max_7d_return": 0.28,
        "momentum_min_positive_bar_ratio": 0.55,
        "momentum_max_short_volatility": 0.070,
        "momentum_max_beta": 2.20,
        "momentum_max_latest_funding_rate": 0.00035,
        "max_pairwise_correlation": 0.72,
        "carry_min_expected_edge": 0.0,
    },
}

# Sorted union of every attribute name that any filter may override.
STATIC_FILTER_ATTRS = tuple(sorted(set().union(*STATIC_FILTERS.values())))

# Strategic regime -> default static filter name.
STATIC_COMPONENT_MAP = {
    "MOMENTUM_EXPANSION": "prev_balanced",
    "EUPHORIC_BREAKOUT": "overheat_tolerant",
}

# Strategic regime -> adverse-regime engine name.
ADVERSE_COMPONENT_MAP = {
    "CAPITULATION_STRESS": "cap_btc_rebound",
    "CHOPPY_ROTATION": "chop_inverse_carry_strict",
    "DISTRIBUTION_DRIFT": "dist_inverse_carry_strict",
}

# The adverse regimes are exactly the regimes that have an adverse engine.
ADVERSE_REGIMES = set(ADVERSE_COMPONENT_MAP)
@dataclass(slots=True)
class HybridWindowResult:
label: str
start: pd.Timestamp
end: pd.Timestamp
total_return: float
annualized_return: float
sharpe: float
max_drawdown: float
component_map: dict[str, str]
def to_payload(self) -> dict[str, object]:
return {
"start": str(self.start),
"end": str(self.end),
"total_return": self.total_return,
"annualized_return": self.annualized_return,
"sharpe": self.sharpe,
"max_drawdown": self.max_drawdown,
"component_map": self.component_map,
}
class StrategicRegimeFilterBacktester(Strategy32MomentumCarryBacktester):
    """Momentum/carry backtester that only trades during one strategic regime.

    Whenever the strategic regime recorded for a signal timestamp is not
    ``active_regime``, the governed decision is replaced by an all-cash
    allocation. During each rebalance, the overrides of a named entry in
    ``STATIC_FILTERS`` are temporarily applied to ``self.strategy_config``.
    """

    def __init__(
        self,
        strategy_config,
        data: MarketDataBundle,
        *,
        trade_start: pd.Timestamp,
        strategic_regime_map: dict[pd.Timestamp, str],
        active_regime: str,
        default_filter_name: str,
        filter_plan: dict[pd.Timestamp, str] | None = None,
    ):
        """Store the regime gating inputs, then initialise the base backtester.

        Args:
            strategy_config: Strategy configuration forwarded to the base class.
            data: Market data forwarded to the base class.
            trade_start: Forwarded to the base class as ``trade_start``.
            strategic_regime_map: Signal timestamp -> strategic regime name.
            active_regime: The only regime in which budgets are left intact.
            default_filter_name: ``STATIC_FILTERS`` key used when the plan has
                no entry for a signal timestamp.
            filter_plan: Optional signal timestamp -> ``STATIC_FILTERS`` key.
        """
        # The gating state is assigned before the base initialiser runs.
        self._filter_plan = filter_plan if filter_plan else {}
        self._default_filter_name = default_filter_name
        self._active_regime = active_regime
        self._strategic_regime_map = strategic_regime_map
        super().__init__(strategy_config, data, trade_start=trade_start)

    def _govern_decision(
        self,
        decision: AllocationDecision,
        *,
        signal_timestamp: pd.Timestamp,
        current_equity: float,
        equity_history: list[float],
    ) -> AllocationDecision:
        """Apply base governance, then force all-cash outside the active regime."""
        base_decision = super()._govern_decision(
            decision,
            signal_timestamp=signal_timestamp,
            current_equity=current_equity,
            equity_history=equity_history,
        )
        regime_now = self._strategic_regime_map.get(signal_timestamp)
        if regime_now == self._active_regime:
            return base_decision
        # Wrong (or unknown) regime: keep the regime label, move fully to cash.
        return AllocationDecision(
            regime=base_decision.regime,
            momentum_budget_pct=0.0,
            carry_budget_pct=0.0,
            spread_budget_pct=0.0,
            cash_budget_pct=1.0,
        )

    def _rebalance(
        self,
        portfolio,
        signal_timestamp: pd.Timestamp,
        execution_timestamp: pd.Timestamp,
        decision: AllocationDecision,
        rebalance_momentum: bool,
        rebalance_carry: bool,
        rebalance_spread: bool,
    ) -> list:
        """Run the base rebalance with the timestamp's filter overrides applied.

        Every attribute named in ``STATIC_FILTER_ATTRS`` is snapshotted first
        and written back afterwards, even when the base rebalance raises.
        """
        snapshot = {}
        for attr in STATIC_FILTER_ATTRS:
            snapshot[attr] = getattr(self.strategy_config, attr)
        try:
            chosen_filter = self._filter_plan.get(signal_timestamp, self._default_filter_name)
            overrides = STATIC_FILTERS[chosen_filter]
            for attr in overrides:
                setattr(self.strategy_config, attr, overrides[attr])
            return super()._rebalance(
                portfolio,
                signal_timestamp,
                execution_timestamp,
                decision,
                rebalance_momentum,
                rebalance_carry,
                rebalance_spread,
            )
        finally:
            # Put back the pre-rebalance values so overrides never leak.
            for attr, previous in snapshot.items():
                setattr(self.strategy_config, attr, previous)
def load_fixed66_bundle(path: str | Path) -> tuple[MarketDataBundle, pd.Timestamp]:
    """Load a pickled bundle file and return ``(bundle, latest_bar)``.

    The file must unpickle to a mapping with the keys ``"bundle"`` and
    ``"latest_bar"``; the latter is converted to ``pd.Timestamp``.
    """
    # NOTE(security): unpickling can execute arbitrary code. Only point this
    # at cache files produced by a trusted process.
    with Path(path).open("rb") as handle:
        cached = pickle.load(handle)
    bundle = cached["bundle"]
    latest_bar = pd.Timestamp(cached["latest_bar"])
    return bundle, latest_bar
def _run_static_component_curve(
    *,
    sliced: MarketDataBundle,
    latest_bar: pd.Timestamp,
    eval_start: pd.Timestamp,
    regime_map: dict[pd.Timestamp, str],
    active_regime: str,
    filter_name: str,
    filter_plan: dict[pd.Timestamp, str] | None = None,
) -> pd.Series:
    """Backtest one regime-gated static component and return its equity curve.

    Args:
        sliced: Market data passed to the backtester.
        latest_bar: Accepted but not used by this function.
        eval_start: Trade start; the returned curve begins at this timestamp.
        regime_map: Signal timestamp -> strategic regime name.
        active_regime: Regime in which the component is allowed to trade.
        filter_name: ``STATIC_FILTERS`` key used to build the config and as the
            backtester's default filter.
        filter_plan: Optional per-timestamp filter override plan.

    Returns:
        The equity curve restricted to index values ``>= eval_start``.
    """
    strategy_cfg = build_strategy32_config(PROFILE_V7_DEFAULT, **STATIC_FILTERS[filter_name])
    engine = StrategicRegimeFilterBacktester(
        strategy_cfg,
        sliced,
        trade_start=eval_start,
        strategic_regime_map=regime_map,
        active_regime=active_regime,
        default_filter_name=filter_name,
        filter_plan=filter_plan,
    )
    # Start from the engine config's default initial capital.
    engine.config.initial_capital = build_engine_config().initial_capital
    outcome = engine.run()
    in_window = outcome.equity_curve.index >= eval_start
    return outcome.equity_curve.loc[in_window]
def _run_adverse_component_curve(
    *,
    eval_start: pd.Timestamp,
    engine_name: str,
    harness: AdverseRegimeResearchHarness,
    regime_frame: pd.DataFrame,
    initial_capital: float = 1000.0,
) -> pd.Series:
    """Run one named adverse-regime engine and return its equity curve.

    Args:
        eval_start: Evaluation start; the returned curve begins here.
        engine_name: ``name`` of the spec to pick from ``default_engine_specs()``.
        harness: Harness whose ``run_engine`` executes the spec.
        regime_frame: Regime frame forwarded to ``run_engine``.
        initial_capital: Starting capital forwarded to ``run_engine``.

    Returns:
        The equity curve restricted to index values ``>= eval_start``.

    Raises:
        ValueError: If no default engine spec is named ``engine_name``.
    """
    specs = list(default_engine_specs())
    # A bare next() on an exhausted generator would leak StopIteration, which
    # is hard to diagnose; fail with an explicit message instead.
    spec = next((candidate for candidate in specs if candidate.name == engine_name), None)
    if spec is None:
        known = ", ".join(sorted(str(candidate.name) for candidate in specs))
        raise ValueError(f"unknown adverse engine {engine_name!r}; available: {known}")
    result = harness.run_engine(
        spec,
        eval_start=eval_start,
        initial_capital=initial_capital,
        regime_frame=regime_frame,
    )
    return result.equity_curve.loc[result.equity_curve.index >= eval_start]
def _curve_returns(curve: pd.Series) -> pd.Series:
return curve.pct_change().fillna(0.0)
def _annualized_return(total_return: float, days: int) -> float:
if days <= 0:
return 0.0
return (1.0 + total_return) ** (365.0 / days) - 1.0
def _build_positive_filter_plan(regime_frame: pd.DataFrame, active_regime: str) -> dict[pd.Timestamp, str]:
    """Choose a guarded or relaxed static filter for every regime-frame row.

    A row is "guarded" when the share of adverse regimes over the trailing
    18 rows reaches a cutoff, or when a persistence column falls below a
    floor. Both thresholds depend on ``active_regime``.

    Args:
        regime_frame: Frame with ``timestamp`` and ``strategic_regime`` columns
            plus ``breadth_persist`` / ``funding_persist`` as needed.
        active_regime: ``"MOMENTUM_EXPANSION"`` or ``"EUPHORIC_BREAKOUT"``;
            any other value yields an empty plan.

    Returns:
        Mapping of row timestamp -> ``STATIC_FILTERS`` key.
    """
    # regime -> (adverse-share cutoff, persistence column, persistence floor,
    #            guarded filter name, relaxed filter name)
    rules = {
        "MOMENTUM_EXPANSION": (0.40, "breadth_persist", 0.58, "guarded_positive", "prev_balanced"),
        "EUPHORIC_BREAKOUT": (0.25, "funding_persist", 0.72, "guarded_euphoria", "overheat_tolerant"),
    }

    ordered = regime_frame.sort_values("timestamp").copy()
    ordered["is_adverse"] = ordered["strategic_regime"].isin(ADVERSE_REGIMES).astype(float)
    # Fraction of adverse rows among the current row and up to 17 before it.
    ordered["recent_adverse_share"] = ordered["is_adverse"].rolling(18, min_periods=1).mean()

    rule = rules.get(active_regime)
    plan: dict[pd.Timestamp, str] = {}
    for record in ordered.itertuples(index=False):
        stamp = pd.Timestamp(record.timestamp)
        if rule is None:
            continue
        adverse_cutoff, persist_column, persist_floor, guarded_name, relaxed_name = rule
        # The persistence column is only read when the adverse test fails.
        needs_guard = (
            float(record.recent_adverse_share) >= adverse_cutoff
            or float(getattr(record, persist_column)) < persist_floor
        )
        plan[stamp] = guarded_name if needs_guard else relaxed_name
    return plan
def _compose_hybrid_equity(
    timestamps: list[pd.Timestamp],
    regime_map: dict[pd.Timestamp, str],
    return_frames: dict[str, pd.Series],
    initial_equity: float = 1000.0,
) -> pd.Series:
    """Stitch per-regime component returns into one compounded equity curve."""
    if not timestamps:
        raise ValueError("no BTC bars at or after the evaluation start; cannot build an equity curve")
    # Shared fallback for regimes without a component; hoisted out of the loop.
    no_returns = pd.Series(dtype=float)
    equity = initial_equity
    equity_values = [equity]
    for signal_ts, execution_ts in zip(timestamps, timestamps[1:]):
        # The regime observed on the previous bar picks whose return is earned
        # on the current bar; unknown regimes and missing bars earn 0.0.
        regime_name = regime_map.get(signal_ts, "")
        ret = float(return_frames.get(regime_name, no_returns).get(execution_ts, 0.0))
        # Clamp so that equity can never turn negative.
        equity *= max(0.0, 1.0 + ret)
        equity_values.append(equity)
    return pd.Series(equity_values, index=pd.Index(timestamps, name="timestamp"), dtype=float)


def run_hybrid_backtest(
    *,
    cache_path: str | Path = "/tmp/strategy32_fixed66_bundle.pkl",
    windows: tuple[tuple[int, str], ...] = ((365, "1y"), (730, "2y"), (1095, "3y"), (1460, "4y"), (1825, "5y")),
) -> dict[str, object]:
    """Backtest the regime-switching hybrid over several trailing windows.

    For each window, one component curve is produced per strategic regime
    (static filters for ``STATIC_COMPONENT_MAP``, adverse engines for
    ``ADVERSE_COMPONENT_MAP``). The hybrid curve then earns, on each bar, the
    return of the component mapped to the previous bar's regime. A one-line
    summary per window is printed.

    Args:
        cache_path: Pickled bundle file read via ``load_fixed66_bundle``.
        windows: ``(days, label)`` pairs; each window ends at the latest bar.

    Returns:
        A payload dict with metadata and a ``"results"`` mapping of window
        label -> ``HybridWindowResult.to_payload()`` output.

    Raises:
        ValueError: If a window contains no BTC bars at or after its start.
    """
    bundle, latest_bar = load_fixed66_bundle(cache_path)
    results: dict[str, dict[str, object]] = {}
    payload: dict[str, object] = {
        "analysis": "fixed66_hybrid_regime_backtest",
        "latest_completed_bar": str(latest_bar),
        "static_component_map": STATIC_COMPONENT_MAP,
        "adverse_component_map": ADVERSE_COMPONENT_MAP,
        "results": results,
    }
    for days, label in windows:
        eval_start = latest_bar - pd.Timedelta(days=days)
        # Slice 90 extra days before the evaluation start (presumably
        # indicator warm-up; confirm against slice_bundle's consumers).
        raw_start = eval_start - pd.Timedelta(days=90)
        sliced = slice_bundle(bundle, raw_start, latest_bar)
        regime_frame = build_strategic_regime_frame(sliced, eval_start, latest_bar)
        regime_map = dict(zip(pd.to_datetime(regime_frame["timestamp"]), regime_frame["strategic_regime"]))
        harness = AdverseRegimeResearchHarness(sliced, latest_bar)

        component_curves: dict[str, pd.Series] = {}
        for regime_name, filter_name in STATIC_COMPONENT_MAP.items():
            filter_plan = _build_positive_filter_plan(regime_frame, regime_name)
            component_curves[regime_name] = _run_static_component_curve(
                sliced=sliced,
                latest_bar=latest_bar,
                eval_start=eval_start,
                regime_map=regime_map,
                active_regime=regime_name,
                filter_name=filter_name,
                filter_plan=filter_plan,
            )
        for regime_name, engine_name in ADVERSE_COMPONENT_MAP.items():
            component_curves[regime_name] = _run_adverse_component_curve(
                eval_start=eval_start,
                engine_name=engine_name,
                harness=harness,
                regime_frame=regime_frame,
            )
        return_frames = {name: _curve_returns(curve) for name, curve in component_curves.items()}

        # BTC bars inside the evaluation window define the hybrid's timeline.
        btc_timestamps = sliced.prices["BTC"]["timestamp"]
        timestamps = sorted(btc_timestamps.loc[btc_timestamps >= eval_start].tolist())
        equity_curve = _compose_hybrid_equity(timestamps, regime_map, return_frames, 1000.0)

        total_return = float(equity_curve.iloc[-1] / equity_curve.iloc[0] - 1.0)
        window_payload = HybridWindowResult(
            label=label,
            start=pd.Timestamp(eval_start),
            end=pd.Timestamp(latest_bar),
            total_return=total_return,
            annualized_return=_annualized_return(total_return, days),
            # NOTE(review): the meaning of the literal 6 is defined by
            # sharpe_ratio; confirm it matches the bar frequency.
            sharpe=sharpe_ratio(equity_curve, 6),
            max_drawdown=max_drawdown(equity_curve),
            # Fresh dict per window so results never alias each other.
            component_map={**STATIC_COMPONENT_MAP, **ADVERSE_COMPONENT_MAP},
        ).to_payload()
        results[label] = window_payload
        print(
            label,
            f"ret={total_return * 100:.2f}%",
            f"ann={window_payload['annualized_return'] * 100:.2f}%",
            f"sharpe={window_payload['sharpe']:.2f}",
            f"mdd={window_payload['max_drawdown'] * 100:.2f}%",
            flush=True,
        )
    return payload
def write_hybrid_backtest(out_path: str | Path = "/tmp/strategy32_hybrid_regime_backtest.json") -> Path:
    """Run the hybrid backtest and save its payload as indented JSON.

    Args:
        out_path: Destination file; written as UTF-8.

    Returns:
        The destination as a ``Path``.
    """
    report = run_hybrid_backtest()
    destination = Path(out_path)
    serialized = json.dumps(report, indent=2)
    destination.write_text(serialized, encoding="utf-8")
    return destination