from __future__ import annotations import asyncio import hashlib import json import logging import os from dataclasses import asdict, dataclass, field from pathlib import Path from typing import Any import pandas as pd from strategy29.common.constants import BTC_SYMBOL from strategy32.backtest.simulator import Strategy32Backtester from strategy32.config import PROFILE_V7_DEFAULT, Strategy32Config, build_strategy32_config from strategy32.data import ( build_strategy32_market_bundle_from_specs, build_strategy32_price_frames_from_specs, resolve_strategy32_pair_specs, ) from strategy32.live.binance_account import BinanceUsdMAccountClient from strategy32.live.env import env_bool from strategy32.live.executor import LiveExecutionConfig, LiveFuturesExecutor from strategy32.live.notifier import Notifier from strategy32.research.adverse_regime import AdverseRegimeResearchHarness, default_engine_specs from strategy32.research.hybrid_regime import STATIC_FILTERS from strategy32.research.soft_router import CashOverlayCandidate, build_regime_score_frame from strategy32.scripts.run_regime_filter_analysis import build_strategic_regime_frame from strategy32.universe import select_dynamic_universe LOGGER = logging.getLogger("strategy32.live") BEST_CASH_OVERLAY = CashOverlayCandidate( regime_profile="loose_positive", core_filter="overheat_tolerant", cap_engine="cap_btc_rebound", chop_engine="chop_inverse_carry_strict", dist_engine="dist_inverse_carry_strict", cap_cash_weight=0.65, chop_cash_weight=0.40, dist_cash_weight=0.20, cap_threshold=0.20, chop_threshold=0.35, dist_threshold=0.35, core_block_threshold=0.60, ) LIVE_STATIC_FILTER_NAME = "overheat_tolerant" LIVE_STRATEGY_OVERRIDES: dict[str, Any] = { **STATIC_FILTERS[LIVE_STATIC_FILTER_NAME], # Keep live aligned with the research winner, not the later diagnostics. 
"enable_liquidity_universe_fallback": False, "enable_momentum_filter_fallback": False, "enable_carry_score_fallback": False, } @dataclass(slots=True) class LiveMonitorConfig: timeframe: str = "4h" macro_filter_timeframe: str = "1w" hard_filter_timeframe: str = "1d" execution_refinement_timeframe: str = "1h" lookback_days: int = 365 warmup_days: int = 90 poll_seconds: int = 60 runtime_dir: Path = Path("runtime") live_min_quote_volume_24h: float = 100_000_000.0 macro_filter_fast_weeks: int = 10 macro_filter_slow_weeks: int = 30 hard_filter_min_history_bars: int = 120 hard_filter_lookback_bars: int = 30 hard_filter_min_avg_dollar_volume: float = 50_000_000.0 execution_refinement_lookback_bars: int = 48 execution_refinement_fast_ema: int = 8 execution_refinement_slow_ema: int = 21 execution_refinement_scale_down_gap: float = 0.008 execution_refinement_max_chase_gap: float = 0.018 execution_refinement_max_recent_return: float = 0.03 execution_refinement_scale_down_factor: float = 0.5 max_specs: int = 0 paper_capital_usd: float = 1_000.0 max_staleness_days: int = 3 include_account_snapshot: bool = True execution: LiveExecutionConfig = field(default_factory=LiveExecutionConfig) def _clip01(value: float) -> float: return min(max(float(value), 0.0), 1.0) def _now_utc() -> pd.Timestamp: return pd.Timestamp.now(tz="UTC") def _completed_bar_time(now: pd.Timestamp, timeframe: str) -> pd.Timestamp: now = pd.Timestamp(now).tz_convert("UTC") if pd.Timestamp(now).tzinfo else pd.Timestamp(now, tz="UTC") step = pd.Timedelta(timeframe) epoch = pd.Timestamp("1970-01-01 00:00:00+00:00") steps = int((now - epoch) // step) return epoch + steps * step def _heartbeat_slot(now: pd.Timestamp) -> tuple[int, int, int, int, int]: ts = pd.Timestamp(now).tz_convert("UTC") if pd.Timestamp(now).tzinfo else pd.Timestamp(now, tz="UTC") return (ts.year, ts.month, ts.day, ts.hour, 0 if ts.minute < 30 else 30) def _execution_client_from_env() -> BinanceUsdMAccountClient | None: api_key = 
os.getenv("GAMMA_BOT_API_KEY", "").strip() or os.getenv("BN_API_KEY", "").strip() api_secret = os.getenv("GAMMA_BOT_API_SECRET", "").strip() or os.getenv("BN_API_SECRET", "").strip() if not api_key or not api_secret: return None use_testnet = env_bool("STRATEGY32_BINANCE_TESTNET", True) return BinanceUsdMAccountClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet) def _load_monitor_config(runtime_dir: str | None = None) -> LiveMonitorConfig: return LiveMonitorConfig( timeframe=os.getenv("STRATEGY32_TIMEFRAME", "4h").strip() or "4h", macro_filter_timeframe=os.getenv("STRATEGY32_MACRO_FILTER_TIMEFRAME", "1w").strip() or "1w", hard_filter_timeframe=os.getenv("STRATEGY32_HARD_FILTER_TIMEFRAME", "1d").strip() or "1d", execution_refinement_timeframe=os.getenv("STRATEGY32_EXECUTION_REFINEMENT_TIMEFRAME", "1h").strip() or "1h", lookback_days=int(os.getenv("STRATEGY32_LOOKBACK_DAYS", "365")), warmup_days=int(os.getenv("STRATEGY32_WARMUP_DAYS", "90")), poll_seconds=int(os.getenv("STRATEGY32_POLL_SECONDS", "60")), runtime_dir=Path(runtime_dir or os.getenv("STRATEGY32_RUNTIME_DIR", "runtime")), live_min_quote_volume_24h=float(os.getenv("STRATEGY32_LIVE_MIN_QUOTE_VOLUME_24H", "100000000")), macro_filter_fast_weeks=int(os.getenv("STRATEGY32_MACRO_FILTER_FAST_WEEKS", "10")), macro_filter_slow_weeks=int(os.getenv("STRATEGY32_MACRO_FILTER_SLOW_WEEKS", "30")), hard_filter_min_history_bars=int(os.getenv("STRATEGY32_HARD_FILTER_MIN_HISTORY_BARS", "120")), hard_filter_lookback_bars=int(os.getenv("STRATEGY32_HARD_FILTER_LOOKBACK_BARS", "30")), hard_filter_min_avg_dollar_volume=float(os.getenv("STRATEGY32_HARD_FILTER_MIN_AVG_DOLLAR_VOLUME", "50000000")), execution_refinement_lookback_bars=int(os.getenv("STRATEGY32_EXECUTION_REFINEMENT_LOOKBACK_BARS", "48")), execution_refinement_fast_ema=int(os.getenv("STRATEGY32_EXECUTION_REFINEMENT_FAST_EMA", "8")), execution_refinement_slow_ema=int(os.getenv("STRATEGY32_EXECUTION_REFINEMENT_SLOW_EMA", "21")), 
execution_refinement_scale_down_gap=float(os.getenv("STRATEGY32_EXECUTION_REFINEMENT_SCALE_DOWN_GAP", "0.008")), execution_refinement_max_chase_gap=float(os.getenv("STRATEGY32_EXECUTION_REFINEMENT_MAX_CHASE_GAP", "0.018")), execution_refinement_max_recent_return=float(os.getenv("STRATEGY32_EXECUTION_REFINEMENT_MAX_RECENT_RETURN", "0.03")), execution_refinement_scale_down_factor=float(os.getenv("STRATEGY32_EXECUTION_REFINEMENT_SCALE_DOWN_FACTOR", "0.5")), max_specs=int(os.getenv("STRATEGY32_MAX_SPECS", "0")), paper_capital_usd=float(os.getenv("STRATEGY32_PAPER_CAPITAL_USD", "1000")), max_staleness_days=int(os.getenv("STRATEGY32_MAX_STALENESS_DAYS", "3")), include_account_snapshot=env_bool("STRATEGY32_INCLUDE_ACCOUNT_SNAPSHOT", True), execution=LiveExecutionConfig( enabled=env_bool("STRATEGY32_ENABLE_LIVE_ORDERS", False), leverage=int(os.getenv("STRATEGY32_EXECUTION_LEVERAGE", os.getenv("GAMMA_BOT_LEVERAGE", "1"))), min_target_notional_usd=float(os.getenv("STRATEGY32_MIN_TARGET_NOTIONAL_USD", "25")), min_rebalance_notional_usd=float(os.getenv("STRATEGY32_MIN_REBALANCE_NOTIONAL_USD", "10")), close_orphan_positions=env_bool("STRATEGY32_CLOSE_ORPHAN_POSITIONS", True), entry_only_refinement=env_bool("STRATEGY32_ENTRY_ONLY_REFINEMENT", True), ), ) def _runtime_paths(runtime_dir: Path) -> dict[str, Path]: runtime_dir.mkdir(parents=True, exist_ok=True) return { "runtime_dir": runtime_dir, "snapshots_jsonl": runtime_dir / "strategy32_live_snapshots.jsonl", "orders_jsonl": runtime_dir / "strategy32_live_orders.jsonl", "latest_json": runtime_dir / "strategy32_live_latest.json", "state_json": runtime_dir / "strategy32_live_state.json", } def _read_state(path: Path) -> dict[str, Any]: if not path.exists(): return {} return json.loads(path.read_text(encoding="utf-8")) def _write_json(path: Path, payload: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2, ensure_ascii=True), encoding="utf-8") def 
_append_jsonl(path: Path, payload: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, ensure_ascii=True) + "\n") def _position_to_serializable(position: dict[str, Any]) -> dict[str, Any]: payload = dict(position) if "entry_time" in payload: payload["entry_time"] = str(payload["entry_time"]) return payload def _expand_core_targets(final_positions: list[dict[str, Any]], final_equity: float) -> list[dict[str, Any]]: rows: dict[str, dict[str, Any]] = {} equity = max(float(final_equity), 1e-9) def add_row(instrument: str, *, source: str, weight: float, note: str, tradeable: bool) -> None: if abs(weight) <= 1e-9: return row = rows.setdefault( instrument, { "instrument": instrument, "source": source, "weight": 0.0, "tradeable": tradeable, "note": note, }, ) row["weight"] = float(row["weight"]) + float(weight) for pos in final_positions: weight = float(pos["value"]) / equity hedge_ratio = float((pos.get("meta") or {}).get("hedge_ratio", 0.0) or 0.0) engine = str(pos["engine"]) symbol = str(pos["symbol"]) if engine in {"momentum", "sideways"}: add_row(f"perp:{symbol}", source="core", weight=weight, note=engine, tradeable=True) if hedge_ratio > 0: add_row(f"perp:{BTC_SYMBOL}", source="core", weight=-weight * hedge_ratio, note=f"{engine}_hedge", tradeable=True) elif engine == "carry": add_row(f"carry:{symbol}", source="core", weight=weight, note="synthetic_carry", tradeable=False) result = [] for row in rows.values(): row["notional_usd"] = float(row["weight"]) * equity result.append(row) result.sort(key=lambda row: abs(float(row["notional_usd"])), reverse=True) return result def _overlay_signal_strengths(candidate: CashOverlayCandidate, score_row: dict[str, Any]) -> dict[str, float]: core_score = float(score_row.get("core_score", 0.0)) panic_score = float(score_row.get("panic_score", 0.0)) choppy_score = float(score_row.get("choppy_score", 0.0)) distribution_score = 
def _overlay_signal_strengths(candidate: CashOverlayCandidate, score_row: dict[str, Any]) -> dict[str, float]:
    """Convert regime scores into overlay activation signals in ``[0, 1]``.

    Each signal rises linearly from 0 at the candidate's threshold to 1 at a
    score of 1.0. When the core score exceeds ``core_block_threshold`` the
    chop and distribution signals are damped to 25% and 35% respectively.

    Returns:
        The four raw scores plus ``cap_signal``, ``chop_signal`` and
        ``dist_signal``.
    """

    def ramp(score: float, threshold: float) -> float:
        return _clip01((score - threshold) / max(1.0 - threshold, 1e-9))

    core_score = float(score_row.get("core_score", 0.0))
    panic_score = float(score_row.get("panic_score", 0.0))
    choppy_score = float(score_row.get("choppy_score", 0.0))
    distribution_score = float(score_row.get("distribution_score", 0.0))

    cap_signal = ramp(panic_score, candidate.cap_threshold)
    chop_signal = ramp(choppy_score, candidate.chop_threshold)
    dist_signal = ramp(distribution_score, candidate.dist_threshold)
    if core_score > candidate.core_block_threshold:
        chop_signal = chop_signal * 0.25
        dist_signal = dist_signal * 0.35

    return {
        "core_score": core_score,
        "panic_score": panic_score,
        "choppy_score": choppy_score,
        "distribution_score": distribution_score,
        "cap_signal": cap_signal,
        "chop_signal": chop_signal,
        "dist_signal": dist_signal,
    }


def _overlay_targets(
    *,
    bundle,
    latest_bar: pd.Timestamp,
    score_row: dict[str, Any],
    core_cash_pct: float,
    equity: float,
    candidate: CashOverlayCandidate,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Allocate the core's idle cash across the overlay engines.

    Each engine's budget is ``core_cash_pct * <engine cash weight> * <signal>``.
    Engines with a positive budget are asked for target weights at
    *latest_bar*, and those weights are scaled by the budget and summed per
    instrument. Keys without a ``":"`` become tradeable ``perp:<key>`` rows;
    keys that already contain a ``":"`` are kept as-is and marked non-tradeable.

    Returns:
        ``(rows, signals)``: rows sorted by absolute notional (largest first)
        and the signal dict extended with ``engine_budgets``.
    """
    harness = AdverseRegimeResearchHarness(bundle, latest_bar)
    specs = {spec.name: spec for spec in default_engine_specs()}
    signals = _overlay_signal_strengths(candidate, score_row)
    cash = float(core_cash_pct)
    budgets = {
        candidate.cap_engine: cash * candidate.cap_cash_weight * signals["cap_signal"],
        candidate.chop_engine: cash * candidate.chop_cash_weight * signals["chop_signal"],
        candidate.dist_engine: cash * candidate.dist_cash_weight * signals["dist_signal"],
    }

    book: dict[str, dict[str, Any]] = {}
    for engine_name, budget in budgets.items():
        if budget <= 0:
            continue
        raw_weights = harness.target_weights(specs[engine_name], latest_bar)
        if not raw_weights:
            continue
        for key, raw_weight in raw_weights.items():
            namespaced = ":" in key
            instrument = key if namespaced else f"perp:{key}"
            if instrument not in book:
                # The first engine to touch an instrument fixes its note and
                # tradeable flag.
                book[instrument] = {
                    "instrument": instrument,
                    "source": "overlay",
                    "weight": 0.0,
                    "tradeable": not namespaced,
                    "note": engine_name,
                }
            entry = book[instrument]
            entry["weight"] = float(entry["weight"]) + float(budget) * float(raw_weight)

    for entry in book.values():
        entry["notional_usd"] = float(entry["weight"]) * float(equity)
    ranked = sorted(book.values(), key=lambda entry: abs(float(entry["notional_usd"])), reverse=True)
    signals["engine_budgets"] = budgets
    return ranked, signals
float(row["weight"]) * float(equity) result.append(row) result.sort(key=lambda row: abs(float(row["notional_usd"])), reverse=True) signals["engine_budgets"] = budgets return result, signals def _combine_targets(core_targets: list[dict[str, Any]], overlay_targets: list[dict[str, Any]], equity: float) -> list[dict[str, Any]]: rows: dict[str, dict[str, Any]] = {} for target in core_targets + overlay_targets: instrument = str(target["instrument"]) row = rows.setdefault( instrument, { "instrument": instrument, "weight": 0.0, "notional_usd": 0.0, "tradeable": bool(target["tradeable"]), "sources": [], }, ) row["weight"] = float(row["weight"]) + float(target["weight"]) row["sources"].append({"source": target["source"], "note": target["note"], "weight": float(target["weight"])}) row["tradeable"] = bool(row["tradeable"]) and bool(target["tradeable"]) combined = [] for row in rows.values(): row["notional_usd"] = float(row["weight"]) * float(equity) combined.append(row) combined.sort(key=lambda row: abs(float(row["notional_usd"])), reverse=True) return combined def _current_specs(config: LiveMonitorConfig, strategy_config: Strategy32Config) -> list: specs = resolve_strategy32_pair_specs( symbols=strategy_config.symbols, auto_discover_symbols=True, quote_assets=strategy_config.quote_assets, excluded_base_assets=strategy_config.excluded_base_assets, min_quote_volume_24h=0.0, ) specs = [ spec for spec in specs if min(spec.spot_quote_volume_24h, spec.perp_quote_volume_24h) >= config.live_min_quote_volume_24h ] specs.sort(key=lambda spec: min(spec.spot_quote_volume_24h, spec.perp_quote_volume_24h), reverse=True) if config.max_specs > 0: specs = specs[: config.max_specs] return specs def _select_live_hard_filter_symbols( prices: dict[str, pd.DataFrame], *, timestamp: pd.Timestamp, config: LiveMonitorConfig, ) -> list[str]: selected = select_dynamic_universe( prices, timestamp=timestamp, min_history_bars=config.hard_filter_min_history_bars, 
lookback_bars=config.hard_filter_lookback_bars, max_symbols=0, min_avg_dollar_volume=config.hard_filter_min_avg_dollar_volume, base_symbol=BTC_SYMBOL, ) result: list[str] = [] if BTC_SYMBOL in prices: result.append(BTC_SYMBOL) result.extend(selected) return result def _weekly_macro_filter_state( prices: dict[str, pd.DataFrame], *, timestamp: pd.Timestamp, config: LiveMonitorConfig, ) -> dict[str, Any]: hist = prices.get(BTC_SYMBOL) if hist is None or hist.empty: return { "timeframe": config.macro_filter_timeframe, "risk_on": True, "reason": "missing_btc_prices", } frame = hist.loc[hist["timestamp"] <= timestamp, ["timestamp", "close"]].copy() if frame.empty: return { "timeframe": config.macro_filter_timeframe, "risk_on": True, "reason": "empty_btc_prices", } daily = frame.set_index("timestamp")["close"].resample("1D").last().dropna() weekly = daily.resample("W-SUN").last().dropna() if len(weekly) < config.macro_filter_slow_weeks: return { "timeframe": config.macro_filter_timeframe, "risk_on": True, "reason": "insufficient_weekly_history", "latest_bar": str(weekly.index[-1]) if len(weekly) else None, "bars": int(len(weekly)), } fast = weekly.ewm(span=config.macro_filter_fast_weeks, adjust=False).mean().iloc[-1] slow = weekly.ewm(span=config.macro_filter_slow_weeks, adjust=False).mean().iloc[-1] close = float(weekly.iloc[-1]) risk_on = bool(close > float(slow) and float(fast) > float(slow)) return { "timeframe": config.macro_filter_timeframe, "risk_on": risk_on, "latest_bar": str(weekly.index[-1]), "bars": int(len(weekly)), "close": close, "ema_fast": float(fast), "ema_slow": float(slow), "trend_gap": float(close / float(slow) - 1.0), } def _apply_weekly_macro_filter(core_targets: list[dict[str, Any]], *, macro_state: dict[str, Any]) -> list[dict[str, Any]]: if bool(macro_state.get("risk_on", True)): return core_targets filtered: list[dict[str, Any]] = [] for row in core_targets: instrument = str(row.get("instrument", "")) if bool(row.get("tradeable")) and 
instrument.startswith("perp:"): continue filtered.append(row) return filtered def _execution_refinement_states( prices: dict[str, pd.DataFrame], *, timestamp: pd.Timestamp, config: LiveMonitorConfig, ) -> dict[str, dict[str, Any]]: min_bars = max( config.execution_refinement_lookback_bars, config.execution_refinement_slow_ema + 5, 8, ) states: dict[str, dict[str, Any]] = {} for symbol, df in prices.items(): hist = df.loc[df["timestamp"] <= timestamp].tail(min_bars) if len(hist) < min_bars: states[symbol] = { "action": "allow", "scale": 1.0, "reason": "insufficient_history", } continue closes = hist["close"].astype(float) close = float(closes.iloc[-1]) ema_fast = float(closes.ewm(span=config.execution_refinement_fast_ema, adjust=False).mean().iloc[-1]) ema_slow = float(closes.ewm(span=config.execution_refinement_slow_ema, adjust=False).mean().iloc[-1]) recent_return = 0.0 if len(closes) >= 4: recent_return = float(close / float(closes.iloc[-4]) - 1.0) chase_gap = float(close / max(ema_fast, 1e-9) - 1.0) action = "allow" scale = 1.0 reason = "trend_confirmed" if close <= ema_slow or ema_fast <= ema_slow: action = "block" scale = 0.0 reason = "below_1h_trend" elif chase_gap >= config.execution_refinement_max_chase_gap or recent_return >= config.execution_refinement_max_recent_return: action = "block" scale = 0.0 reason = "too_extended" elif chase_gap >= config.execution_refinement_scale_down_gap: action = "scale_down" scale = config.execution_refinement_scale_down_factor reason = "slightly_extended" states[symbol] = { "action": action, "scale": scale, "reason": reason, "close": close, "ema_fast": ema_fast, "ema_slow": ema_slow, "chase_gap": chase_gap, "recent_return": recent_return, "timestamp": str(hist["timestamp"].iloc[-1]), } return states def _refine_execution_targets( combined_targets: list[dict[str, Any]], *, refinement_states: dict[str, dict[str, Any]], ) -> list[dict[str, Any]]: execution_targets: list[dict[str, Any]] = [] for row in combined_targets: if not 
bool(row.get("tradeable")) or not str(row.get("instrument", "")).startswith("perp:"): continue instrument = str(row["instrument"]) base_symbol = instrument.split(":", 1)[1] desired_weight = float(row.get("weight", 0.0) or 0.0) state = refinement_states.get(base_symbol, {"action": "allow", "scale": 1.0, "reason": "no_state"}) adjusted_weight = desired_weight if desired_weight > 0.0 and base_symbol != BTC_SYMBOL: adjusted_weight = desired_weight * float(state.get("scale", 1.0) or 0.0) execution_targets.append( { **row, "desired_weight": desired_weight, "weight": adjusted_weight, "refinement_action": state.get("action", "allow"), "refinement_scale": float(state.get("scale", 1.0) or 1.0), "refinement_reason": state.get("reason", "unknown"), } ) return execution_targets def _account_snapshot() -> dict[str, Any] | None: client = _execution_client_from_env() if client is None: return None try: balances = client.get_balance() positions = client.get_position_risk() except Exception as exc: return {"error": str(exc)} active_positions = [] for row in positions: amt = float(row.get("positionAmt", 0.0) or 0.0) if abs(amt) <= 1e-9: continue active_positions.append( { "symbol": str(row.get("symbol", "")), "position_amt": amt, "entry_price": float(row.get("entryPrice", 0.0) or 0.0), "notional": float(row.get("notional", 0.0) or 0.0), "unrealized_pnl": float(row.get("unRealizedProfit", 0.0) or 0.0), } ) return { "balances": balances, "active_positions": active_positions, } def _capital_summary(account: dict[str, Any] | None) -> dict[str, float]: if not isinstance(account, dict): return {"usdt": 0.0, "usdc": 0.0, "total_quote": 0.0} balances = account.get("balances", []) if not isinstance(balances, list): return {"usdt": 0.0, "usdc": 0.0, "total_quote": 0.0} by_asset: dict[str, float] = {} for row in balances: asset = str(row.get("asset", "")).upper() if asset not in {"USDT", "USDC"}: continue by_asset[asset] = float(row.get("balance", 0.0) or 0.0) usdt = float(by_asset.get("USDT", 
0.0)) usdc = float(by_asset.get("USDC", 0.0)) return { "usdt": usdt, "usdc": usdc, "total_quote": usdt + usdc, } def _snapshot_hash(payload: dict[str, Any]) -> str: compact = { "latest_bar": payload["latest_bar"], "strategic_regime": payload["regime"]["strategic_regime"], "macro_risk_on": payload["regime"].get("macro_risk_on"), "execution_targets": [ { "instrument": row["instrument"], "weight": round(float(row["weight"]), 6), "tradeable": bool(row["tradeable"]), } for row in payload.get("execution_targets", [])[:20] ], } return hashlib.sha1(json.dumps(compact, sort_keys=True).encode("utf-8")).hexdigest() def _build_live_strategy_config() -> Strategy32Config: return build_strategy32_config(PROFILE_V7_DEFAULT, **LIVE_STRATEGY_OVERRIDES) def _live_strategy_signature() -> str: payload = { "profile": PROFILE_V7_DEFAULT, "core_filter": LIVE_STATIC_FILTER_NAME, "core_filter_overrides": {key: LIVE_STRATEGY_OVERRIDES[key] for key in sorted(LIVE_STRATEGY_OVERRIDES)}, "cash_overlay_candidate": asdict(BEST_CASH_OVERLAY), } return hashlib.sha1(json.dumps(payload, sort_keys=True).encode("utf-8")).hexdigest() def _resolve_specs_for_symbols(strategy_config: Strategy32Config, symbols: list[str]) -> list: return resolve_strategy32_pair_specs( symbols=symbols, auto_discover_symbols=False, quote_assets=strategy_config.quote_assets, excluded_base_assets=strategy_config.excluded_base_assets, min_quote_volume_24h=0.0, ) def _build_daily_hard_filter_state( config: LiveMonitorConfig, strategy_config: Strategy32Config, *, end: pd.Timestamp, ) -> dict[str, Any]: specs = _current_specs(config, strategy_config) if not specs: raise ValueError("no live specs matched current volume filter") start = end - pd.Timedelta(days=config.lookback_days) daily_prices, daily_latest_bar, daily_accepted, daily_rejected, _ = build_strategy32_price_frames_from_specs( specs=specs, start=start, end=end, timeframe=config.hard_filter_timeframe, max_staleness_days=config.max_staleness_days, ) hard_filter_symbols = 
_select_live_hard_filter_symbols( daily_prices, timestamp=daily_latest_bar, config=config, ) selected_symbols = [spec.base_symbol for spec in specs if spec.base_symbol in set(hard_filter_symbols)] if not selected_symbols: raise ValueError("daily hard filter removed every live spec") return { "generated_at": str(_now_utc()), "refresh_timeframe": config.hard_filter_timeframe, "discovered_specs": len(specs), "discovered_symbols": [spec.base_symbol for spec in specs], "hard_filter_latest_bar": str(daily_latest_bar), "hard_filter_candidates": daily_accepted, "hard_filter_rejected_symbols": daily_rejected, "hard_filter_symbols": hard_filter_symbols, "selected_symbols": selected_symbols, } def _build_cached_macro_state( config: LiveMonitorConfig, strategy_config: Strategy32Config, *, end: pd.Timestamp, ) -> dict[str, Any]: start = end - pd.Timedelta(days=config.lookback_days) btc_specs = _resolve_specs_for_symbols(strategy_config, [BTC_SYMBOL]) daily_prices, daily_latest_bar, _, _, _ = build_strategy32_price_frames_from_specs( specs=btc_specs, start=start, end=end, timeframe=config.hard_filter_timeframe, max_staleness_days=config.max_staleness_days, ) macro_state = _weekly_macro_filter_state( daily_prices, timestamp=daily_latest_bar, config=config, ) macro_state["generated_at"] = str(_now_utc()) macro_state["daily_latest_bar"] = str(daily_latest_bar) return macro_state def build_monitor_snapshot( config: LiveMonitorConfig, *, hard_filter_state: dict[str, Any] | None = None, macro_state: dict[str, Any] | None = None, ) -> dict[str, Any]: strategy_config = _build_live_strategy_config() end = _now_utc() start = end - pd.Timedelta(days=config.lookback_days) if hard_filter_state is None: hard_filter_state = _build_daily_hard_filter_state(config, strategy_config, end=end) if macro_state is None: macro_state = _build_cached_macro_state(config, strategy_config, end=end) selected_symbols = [str(symbol) for symbol in hard_filter_state.get("selected_symbols", [])] filtered_specs = 
_resolve_specs_for_symbols(strategy_config, selected_symbols) if not filtered_specs: raise ValueError("cached hard filter symbols no longer resolved to live specs") bundle, latest_bar, accepted, rejected, quote_by_symbol = build_strategy32_market_bundle_from_specs( specs=filtered_specs, start=start, end=end, timeframe=config.timeframe, max_staleness_days=config.max_staleness_days, ) earliest_ts = pd.Timestamp(bundle.prices[BTC_SYMBOL]["timestamp"].iloc[0]) trade_start = min(latest_bar, earliest_ts + pd.Timedelta(days=config.warmup_days)) backtester = Strategy32Backtester(strategy_config, bundle, trade_start=trade_start) backtester.engine_config.initial_capital = config.paper_capital_usd core_result = backtester.run(close_final_positions=False) final_positions = list(core_result.metadata.get("final_positions", [])) exposure_rows = list(core_result.metadata.get("exposure_rows", [])) latest_exposure = exposure_rows[-1] if exposure_rows else {} raw_core_targets = _expand_core_targets(final_positions, core_result.final_equity) core_targets = _apply_weekly_macro_filter(raw_core_targets, macro_state=macro_state) strategic_frame = build_strategic_regime_frame(bundle, trade_start, latest_bar, profile=BEST_CASH_OVERLAY.regime_profile) strategic_row = strategic_frame.iloc[-1].to_dict() score_frame = build_regime_score_frame(bundle, trade_start, latest_bar, profile_name=BEST_CASH_OVERLAY.regime_profile) score_row = score_frame.iloc[-1].to_dict() overlay_targets, overlay_meta = _overlay_targets( bundle=bundle, latest_bar=latest_bar, score_row=score_row, core_cash_pct=float(latest_exposure.get("cash_pct", 1.0) or 1.0), equity=core_result.final_equity, candidate=BEST_CASH_OVERLAY, ) combined_targets = _combine_targets(core_targets, overlay_targets, core_result.final_equity) execution_prices, execution_latest_bar, execution_accepted, execution_rejected, _ = build_strategy32_price_frames_from_specs( specs=filtered_specs, start=start, end=end, 
timeframe=config.execution_refinement_timeframe, max_staleness_days=config.max_staleness_days, ) execution_refinement = _execution_refinement_states( execution_prices, timestamp=execution_latest_bar, config=config, ) execution_targets = _refine_execution_targets( combined_targets, refinement_states=execution_refinement, ) snapshot = { "generated_at": str(_now_utc()), "latest_bar": str(latest_bar), "trade_start": str(trade_start), "timeframe": config.timeframe, "lookback_days": config.lookback_days, "paper_capital_usd": config.paper_capital_usd, "schedule": { "macro_filter_refresh": f"on new completed {config.macro_filter_timeframe} bar", "regime_refresh": f"on new completed {config.timeframe} bar", "ticker_hard_filter_refresh": f"on new completed {config.hard_filter_timeframe} bar", "ticker_ranking_refresh": f"on new completed {config.timeframe} bar", "execution_refinement_refresh": f"on new completed {config.execution_refinement_timeframe} bar", "execution_reconcile_seconds": config.poll_seconds, "heartbeat_minutes": [0, 30], }, "strategy": { "profile": PROFILE_V7_DEFAULT, "core_filter": LIVE_STATIC_FILTER_NAME, "core_filter_overrides": {key: LIVE_STRATEGY_OVERRIDES[key] for key in sorted(LIVE_STRATEGY_OVERRIDES)}, "cash_overlay_candidate": asdict(BEST_CASH_OVERLAY), }, "regime": { "strategic_regime": strategic_row.get("strategic_regime"), "btc_regime": latest_exposure.get("regime"), "macro_risk_on": bool(macro_state.get("risk_on", True)), "breadth": float(strategic_row.get("breadth", 0.0) or 0.0), "mean_alt_funding": float(strategic_row.get("mean_alt_funding", 0.0) or 0.0), "btc_7d_return": float(strategic_row.get("btc_7d_return", 0.0) or 0.0), }, "score_row": { "core_score": float(score_row.get("core_score", 0.0) or 0.0), "panic_score": float(score_row.get("panic_score", 0.0) or 0.0), "choppy_score": float(score_row.get("choppy_score", 0.0) or 0.0), "distribution_score": float(score_row.get("distribution_score", 0.0) or 0.0), }, "universe": { "discovered_specs": 
int(hard_filter_state.get("discovered_specs", len(selected_symbols))), "discovered_symbols": hard_filter_state.get("discovered_symbols", []), "macro_filter": macro_state, "hard_filter_timeframe": config.hard_filter_timeframe, "hard_filter_latest_bar": str(hard_filter_state.get("hard_filter_latest_bar", "")), "hard_filter_candidates": hard_filter_state.get("hard_filter_candidates", []), "hard_filter_rejected_symbols": hard_filter_state.get("hard_filter_rejected_symbols", []), "hard_filter_symbols": hard_filter_state.get("hard_filter_symbols", []), "hard_filter_spec_count": len(filtered_specs), "accepted_symbols": accepted, "rejected_symbols": rejected, "quote_by_symbol": quote_by_symbol, }, "core": { "filter": LIVE_STATIC_FILTER_NAME, "final_equity": core_result.final_equity, "cash_pct": float(latest_exposure.get("cash_pct", 1.0) or 1.0), "momentum_pct": float(latest_exposure.get("momentum_pct", 0.0) or 0.0), "carry_pct": float(latest_exposure.get("carry_pct", 0.0) or 0.0), "sideways_pct": float(latest_exposure.get("sideways_pct", 0.0) or 0.0), "raw_final_positions": [_position_to_serializable(pos) for pos in final_positions], "raw_expanded_targets": raw_core_targets, "expanded_targets": core_targets, }, "overlay": { "candidate": asdict(BEST_CASH_OVERLAY), "meta": overlay_meta, "targets": overlay_targets, }, "execution_refinement": { "timeframe": config.execution_refinement_timeframe, "latest_bar": str(execution_latest_bar), "accepted_symbols": execution_accepted, "rejected_symbols": execution_rejected, "states": execution_refinement, }, "combined_targets": combined_targets, "execution_targets": execution_targets, } account = _account_snapshot() if config.include_account_snapshot else None if account is not None: snapshot["account"] = account snapshot["capital"] = _capital_summary(account) snapshot["snapshot_hash"] = _snapshot_hash(snapshot) return snapshot def _telegram_summary(snapshot: dict[str, Any]) -> str: capital = snapshot.get("capital", {}) if 
def _telegram_summary(snapshot: dict[str, Any]) -> str:
    """Render the multi-line "STRATEGY32 UPDATE" notification for *snapshot*.

    Shows the latest bar, regime flags, quote capital, accepted universe
    (up to 8 symbols), core cash share with overlay scores, and up to 6
    execution targets.
    """
    raw_capital = snapshot.get("capital")
    capital = raw_capital if isinstance(raw_capital, dict) else {}
    targets = _format_execution_targets(snapshot, limit=6)
    accepted = _format_symbol_list(snapshot.get("universe", {}).get("accepted_symbols", []), limit=8)

    regime_line = (
        f"Regime: {'RISK_ON' if snapshot['regime']['macro_risk_on'] else 'RISK_OFF'}"
        f" | strategic={snapshot['regime']['strategic_regime']}"
        f" | btc={snapshot['regime']['btc_regime']}"
    )
    capital_line = (
        f"Capital: USDT {float(capital.get('usdt', 0.0)):.2f}"
        f" | USDC {float(capital.get('usdc', 0.0)):.2f}"
        f" | Total {float(capital.get('total_quote', 0.0)):.2f}"
    )
    overlay_line = (
        f"Core cash: {float(snapshot['core']['cash_pct']) * 100:.2f}%"
        f" | panic={float(snapshot['overlay']['meta']['panic_score']):.2f}"
        f" | choppy={float(snapshot['overlay']['meta']['choppy_score']):.2f}"
        f" | dist={float(snapshot['overlay']['meta']['distribution_score']):.2f}"
    )
    return "\n".join(
        [
            "STRATEGY32 UPDATE",
            f"Bar: {snapshot['latest_bar']}",
            regime_line,
            capital_line,
            f"Universe: {accepted}",
            overlay_line,
            "Execution targets:",
            f"{targets}",
        ]
    )


def _heartbeat_summary(snapshot: dict[str, Any], state: dict[str, Any]) -> str:
    """Render the multi-line "STRATEGY32 HEARTBEAT" notification.

    In addition to the update fields it shows the current time, the regime
    scores, the refresh schedule, the hard-filter and accepted universes
    (up to 10 symbols each), up to 6 open exchange positions and the number of
    orders recorded in ``state["last_execution_orders"]``.
    """
    raw_account = snapshot.get("account")
    account = raw_account if isinstance(raw_account, dict) else {}
    raw_capital = snapshot.get("capital")
    capital = raw_capital if isinstance(raw_capital, dict) else {}
    active_positions = account.get("active_positions", []) if isinstance(account, dict) else []
    universe = snapshot.get("universe", {})
    available_symbols = _format_symbol_list(universe.get("accepted_symbols", []), limit=10)
    hard_filter_symbols = _format_symbol_list(snapshot.get("universe", {}).get("hard_filter_symbols", []), limit=10)
    open_positions = _format_open_positions(active_positions, limit=6)
    targets = _format_execution_targets(snapshot, limit=6)

    time_line = f"Time: {_now_utc()}"
    bar_line = f"Bar: {snapshot['latest_bar']}"
    regime_line = (
        f"Regime: {'RISK_ON' if snapshot['regime']['macro_risk_on'] else 'RISK_OFF'}"
        f" | strategic={snapshot['regime']['strategic_regime']}"
        f" | btc={snapshot['regime']['btc_regime']}"
    )
    scores_line = (
        f"Scores: core={float(snapshot['score_row']['core_score']):.2f}"
        f" | panic={float(snapshot['score_row']['panic_score']):.2f}"
        f" | choppy={float(snapshot['score_row']['choppy_score']):.2f}"
        f" | dist={float(snapshot['score_row']['distribution_score']):.2f}"
    )
    capital_line = (
        f"Capital: USDT {float(capital.get('usdt', 0.0)):.2f}"
        f" | USDC {float(capital.get('usdc', 0.0)):.2f}"
        f" | Total {float(capital.get('total_quote', 0.0)):.2f}"
    )
    schedule_line = (
        f"Schedule: macro={snapshot['schedule']['macro_filter_refresh']}"
        f" | hard={snapshot['schedule']['ticker_hard_filter_refresh']}"
        f" | rank={snapshot['schedule']['ticker_ranking_refresh']}"
        f" | exec={snapshot['schedule']['execution_refinement_refresh']}"
    )
    return "\n".join(
        [
            "STRATEGY32 HEARTBEAT",
            time_line,
            bar_line,
            regime_line,
            scores_line,
            capital_line,
            schedule_line,
            f"Universe: hard={hard_filter_symbols}",
            f"Available: {available_symbols}",
            "Execution targets:",
            f"{targets}",
            "Open positions:",
            f"{open_positions}",
            f"Orders last cycle: {len(state.get('last_execution_orders', []))}",
        ]
    )
panic={float(snapshot['score_row']['panic_score']):.2f}" f" | choppy={float(snapshot['score_row']['choppy_score']):.2f}" f" | dist={float(snapshot['score_row']['distribution_score']):.2f}\n" f"Capital: USDT {float(capital.get('usdt', 0.0)):.2f}" f" | USDC {float(capital.get('usdc', 0.0)):.2f}" f" | Total {float(capital.get('total_quote', 0.0)):.2f}\n" f"Schedule: macro={snapshot['schedule']['macro_filter_refresh']}" f" | hard={snapshot['schedule']['ticker_hard_filter_refresh']}" f" | rank={snapshot['schedule']['ticker_ranking_refresh']}" f" | exec={snapshot['schedule']['execution_refinement_refresh']}\n" f"Universe: hard={hard_filter_symbols}\n" f"Available: {available_symbols}\n" "Execution targets:\n" f"{targets}\n" f"Open positions:\n{open_positions}\n" f"Orders last cycle: {len(state.get('last_execution_orders', []))}" ) def _format_symbol_list(symbols: list[str], *, limit: int) -> str: values = [str(symbol) for symbol in symbols if str(symbol)] if not values: return "none" head = values[:limit] suffix = f" (+{len(values) - limit} more)" if len(values) > limit else "" return ", ".join(head) + suffix def _format_execution_targets(snapshot: dict[str, Any], *, limit: int) -> str: rows = list(snapshot.get("execution_targets", []) or []) if not rows: return "- none" lines: list[str] = [] for row in rows[:limit]: instrument = str(row.get("instrument", "")) symbol = instrument.split(":", 1)[1] if ":" in instrument else instrument weight = float(row.get("weight", 0.0) or 0.0) * 100.0 desired_weight = float(row.get("desired_weight", row.get("weight", 0.0)) or 0.0) * 100.0 notional = float(row.get("notional_usd", 0.0) or 0.0) action = str(row.get("refinement_action", "allow")) reason = str(row.get("refinement_reason", "")).strip() if abs(weight - desired_weight) > 1e-6: lines.append( f"- {symbol}: {weight:+.2f}% (desired {desired_weight:+.2f}%, {notional:+.2f} USD, {action}, {reason})" ) else: lines.append(f"- {symbol}: {weight:+.2f}% ({notional:+.2f} USD, {action})") if 
len(rows) > limit: lines.append(f"- ... +{len(rows) - limit} more") return "\n".join(lines) def _format_open_positions(active_positions: list[dict[str, Any]], *, limit: int) -> str: if not active_positions: return "- none" lines: list[str] = [] for row in active_positions[:limit]: symbol = str(row.get("symbol", "")) qty = float(row.get("position_amt", 0.0) or 0.0) notional = float(row.get("notional", 0.0) or 0.0) unrealized = float(row.get("unrealized_pnl", 0.0) or 0.0) lines.append(f"- {symbol}: qty={qty:+.6f}, notional={notional:+.2f}, upnl={unrealized:+.2f}") if len(active_positions) > limit: lines.append(f"- ... +{len(active_positions) - limit} more") return "\n".join(lines) def _read_latest_snapshot(path: Path) -> dict[str, Any] | None: if not path.exists(): return None try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return None async def run_monitor(*, once: bool = False, runtime_dir: str | None = None) -> None: config = _load_monitor_config(runtime_dir) paths = _runtime_paths(config.runtime_dir) notifier = Notifier.from_env() await notifier.start() state = _read_state(paths["state_json"]) snapshot = _read_latest_snapshot(paths["latest_json"]) client = _execution_client_from_env() executor = LiveFuturesExecutor(client, config.execution) if client is not None else None strategy_signature = _live_strategy_signature() try: while True: try: now = _now_utc() refresh_bar = str(_completed_bar_time(now, config.execution_refinement_timeframe)) hard_filter_refresh_bar = str(_completed_bar_time(now, config.hard_filter_timeframe)) macro_refresh_bar = str(_completed_bar_time(now, config.macro_filter_timeframe)) state_signature = str(state.get("strategy_signature", "")) hard_filter_state = state.get("hard_filter_state") macro_state = state.get("macro_state") state_changed = False if ( not isinstance(hard_filter_state, dict) or hard_filter_refresh_bar != str(state.get("last_hard_filter_refresh_bar", "")) or strategy_signature != state_signature ): 
async def run_monitor(*, once: bool = False, runtime_dir: str | None = None) -> None:
    """Run the live monitor loop until cancelled (or for one cycle).

    Each cycle:

    1. Rebuilds the cached hard-filter and macro state when their bar has
       rolled over, when they are missing, or when the strategy signature
       changed.
    2. Rebuilds the monitor snapshot under the same conditions (keyed on the
       execution-refinement bar), persists it, and sends a summary when the
       snapshot hash changed.
    3. If an execution client is configured, reconciles positions against the
       snapshot and records/announces any orders.
    4. Sends a heartbeat once per half-hour slot.

    A failing cycle is logged, appended to the snapshots log as an ``error``
    event and announced as ``CRITICAL``; the loop then continues.

    Args:
        once: Stop after a single cycle instead of polling forever.
        runtime_dir: Forwarded to ``_load_monitor_config``.
    """
    config = _load_monitor_config(runtime_dir)
    paths = _runtime_paths(config.runtime_dir)
    notifier = Notifier.from_env()
    await notifier.start()
    try:
        # All setup after ``notifier.start()`` lives inside the try so the
        # notifier is stopped even if loading state or building the execution
        # client raises (previously this setup ran before the try/finally).
        state = _read_state(paths["state_json"])
        snapshot = _read_latest_snapshot(paths["latest_json"])
        client = _execution_client_from_env()
        # Without a client the monitor only observes; nothing is reconciled.
        executor = LiveFuturesExecutor(client, config.execution) if client is not None else None
        strategy_signature = _live_strategy_signature()

        while True:
            try:
                now = _now_utc()
                refresh_bar = str(_completed_bar_time(now, config.execution_refinement_timeframe))
                hard_filter_refresh_bar = str(_completed_bar_time(now, config.hard_filter_timeframe))
                macro_refresh_bar = str(_completed_bar_time(now, config.macro_filter_timeframe))
                # Captured before any state mutation below so all three refresh
                # checks in this cycle compare against the same persisted value.
                state_signature = str(state.get("strategy_signature", ""))
                hard_filter_state = state.get("hard_filter_state")
                macro_state = state.get("macro_state")
                state_changed = False

                if (
                    not isinstance(hard_filter_state, dict)
                    or hard_filter_refresh_bar != str(state.get("last_hard_filter_refresh_bar", ""))
                    or strategy_signature != state_signature
                ):
                    hard_filter_state = await asyncio.to_thread(
                        _build_daily_hard_filter_state,
                        config,
                        _build_live_strategy_config(),
                        end=now,
                    )
                    state["hard_filter_state"] = hard_filter_state
                    state["last_hard_filter_refresh_bar"] = hard_filter_refresh_bar
                    state_changed = True

                if (
                    not isinstance(macro_state, dict)
                    or macro_refresh_bar != str(state.get("last_macro_refresh_bar", ""))
                    or strategy_signature != state_signature
                ):
                    macro_state = await asyncio.to_thread(
                        _build_cached_macro_state,
                        config,
                        _build_live_strategy_config(),
                        end=now,
                    )
                    state["macro_state"] = macro_state
                    state["last_macro_refresh_bar"] = macro_refresh_bar
                    state_changed = True

                if state_changed:
                    state["strategy_signature"] = strategy_signature
                    _write_json(paths["state_json"], state)

                if (
                    snapshot is None
                    or refresh_bar != str(state.get("last_snapshot_refresh_bar", ""))
                    or strategy_signature != state_signature
                ):
                    snapshot = await asyncio.to_thread(
                        build_monitor_snapshot,
                        config,
                        hard_filter_state=hard_filter_state,
                        macro_state=macro_state,
                    )
                    _write_json(paths["latest_json"], snapshot)
                    _append_jsonl(paths["snapshots_jsonl"], {"event": "snapshot", **snapshot})
                    LOGGER.info(
                        "snapshot bar=%s strategic=%s targets=%d",
                        snapshot["latest_bar"],
                        snapshot["regime"]["strategic_regime"],
                        len(snapshot["combined_targets"]),
                    )
                    state["last_snapshot_refresh_bar"] = refresh_bar
                    state["last_completed_bar"] = str(snapshot["latest_bar"])
                    state["strategy_signature"] = strategy_signature
                    # Only announce snapshots whose content hash changed.
                    if snapshot["snapshot_hash"] != state.get("last_snapshot_hash"):
                        await notifier.send("INFO", _telegram_summary(snapshot))
                        state["last_snapshot_hash"] = snapshot["snapshot_hash"]
                        state["last_latest_bar"] = snapshot["latest_bar"]
                    _write_json(paths["state_json"], state)

                if snapshot is not None and executor is not None:
                    execution_result = await asyncio.to_thread(executor.reconcile, snapshot)
                    if execution_result.orders:
                        if config.include_account_snapshot:
                            # Refresh balances so the order notification shows
                            # post-trade capital.
                            live_account = await asyncio.to_thread(_account_snapshot)
                            if live_account is not None:
                                snapshot["account"] = live_account
                                snapshot["capital"] = _capital_summary(live_account)
                                _write_json(paths["latest_json"], snapshot)
                        payload = {"event": "execution", **asdict(execution_result)}
                        _append_jsonl(paths["orders_jsonl"], payload)
                        # State keeps the last 10 orders; the message lists the first 8.
                        state["last_execution_orders"] = execution_result.orders[-10:]
                        state["last_execution_at"] = execution_result.executed_at
                        _write_json(paths["state_json"], state)
                        order_lines = [
                            f"{row['symbol']} {row['side']} qty={row['quantity']:.6f}"
                            for row in execution_result.orders[:8]
                        ]
                        capital = snapshot.get("capital", {}) if isinstance(snapshot.get("capital"), dict) else {}
                        await notifier.send(
                            "WARNING",
                            "strategy32 orders\n"
                            + "\n".join(order_lines)
                            + "\n"
                            + (
                                f"capital usdt={float(capital.get('usdt', 0.0)):.2f} "
                                f"usdc={float(capital.get('usdc', 0.0)):.2f} "
                                f"total={float(capital.get('total_quote', 0.0)):.2f}"
                            ),
                        )

                # Converted to a list before comparing with the persisted value
                # (presumably because the JSON-decoded state holds a list, not
                # a tuple -- confirm against _read_state/_write_json).
                heartbeat_slot = list(_heartbeat_slot(now))
                if snapshot is not None and heartbeat_slot != state.get("last_heartbeat_slot"):
                    if config.include_account_snapshot:
                        live_account = await asyncio.to_thread(_account_snapshot)
                        if live_account is not None:
                            snapshot["account"] = live_account
                            snapshot["capital"] = _capital_summary(live_account)
                            _write_json(paths["latest_json"], snapshot)
                    state["last_heartbeat_slot"] = heartbeat_slot
                    _write_json(paths["state_json"], state)
                    await notifier.send("INFO", _heartbeat_summary(snapshot, state))
            except Exception as exc:
                # Per-cycle boundary: record the failure and keep polling.
                LOGGER.exception("monitor cycle failed")
                error_payload = {
                    "event": "error",
                    "generated_at": str(_now_utc()),
                    "error": str(exc),
                }
                _append_jsonl(paths["snapshots_jsonl"], error_payload)
                await notifier.send("CRITICAL", f"strategy32 monitor failed: {exc}")
            if once:
                break
            await asyncio.sleep(config.poll_seconds)
    finally:
        await notifier.stop()