from __future__ import annotations import pandas as pd from strategy29.common.models import MarketDataBundle from strategy29.data.basis import compute_basis_frame from strategy29.data.binance_history import BinancePairSpec, discover_usd_quote_pair_specs, fetch_funding_paginated, fetch_klines_paginated from strategy29.data.funding import align_funding_to_price_bars def resolve_strategy32_pair_specs( *, symbols: list[str], auto_discover_symbols: bool, quote_assets: tuple[str, ...], excluded_base_assets: tuple[str, ...], min_quote_volume_24h: float, ) -> list[BinancePairSpec]: discovered = discover_usd_quote_pair_specs( quote_assets=quote_assets, excluded_base_assets=excluded_base_assets, min_quote_volume_24h=0.0, ) by_base = {spec.base_symbol: spec for spec in discovered} preferred_quote = quote_assets[0] if auto_discover_symbols: # Point-in-time liquidity is enforced inside the backtest using historical bars. # Filtering discovery by current 24h quote volume leaks future information into past windows. return discovered if symbols: specs: list[BinancePairSpec] = [] for symbol in symbols: base_symbol = symbol.upper() if base_symbol in by_base: spec = by_base[base_symbol] if min(spec.spot_quote_volume_24h, spec.perp_quote_volume_24h) >= min_quote_volume_24h: specs.append(spec) continue specs.append( BinancePairSpec( base_symbol=base_symbol, quote_asset=preferred_quote, spot_symbol=f"{base_symbol}{preferred_quote}", perp_symbol=f"{base_symbol}{preferred_quote}", spot_quote_volume_24h=0.0, perp_quote_volume_24h=0.0, ) ) return specs return [] def build_strategy32_market_bundle( *, symbols: list[str], auto_discover_symbols: bool, quote_assets: tuple[str, ...], excluded_base_assets: tuple[str, ...], min_quote_volume_24h: float, start: pd.Timestamp, end: pd.Timestamp, timeframe: str = "4h", max_staleness_days: int = 3, ) -> tuple[MarketDataBundle, pd.Timestamp, list[str], list[str], dict[str, str]]: specs = resolve_strategy32_pair_specs( symbols=symbols, auto_discover_symbols=auto_discover_symbols, quote_assets=quote_assets, excluded_base_assets=excluded_base_assets, min_quote_volume_24h=min_quote_volume_24h, ) return build_strategy32_market_bundle_from_specs( specs=specs, start=start, end=end, timeframe=timeframe, max_staleness_days=max_staleness_days, ) def build_strategy32_price_frames_from_specs( *, specs: list[BinancePairSpec], start: pd.Timestamp, end: pd.Timestamp, timeframe: str = "4h", max_staleness_days: int = 3, ) -> tuple[dict[str, pd.DataFrame], pd.Timestamp, list[str], list[str], dict[str, str]]: prices: dict[str, pd.DataFrame] = {} accepted: list[str] = [] rejected: list[str] = [] quote_by_symbol: dict[str, str] = {} latest_completed_bar: pd.Timestamp | None = None staleness_cutoff = end - pd.Timedelta(days=max_staleness_days) for spec in specs: symbol = spec.base_symbol try: perp = fetch_klines_paginated( symbol, timeframe=timeframe, start=start, end=end, market="perp", market_symbol=spec.perp_symbol, quote_asset=spec.quote_asset, ) except Exception: rejected.append(symbol) continue perp = perp.loc[perp["close_time"] <= end].reset_index(drop=True) if perp.empty: rejected.append(symbol) continue symbol_end = pd.Timestamp(perp["timestamp"].iloc[-1]) if symbol_end < staleness_cutoff: rejected.append(symbol) continue keep_columns = [ column for column in ( "timestamp", "open", "high", "low", "close", "volume", "quote_volume", "trade_count", "taker_base", "taker_quote", ) if column in perp.columns ] prices[symbol] = perp[keep_columns].copy() latest_completed_bar = symbol_end if latest_completed_bar is None else min(latest_completed_bar, symbol_end) accepted.append(symbol) quote_by_symbol[symbol] = spec.quote_asset if latest_completed_bar is None: raise ValueError("no completed bars fetched for strategy32 price frames") return prices, latest_completed_bar, accepted, rejected, quote_by_symbol def build_strategy32_market_bundle_from_specs( *, specs: list[BinancePairSpec], start: pd.Timestamp, end: pd.Timestamp, timeframe: str = "4h", max_staleness_days: int = 3, ) -> tuple[MarketDataBundle, pd.Timestamp, list[str], list[str], dict[str, str]]: prices, latest_completed_bar, accepted, rejected, quote_by_symbol = build_strategy32_price_frames_from_specs( specs=specs, start=start, end=end, timeframe=timeframe, max_staleness_days=max_staleness_days, ) funding: dict[str, pd.DataFrame] = {} by_base_symbol = {spec.base_symbol: spec for spec in specs} for symbol in accepted: if symbol == "BTC": continue spec = by_base_symbol[symbol] try: spot = fetch_klines_paginated( symbol, timeframe=timeframe, start=start, end=end, market="spot", market_symbol=spec.spot_symbol, quote_asset=spec.quote_asset, ) except Exception: continue spot = spot.loc[spot["close_time"] <= end].reset_index(drop=True) if spot.empty: continue basis = compute_basis_frame(spot[["timestamp", "close"]], prices[symbol][["timestamp", "close"]]) try: funding_rates = fetch_funding_paginated( symbol, start=start, end=end, market_symbol=spec.perp_symbol, quote_asset=spec.quote_asset, ) except Exception: funding_rates = pd.DataFrame(columns=["timestamp", "funding_rate"]) if funding_rates.empty: funding[symbol] = basis.assign(funding_rate=0.0)[["timestamp", "funding_rate", "basis"]] continue funding[symbol] = align_funding_to_price_bars( funding_rates.merge(basis, on="timestamp", how="outer").sort_values("timestamp").ffill(), prices[symbol]["timestamp"], )[["timestamp", "funding_rate", "basis"]] return MarketDataBundle(prices=prices, funding=funding), latest_completed_bar, accepted, rejected, quote_by_symbol