# strategy32/research/soft_router.py
# (export header: 849 lines, 33 KiB, Python)
from __future__ import annotations
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import asdict, dataclass
import pandas as pd
from strategy29.backtest.metrics import max_drawdown, sharpe_ratio
from strategy29.backtest.window_analysis import slice_bundle
from strategy32.research.adverse_regime import AdverseRegimeResearchHarness, default_engine_specs
from strategy32.research.hybrid_regime import (
STATIC_FILTERS,
_curve_returns,
_run_adverse_component_curve,
load_fixed66_bundle,
)
from strategy32.backtest.simulator import Strategy32Backtester
from strategy32.config import PROFILE_V7_DEFAULT, build_strategy32_config
from strategy32.scripts.run_regime_filter_analysis import STRATEGIC_REGIME_PROFILES, build_strategic_regime_frame
@dataclass(frozen=True, slots=True)
class SoftRouterCandidate:
regime_profile: str
core_filter: str
cap_engine: str
chop_engine: str
dist_engine: str
core_floor: float
cap_max_weight: float
chop_max_weight: float
dist_max_weight: float
chop_blend_floor: float
@property
def name(self) -> str:
return (
f"{self.regime_profile}"
f"|core:{self.core_filter}"
f"|cap:{self.cap_engine}"
f"|chop:{self.chop_engine}"
f"|dist:{self.dist_engine}"
f"|floor:{self.core_floor:.2f}"
f"|capw:{self.cap_max_weight:.2f}"
f"|chopw:{self.chop_max_weight:.2f}"
f"|distw:{self.dist_max_weight:.2f}"
f"|chopf:{self.chop_blend_floor:.2f}"
)
@dataclass(frozen=True, slots=True)
class CashOverlayCandidate:
regime_profile: str
core_filter: str
cap_engine: str
chop_engine: str
dist_engine: str
cap_cash_weight: float
chop_cash_weight: float
dist_cash_weight: float
cap_threshold: float
chop_threshold: float
dist_threshold: float
core_block_threshold: float
@property
def name(self) -> str:
return (
f"{self.regime_profile}"
f"|core:{self.core_filter}"
f"|cap:{self.cap_engine}"
f"|chop:{self.chop_engine}"
f"|dist:{self.dist_engine}"
f"|capcw:{self.cap_cash_weight:.2f}"
f"|chopcw:{self.chop_cash_weight:.2f}"
f"|distcw:{self.dist_cash_weight:.2f}"
f"|capth:{self.cap_threshold:.2f}"
f"|chopth:{self.chop_threshold:.2f}"
f"|distth:{self.dist_threshold:.2f}"
f"|block:{self.core_block_threshold:.2f}"
)
@dataclass(frozen=True, slots=True)
class MacroScaleSpec:
floor: float
close_gap_start: float
close_gap_full: float
fast_gap_start: float
fast_gap_full: float
close_weight: float = 0.60
fast_weeks: int = 10
slow_weeks: int = 30
@property
def name(self) -> str:
return (
f"floor:{self.floor:.2f}"
f"|close:{self.close_gap_start:.3f}->{self.close_gap_full:.3f}"
f"|fast:{self.fast_gap_start:.3f}->{self.fast_gap_full:.3f}"
f"|w:{self.close_weight:.2f}"
)
# Trailing evaluation windows as (days, label) pairs, anchored to the latest bar.
WINDOWS = (
    (365, "1y"),
    (730, "2y"),
    (1095, "3y"),
    (1460, "4y"),
    (1825, "5y"),
)
# Calendar-year periods as (label, inclusive start, exclusive end).
# 2021 starts mid-March — presumably the first bar with usable data; confirm
# against the bundle if extending coverage.
YEAR_PERIODS = (
    ("2021", pd.Timestamp("2021-03-16 04:00:00+00:00"), pd.Timestamp("2022-01-01 00:00:00+00:00")),
    ("2022", pd.Timestamp("2022-01-01 00:00:00+00:00"), pd.Timestamp("2023-01-01 00:00:00+00:00")),
    ("2023", pd.Timestamp("2023-01-01 00:00:00+00:00"), pd.Timestamp("2024-01-01 00:00:00+00:00")),
    ("2024", pd.Timestamp("2024-01-01 00:00:00+00:00"), pd.Timestamp("2025-01-01 00:00:00+00:00")),
    ("2025", pd.Timestamp("2025-01-01 00:00:00+00:00"), pd.Timestamp("2026-01-01 00:00:00+00:00")),
)
# Start of the 2026 year-to-date period; reported but not part of YEAR_PERIODS,
# so it is excluded from score_candidate's year scoring.
YTD_START = pd.Timestamp("2026-01-01 00:00:00+00:00")
def _clip01(value: float) -> float:
return min(max(float(value), 0.0), 1.0)
def _ramp(value: float, start: float, end: float) -> float:
if end == start:
return 1.0 if value >= end else 0.0
if value <= start:
return 0.0
if value >= end:
return 1.0
return (value - start) / (end - start)
def _inverse_ramp(value: float, start: float, end: float) -> float:
if end == start:
return 1.0 if value <= end else 0.0
if value >= start:
return 0.0
if value <= end:
return 1.0
return (start - value) / (start - end)
def build_regime_score_frame(
    bundle,
    eval_start: pd.Timestamp,
    eval_end: pd.Timestamp,
    *,
    profile_name: str,
) -> pd.DataFrame:
    """Attach continuous [0, 1] regime scores to the strategic regime frame.

    Builds the base frame via ``build_strategic_regime_frame`` using the
    profile named ``profile_name``, then for every bar derives soft scores
    for the panic / euphoria / expansion / distribution / choppy regimes
    from linear ramps around the profile's thresholds, plus a composite
    ``core_score`` (max of expansion and euphoria). Returns a copy of the
    frame with six new score columns.
    """
    profile = STRATEGIC_REGIME_PROFILES[profile_name]
    frame = build_strategic_regime_frame(bundle, eval_start, eval_end, profile=profile).copy()
    panic_scores: list[float] = []
    euphoria_scores: list[float] = []
    expansion_scores: list[float] = []
    distribution_scores: list[float] = []
    choppy_scores: list[float] = []
    core_scores: list[float] = []
    for row in frame.itertuples(index=False):
        # Pull raw features, substituting neutral fallbacks for missing values.
        breadth = float(row.breadth)
        breadth_persist = float(row.breadth_persist) if pd.notna(row.breadth_persist) else breadth
        atr = float(row.atr_pct) if pd.notna(row.atr_pct) else 0.0
        bar_ret = float(row.bar_return) if pd.notna(row.bar_return) else 0.0
        daily_gap = float(row.daily_trend_gap) if pd.notna(row.daily_trend_gap) else 0.0
        intra_gap = float(row.intraday_trend_gap) if pd.notna(row.intraday_trend_gap) else 0.0
        avg_funding = float(row.mean_alt_funding)
        positive_ratio = float(row.positive_funding_ratio)
        funding_persist = float(row.funding_persist) if pd.notna(row.funding_persist) else positive_ratio
        btc_7d = float(row.btc_7d_return)
        # Panic fires on any single strong trigger: volatility spike, sharp
        # down bar, or the joint breadth-collapse + negative-funding condition.
        panic_score = max(
            _ramp(atr, profile.panic_atr * 0.85, profile.panic_atr * 1.35),
            _ramp(-bar_ret, abs(profile.panic_bar_return) * 0.75, abs(profile.panic_bar_return) * 1.35),
            min(
                _ramp(profile.panic_breadth - breadth, 0.0, max(profile.panic_breadth, 0.15)),
                _ramp(profile.panic_funding - avg_funding, 0.0, abs(profile.panic_funding) + 0.00015),
            ),
        )
        # Euphoria averages trend, breadth and funding ramps; the last entry
        # takes the stronger of funding level or BTC 7-day momentum.
        euphoria_components = [
            _ramp(daily_gap, profile.euphoria_daily_gap * 0.75, profile.euphoria_daily_gap * 1.6),
            _ramp(intra_gap, profile.euphoria_intraday_gap * 0.6, profile.euphoria_intraday_gap * 1.8),
            _ramp(breadth, profile.euphoria_breadth - 0.05, min(profile.euphoria_breadth + 0.12, 0.95)),
            _ramp(breadth_persist, profile.euphoria_breadth_persist - 0.06, min(profile.euphoria_breadth_persist + 0.12, 0.95)),
            _ramp(positive_ratio, profile.euphoria_positive_ratio - 0.08, min(profile.euphoria_positive_ratio + 0.12, 0.98)),
            _ramp(funding_persist, profile.euphoria_funding_persist - 0.08, min(profile.euphoria_funding_persist + 0.12, 0.98)),
            max(
                _ramp(avg_funding, profile.euphoria_funding * 0.5, max(profile.euphoria_funding * 2.0, profile.euphoria_funding + 0.00008)),
                _ramp(btc_7d, profile.euphoria_btc_7d * 0.6, max(profile.euphoria_btc_7d * 1.8, profile.euphoria_btc_7d + 0.08)),
            ),
        ]
        euphoria_score = sum(euphoria_components) / len(euphoria_components)
        # Expansion: healthy trend with contained volatility; damped when
        # euphoria is already elevated so the two scores do not double-count.
        expansion_components = [
            _ramp(daily_gap, max(profile.expansion_daily_gap - 0.02, -0.02), profile.expansion_daily_gap + 0.06),
            _ramp(intra_gap, profile.expansion_intraday_gap - 0.01, profile.expansion_intraday_gap + 0.05),
            _ramp(breadth, profile.expansion_breadth - 0.06, min(profile.expansion_breadth + 0.14, 0.92)),
            _ramp(breadth_persist, profile.expansion_breadth_persist - 0.06, min(profile.expansion_breadth_persist + 0.14, 0.92)),
            _inverse_ramp(atr, profile.expansion_atr * 1.10, max(profile.expansion_atr * 0.60, 0.015)),
            _ramp(avg_funding, profile.expansion_min_funding - 0.00005, profile.expansion_min_funding + 0.00015),
            _ramp(btc_7d, profile.expansion_btc_7d - 0.04, profile.expansion_btc_7d + 0.10),
        ]
        expansion_score = sum(expansion_components) / len(expansion_components)
        expansion_score *= 1.0 - 0.55 * euphoria_score
        # Distribution: deteriorating trend / breadth / funding; damped by
        # panic since panic handling takes priority.
        distribution_components = [
            max(
                _ramp(profile.distribution_daily_gap - daily_gap, 0.0, abs(profile.distribution_daily_gap) + 0.05),
                _ramp(profile.distribution_intraday_gap - intra_gap, 0.0, abs(profile.distribution_intraday_gap) + 0.04),
            ),
            _ramp(profile.distribution_breadth - breadth, 0.0, max(profile.distribution_breadth, 0.18)),
            _ramp(profile.distribution_positive_ratio - positive_ratio, 0.0, max(profile.distribution_positive_ratio, 0.18)),
            _ramp(-avg_funding, 0.0, 0.00020),
        ]
        distribution_score = sum(distribution_components) / len(distribution_components)
        distribution_score *= 1.0 - 0.35 * panic_score
        # Choppy: trendless, breadth near 0.5, funding near zero — suppressed
        # when any directional regime dominates.
        trendlessness = 1.0 - max(
            _clip01(abs(daily_gap) / max(profile.euphoria_daily_gap, 0.03)),
            _clip01(abs(intra_gap) / max(profile.euphoria_intraday_gap, 0.015)),
        )
        centered_breadth = 1.0 - min(abs(breadth - 0.5) / 0.30, 1.0)
        funding_neutral = 1.0 - min(abs(avg_funding) / 0.00012, 1.0)
        choppy_score = (trendlessness + centered_breadth + funding_neutral) / 3.0
        choppy_score *= 1.0 - max(euphoria_score, expansion_score, distribution_score, panic_score) * 0.65
        choppy_score = max(choppy_score, 0.0)
        # Core allocation signal: strongest of the two bullish regimes.
        core_score = max(expansion_score, euphoria_score)
        panic_scores.append(_clip01(panic_score))
        euphoria_scores.append(_clip01(euphoria_score))
        expansion_scores.append(_clip01(expansion_score))
        distribution_scores.append(_clip01(distribution_score))
        choppy_scores.append(_clip01(choppy_score))
        core_scores.append(_clip01(core_score))
    frame["panic_score"] = panic_scores
    frame["euphoria_score"] = euphoria_scores
    frame["expansion_score"] = expansion_scores
    frame["distribution_score"] = distribution_scores
    frame["choppy_score"] = choppy_scores
    frame["core_score"] = core_scores
    return frame
def _annualized_return(total_return: float, days: int) -> float:
if days <= 0:
return 0.0
return (1.0 + total_return) ** (365.0 / days) - 1.0
def _flat_segment_metrics(start: pd.Timestamp, end: pd.Timestamp) -> dict[str, float]:
    """All-zero metrics payload for segments that cannot be evaluated."""
    return {
        "start": str(start),
        "end": str(end),
        "total_return": 0.0,
        "annualized_return": 0.0,
        "sharpe": 0.0,
        "max_drawdown": 0.0,
    }


def segment_metrics(curve: pd.Series, start: pd.Timestamp, end: pd.Timestamp) -> dict[str, float]:
    """Compute return/risk metrics over the inclusive [start, end] slice of *curve*.

    Args:
        curve: Equity curve indexed by timestamp.
        start: Inclusive segment start.
        end: Inclusive segment end.

    Returns:
        Dict with start/end strings, total and annualized return, sharpe and
        max drawdown. All-zero metrics are returned when the segment has
        fewer than two points or a non-positive first value (normalization
        would be meaningless or divide by zero).
    """
    segment = curve.loc[(curve.index >= start) & (curve.index <= end)].copy()
    if len(segment) < 2:
        return _flat_segment_metrics(start, end)
    base = float(segment.iloc[0])
    if base <= 0:
        return _flat_segment_metrics(start, end)
    # Rebase to 1000 so sharpe/drawdown are computed on a comparable scale.
    normalized = segment / base * 1000.0
    total_return = float(normalized.iloc[-1] / normalized.iloc[0] - 1.0)
    # Use the nominal period length (not bar count) for annualization.
    days = max(int((end - start) / pd.Timedelta(days=1)), 1)
    return {
        "start": str(start),
        "end": str(end),
        "total_return": total_return,
        "annualized_return": _annualized_return(total_return, days),
        "sharpe": sharpe_ratio(normalized, 6),
        "max_drawdown": max_drawdown(normalized),
    }
def score_candidate(window_results: dict[str, dict[str, float]], year_results: dict[str, dict[str, float]]) -> tuple[float, int, int]:
    """Aggregate per-window and per-year metrics into a single fitness score.

    Rewards long-horizon annualized returns and sharpe, the worst yearly
    return, and positive years; penalizes losing years and any window whose
    max drawdown breaches -20%.

    Returns:
        (score, count of negative years, count of drawdown violations).
    """
    year_returns = [float(metrics["total_return"]) for metrics in year_results.values()]
    negative_years = sum(1 for ret in year_returns if ret < 0)
    mdd_violations = sum(1 for metrics in window_results.values() if float(metrics["max_drawdown"]) < -0.20)
    score = 0.0
    # Annualized-return contributions, heaviest on the full 5y window.
    for label, weight in (("5y", 4.5), ("1y", 2.0), ("2y", 1.4), ("4y", 1.0), ("3y", 0.6)):
        score += weight * float(window_results[label]["annualized_return"])
    # Risk-adjusted contributions.
    for label, weight in (("5y", 1.3), ("1y", 0.6)):
        score += weight * float(window_results[label]["sharpe"])
    score += 2.5 * min(year_returns)
    score += 0.7 * sum(max(ret, 0.0) for ret in year_returns)
    score -= 3.25 * negative_years
    score -= 0.9 * mdd_violations
    # Additional penalty proportional to how far each window exceeds -20% MDD.
    for label in ("1y", "2y", "3y", "4y", "5y"):
        score -= max(0.0, abs(float(window_results[label]["max_drawdown"])) - 0.20) * 5.0
    return score, negative_years, mdd_violations
def load_component_bundle(cache_path: str | None = None) -> tuple[object, pd.Timestamp]:
    """Load the fixed-66 bundle, defaulting to the shared /tmp cache file."""
    target = cache_path or "/tmp/strategy32_fixed66_bundle.pkl"
    return load_fixed66_bundle(target)
def compose_soft_router_curve(
    *,
    timestamps: list[pd.Timestamp],
    score_frame: pd.DataFrame,
    core_returns: pd.Series,
    cap_returns: pd.Series,
    chop_returns: pd.Series,
    dist_returns: pd.Series,
    candidate: SoftRouterCandidate,
) -> tuple[pd.Series, pd.DataFrame]:
    """Blend component return streams into one equity curve using soft weights.

    Weights are decided from the regime scores on the *signal* bar (t-1) and
    applied to component returns on the *execution* bar (t), avoiding
    look-ahead. The overlay sleeves (cap/chop/dist) are jointly capped at
    0.90 and total allocation at 1.0; any remainder is implicit cash.

    Returns:
        (equity curve starting at 1000.0, per-bar weight/diagnostic frame).
    """
    score_map = score_frame.set_index("timestamp")[
        ["core_score", "panic_score", "choppy_score", "distribution_score"]
    ].sort_index()
    equity = 1000.0
    idx = [timestamps[0]]
    vals = [equity]
    rows: list[dict[str, float | str]] = []
    for i in range(1, len(timestamps)):
        signal_ts = pd.Timestamp(timestamps[i - 1])
        execution_ts = pd.Timestamp(timestamps[i])
        score_row = score_map.loc[signal_ts] if signal_ts in score_map.index else None
        if score_row is None:
            # No scores for this signal bar: fall back to fully defensive zeros.
            core_score = panic_score = choppy_score = distribution_score = 0.0
        else:
            core_score = float(score_row["core_score"])
            panic_score = float(score_row["panic_score"])
            choppy_score = float(score_row["choppy_score"])
            distribution_score = float(score_row["distribution_score"])
        # Sleeve weights scale with their scores; chop and dist are damped by panic.
        cap_weight = candidate.cap_max_weight * panic_score
        dist_weight = candidate.dist_max_weight * distribution_score * (1.0 - 0.60 * panic_score)
        chop_signal = max(choppy_score, candidate.chop_blend_floor * (1.0 - core_score))
        chop_weight = candidate.chop_max_weight * chop_signal * (1.0 - 0.45 * panic_score)
        overlay_weight = cap_weight + dist_weight + chop_weight
        if overlay_weight > 0.90:
            # Rescale the sleeves proportionally so overlays never exceed 90%.
            scale = 0.90 / overlay_weight
            cap_weight *= scale
            dist_weight *= scale
            chop_weight *= scale
            overlay_weight = 0.90
        # Core gets its floor plus a score-proportional top-up, applied to the
        # share not consumed by overlays.
        core_target = candidate.core_floor + (1.0 - candidate.core_floor) * core_score
        core_weight = max(0.0, core_target * (1.0 - overlay_weight))
        total_weight = core_weight + cap_weight + chop_weight + dist_weight
        if total_weight > 1.0:
            # Final safety normalization so gross allocation stays <= 100%.
            scale = 1.0 / total_weight
            core_weight *= scale
            cap_weight *= scale
            chop_weight *= scale
            dist_weight *= scale
        # Missing component returns on the execution bar are treated as flat.
        bar_ret = (
            core_weight * float(core_returns.get(execution_ts, 0.0))
            + cap_weight * float(cap_returns.get(execution_ts, 0.0))
            + chop_weight * float(chop_returns.get(execution_ts, 0.0))
            + dist_weight * float(dist_returns.get(execution_ts, 0.0))
        )
        equity *= max(0.0, 1.0 + bar_ret)  # equity floored at zero
        idx.append(execution_ts)
        vals.append(equity)
        rows.append(
            {
                "timestamp": execution_ts,
                "core_weight": core_weight,
                "cap_weight": cap_weight,
                "chop_weight": chop_weight,
                "dist_weight": dist_weight,
                "cash_weight": max(0.0, 1.0 - core_weight - cap_weight - chop_weight - dist_weight),
                "core_score": core_score,
                "panic_score": panic_score,
                "choppy_score": choppy_score,
                "distribution_score": distribution_score,
                "portfolio_return": bar_ret,
            }
        )
    curve = pd.Series(vals, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float)
    weights = pd.DataFrame(rows)
    return curve, weights
def build_period_components(
    *,
    bundle,
    eval_start: pd.Timestamp,
    eval_end: pd.Timestamp,
    profile_name: str,
    core_filter: str,
    cap_engine: str,
    chop_engine: str,
    dist_engine: str,
) -> dict[str, object]:
    """Build everything compose_soft_router_curve needs for one period.

    Slices the bundle with a 90-day warmup before ``eval_start`` (indicator
    lookback), computes regime scores, runs the core backtest plus the three
    adverse component engines, and converts each equity curve into per-bar
    returns keyed by timestamp.
    """
    raw_start = eval_start - pd.Timedelta(days=90)
    sliced = slice_bundle(bundle, raw_start, eval_end)
    score_frame = build_regime_score_frame(sliced, eval_start, eval_end, profile_name=profile_name)
    regime_frame = score_frame.copy()
    harness = AdverseRegimeResearchHarness(sliced, eval_end)
    cfg = build_strategy32_config(PROFILE_V7_DEFAULT, **STATIC_FILTERS[core_filter])
    backtester = Strategy32Backtester(cfg, sliced, trade_start=eval_start)
    backtester.engine_config.initial_capital = 1000.0
    # Trim the warmup portion so only the evaluation window remains.
    core_curve = backtester.run().equity_curve.loc[lambda s: s.index >= eval_start]
    cap_curve = _run_adverse_component_curve(
        eval_start=eval_start,
        engine_name=cap_engine,
        harness=harness,
        regime_frame=regime_frame,
    )
    chop_curve = _run_adverse_component_curve(
        eval_start=eval_start,
        engine_name=chop_engine,
        harness=harness,
        regime_frame=regime_frame,
    )
    dist_curve = _run_adverse_component_curve(
        eval_start=eval_start,
        engine_name=dist_engine,
        harness=harness,
        regime_frame=regime_frame,
    )
    # BTC bar timestamps define the composition grid for the period.
    timestamps = sorted(sliced.prices["BTC"]["timestamp"].loc[sliced.prices["BTC"]["timestamp"] >= eval_start].tolist())
    return {
        "score_frame": score_frame,
        "timestamps": timestamps,
        "core_returns": _curve_returns(core_curve),
        "cap_returns": _curve_returns(cap_curve),
        "chop_returns": _curve_returns(chop_curve),
        "dist_returns": _curve_returns(dist_curve),
    }
def build_cash_overlay_period_components(
    *,
    bundle,
    eval_start: pd.Timestamp,
    eval_end: pd.Timestamp,
    profile_name: str,
    core_filter: str,
    cap_engine: str,
    chop_engine: str,
    dist_engine: str,
    core_config_overrides: dict[str, object] | None = None,
    macro_scale_spec: MacroScaleSpec | None = None,
) -> dict[str, object]:
    """Build everything compose_cash_overlay_curve needs for one period.

    Like build_period_components, but additionally captures the core
    backtest's per-bar cash exposure rows and (optionally) a weekly macro
    scale map. Uses a 365-day warmup when macro scaling is requested (the
    weekly EMAs need the longer history), otherwise 90 days.
    """
    raw_start = eval_start - pd.Timedelta(days=365 if macro_scale_spec is not None else 90)
    sliced = slice_bundle(bundle, raw_start, eval_end)
    score_frame = build_regime_score_frame(sliced, eval_start, eval_end, profile_name=profile_name)
    regime_frame = score_frame.copy()
    harness = AdverseRegimeResearchHarness(sliced, eval_end)
    # Core config = the chosen static filter with any caller overrides on top.
    core_config = dict(STATIC_FILTERS[core_filter])
    core_config.update(core_config_overrides or {})
    cfg = build_strategy32_config(PROFILE_V7_DEFAULT, **core_config)
    backtester = Strategy32Backtester(cfg, sliced, trade_start=eval_start)
    backtester.engine_config.initial_capital = 1000.0
    core_result = backtester.run()
    core_curve = core_result.equity_curve.loc[lambda s: s.index >= eval_start]
    # Per-bar exposure rows drive how much idle cash the overlay may deploy.
    exposure_frame = pd.DataFrame(core_result.metadata.get("exposure_rows", []))
    if not exposure_frame.empty:
        exposure_frame = exposure_frame.loc[exposure_frame["timestamp"] >= eval_start].copy()
    cap_curve = _run_adverse_component_curve(
        eval_start=eval_start,
        engine_name=cap_engine,
        harness=harness,
        regime_frame=regime_frame,
    )
    chop_curve = _run_adverse_component_curve(
        eval_start=eval_start,
        engine_name=chop_engine,
        harness=harness,
        regime_frame=regime_frame,
    )
    dist_curve = _run_adverse_component_curve(
        eval_start=eval_start,
        engine_name=dist_engine,
        harness=harness,
        regime_frame=regime_frame,
    )
    timestamps = sorted(sliced.prices["BTC"]["timestamp"].loc[sliced.prices["BTC"]["timestamp"] >= eval_start].tolist())
    # Macro scale is only needed on signal bars, i.e. all but the last bar.
    macro_scale_map = _build_macro_scale_map(
        sliced,
        timestamps=timestamps[:-1],
        spec=macro_scale_spec,
    )
    return {
        "score_frame": score_frame,
        "timestamps": timestamps,
        "core_returns": _curve_returns(core_curve),
        "core_exposure_frame": exposure_frame,
        "cap_returns": _curve_returns(cap_curve),
        "chop_returns": _curve_returns(chop_curve),
        "dist_returns": _curve_returns(dist_curve),
        "macro_scale_map": macro_scale_map,
    }
def _build_macro_scale_map(
    bundle,
    *,
    timestamps: list[pd.Timestamp],
    spec: MacroScaleSpec | None,
) -> pd.Series | None:
    """Build a per-signal-bar macro exposure scale in [spec.floor, 1.0].

    From weekly BTC closes, blends two linear ramps — close-vs-slow-EMA gap
    and fast-vs-slow-EMA gap, weighted by ``spec.close_weight`` — then maps
    the blend onto [floor, 1] and forward-fills the weekly values onto the
    supplied bar timestamps. Returns None when no spec, no timestamps, or no
    BTC prices are available.
    """
    if spec is None or not timestamps:
        return None
    btc_prices = bundle.prices.get("BTC")
    if btc_prices is None or btc_prices.empty:
        return None
    closes = btc_prices.set_index("timestamp")["close"].astype(float).sort_index()
    # Collapse intraday bars to daily closes, then to weekly (Sunday-anchored).
    daily = closes.resample("1D").last().dropna()
    weekly = daily.resample("W-SUN").last().dropna()
    if weekly.empty:
        return None
    fast = weekly.ewm(span=spec.fast_weeks, adjust=False).mean()
    slow = weekly.ewm(span=spec.slow_weeks, adjust=False).mean()
    frame = pd.DataFrame(
        {
            "close_gap": weekly / slow - 1.0,
            "fast_gap": fast / slow - 1.0,
        }
    )
    close_scale = frame["close_gap"].apply(lambda value: _ramp(float(value), spec.close_gap_start, spec.close_gap_full))
    fast_scale = frame["fast_gap"].apply(lambda value: _ramp(float(value), spec.fast_gap_start, spec.fast_gap_full))
    blended = spec.close_weight * close_scale + (1.0 - spec.close_weight) * fast_scale
    macro_scale = spec.floor + (1.0 - spec.floor) * blended.clip(0.0, 1.0)
    # Forward-fill weekly values onto the bar grid; bars before the first
    # weekly value default to full exposure (1.0).
    aligned = macro_scale.reindex(pd.DatetimeIndex(timestamps, name="timestamp"), method="ffill")
    aligned = aligned.fillna(1.0).clip(spec.floor, 1.0)
    return aligned.astype(float)
def compose_cash_overlay_curve(
    *,
    timestamps: list[pd.Timestamp],
    score_frame: pd.DataFrame,
    core_returns: pd.Series,
    core_exposure_frame: pd.DataFrame,
    cap_returns: pd.Series,
    chop_returns: pd.Series,
    dist_returns: pd.Series,
    candidate: CashOverlayCandidate,
    macro_scale_map: pd.Series | None = None,
) -> tuple[pd.Series, pd.DataFrame]:
    """Overlay adverse sleeves on the core's idle cash, returning an equity curve.

    The core return is taken at full (macro-scaled) size; the cap/chop/dist
    sleeves only ever deploy the core's unused cash percentage, gated by
    score thresholds. Signals are read on bar t-1 and applied to returns on
    bar t, avoiding look-ahead.

    Returns:
        (equity curve starting at 1000.0, per-bar weight/diagnostic frame).
    """
    score_map = score_frame.set_index("timestamp")[
        ["core_score", "panic_score", "choppy_score", "distribution_score"]
    ].sort_index()
    if core_exposure_frame.empty:
        # No exposure data: assume the core is fully in cash on every signal bar.
        cash_map = pd.Series(1.0, index=pd.DatetimeIndex(timestamps[:-1], name="timestamp"))
    else:
        cash_map = core_exposure_frame.set_index("timestamp")["cash_pct"].sort_index()
    equity = 1000.0
    idx = [timestamps[0]]
    vals = [equity]
    rows: list[dict[str, float | str]] = []
    for i in range(1, len(timestamps)):
        signal_ts = pd.Timestamp(timestamps[i - 1])
        execution_ts = pd.Timestamp(timestamps[i])
        score_row = score_map.loc[signal_ts] if signal_ts in score_map.index else None
        if score_row is None:
            # No scores for this signal bar: fall back to all-zero scores.
            core_score = panic_score = choppy_score = distribution_score = 0.0
        else:
            core_score = float(score_row["core_score"])
            panic_score = float(score_row["panic_score"])
            choppy_score = float(score_row["choppy_score"])
            distribution_score = float(score_row["distribution_score"])
        macro_scale = float(macro_scale_map.get(signal_ts, 1.0)) if macro_scale_map is not None else 1.0
        # Missing bars fall back to the latest known cash level (1.0 if none).
        raw_cash_pct = float(cash_map.get(signal_ts, cash_map.iloc[-1] if not cash_map.empty else 1.0))
        # Macro de-risking converts the scaled-out core share into extra
        # overlay-eligible cash.
        cash_pct = raw_cash_pct + (1.0 - raw_cash_pct) * (1.0 - macro_scale)
        # Each sleeve's signal is the score's excess above its threshold,
        # rescaled to [0, 1].
        cap_signal = _clip01((panic_score - candidate.cap_threshold) / max(1.0 - candidate.cap_threshold, 1e-9))
        chop_signal = _clip01((choppy_score - candidate.chop_threshold) / max(1.0 - candidate.chop_threshold, 1e-9))
        dist_signal = _clip01((distribution_score - candidate.dist_threshold) / max(1.0 - candidate.dist_threshold, 1e-9))
        if core_score > candidate.core_block_threshold:
            # Strong core regime: mostly block the defensive sleeves.
            chop_signal *= 0.25
            dist_signal *= 0.35
        cap_weight = cash_pct * candidate.cap_cash_weight * cap_signal
        chop_weight = cash_pct * candidate.chop_cash_weight * chop_signal
        dist_weight = cash_pct * candidate.dist_cash_weight * dist_signal
        overlay_total = cap_weight + chop_weight + dist_weight
        if overlay_total > cash_pct and overlay_total > 0:
            # Never deploy more than the available cash; rescale proportionally.
            scale = cash_pct / overlay_total
            cap_weight *= scale
            chop_weight *= scale
            dist_weight *= scale
            overlay_total = cash_pct
        # Core return is scaled (not re-weighted): the core already manages
        # its own exposure; macro_scale shrinks it further.
        bar_ret = (
            float(core_returns.get(execution_ts, 0.0)) * macro_scale
            + cap_weight * float(cap_returns.get(execution_ts, 0.0))
            + chop_weight * float(chop_returns.get(execution_ts, 0.0))
            + dist_weight * float(dist_returns.get(execution_ts, 0.0))
        )
        equity *= max(0.0, 1.0 + bar_ret)  # equity floored at zero
        idx.append(execution_ts)
        vals.append(equity)
        rows.append(
            {
                "timestamp": execution_ts,
                "raw_core_cash_pct": raw_cash_pct,
                "core_cash_pct": cash_pct,
                "macro_scale": macro_scale,
                "cap_weight": cap_weight,
                "chop_weight": chop_weight,
                "dist_weight": dist_weight,
                "overlay_total": overlay_total,
                "core_score": core_score,
                "panic_score": panic_score,
                "choppy_score": choppy_score,
                "distribution_score": distribution_score,
                "portfolio_return": bar_ret,
            }
        )
    curve = pd.Series(vals, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float)
    weights = pd.DataFrame(rows)
    return curve, weights
def evaluate_candidate_exact(
    *,
    bundle,
    latest_bar: pd.Timestamp,
    candidate: SoftRouterCandidate,
    cache_path: str | None = None,
    max_workers: int = 6,
) -> dict[str, object]:
    """Evaluate a soft-router candidate over independent windows and years.

    Each period is backtested in its own process. Note: the ``bundle``
    argument is not used in this body — workers reload the bundle from
    ``cache_path`` — it is kept for call-site symmetry. Uses the "fork"
    multiprocessing start method, so this requires a fork-capable platform.

    Scoring covers the five trailing windows plus the calendar years in
    YEAR_PERIODS; the 2026 YTD period is reported (and supplies the latest
    weight row) but is excluded from the score.
    """
    period_specs: list[tuple[str, str, pd.Timestamp, pd.Timestamp]] = []
    for days, label in WINDOWS:
        period_specs.append(("window", label, latest_bar - pd.Timedelta(days=days), latest_bar))
    for label, start, end_exclusive in YEAR_PERIODS:
        # Clamp year ends to the latest bar; subtract a second to keep the
        # exclusive year boundary out of the inclusive segment.
        period_specs.append(("year", label, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))))
    period_specs.append(("year", "2026_YTD", YTD_START, latest_bar))
    ctx = mp.get_context("fork")
    cache_path = cache_path or "/tmp/strategy32_fixed66_bundle.pkl"
    window_results: dict[str, dict[str, float]] = {}
    year_results: dict[str, dict[str, float]] = {}
    latest_weights: list[dict[str, object]] = []
    with ProcessPoolExecutor(max_workers=min(max_workers, len(period_specs)), mp_context=ctx) as executor:
        # Timestamps cross the process boundary as strings for cheap pickling.
        future_map = {
            executor.submit(
                _exact_period_worker,
                cache_path,
                asdict(candidate),
                kind,
                label,
                str(start),
                str(end),
            ): (kind, label)
            for kind, label, start, end in period_specs
        }
        for future in as_completed(future_map):
            kind, label, metrics, weight_tail = future.result()
            if kind == "window":
                window_results[label] = metrics
            else:
                year_results[label] = metrics
                if label == "2026_YTD":
                    latest_weights = weight_tail
    score, negative_years, mdd_violations = score_candidate(
        {label: window_results[label] for _, label in WINDOWS},
        {k: year_results[k] for k, _, _ in YEAR_PERIODS},
    )
    return {
        "candidate": asdict(candidate),
        "name": candidate.name,
        "score": score,
        "negative_years": negative_years,
        "mdd_violations": mdd_violations,
        "windows": {label: window_results[label] for _, label in WINDOWS},
        "years": year_results,
        "latest_weights": latest_weights,
        "validation": "exact_independent_periods_soft_router",
    }
def evaluate_cash_overlay_exact(
    *,
    bundle,
    latest_bar: pd.Timestamp,
    candidate: CashOverlayCandidate,
    cache_path: str | None = None,
    max_workers: int = 6,
    core_config_overrides: dict[str, object] | None = None,
    macro_scale_spec: MacroScaleSpec | None = None,
) -> dict[str, object]:
    """Evaluate a cash-overlay candidate over independent windows and years.

    Mirrors evaluate_candidate_exact but routes through the cash-overlay
    worker and forwards the core config overrides and optional macro scale
    spec. The ``bundle`` argument is not used in this body — workers reload
    the bundle from ``cache_path``. Uses the "fork" start method. The 2026
    YTD period is reported but excluded from the score.
    """
    period_specs: list[tuple[str, str, pd.Timestamp, pd.Timestamp]] = []
    for days, label in WINDOWS:
        period_specs.append(("window", label, latest_bar - pd.Timedelta(days=days), latest_bar))
    for label, start, end_exclusive in YEAR_PERIODS:
        # Clamp year ends to the latest bar; subtract a second to keep the
        # exclusive year boundary out of the inclusive segment.
        period_specs.append(("year", label, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))))
    period_specs.append(("year", "2026_YTD", YTD_START, latest_bar))
    ctx = mp.get_context("fork")
    cache_path = cache_path or "/tmp/strategy32_fixed66_bundle.pkl"
    window_results: dict[str, dict[str, float]] = {}
    year_results: dict[str, dict[str, float]] = {}
    latest_weights: list[dict[str, object]] = []
    with ProcessPoolExecutor(max_workers=min(max_workers, len(period_specs)), mp_context=ctx) as executor:
        # Dataclasses cross the process boundary as plain dicts.
        future_map = {
            executor.submit(
                _exact_cash_overlay_period_worker,
                cache_path,
                asdict(candidate),
                core_config_overrides or {},
                asdict(macro_scale_spec) if macro_scale_spec is not None else None,
                kind,
                label,
                str(start),
                str(end),
            ): (kind, label)
            for kind, label, start, end in period_specs
        }
        for future in as_completed(future_map):
            kind, label, metrics, weight_tail = future.result()
            if kind == "window":
                window_results[label] = metrics
            else:
                year_results[label] = metrics
                if label == "2026_YTD":
                    latest_weights = weight_tail
    score, negative_years, mdd_violations = score_candidate(
        {label: window_results[label] for _, label in WINDOWS},
        {k: year_results[k] for k, _, _ in YEAR_PERIODS},
    )
    return {
        "candidate": asdict(candidate),
        "name": candidate.name,
        "score": score,
        "negative_years": negative_years,
        "mdd_violations": mdd_violations,
        "macro_scale_spec": asdict(macro_scale_spec) if macro_scale_spec is not None else None,
        "windows": {label: window_results[label] for _, label in WINDOWS},
        "years": year_results,
        "latest_weights": latest_weights,
        "validation": "exact_independent_periods_cash_overlay",
    }
def _exact_period_worker(
    cache_path: str,
    candidate_payload: dict[str, object],
    kind: str,
    label: str,
    start_text: str,
    end_text: str,
) -> tuple[str, str, dict[str, float], list[dict[str, object]]]:
    """Process-pool worker: run one soft-router period and return its metrics.

    Arguments are plain strings/dicts so they pickle cheaply across the
    process boundary; the bundle is reloaded from ``cache_path`` in-process.

    Returns:
        (kind, label, segment metrics, last weight row as records — with the
        timestamp stringified for serialization).
    """
    bundle, _ = load_component_bundle(cache_path)
    candidate = SoftRouterCandidate(**candidate_payload)
    eval_start = pd.Timestamp(start_text)
    eval_end = pd.Timestamp(end_text)
    components = build_period_components(
        bundle=bundle,
        eval_start=eval_start,
        eval_end=eval_end,
        profile_name=candidate.regime_profile,
        core_filter=candidate.core_filter,
        cap_engine=candidate.cap_engine,
        chop_engine=candidate.chop_engine,
        dist_engine=candidate.dist_engine,
    )
    curve, weights = compose_soft_router_curve(candidate=candidate, **components)
    weight_tail = weights.tail(1).copy()
    if not weight_tail.empty and "timestamp" in weight_tail.columns:
        weight_tail["timestamp"] = weight_tail["timestamp"].astype(str)
    return kind, label, segment_metrics(curve, eval_start, eval_end), weight_tail.to_dict(orient="records")
def _exact_cash_overlay_period_worker(
    cache_path: str,
    candidate_payload: dict[str, object],
    core_config_overrides_payload: dict[str, object],
    macro_scale_spec_payload: dict[str, object] | None,
    kind: str,
    label: str,
    start_text: str,
    end_text: str,
) -> tuple[str, str, dict[str, float], list[dict[str, object]]]:
    """Process-pool worker: run one cash-overlay period and return its metrics.

    Payload dicts are rehydrated into their dataclasses in-process; the
    bundle is reloaded from ``cache_path``.

    Returns:
        (kind, label, segment metrics, last weight row as records — with the
        timestamp stringified for serialization).
    """
    bundle, _ = load_component_bundle(cache_path)
    candidate = CashOverlayCandidate(**candidate_payload)
    macro_scale_spec = MacroScaleSpec(**macro_scale_spec_payload) if macro_scale_spec_payload else None
    eval_start = pd.Timestamp(start_text)
    eval_end = pd.Timestamp(end_text)
    components = build_cash_overlay_period_components(
        bundle=bundle,
        eval_start=eval_start,
        eval_end=eval_end,
        profile_name=candidate.regime_profile,
        core_filter=candidate.core_filter,
        cap_engine=candidate.cap_engine,
        chop_engine=candidate.chop_engine,
        dist_engine=candidate.dist_engine,
        core_config_overrides=core_config_overrides_payload,
        macro_scale_spec=macro_scale_spec,
    )
    curve, weights = compose_cash_overlay_curve(candidate=candidate, **components)
    weight_tail = weights.tail(1).copy()
    if not weight_tail.empty and "timestamp" in weight_tail.columns:
        weight_tail["timestamp"] = weight_tail["timestamp"].astype(str)
    return kind, label, segment_metrics(curve, eval_start, eval_end), weight_tail.to_dict(orient="records")
def build_full_period_components(
    *,
    bundle,
    latest_bar: pd.Timestamp,
    profile_name: str,
    core_filter: str,
    cap_engine: str,
    chop_engine: str,
    dist_engine: str,
) -> dict[str, object]:
    """Build soft-router components over the trailing 5-year (1825-day) window ending at *latest_bar*."""
    window_start = latest_bar - pd.Timedelta(days=1825)
    return build_period_components(
        bundle=bundle,
        eval_start=window_start,
        eval_end=latest_bar,
        profile_name=profile_name,
        core_filter=core_filter,
        cap_engine=cap_engine,
        chop_engine=chop_engine,
        dist_engine=dist_engine,
    )