远程逗猫 —— 小程序 mqtt IoT
配置
所使用的物联网平台为阿里云物联网平台
根据已准备好的物联网平台设备信息,创建连接参数文件 mqtt_cfg.yaml,内容如下:
# Device identity: productKey / deviceName / deviceSecret.
# productKey and deviceName are also used to build the subscribed topic
# /<productKey>/<deviceName>/user/get.
productKey: ""
deviceName: ""
deviceSecret: ""
# MQTT connection parameters (client id, credentials, broker host and port).
clientId: ""
username: ""
mqttHostUrl: ""
port: 1883
passwd: ""
- 根据阿里云给出的信息,分别填写这些参数
Python 接入阿里云 MQTT 订阅话题
- 导入所需的包
import yaml
import time
import sys
import json
import paho.mqtt.client as mqtt
- 导入配置信息
# Load the connection settings from mqtt_cfg.yaml, located via sys.path[0]
# (normally the directory of the script being run). The file is opened in a
# ``with`` block so the handle is closed as soon as parsing is done.
# NOTE(review): yaml.FullLoader is kept for compatibility; if this file could
# ever come from an untrusted source, switch to yaml.safe_load.
with open(sys.path[0] + '/mqtt_cfg.yaml', 'r', encoding='utf-8') as cfg_file:
    cfg = yaml.load(cfg_file.read(), Loader=yaml.FullLoader)

# Device identity. deviceSecret is loaded for completeness; the code visible
# here does not read it again.
productKey = cfg['productKey']
deviceName = cfg['deviceName']
deviceSecret = cfg['deviceSecret']

# Topic the client subscribes to: /<productKey>/<deviceName>/user/get
subTopic = "/" + productKey + "/" + deviceName + "/user/get"

# MQTT connection parameters.
clientId = cfg['clientId']
username = cfg['username']
mqttHostUrl = cfg['mqttHostUrl']
port = cfg['port']
passwd = cfg['passwd']
- connect_mqtt():连接 MQTT 服务器
# Keepalive value handed to client.connect() (seconds, per paho-mqtt).
keepAlive = 300


def connect_mqtt() -> mqtt.Client:
    """Create an MQTT client, start connecting to the broker and return it.

    Reads the module-level ``clientId``, ``username``, ``passwd``,
    ``mqttHostUrl``, ``port`` and ``keepAlive`` settings.

    Returns:
        The configured ``mqtt.Client``. The caller is expected to run the
        client's network loop (``run()`` uses ``loop_start()``) so that the
        ``on_connect`` callback can fire.
    """

    def on_connect(client, userdata, flags, rc):
        # rc == 0 is the only success code; anything else is reported as-is.
        if rc == 0:
            print("Connected to MQTT Host! ......")
            print("Connect aliyun IoT Cloud Sucess")
        else:
            print("Connect failed... error code is:" + str(rc))

    # paho-mqtt 2.x inserted ``callback_api_version`` as the first constructor
    # argument, so passing the client id positionally breaks there. Request
    # the VERSION1 callback signature (the one on_connect above uses) when the
    # enum exists, and always pass the client id by keyword.
    if hasattr(mqtt, "CallbackAPIVersion"):
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,
                             client_id=clientId)
    else:
        client = mqtt.Client(client_id=clientId)

    client.username_pw_set(username=username, password=passwd)
    client.on_connect = on_connect
    client.connect(mqttHostUrl, port, keepAlive)
    return client
- subscribe():订阅话题并注册消息回调
def subscribe(client: mqtt.Client):
    """Subscribe ``client`` to ``subTopic`` and print every message received.

    Args:
        client: A client returned by ``connect_mqtt()``.
    """

    def on_message(client, userdata, msg):
        # Payload arrives as bytes; decode it for printing.
        topic = msg.topic
        payload = msg.payload.decode()
        print("receive message ---------- topic is : " + topic)
        print("receive message ---------- payload is : " + payload)

    # NOTE(review): the original indentation was lost; this print is assumed
    # to belong to subscribe() itself rather than to on_message().
    print("Waiting for reception")

    # Install the handler before subscribing: the network loop is already
    # running in run(), so a message delivered right after the subscription
    # would otherwise find no callback.
    client.on_message = on_message
    client.subscribe(subTopic)
- run():程序入口,建立连接后订阅话题并保持运行
def run():
    """Connect, subscribe and keep the process alive until interrupted.

    The network loop runs in the background thread started by
    ``loop_start()``; the main thread only sleeps. On any exit (for example
    Ctrl+C) the client is disconnected and the loop thread is stopped. The
    exception itself still propagates to the caller.
    """
    client = connect_mqtt()
    client.loop_start()
    try:
        # Give the background loop a moment to finish connecting before
        # sending the subscription.
        time.sleep(2)
        subscribe(client)
        while True:
            time.sleep(1)
    finally:
        client.disconnect()
        client.loop_stop()


if __name__ == '__main__':
    run()
运行后即可订阅 "/user/get" 话题。如需在接收到消息后执行操作,在 on_message() 函数中添加相应的处理逻辑即可。
加入串口控制的完整程序
import yaml
import time
import sys
import json
import paho.mqtt.client as mqtt
# Load the connection settings from mqtt_cfg.yaml, located via sys.path[0]
# (normally the directory of the script being run). The file is opened in a
# ``with`` block so the handle is closed as soon as parsing is done.
# NOTE(review): yaml.FullLoader is kept for compatibility; if this file could
# ever come from an untrusted source, switch to yaml.safe_load.
with open(sys.path[0] + '/mqtt_cfg.yaml', 'r', encoding='utf-8') as cfg_file:
    cfg = yaml.load(cfg_file.read(), Loader=yaml.FullLoader)

# Device identity. deviceSecret is loaded for completeness; the code visible
# here does not read it again.
productKey = cfg['productKey']
deviceName = cfg['deviceName']
deviceSecret = cfg['deviceSecret']

# Topic the client subscribes to: /<productKey>/<deviceName>/user/get
subTopic = "/" + productKey + "/" + deviceName + "/user/get"

# MQTT connection parameters.
clientId = cfg['clientId']
username = cfg['username']
mqttHostUrl = cfg['mqttHostUrl']
port = cfg['port']
passwd = cfg['passwd']

# Keepalive value handed to client.connect() (seconds, per paho-mqtt).
keepAlive = 300
import serial
# Serial port that on_message() writes the command frames to.
ser = serial.Serial(
    timeout=0.5,
    baudrate=115200,
    port='/dev/ttyUSB0',
)

# One 9-byte command frame per toy, keyed by the toy's name:
# '小球' = ball, '激光笔' = laser pointer, '逗猫棒' = teaser stick.
# NOTE(review): the meaning of the individual bytes is defined by the device
# on the other end of the serial link and is not visible here.
KEYS = {
    '小球': [0xA5, 0xA5, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00],
    '激光笔': [0xA5, 0xA5, 0x05, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00],
    '逗猫棒': [0xA5, 0xA5, 0x0A, 0x02, 0x01, 0x00, 0x00, 0x00, 0x00],
}
def connect_mqtt() -> mqtt.Client:
    """Create an MQTT client, start connecting to the broker and return it.

    Reads the module-level ``clientId``, ``username``, ``passwd``,
    ``mqttHostUrl``, ``port`` and ``keepAlive`` settings.

    Returns:
        The configured ``mqtt.Client``. The caller is expected to run the
        client's network loop (``run()`` uses ``loop_start()``) so that the
        ``on_connect`` callback can fire.
    """

    def on_connect(client, userdata, flags, rc):
        # rc == 0 is the only success code; anything else is reported as-is.
        if rc == 0:
            print("[Connect]: Connected to MQTT Host! ......")
            print("[Connect]: Connect aliyun IoT Cloud Sucess")
        else:
            print("[Connect]: Connect failed... error code is:" + str(rc))

    # paho-mqtt 2.x inserted ``callback_api_version`` as the first constructor
    # argument, so passing the client id positionally breaks there. Request
    # the VERSION1 callback signature (the one on_connect above uses) when the
    # enum exists, and always pass the client id by keyword.
    if hasattr(mqtt, "CallbackAPIVersion"):
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,
                             client_id=clientId)
    else:
        client = mqtt.Client(client_id=clientId)

    client.username_pw_set(username=username, password=passwd)
    client.on_connect = on_connect
    client.connect(mqttHostUrl, port, keepAlive)
    return client
# Payload keyword -> KEYS entry, checked in this order (first match wins).
_ITEM_KEYWORDS = (
    ('stick', '逗猫棒'),
    ('ball', '小球'),
    ('laser', '激光笔'),
)


def subscribe(client: mqtt.Client):
    """Subscribe ``client`` to ``subTopic`` and drive the toys over serial.

    Each received message is printed, parsed as JSON, and its ``params``
    value is searched for one of the keywords ``stick``, ``ball`` or
    ``laser``. The first keyword found selects the matching frame from
    ``KEYS``, which is written to the serial port ``ser``. Payloads that are
    not valid JSON, or have no usable ``params``, are reported and skipped
    instead of raising inside the MQTT callback.

    Args:
        client: A client returned by ``connect_mqtt()``.
    """

    def on_message(client, userdata, msg):
        topic = msg.topic
        # Replace undecodable bytes rather than raising in the callback; such
        # a payload then fails JSON parsing below and is skipped.
        payload = msg.payload.decode(errors='replace')
        print("\033[0;;42m[Message]\033[0m: receive message ---------- topic is : " + topic)
        print("\033[0;;42m[Message]\033[0m: receive message ---------- payload is : " + payload)

        try:
            params = json.loads(payload)['params']
        except (ValueError, KeyError, TypeError) as err:
            # ValueError: not JSON; KeyError: no 'params';
            # TypeError: top-level JSON value is not an object.
            print("[Message]: ignoring payload without usable 'params': " + repr(err))
            return
        if not isinstance(params, (dict, list, str)):
            # Numbers, booleans and null do not support the ``in`` test below.
            print("[Message]: ignoring payload, 'params' is not a container")
            return

        for keyword, item in _ITEM_KEYWORDS:
            if keyword in params:
                ser.write(bytes(KEYS[item]))
                print("\033[0;36m[Publisher]: Use item: " + keyword + "\033[0m")
                break

    # NOTE(review): the original indentation was lost; this print is assumed
    # to belong to subscribe() itself rather than to on_message().
    print("[Subscribe]: Waiting for reception")

    # Install the handler before subscribing: the network loop is already
    # running in run(), so a message delivered right after the subscription
    # would otherwise find no callback.
    client.on_message = on_message
    client.subscribe(subTopic)
def run():
    """Connect, subscribe and keep the process alive until interrupted.

    The network loop runs in the background thread started by
    ``loop_start()``; the main thread only sleeps. On any exit (for example
    Ctrl+C) the client is disconnected and the loop thread is stopped. The
    exception itself still propagates to the caller.
    """
    client = connect_mqtt()
    client.loop_start()
    try:
        # Give the background loop a moment to finish connecting before
        # sending the subscription.
        time.sleep(2)
        subscribe(client)
        while True:
            time.sleep(1)
    finally:
        client.disconnect()
        client.loop_stop()


if __name__ == '__main__':
    run()