276 lines
11 KiB
Python
276 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import math
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from strategy32.live.binance_account import BinanceUsdMAccountClient
|
|
|
|
|
|
# Quote assets this module recognises.  Balances held in these assets are
# summed into account equity, and a contract symbol ending in one of them is
# split into (base, quote); the suffixes are tried in the order listed here.
KNOWN_QUOTES = ("USDT", "USDC")
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class SymbolRule:
|
|
contract_symbol: str
|
|
base_asset: str
|
|
quote_asset: str
|
|
step_size: float
|
|
min_qty: float
|
|
min_notional: float
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class LiveExecutionConfig:
|
|
enabled: bool = False
|
|
leverage: int = 1
|
|
min_target_notional_usd: float = 25.0
|
|
min_rebalance_notional_usd: float = 10.0
|
|
close_orphan_positions: bool = True
|
|
entry_only_refinement: bool = True
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ExecutionResult:
|
|
executed_at: str
|
|
enabled: bool
|
|
account_equity_usd: float
|
|
target_symbols: list[str] = field(default_factory=list)
|
|
orders: list[dict[str, Any]] = field(default_factory=list)
|
|
warnings: list[str] = field(default_factory=list)
|
|
|
|
|
|
class LiveFuturesExecutor:
    """Move live futures positions toward the target weights of a snapshot.

    Balances, open positions, prices and exchange filters are read through
    ``client``; the difference between the current and the target quantity of
    each symbol is sent as a market order.  No order is submitted unless
    ``config.enabled`` is true.
    """

    def __init__(self, client: BinanceUsdMAccountClient, config: LiveExecutionConfig) -> None:
        """Store the exchange client and the execution settings."""
        self.client = client
        self.config = config
        # Exchange rules are fetched on first use and then reused.
        self._rules: dict[str, SymbolRule] | None = None
        # Contracts whose leverage this instance has already set.
        self._applied_leverage: set[str] = set()

    def reconcile(self, snapshot: dict[str, Any]) -> ExecutionResult:
        """Reconcile open positions with the targets described by ``snapshot``.

        Targets come from ``snapshot["execution_targets"]`` when that is a
        list; otherwise from the tradeable ``perp:`` rows of
        ``snapshot["combined_targets"]``.  Each row's ``instrument`` must have
        the form ``"<prefix>:<base symbol>"``.

        Returns:
            An ``ExecutionResult`` with the target symbols, every order that
            was submitted, and warnings for symbols skipped because their
            exchange rule or price was unavailable.

        Raises:
            KeyError / IndexError: if a target row has no ``instrument`` or
                the instrument contains no ``":"``.
        """
        result = ExecutionResult(
            executed_at=str(snapshot.get("generated_at", "")),
            enabled=self.config.enabled,
            account_equity_usd=self._account_equity_usd(),
        )
        quote_by_symbol = dict(snapshot.get("universe", {}).get("quote_by_symbol", {}))

        execution_targets = snapshot.get("execution_targets")
        if isinstance(execution_targets, list):
            target_rows = list(execution_targets)
        else:
            target_rows = [
                row
                for row in snapshot.get("combined_targets", [])
                if bool(row.get("tradeable")) and str(row.get("instrument", "")).startswith("perp:")
            ]

        # Split each instrument once; the symbols are reused for both weight maps.
        result.target_symbols = [self._instrument_symbol(row) for row in target_rows]
        if not self.config.enabled:
            return result

        target_weights = {
            symbol: float(row.get("weight", 0.0) or 0.0)
            for symbol, row in zip(result.target_symbols, target_rows)
        }
        desired_weights = {
            symbol: float(row.get("desired_weight", row.get("weight", 0.0)) or 0.0)
            for symbol, row in zip(result.target_symbols, target_rows)
        }
        current_positions = self._current_positions()
        # Orphans (open positions without a target) are only visited when configured.
        all_symbols = sorted(
            set(target_weights)
            | set(current_positions if self.config.close_orphan_positions else target_weights)
        )
        prices = self._prices_for_symbols(all_symbols, quote_by_symbol, current_positions)
        equity = result.account_equity_usd

        for base_symbol in all_symbols:
            position = current_positions.get(base_symbol, {})
            # The snapshot's quote asset wins; otherwise use the open position's.
            quote_asset = quote_by_symbol.get(base_symbol, position.get("quote_asset", "USDT"))
            rule = self._symbol_rule(base_symbol, quote_asset)
            if rule is None:
                result.warnings.append(f"missing_symbol_rule:{base_symbol}")
                continue

            target_weight = float(target_weights.get(base_symbol, 0.0) or 0.0)
            price = float(prices.get(base_symbol, 0.0) or 0.0)
            if price <= 0:
                result.warnings.append(f"missing_price:{base_symbol}")
                continue

            # Targets too small to be worth holding (or to trade at all) become flat.
            target_notional = target_weight * equity
            if abs(target_notional) < max(self.config.min_target_notional_usd, rule.min_notional):
                target_notional = 0.0

            current_qty = float(position.get("qty", 0.0) or 0.0)
            current_contract = position.get("contract_symbol", rule.contract_symbol)

            if self.config.entry_only_refinement:
                desired_weight = float(desired_weights.get(base_symbol, target_weight) or 0.0)
                desired_notional = desired_weight * equity
                current_notional = current_qty * price
                same_side = (
                    current_qty != 0.0
                    and desired_notional != 0.0
                    and math.copysign(1.0, current_notional) == math.copysign(1.0, desired_notional)
                )
                # An existing same-side position is not trimmed just because the
                # target is smaller than the desired size: hold the smaller of
                # the current and the desired notional instead.
                if same_side and abs(target_notional) < abs(desired_notional):
                    target_notional = math.copysign(
                        min(abs(current_notional), abs(desired_notional)),
                        desired_notional,
                    )

            target_qty = self._normalize_qty(rule, target_notional / price)

            # Side flip: flatten the existing position with a reduce-only order
            # first, then open the new side from zero.
            flips_side = (
                current_qty != 0.0
                and target_qty != 0.0
                and math.copysign(1.0, current_qty) != math.copysign(1.0, target_qty)
            )
            if flips_side:
                close_order = self._submit_delta(
                    contract_symbol=current_contract,
                    delta_qty=-current_qty,
                    price=price,
                    reduce_only=True,
                )
                if close_order is not None:
                    result.orders.append(close_order)
                current_qty = 0.0

            delta_qty = target_qty - current_qty
            delta_notional = abs(delta_qty) * price
            if delta_notional < max(self.config.min_rebalance_notional_usd, rule.min_notional):
                continue

            # NOTE(review): this order goes to rule.contract_symbol, which can
            # differ from the open position's contract when the snapshot maps
            # the base symbol to another quote asset - confirm that is intended.
            order = self._submit_delta(
                contract_symbol=rule.contract_symbol,
                delta_qty=delta_qty,
                price=price,
                reduce_only=False,
            )
            if order is not None:
                result.orders.append(order)

        return result

    @staticmethod
    def _instrument_symbol(row: dict[str, Any]) -> str:
        """Return the part of ``row["instrument"]`` after the first ``":"``."""
        return str(row["instrument"]).split(":", 1)[1]

    def _submit_delta(
        self,
        *,
        contract_symbol: str,
        delta_qty: float,
        price: float,
        reduce_only: bool,
    ) -> dict[str, Any] | None:
        """Send a market order for a signed quantity change.

        Args:
            contract_symbol: Contract to trade.
            delta_qty: Signed change; positive buys, negative sells.
            price: Reference price recorded with the order (not sent to the exchange).
            reduce_only: Forwarded to the client as the reduce-only flag.

        Returns:
            A record of the order including the raw client response, or
            ``None`` when ``delta_qty`` is zero.
        """
        qty = abs(float(delta_qty))
        if qty <= 0:
            return None
        side = "BUY" if delta_qty > 0 else "SELL"
        self._ensure_leverage(contract_symbol)
        # NOTE(review): the client order id is identical for every order on the
        # same contract and side - confirm the exchange accepts the reuse.
        response = self.client.place_market_order(
            symbol=contract_symbol,
            side=side,
            quantity=qty,
            reduce_only=reduce_only,
            client_order_id=f"s32-{contract_symbol.lower()}-{side.lower()}",
        )
        return {
            "symbol": contract_symbol,
            "side": side,
            "quantity": qty,
            "price_ref": price,
            "reduce_only": reduce_only,
            "response": response,
        }

    def _ensure_leverage(self, contract_symbol: str) -> None:
        """Set the configured leverage on a contract once per executor instance."""
        if contract_symbol in self._applied_leverage:
            return
        self.client.set_leverage(contract_symbol, self.config.leverage)
        self._applied_leverage.add(contract_symbol)

    def _account_equity_usd(self) -> float:
        """Sum the ``balance`` field of every balance row held in a known quote asset."""
        total = 0.0
        for row in self.client.get_balance():
            if str(row.get("asset", "")).upper() in KNOWN_QUOTES:
                total += float(row.get("balance", 0.0) or 0.0)
        return total

    def _current_positions(self) -> dict[str, dict[str, Any]]:
        """Return the open positions keyed by base asset.

        Rows with a position amount of (effectively) zero are dropped.  Each
        entry holds ``qty``, ``quote_asset``, ``contract_symbol`` and
        ``mark_price``.  When several contracts share a base asset the last
        row wins.
        """
        positions: dict[str, dict[str, Any]] = {}
        for row in self.client.get_position_risk():
            qty = float(row.get("positionAmt", 0.0) or 0.0)
            if abs(qty) <= 1e-12:
                continue
            contract_symbol = str(row.get("symbol", "")).upper()
            base_asset, quote_asset = self._split_contract_symbol(contract_symbol)
            positions[base_asset] = {
                "qty": qty,
                "quote_asset": quote_asset,
                "contract_symbol": contract_symbol,
                "mark_price": float(row.get("markPrice", 0.0) or 0.0),
            }
        return positions

    def _prices_for_symbols(
        self,
        symbols: list[str],
        quote_by_symbol: dict[str, str],
        current_positions: dict[str, dict[str, Any]],
    ) -> dict[str, float]:
        """Return a reference price for each base symbol.

        The mark price of an open position is used when it is positive;
        otherwise the ticker price of ``<base><quote>`` is fetched.  The quote
        asset is resolved the same way as in ``reconcile``: the snapshot
        mapping first, then the open position's quote asset, then ``"USDT"``.
        Symbols whose ticker response is not a dict are left out of the result.
        """
        prices: dict[str, float] = {}
        for base_symbol in symbols:
            position = current_positions.get(base_symbol)
            if position is not None:
                mark_price = float(position.get("mark_price", 0.0) or 0.0)
                if mark_price > 0:
                    prices[base_symbol] = mark_price
                    continue
            # A position without a usable mark price must still be priced on
            # its own contract, not on the default USDT one.
            fallback_quote = "USDT" if position is None else position.get("quote_asset", "USDT")
            quote_asset = quote_by_symbol.get(base_symbol, fallback_quote)
            ticker = self.client.get_ticker_price(f"{base_symbol}{quote_asset}")
            if isinstance(ticker, dict):
                prices[base_symbol] = float(ticker.get("price", 0.0) or 0.0)
        return prices

    def _symbol_rule(self, base_symbol: str, quote_asset: str) -> SymbolRule | None:
        """Look up the rule for ``<base_symbol><quote_asset>``; ``None`` if unknown."""
        return self._load_rules().get(f"{base_symbol}{quote_asset}")

    def _load_rules(self) -> dict[str, SymbolRule]:
        """Build (once) and return the per-contract rules from exchange info.

        ``LOT_SIZE`` and ``MARKET_LOT_SIZE`` filters contribute the largest
        step size and minimum quantity they declare; ``MIN_NOTIONAL``
        contributes the minimum notional, never below 5.0.  A missing step
        size defaults to 0.001, and a missing minimum quantity to the step size.
        """
        if self._rules is not None:
            return self._rules

        rules: dict[str, SymbolRule] = {}
        info = self.client.get_exchange_info()
        for row in info.get("symbols", []):
            contract_symbol = str(row.get("symbol", "")).upper()
            if not contract_symbol:
                continue
            step_size = 0.0
            min_qty = 0.0
            min_notional = 5.0
            for flt in row.get("filters", []):
                flt_type = str(flt.get("filterType", ""))
                if flt_type in {"LOT_SIZE", "MARKET_LOT_SIZE"}:
                    step_size = max(step_size, float(flt.get("stepSize", 0.0) or 0.0))
                    min_qty = max(min_qty, float(flt.get("minQty", 0.0) or 0.0))
                elif flt_type == "MIN_NOTIONAL":
                    min_notional = max(min_notional, float(flt.get("notional", 0.0) or 0.0))
            rules[contract_symbol] = SymbolRule(
                contract_symbol=contract_symbol,
                base_asset=str(row.get("baseAsset", "")).upper(),
                quote_asset=str(row.get("quoteAsset", "")).upper(),
                step_size=step_size or 0.001,
                min_qty=min_qty or step_size or 0.001,
                min_notional=min_notional,
            )
        self._rules = rules
        return rules

    @staticmethod
    def _normalize_qty(rule: SymbolRule, raw_qty: float) -> float:
        """Round a signed quantity toward zero to a whole number of lot steps.

        The division is done in decimal arithmetic on the shortest string
        form of the inputs, so a quantity that is an exact multiple of the
        step (e.g. 0.3 with step 0.1) is kept rather than losing a step to
        binary floating-point error, and the result carries no digits beyond
        the step.

        Returns:
            The rounded quantity with the sign of ``raw_qty``, or ``0.0`` when
            it falls below ``max(rule.min_qty, step)``.

        Raises:
            ValueError: if ``raw_qty`` is NaN or infinite.
        """
        from decimal import Decimal  # local import: only needed here

        if abs(raw_qty) <= 0:
            return 0.0
        if not math.isfinite(raw_qty):
            raise ValueError(f"non-finite quantity: {raw_qty!r}")
        step = max(rule.step_size, 1e-12)
        step_dec = Decimal(str(step))
        whole_steps = Decimal(str(abs(raw_qty))) // step_dec
        qty = float(whole_steps * step_dec)
        if qty < max(rule.min_qty, step):
            return 0.0
        return qty if raw_qty > 0 else -qty

    @staticmethod
    def _split_contract_symbol(contract_symbol: str) -> tuple[str, str]:
        """Split a contract symbol into ``(base, quote)`` by known quote suffix.

        When no known quote asset matches, the whole symbol is returned as
        the base with ``"USDT"`` as the quote.
        """
        for quote in KNOWN_QUOTES:
            if contract_symbol.endswith(quote):
                return contract_symbol[: -len(quote)], quote
        return contract_symbol, "USDT"
|