跳到主要内容

远程逗猫 —— 弹幕交互

视频演示

项目地址

Python aiowebsocket 实时获取弹幕数据

AioWebSocket 是一个支持异步操作的库,使用前可以对 python 的异步库进行了解

与 B站 直播服务器进行 websocket 连接

import asyncio

from aiowebsocket.converses import AioWebSocket

remote = 'ws://broadcastlv.chat.bilibili.com:2244/sub'
room_id = ''

data_raw = '000000{Header}0010000100000007000000017b22726f6f6d6964223a{ID}7d'
data_raw = data_raw.format(Header=hex(27 + len(room_id))[2:],
ID=''.join(map(lambda x: hex(ord(x))[2:], list(room_id))))
async def startUp():
async with AioWebSocket(remote) as aws:
converse = aws.manipulator
await converse.send(bytes.fromhex(data_raw))
mes = await converse.receive()
print('Client receive: {}'.format(mes))

if __name__ == '__main__':
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(startUp())

except KeyboardInterrupt as exc:
print('Quit.')

  • room_id 中填入你想要获取弹幕的直播间ID

终端输出如下,成功接收服务器消息

Client receive: b'\x00\x00\x00\x1a\x00\x10\x00\x01\x00\x00\x00\x08\x00\x00\x00\x01{"code":0}'

定时发送心跳包,保持连接

定义发送心跳包的协程函数

# 心跳包内容
heart_beat_msg = '00 00 00 10 00 10 00 01 00 00 00 02 00 00 00 01'
# 发送间隔时间
send_heart_beat_interval = 30
async def sendHeartBeat(_webscoket):
while True:
await asyncio.sleep(send_heart_beat_interval)
await _webscoket.send(bytes.fromhex(heart_beat_msg))
print('[Notice] Sent HeartBeat.')

在事件循环中添加心跳包发送任务

async def startUp():
async with AioWebSocket(remote) as aws:
converse = aws.manipulator
await converse.send(bytes.fromhex(data_raw))
mes = await converse.receive()
print('Client receive: {}'.format(mes))
task = asyncio.create_task(sendHeartBeat(converse))
await asyncio.wait({task})

执行后,每间隔 30s 都会向服务器发送心跳包,维持连接

接收并解析数据

async def receivePackage(_webscoket):
while True:
recv_text = await _webscoket.receive()

if recv_text == None:
recv_text = b'\x00\x00\x00\x1a\x00\x10\x00\x01\x00\x00\x00\x08\x00\x00\x00\x01{"code":0}'

parseData(recv_text)
  • recv_text 返回的数据包可能为空,这时候就需要手动设置为连接成功时返回的数据包
import json
import zlib

def parseData(_data):
# 获取数据包的长度,版本和操作类型
packet_len = int(_data[:4].hex(), 16)
ver = int(_data[6:8].hex(), 16)
op = int(_data[8:12].hex(), 16)

if (len(_data) > packet_len):
parseData(_data[packet_len:])
_data = _data[:packet_len]
if (ver == 2):
_data = zlib.decompress(_data[16:])
parseData(_data)
return
if (ver == 1):
return

if (op == 5):
global status_counter
try:
jd = json.loads(_data[16:].decode('utf-8', errors='ignore'))
if (jd['cmd'] == 'DANMU_MSG'):
msg = jd['info'][1]
user = jd['info'][2][1]
print("[DANMU_MSG]: User: {} Msg: {}".format(user, msg))
except Exception as e:
pass
  • 需要数据的哪个字段,就在数据包中获取哪个字段,这里只需要弹幕数据,即 DANMU_MSG 字段

对应字段信息可以参考

在事件循环中添加数据接收任务

async def startUp():
async with AioWebSocket(remote) as aws:
converse = aws.manipulator
await converse.send(bytes.fromhex(data_raw))
mes = await converse.receive()
print('Client receive: {}'.format(mes))
task_heart_beat = asyncio.create_task(sendHeartBeat(converse))
task_receive_package = asyncio.create_task(receivePackage(converse))
await asyncio.wait({task_heart_beat, task_receive_package})

实时接收弹幕消息

定时向串口发送控制信息

被控制的各玩具终端需要一定的执行时间,不能每收到一次弹幕消息就发送一次控制信息,采用计数形式,每收到一种种类的弹幕信息后进行计数,一定后对计数最多的玩具类型执行控制

创建计数器,修改解析函数,对计数器执行累加

KEYS = {'小球':[0xA5, 0xA5, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00],
'激光笔':[0xA5, 0xA5, 0x05, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00],
'逗猫棒':[0xA5, 0xA5, 0x09, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00]}

status_counter = {'小球':0,
'激光笔':0,
'逗猫棒':0}

def parseData(_data):
# 获取数据包的长度,版本和操作类型
...

if (op == 5):
global status_counter
try:
jd = json.loads(_data[16:].decode('utf-8', errors='ignore'))
if (jd['cmd'] == 'DANMU_MSG'):
msg = jd['info'][1]
user = jd['info'][2][1]
print("[DANMU_MSG]: User: {} Msg: {}".format(user, msg))
if msg in KEYS.keys():
status_counter[msg] += 1

except Exception as e:
pass
  • KEYS = {} 为自定义的控制信息

串口定时任务

def statusWrite():
global status_counter
max_p = max(zip(status_counter.values(), status_counter.keys()))
if max_p[0] > 0:
data = KEYS[max_p[1]]
ser.write(bytes(data))
print("\033[0;;42m[Publisher]\033[0m: Use item: ", max_p[1], " \033[0;36m[Counter]\033[0m: Clear")
else:
print("\033[0;;41m[Publisher]\033[0m: No items are selected. \033[0;36m[Counter]\033[0m: Clear")
# 每次发送控制信息后将计数器清零
status_counter = {'小球':0, '激光笔':0, '逗猫棒':0}

SER_WRITE_INTERVAL = 10
async def intervalSend():
global status_counter
while True:
await asyncio.sleep(SER_WRITE_INTERVAL)
statusWrite()

执行效果