Consumer

The Apache Pekko Connectors JMS connector offers consuming JMS messages from topics or queues:

  • Read javax.jms.Messages from an Apache Pekko Streams source
  • Allow for client acknowledgement to the JMS broker
  • Allow for JMS transactions
  • Read raw JVM types from an Apache Pekko Streams Source

The JMS message model supports several types of message bodies (see javax.jms.Message), which may be created directly from the Apache Pekko Streams elements, or in wrappers to access more advanced features.

Receiving messages

JmsConsumer offers factory methods to consume JMS messages in a number of ways.

This example shows how to listen to a JMS queue and emit javax.jms.Message elements into the stream.

The materialized value JmsConsumerControl is used to shut down the consumer (it is a KillSwitch) and offers the possibility to inspect the connectivity state of the consumer.

Scala
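import org.apache.pekko.stream.connectors.jms.JmsConsumerSettings
import org.apache.pekko.stream.connectors.jms.scaladsl.{ JmsConsumer, JmsConsumerControl }
import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }
import scala.collection.immutable
import scala.concurrent.Future

// Listen to the "numbers" queue and emit each javax.jms.Message into the stream
val jmsSource: Source[javax.jms.Message, JmsConsumerControl] = JmsConsumer(
  JmsConsumerSettings(system, connectionFactory).withQueue("numbers"))

val (control, result): (JmsConsumerControl, Future[immutable.Seq[String]]) =
  jmsSource
    .take(msgsIn.size)
    .map {
      case t: javax.jms.TextMessage => t.getText
      case other                    => sys.error(s"unexpected message type ${other.getClass}")
    }
    .toMat(Sink.seq)(Keep.both)
    .run()

// Shut down the consumer once it is no longer needed
control.shutdown()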
Java
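import java.util.List;
import java.util.concurrent.CompletionStage;
import javax.jms.ConnectionFactory;
import javax.jms.TextMessage;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.connectors.jms.JmsConsumerSettings;
import org.apache.pekko.stream.connectors.jms.javadsl.JmsConsumer;
import org.apache.pekko.stream.connectors.jms.javadsl.JmsConsumerControl;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

ConnectionFactory connectionFactory = server.createConnectionFactory();

// Listen to the "test" queue and emit each javax.jms.Message into the stream
Source<javax.jms.Message, JmsConsumerControl> jmsSource =
    JmsConsumer.create(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

Pair<JmsConsumerControl, CompletionStage<List<String>>> controlAndResult =
    jmsSource
        .take(expectedMessages)
        .map(
            msg -> {
              if (msg instanceof TextMessage) {
                TextMessage t = (TextMessage) msg;
                return t.getText();
              } else {
                throw new RuntimeException("unexpected message type " + msg.getClass());
              }
            })
        .toMat(Sink.seq(), Keep.both())
        .run(system);

// Shut down the consumer once it is no longer needed
JmsConsumerControl control = controlAndResult.first();
control.shutdown();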

Configure JMS consumers

To connect to the JMS broker, first define an appropriate javax.jms.ConnectionFactory. The Apache Pekko Connectors tests and all examples use ActiveMQ.

Scala
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
Java
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();

The created ConnectionFactory is then used for the creation of the different JMS sources.

The JmsConsumerSettings factories accept either the actor system, in which case the settings are read from the default pekko.connectors.jms.consumer section, or a Config instance that resolves to a section of the same structure.

Scala
val consumerConfig: Config = system.settings.config.getConfig(JmsConsumerSettings.configPath)
// reiterating defaults from reference.conf
val settings = JmsConsumerSettings(consumerConfig, connectionFactory)
  .withQueue("target-queue")
  .withCredentials(Credentials("username", "password"))
  .withConnectionRetrySettings(retrySettings)
  .withSessionCount(1)
  .withBufferSize(100)
  .withAckTimeout(1.second)
Java
Config consumerConfig = config.getConfig(JmsConsumerSettings.configPath());
JmsConsumerSettings settings =
    JmsConsumerSettings.create(consumerConfig, new ActiveMQConnectionFactory("broker-url"))
        .withTopic("message-topic")
        .withCredentials(Credentials.create("username", "password"))
        .withConnectionRetrySettings(retrySettings)
        .withSessionCount(10)
        .withAcknowledgeMode(AcknowledgeMode.AutoAcknowledge())
        .withSelector("Important = TRUE");

The Apache Pekko Connectors JMS consumer reads its default settings from the HOCON section pekko.connectors.jms.consumer, which you can override in your application.conf. Individual settings may also be changed in code using the withXyz methods. The reference.conf section below shows the structure to use when configuring multiple set-ups.

Table
Setting | Description | Default Value
connectionFactory | Factory to use for creating JMS connections | Must be set in code
destination | Destination (queue or topic) to receive JMS messages from | Must be set in code
credentials | JMS broker credentials | Empty
connectionRetrySettings | Retry characteristics if the connection failed to be established or is taking a long time | See Connection Retries
sessionCount | Number of parallel sessions to use for receiving JMS messages | 1
bufferSize | Maximum number of messages to prefetch before applying backpressure | 100
ackTimeout | For use with JMS transactions only: maximum time given to a message to be committed or rolled back | 1 second
maxAckInterval | For use with AckSource only: maximum duration before the queued acks are sent to the broker | Empty
maxPendingAcks | For use with AckSource only: number of acks that get queued before being sent to the broker | 100
selector | JMS selector expression (see below) | Empty
connectionStatusSubscriptionTimeout | Time to wait for a subscriber of connection status events before starting to discard them | 5 seconds
reference.conf
# Jms Consumer Settings
# sets default values
consumer {
  # Configure connection retrying by providing settings for ConnectionRetrySettings.
  connection-retry = ${pekko.connectors.jms.connection-retry}
  # Credentials to connect to the JMS broker.
  # credentials {
  #   username = "some text"
  #   password = "some text"
  # }
  # "off" to not use any credentials.
  credentials = off
  # Number of parallel sessions to use for receiving JMS messages.
  session-count = 1
  # Buffer size for maximum number for messages read from JMS when there is no demand.
  buffer-size = 100
  # JMS selector expression.
  # See https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
  # empty string for unset
  selector = "" # optional
  # Set an explicit acknowledge mode.
  # (Consumers have specific defaults.)
  # See eg. javax.jms.Session.AUTO_ACKNOWLEDGE
  # Allowed values: "off", "auto", "client", "duplicates-ok", "session", integer value
  acknowledge-mode = off
  # Timeout for acknowledge.
  # (Used by TX consumers.)
  ack-timeout = 1 second
  # For use with transactions, if true the stream fails if Apache Pekko Connectors rolls back the transaction
  # when `ack-timeout` is hit.
  fail-stream-on-ack-timeout = false
  # Max interval before sending queued acknowledges back to the broker. (Used by AckSources.)
  # max-ack-interval = 5 seconds
  # Max number of acks queued by AckSource before they are sent to broker. (Unless MaxAckInterval is specified).
  max-pending-acks = ${pekko.connectors.jms.consumer.buffer-size}
  # How long the stage should preserve connection status events for the first subscriber before discarding them
  connection-status-subscription-timeout = 5 seconds
}
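
As a sketch of how these defaults might be overridden, the fragment below changes a few values for all consumers and then defines a second set-up that inherits from the default section. The path my-app.jms.orders-consumer and the chosen values are hypothetical and only serve as illustration; the keys are the ones listed in reference.conf above.

```hocon
# application.conf

# Override selected defaults for every consumer created from the actor system
pekko.connectors.jms.consumer {
  session-count = 2
  buffer-size = 50
  max-ack-interval = 5 seconds
}

# Hypothetical second set-up: start from the default section, then override
my-app.jms.orders-consumer = ${pekko.connectors.jms.consumer} {
  session-count = 5
  selector = "Important = TRUE"
}
```

Under these assumptions, the second section could be loaded with config.getConfig("my-app.jms.orders-consumer") and passed to the JmsConsumerSettings factory that takes a Config, as in the settings examples above.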

Broker specific destinations

To use broker-specific features, create the destination as a CustomDestination, which takes a factory method that creates the destination from the JMS session.

Scala
def createQueue(destinationName: String): Session => javax.jms.Queue = { (session: Session) =>
  val amqSession = session.asInstanceOf[ActiveMQSession]
  amqSession.createQueue(s"my-$destinationName")
}

val jmsSource: Source[javax.jms.Message, JmsConsumerControl] = JmsConsumer(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withDestination(CustomDestination("custom", createQueue("custom"))))
Java
Function<javax.jms.Session, javax.jms.Destination> createQueue(String destinationName) {
  return (session) -> {
    ActiveMQSession amqSession = (ActiveMQSession) session;
    try {
      return amqSession.createQueue("my-" + destinationName);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    }
  };
}

Source<Message, JmsConsumerControl> jmsSource =
    JmsConsumer.create(
        JmsConsumerSettings.create(system, connectionFactory)
            .withDestination(new CustomDestination("custom", createQueue("custom"))));

Using JMS client acknowledgement

Client acknowledgement ensures a message is successfully received by the consumer and notifies the JMS broker for every message. Due to the threading details in JMS brokers, this special source is required (see the explanation below).

Scala
val jmsSource: Source[AckEnvelope, JmsConsumerControl] = JmsConsumer.ackSource(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withSessionCount(5)
    .withQueue(queueName))

val result: Future[immutable.Seq[javax.jms.Message]] =
  jmsSource
    .take(msgsIn.size)
    .map { ackEnvelope =>
      ackEnvelope.acknowledge()
      ackEnvelope.message
    }
    .runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<org.apache.pekko.stream.connectors.jms.AckEnvelope, JmsConsumerControl> jmsSource =
    JmsConsumer.ackSource(
        JmsConsumerSettings.create(system, connectionFactory)
            .withSessionCount(5)
            .withQueue("test"));

CompletionStage<List<javax.jms.Message>> result =
    jmsSource
        .take(msgsIn.size())
        .map(
            envelope -> {
              envelope.acknowledge();
              return envelope.message();
            })
        .runWith(Sink.seq(), system);

The sessionCount parameter controls the number of JMS sessions to run in parallel.

Notes:

  • Using multiple sessions increases throughput, especially if acknowledging message by message is desired.
  • Messages may arrive out of order if sessionCount is larger than 1.
  • Message-by-message acknowledgement can be achieved by setting bufferSize to 0, thus disabling buffering. The outstanding messages before backpressure will be the sessionCount.
  • If buffering is enabled then it’s possible for messages to remain in the buffer and never be acknowledged (or acknowledged after a long time) when no new elements arrive to reach the maxPendingAcks threshold. By setting maxAckInterval messages will be acknowledged after the defined interval or number of pending acks, whichever comes first.
  • The default AcknowledgeMode is ClientAcknowledge, but it can be overridden with a custom AcknowledgeMode, even an implementation-specific one, by setting the AcknowledgeMode in the JmsConsumerSettings when creating the stream.
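
The interplay of maxPendingAcks and maxAckInterval described in the notes can be pictured with a small toy model. This is a sketch under stated assumptions, not the connector's implementation: the class AckBatchingSketch, its methods, and the explicit timestamps are all hypothetical, and the interval is assumed to be measured from the previous flush.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Toy model of ack batching; NOT the connector's implementation. */
final class AckBatchingSketch {
    private final int maxPendingAcks;      // hypothetical stand-in for the maxPendingAcks setting
    private final Duration maxAckInterval; // hypothetical stand-in for the maxAckInterval setting
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long lastFlushMillis;

    AckBatchingSketch(int maxPendingAcks, Duration maxAckInterval, long startMillis) {
        this.maxPendingAcks = maxPendingAcks;
        this.maxAckInterval = maxAckInterval;
        this.lastFlushMillis = startMillis;
    }

    /** Queues one ack and flushes as soon as the count threshold is reached. */
    void ack(String messageId, long nowMillis) {
        pending.add(messageId);
        if (pending.size() >= maxPendingAcks) {
            flush(nowMillis);
        }
    }

    /** Timer tick: flushes queued acks once the interval since the last flush has elapsed. */
    void tick(long nowMillis) {
        boolean intervalElapsed = nowMillis - lastFlushMillis >= maxAckInterval.toMillis();
        if (!pending.isEmpty() && intervalElapsed) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        flushed.add(List.copyOf(pending)); // one batch sent to the imaginary broker
        pending.clear();
        lastFlushMillis = nowMillis;
    }

    List<List<String>> flushedBatches() {
        return List.copyOf(flushed);
    }

    static List<List<String>> demo() {
        AckBatchingSketch batcher = new AckBatchingSketch(3, Duration.ofSeconds(5), 0L);
        batcher.ack("m1", 100);
        batcher.ack("m2", 200);
        batcher.ack("m3", 300); // third pending ack reaches the count threshold
        batcher.ack("m4", 400); // stays queued: only one pending ack
        batcher.tick(1_000);    // interval not yet elapsed, nothing is sent
        batcher.tick(5_300);    // 5 seconds since the last flush, m4 is sent
        return batcher.flushedBatches();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[m1, m2, m3], [m4]]
    }
}
```

Without the tick (that is, without a maxAckInterval), m4 would stay queued until two more acks arrive, which is the situation the note about buffering warns about.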
Warning

Using a regular JmsConsumer with AcknowledgeMode.ClientAcknowledge and using message.acknowledge() from the stream is not compliant with the JMS specification and can cause issues for some message brokers. message.acknowledge() in many cases acknowledges the session and not the message itself, contrary to what the API makes you believe.

Use JmsConsumer.ackSource as shown above instead.
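
To make the warning concrete, here is a toy model of session-level acknowledgement. ToySession and ToyMessage are hypothetical classes, not the JMS API. They only mimic the behaviour that javax.jms.Message.acknowledge() is documented to have: it acknowledges all messages consumed on the message's session.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of CLIENT_ACKNOWLEDGE semantics; these classes are NOT the JMS API. */
final class SessionAckDemo {

    /** A session that remembers which delivered messages are still unacknowledged. */
    static final class ToySession {
        private final List<String> pending = new ArrayList<>();

        ToyMessage deliver(String body) {
            pending.add(body);
            return new ToyMessage(this, body);
        }

        List<String> unacknowledged() {
            return List.copyOf(pending);
        }
    }

    /** A message whose acknowledge() acts on the whole session. */
    static final class ToyMessage {
        private final ToySession session;
        final String body;

        ToyMessage(ToySession session, String body) {
            this.session = session;
            this.body = body;
        }

        void acknowledge() {
            // Everything consumed on the session so far is acknowledged, not just this message
            session.pending.clear();
        }
    }

    static List<String> demo() {
        ToySession session = new ToySession();
        ToyMessage first = session.deliver("m1");
        session.deliver("m2"); // delivered, but perhaps still buffered in the stream
        session.deliver("m3");
        first.acknowledge();   // intended to acknowledge m1 only
        return session.unacknowledged();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints []
    }
}
```

In the model, acknowledging m1 also acknowledges m2 and m3, even though the stream may not have processed them yet.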

Using JMS transactions

JMS transactions may be used with this connector. Be aware that transactions are a heavyweight tool and may not perform very well.

Scala
val jmsSource: Source[TxEnvelope, JmsConsumerControl] = JmsConsumer.txSource(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withSessionCount(5)
    .withAckTimeout(1.second)
    .withQueue(queueName))

val result: Future[immutable.Seq[javax.jms.Message]] =
  jmsSource
    .take(msgsIn.size)
    .map { txEnvelope =>
      txEnvelope.commit()
      txEnvelope.message
    }
    .runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<org.apache.pekko.stream.connectors.jms.TxEnvelope, JmsConsumerControl> jmsSource =
    JmsConsumer.txSource(
        JmsConsumerSettings.create(system, connectionFactory)
            .withSessionCount(5)
            .withAckTimeout(Duration.ofSeconds(1))
            .withQueue("test"));

CompletionStage<List<javax.jms.Message>> result =
    jmsSource
        .take(msgsIn.size())
        .map(
            txEnvelope -> {
              txEnvelope.commit();
              return txEnvelope.message();
            })
        .runWith(Sink.seq(), system);

The sessionCount parameter controls the number of JMS sessions to run in parallel.

The ackTimeout parameter controls the maximum time given to a message to be committed or rolled back. If the message times out, it is rolled back automatically. This prevents the stream from starving if the application fails to commit or roll back a message, or if processing the message fails and the stream is resumed by a decider.
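
The timeout rule can be pictured with a plain CompletableFuture. This is only an illustration of "commit in time, otherwise roll back", not how the connector implements it. The class AckTimeoutSketch and the string outcomes are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Toy model of the ackTimeout rule; NOT the connector's implementation. */
final class AckTimeoutSketch {

    /** Returns the application's decision, or "rollback" if none arrives within the timeout. */
    static String outcome(boolean appCommits, long ackTimeoutMillis) {
        CompletableFuture<String> decision = new CompletableFuture<>();
        // Fallback: complete with "rollback" if nothing else completes the future in time
        decision.completeOnTimeout("rollback", ackTimeoutMillis, TimeUnit.MILLISECONDS);
        if (appCommits) {
            decision.complete("commit"); // the application commits before the timeout
        }
        return decision.join();
    }

    public static void main(String[] args) {
        System.out.println(outcome(true, 100));  // prints commit
        System.out.println(outcome(false, 100)); // prints rollback
    }
}
```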

Notes:

  • Higher throughput is achieved by increasing the sessionCount.
  • Messages will arrive out of order if sessionCount is larger than 1.
  • Buffering is not supported in transaction mode. The bufferSize is ignored.
  • The default AcknowledgeMode is SessionTransacted, but it can be overridden with a custom AcknowledgeMode, even an implementation-specific one, by setting the AcknowledgeMode in the JmsConsumerSettings when creating the stream.

Using JMS selectors

Create a javax.jms.Message source with a JMS selector expression so that only messages matching the selector are received:

Scala
val jmsSource = JmsConsumer(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withQueue("numbers")
    .withSelector("IsOdd = TRUE"))
Java
Source<Message, JmsConsumerControl> jmsSource =
    JmsConsumer.create(
        JmsConsumerSettings.create(system, connectionFactory)
            .withQueue("test")
            .withSelector("IsOdd = TRUE"));
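
The selector itself is evaluated by the broker against the message properties. As a rough illustration of which messages "IsOdd = TRUE" lets through, the following toy filter applies the same condition to property maps. The class SelectorSketch, the helper message, and the Number property are hypothetical. Note that a message without the IsOdd property does not match, since a selector comparison against a missing property is not true.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Toy model of selector matching; real selectors are evaluated by the JMS broker. */
final class SelectorSketch {

    /** Toy equivalent of the selector "IsOdd = TRUE" over a message's properties. */
    static boolean matchesIsOdd(Map<String, Object> properties) {
        return Boolean.TRUE.equals(properties.get("IsOdd"));
    }

    /** Builds the property map of a hypothetical message; a null isOdd leaves the property unset. */
    static Map<String, Object> message(int number, Boolean isOdd) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("Number", number);
        if (isOdd != null) {
            properties.put("IsOdd", isOdd);
        }
        return properties;
    }

    static List<Integer> demo() {
        List<Map<String, Object>> messages =
            List.of(message(1, true), message(2, false), message(3, true), message(4, null));
        return messages.stream()
            .filter(SelectorSketch::matchesIsOdd)
            .map(properties -> (Integer) properties.get("Number"))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 3]
    }
}
```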

Raw JVM type sources

Stream element type | Apache Pekko Connectors source factory
String | JmsConsumer.textSource
Array[Byte] (Scala) / byte[] (Java) | JmsConsumer.bytesSource
Map[String, Any] (Scala) / Map<String, Object> (Java) | JmsConsumer.mapSource
Object (java.io.Serializable) | JmsConsumer.objectSource

Text sources

The textSource emits the received message body as String:

Scala
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
val jmsSource: Source[String, JmsConsumerControl] = JmsConsumer.textSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test"))

val result: Future[immutable.Seq[String]] = jmsSource.take(in.size).runWith(Sink.seq)
Java
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();
Source<String, JmsConsumerControl> jmsSource =
    JmsConsumer.textSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<List<String>> result =
    jmsSource.take(in.size()).runWith(Sink.seq(), system);

Byte array sources

The bytesSource emits the received message body as a byte array:

Scala
val jmsSource: Source[Array[Byte], JmsConsumerControl] = JmsConsumer.bytesSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test"))

val result: Future[Array[Byte]] =
  jmsSource
    .take(1)
    .runWith(Sink.head)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<byte[], JmsConsumerControl> jmsSource =
    JmsConsumer.bytesSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<byte[]> result = jmsSource.take(1).runWith(Sink.head(), system);

Map sources

The mapSource emits the received message body as Map[String, Any] (Scala) or Map<String, Object> (Java):

Scala
val jmsSource: Source[Map[String, Any], JmsConsumerControl] = JmsConsumer.mapSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test"))

val result: Future[immutable.Seq[Map[String, Any]]] =
  jmsSource
    .take(1)
    .runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<Map<String, Object>, JmsConsumerControl> jmsSource =
    JmsConsumer.mapSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<Map<String, Object>> resultStage =
    jmsSource.take(1).runWith(Sink.head(), system);

Object sources

The objectSource emits the received message body as a deserialized JVM instance. As deserialization may be a security concern, JMS clients require special configuration to allow it. The example shows how to configure the ActiveMQ connection factory to support serialization. See ActiveMQ Security for more information.

Scala
val connectionFactory = connFactory.asInstanceOf[ActiveMQConnectionFactory]
connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava)
val jmsSource: Source[java.io.Serializable, JmsConsumerControl] = JmsConsumer.objectSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test"))

val result: Future[java.io.Serializable] =
  jmsSource
    .take(1)
    .runWith(Sink.head)
Java
ActiveMQConnectionFactory connectionFactory =
    (ActiveMQConnectionFactory) server.createConnectionFactory();
connectionFactory.setTrustedPackages(
    Arrays.asList(DummyJavaTests.class.getPackage().getName()));

Source<java.io.Serializable, JmsConsumerControl> jmsSource =
    JmsConsumer.objectSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<java.io.Serializable> result =
    jmsSource.take(1).runWith(Sink.head(), system);

Request / Reply

The request / reply pattern can be implemented by streaming a JmsConsumer to a JmsProducer, with a stage in between that extracts the ReplyTo and CorrelationID from the original message and adds them to the response.

Scala
val respondStreamControl: JmsConsumerControl =
  JmsConsumer(JmsConsumerSettings(system, connectionFactory).withQueue("test"))
    .collect {
      case message: TextMessage => JmsTextMessage(message)
    }
    .map { textMessage =>
      textMessage.headers.foldLeft(JmsTextMessage(textMessage.body.reverse)) {
        case (acc, rt: JmsReplyTo)        => acc.to(rt.jmsDestination)
        case (acc, cId: JmsCorrelationId) => acc.withHeader(cId)
        case (acc, _)                     => acc
      }
    }
    .via {
      JmsProducer.flow(
        JmsProducerSettings(system, connectionFactory).withQueue("ignored"))
    }
    .to(Sink.ignore)
    .run()
Java
JmsConsumerControl respondStreamControl =
    JmsConsumer.create(
            JmsConsumerSettings.create(system, connectionFactory).withQueue("test"))
        .map(JmsMessageFactory::create)
        .collectType(JmsTextMessage.class)
        .map(
            textMessage -> {
              JmsTextMessage m = JmsTextMessage.create(reverse.apply(textMessage.body()));
              for (JmsHeader h : textMessage.getHeaders())
                if (h.getClass().equals(JmsReplyTo.class))
                  m = m.to(((JmsReplyTo) h).jmsDestination());
                else if (h.getClass().equals(JmsCorrelationId.class)) m = m.withHeader(h);
              return m;
            })
        .via(
            JmsProducer.flow(
                JmsProducerSettings.create(system, connectionFactory)
                    .withQueue("ignored")))
        .to(Sink.ignore())
        .run(system);