Consumer
The Apache Pekko Connectors JMS connector offers consuming JMS messages from topics or queues:
- Read
javax.jms.Message
s from an Apache Pekko Streams source - Allow for client acknowledgement to the JMS broker
- Allow for JMS transactions
- Read raw JVM types from an Apache Pekko Streams Source
The JMS message model supports several types of message bodies in (see javax.jms.Message
), which may be created directly from the Apache Pekko Stream elements, or in wrappers to access more advanced features.
Receiving messages
JmsConsumer
JmsConsumer
offers factory methods to consume JMS messages in a number of ways.
This examples shows how to listen to a JMS queue and emit javax.jms.Message
elements into the stream.
The materialized value JmsConsumerControl
JmsConsumerControl
is used to shut down the consumer (it is a KillSwitch
KillSwitch
) and offers the possibility to inspect the connectivity state of the consumer.
- Scala
-
source
val jmsSource: Source[javax.jms.Message, JmsConsumerControl] = JmsConsumer( JmsConsumerSettings(system, connectionFactory).withQueue("numbers")) val (control, result): (JmsConsumerControl, Future[immutable.Seq[String]]) = jmsSource .take(msgsIn.size) .map { case t: javax.jms.TextMessage => t.getText case other => sys.error(s"unexpected message type ${other.getClass}") } .toMat(Sink.seq)(Keep.both) .run() control.shutdown()
- Java
-
source
ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<javax.jms.Message, JmsConsumerControl> jmsSource = JmsConsumer.create( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); Pair<JmsConsumerControl, CompletionStage<List<String>>> controlAndResult = jmsSource .take(expectedMessages) .map( msg -> { if (msg instanceof TextMessage) { TextMessage t = (TextMessage) msg; return t.getText(); } else throw new RuntimeException("unexpected message type " + msg.getClass()); }) .toMat(Sink.seq(), Keep.both()) .run(system); JmsConsumerControl control = controlAndResult.first(); control.shutdown();
Configure JMS consumers
To connect to the JMS broker, first define an appropriate javax.jms.ConnectionFactory
. The Apache Pekko Connectors tests and all examples use Active MQ.
- Scala
-
source
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
- Java
-
source
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();
The created ConnectionFactory
is then used for the creation of the different JMS sources.
The JmsConsumerSettings
JmsConsumerSettings
factories allow for passing the actor system to read from the default pekko.connectors.jms.consumer
section, or you may pass a Config
instance which is resolved to a section of the same structure.
- Scala
-
source
val consumerConfig: Config = system.settings.config.getConfig(JmsConsumerSettings.configPath) // reiterating defaults from reference.conf val settings = JmsConsumerSettings(consumerConfig, connectionFactory) .withQueue("target-queue") .withCredentials(Credentials("username", "password")) .withConnectionRetrySettings(retrySettings) .withSessionCount(1) .withBufferSize(100) .withAckTimeout(1.second)
- Java
-
source
Config consumerConfig = config.getConfig(JmsConsumerSettings.configPath()); JmsConsumerSettings settings = JmsConsumerSettings.create(consumerConfig, new ActiveMQConnectionFactory("broker-url")) .withTopic("message-topic") .withCredentials(Credentials.create("username", "password")) .withConnectionRetrySettings(retrySettings) .withSessionCount(10) .withAcknowledgeMode(AcknowledgeMode.AutoAcknowledge()) .withSelector("Important = TRUE");
The Apache Pekko Connectors JMS consumer is configured via default settings in the HOCON config file section pekko.connectors.jms.consumer
in your application.conf
, and settings may be tweaked in the code using the withXyz
methods. On the second tab the section from reference.conf
shows the structure to use for configuring multiple set-ups.
- Table
-
Setting Description Default Value connectionFactory Factory to use for creating JMS connections Must be set in code destination Destination (queue or topic) to send JMS messages to Must be set in code credentials JMS broker credentials Empty connectionRetrySettings Retry characteristics if the connection failed to be established or is taking a long time. See Connection Retries sessionCount Number of parallel sessions to use for receiving JMS messages. defaults to 1
bufferSize Maximum number of messages to prefetch before applying backpressure. 100 ackTimeout For use with JMS transactions, only: maximum time given to a message to be committed or rolled back. 1 second maxAckInterval For use with AckSource, only: The max duration before the queued acks are sent to the broker Empty maxPendingAcks For use with AckSource, only: The amount of acks that get queued before being sent to the broker 100 selector JMS selector expression (see below) Empty connectionStatusSubscriptionTimeout 5 seconds Time to wait for subscriber of connection status events before starting to discard them - reference.conf
-
source
# Jms Consumer Settings # sets default values consumer { # Configure connection retrying by providing settings for ConnectionRetrySettings. connection-retry = ${pekko.connectors.jms.connection-retry} # Credentials to connect to the JMS broker. # credentials { # username = "some text" # password = "some text" # } # "off" to not use any credentials. credentials = off # Number of parallel sessions to use for receiving JMS messages. session-count = 1 # Buffer size for maximum number for messages read from JMS when there is no demand. buffer-size = 100 # JMS selector expression. # See https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html # empty string for unset selector = "" # optional # Set an explicit acknowledge mode. # (Consumers have specific defaults.) # See eg. javax.jms.Session.AUTO_ACKNOWLEDGE # Allowed values: "off", "auto", "client", "duplicates-ok", "session", integer value acknowledge-mode = off # Timeout for acknowledge. # (Used by TX consumers.) ack-timeout = 1 second # For use with transactions, if true the stream fails if Apache Pekko Connectors rolls back the transaction # when `ack-timeout` is hit. fail-stream-on-ack-timeout = false # Max interval before sending queued acknowledges back to the broker. (Used by AckSources.) # max-ack-interval = 5 seconds # Max number of acks queued by AckSource before they are sent to broker. (Unless MaxAckInterval is specified). max-pending-acks = ${pekko.connectors.jms.consumer.buffer-size} # How long the stage should preserve connection status events for the first subscriber before discarding them connection-status-subscription-timeout = 5 seconds }
Broker specific destinations
To reach out to special features of the JMS broker, destinations can be created as CustomDestination
which takes a factory method for creating destinations.
- Scala
-
source
def createQueue(destinationName: String): Session => javax.jms.Queue = { (session: Session) => val amqSession = session.asInstanceOf[ActiveMQSession] amqSession.createQueue(s"my-$destinationName") } val jmsSource: Source[javax.jms.Message, JmsConsumerControl] = JmsConsumer( JmsConsumerSettings(consumerConfig, connectionFactory) .withDestination(CustomDestination("custom", createQueue("custom"))))
- Java
-
source
Function<javax.jms.Session, javax.jms.Destination> createQueue(String destinationName) { return (session) -> { ActiveMQSession amqSession = (ActiveMQSession) session; try { return amqSession.createQueue("my-" + destinationName); } catch (JMSException e) { throw new RuntimeException(e); } }; } Source<Message, JmsConsumerControl> jmsSource = JmsConsumer.create( JmsConsumerSettings.create(system, connectionFactory) .withDestination(new CustomDestination("custom", createQueue("custom"))));
Using JMS client acknowledgement
Client acknowledgement ensures a message is successfully received by the consumer and notifies the JMS broker for every message. Due to the threading details in JMS brokers, this special source is required (see the explanation below).
- Scala
-
source
val jmsSource: Source[AckEnvelope, JmsConsumerControl] = JmsConsumer.ackSource( JmsConsumerSettings(consumerConfig, connectionFactory) .withSessionCount(5) .withQueue(queueName)) val result: Future[immutable.Seq[javax.jms.Message]] = jmsSource .take(msgsIn.size) .map { ackEnvelope => ackEnvelope.acknowledge() ackEnvelope.message } .runWith(Sink.seq)
- Java
-
source
ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<org.apache.pekko.stream.connectors.jms.AckEnvelope, JmsConsumerControl> jmsSource = JmsConsumer.ackSource( JmsConsumerSettings.create(system, connectionFactory) .withSessionCount(5) .withQueue("test")); CompletionStage<List<javax.jms.Message>> result = jmsSource .take(msgsIn.size()) .map( envelope -> { envelope.acknowledge(); return envelope.message(); }) .runWith(Sink.seq(), system);
The sessionCount
parameter controls the number of JMS sessions to run in parallel.
Notes:
- Using multiple sessions increases throughput, especially if acknowledging message by message is desired.
- Messages may arrive out of order if
sessionCount
is larger than 1. - Message-by-message acknowledgement can be achieved by setting
bufferSize
to 0, thus disabling buffering. The outstanding messages before backpressure will be thesessionCount
. - If buffering is enabled then it’s possible for messages to remain in the buffer and never be acknowledged (or acknowledged after a long time) when no new elements arrive to reach the
maxPendingAcks
threshold. By settingmaxAckInterval
messages will be acknowledged after the defined interval or number of pending acks, whichever comes first. - The default
AcknowledgeMode
isClientAcknowledge
but can be overridden to customAcknowledgeMode
s, even implementation-specific ones by setting theAcknowledgeMode
in theJmsConsumerSettings
when creating the stream.
Using a regular JmsConsumer
with AcknowledgeMode.ClientAcknowledge
and using message.acknowledge()
from the stream is not compliant with the JMS specification and can cause issues for some message brokers. message.acknowledge()
in many cases acknowledges the session and not the message itself, contrary to what the API makes you believe.
Use this JmsConsumer.ackSource
as shown above instead.
Using JMS transactions
JMS transactions may be used with this connector. Be aware that transactions are a heavy-weight tool and may not perform very good.
- Scala
-
source
val jmsSource: Source[TxEnvelope, JmsConsumerControl] = JmsConsumer.txSource( JmsConsumerSettings(consumerConfig, connectionFactory) .withSessionCount(5) .withAckTimeout(1.second) .withQueue(queueName)) val result: Future[immutable.Seq[javax.jms.Message]] = jmsSource .take(msgsIn.size) .map { txEnvelope => txEnvelope.commit() txEnvelope.message } .runWith(Sink.seq)
- Java
-
source
ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<org.apache.pekko.stream.connectors.jms.TxEnvelope, JmsConsumerControl> jmsSource = JmsConsumer.txSource( JmsConsumerSettings.create(system, connectionFactory) .withSessionCount(5) .withAckTimeout(Duration.ofSeconds(1)) .withQueue("test")); CompletionStage<List<javax.jms.Message>> result = jmsSource .take(msgsIn.size()) .map( txEnvelope -> { txEnvelope.commit(); return txEnvelope.message(); }) .runWith(Sink.seq(), system);
The sessionCount
parameter controls the number of JMS sessions to run in parallel.
The ackTimeout
parameter controls the maximum time given to a message to be committed or rolled back. If the message times out it will automatically be rolled back. This is to prevent stream from starvation if the application fails to commit or rollback a message, or if the message errors out and the stream is resumed by a decider
.
Notes:
- Higher throughput is achieved by increasing the
sessionCount
. - Messages will arrive out of order if
sessionCount
is larger than 1. - Buffering is not supported in transaction mode. The
bufferSize
is ignored. - The default
AcknowledgeMode
isSessionTransacted
but can be overridden to customAcknowledgeMode
s, even implementation-specific ones by setting theAcknowledgeMode
in theJmsConsumerSettings
when creating the stream.
Using JMS selectors
Create a javax.jms.Message
source specifying a JMS selector expression: Verify that we are only receiving messages according to the selector:
- Scala
-
source
val jmsSource = JmsConsumer( JmsConsumerSettings(consumerConfig, connectionFactory) .withQueue("numbers") .withSelector("IsOdd = TRUE"))
- Java
-
source
Source<Message, JmsConsumerControl> jmsSource = JmsConsumer.create( JmsConsumerSettings.create(system, connectionFactory) .withQueue("test") .withSelector("IsOdd = TRUE"));
Raw JVM type sources
Stream element type | Apache Pekko Connectors source factory |
---|---|
String |
JmsConsumer.textSource |
Array[Byte] byte[] |
JmsConsumer.bytesSource |
Map[String, AnyRef] Map<String, Object> |
JmsConsumer.mapSource |
Object (java.io.Serializable ) |
JmsConsumer.objectSource |
Text sources
The textSource
emits the received message body as String:
- Scala
-
source
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url) val jmsSource: Source[String, JmsConsumerControl] = JmsConsumer.textSource( JmsConsumerSettings(system, connectionFactory).withQueue("test")) val result: Future[immutable.Seq[String]] = jmsSource.take(in.size).runWith(Sink.seq)
- Java
-
source
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<String, JmsConsumerControl> jmsSource = JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); CompletionStage<List<String>> result = jmsSource.take(in.size()).runWith(Sink.seq(), system);
Byte array sources
The bytesSource
emits the received message body as byte array:
- Scala
-
source
val jmsSource: Source[Array[Byte], JmsConsumerControl] = JmsConsumer.bytesSource( JmsConsumerSettings(system, connectionFactory).withQueue("test")) val result: Future[Array[Byte]] = jmsSource .take(1) .runWith(Sink.head)
- Java
-
source
ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<byte[], JmsConsumerControl> jmsSource = JmsConsumer.bytesSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); CompletionStage<byte[]> result = jmsSource.take(1).runWith(Sink.head(), system);
Map sources
The mapSource
emits the received message body as Map[String, Object]Map<String, Object>:
- Scala
-
source
val jmsSource: Source[Map[String, Any], JmsConsumerControl] = JmsConsumer.mapSource( JmsConsumerSettings(system, connectionFactory).withQueue("test")) val result: Future[immutable.Seq[Map[String, Any]]] = jmsSource .take(1) .runWith(Sink.seq)
- Java
-
source
ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<Map<String, Object>, JmsConsumerControl> jmsSource = JmsConsumer.mapSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); CompletionStage<Map<String, Object>> resultStage = jmsSource.take(1).runWith(Sink.head(), system);
Object sources
The objectSource
emits the received message body as deserialized JVM instance. As serialization may be a security concern, JMS clients require special configuration to allow this. The example shows how to configure ActiveMQ connection factory to support serialization. See ActiveMQ Security for more information on this.
- Scala
-
source
val connectionFactory = connFactory.asInstanceOf[ActiveMQConnectionFactory] connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava) val jmsSource: Source[java.io.Serializable, JmsConsumerControl] = JmsConsumer.objectSource( JmsConsumerSettings(system, connectionFactory).withQueue("test")) val result: Future[java.io.Serializable] = jmsSource .take(1) .runWith(Sink.head)
- Java
-
source
ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) server.createConnectionFactory(); connectionFactory.setTrustedPackages( Arrays.asList(DummyJavaTests.class.getPackage().getName())); Source<java.io.Serializable, JmsConsumerControl> jmsSource = JmsConsumer.objectSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); CompletionStage<java.io.Serializable> result = jmsSource.take(1).runWith(Sink.head(), system);
Request / Reply
The request / reply pattern can be implemented by streaming a JmsConsumer
JmsConsumer
to a JmsProducer
JmsProducer
, with a stage in between that extracts the ReplyTo
and CorrelationID
from the original message and adds them to the response.
- Scala
-
source
val respondStreamControl: JmsConsumerControl = JmsConsumer(JmsConsumerSettings(system, connectionFactory).withQueue("test")) .collect { case message: TextMessage => JmsTextMessage(message) } .map { textMessage => textMessage.headers.foldLeft(JmsTextMessage(textMessage.body.reverse)) { case (acc, rt: JmsReplyTo) => acc.to(rt.jmsDestination) case (acc, cId: JmsCorrelationId) => acc.withHeader(cId) case (acc, _) => acc } } .via { JmsProducer.flow( JmsProducerSettings(system, connectionFactory).withQueue("ignored")) } .to(Sink.ignore) .run()
- Java
-
source
JmsConsumerControl respondStreamControl = JmsConsumer.create( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")) .map(JmsMessageFactory::create) .collectType(JmsTextMessage.class) .map( textMessage -> { JmsTextMessage m = JmsTextMessage.create(reverse.apply(textMessage.body())); for (JmsHeader h : textMessage.getHeaders()) if (h.getClass().equals(JmsReplyTo.class)) m = m.to(((JmsReplyTo) h).jmsDestination()); else if (h.getClass().equals(JmsCorrelationId.class)) m = m.withHeader(h); return m; }) .via( JmsProducer.flow( JmsProducerSettings.create(system, connectionFactory) .withQueue("ignored"))) .to(Sink.ignore()) .run(system);