from __future__ import annotations import numpy as np import pandas as pd def select_dynamic_universe( prices: dict[str, pd.DataFrame], *, timestamp: pd.Timestamp, min_history_bars: int, lookback_bars: int, max_symbols: int, min_avg_dollar_volume: float, base_symbol: str = "BTC", ) -> list[str]: ranked: list[tuple[float, str]] = [] for symbol, df in prices.items(): if symbol == base_symbol: continue hist = df.loc[df["timestamp"] <= timestamp] if len(hist) < min_history_bars: continue recent = hist.tail(lookback_bars) if recent.empty: continue avg_dollar_volume = float((recent["close"] * recent["volume"]).mean()) if avg_dollar_volume < min_avg_dollar_volume: continue ranked.append((avg_dollar_volume, symbol)) ranked.sort(reverse=True) if max_symbols > 0: ranked = ranked[:max_symbols] return [symbol for _, symbol in ranked] def score_momentum_universe( prices: dict[str, pd.DataFrame], funding: dict[str, pd.DataFrame], *, btc_symbol: str, timestamp: pd.Timestamp, candidate_symbols: list[str], min_history_bars: int, liquidity_lookback_bars: int, short_lookback_bars: int, long_lookback_bars: int, overheat_funding_rate: float, ) -> pd.DataFrame: btc_hist = prices[btc_symbol].loc[prices[btc_symbol]["timestamp"] <= timestamp] if len(btc_hist) < max(min_history_bars, long_lookback_bars + 5): return pd.DataFrame( columns=[ "symbol", "avg_dollar_volume", "volume_stability", "short_return", "relative_strength_short", "relative_strength_long", "beta", "positive_bar_ratio", "short_volatility", "low_volatility", "latest_funding_rate", "funding_penalty", "score", ] ) btc_hist = btc_hist.tail(long_lookback_bars + 5) btc_close = btc_hist["close"] btc_returns = btc_close.pct_change().dropna() btc_ret_short = float(btc_close.iloc[-1] / btc_close.iloc[-short_lookback_bars] - 1.0) btc_ret_long = float(btc_close.iloc[-1] / btc_close.iloc[0] - 1.0) rows: list[dict[str, float | str]] = [] for symbol in candidate_symbols: hist = prices[symbol].loc[prices[symbol]["timestamp"] <= timestamp] if len(hist) < max(min_history_bars, long_lookback_bars + 5): continue hist = hist.tail(long_lookback_bars + 5) recent = hist.tail(liquidity_lookback_bars) dollar_volume = recent["close"] * recent["volume"] returns = hist["close"].pct_change().dropna() if len(returns) < 12: continue overlap = min(len(returns), len(btc_returns), long_lookback_bars) alt_beta = returns.tail(overlap).to_numpy() btc_beta = btc_returns.tail(overlap).to_numpy() btc_var = float(np.var(btc_beta)) beta = float(np.cov(alt_beta, btc_beta)[0, 1] / btc_var) if overlap >= 10 and btc_var > 1e-12 else 0.0 short_returns = returns.tail(short_lookback_bars) short_volatility = float(short_returns.std(ddof=0)) if len(short_returns) >= 6 else float("inf") positive_bar_ratio = float((short_returns > 0).mean()) if len(short_returns) else 0.0 short_return = float(hist["close"].iloc[-1] / hist["close"].iloc[-short_lookback_bars] - 1.0) long_return = float(hist["close"].iloc[-1] / hist["close"].iloc[0] - 1.0) latest_funding = 0.0 if symbol in funding: f_hist = funding[symbol].loc[funding[symbol]["timestamp"] <= timestamp] if not f_hist.empty: latest_funding = float(f_hist["funding_rate"].iloc[-1]) rows.append( { "symbol": symbol, "avg_dollar_volume": float(dollar_volume.mean()), "volume_stability": float(1.0 / ((dollar_volume.std(ddof=0) / (dollar_volume.mean() + 1e-9)) + 1e-9)), "short_return": short_return, "relative_strength_short": short_return - btc_ret_short, "relative_strength_long": long_return - btc_ret_long, "beta": beta, "positive_bar_ratio": positive_bar_ratio, "short_volatility": short_volatility, "low_volatility": float(1.0 / (returns.tail(short_lookback_bars).std(ddof=0) + 1e-9)), "latest_funding_rate": latest_funding, } ) if not rows: return pd.DataFrame(columns=["symbol", "score"]) frame = pd.DataFrame(rows) for column in ( "avg_dollar_volume", "volume_stability", "short_return", "relative_strength_short", "relative_strength_long", "positive_bar_ratio", "low_volatility", ): frame[f"{column}_rank"] = frame[column].rank(pct=True) frame["funding_penalty"] = ( (frame["latest_funding_rate"] - overheat_funding_rate).clip(lower=0.0) / max(overheat_funding_rate, 1e-9) ).clip(upper=1.5) frame["score"] = ( 0.15 * frame["avg_dollar_volume_rank"] + 0.10 * frame["volume_stability_rank"] + 0.15 * frame["short_return_rank"] + 0.25 * frame["relative_strength_short_rank"] + 0.20 * frame["relative_strength_long_rank"] + 0.10 * frame["low_volatility_rank"] + 0.05 * frame["positive_bar_ratio_rank"] - 0.10 * frame["funding_penalty"] ) return frame.sort_values("score", ascending=False).reset_index(drop=True) def filter_momentum_frame( frame: pd.DataFrame, *, min_score: float, min_relative_strength: float, min_7d_return: float, max_7d_return: float | None = None, min_positive_bar_ratio: float | None = None, max_short_volatility: float | None = None, max_latest_funding_rate: float | None = None, max_beta: float | None = None, ) -> pd.DataFrame: if frame.empty: return frame mask = ( (frame["score"] >= min_score) & (frame["relative_strength_short"] >= min_relative_strength) & (frame["relative_strength_long"] >= min_relative_strength) & (frame["short_return"] >= min_7d_return) ) if max_7d_return is not None: mask &= frame["short_return"] <= max_7d_return if min_positive_bar_ratio is not None: mask &= frame["positive_bar_ratio"] >= min_positive_bar_ratio if max_short_volatility is not None: mask &= frame["short_volatility"] <= max_short_volatility if max_latest_funding_rate is not None: mask &= frame["latest_funding_rate"] <= max_latest_funding_rate if max_beta is not None: mask &= frame["beta"] <= max_beta return frame.loc[mask].sort_values("score", ascending=False).reset_index(drop=True) def rank_momentum_universe( prices: dict[str, pd.DataFrame], funding: dict[str, pd.DataFrame], *, btc_symbol: str, timestamp: pd.Timestamp, candidate_symbols: list[str], min_history_bars: int, liquidity_lookback_bars: int, short_lookback_bars: int, long_lookback_bars: int, overheat_funding_rate: float, max_symbols: int, max_7d_return: float | None = None, min_positive_bar_ratio: float | None = None, max_short_volatility: float | None = None, max_latest_funding_rate: float | None = None, max_beta: float | None = None, ) -> list[str]: frame = score_momentum_universe( prices, funding, btc_symbol=btc_symbol, timestamp=timestamp, candidate_symbols=candidate_symbols, min_history_bars=min_history_bars, liquidity_lookback_bars=liquidity_lookback_bars, short_lookback_bars=short_lookback_bars, long_lookback_bars=long_lookback_bars, overheat_funding_rate=overheat_funding_rate, ) frame = filter_momentum_frame( frame, min_score=float("-inf"), min_relative_strength=float("-inf"), min_7d_return=float("-inf"), max_7d_return=max_7d_return, min_positive_bar_ratio=min_positive_bar_ratio, max_short_volatility=max_short_volatility, max_latest_funding_rate=max_latest_funding_rate, max_beta=max_beta, ) if frame.empty: return candidate_symbols[:max_symbols] if max_symbols > 0 else candidate_symbols if max_symbols > 0: frame = frame.head(max_symbols) return [str(symbol) for symbol in frame["symbol"].tolist()] def score_carry_universe( prices: dict[str, pd.DataFrame], funding: dict[str, pd.DataFrame], *, timestamp: pd.Timestamp, candidate_symbols: list[str], lookback_bars: int, expected_horizon_bars: int, roundtrip_cost_pct: float, basis_risk_multiplier: float, ) -> pd.DataFrame: rows: list[dict[str, float | str]] = [] for symbol in candidate_symbols: if symbol not in funding: continue f_hist = funding[symbol].loc[funding[symbol]["timestamp"] <= timestamp].tail(lookback_bars) p_hist = prices[symbol].loc[prices[symbol]["timestamp"] <= timestamp].tail(30) if len(f_hist) < lookback_bars or p_hist.empty: continue positive_ratio = float((f_hist["funding_rate"] > 0).mean()) mean_funding = float(f_hist["funding_rate"].mean()) basis_volatility = float(f_hist["basis"].std(ddof=0)) latest_basis = float(f_hist["basis"].iloc[-1]) expected_edge = ( mean_funding * expected_horizon_bars + max(latest_basis, 0.0) * 0.35 - roundtrip_cost_pct - basis_volatility * basis_risk_multiplier ) avg_dollar_volume = float((p_hist["close"] * p_hist["volume"]).mean()) rows.append( { "symbol": symbol, "expected_edge": expected_edge, "positive_ratio": positive_ratio, "mean_funding": mean_funding, "low_basis_volatility": float(1.0 / (basis_volatility + 1e-9)), "avg_dollar_volume": avg_dollar_volume, } ) if not rows: return pd.DataFrame(columns=["symbol", "score", "expected_edge"]) frame = pd.DataFrame(rows) for column in ( "expected_edge", "positive_ratio", "mean_funding", "low_basis_volatility", "avg_dollar_volume", ): frame[f"{column}_rank"] = frame[column].rank(pct=True) frame["score"] = ( 0.40 * frame["expected_edge_rank"] + 0.20 * frame["positive_ratio_rank"] + 0.20 * frame["mean_funding_rank"] + 0.10 * frame["low_basis_volatility_rank"] + 0.10 * frame["avg_dollar_volume_rank"] ) return frame.sort_values("score", ascending=False).reset_index(drop=True) def rank_carry_universe( prices: dict[str, pd.DataFrame], funding: dict[str, pd.DataFrame], *, timestamp: pd.Timestamp, candidate_symbols: list[str], lookback_bars: int, expected_horizon_bars: int, roundtrip_cost_pct: float, basis_risk_multiplier: float, max_symbols: int, ) -> list[str]: frame = score_carry_universe( prices, funding, timestamp=timestamp, candidate_symbols=candidate_symbols, lookback_bars=lookback_bars, expected_horizon_bars=expected_horizon_bars, roundtrip_cost_pct=roundtrip_cost_pct, basis_risk_multiplier=basis_risk_multiplier, ) if frame.empty: return [] if max_symbols > 0: frame = frame.head(max_symbols) return [str(symbol) for symbol in frame["symbol"].tolist()] def limit_correlated_symbols( prices: dict[str, pd.DataFrame], *, timestamp: pd.Timestamp, candidate_symbols: list[str], lookback_bars: int, max_pairwise_correlation: float, max_symbols: int, ) -> list[str]: selected: list[str] = [] returns_cache: dict[str, np.ndarray] = {} for symbol in candidate_symbols: hist = prices[symbol].loc[prices[symbol]["timestamp"] <= timestamp].tail(lookback_bars + 1) rets = hist["close"].pct_change().dropna().to_numpy() if len(rets) < max(10, lookback_bars // 3): continue returns_cache[symbol] = rets too_correlated = False for chosen in selected: chosen_rets = returns_cache[chosen] overlap = min(len(rets), len(chosen_rets)) corr = float(np.corrcoef(rets[-overlap:], chosen_rets[-overlap:])[0, 1]) if overlap >= 10 else 0.0 if np.isfinite(corr) and corr >= max_pairwise_correlation: too_correlated = True break if too_correlated: continue selected.append(symbol) if max_symbols > 0 and len(selected) >= max_symbols: break return selected def select_strategic_universe( prices: dict[str, pd.DataFrame], funding: dict[str, pd.DataFrame], *, btc_symbol: str, timestamp: pd.Timestamp, min_history_bars: int, lookback_bars: int, min_avg_dollar_volume: float, short_lookback_bars: int, long_lookback_bars: int, overheat_funding_rate: float, carry_lookback_bars: int, carry_expected_horizon_bars: int, carry_roundtrip_cost_pct: float, carry_basis_risk_multiplier: float, momentum_min_score: float, momentum_min_relative_strength: float, momentum_min_7d_return: float, momentum_max_7d_return: float | None, momentum_min_positive_bar_ratio: float | None, momentum_max_short_volatility: float | None, momentum_max_latest_funding_rate: float | None, momentum_max_beta: float | None, carry_min_expected_edge: float, max_symbols: int = 0, ) -> list[str]: liquid_symbols = select_dynamic_universe( prices, timestamp=timestamp, min_history_bars=min_history_bars, lookback_bars=lookback_bars, max_symbols=0, min_avg_dollar_volume=min_avg_dollar_volume, base_symbol=btc_symbol, ) momentum_frame = score_momentum_universe( prices, funding, btc_symbol=btc_symbol, timestamp=timestamp, candidate_symbols=liquid_symbols, min_history_bars=min_history_bars, liquidity_lookback_bars=lookback_bars, short_lookback_bars=short_lookback_bars, long_lookback_bars=long_lookback_bars, overheat_funding_rate=overheat_funding_rate, ) momentum_frame = filter_momentum_frame( momentum_frame, min_score=momentum_min_score, min_relative_strength=momentum_min_relative_strength, min_7d_return=momentum_min_7d_return, max_7d_return=momentum_max_7d_return, min_positive_bar_ratio=momentum_min_positive_bar_ratio, max_short_volatility=momentum_max_short_volatility, max_latest_funding_rate=momentum_max_latest_funding_rate, max_beta=momentum_max_beta, ) carry_frame = score_carry_universe( prices, funding, timestamp=timestamp, candidate_symbols=liquid_symbols, lookback_bars=carry_lookback_bars, expected_horizon_bars=carry_expected_horizon_bars, roundtrip_cost_pct=carry_roundtrip_cost_pct, basis_risk_multiplier=carry_basis_risk_multiplier, ) ranked_symbols: dict[str, float] = {} if not momentum_frame.empty: for row in momentum_frame.itertuples(index=False): ranked_symbols[str(row.symbol)] = max(ranked_symbols.get(str(row.symbol), float("-inf")), float(row.score)) if not carry_frame.empty: for row in carry_frame.itertuples(index=False): if float(row.expected_edge) <= carry_min_expected_edge: continue ranked_symbols[str(row.symbol)] = max(ranked_symbols.get(str(row.symbol), float("-inf")), float(row.score)) if not ranked_symbols: return [] ranked = sorted(ranked_symbols.items(), key=lambda item: item[1], reverse=True) if max_symbols > 0: ranked = ranked[:max_symbols] return [symbol for symbol, _ in ranked]