Jex’s Note

MQTT

Introduction

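MQTT is a pub/sub messaging protocol designed for IoT. It is lightweight and efficient, and a client implementation needs fewer resources than one for AMQP. MQTT itself does not provide authorization; that part has to be implemented on the server side.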

Install

Command line tool

The mqtt tool for Node.js

npm install mqtt --save
npm install mqtt -g

Command

Subscribe

mqtt sub -h my-mqtt-server.com -t my_topic -u mqtt_username -P mqtt_password

Publish

mqtt pub -h my-mqtt-server.com -t my_topic -u mqtt_username -P mqtt_password -m 'Hello world'

Via SSL

mqtt pub -h my-mqtt-server.com -p 8883 -C mqtts ...(remaining options omitted)

QoS

  • QoS 0 (at most once): The packet is sent once and never acknowledged, so there is no check on whether it was received.
  • QoS 1 (at least once): The sender stores the packet and resends it until it receives an acknowledgement. Delivery is guaranteed, but duplicates are possible.
  • QoS 2 (exactly once): Like QoS 1, but an additional handshake ensures there are no duplicates.
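
The difference between QoS 1 and QoS 2 can be illustrated with a toy model. The sketch below is not the MQTT wire protocol and does not talk to a broker; `packet`, `receiver`, and `deliver` are hypothetical names made up for this note. It only shows why a retransmission shows up as a duplicate under at-least-once delivery, and how remembering packet IDs on the receiving side removes it.

```go
package main

import "fmt"

// packet is a simplified stand-in for an MQTT PUBLISH packet (hypothetical type).
type packet struct {
	id      uint16 // packet identifier, reused when the sender retransmits
	payload string
}

// receiver collects the payloads that reach the application.
type receiver struct {
	dedup     bool            // true models QoS 2, false models QoS 1
	seen      map[uint16]bool // packet IDs already delivered (only used when dedup is true)
	delivered []string
}

func newReceiver(dedup bool) *receiver {
	return &receiver{dedup: dedup, seen: make(map[uint16]bool)}
}

// deliver hands a packet to the application unless it is a known duplicate.
func (r *receiver) deliver(p packet) {
	if r.dedup {
		if r.seen[p.id] {
			return // retransmission of a packet that was already delivered
		}
		r.seen[p.id] = true
	}
	r.delivered = append(r.delivered, p.payload)
}

func main() {
	// The sender never saw the acknowledgement for packet 1, so it sends it twice.
	sent := []packet{{1, "hello"}, {1, "hello"}, {2, "world"}}

	qos1, qos2 := newReceiver(false), newReceiver(true)
	for _, p := range sent {
		qos1.deliver(p)
		qos2.deliver(p)
	}
	fmt.Println(len(qos1.delivered), len(qos2.delivered)) // prints: 3 2
}
```

In practice this means a QoS 1 subscriber should be written so that handling the same message twice is harmless.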

Golang - paho mqtt

Introduction

This is an MQTT package implemented in Go. It currently provides only a client; no broker is implemented.

Usage

New & Connect & Close

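import (
    "time"

    MQTT "github.com/eclipse/paho.mqtt.golang"
)

// New builds a client from params and connects it to the broker.
func New(params map[string]string) (MQTT.Client, error) {
    opts := MQTT.NewClientOptions()
    opts.SetKeepAlive(4 * time.Second)   // interval between pings to the broker
    opts.SetPingTimeout(2 * time.Second) // how long to wait for a ping response
    opts.AddBroker(params["broker"])
    opts.SetClientID(params["client_id"])
    opts.SetUsername(params["username"])
    opts.SetPassword(params["password"])
    opts.SetAutoReconnect(true) // reconnect automatically if the connection drops
    client := MQTT.NewClient(opts)
    // Connect is asynchronous; Wait blocks until the attempt has finished.
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        return nil, token.Error()
    }
    return client, nil
}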

params := map[string]string{
    "broker":    "ssl://mqtt.example.com:8883",
    "client_id": "Client ID",
    "username":  "Username",
    "password":  "Password",
}
mqtt_client, err := New(params)
if err != nil {
    return errors.New("New mqtt err: " + err.Error())
}

// Close: allow up to 250 ms for pending work to finish before disconnecting
defer mqtt_client.Disconnect(250)

Publish

topic := "test/mqtt"
for {
    // Use the current timestamp as the payload so each message is distinct.
    msg := strconv.FormatInt(time.Now().UnixNano(), 10)
    if token := mqtt_client.Publish(topic, 0, false, msg); token.Wait() && token.Error() != nil {
        log.Println(token.Error())
    } else {
        log.Printf("Successfully published `%s` to `%s`\n", msg, topic)
    }
}

For publishing, token.WaitTimeout is recommended over token.Wait, to avoid a deadlock that can occur with very low probability. Ref: https://github.com/eclipse/paho.mqtt.golang/issues/185
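
A minimal sketch of what waiting with a timeout could look like. `waitFor`, `waitToken`, `errTimeout`, `demoTimeout`, and `fakeToken` are hypothetical names introduced here and are not part of paho; the only assumption about the library is that its tokens expose `WaitTimeout(time.Duration) bool` and `Error() error`. The fake token exists so the example runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitToken lists the two token methods this sketch relies on.
type waitToken interface {
	WaitTimeout(time.Duration) bool
	Error() error
}

// errTimeout is a hypothetical sentinel error for an operation that did not finish in time.
var errTimeout = errors.New("mqtt: operation timed out")

// demoTimeout is an arbitrary value chosen for this example.
const demoTimeout = 50 * time.Millisecond

// waitFor blocks for at most d. It returns errTimeout if the token did not
// complete in time, otherwise the token's own error (nil on success).
func waitFor(tok waitToken, d time.Duration) error {
	if !tok.WaitTimeout(d) {
		return errTimeout
	}
	return tok.Error()
}

// fakeToken stands in for a real token; it completes when done is closed.
type fakeToken struct {
	done chan struct{}
	err  error
}

func (f *fakeToken) WaitTimeout(d time.Duration) bool {
	select {
	case <-f.done:
		return true
	case <-time.After(d):
		return false
	}
}

func (f *fakeToken) Error() error { return f.err }

func main() {
	finished := &fakeToken{done: make(chan struct{})}
	close(finished.done)
	stuck := &fakeToken{done: make(chan struct{})} // never completes

	fmt.Println(waitFor(finished, demoTimeout)) // prints: <nil>
	fmt.Println(waitFor(stuck, demoTimeout))    // prints: mqtt: operation timed out
}
```

With a real client the call would look like waitFor(mqtt_client.Publish(topic, 0, false, msg), 5*time.Second), where the 5-second limit is only an example value.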

Subscribe

// The callback runs on a different goroutine from the loop below, so messages
// are handed over through a channel instead of shared variables.
msgs := make(chan MQTT.Message, 16)
if token := mqtt_client.Subscribe("test/mqtt", 0, func(client MQTT.Client, msg MQTT.Message) {
    msgs <- msg
}); token.Wait() && token.Error() != nil {
    log.Fatal(token.Error())
}
// Print every message as it arrives.
for msg := range msgs {
    fmt.Printf("Successfully received `%s` from `%s`\n", msg.Payload(), msg.Topic())
}

Unsubscribe

if token := mqtt_client.Unsubscribe(sub_topic); token.Wait() && token.Error() != nil {
    return token.Error()
}
