从零开始搭建物联网平台(9):快捷指令和指令发送
回来更新了,先说明一下,这个项目没有烂尾,没有烂尾,没有烂尾,第一版其实已经做完几个月了,后来想实现小程序端的部分功能,最后结果就是导致小程序和web两个的接口不兼容,而且第一次设计的时候有很多地方没有设计好,所以又推翻重做了,在这一版本中,会对接入设备的订阅和发布做出严格的限制(无法订阅除系统topic之外的消息,无法发布没有注册的消息),但是为了实现部分设备之间的通信会设置一个自组网络,可将自己的设备添加到这个网络中,在这个网络中进行通信。废话不多说进入正题。
Models
首先需要建立表来存储用户保存的指令内容,这里还是用django的models创建,和user、device表是一对多的关联,其中device_id 会作为消息的topic,所以设备想要接收消息需要订阅device_id的topic
from django.db import models
class CMD(models.Model):
name = models.CharField(max_length=32, null=False) # 指令名称
cmd_id = models.IntegerField(unique=True, null=False, db_index=True) # 指令ID
payload = models.CharField(max_length=32, null=False) # 消息内容
qos = models.IntegerField(default=0,choices=[(0, 0), (1, 1), (2, 2),]) # 消息质量
ctime = models.DateTimeField(auto_now_add=True) # 创建时间
token = models.CharField(max_length=64, null=False) # 鉴权信息
introduce = models.TextField(null=True) # 指令介绍
user = models.ForeignKey('user.UserInfo', to_field='uid', on_delete=models.CASCADE) # 用户ID
device_id = models.ForeignKey('device.Device', to_field='did', on_delete=None) # 指令接收的设备ID(指令的消息头topic)
class Meta:
db_table = 'quick_cmd'
获取快捷指令
使用了rest_framework做了全局的认证,request.user就是认证通过后返回的user_id,返回的数据格式为:{'code':0,'msg':'', 'data':[ {指令1}, {指令2}]}
class CMDInfo(APIView):
"""
快捷指令信息
"""
@staticmethod
def get(request, *args):
"""
获取指令信息(全部)
:param request:
:param args: 指定cmd_id 获取单个信息
:return: json
"""
res = {'code': 0, 'msg': '', 'data': []}
try:
if not args:
cmd_obj = CMD.objects.filter(user_id= request.user).all()
else:
cmd_obj = CMD.objects.filter(cmd_id=args[0]).all()
for i in cmd_obj:
res['data'].append(model_to_dict(i))
except Exception as e:
res['code'] = 1
res['msg'] = e.__repr__()
return JsonResponse(res)
展示指令标签
以小程序端为例:在onLoad函数里请求后台接口,然后在wxml将数据渲染出来,这里使用了iview Weapp的组件
// .js文件
onLoad: function(options) {
api.cmdInfo((res) => {
$Toast.hide()
if (res.data.code === 0) {
//生成随机的颜色
var tagsinfo = res.data.data
for (var i = 0; i < tagsinfo.length; i++){
tagsinfo[i].color = this.data.color[Math.floor(Math.random() * 5)]
}
this.setData({
quickCMD: tagsinfo
})
}
})
},
// .wxml文件
<view class='tags-pannel'>
快捷指令
<view class='edit'>编辑</view>
<view wx:if='{{quickCMD.length>0}}' style="padding-top:20px">
<i-tag class="tags" color="{{item.color}}" data-cmd="{{item}}" bindtap="quickeSend" wx:for="{{quickCMD}}" wx:key='{{index}}'>
{{item.name}}
</i-tag>
</view>
<view wx:else>
<no-data title='未找到已经保存快捷指令!' />
</view>
</view>
发送指令
在wxml文件中绑定了点击事件:quickSend事件,通过target.dataset来传递参数,并将需要用到的数据转换为键值对类型,传给后台接口。
quickSend: function(e) {
let data = {
'topic': e.target.dataset.cmd.device_id,
'qos': e.target.dataset.cmd.qos,
'payload': e.target.dataset.cmd.payload
}
api.sendCmd(data,(res)=>{
if (res.data.code === 0) {
$Toast({
content: '发送成功',
type: 'success',
duration: 1
});
} else{
$Toast({
content: '发送失败!',
type: 'error',
duration: 1
});
}
})
},
后端接收发送过来的data还需要对数据格式进行验证,这里使用了django 的forms实现
from django import forms
from django.core.validators import ValidationError
class QuickCMDForm(forms.Form):
topic = forms.CharField(
required=True, # 是否为空
error_messages={
'required': 'topic不能为空',
'max_length': '用户名长度超过限制'
}
)
payload = forms.CharField(
required=True,
max_length=32, # 最长长度
error_messages={
'required': '消息不能为空',
'max_length': '消息报文超过长度限制'
}
)
qos = forms.IntegerField(
required=True,
error_messages={
'required': 'QoS不能为空',
}
)
# 自定义校验
# 检查QoS值是否正确
def clean_qos(self):
qos = self.cleaned_data['qos']
is_exits = True if qos in [0,1,2] else False
if not is_exits:
raise ValidationError('QoS设置错误')
return qos
对应的view当验证通过之后,还需要对数据进行处理,QoS要为int类型,topic和payload都是str类型的,以免EMQ接收请求失败,再通过HTTP请求EMQ的API实现指令的下发。
class SendCMD(APIView):
"""
发送指令
"""
parser_classes = [JSONParser, FormParser]
@staticmethod
def post(request):
print(request.data)
cmd_form = QuickCMDForm(request.data)
if cmd_form.is_valid(): # 检查消息格式是否正确
try:
data = {
'topic': str(request.data.get('topic')),
'payload': str(request.data.get('payload')),
'qos': int(request.data.get('qos')) # qos必须为整数
}
data = json.dumps(data) # Dict转Json
print(data)
auth = HTTPBasicAuth(settings.MQTT_USER['username'], settings.MQTT_USER['password']) # basicAuth认证
recv = requests.post(url=settings.BASE_EMQ_URL+"/api/v2/mqtt/publish" ,auth=auth, data=data)
if recv.status_code == 200:
return JsonResponse(recv.json())
# return json.loads(res.text)
else:
return JsonResponse({'code': 1, 'result': []})
except Exception as e:
return JsonResponse({"code": 1, 'msg': e.__repr__(),'result': []})
else:
res = {'code': 1, 'msg': ""}
for i, error in cmd_form.errors.items():
res['msg'] = error[0]
return JsonResponse(res)
EMQ提供了HTTP请求的方式来发布消息
API 定义:POST api/v2/mqtt/publish
请求参数:{ "topic" : "/World", "payload": "hello", "qos": 0, "retain" : false, "client_id": "mqttjs_722b4d845f" }
topic 参数必填,其他参数可选。payload 默认值空字符串,qos 默认为 0,retain 默认为 false,client_id 默认为 ‘http’。
请求成功:{ "code": 0, "result": [] }
总结
主要流程:获取用户的快捷指令信息---> 页面展示 --->点击发送--->后台验证消息可靠性--->通过HTTP请求EMQ的API实现发布消息---> 将结果返回给前端显示
最后放几张小程序的截图: