Recently I decided to build a small project to learn Go and, at the same time, to learn a little about messaging protocols, which I've read a lot about but never implemented something from scratch.
This post is about how to create a basic chat, using the Go language, the Go package for MQTT protocol, RabbitMQ and its MQTT plugin. The code was inspired by examples from CloudMQTT documentation.
Messaging Protocols and Brokers
There are a lot of messaging protocols, but most of the use cases are covered by the three main messaging protocols: AMQP, MQTT and STOMP. All of them work with a message broker, for instance, RabbitMQ, Mosquitto, NSQ and ZeroMQ.
For this project I chose MQTT v3.1.1 with RabbitMQ. I found MQTT much easier and interesting, especially its use cases in IoT.
Quality of Service
One important concept in MQTT is Quality of Service. There are three QoS levels:
- Level 0 (At most once): Fire-and-Forget. The sender does not wait for acknowledgment and the message is delivered at most once.
- Level 1 (At least once): The sender stores the message until it gets an acknowledgment from the receiver. The message may be sent multiple times.
- Level 2 (Exactly once): A four-part handshake ensures the message is delivered exactly once.
For our chat, we will use QoS 0 since occasional message loss is acceptable.
Setup
You will need:
- Go language installed
- RabbitMQ with the MQTT plugin enabled
- Two Go libraries:
go get github.com/akamensky/argparse
go get github.com/eclipse/paho.mqtt.golangThe Implementation
The architecture is simple: two goroutines running concurrently -- one subscriber listening for messages (consumer) and one producer publishing user input to the broker. Communication happens through MQTT topics.
The Main Function
The main loop gets user and password from the input arguments and assembles the full URL service:
func main() {
user, passwd := parseUserArgs()
fullUrl := fmt.Sprintf("mqtt://%s:%s@localhost:1883/test", user, passwd)
uri, err := url.Parse(fullUrl)
failOnError(err, "Failed to parse given URL")
forever := make(chan bool)
go listen(uri)
go poolMessage(uri, user)
<-forever
}The Consumer
The consumer is very simple, it creates a client, connected to the given URI, and, every time it receives a message, it calls the callback function and prints it on the screen:
func showMessage(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("* %s\n", string(msg.Payload()))
}
func listen(uri *url.URL) {
client := connect(uri)
client.Subscribe(parseTopic(uri), QOS_AT_MOST_ONCE, showMessage)
}The Producer
The producer, at its turn, waits until a message is typed and then sends it to the broker:
func sendMessage(msg string, uri *url.URL) {
client := connect(uri)
RETAIN_MESSAGE := false
client.Publish(parseTopic(uri), QOS_AT_MOST_ONCE, RETAIN_MESSAGE, msg)
}
func poolMessage(uri *url.URL, user string) {
for {
r := bufio.NewReader(os.Stdin)
msg, _ := r.ReadString('\n')
msg = fmt.Sprintf("%s: %s", user, strings.TrimSpace(msg))
sendMessage(msg, uri)
}
}The Connection
The last piece of this code is the connect function, which is quite simple:
func connect(uri *url.URL) mqtt.Client {
opts := createClientOptions(uri)
client := mqtt.NewClient(opts)
token := client.Connect()
for !token.WaitTimeout(time.Microsecond) {
}
failOnError(token.Error(), "Failed while connecting")
return client
}The options are built using the data passed in URI:
func createClientOptions(uri *url.URL) *mqtt.ClientOptions {
password, _ := uri.User.Password()
name := uri.User.Username()
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s", uri.Host))
opts.SetUsername(name)
opts.SetPassword(password)
return opts
}A Challenge I Faced
I encountered difficulty clearing retained messages when experimenting with the RETAIN_MESSAGE parameter. Unable to publish null values through Go's MQTT library, I resolved the issue using the Mosquitto client instead, highlighting a gap in my Go toolkit knowledge at the time.
Final Thoughts
This is a quite simple tutorial and I know it did not cover all the subject. The complete code can be seen on GitHub.