Raspberry PiとのProtocol Bufferを用いたUDP通信

前回紹介したProtocol Bufferを用いてRaspberry Pi Zeroと会話する方法


システム構成

  • ホスト
    • Windows 64bit
    • Msys2
    • Python 3.8.2
  • ターゲット
    • Raspberry Pi Zero W
    • Linux raspberrypi 4.19.97
    • Python 3.7.3


必要パッケージのインストール

Raspberry Pi側

Raspberry Piに必要なパッケージをインストールします

% sudo apt install protobuf-compiler python3-protobuf

ホスト側 (Msys2)

% pacman -S mingw-w64-x86_64-protobuf mingw-w64-x86_64-python-protobuf

メッセージ定義

前回の例で使用したものと同じものを使用します

syntax = "proto3";

message SensorData {
  string name = 1;           // Sensor Name
  uint64 uid = 2;            // Sensor Unique ID
  enum SensorType {
    Unknown = 0;
    Thermometer = 1;
    Humidity = 2;
  }
  SensorType type = 3;       // Sensor Type
  uint64 timestamp_us = 4;   // POSIX Timestamp in us
  int32 index = 5;           // Data index
  repeated double value = 6;
}

コンパイル

これも前回同様のコマンドでコンパイルします

% mkdir generated
% protoc -I proto --python_out=generated proto/sensor_data.proto
% ls genereted
sensor_data.pb.cc sensor_data.pb.h sensor_data_pb2.py

Pythonコード

Pythonコード自体はRaspberry Pi側、ホスト側どちらでも動作します。

送信側コード

メッセージをsocket.sendto()で送信するだけです

import os
import socket
import sys
import time

sys.path.append(os.path.join(os.path.dirname(__file__), "..", "generated"))
import sensor_data_pb2


def main():
    # 送信ソケット作成
    ts = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # ダミーセンサデータ作成
    dummy_data = sensor_data_pb2.SensorData()
    dummy_data.name = "Dummy PyThermo"
    dummy_data.uid = 2
    dummy_data.type = sensor_data_pb2.SensorData.SensorType.Thermometer
    dummy_data.timestamp_us = int(time.time_ns() / 1000)
    dummy_data.index = 1
    dummy_data.value.extend([23.2, 23.3, 23.5])

    # UDPで送信 (宛先IP: 192.168.0.10, ポート: 28000)
    ts.sendto(dummy_data.sensor_msg.SerializeToString(),
             ("192.168.0.10", 28000))

if __name__ == "__main__":
    main()

受信側コード

import os
from pprint import pprint
import socket
import sys

sys.path.append(os.path.join(os.path.dirname(__file__), "..", "generated"))
import sensor_data_pb2


def main():
    # 受信ソケット作成 (ポート 28000へのパケットをすべて受信)
    rs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    rs.bind(("0.0.0.0", 28000))

    # データ読み出し
    cpp_data = sensor_data_pb2.SensorData()
    data, addr = rs.recvfrom(1024)
    cpp_data.ParseFromString(data)
    pprint(cpp_data)

if __name__ == "__main__":
    main()

おすすめ