Carpe Diem

備忘録

MQTTを使ってみる2

概要

前回との違いは以下です。

  • Broker, Publisher, Subscriberを別々のサーバにする
  • プログラムでPub-Subを確認する

環境

VM構築

Vagrantfileに以下のコードを貼り付けて実行します。

  config.vm.define :broker do |node|
    node.vm.hostname = "broker"
    node.vm.network :private_network, ip: "192.168.33.11"
  end
  config.vm.define :publisher do |node|
    node.vm.hostname = "publisher"
    node.vm.network :private_network, ip: "192.168.33.12"
  end
  config.vm.define :subscriber do |node|
    node.vm.hostname = "subscriber"
    node.vm.network :private_network, ip: "192.168.33.13"
  end
$ vagrant up

Broker

こちらは前回と同じくMosquittoを使います。

broker:~$ sudo apt-get install mosquitto

Publisher, Subscriber共通

今回はPythonで構築します。

$ sudo apt-get install python-pip
$ sudo pip install paho-mqtt

このpahoというEclipseのプロジェクトがMQTTの開発を続けているので、クライアントに悩んだらとりあえずpahoと書いてあるものを探すといいです。

Publisher

publisher:~$ vim pub.py

以下のソースを貼り付けて下さい。

# coding=utf8

import paho.mqtt.client as paho

def on_connect(mqttc, obj, rc):
    print("rc: "+str(rc))

def on_message(mqttc, obj, msg):
    print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))

def on_publish(mqttc, obj, mid):
    print("mid: "+str(mid))

if __name__ == '__main__':
    mqttc = paho.Client(protocol=paho.MQTTv31)
    mqttc.on_message = on_message
    mqttc.on_connect = on_connect
    mqttc.on_publish = on_publish

    mqttc.connect("192.168.33.11", 1883, 60)

    mqttc.publish("my/topic/string", "hello world", 1)

Subscriber

subscriber:~$ vim sub.py

同様に以下のソースを貼り付けて下さい。

# coding=utf8

import paho.mqtt.client as paho

def on_connect(mqttc, obj, rc):
    print("rc: "+str(rc))

def on_message(mqttc, obj, msg):
    print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))

def on_subscribe(mqttc, obj, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

if __name__ == '__main__':
    mqttc = paho.Client(protocol=paho.MQTTv31)
    mqttc.on_message = on_message
    mqttc.on_connect = on_connect
    mqttc.on_subscribe = on_subscribe

    mqttc.connect("192.168.33.11", 1883, 60)

    mqttc.subscribe("my/topic/string", 0)

    mqttc.loop_forever()

動作確認

ログ見やすいようにBrokerをフォアグラウンド実行します。
一度サービスを停止してから再度実行します。

broker:~$ sudo service mosquitto stop
broker:~$ mosquitto
1415940762: mosquitto version 0.15 (build date 2013-08-23 19:23:43+0000) starting
1415940762: Opening ipv4 listen socket on port 1883.
1415940762: Opening ipv6 listen socket on port 1883.

Subscriberを実行して受信状態にします。

subscriber:~$ python sub.py
rc: 0
Subscribed: 1 (0,)

Brokerを見ると接続できているか確認できます。

1415940965: New connection from 192.168.33.13.
1415940965: New client connected from 192.168.33.13 as paho/47D61414790CF69926.

この時に以下のエラーが出るときはバージョンが違います。3.1.1と3.1の違いみたいです。

Invalid protocol "MQTT" in CONNECT

Publisherを実行してhello worldを送信してみましょう。

publisher:~$ python pub.py

Subscriberの方を見るときちんと受信できてます。

my/topic/string 0 hello world

Brokerの方は以下の様なログが出ます。

1415941269: New connection from 192.168.33.13.
1415941269: New client connected from 192.168.33.13 as paho/EF0E5C9E20C84AFD5E.
1415941293: New connection from 192.168.33.12.
1415941293: New client connected from 192.168.33.12 as paho/7FF17CFD6C3A3D6234.
1415941293: Socket read error on client paho/7FF17CFD6C3A3D6234, disconnecting.

ソース