Consumer
The Apache Pekko Connectors JMS connector supports consuming JMS messages from topics or queues:
- Read javax.jms.Message instances from an Apache Pekko Streams source
- Allow for client acknowledgement to the JMS broker
- Allow for JMS transactions
- Read raw JVM types from an Apache Pekko Streams source
The JMS message model supports several types of message bodies (see javax.jms.Message), which may be created directly from the Apache Pekko Stream elements, or in wrappers to access more advanced features.
Receiving messages¶
JmsConsumer offers factory methods to consume JMS messages in a number of ways.
This example shows how to listen to a JMS queue and emit javax.jms.Message elements into the stream.
The materialized value JmsConsumerControl is used to shut down the consumer (it is a KillSwitch) and offers the possibility to inspect the connectivity state of the consumer.
Scala
val jmsSource: Source[javax.jms.Message, JmsConsumerControl] = JmsConsumer(
  JmsConsumerSettings(system, connectionFactory).withQueue("numbers"))

val (control, result): (JmsConsumerControl, Future[immutable.Seq[String]]) =
  jmsSource
    .take(msgsIn.size)
    .map {
      case t: javax.jms.TextMessage => t.getText
      case other => sys.error(s"unexpected message type ${other.getClass}")
    }
    .toMat(Sink.seq)(Keep.both)
    .run()

control.shutdown()
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<javax.jms.Message, JmsConsumerControl> jmsSource =
    JmsConsumer.create(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

Pair<JmsConsumerControl, CompletionStage<List<String>>> controlAndResult =
    jmsSource
        .take(expectedMessages)
        .map(
            msg -> {
              if (msg instanceof TextMessage) {
                TextMessage t = (TextMessage) msg;
                return t.getText();
              } else
                throw new RuntimeException("unexpected message type " + msg.getClass());
            })
        .toMat(Sink.seq(), Keep.both())
        .run(system);

JmsConsumerControl control = controlAndResult.first();
control.shutdown();
Configure JMS consumers¶
To connect to the JMS broker, first define an appropriate javax.jms.ConnectionFactory. The Apache Pekko Connectors tests and all examples use ActiveMQ.
Scala
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
Java
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();
The created ConnectionFactory is then used for the creation of the different JMS sources.
The JmsConsumerSettings factories allow for passing the actor system to read from the default pekko.connectors.jms.consumer section, or you may pass a Config instance which is resolved to a section of the same structure.
Scala
val consumerConfig: Config = system.settings.config.getConfig(JmsConsumerSettings.configPath)
// reiterating defaults from reference.conf
val settings = JmsConsumerSettings(consumerConfig, connectionFactory)
  .withQueue("target-queue")
  .withCredentials(Credentials("username", "password"))
  .withConnectionRetrySettings(retrySettings)
  .withSessionCount(1)
  .withBufferSize(100)
  .withAckTimeout(1.second)
Java
Config consumerConfig = config.getConfig(JmsConsumerSettings.configPath());
JmsConsumerSettings settings =
    JmsConsumerSettings.create(consumerConfig, new ActiveMQConnectionFactory("broker-url"))
        .withTopic("message-topic")
        .withCredentials(Credentials.create("username", "password"))
        .withConnectionRetrySettings(retrySettings)
        .withSessionCount(10)
        .withAcknowledgeMode(AcknowledgeMode.AutoAcknowledge())
        .withSelector("Important = TRUE");
The Apache Pekko Connectors JMS consumer takes its default settings from the HOCON config section pekko.connectors.jms.consumer, which you can override in your application.conf. Settings may also be tweaked in code using the withXyz methods. The following section from reference.conf shows the structure to use for configuring multiple set-ups.
# Jms Consumer Settings
# sets default values
consumer {
  # Configure connection retrying by providing settings for ConnectionRetrySettings.
  connection-retry = ${pekko.connectors.jms.connection-retry}
  # Credentials to connect to the JMS broker.
  # credentials {
  #   username = "some text"
  #   password = "some text"
  # }
  # "off" to not use any credentials.
  credentials = off
  # Number of parallel sessions to use for receiving JMS messages.
  session-count = 1
  # Buffer size for maximum number for messages read from JMS when there is no demand.
  buffer-size = 100
  # JMS selector expression.
  # See https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
  # empty string for unset
  selector = "" # optional
  # Set an explicit acknowledge mode.
  # (Consumers have specific defaults.)
  # See eg. javax.jms.Session.AUTO_ACKNOWLEDGE
  # Allowed values: "off", "auto", "client", "duplicates-ok", "session", integer value
  acknowledge-mode = off
  # Timeout for acknowledge.
  # (Used by TX consumers.)
  ack-timeout = 1 second
  # For use with transactions, if true the stream fails if Apache Pekko Connectors rolls back the transaction
  # when `ack-timeout` is hit.
  fail-stream-on-ack-timeout = false
  # Max interval before sending queued acknowledges back to the broker. (Used by AckSources.)
  # max-ack-interval = 5 seconds
  # Max number of acks queued by AckSource before they are sent to broker. (Unless MaxAckInterval is specified).
  max-pending-acks = ${pekko.connectors.jms.consumer.buffer-size}
  # How long the stage should preserve connection status events for the first subscriber before discarding them
  connection-status-subscription-timeout = 5 seconds
}
Broker specific destinations¶
To use broker-specific features, destinations can be created as CustomDestination, which takes a factory method for creating destinations.
Scala
def createQueue(destinationName: String): Session => javax.jms.Queue = { (session: Session) =>
  val amqSession = session.asInstanceOf[ActiveMQSession]
  amqSession.createQueue(s"my-$destinationName")
}

val jmsSource: Source[javax.jms.Message, JmsConsumerControl] = JmsConsumer(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withDestination(CustomDestination("custom", createQueue("custom"))))
Java
Function<javax.jms.Session, javax.jms.Destination> createQueue(String destinationName) {
  return (session) -> {
    ActiveMQSession amqSession = (ActiveMQSession) session;
    try {
      return amqSession.createQueue("my-" + destinationName);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    }
  };
}

Source<Message, JmsConsumerControl> jmsSource =
    JmsConsumer.create(
        JmsConsumerSettings.create(system, connectionFactory)
            .withDestination(new CustomDestination("custom", createQueue("custom"))));
Using JMS client acknowledgement¶
Client acknowledgement ensures a message is successfully received by the consumer and notifies the JMS broker for every message. Due to the threading details in JMS brokers, this special source is required (see the explanation below).
Scala
val jmsSource: Source[AckEnvelope, JmsConsumerControl] = JmsConsumer.ackSource(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withSessionCount(5)
    .withQueue(queueName))

val result: Future[immutable.Seq[javax.jms.Message]] =
  jmsSource
    .take(msgsIn.size)
    .map { ackEnvelope =>
      ackEnvelope.acknowledge()
      ackEnvelope.message
    }
    .runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<org.apache.pekko.stream.connectors.jms.AckEnvelope, JmsConsumerControl> jmsSource =
    JmsConsumer.ackSource(
        JmsConsumerSettings.create(system, connectionFactory)
            .withSessionCount(5)
            .withQueue("test"));

CompletionStage<List<javax.jms.Message>> result =
    jmsSource
        .take(msgsIn.size())
        .map(
            envelope -> {
              envelope.acknowledge();
              return envelope.message();
            })
        .runWith(Sink.seq(), system);
The sessionCount parameter controls the number of JMS sessions to run in parallel.
Notes:
- Using multiple sessions increases throughput, especially if acknowledging message by message is desired.
- Messages may arrive out of order if sessionCount is larger than 1.
- Message-by-message acknowledgement can be achieved by setting bufferSize to 0, thus disabling buffering. The number of outstanding messages before backpressure will then be the sessionCount.
- If buffering is enabled, messages can remain in the buffer and never be acknowledged (or be acknowledged only after a long time) when no new elements arrive to reach the maxPendingAcks threshold. By setting maxAckInterval, messages will be acknowledged after the defined interval or number of pending acks, whichever comes first.
- The default AcknowledgeMode is ClientAcknowledge but can be overridden to custom AcknowledgeModes, even implementation-specific ones, by setting the AcknowledgeMode in the JmsConsumerSettings when creating the stream.
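The interplay of maxPendingAcks and maxAckInterval described in the notes can be pictured with a small toy model. The sketch below is plain Java with no JMS or Pekko dependency. The class PendingAckBuffer, its methods, and the explicit clock argument are hypothetical names invented for illustration and are not the connector's implementation. It assumes acknowledgements are queued and sent as one batch when either the count threshold is reached or the interval has elapsed since the last flush.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: PendingAckBuffer is a hypothetical class, not part of the connector.
class PendingAckBuffer {
    private final int maxPendingAcks;
    private final long maxAckIntervalMillis; // 0 or less: no interval configured
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> sentBatches = new ArrayList<>();
    private long lastFlushMillis;

    PendingAckBuffer(int maxPendingAcks, long maxAckIntervalMillis, long startMillis) {
        this.maxPendingAcks = maxPendingAcks;
        this.maxAckIntervalMillis = maxAckIntervalMillis;
        this.lastFlushMillis = startMillis;
    }

    // Queue one acknowledgement, then check both thresholds.
    void acknowledge(String messageId, long nowMillis) {
        pending.add(messageId);
        flushIfDue(nowMillis);
    }

    // Stand-in for a timer: lets time pass without a new element arriving.
    void tick(long nowMillis) {
        flushIfDue(nowMillis);
    }

    private void flushIfDue(long nowMillis) {
        if (pending.isEmpty()) {
            return;
        }
        boolean countReached = pending.size() >= maxPendingAcks;
        boolean intervalElapsed = maxAckIntervalMillis > 0
            && nowMillis - lastFlushMillis >= maxAckIntervalMillis;
        if (countReached || intervalElapsed) {
            sentBatches.add(new ArrayList<>(pending)); // "send" the batch to the broker
            pending.clear();
            lastFlushMillis = nowMillis;
        }
    }

    int pendingCount() {
        return pending.size();
    }

    List<List<String>> sentBatches() {
        return sentBatches;
    }

    public static void main(String[] args) {
        // Without an interval, two acks stay queued below a threshold of three.
        PendingAckBuffer countOnly = new PendingAckBuffer(3, 0, 0);
        countOnly.acknowledge("m1", 10);
        countOnly.acknowledge("m2", 20);
        countOnly.tick(60_000);
        System.out.println(countOnly.pendingCount()); // prints 2

        // With a 5 second interval, the same two acks are sent once the interval elapses.
        PendingAckBuffer withInterval = new PendingAckBuffer(3, 5_000, 0);
        withInterval.acknowledge("m1", 10);
        withInterval.acknowledge("m2", 20);
        withInterval.tick(5_000);
        System.out.println(withInterval.sentBatches()); // prints [[m1, m2]]
    }
}
```

In the first case the two acknowledgements would wait indefinitely for a third element, which is the situation the note warns about. The interval in the second case bounds that wait.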
Using a regular JmsConsumer with AcknowledgeMode.ClientAcknowledge and using message.acknowledge() from the stream is not compliant with the JMS specification and can cause issues for some message brokers. message.acknowledge() in many cases acknowledges the session and not the message itself, contrary to what the API makes you believe.
Use JmsConsumer.ackSource as shown above instead.
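The underlying rule in the JMS API is that calling acknowledge() on a message in a client-acknowledge session acknowledges all messages that session has consumed so far. The toy model below mimics that rule in plain Java to show why a single call can have a wider effect than expected. ToyClientAckSession and ToyMessage are hypothetical stand-ins, not JMS or connector classes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: mimics session-wide acknowledgement, not a real JMS session.
class ToyClientAckSession {
    private final List<String> unacknowledged = new ArrayList<>();

    // A delivered message that is tied to the session that delivered it.
    final class ToyMessage {
        final String id;

        ToyMessage(String id) {
            this.id = id;
        }

        // Acknowledges everything the session has delivered so far, not just this message.
        void acknowledge() {
            unacknowledged.clear();
        }
    }

    // Deliver a message and track it as unacknowledged.
    ToyMessage deliver(String id) {
        unacknowledged.add(id);
        return new ToyMessage(id);
    }

    // Snapshot of the ids that are still unacknowledged.
    List<String> unacknowledged() {
        return new ArrayList<>(unacknowledged);
    }

    public static void main(String[] args) {
        ToyClientAckSession session = new ToyClientAckSession();
        ToyMessage first = session.deliver("m1");
        session.deliver("m2");
        session.deliver("m3");
        System.out.println(session.unacknowledged()); // prints [m1, m2, m3]

        // Only the first message is acknowledged explicitly...
        first.acknowledge();
        // ...yet the later messages are acknowledged as well.
        System.out.println(session.unacknowledged()); // prints []
    }
}
```

If processing of m2 or m3 failed after this call, the broker would no longer redeliver them, which is the kind of surprise the warning refers to.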
Using JMS transactions¶
JMS transactions may be used with this connector. Be aware that transactions are a heavy-weight tool and may not perform very well.
Scala
val jmsSource: Source[TxEnvelope, JmsConsumerControl] = JmsConsumer.txSource(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withSessionCount(5)
    .withAckTimeout(1.second)
    .withQueue(queueName))

val result: Future[immutable.Seq[javax.jms.Message]] =
  jmsSource
    .take(msgsIn.size)
    .map { txEnvelope =>
      txEnvelope.commit()
      txEnvelope.message
    }
    .runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<org.apache.pekko.stream.connectors.jms.TxEnvelope, JmsConsumerControl> jmsSource =
    JmsConsumer.txSource(
        JmsConsumerSettings.create(system, connectionFactory)
            .withSessionCount(5)
            .withAckTimeout(Duration.ofSeconds(1))
            .withQueue("test"));

CompletionStage<List<javax.jms.Message>> result =
    jmsSource
        .take(msgsIn.size())
        .map(
            txEnvelope -> {
              txEnvelope.commit();
              return txEnvelope.message();
            })
        .runWith(Sink.seq(), system);
The sessionCount parameter controls the number of JMS sessions to run in parallel.
The ackTimeout parameter controls the maximum time given to a message to be committed or rolled back. If the message times out, it is rolled back automatically. This prevents the stream from starving if the application fails to commit or roll back a message, or if the message errors out and the stream is resumed by a decider.
Notes:
- Higher throughput is achieved by increasing the sessionCount.
- Messages may arrive out of order if sessionCount is larger than 1.
- Buffering is not supported in transaction mode. The bufferSize is ignored.
- The default AcknowledgeMode is SessionTransacted but can be overridden to custom AcknowledgeModes, even implementation-specific ones, by setting the AcknowledgeMode in the JmsConsumerSettings when creating the stream.
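The ackTimeout behaviour can be pictured as a race between the application's decision and a timer. The following sketch is a toy model built on java.util.concurrent.CompletableFuture (Java 9 or later is assumed for completeOnTimeout). ToyTxEnvelope and its Outcome enum are hypothetical and do not reflect how the connector implements transactions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Toy model only: shows "roll back unless decided in time", not the connector's code.
class ToyTxEnvelope {
    enum Outcome { COMMITTED, ROLLED_BACK }

    private final CompletableFuture<Outcome> outcome = new CompletableFuture<>();

    ToyTxEnvelope(long ackTimeoutMillis) {
        // If nobody decides within the timeout, the outcome becomes ROLLED_BACK.
        outcome.completeOnTimeout(Outcome.ROLLED_BACK, ackTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    // The first decision wins; later calls have no effect.
    void commit() {
        outcome.complete(Outcome.COMMITTED);
    }

    void rollback() {
        outcome.complete(Outcome.ROLLED_BACK);
    }

    // Blocks until the envelope has been committed, rolled back, or timed out.
    Outcome awaitOutcome() {
        return outcome.join();
    }

    public static void main(String[] args) {
        // Committed well within the timeout.
        ToyTxEnvelope committed = new ToyTxEnvelope(1_000);
        committed.commit();
        System.out.println(committed.awaitOutcome()); // prints COMMITTED

        // Never committed: the timer decides after 50 ms.
        ToyTxEnvelope forgotten = new ToyTxEnvelope(50);
        System.out.println(forgotten.awaitOutcome()); // prints ROLLED_BACK
    }
}
```

Because the forgotten envelope resolves on its own, whatever waits on its outcome is released after the timeout instead of blocking forever, which is the starvation the timeout is meant to prevent.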
Using JMS selectors¶
Create a javax.jms.Message source specifying a JMS selector expression, so that only messages matching the selector are received:
Scala
val jmsSource = JmsConsumer(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withQueue("numbers")
    .withSelector("IsOdd = TRUE"))
Java
Source<Message, JmsConsumerControl> jmsSource =
    JmsConsumer.create(
        JmsConsumerSettings.create(system, connectionFactory)
            .withQueue("test")
            .withSelector("IsOdd = TRUE"));
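A JMS selector is evaluated against message headers and properties rather than the body, so only matching messages reach the source. The sketch below imitates the effect of the IsOdd = TRUE selector with plain Java collections. ToySelector, the Number property, and the map-based message representation are assumptions made for illustration, and no real selector parser is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: messages are represented by their property maps.
class ToySelector {
    // Build the properties a producer might attach to a numeric message (hypothetical names).
    static Map<String, Object> message(int number) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("Number", number);
        properties.put("IsOdd", number % 2 != 0);
        return properties;
    }

    // Mirrors "IsOdd = TRUE": a missing or false property does not match.
    static boolean matchesIsOdd(Map<String, Object> properties) {
        return Boolean.TRUE.equals(properties.get("IsOdd"));
    }

    // Return the numbers of all messages that pass the selector.
    static List<Integer> selectedNumbers(List<Map<String, Object>> messages) {
        List<Integer> selected = new ArrayList<>();
        for (Map<String, Object> properties : messages) {
            if (matchesIsOdd(properties)) {
                selected.add((Integer) properties.get("Number"));
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> queue = new ArrayList<>();
        for (int n = 1; n <= 6; n++) {
            queue.add(message(n));
        }
        System.out.println(selectedNumbers(queue)); // prints [1, 3, 5]
    }
}
```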
Raw JVM type sources¶
Stream element type | Apache Pekko Connectors source factory |
---|---|
String | JmsConsumer.textSource |
byte[] | JmsConsumer.bytesSource |
Map<String, Object> | JmsConsumer.mapSource |
Object (java.io.Serializable) | JmsConsumer.objectSource |
Text sources¶
The textSource emits the received message body as String:
Scala
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)

val jmsSource: Source[String, JmsConsumerControl] = JmsConsumer.textSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test"))

val result: Future[immutable.Seq[String]] = jmsSource.take(in.size).runWith(Sink.seq)
Java
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<String, JmsConsumerControl> jmsSource =
    JmsConsumer.textSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<List<String>> result =
    jmsSource.take(in.size()).runWith(Sink.seq(), system);
Byte array sources¶
The bytesSource emits the received message body as a byte array:
Scala
val jmsSource: Source[Array[Byte], JmsConsumerControl] = JmsConsumer.bytesSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test"))

val result: Future[Array[Byte]] =
  jmsSource
    .take(1)
    .runWith(Sink.head)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<byte[], JmsConsumerControl> jmsSource =
    JmsConsumer.bytesSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<byte[]> result = jmsSource.take(1).runWith(Sink.head(), system);
Map sources¶
The mapSource emits the received message body as Map<String, Object>:
Scala
val jmsSource: Source[Map[String, Any], JmsConsumerControl] = JmsConsumer.mapSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test"))

val result: Future[immutable.Seq[Map[String, Any]]] =
  jmsSource
    .take(1)
    .runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<Map<String, Object>, JmsConsumerControl> jmsSource =
    JmsConsumer.mapSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<Map<String, Object>> resultStage =
    jmsSource.take(1).runWith(Sink.head(), system);
Object sources¶
The objectSource emits the received message body as a deserialized JVM instance. As serialization may be a security concern, JMS clients require special configuration to allow this. The example shows how to configure the ActiveMQ connection factory to support serialization. See ActiveMQ Security for more information on this.
Scala
val connectionFactory = connFactory.asInstanceOf[ActiveMQConnectionFactory]
connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava)

val jmsSource: Source[java.io.Serializable, JmsConsumerControl] = JmsConsumer.objectSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test"))

val result: Future[java.io.Serializable] =
  jmsSource
    .take(1)
    .runWith(Sink.head)
Java
ActiveMQConnectionFactory connectionFactory =
    (ActiveMQConnectionFactory) server.createConnectionFactory();
connectionFactory.setTrustedPackages(
    Arrays.asList(DummyJavaTests.class.getPackage().getName()));

Source<java.io.Serializable, JmsConsumerControl> jmsSource =
    JmsConsumer.objectSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<java.io.Serializable> result =
    jmsSource.take(1).runWith(Sink.head(), system);
Request / Reply¶
The request / reply pattern can be implemented by streaming a JmsConsumer to a JmsProducer, with a stage in between that extracts the ReplyTo and CorrelationID from the original message and adds them to the response.
Scala
val respondStreamControl: JmsConsumerControl =
  JmsConsumer(JmsConsumerSettings(system, connectionFactory).withQueue("test"))
    .collect {
      case message: TextMessage => JmsTextMessage(message)
    }
    .map { textMessage =>
      textMessage.headers.foldLeft(JmsTextMessage(textMessage.body.reverse)) {
        case (acc, rt: JmsReplyTo) => acc.to(rt.jmsDestination)
        case (acc, cId: JmsCorrelationId) => acc.withHeader(cId)
        case (acc, _) => acc
      }
    }
    .via {
      JmsProducer.flow(
        JmsProducerSettings(system, connectionFactory).withQueue("ignored"))
    }
    .to(Sink.ignore)
    .run()
Java
JmsConsumerControl respondStreamControl =
    JmsConsumer.create(
            JmsConsumerSettings.create(system, connectionFactory).withQueue("test"))
        .map(JmsMessageFactory::create)
        .collectType(JmsTextMessage.class)
        .map(
            textMessage -> {
              JmsTextMessage m = JmsTextMessage.create(reverse.apply(textMessage.body()));
              for (JmsHeader h : textMessage.getHeaders()) {
                if (h.getClass().equals(JmsReplyTo.class)) {
                  m = m.to(((JmsReplyTo) h).jmsDestination());
                } else if (h.getClass().equals(JmsCorrelationId.class)) {
                  m = m.withHeader(h);
                }
              }
              return m;
            })
        .via(
            JmsProducer.flow(
                JmsProducerSettings.create(system, connectionFactory)
                    .withQueue("ignored")))
        .to(Sink.ignore())
        .run(system);
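Stripped of the streaming machinery, the stage in the middle does three things: it computes a response body, routes the response to the destination named in ReplyTo, and carries the CorrelationID over unchanged. Here is a minimal sketch in plain Java, assuming messages are modelled as string maps. ToyReply and the keys body and destination are hypothetical, while JMSReplyTo and JMSCorrelationID are the standard JMS header names.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: messages are plain maps instead of JMS or connector types.
class ToyReply {
    // Build the response for one request: reversed body, routed via the request's headers.
    static Map<String, String> buildReply(Map<String, String> requestHeaders, String requestBody) {
        Map<String, String> reply = new HashMap<>();
        reply.put("body", new StringBuilder(requestBody).reverse().toString());
        // Send the response where the requester asked for it.
        if (requestHeaders.containsKey("JMSReplyTo")) {
            reply.put("destination", requestHeaders.get("JMSReplyTo"));
        }
        // Copy the correlation id so the requester can match response to request.
        if (requestHeaders.containsKey("JMSCorrelationID")) {
            reply.put("JMSCorrelationID", requestHeaders.get("JMSCorrelationID"));
        }
        return reply;
    }

    public static void main(String[] args) {
        Map<String, String> requestHeaders = new HashMap<>();
        requestHeaders.put("JMSReplyTo", "reply-queue");
        requestHeaders.put("JMSCorrelationID", "42");
        requestHeaders.put("JMSType", "ignored-by-the-reply");

        Map<String, String> reply = buildReply(requestHeaders, "hello");
        System.out.println(reply.get("body"));             // prints olleh
        System.out.println(reply.get("destination"));      // prints reply-queue
        System.out.println(reply.get("JMSCorrelationID")); // prints 42
    }
}
```

In this model the destination comes from the request rather than from static configuration, which fits the examples above naming the producer's configured queue "ignored".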