Raspberry PiとのProtocol Bufferを用いたUDP通信
前回紹介したProtocol Bufferを用いてRaspberry Pi Zeroと会話する方法
システム構成
- ホスト
- Windows 64bit
- Msys2
- Python 3.8.2
- ターゲット
- Raspberry Pi Zero W
- Linux raspberrypi 4.19.97
- Python 3.7.3
必要パッケージのインストール
Raspberry Pi側
Raspberry Piに必要なパッケージをインストールします
% sudo apt install protobuf-compiler python3-protobuf
ホスト側 (Msys2)
% pacman -S mingw-w64-x86_64-protobuf mingw-w64-x86_64-python-protobuf
メッセージ定義
前回の例で使用したものと同じものを使用します
syntax = "proto3";
message SensorData {
string name = 1; // Sensor Name
uint64 uid = 2; // Sensor Unique ID
enum SensorType {
Unknown = 0;
Thermometer = 1;
Humidity = 2;
}
SensorType type = 3; // Sensor Type
uint64 timestamp_us = 4; // POSIX Timestamp in us
int32 index = 5; // Data index
repeated double value = 6;
}
コンパイル
これも前回同様のコマンドでコンパイルします
% mkdir generated
% protoc -I proto --python_out=generated proto/sensor_data.proto
% ls genereted
sensor_data.pb.cc sensor_data.pb.h sensor_data_pb2.py
Pythonコード
Pythonコード自体はRaspberry Pi側、ホスト側どちらでも動作します。
送信側コード
メッセージをsocket.sendto()
で送信するだけです
import os
import socket
import sys
import time
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "generated"))
import sensor_data_pb2
def main():
# 送信ソケット作成
ts = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# ダミーセンサデータ作成
dummy_data = sensor_data_pb2.SensorData()
dummy_data.name = "Dummy PyThermo"
dummy_data.uid = 2
dummy_data.type = sensor_data_pb2.SensorData.SensorType.Thermometer
dummy_data.timestamp_us = int(time.time_ns() / 1000)
dummy_data.index = 1
dummy_data.value.extend([23.2, 23.3, 23.5])
# UDPで送信 (宛先IP: 192.168.0.10, ポート: 28000)
ts.sendto(dummy_data.sensor_msg.SerializeToString(),
("192.168.0.10", 28000))
if __name__ == "__main__":
main()
受信側コード
import os
from pprint import pprint
import socket
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "generated"))
import sensor_data_pb2
def main():
# 受信ソケット作成 (ポート 28000へのパケットをすべて受信)
rs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rs.bind(("0.0.0.0", 28000))
# データ読み出し
cpp_data = sensor_data_pb2.SensorData()
data, addr = rs.recvfrom(1024)
cpp_data.ParseFromString(data)
pprint(cpp_data)
if __name__ == "__main__":
main()
最近のコメント