WebSocket实时行情接入bitget钱包教程
流量次数: 作者:小编 发布时间:2025-03-20 18:17:11
一、WebSocket接入核心步骤
1. 获取WebSocket地址
公共行情(无需API Key)
python
复制
# 基础行情(现货/合约)
WS_URL = "wss://ws.bitget.com/v2/ws/public"
# 专业版(更低延迟)
WS_PRO_URL = "wss://ws.bitget.com/mix/v1/stream"
2. 订阅行情频道
Bitget采用结构化主题订阅模型,常见频道格式:
{频道类型}:{标的物}.{数据粒度}
示例:
python
复制
# 订阅BTC/USDT现货的1分钟K线
subscribe_msg = {
"op": "subscribe",
"args": [{
"instType": "SP", # SP=现货, MC=合约
"channel": "candle1m", # 频道类型
"instId": "BTCUSDT" # 交易对
}]
}
二、Python代码实现
1. 基础连接与订阅
python
复制
import websockets
import json
import asyncio
async def bitget_websocket_client():
async with websockets.connect(WS_PRO_URL) as ws:
# 发送订阅请求
await ws.send(json.dumps(subscribe_msg))
# 持续接收数据
while True:
response = await ws.recv()
data = json.loads(response)
print("Received:", data)
asyncio.get_event_loop().run_until_complete(bitget_websocket_client())
2. 数据解析示例(K线格式)
python
复制
def parse_kline(data):
if data.get("action") != "snapshot":
return # 仅处理实时推送
kline = data["data"][0]
return {
"timestamp": kline[0], # K线开盘时间
"open": float(kline[1]),
"high": float(kline[2]),
"low": float(kline[3]),
"close": float(kline[4]),
"volume": float(kline[5]),
}
三、主流行情频道列表
频道类型 参数值 数据内容 更新频率
实时K线 candle1m 1分钟/5分钟/15分钟K线 按周期推送
订单簿深度 books 5/15档深度数据 100ms
最新成交 trade 实时成交记录 实时触发
24小时行情 ticker 涨跌幅/成交量等汇总数据 1s
标记价格 markPrice 合约标记价格与资金费率 3s
四、连接维护与优化
1. 心跳保持机制
python
复制
# 每30秒发送心跳包
async def keep_alive(ws):
while True:
await asyncio.sleep(30)
await ws.send(json.dumps({"op": "ping"}))
2. 断线重连策略
python
复制
async def resilient_client():
while True:
try:
async with websockets.connect(WS_PRO_URL) as ws:
await ws.send(subscribe_msg)
asyncio.create_task(keep_alive(ws))
# ... 数据接收逻辑
except websockets.ConnectionClosed:
print("Connection lost, retrying in 5s...")
await asyncio.sleep(5)
五、高级功能实现
1. 多频道批量订阅
python
复制
multi_args = [
{"instType": "SP", "channel": "ticker", "instId": "BTCUSDT"},
{"instType": "MC", "channel": "books", "instId": "BTCUSD_DMCBL"}
]
await ws.send(json.dumps({"op": "subscribe", "args": multi_args}))
2. 订单簿深度维护
python
复制
class OrderBook:
def __init__(self):
self.bids = {}
self.asks = {}
def update(self, data):
if data["action"] == "snapshot":
self.bids = {float(price): float(vol) for price, vol in data["bids"]}
self.asks = {float(price): float(vol) for price, vol in data["asks"]}
elif data["action"] == "update":
# 增量更新逻辑
...
六、错误代码处理
错误码 含义 解决方案
30001 无效订阅请求 检查频道名与参数格式
30003 超过最大连接数 减少并行连接或申请扩容
30006 请求频率过高 降低订阅/查询频率
七、安全注意事项
生产环境部署:
使用wss://协议确保加密传输
通过VPN或专用服务器降低网络延迟
资源管理:
限制单个连接订阅频道数(建议≤50)
启用Zstandard压缩减少带宽占用(添加compress: zstd参数)