MQTT Streaming

MQTT stands for MQ Telemetry Transport. It is a publish/subscribe messaging protocol designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimize network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also make the protocol well suited to the emerging “machine-to-machine” (M2M) or “Internet of Things” world of connected devices, and to mobile applications where bandwidth and battery power are at a premium.

Further information is available at mqtt.org.

Paho Differences

Apache Pekko Connectors contains another MQTT connector, which is based on the Eclipse Paho client. Unlike the Paho version, this library has no dependencies other than those of Apache Pekko Streams, i.e. it is entirely reactive. Because it is built purely on Apache Pekko and is careful in its use of threads, it should offer a significant advantage in performance and memory usage.

This library also differs in that it separates out the concern of how MQTT is connected. Unlike Paho, where TCP is assumed, this library can be joined with any flow. As a result, Unix Domain Sockets, TCP, UDP or anything else can be used to transport MQTT.

The Apache Pekko Connectors MQTT Streaming connector provides an Apache Pekko Stream flow to connect to MQTT brokers. In addition, a flow is provided so that you can implement your own MQTT server in the case where you do not wish to use a broker. MQTT is a fine protocol for direct client/server interactions, as well as for use with an intermediary broker.

Apache Pekko Connectors MQTT Streaming implements the MQTT 3.1.1 protocol.

Project Info: Apache Pekko Connectors MQTT Streaming
Artifact: org.apache.pekko, pekko-connectors-mqtt-streaming, 1.0.2
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 2.13.14, 2.12.20, 3.3.3
JPMS module name: pekko.stream.connectors.mqttStreaming
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-mqtt-streaming" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
  "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-mqtt-streaming_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-mqtt-streaming_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
  implementation "org.apache.pekko:pekko-actor-typed_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Flow through a client session

The following code illustrates how to establish an MQTT client session and join it with a TCP connection:

Scala
val settings = MqttSessionSettings()
val session = ActorMqttClientSession(settings)

val connection = Tcp().outgoingConnection("localhost", 1883)

val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
  Mqtt
    .clientSessionFlow(session, ByteString("1"))
    .join(connection)
Java
MqttSessionSettings settings = MqttSessionSettings.create();
MqttClientSession session = ActorMqttClientSession.create(settings, system);

Flow<ByteString, ByteString, CompletionStage<Tcp.OutgoingConnection>> connection =
    Tcp.get(system).outgoingConnection("localhost", 1883);

Flow<Command<Object>, DecodeErrorOrEvent<Object>, NotUsed> mqttFlow =
    Mqtt.clientSessionFlow(session, ByteString.fromString("1")).join(connection);

The resulting flow’s type shows that Commands are received and Events are emitted. Each emitted element is either a successfully decoded Event or a decode error.

Run the flow by connecting a source of messages to be published via a queue:

Scala
val (commands: SourceQueueWithComplete[Command[Nothing]], events: Future[Publish]) =
  Source
    .queue(2, OverflowStrategy.fail)
    .via(mqttFlow)
    .collect {
      case Right(Event(p: Publish, _)) => p
    }
    .toMat(Sink.head)(Keep.both)
    .run()

commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession)))
commands.offer(Command(Subscribe(topic)))
session ! Command(
  Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")))

// for shutting down properly
commands.complete()
commands.watchCompletion().foreach(_ => session.shutdown())
Java
Pair<SourceQueueWithComplete<Command<Object>>, CompletionStage<Publish>> run =
    Source.<Command<Object>>queue(3, OverflowStrategy.fail())
        .via(mqttFlow)
        .collect(
            new JavaPartialFunction<DecodeErrorOrEvent<Object>, Publish>() {
              @Override
              public Publish apply(DecodeErrorOrEvent<Object> x, boolean isCheck) {
                if (x.getEvent().isPresent() && x.getEvent().get().event() instanceof Publish)
                  return (Publish) x.getEvent().get().event();
                else throw noMatch();
              }
            })
        .toMat(Sink.head(), Keep.both())
        .run(system);

SourceQueueWithComplete<Command<Object>> commands = run.first();
commands.offer(new Command<>(new Connect(clientId, ConnectFlags.CleanSession())));
commands.offer(new Command<>(new Subscribe(topic)));
session.tell(
    new Command<>(
        new Publish(
            ControlPacketFlags.RETAIN() | ControlPacketFlags.QoSAtLeastOnceDelivery(),
            topic,
            ByteString.fromString("ohi"))));

// for shutting down properly
commands.complete();
commands.watchCompletion().thenAccept(done -> session.shutdown());

Note that the Publish command is not offered to the command flow, because of MQTT QoS requirements. Instead, the session is told to perform the Publish, as it can buffer the command and retry continuously until a command flow is established.

We filter the events received as there will be ACKs to our connect, subscribe and publish. The collected event is the publication to the topic we just subscribed to.

To shut down the flow after use, the command queue (commands) is completed; once it has completed, the session is shut down.
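
The Publish command in the client example combines RETAIN and QoSAtLeastOnceDelivery with a bitwise OR. The following minimal sketch shows how such flags compose and how they can be tested, using the bit positions that MQTT 3.1.1 defines for the PUBLISH fixed header. The class PublishFlagsSketch and its constants are hypothetical names introduced for illustration only; they are not the connector's ControlPacketFlags, whose values are assumed here to follow the same layout.

```java
/**
 * Illustrative constants following the MQTT 3.1.1 PUBLISH fixed-header layout.
 * Hypothetical stand-ins, not the connector's ControlPacketFlags.
 */
final class PublishFlagsSketch {
    static final int RETAIN = 0x01;            // bit 0
    static final int QOS_AT_LEAST_ONCE = 0x02; // QoS level 1, stored in bits 2-1
    static final int QOS_EXACTLY_ONCE = 0x04;  // QoS level 2, stored in bits 2-1
    static final int DUP = 0x08;               // bit 3

    /** True when the RETAIN bit is set. */
    static boolean isRetained(int flags) {
        return (flags & RETAIN) != 0;
    }

    /** Extracts the two-bit QoS level (0, 1 or 2). */
    static int qosLevel(int flags) {
        return (flags >> 1) & 0x03;
    }

    public static void main(String[] args) {
        int flags = RETAIN | QOS_AT_LEAST_ONCE;
        System.out.println(isRetained(flags)); // true
        System.out.println(qosLevel(flags));   // 1
    }
}
```

The same masking idea underlies the RETAIN check performed on received publications in the server example.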

Flow through a server session

The following code illustrates how to establish an MQTT server session and join it with a TCP binding:

Scala
val settings = MqttSessionSettings()
val session = ActorMqttServerSession(settings)

val maxConnections = 1

val bindSource: Source[Either[MqttCodec.DecodeError, Event[Nothing]], Future[Tcp.ServerBinding]] =
  Tcp()
    .bind(host, 0)
    .flatMapMerge(
      maxConnections,
      { connection =>
        val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
          Mqtt
            .serverSessionFlow(session, ByteString(connection.remoteAddress.getAddress.getAddress))
            .join(connection.flow)

        val (queue, source) = Source
          .queue[Command[Nothing]](3, OverflowStrategy.dropHead)
          .via(mqttFlow)
          .toMat(BroadcastHub.sink)(Keep.both)
          .run()

        val subscribed = Promise[Done]()
        source
          .runForeach {
            case Right(Event(_: Connect, _)) =>
              queue.offer(Command(ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted)))
            case Right(Event(cp: Subscribe, _)) =>
              queue.offer(Command(SubAck(cp.packetId, cp.topicFilters.map(_._2)), Some(subscribed), None))
            case Right(Event(publish @ Publish(flags, _, Some(packetId), _), _))
                if flags.contains(ControlPacketFlags.RETAIN) =>
              queue.offer(Command(PubAck(packetId)))
              subscribed.future.foreach(_ => session ! Command(publish))
            case _ => // Ignore everything else
          }

        source
      })
Java
MqttSessionSettings settings = MqttSessionSettings.create();
MqttServerSession session = ActorMqttServerSession.create(settings, system);

int maxConnections = 1;

Source<DecodeErrorOrEvent<Object>, CompletionStage<Tcp.ServerBinding>> bindSource =
    Tcp.get(system)
        .bind(host, port)
        .flatMapMerge(
            maxConnections,
            connection -> {
              Flow<Command<Object>, DecodeErrorOrEvent<Object>, NotUsed> mqttFlow =
                  Mqtt.serverSessionFlow(
                          session,
                          ByteString.fromArray(
                              connection.remoteAddress().getAddress().getAddress()))
                      .join(connection.flow());

              Pair<
                      SourceQueueWithComplete<Command<Object>>,
                      Source<DecodeErrorOrEvent<Object>, NotUsed>>
                  run =
                      Source.<Command<Object>>queue(2, OverflowStrategy.dropHead())
                          .via(mqttFlow)
                          .toMat(BroadcastHub.of(DecodeErrorOrEvent.classOf()), Keep.both())
                          .run(system);

              SourceQueueWithComplete<Command<Object>> queue = run.first();
              Source<DecodeErrorOrEvent<Object>, NotUsed> source = run.second();

              CompletableFuture<Done> subscribed = new CompletableFuture<>();
              source.runForeach(
                  deOrE -> {
                    if (deOrE.getEvent().isPresent()) {
                      Event<Object> event = deOrE.getEvent().get();
                      ControlPacket cp = event.event();
                      if (cp instanceof Connect) {
                        queue.offer(
                            new Command<>(
                                new ConnAck(
                                    ConnAckFlags.None(),
                                    ConnAckReturnCode.ConnectionAccepted())));
                      } else if (cp instanceof Subscribe) {
                        Subscribe subscribe = (Subscribe) cp;
                        Collection<Tuple2<String, ControlPacketFlags>> topicFilters =
                            JavaConverters.asJavaCollectionConverter(subscribe.topicFilters())
                                .asJavaCollection();
                        List<Integer> flags =
                            topicFilters.stream()
                                .map(x -> x._2().underlying())
                                .collect(Collectors.toList());
                        queue.offer(
                            new Command<>(
                                new SubAck(subscribe.packetId(), flags),
                                Optional.of(subscribed),
                                Optional.empty()));
                      } else if (cp instanceof Publish) {
                        Publish publish = (Publish) cp;
                        if ((publish.flags() & ControlPacketFlags.RETAIN()) != 0) {
                          int packetId = publish.packetId().get().underlying();
                          queue.offer(new Command<>(new PubAck(packetId)));
                          subscribed.thenRun(() -> session.tell(new Command<>(publish)));
                        }
                      } // Ignore everything else
                    }
                  },
                  system);

              return source;
            });

The resulting source’s type shows how Events are received and Commands are queued in reply. Our example acknowledges a connection, a subscription and a publication. Upon receiving a publication (in this example, only one carrying the RETAIN flag and a packet id), it is re-published from the server so that any subscribed client will receive it. An additional detail is that we hold off re-publishing until we have a subscription from the client. Note also how the session is told to perform Publish commands directly, as they will be broadcast to all clients subscribed to the topic.
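
The hold-off described above relies on a future that completes once the subscription has been acknowledged. The following is a minimal sketch of that pattern using only java.util.concurrent.CompletableFuture. The class HoldOffSketch, its methods, and the republished list are hypothetical stand-ins for the session and the SubAck handling in the server example, and the sketch assumes single-threaded use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration of deferring work until a subscription exists. */
final class HoldOffSketch {
    // Stand-in for "session.tell(new Command<>(publish))" in the server example.
    // Plain ArrayList: this sketch assumes everything runs on one thread.
    final List<String> republished = new ArrayList<>();

    // Completed once the subscription has been acknowledged.
    final CompletableFuture<Void> subscribed = new CompletableFuture<>();

    /** Defers re-publishing until 'subscribed' completes; runs at once if it already has. */
    void onPublish(String payload) {
        subscribed.thenRun(() -> republished.add(payload));
    }

    /** Signals that the subscription is in place, releasing any deferred publications. */
    void onSubscribeAcked() {
        subscribed.complete(null);
    }

    public static void main(String[] args) {
        HoldOffSketch server = new HoldOffSketch();
        server.onPublish("early");              // arrives before the subscription
        System.out.println(server.republished); // []
        server.onSubscribeAcked();              // releases "early"
        server.onPublish("late");               // runs immediately
        System.out.println(server.republished); // [early, late]
    }
}
```

A publication that arrives before the subscription is therefore not lost; it is simply re-published later, once the future completes.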

Run the flow:

Scala
val (bound: Future[Tcp.ServerBinding], server: UniqueKillSwitch) = bindSource
  .viaMat(KillSwitches.single)(Keep.both)
  .to(Sink.ignore)
  .run()

// for shutting down properly
server.shutdown()
session.shutdown()
Java
Pair<CompletionStage<Tcp.ServerBinding>, UniqueKillSwitch> bindingAndSwitch =
    bindSource.viaMat(KillSwitches.single(), Keep.both()).to(Sink.ignore()).run(system);

CompletionStage<Tcp.ServerBinding> bound = bindingAndSwitch.first();
UniqueKillSwitch server = bindingAndSwitch.second();

// for shutting down properly
server.shutdown();
session.shutdown();

To shut down the server after use, the server flow is shut down via a KillSwitch and the session is shut down.