如何用Python实现树莓派远程视频流扫码

树莓派由于硬件配置低,在运行复杂计算的时候会比较吃力。为了解决这种瓶颈,可以考虑云计算。这篇文章分享下如何通过树莓派发送视频流到远程服务器做条形码识别。

环境配置

安装

树莓派

下载OpenCV, scipypillow:

$ sudo apt-get install libopencv-dev python-opencv python-scipy
$ python -m pip install pillow

Windows 10

下载条形码SDK,通过源码(https://github.com/dynamsoft-dbr/python)编译出Python条码识别模块。

下载OpenCV:

pip install opencv-python

使用Python Socket传输视频

有人实现了 NumpySocket,可以直接拿来使用,主要解决了NumPy Array(通过OpenCV获取的视频帧)的传输。

识别条形码,用JSON返回结果

通过Socket发送接收JSON:

import json
def sendJSON(self, data):
    out = json.dumps(data)
 
    try:
        self.connection.sendall(out)
    except Exception:
        exit()
 
def receiveJSON(self):
    try:
        chunk = self.socket.recv(1024)    
    except Exception:
        exit()
             
    return json.loads(chunk)

创建用于Windows的pc.py文件:

from numpysocket import NumpySocket
import cv2
import time
import json
import dbr
 
# Get the license of Dynamsoft Barcode Reader from https://www.dynamsoft.com/CustomerPortal/Portal/Triallicense.aspx
dbr.initLicense('LICENSE KEY')
 
npSocket = NumpySocket()
npSocket.startServer(9999)
 
# Receive frames for barcode detection
while(True):
    try:
        frame = npSocket.recieveNumpy()
        # cv2.imshow('PC Reader', frame)
        results = dbr.decodeBuffer(frame, 0x3FF | 0x2000000 | 0x4000000 | 0x8000000 | 0x10000000)
        out = {}
        out['results'] = results
 
        # Send barcode results to Raspberry Pi
        npSocket.sendJSON({'results': results})
    except:
        break
     
    # Press ESC to exit
    key = cv2.waitKey(20)
    if key == 27 or key == ord('q'):
        break
 
npSocket.endServer()
print('Closed')

这段代码主要是通过循环,不断接收视频帧做条形码识别。然后把结果用JSON格式打包发送。

在树莓派上创建一个rpi.py文件。通过OpenCV的接口可以不断获取视频帧。使用queue来保存:

def read_barcode():
    vc = cv2.VideoCapture(0)
    vc.set(3, 640) #set width
    vc.set(4, 480) #set height
 
    if not vc.isOpened():
        print('Camera is not ready.')
        return
 
    host_ip = '192.168.8.84'
    npSocket = NumpySocket()
    npSocket.startClient(host_ip, 9999)
 
    socket_thread = SocketThread('SocketThread', npSocket)
    socket_thread.start() 
 
    while vc.isOpened():
         
        ret, f = vc.read()
        cv2.imshow("RPi Reader", f)
        frame = imresize(f, .5)
 
        key = cv2.waitKey(20)
        if key == 27 or key == ord('q'):   
            socket_thread.isRunning = False
            socket_thread.join() 
            break
 
        if not socket_thread.is_alive():
            break
         
        try:
            frame_queue.put_nowait(frame)
        except:
            # Clear unused frames
            try:
                while True:
                    frame_queue.get_nowait()
            except:
                pass
 
    frame_queue.close()
    frame_queue.join_thread()
    vc.release()

创建了一个线程用于收发视频帧数据和结果:

class SocketThread (threading.Thread):
    def __init__(self, name, npSocket):
        threading.Thread.__init__(self)
        self.name = name
        self.npSocket = npSocket
        self.isRunning = True
 
    def run(self):      
        while self.isRunning:
            frame = frame_queue.get(timeout=100)
 
            try:
                start_time = time.time()
                self.npSocket.sendNumpy(frame)
                obj = self.npSocket.receiveJSON()
                print("--- %.2f ms seconds ---" % ((time.time() - start_time) * 1000))
                data = obj['results']
 
                if (len(data) > 0):
                    for result in data:
                        print("Type: " + result[0])
                        print("Value: " + result[1] + "\n")
                else:
                    print('No barcode detected.')
            except:
                break
             
        self.npSocket.endClient()      

在Windows和树莓派上分别运行对应的Python程序:
如何用Python实现树莓派远程视频流扫码

参考

源码

https://github.com/yushulx/python-socket-rpi-barcode