# (viewer metadata removed: 147 lines, 5.7 KiB, Python)
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
from dataclasses import asdict
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
|
|
PACKAGE_PARENT = Path(__file__).resolve().parents[2]
|
|
if str(PACKAGE_PARENT) not in sys.path:
|
|
sys.path.insert(0, str(PACKAGE_PARENT))
|
|
|
|
from strategy32.live.runtime import BEST_CASH_OVERLAY
|
|
from strategy32.research.soft_router import (
|
|
build_cash_overlay_period_components,
|
|
load_component_bundle,
|
|
score_candidate,
|
|
segment_metrics,
|
|
)
|
|
from strategy32.scripts.run_current_cash_learned_blocker import (
|
|
CACHE_PATH,
|
|
CURRENT_OVERHEAT_OVERRIDES,
|
|
LearnedBlockerCandidate,
|
|
_build_block_dataset,
|
|
_build_regime_columns,
|
|
_build_strategy_detail,
|
|
_curve_from_returns,
|
|
_simulate_candidate,
|
|
)
|
|
from strategy32.scripts.run_current_relaxed_hybrid_experiment import WINDOWS, YEAR_PERIODS, YTD_START
|
|
|
|
|
|
OUT_JSON = Path("/tmp/strategy32_current_cash_blocker_exact_plateau.json")
|
|
BASELINE_JSON = Path("/tmp/strategy32_live_combo_backtest.json")
|
|
|
|
|
|
def _candidate_space() -> list[LearnedBlockerCandidate]:
    """Enumerate the exact plateau grid of learned-blocker candidates.

    The grid varies block length, minimum training blocks, ridge strength,
    prediction threshold, and blocked exposure scale; ``lookback_blocks`` is
    held fixed at 24 for every candidate.
    """
    grid = (
        (bars, min_blocks, alpha, threshold, scale)
        for bars in (21, 42)
        for min_blocks in (8, 12, 18)
        for alpha in (0.5, 1.0, 5.0, 20.0)
        for threshold in (-0.0050, -0.0025, 0.0)
        for scale in (0.0, 0.25, 0.50)
    )
    return [
        LearnedBlockerCandidate(
            block_bars=bars,
            train_min_blocks=min_blocks,
            lookback_blocks=24,
            ridge_alpha=alpha,
            prediction_threshold=threshold,
            blocked_scale=scale,
        )
        for bars, min_blocks, alpha, threshold, scale in grid
    ]
|
|
|
|
|
|
def main() -> None:
    """Run the exact plateau sweep and persist ranked results to OUT_JSON."""
    bundle, latest_bar = load_component_bundle(CACHE_PATH)
    # Evaluation periods: trailing windows anchored at the latest bar, named
    # calendar-year segments, and the current year-to-date segment.
    period_specs: list[tuple[str, str, pd.Timestamp, pd.Timestamp]] = []
    for days, label in WINDOWS:
        period_specs.append(("window", label, latest_bar - pd.Timedelta(days=days), latest_bar))
    for label, start, end_exclusive in YEAR_PERIODS:
        # Clamp to the latest bar; subtract one second so the exclusive year
        # boundary becomes an inclusive segment end.
        period_specs.append(("year", label, start, min(latest_bar, end_exclusive - pd.Timedelta(seconds=1))))
    period_specs.append(("year", "2026_YTD", YTD_START, latest_bar))

    # Per-period cache of everything that is candidate-independent, so each
    # candidate only pays for simulation, not component rebuilding.
    period_cache: dict[str, dict[str, object]] = {}
    for kind, label, start, end in period_specs:
        components = build_cash_overlay_period_components(
            bundle=bundle,
            eval_start=start,
            eval_end=end,
            profile_name=BEST_CASH_OVERLAY.regime_profile,
            core_filter=BEST_CASH_OVERLAY.core_filter,
            cap_engine=BEST_CASH_OVERLAY.cap_engine,
            chop_engine=BEST_CASH_OVERLAY.chop_engine,
            dist_engine=BEST_CASH_OVERLAY.dist_engine,
            core_config_overrides=CURRENT_OVERHEAT_OVERRIDES,
        )
        detail = _build_strategy_detail(components)
        regime_columns = _build_regime_columns(detail)
        period_cache[label] = {
            "kind": kind,
            "start": start,
            "end": end,
            "detail": detail,
            "regime_columns": regime_columns,
            # Block datasets keyed by block_bars, filled lazily in the
            # candidate loop below.
            "block_frames": {},
        }
    print(f"[periods] cached {len(period_cache)} periods", flush=True)

    candidates = _candidate_space()
    print(f"[candidates] {len(candidates)} exact candidates", flush=True)

    rows: list[dict[str, object]] = []
    for idx, candidate in enumerate(candidates, start=1):
        windows: dict[str, dict[str, float]] = {}
        years: dict[str, dict[str, float]] = {}
        for label, cached in period_cache.items():
            detail = cached["detail"]
            regime_columns = cached["regime_columns"]
            block_frames = cached["block_frames"]
            # Build the block dataset at most once per (period, block_bars)
            # pair; candidates sharing block_bars reuse it.
            if candidate.block_bars not in block_frames:
                block_frames[candidate.block_bars] = _build_block_dataset(detail, candidate.block_bars, regime_columns)
            block_frame = block_frames[candidate.block_bars]
            simulated_returns = _simulate_candidate(detail, block_frame, regime_columns, candidate)
            curve = _curve_from_returns(simulated_returns)
            metrics = segment_metrics(curve, cached["start"], cached["end"])
            if cached["kind"] == "window":
                windows[label] = metrics
            else:
                years[label] = metrics

        # Score on the trailing windows and the named years only; the
        # "2026_YTD" segment is recorded in the output but not fed to
        # score_candidate.
        score, negative_years, mdd_violations = score_candidate(
            {label: windows[label] for _, label in WINDOWS},
            {label: years[label] for label, _, _ in YEAR_PERIODS},
        )
        rows.append(
            {
                "candidate": asdict(candidate),
                "name": candidate.name,
                "score": score,
                "negative_years": negative_years,
                "mdd_violations": mdd_violations,
                "windows": windows,
                "years": years,
            }
        )
        if idx % 24 == 0 or idx == len(candidates):
            print(f"[progress] {idx}/{len(candidates)}", flush=True)

    # Rank best-first by score, then persist the top 10 alongside the exact
    # live baseline (when the baseline JSON exists on disk).
    rows.sort(key=lambda item: float(item["score"]), reverse=True)
    output = {
        "analysis": "current_cash_blocker_exact_plateau",
        "latest_bar": str(latest_bar),
        "all_count": len(rows),
        "top10": rows[:10],
        "baseline_exact": json.loads(BASELINE_JSON.read_text(encoding="utf-8")) if BASELINE_JSON.exists() else None,
    }
    OUT_JSON.write_text(json.dumps(output, indent=2), encoding="utf-8")
    print(json.dumps(output["top10"], indent=2))
    print(f"[saved] {OUT_JSON}", flush=True)
|
|
|
|
|
|
# Script entry point.
if __name__ == "__main__":
    main()
|