MQTT
MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimise network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also turn out to make the protocol ideal of the emerging “machine-to-machine” (M2M) or “Internet of Things” world of connected devices, and for mobile applications where bandwidth and battery power are at a premium.
Further information on mqtt.org.
Apache Pekko Connectors contains another MQTT connector which is not based on the Eclipse Paho client, unlike this one. Please refer to the other connector where the differences are expanded on.
The Apache Pekko Connectors MQTT connector provides an Apache Pekko Stream source, sink and flow to connect to MQTT brokers. It is based on the Eclipse Paho Java client.
Project Info: Apache Pekko Connectors MQTT | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-mqtt
1.0.2
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 |
Scala versions | 2.13.14, 2.12.20, 3.3.3 |
JPMS module name | pekko.stream.connectors.mqtt |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
val PekkoVersion = "1.0.3" libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-connectors-mqtt" % "1.0.2", "org.apache.pekko" %% "pekko-stream" % PekkoVersion )
- Maven
<properties> <pekko.version>1.0.3</pekko.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-connectors-mqtt_${scala.binary.version}</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency> </dependencies>
- Gradle
def versions = [ PekkoVersion: "1.0.3", ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-connectors-mqtt_${versions.ScalaBinary}:1.0.2" implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Settings
The required MqttConnectionSettings
(API
) settings to connect to an MQTT server are
- the MQTT broker address
- a unique ID for the client (setting it to the empty string should let the MQTT broker assign it, but not all do; you might want to generate it)
- the MQTT client persistence to use (eg.
MemoryPersistence
) which allows to control reliability guarantees
- Scala
-
source
val connectionSettings = MqttConnectionSettings( "tcp://localhost:1883", // (1) "test-scala-client", // (2) new MemoryPersistence // (3) )
- Java
-
source
MqttConnectionSettings connectionSettings = MqttConnectionSettings.create( "tcp://localhost:1883", // (1) "test-java-client", // (2) new MemoryPersistence() // (3) );
Most settings are passed on to Paho’s MqttConnectOptions
(API
) and documented there.
Note that the following examples do not provide any connection management and are designed to get you going quickly. Consider empty client IDs to auto-generate unique identifiers and the use of delayed stream restarts. The underlying Paho library’s auto-reconnect feature does not handle initial connections by design.
Configure encrypted connections
To connect with transport-level security configure the address as ssl://
, set authentication details and pass in a socket factory.
- Scala
-
source
val connectionSettings = MqttConnectionSettings( "ssl://localhost:1885", "ssl-client", new MemoryPersistence).withAuth("mqttUser", "mqttPassword") .withSocketFactory(SSLContext.getDefault.getSocketFactory)
- Java
-
source
MqttConnectionSettings connectionSettings = MqttConnectionSettings.create("ssl://localhost:1885", "ssl-client", new MemoryPersistence()) .withAuth("mqttUser", "mqttPassword") .withSocketFactory(SSLContext.getDefault().getSocketFactory());
Reading from MQTT
At most once
Then let’s create a source that connects to the MQTT server and receives messages from the subscribed topics.
The bufferSize
sets the maximum number of messages read from MQTT before back-pressure applies.
- Scala
-
source
val mqttSource: Source[MqttMessage, Future[Done]] = MqttSource.atMostOnce( connectionSettings.withClientId(clientId = "source-spec/source"), MqttSubscriptions(Map(topic1 -> MqttQoS.AtLeastOnce, topic2 -> MqttQoS.AtLeastOnce)), bufferSize = 8) val (subscribed, streamResult) = mqttSource .take(messages.size) .toMat(Sink.seq)(Keep.both) .run()
- Java
-
source
MqttSubscriptions subscriptions = MqttSubscriptions.create(topic1, MqttQoS.atMostOnce()) .addSubscription(topic2, MqttQoS.atMostOnce()); Source<MqttMessage, CompletionStage<Done>> mqttSource = MqttSource.atMostOnce( connectionSettings.withClientId("source-test/source"), subscriptions, bufferSize); Pair<CompletionStage<Done>, CompletionStage<List<String>>> materialized = mqttSource .map(m -> m.topic() + "-" + m.payload().utf8String()) .take(messageCount * 2) .toMat(Sink.seq(), Keep.both()) .run(system); CompletionStage<Done> subscribed = materialized.first(); CompletionStage<List<String>> streamResult = materialized.second();
This source has a materialized value (Future[Done]
CompletionStage<Done>
) which is completed when the subscription to the MQTT broker has been established.
MQTT atMostOnce
automatically acknowledges messages back to the server when they are passed downstream.
At least once
The atLeastOnce
source allow users to acknowledge the messages anywhere downstream. Please note that for manual acks to work CleanSession
should be set to false and MqttQoS
should be AtLeastOnce
.
The bufferSize
sets the maximum number of messages read from MQTT before back-pressure applies.
- Scala
-
source
val mqttSource: Source[MqttMessageWithAck, Future[Done]] = MqttSource.atLeastOnce( connectionSettings .withClientId(clientId = "source-spec/source1") .withCleanSession(false), MqttSubscriptions(topic, MqttQoS.AtLeastOnce), bufferSize = 8)
- Java
-
source
Source<MqttMessageWithAck, CompletionStage<Done>> mqttSource = MqttSource.atLeastOnce( connectionSettings .withClientId("source-test/source-withoutAutoAck") .withCleanSession(false), MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()), bufferSize);
The atLeastOnce
source returns MqttMessageWithAck
MqttMessageWithAck
so you can acknowledge them by calling ack()
.
- Scala
-
source
val result = mqttSource .via(businessLogic) .mapAsync(1)(messageWithAck => messageWithAck.ack().map(_ => messageWithAck.message)) .take(input.size) .runWith(Sink.seq)
- Java
-
source
final CompletionStage<List<MqttMessage>> result = mqttSource .via(businessLogic) .mapAsync( 1, messageWithAck -> messageWithAck.ack().thenApply(unused2 -> messageWithAck.message())) .take(input.size()) .runWith(Sink.seq(), system);
Publishing to MQTT
To publish messages to the MQTT server create a sink be specifying MqttConnectionSettings
(API
) and a default Quality of Service-level.
- Scala
-
source
val sink: Sink[MqttMessage, Future[Done]] = MqttSink(connectionSettings, MqttQoS.AtLeastOnce) Source(messages).runWith(sink)
- Java
-
source
Sink<MqttMessage, CompletionStage<Done>> mqttSink = MqttSink.create(connectionSettings.withClientId("source-test/sink"), MqttQoS.atLeastOnce()); Source.from(messages).runWith(mqttSink, system);
The Quality of Service-level and the retained flag can be configured on a per-message basis.
- Scala
-
source
val lastWill = MqttMessage(willTopic, ByteString("ohi")) .withQos(MqttQoS.AtLeastOnce) .withRetained(true)
- Java
-
source
MqttMessage lastWill = MqttMessage.create(willTopic, ByteString.fromString("ohi")) .withQos(MqttQoS.atLeastOnce()) .withRetained(true);
Publish and subscribe in a single flow
It is also possible to connect to the MQTT server in bidirectional fashion, using a single underlying connection (and client ID). To do that create an MQTT flow that combines the functionalities of an MQTT source and an MQTT sink.
The bufferSize
sets the maximum number of messages read from MQTT before back-pressure applies.
- Scala
-
source
val mqttFlow: Flow[MqttMessage, MqttMessage, Future[Done]] = MqttFlow.atMostOnce( connectionSettings.withClientId("flow-spec/flow"), MqttSubscriptions(topic, MqttQoS.AtLeastOnce), bufferSize = 8, MqttQoS.AtLeastOnce)
- Java
-
source
final Flow<MqttMessage, MqttMessage, CompletionStage<Done>> mqttFlow = MqttFlow.atMostOnce( connectionSettings, MqttSubscriptions.create("flow-test/topic", MqttQoS.atMostOnce()), bufferSize, MqttQoS.atLeastOnce());
Run the flow by connecting a source of messages to be published and a sink for received messages.
- Scala
-
source
val ((mqttMessagePromise, subscribed), result) = source .viaMat(mqttFlow)(Keep.both) .toMat(Sink.seq)(Keep.both) .run()
- Java
-
source
final Pair< Pair<CompletableFuture<Optional<MqttMessage>>, CompletionStage<Done>>, CompletionStage<List<MqttMessage>>> materialized = source.viaMat(mqttFlow, Keep.both()).toMat(Sink.seq(), Keep.both()).run(system); CompletableFuture<Optional<MqttMessage>> mqttMessagePromise = materialized.first().first(); CompletionStage<Done> subscribedToMqtt = materialized.first().second(); CompletionStage<List<MqttMessage>> streamResult = materialized.second();
Using flow with Acknowledge on message sent
It is possible to create a flow that receives MqttMessageWithAck
instead of MqttMessage
. In this case, when the message is successfully sent to the broker, an ack is sent. This flow can be used when the source must be acknowledged only when the message is successfully sent to the destination topic. This provides at-least-once semantics.
The flow emits MqttMessageWithAck
s with the message swapped with the new content and keeps the ack function from the original source.
- Scala
-
source
val mqttFlow: Flow[MqttMessageWithAck, MqttMessageWithAck, Future[Done]] = MqttFlow.atLeastOnceWithAck( connectionSettings, MqttSubscriptions(topic, MqttQoS.AtLeastOnce), bufferSize = 8, MqttQoS.AtLeastOnce)
- Java
-
source
final Flow<MqttMessageWithAck, MqttMessageWithAck, CompletionStage<Done>> mqttFlow = MqttFlow.atLeastOnceWithAck( connectionSettings, MqttSubscriptions.create("flow-test/topic-ack", MqttQoS.atMostOnce()), bufferSize, MqttQoS.atLeastOnce());
Run the flow by connecting a source of messages to be published and a sink for received messages. When the message are sent, an ack is called.
- Scala
-
source
val (subscribed, result) = source .viaMat(mqttFlow)(Keep.right) .toMat(Sink.seq)(Keep.both) .run()
- Java
-
source
final Pair<Pair<NotUsed, CompletionStage<Done>>, CompletionStage<List<MqttMessageWithAck>>> materialized = source.viaMat(mqttFlow, Keep.both()).toMat(Sink.seq(), Keep.both()).run(system);
Capturing MQTT client logging
The Paho library uses its own logging adapter and contains a default implementation to use java.util.logging
. See Paho/Log and Debug.
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
Test code requires a MQTT server running in the background. You can start one quickly using docker:
docker compose up mqtt
- Scala
-
sbt > mqtt/testOnly *.MqttSourceSpec
- Java
-
sbt > mqtt/testOnly *.MqttSourceTest