AMQP

The AMQP connector provides Apache Pekko Stream sources and sinks to connect to AMQP 0.9.1 servers (RabbitMQ, OpenAMQ, etc.).

AMQP 1.0 is currently not supported (Qpid, ActiveMQ, Solace, etc.).

Project Info: Apache Pekko Connectors AMQP
Artifact: org.apache.pekko, pekko-connectors-amqp, 1.0.2
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 2.13.14, 2.12.20, 3.3.3
JPMS module name: pekko.stream.connectors.amqp
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-amqp" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-amqp_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-amqp_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Connecting to server

All the AMQP connectors are configured using an AmqpConnectionProvider and a list of Declarations.

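There are several types of AmqpConnectionProvider, among them AmqpLocalConnectionProvider, AmqpUriConnectionProvider, AmqpDetailsConnectionProvider, AmqpConnectionFactoryConnectionProvider, and AmqpCachedConnectionProvider.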

Warning

Be aware that basic usage of an AmqpConnectionProvider, such as AmqpUriConnectionProvider(s"amqp://$host:$port"), has an issue with recovering connections. More details can be found in the corresponding issue.

Sending messages

First define a queue name and the declaration of the queue that the messages will be sent to.

Scala
val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
val queueDeclaration = QueueDeclaration(queueName)
Java
final String queueName = "amqp-conn-it-test-simple-queue-" + System.currentTimeMillis();
final QueueDeclaration queueDeclaration = QueueDeclaration.create(queueName);

Here we used the QueueDeclaration configuration class to create a queue declaration.

With flow

As with a Sink, the first step is to create a Flow that accepts WriteMessages and forwards their content to the AMQP server. The Flow emits WriteResults that report the publication result (see below for a summary of the delivery guarantees of the different Flow variants).

AmqpFlow is a collection of factory methods that facilitate the creation of flows. Here we create a flow with publisher confirmations (withConfirm), so each payload is wrapped in a WriteMessage before it enters the flow.

The last step is to materialize and run the flow we have created.

Scala
val settings = AmqpWriteSettings(connectionProvider)
  .withRoutingKey(queueName)
  .withDeclaration(queueDeclaration)
  .withBufferSize(10)
  .withConfirmationTimeout(200.millis)

val amqpFlow: Flow[WriteMessage, WriteResult, Future[Done]] =
  AmqpFlow.withConfirm(settings)

val input = Vector("one", "two", "three", "four", "five")
val result: Future[Seq[WriteResult]] =
  Source(input)
    .map(message => WriteMessage(ByteString(message)))
    .via(amqpFlow)
    .runWith(Sink.seq)
Java
final AmqpWriteSettings settings =
    AmqpWriteSettings.create(connectionProvider)
        .withRoutingKey(queueName)
        .withDeclaration(queueDeclaration)
        .withBufferSize(10)
        .withConfirmationTimeout(Duration.ofMillis(200));

final Flow<WriteMessage, WriteResult, CompletionStage<Done>> amqpFlow =
    AmqpFlow.createWithConfirm(settings);

final List<String> input = Arrays.asList("one", "two", "three", "four", "five");

final List<WriteResult> result =
    Source.from(input)
        .map(message -> WriteMessage.create(ByteString.fromString(message)))
        .via(amqpFlow)
        .runWith(Sink.seq(), system)
        .toCompletableFuture()
        .get();

The variants of the AMQP flow offer different delivery and ordering guarantees:

AMQP flow factory: Description
AmqpFlow.apply: The most basic type of flow. It does not impose delivery guarantees; messages are published in a fire-and-forget manner. Emitted results always have confirmed set to true.
AmqpFlow.withConfirm: Variant that uses asynchronous confirmations. The maximum number of messages simultaneously waiting for confirmation before backpressure is signaled is configured with the bufferSize parameter. Emitted results preserve the order of messages pulled from upstream; because of that restriction, this flow is expected to be slightly less efficient than its unordered counterpart.
AmqpFlow.withConfirmUnordered: The same as AmqpFlow.withConfirm except for the ordering guarantee: results are emitted downstream as soon as a confirmation is received, so there is no ordering guarantee of any sort.
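
The difference between the ordered and unordered confirm variants can be illustrated without a broker. The following is a minimal, standalone Java sketch (plain JDK, no Pekko). The class ConfirmOrderingModel and its methods are hypothetical names invented for this illustration and are not part of the connector, and the sketch makes no claim about how the connector is implemented internally. Each message is identified by its publish index, starting at 0, and the input list is the order in which confirmations arrive.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical standalone model of ordered vs. unordered result emission. */
class ConfirmOrderingModel {

  /** Unordered: a result goes downstream as soon as its confirmation arrives. */
  static List<Integer> emitUnordered(List<Integer> confirmationArrivalOrder) {
    return new ArrayList<>(confirmationArrivalOrder);
  }

  /**
   * Ordered: a confirmed result is held back until every message with a
   * lower publish index has been emitted.
   */
  static List<Integer> emitOrdered(List<Integer> confirmationArrivalOrder) {
    List<Integer> emitted = new ArrayList<>();
    Set<Integer> confirmedButHeld = new HashSet<>();
    int nextToEmit = 0; // publish index of the next result allowed downstream
    for (int confirmed : confirmationArrivalOrder) {
      confirmedButHeld.add(confirmed);
      // Release every result that is now unblocked, in publish order.
      while (confirmedButHeld.remove(nextToEmit)) {
        emitted.add(nextToEmit);
        nextToEmit++;
      }
    }
    return emitted;
  }
}
```

For confirmations arriving as 2, 0, 1, 4, 3, emitUnordered returns them unchanged, while emitOrdered returns 0, 1, 2, 3, 4. In the ordered case the result for message 2 has to wait for messages 0 and 1, which is the kind of cost the ordering guarantee implies.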

For the FlowWithContext counterparts of the above flows, see AmqpFlowWithContext.

Warning

AmqpFlow.withConfirm and AmqpFlow.withConfirmUnordered are implemented using RabbitMQ’s extension to the AMQP protocol (Publisher Confirms); therefore they are not intended to work with other AMQP brokers.

With sink

Create a sink that accepts ByteStrings and forwards them to the AMQP server.

AmqpSink is a collection of factory methods that facilitate the creation of sinks. Here we created a simple sink, which means that we are able to pass ByteStrings to the sink instead of wrapping data into WriteMessages.

The last step is to materialize and run the sink we have created.

Scala
val amqpSink: Sink[ByteString, Future[Done]] =
  AmqpSink.simple(
    AmqpWriteSettings(connectionProvider)
      .withRoutingKey(queueName)
      .withDeclaration(queueDeclaration))

val input = Vector("one", "two", "three", "four", "five")
val writing: Future[Done] =
  Source(input)
    .map(s => ByteString(s))
    .runWith(amqpSink)
Java
final Sink<ByteString, CompletionStage<Done>> amqpSink =
    AmqpSink.createSimple(
        AmqpWriteSettings.create(connectionProvider)
            .withRoutingKey(queueName)
            .withDeclaration(queueDeclaration));

final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
CompletionStage<Done> writing =
    Source.from(input).map(ByteString::fromString).runWith(amqpSink, system);

Receiving messages

Create a source using the same queue declaration as before.

The bufferSize parameter controls the maximum number of messages to prefetch from the AMQP server.

Run the source and take the same number of messages as we previously sent to it.

Scala
val amqpSource: Source[ReadResult, NotUsed] =
  AmqpSource.atMostOnceSource(
    NamedQueueSourceSettings(connectionProvider, queueName)
      .withDeclaration(queueDeclaration)
      .withAckRequired(false),
    bufferSize = 10)

val result: Future[immutable.Seq[ReadResult]] =
  amqpSource
    .take(input.size)
    .runWith(Sink.seq)
Java
final Integer bufferSize = 10;
final Source<ReadResult, NotUsed> amqpSource =
    AmqpSource.atMostOnceSource(
        NamedQueueSourceSettings.create(connectionProvider, queueName)
            .withDeclaration(queueDeclaration)
            .withAckRequired(false),
        bufferSize);

final CompletionStage<List<ReadResult>> result =
    amqpSource.take(input.size()).runWith(Sink.seq(), system);

This is how you send messages to and receive messages from an AMQP server using this connector.

Using Pub/Sub

Instead of sending messages directly to queues, it is possible to send messages to an exchange and then tell the AMQP server what to do with incoming messages. We are going to use the fanout exchange type, which broadcasts messages to multiple consumers. To do that, we use an exchange declaration for the sink and all of the sources.

Scala
val exchangeName = "amqp-conn-it-spec-pub-sub-" + System.currentTimeMillis()
val exchangeDeclaration = ExchangeDeclaration(exchangeName, "fanout")
Java
final String exchangeName = "amqp-conn-it-test-pub-sub-" + System.currentTimeMillis();
final ExchangeDeclaration exchangeDeclaration =
    ExchangeDeclaration.create(exchangeName, "fanout");

The sink for the exchange is created in a very similar way.

Scala
val amqpSink = AmqpSink.simple(
  AmqpWriteSettings(connectionProvider)
    .withExchange(exchangeName)
    .withDeclaration(exchangeDeclaration))
Java
final Sink<ByteString, CompletionStage<Done>> amqpSink =
    AmqpSink.createSimple(
        AmqpWriteSettings.create(connectionProvider)
            .withExchange(exchangeName)
            .withDeclaration(exchangeDeclaration));

For the source, we are going to create multiple sources and merge them using Apache Pekko Streams operators.

Scala
val fanoutSize = 4

val mergedSources = (0 until fanoutSize).foldLeft(Source.empty[(Int, String)]) {
  case (source, fanoutBranch) =>
    source.merge(
      AmqpSource
        .atMostOnceSource(
          TemporaryQueueSourceSettings(
            connectionProvider,
            exchangeName).withDeclaration(exchangeDeclaration),
          bufferSize = 1)
        .map(msg => (fanoutBranch, msg.bytes.utf8String)))
}
Java
final int fanoutSize = 4;
final int bufferSize = 1;

Source<Pair<Integer, String>, NotUsed> mergedSources = Source.empty();
for (int i = 0; i < fanoutSize; i++) {
  final int fanoutBranch = i;
  mergedSources =
      mergedSources.merge(
          AmqpSource.atMostOnceSource(
                  TemporaryQueueSourceSettings.create(connectionProvider, exchangeName)
                      .withDeclaration(exchangeDeclaration),
                  bufferSize)
              .map(msg -> Pair.create(fanoutBranch, msg.bytes().utf8String())));
}

We merge all sources into one and add the index of the source to all incoming messages, so we can distinguish which source the incoming message came from.

Such a sink and source can be started the same way as in the previous example.
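
To make the fanout behaviour concrete, here is a small standalone Java sketch (plain JDK, no Pekko, no broker). FanoutModel and its methods are hypothetical names used only for illustration. It models the general AMQP rule that a fanout exchange copies every published message to every queue bound at that moment, and it tags messages with the index of the queue they came from, as the merged sources above do. Unlike a real merge, which interleaves branches in no fixed order, this model simply concatenates the branches.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical standalone model of a fanout exchange with several bound queues. */
class FanoutModel {
  // Insertion-ordered so that branch indices match the order of bind() calls.
  private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

  /** Binds a new, initially empty queue to the exchange. */
  void bind(String queueName) {
    boundQueues.put(queueName, new ArrayList<>());
  }

  /** Publishes a message: every currently bound queue receives its own copy. */
  void publish(String message) {
    for (List<String> queue : boundQueues.values()) {
      queue.add(message);
    }
  }

  /** Concatenates all queues, prefixing each message with its branch index. */
  List<String> mergeTagged() {
    List<String> merged = new ArrayList<>();
    int branch = 0;
    for (List<String> queue : boundQueues.values()) {
      for (String message : queue) {
        merged.add(branch + ":" + message);
      }
      branch++;
    }
    return merged;
  }
}
```

With two bound queues and two published messages, mergeTagged yields four entries, one copy per queue. A queue bound afterwards only sees messages published after it was bound, which is worth keeping in mind when starting the sources and the sink of a pub/sub setup.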

Using RabbitMQ as an RPC mechanism

If you have remote workers that you want to incorporate into a stream, you can do so using the RabbitMQ RPC workflow.

Scala
val amqpRpcFlow = AmqpRpcFlow.simple(
  AmqpWriteSettings(connectionProvider).withRoutingKey(queueName).withDeclaration(queueDeclaration))

val (rpcQueueF: Future[String], probe: TestSubscriber.Probe[ByteString]) = Source(input)
  .map(s => ByteString(s))
  .viaMat(amqpRpcFlow)(Keep.right)
  .toMat(TestSink.probe)(Keep.both)
  .run()
Java
final Flow<ByteString, ByteString, CompletionStage<String>> amqpRpcFlow =
    AmqpRpcFlow.createSimple(
        AmqpWriteSettings.create(connectionProvider)
            .withRoutingKey(queueName)
            .withDeclaration(queueDeclaration),
        1);

Pair<CompletionStage<String>, TestSubscriber.Probe<ByteString>> result =
    Source.from(input)
        .map(ByteString::fromString)
        .viaMat(amqpRpcFlow, Keep.right())
        .toMat(TestSink.probe(system), Keep.both())
        .run(system);
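
For readers unfamiliar with the RabbitMQ RPC pattern that this flow builds on, the following standalone Java sketch (plain JDK, no Pekko, no broker) models its two key message properties: a reply-to queue name and a correlation id. RpcModel, Message, and all method names are hypothetical and exist only for this illustration; the sketch does not describe how AmqpRpcFlow is implemented. The materialized String in the snippets above is assumed here to be the name of the queue used for replies, as the Scala value name rpcQueueF suggests.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;

/** Hypothetical in-memory model of request/reply over named queues. */
class RpcModel {

  /** A request or reply with the two properties the RPC pattern relies on. */
  static class Message {
    final String body;
    final String replyTo;       // queue the caller listens on (null for replies)
    final String correlationId; // lets the caller match a reply to its request

    Message(String body, String replyTo, String correlationId) {
      this.body = body;
      this.replyTo = replyTo;
      this.correlationId = correlationId;
    }
  }

  private final Map<String, Queue<Message>> queues = new HashMap<>();

  private Queue<Message> queue(String name) {
    return queues.computeIfAbsent(name, n -> new ArrayDeque<>());
  }

  /** Caller side: enqueue a request that names the queue to reply to. */
  void request(String requestQueue, String replyQueue, String correlationId, String body) {
    queue(requestQueue).add(new Message(body, replyQueue, correlationId));
  }

  /** Worker side: handle one pending request; returns false if none was pending. */
  boolean serveOne(String requestQueue, Function<String, String> handler) {
    Message req = queue(requestQueue).poll();
    if (req == null) {
      return false;
    }
    // The reply goes to the queue named in the request and keeps its correlation id.
    queue(req.replyTo).add(new Message(handler.apply(req.body), null, req.correlationId));
    return true;
  }

  /** Caller side: next reply from the reply queue, or null if it is empty. */
  Message nextReply(String replyQueue) {
    return queue(replyQueue).poll();
  }
}
```

A caller would issue request("work", "replies-1", "c1", "one"), a worker would call serveOne("work", handler), and the caller would then read the reply from "replies-1" and match it by correlation id. The queue names and ids here are arbitrary example values.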

Acknowledging messages downstream

Committable sources return a CommittableReadResult, which wraps the ReadResult and exposes the methods ack and nack.

Use ack to acknowledge the message back to RabbitMQ. ack takes an optional boolean parameter multiple indicating whether you are acknowledging the individual message or all the messages up to it.

Use nack to reject a message. Apart from the multiple argument, nack takes another optional boolean parameter, requeue, indicating whether the message should be requeued.

Scala
val amqpSource = AmqpSource.committableSource(
  NamedQueueSourceSettings(connectionProvider, queueName)
    .withDeclaration(queueDeclaration),
  bufferSize = 10)

val result: Future[immutable.Seq[ReadResult]] = amqpSource
  .mapAsync(1)(businessLogic)
  .mapAsync(1)(cm => cm.ack().map(_ => cm.message))
  .take(input.size)
  .runWith(Sink.seq)

val nackedResults: Future[immutable.Seq[ReadResult]] = amqpSource
  .mapAsync(1)(businessLogic)
  .take(input.size)
  .mapAsync(1)(cm => cm.nack(multiple = false, requeue = true).map(_ => cm.message))
  .runWith(Sink.seq)
Java
final Integer bufferSize = 10;
final Source<CommittableReadResult, NotUsed> amqpSource =
    AmqpSource.committableSource(
        NamedQueueSourceSettings.create(connectionProvider, queueName)
            .withDeclaration(queueDeclaration),
        bufferSize);

final CompletionStage<List<ReadResult>> result =
    amqpSource
        .mapAsync(1, this::businessLogic)
        .mapAsync(1, cm -> cm.ack(/* multiple */ false).thenApply(unused -> cm.message()))
        .take(input.size())
        .runWith(Sink.seq(), system);

final CompletionStage<List<ReadResult>> nackedResults =
    amqpSource
        .take(input.size())
        .mapAsync(1, this::businessLogic)
        .mapAsync(
            1,
            cm ->
                cm.nack(/* multiple */ false, /* requeue */ true)
                    .thenApply(unused -> cm.message()))
        .runWith(Sink.seq(), system);
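
The effect of the multiple and requeue flags can be sketched with a standalone model (plain JDK, no Pekko, no broker). AckModel and its methods are hypothetical names for illustration only. The model follows the general AMQP 0.9.1 rule that an acknowledgement with multiple set to true settles every outstanding delivery up to and including the given delivery tag. In the connector the delivery tags are hidden behind CommittableReadResult, so the explicit tags here are purely a modelling device.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of unacknowledged deliveries on a single channel. */
class AckModel {
  private final TreeSet<Long> unacked = new TreeSet<>(); // tags awaiting ack or nack
  private final List<Long> requeued = new ArrayList<>(); // tags handed back to the queue

  /** Records a delivery that now awaits acknowledgement. */
  void deliver(long deliveryTag) {
    unacked.add(deliveryTag);
  }

  /** Positive acknowledgement of one tag, or of all tags up to it if multiple. */
  void ack(long deliveryTag, boolean multiple) {
    settle(deliveryTag, multiple);
  }

  /** Rejection; rejected messages are requeued only when requeue is true. */
  void nack(long deliveryTag, boolean multiple, boolean requeue) {
    List<Long> settled = settle(deliveryTag, multiple);
    if (requeue) {
      requeued.addAll(settled);
    }
  }

  /** Removes the affected tags from the unacked set and returns them. */
  private List<Long> settle(long deliveryTag, boolean multiple) {
    List<Long> settled = new ArrayList<>();
    if (multiple) {
      settled.addAll(unacked.headSet(deliveryTag, true)); // all tags <= deliveryTag
    } else if (unacked.contains(deliveryTag)) {
      settled.add(deliveryTag);
    }
    unacked.removeAll(settled);
    return settled;
  }

  List<Long> outstanding() {
    return new ArrayList<>(unacked);
  }

  List<Long> requeuedTags() {
    return new ArrayList<>(requeued);
  }
}
```

After delivering tags 1 to 5, ack(3, true) leaves 4 and 5 outstanding; nack(4, false, true) then requeues only tag 4, and nack(5, false, false) discards tag 5 without requeueing it.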