"""Grid search for a regime-guarded learned blocker on the current cash overlay."""
from __future__ import annotations

import json
import sys
from dataclasses import asdict, dataclass
from pathlib import Path

import pandas as pd

# Make the repository root importable so the `strategy32` package resolves
# when this file is run directly as a script (not via `python -m`).
PACKAGE_PARENT = Path(__file__).resolve().parents[2]
if str(PACKAGE_PARENT) not in sys.path:
    sys.path.insert(0, str(PACKAGE_PARENT))

from strategy32.live.runtime import BEST_CASH_OVERLAY
from strategy32.research.soft_router import build_cash_overlay_period_components, load_component_bundle
from strategy32.scripts.run_current_cash_learned_blocker import (
    CACHE_PATH,
    CURRENT_OVERHEAT_OVERRIDES,
    LearnedBlockerCandidate,
    _build_block_dataset,
    _build_regime_columns,
    _build_strategy_detail,
    _curve_from_returns,
    _metrics_for_curve,
    _ridge_predict,
)

# Destination for the search results (baseline metrics + top-10 candidates).
OUT_JSON = Path("/tmp/strategy32_current_cash_guarded_blocker.json")
@dataclass(frozen=True, slots=True)
|
|
class GuardedBlockerCandidate:
|
|
blocker: LearnedBlockerCandidate
|
|
guard_regime_mode: str
|
|
min_core_score: float
|
|
min_breadth_persist: float
|
|
min_funding_persist: float
|
|
max_choppy_score: float
|
|
max_distribution_score: float
|
|
max_panic_score: float
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return (
|
|
f"{self.blocker.name}"
|
|
f"|guard:{self.guard_regime_mode}"
|
|
f"|core:{self.min_core_score:.2f}"
|
|
f"|breadth:{self.min_breadth_persist:.2f}"
|
|
f"|funding:{self.min_funding_persist:.2f}"
|
|
f"|chop<={self.max_choppy_score:.2f}"
|
|
f"|dist<={self.max_distribution_score:.2f}"
|
|
f"|panic<={self.max_panic_score:.2f}"
|
|
)
def _guard_active(block: pd.Series, candidate: GuardedBlockerCandidate) -> bool:
|
|
regime = str(block.get("strategic_regime", ""))
|
|
if candidate.guard_regime_mode == "momentum_only":
|
|
regime_ok = regime == "MOMENTUM_EXPANSION"
|
|
elif candidate.guard_regime_mode == "momentum_or_euphoric":
|
|
regime_ok = regime in {"MOMENTUM_EXPANSION", "EUPHORIC_BREAKOUT"}
|
|
else:
|
|
regime_ok = False
|
|
return (
|
|
regime_ok
|
|
and float(block["core_score"]) >= candidate.min_core_score
|
|
and float(block["breadth_persist"]) >= candidate.min_breadth_persist
|
|
and float(block["funding_persist"]) >= candidate.min_funding_persist
|
|
and float(block["choppy_score"]) <= candidate.max_choppy_score
|
|
and float(block["distribution_score"]) <= candidate.max_distribution_score
|
|
and float(block["panic_score"]) <= candidate.max_panic_score
|
|
)
def _simulate_guarded_candidate(
|
|
detail: pd.DataFrame,
|
|
block_frame: pd.DataFrame,
|
|
regime_columns: list[str],
|
|
candidate: GuardedBlockerCandidate,
|
|
) -> pd.Series:
|
|
rows = detail.reset_index(drop=True)
|
|
features = [
|
|
"core_score",
|
|
"breadth_persist",
|
|
"funding_persist",
|
|
"taker_persist",
|
|
"volume_accel_persist",
|
|
"mean_taker_imbalance",
|
|
"taker_imbalance_dispersion",
|
|
"positive_taker_ratio",
|
|
"mean_alt_volume_accel",
|
|
"positive_volume_accel_ratio",
|
|
"funding_dispersion",
|
|
"basis_dispersion",
|
|
"alt_return_dispersion_7d",
|
|
"mean_funding_acceleration",
|
|
"mean_basis_trend",
|
|
"panic_score",
|
|
"choppy_score",
|
|
"distribution_score",
|
|
"cash_pct",
|
|
"invested_pct",
|
|
"trailing_total_21",
|
|
"trailing_total_42",
|
|
"trailing_core_score_21",
|
|
"trailing_breadth_21",
|
|
"trailing_choppy_21",
|
|
*regime_columns,
|
|
]
|
|
returns: list[float] = []
|
|
idx: list[pd.Timestamp] = []
|
|
blocker = candidate.blocker
|
|
|
|
for block_idx, block in block_frame.iterrows():
|
|
start_idx = int(block["block_start_index"])
|
|
end_idx = int(block["block_end_index"])
|
|
bar_block = rows.iloc[start_idx : end_idx + 1]
|
|
|
|
exposure_scale = 1.0
|
|
if block_idx >= blocker.train_min_blocks:
|
|
train_start = max(0, block_idx - blocker.lookback_blocks)
|
|
train = block_frame.iloc[train_start:block_idx]
|
|
train_x = train[features].to_numpy(dtype=float)
|
|
train_y = train["block_total"].to_numpy(dtype=float)
|
|
test_x = block[features].to_numpy(dtype=float)
|
|
pred = _ridge_predict(train_x, train_y, test_x, blocker.ridge_alpha)
|
|
if pred <= blocker.prediction_threshold:
|
|
exposure_scale = blocker.blocked_scale
|
|
if _guard_active(block, candidate):
|
|
exposure_scale = 1.0
|
|
|
|
for row in bar_block.itertuples(index=False):
|
|
returns.append(float(getattr(row, "portfolio_return")) * exposure_scale)
|
|
idx.append(pd.Timestamp(getattr(row, "timestamp")))
|
|
|
|
return pd.Series(returns, index=pd.DatetimeIndex(idx, name="timestamp"), dtype=float)
def _candidate_space() -> list[GuardedBlockerCandidate]:
    """Enumerate the full guard grid for each blocked-scale variant.

    The blocker configuration itself is fixed; only ``blocked_scale`` and the
    guard thresholds vary.  Enumeration order matches the nested-loop order of
    the threshold tuples below.
    """
    grid: list[GuardedBlockerCandidate] = []
    for blocked_scale in (0.0, 0.25):
        base_blocker = LearnedBlockerCandidate(
            block_bars=42,
            train_min_blocks=12,
            lookback_blocks=24,
            ridge_alpha=1.0,
            prediction_threshold=-0.0025,
            blocked_scale=blocked_scale,
        )
        grid.extend(
            GuardedBlockerCandidate(
                blocker=base_blocker,
                guard_regime_mode=mode,
                min_core_score=core,
                min_breadth_persist=breadth,
                min_funding_persist=funding,
                max_choppy_score=chop,
                max_distribution_score=dist,
                max_panic_score=0.20,
            )
            for mode in ("momentum_only", "momentum_or_euphoric")
            for core in (0.55, 0.60, 0.65)
            for breadth in (0.45, 0.50, 0.55)
            for funding in (0.50, 0.55, 0.60)
            for chop in (0.35, 0.40)
            for dist in (0.25, 0.30)
        )
    return grid
def main() -> None:
    """Grid-search guarded learned-blocker candidates against the current
    cash-overlay baseline and persist the top-10 results to OUT_JSON."""
    bundle, latest_bar = load_component_bundle(CACHE_PATH)
    # Evaluate over the trailing 1825 days (~5 years) ending at the latest bar.
    eval_start = latest_bar - pd.Timedelta(days=1825)

    print("[phase] build current baseline", flush=True)
    components = build_cash_overlay_period_components(
        bundle=bundle,
        eval_start=eval_start,
        eval_end=latest_bar,
        profile_name=BEST_CASH_OVERLAY.regime_profile,
        core_filter=BEST_CASH_OVERLAY.core_filter,
        cap_engine=BEST_CASH_OVERLAY.cap_engine,
        chop_engine=BEST_CASH_OVERLAY.chop_engine,
        dist_engine=BEST_CASH_OVERLAY.dist_engine,
        core_config_overrides=CURRENT_OVERHEAT_OVERRIDES,
    )
    detail = _build_strategy_detail(components)
    regime_columns = _build_regime_columns(detail)
    block_frame = _build_block_dataset(detail, 42, regime_columns)

    # Baseline metrics for the unmodified overlay, reported alongside results.
    baseline_curve = _curve_from_returns(detail.set_index("timestamp")["portfolio_return"])
    baseline_windows, baseline_years, baseline_score, *_ = _metrics_for_curve(baseline_curve, latest_bar)

    candidates = _candidate_space()
    leaderboard: list[dict[str, object]] = []
    print(f"[phase] guarded blocker search {len(candidates)} candidates", flush=True)
    for done, candidate in enumerate(candidates, start=1):
        sim_returns = _simulate_guarded_candidate(detail, block_frame, regime_columns, candidate)
        curve = _curve_from_returns(sim_returns)
        windows, years, score, negative_years, mdd_violations = _metrics_for_curve(curve, latest_bar)
        record: dict[str, object] = {
            "candidate": {
                **asdict(candidate.blocker),
                "guard_regime_mode": candidate.guard_regime_mode,
                "min_core_score": candidate.min_core_score,
                "min_breadth_persist": candidate.min_breadth_persist,
                "min_funding_persist": candidate.min_funding_persist,
                "max_choppy_score": candidate.max_choppy_score,
                "max_distribution_score": candidate.max_distribution_score,
                "max_panic_score": candidate.max_panic_score,
            },
            "name": candidate.name,
            "score": score,
            "negative_years": negative_years,
            "mdd_violations": mdd_violations,
            "windows": windows,
            "years": years,
        }
        # Keep only the 10 best-scoring candidates seen so far.
        leaderboard.append(record)
        leaderboard.sort(key=lambda item: float(item["score"]), reverse=True)
        del leaderboard[10:]
        if done % 50 == 0 or done == len(candidates):
            print(f"[search] {done}/{len(candidates)}", flush=True)

    output = {
        "analysis": "current_cash_guarded_blocker",
        "latest_bar": str(latest_bar),
        "baseline": {
            "score": baseline_score,
            "windows": baseline_windows,
            "years": baseline_years,
        },
        "top10": leaderboard,
    }
    OUT_JSON.write_text(json.dumps(output, indent=2), encoding="utf-8")
    print(json.dumps(leaderboard[:5], indent=2))
    print(f"[saved] {OUT_JSON}", flush=True)
if __name__ == "__main__":
    # Script entry point: run the full guarded-blocker search.
    main()