MQTT

MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimise network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also turn out to make the protocol ideal for the emerging “machine-to-machine” (M2M) or “Internet of Things” world of connected devices, and for mobile applications where bandwidth and battery power are at a premium.

Further information on mqtt.org.

Streaming Differences

Apache Pekko Connectors contains another MQTT connector which, unlike this one, is not based on the Eclipse Paho client. Please refer to the other connector's documentation, where the differences are explained.

The Apache Pekko Connectors MQTT connector provides an Apache Pekko Stream source, sink and flow to connect to MQTT brokers. It is based on the Eclipse Paho Java client.

Project Info: Apache Pekko Connectors MQTT
Artifact: group org.apache.pekko, name pekko-connectors-mqtt, version 1.0.2
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 2.13.14, 2.12.20, 3.3.3
JPMS module name: pekko.stream.connectors.mqtt
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-mqtt" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-mqtt_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-mqtt_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Settings

MqttConnectionSettings (API) requires three settings to connect to an MQTT server:

  1. the MQTT broker address
  2. a unique ID for the client (an empty string should let the MQTT broker assign one, but not all brokers do, so you may want to generate it yourself)
  3. the MQTT client persistence to use (e.g. MemoryPersistence), which lets you control reliability guarantees
Scala
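import org.apache.pekko.stream.connectors.mqtt.MqttConnectionSettings
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

val connectionSettings = MqttConnectionSettings(
  "tcp://localhost:1883", // (1)
  "test-scala-client", // (2)
  new MemoryPersistence // (3)
)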
Java
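import org.apache.pekko.stream.connectors.mqtt.MqttConnectionSettings;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

MqttConnectionSettings connectionSettings =
    MqttConnectionSettings.create(
        "tcp://localhost:1883", // (1)
        "test-java-client", // (2)
        new MemoryPersistence() // (3)
        );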

Most settings are passed on to Paho’s MqttConnectOptions (API) and documented there.
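
Item (2) above suggests generating the client ID when the broker cannot be relied on to assign one. A minimal JDK-only sketch of one way to do that follows. The class ClientIds and its generate method are hypothetical helpers, not part of the connector. The 23-character, alphanumeric-only shape is a conservative assumption based on the MQTT 3.1.1 rule that servers must accept such IDs; many brokers accept longer ones.

```java
import java.util.UUID;

/** Hypothetical helper (not part of the connector): builds a client ID that is unique per call. */
final class ClientIds {
  private ClientIds() {}

  /**
   * Returns the prefix followed by random hex characters, cut to maxLength characters.
   * Only 0-9 and a-f are appended, so an alphanumeric prefix yields an alphanumeric ID.
   */
  static String generate(String prefix, int maxLength) {
    // Leave room for at least 8 random characters, otherwise IDs could collide.
    if (prefix.length() + 8 > maxLength) {
      throw new IllegalArgumentException("prefix too long for maxLength " + maxLength);
    }
    String random = UUID.randomUUID().toString().replace("-", ""); // 32 hex characters
    String id = prefix + random;
    return id.length() <= maxLength ? id : id.substring(0, maxLength);
  }

  public static void main(String[] args) {
    // The generated value could be passed to connectionSettings.withClientId(...).
    System.out.println(generate("sensor", 23));
  }
}
```

A short prefix keeps more random characters in the ID, which lowers the chance of two clients colliding on the same broker.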

Use delayed stream restarts

Note that the following examples do not provide any connection management and are designed to get you going quickly. Consider using empty client IDs so that unique identifiers are auto-generated, and consider wrapping the streams in delayed restarts. By design, the underlying Paho library’s auto-reconnect feature does not handle initial connections.
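
Pekko Streams offers RestartSource.withBackoff, configured through RestartSettings with a minimum backoff, a maximum backoff and a random factor, for this kind of delayed restart. The JDK-only sketch below does not use that API; it only illustrates how such a backoff schedule typically grows between restarts. The class BackoffSchedule, its delay method and the exact formula are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical illustration of an exponential backoff schedule with jitter. */
final class BackoffSchedule {
  private BackoffSchedule() {}

  /**
   * Delay before restart number restartCount (0-based): min * 2^restartCount, capped at max,
   * then stretched by up to randomFactor. The jitter argument must be in [0, 1).
   */
  static Duration delay(
      int restartCount, Duration min, Duration max, double randomFactor, double jitter) {
    double exponential = min.toMillis() * Math.pow(2, restartCount);
    double capped = Math.min(exponential, max.toMillis());
    return Duration.ofMillis((long) (capped * (1.0 + jitter * randomFactor)));
  }

  public static void main(String[] args) {
    Duration min = Duration.ofSeconds(1);
    Duration max = Duration.ofSeconds(30);
    for (int restart = 0; restart < 7; restart++) {
      double jitter = ThreadLocalRandom.current().nextDouble();
      long millis = delay(restart, min, max, 0.2, jitter).toMillis();
      System.out.println("restart " + restart + " after " + millis + " ms");
    }
  }
}
```

The jitter spreads reconnect attempts so that many clients restarting at once do not hit the broker at the same instant.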

Configure encrypted connections

To connect with transport-level security, configure the address with the ssl:// scheme, set authentication details, and pass in a socket factory.

Scala
val connectionSettings = MqttConnectionSettings(
  "ssl://localhost:1885",
  "ssl-client",
  new MemoryPersistence)
  .withAuth("mqttUser", "mqttPassword")
  .withSocketFactory(SSLContext.getDefault.getSocketFactory)
Java
MqttConnectionSettings connectionSettings =
    MqttConnectionSettings.create("ssl://localhost:1885", "ssl-client", new MemoryPersistence())
        .withAuth("mqttUser", "mqttPassword")
        .withSocketFactory(SSLContext.getDefault().getSocketFactory());
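
SSLContext.getDefault relies on the JVM's default trust store. When the broker's certificate is signed by a private CA, a socket factory can instead be built from a dedicated trust store using standard JDK classes. The following is a minimal sketch under that assumption. The class TlsSupport and its socketFactory method are hypothetical names; the resulting factory could be passed to withSocketFactory.

```java
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical helper (not part of the connector): builds a socket factory from a trust store. */
final class TlsSupport {
  private TlsSupport() {}

  /** Returns a socket factory that trusts only the certificates contained in trustStore. */
  static SSLSocketFactory socketFactory(KeyStore trustStore) throws Exception {
    TrustManagerFactory trustManagers =
        TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    trustManagers.init(trustStore);
    SSLContext context = SSLContext.getInstance("TLS");
    // No client certificate (first argument) and the default source of randomness (last argument).
    context.init(null, trustManagers.getTrustManagers(), null);
    return context.getSocketFactory();
  }

  public static void main(String[] args) throws Exception {
    KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
    // An empty in-memory store keeps the sketch self-contained. In real use, load the
    // broker's CA certificate into the store first, otherwise every handshake will fail.
    trustStore.load(null, null);
    SSLSocketFactory factory = socketFactory(trustStore);
    System.out.println("socket factory created: " + (factory != null));
  }
}
```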

Reading from MQTT

At most once

Create a source that connects to the MQTT server and receives messages from the subscribed topics.

The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.

Scala
val mqttSource: Source[MqttMessage, Future[Done]] =
  MqttSource.atMostOnce(
    connectionSettings.withClientId(clientId = "source-spec/source"),
    MqttSubscriptions(Map(topic1 -> MqttQoS.AtLeastOnce, topic2 -> MqttQoS.AtLeastOnce)),
    bufferSize = 8)

val (subscribed, streamResult) = mqttSource
  .take(messages.size)
  .toMat(Sink.seq)(Keep.both)
  .run()
Java
MqttSubscriptions subscriptions =
    MqttSubscriptions.create(topic1, MqttQoS.atMostOnce())
        .addSubscription(topic2, MqttQoS.atMostOnce());

Source<MqttMessage, CompletionStage<Done>> mqttSource =
    MqttSource.atMostOnce(
        connectionSettings.withClientId("source-test/source"), subscriptions, bufferSize);

Pair<CompletionStage<Done>, CompletionStage<List<String>>> materialized =
    mqttSource
        .map(m -> m.topic() + "-" + m.payload().utf8String())
        .take(messageCount * 2)
        .toMat(Sink.seq(), Keep.both())
        .run(system);

CompletionStage<Done> subscribed = materialized.first();
CompletionStage<List<String>> streamResult = materialized.second();

This source has a materialized value (Future[Done] in Scala, CompletionStage<Done> in Java) which is completed when the subscription to the MQTT broker has been established.

The atMostOnce source automatically acknowledges messages back to the server when they are passed downstream.

At least once

The atLeastOnce source allows users to acknowledge messages anywhere downstream. Please note that for manual acks to work, CleanSession should be set to false and MqttQoS should be AtLeastOnce.

The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.

Scala
val mqttSource: Source[MqttMessageWithAck, Future[Done]] =
  MqttSource.atLeastOnce(
    connectionSettings
      .withClientId(clientId = "source-spec/source1")
      .withCleanSession(false),
    MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
    bufferSize = 8)
Java
Source<MqttMessageWithAck, CompletionStage<Done>> mqttSource =
    MqttSource.atLeastOnce(
        connectionSettings
            .withClientId("source-test/source-withoutAutoAck")
            .withCleanSession(false),
        MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()),
        bufferSize);

The atLeastOnce source emits MqttMessageWithAck elements, so you can acknowledge each message by calling ack().

Scala
val result = mqttSource
  .via(businessLogic)
  .mapAsync(1)(messageWithAck => messageWithAck.ack().map(_ => messageWithAck.message))
  .take(input.size)
  .runWith(Sink.seq)
Java
final CompletionStage<List<MqttMessage>> result =
    mqttSource
        .via(businessLogic)
        .mapAsync(
            1,
            messageWithAck ->
                messageWithAck.ack().thenApply(unused2 -> messageWithAck.message()))
        .take(input.size())
        .runWith(Sink.seq(), system);

Publishing to MQTT

To publish messages to the MQTT server, create a sink by specifying MqttConnectionSettings (API) and a default Quality of Service level.

Scala
val sink: Sink[MqttMessage, Future[Done]] =
  MqttSink(connectionSettings, MqttQoS.AtLeastOnce)
Source(messages).runWith(sink)
Java
Sink<MqttMessage, CompletionStage<Done>> mqttSink =
    MqttSink.create(connectionSettings.withClientId("source-test/sink"), MqttQoS.atLeastOnce());
Source.from(messages).runWith(mqttSink, system);

The Quality of Service level and the retained flag can also be set on a per-message basis.

Scala
val lastWill = MqttMessage(willTopic, ByteString("ohi"))
  .withQos(MqttQoS.AtLeastOnce)
  .withRetained(true)
Java
MqttMessage lastWill =
    MqttMessage.create(willTopic, ByteString.fromString("ohi"))
        .withQos(MqttQoS.atLeastOnce())
        .withRetained(true);

Publish and subscribe in a single flow

It is also possible to connect to the MQTT server bidirectionally, using a single underlying connection (and client ID). To do that, create an MQTT flow that combines the functionality of an MQTT source and an MQTT sink.

The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.

Scala
val mqttFlow: Flow[MqttMessage, MqttMessage, Future[Done]] =
  MqttFlow.atMostOnce(
    connectionSettings.withClientId("flow-spec/flow"),
    MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
    bufferSize = 8,
    MqttQoS.AtLeastOnce)
Java
final Flow<MqttMessage, MqttMessage, CompletionStage<Done>> mqttFlow =
    MqttFlow.atMostOnce(
        connectionSettings,
        MqttSubscriptions.create("flow-test/topic", MqttQoS.atMostOnce()),
        bufferSize,
        MqttQoS.atLeastOnce());

Run the flow by connecting a source of messages to be published and a sink for received messages.

Scala
val ((mqttMessagePromise, subscribed), result) = source
  .viaMat(mqttFlow)(Keep.both)
  .toMat(Sink.seq)(Keep.both)
  .run()
Java
final Pair<
        Pair<CompletableFuture<Optional<MqttMessage>>, CompletionStage<Done>>,
        CompletionStage<List<MqttMessage>>>
    materialized =
        source.viaMat(mqttFlow, Keep.both()).toMat(Sink.seq(), Keep.both()).run(system);

CompletableFuture<Optional<MqttMessage>> mqttMessagePromise = materialized.first().first();
CompletionStage<Done> subscribedToMqtt = materialized.first().second();
CompletionStage<List<MqttMessage>> streamResult = materialized.second();

Using flow with Acknowledge on message sent

It is possible to create a flow that receives MqttMessageWithAck instead of MqttMessage. In this case, when the message is successfully sent to the broker, an ack is sent. This flow can be used when the source must be acknowledged only when the message is successfully sent to the destination topic. This provides at-least-once semantics.

The flow emits MqttMessageWithAcks with the message swapped with the new content and keeps the ack function from the original source.

Scala
val mqttFlow: Flow[MqttMessageWithAck, MqttMessageWithAck, Future[Done]] =
  MqttFlow.atLeastOnceWithAck(
    connectionSettings,
    MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
    bufferSize = 8,
    MqttQoS.AtLeastOnce)
Java
final Flow<MqttMessageWithAck, MqttMessageWithAck, CompletionStage<Done>> mqttFlow =
    MqttFlow.atLeastOnceWithAck(
        connectionSettings,
        MqttSubscriptions.create("flow-test/topic-ack", MqttQoS.atMostOnce()),
        bufferSize,
        MqttQoS.atLeastOnce());

Run the flow by connecting a source of messages to be published and a sink for received messages. When a message has been sent, its ack is called.

Scala
val (subscribed, result) = source
  .viaMat(mqttFlow)(Keep.right)
  .toMat(Sink.seq)(Keep.both)
  .run()
Java
final Pair<Pair<NotUsed, CompletionStage<Done>>, CompletionStage<List<MqttMessageWithAck>>>
    materialized =
        source.viaMat(mqttFlow, Keep.both()).toMat(Sink.seq(), Keep.both()).run(system);

Capturing MQTT client logging

The Paho library uses its own logging adapter and includes a default implementation that uses java.util.logging. See Paho/Log and Debug.
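
Because the default adapter uses java.util.logging, Paho's output can be adjusted with standard JDK logging calls. The sketch below raises the level for Paho's loggers and prints their records to the console. It assumes that Paho names its loggers after its classes in the org.eclipse.paho.client.mqttv3 package; the class PahoLogging and its enable method are hypothetical helpers.

```java
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper (not part of the connector): routes Paho's JUL records to the console. */
final class PahoLogging {
  private PahoLogging() {}

  // Assumption: Paho's default adapter creates loggers named after classes in this package.
  static final String PAHO_LOGGER_NAME = "org.eclipse.paho.client.mqttv3";

  // A strong reference is kept because java.util.logging holds loggers only weakly,
  // so the configuration below could otherwise be garbage collected.
  private static final Logger PAHO_LOGGER = Logger.getLogger(PAHO_LOGGER_NAME);

  /** Sets the level for all Paho loggers and attaches a console handler at the same level. */
  static Logger enable(Level level) {
    ConsoleHandler handler = new ConsoleHandler();
    handler.setLevel(level); // console handlers default to INFO and would drop finer records
    PAHO_LOGGER.setLevel(level);
    PAHO_LOGGER.addHandler(handler);
    PAHO_LOGGER.setUseParentHandlers(false); // avoid printing each record twice
    return PAHO_LOGGER;
  }

  public static void main(String[] args) {
    Logger logger = enable(Level.FINE);
    logger.fine("Paho logging enabled at " + logger.getLevel());
  }
}
```

Calling enable once at application start-up, before the first MQTT connection is created, is enough; each further call would attach another handler.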

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

The test code requires an MQTT server running in the background. You can start one quickly using Docker:

docker compose up mqtt

Scala
sbt
> mqtt/testOnly *.MqttSourceSpec
Java
sbt
> mqtt/testOnly *.MqttSourceTest