Producer
The Apache Pekko Connectors JMS connector offers producing JMS messages to topics or queues in three ways
- JVM types to an Apache Pekko Streams Sink
JmsMessage
sub-types to a Apache Pekko Streams Sink or Flow (usingJmsProducer.sink
orJmsProducer.flow
)JmsEnvelope
sub-types to a Apache Pekko Streams Flow (usingJmsProducer.flexiFlow
) to support pass-throughs
The JMS message model supports several types of message bodies in (see javax.jms.Message
), which may be created directly from the Apache Pekko Stream elements, or in wrappers to access more advanced features.
Stream element type | Apache Pekko Connectors producer |
---|---|
String |
JmsProducer.textSink |
byte[] |
JmsProducer.bytesSink |
Map<String, Object> |
JmsProducer.mapSink |
Object (java.io.Serializable ) |
JmsProducer.objectSink |
JmsTextMessage |
JmsProducer.sink or JmsProducer.flow |
JmsByteMessage |
JmsProducer.sink or JmsProducer.flow |
JmsByteStringMessage |
JmsProducer.sink or JmsProducer.flow |
JmsMapMessage |
JmsProducer.sink or JmsProducer.flow |
JmsObjectMessage |
JmsProducer.sink or JmsProducer.flow |
JmsEnvelope<PassThrough> with instances JmsPassThrough , JmsTextMessagePassThrough , JmsByteMessagePassThrough , JmsByteStringMessagePassThrough , JmsMapMessagePassThrough , JmsObjectMessagePassThrough |
JmsProducer.flexiFlow |
Configure JMS producers¶
To connect to the JMS broker, first define an appropriate javax.jms.ConnectionFactory
. Here we’re using Active MQ.
sourceval connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
sourcejavax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();
The created ConnectionFactory
is then used for the creation of the different JMS sinks or sources (see below).
A JmsMessage
sub-type sink¶
Use a case class with the subtype of JmsMessage
to wrap the messages you want to send and optionally set message specific properties or headers. JmsProducer
contains factory methods to facilitate the creation of sinks according to the message type.
sourceval jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink(
JmsProducerSettings(producerConfig, connectionFactory).withQueue("numbers"))
val finished: Future[Done] =
Source(immutable.Seq("Message A", "Message B"))
.map(JmsTextMessage(_))
.runWith(jmsSink)
sourceSink<JmsTextMessage, CompletionStage<Done>> jmsSink =
JmsProducer.sink(
JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));
CompletionStage<Done> finished =
Source.from(Arrays.asList("Message A", "Message B"))
.map(JmsTextMessage::create)
.runWith(jmsSink, system);
Setting JMS message properties¶
For every JmsMessage
you can set JMS message properties.
sourceval msgsIn = (1 to 10).toList.map { n =>
org.apache.pekko.stream.connectors.jms
.JmsTextMessage(n.toString)
.withProperty("Number", n)
.withProperty("IsOdd", n % 2 == 1)
.withProperty("IsEven", n % 2 == 0)
}
sourceJmsTextMessage message =
org.apache.pekko.stream.connectors.jms.JmsTextMessage.create(n.toString())
.withProperty("Number", n)
.withProperty("IsOdd", n % 2 == 1)
.withProperty("IsEven", n % 2 == 0);
Setting JMS message header attributes¶
For every JmsMessage
you can set also JMS message headers.
sourceval msgsIn = (1 to 10).toList.map { n =>
JmsTextMessage(n.toString)
.withHeader(JmsType("type"))
.withHeader(JmsCorrelationId("correlationId"))
.withHeader(JmsReplyTo.queue("test-reply"))
.withHeader(JmsTimeToLive(FiniteDuration(999, TimeUnit.SECONDS)))
.withHeader(JmsPriority(2))
.withHeader(JmsDeliveryMode(DeliveryMode.NON_PERSISTENT))
}
sourceList<JmsTextMessage> msgsIn =
createTestMessageList().stream()
.map(
jmsTextMessage ->
jmsTextMessage
.withHeader(JmsType.create("type"))
.withHeader(JmsCorrelationId.create("correlationId"))
.withHeader(JmsReplyTo.queue("test-reply"))
.withHeader(JmsTimeToLive.create(999, TimeUnit.SECONDS))
.withHeader(JmsPriority.create(2))
.withHeader(JmsDeliveryMode.create(DeliveryMode.NON_PERSISTENT)))
.collect(Collectors.toList());
Raw JVM type sinks¶
Text sinks¶
Create a sink, that accepts and forwards JmsTextMessage
s to the JMS provider:
sourceval connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
val jmsSink: Sink[String, Future[Done]] = JmsProducer.textSink(
JmsProducerSettings(system, connectionFactory).withQueue("test"))
val in = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val streamCompletion: Future[Done] =
Source(in)
.runWith(jmsSink)
sourcejavax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();
Sink<String, CompletionStage<Done>> jmsSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, connectionFactory).withQueue("test"));
List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
CompletionStage<Done> finished = Source.from(in).runWith(jmsSink, system);
Byte array sinks¶
Create a sink, that accepts and forwards JmsByteMessage
s to the JMS provider.
sourceval jmsSink: Sink[Array[Byte], Future[Done]] = JmsProducer.bytesSink(
JmsProducerSettings(system, connectionFactory).withQueue("test"))
val in: Array[Byte] = "ThisIsATest".getBytes(StandardCharsets.UTF_8)
val streamCompletion: Future[Done] =
Source
.single(in)
.runWith(jmsSink)
sourceConnectionFactory connectionFactory = server.createConnectionFactory();
Sink<byte[], CompletionStage<Done>> jmsSink =
JmsProducer.bytesSink(
JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));
byte[] in = "ThisIsATest".getBytes(StandardCharsets.UTF_8);
CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);
Map message sink¶
Create a sink, that accepts and forwards JmsMapMessage
s to the JMS provider:
sourceval jmsSink: Sink[Map[String, Any], Future[Done]] = JmsProducer.mapSink(
JmsProducerSettings(system, connectionFactory).withQueue("test"))
val input = List(
Map[String, Any](
"string" -> "value",
"int value" -> 42,
"double value" -> 43.toDouble,
"short value" -> 7.toShort,
"boolean value" -> true,
"long value" -> 7.toLong,
"bytearray" -> "AStringAsByteArray".getBytes(StandardCharsets.UTF_8),
"byte" -> 1.toByte))
val streamCompletion: Future[Done] =
Source(input)
.runWith(jmsSink)
sourceConnectionFactory connectionFactory = server.createConnectionFactory();
Sink<Map<String, Object>, CompletionStage<Done>> jmsSink =
JmsProducer.mapSink(
JmsProducerSettings.create(system, connectionFactory).withQueue("test"));
Map<String, Object> in = new HashMap<>();
in.put("string value", "value");
in.put("int value", 42);
in.put("double value", 43.0);
in.put("short value", (short) 7);
in.put("boolean value", true);
in.put("long value", 7L);
in.put("bytearray", "AStringAsByteArray".getBytes(StandardCharsets.UTF_8));
in.put("byte", (byte) 1);
CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);
Object sinks¶
Create and configure ActiveMQ connection factory to support serialization. See ActiveMQ Security for more information on this. Create a sink, that accepts and forwards JmsObjectMessage
s to the JMS provider:
sourceval connectionFactory = connFactory.asInstanceOf[ActiveMQConnectionFactory]
connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava)
val jmsSink: Sink[Serializable, Future[Done]] = JmsProducer.objectSink(
JmsProducerSettings(system, connectionFactory).withQueue("test"))
val in = DummyObject("ThisIsATest")
val streamCompletion: Future[Done] =
Source
.single(in)
.runWith(jmsSink)
sourceActiveMQConnectionFactory connectionFactory =
(ActiveMQConnectionFactory) server.createConnectionFactory();
connectionFactory.setTrustedPackages(
Arrays.asList(DummyJavaTests.class.getPackage().getName()));
Sink<java.io.Serializable, CompletionStage<Done>> jmsSink =
JmsProducer.objectSink(
JmsProducerSettings.create(system, connectionFactory).withQueue("test"));
java.io.Serializable in = new DummyJavaTests("javaTest");
CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);
Sending messages as a Flow¶
The producer can also act as a flow, in order to publish messages in the middle of stream processing. For example, you can ensure that a message is persisted to the queue before subsequent processing.
source
val flow: Flow[JmsMessage, JmsMessage, JmsProducerStatus] =
JmsProducer.flow(
JmsProducerSettings(system, connectionFactory)
.withQueue("test"))
val input: immutable.Seq[JmsTextMessage] =
(1 to 100).map(i => JmsTextMessage(i.toString))
val result: Future[Seq[JmsMessage]] = Source(input)
.via(flow)
.runWith(Sink.seq)
sourceConnectionFactory connectionFactory = server.createConnectionFactory();
Flow<JmsTextMessage, JmsTextMessage, JmsProducerStatus> flow =
JmsProducer.flow(
JmsProducerSettings.create(system, connectionFactory).withQueue("test"));
List<JmsTextMessage> input = createTestMessageList();
CompletionStage<List<JmsTextMessage>> result =
Source.from(input).via(flow).runWith(Sink.seq(), system);
Sending messages with per-message destinations¶
It is also possible to define message destinations per message:
sourceval flowSink: Flow[JmsMessage, JmsMessage, JmsProducerStatus] =
JmsProducer.flow(
JmsProducerSettings(system, connectionFactory).withQueue("test"))
val input = (1 to 100).map { i =>
val queueName = if (i % 2 == 0) "even" else "odd"
JmsTextMessage(i.toString).toQueue(queueName)
}
Source(input).via(flowSink).runWith(Sink.ignore)
sourceFlow<JmsTextMessage, JmsTextMessage, JmsProducerStatus> flowSink =
JmsProducer.flow(
JmsProducerSettings.create(system, connectionFactory).withQueue("test"));
List<JmsTextMessage> input = new ArrayList<>();
for (Integer n : Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) {
String queueName = (n % 2 == 0) ? "even" : "odd";
input.add(JmsTextMessage.create(n.toString()).toQueue(queueName));
}
Source.from(input).via(flowSink).runWith(Sink.seq(), system);
When no destination is defined on the message, the destination given in the producer settings is used.
Passing context through the producer¶
In some use cases, it is useful to pass through context information when producing (e.g. for acknowledging or committing messages after sending to Jms). For this, the JmsProducer.flexiFlow
accepts implementations of JmsEnvelope
, which it will pass through:
JmsPassThrough
JmsTextMessagePassThrough
JmsByteMessagePassThrough
JmsByteStringMessagePassThrough
JmsMapMessagePassThrough
JmsObjectMessagePassThrough
sourceval jmsProducer: Flow[JmsEnvelope[String], JmsEnvelope[String], JmsProducerStatus] =
JmsProducer.flexiFlow[String](
JmsProducerSettings(system, connectionFactory).withQueue("test"))
val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val in: immutable.Seq[JmsTextMessagePassThrough[String]] =
data.map(t => JmsTextMessage(t).withPassThrough(t))
val result = Source(in)
.via(jmsProducer)
.map(_.passThrough) // extract the value passed through
.runWith(Sink.seq)
sourceFlow<JmsEnvelope<String>, JmsEnvelope<String>, JmsProducerStatus> jmsProducer =
JmsProducer.flexiFlow(
JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));
List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
List<JmsEnvelope<String>> input = new ArrayList<>();
for (String s : data) {
String passThrough = s;
input.add(JmsTextMessage.create(s, passThrough));
}
CompletionStage<List<String>> result =
Source.from(input)
.via(jmsProducer)
.map(JmsEnvelope::passThrough)
.runWith(Sink.seq(), system);
There are two implementations: One envelope type containing a messages to send to Jms, and one envelope type containing only values to pass through. This allows messages to flow without producing any new messages to Jms. This is primarily useful when committing offsets back to Kakfa, or when acknowledging Jms messages after sending the outcome of processing them back to Jms.
sourceval jmsProducer = JmsProducer.flexiFlow[String](
JmsProducerSettings(producerConfig, connectionFactory).withQueue("topic"))
val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val in = data.map(t => JmsPassThrough(t))
val result = Source(in).via(jmsProducer).map(_.passThrough).runWith(Sink.seq)
sourceFlow<JmsEnvelope<String>, JmsEnvelope<String>, JmsProducerStatus> jmsProducer =
JmsProducer.flexiFlow(
JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));
List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
List<JmsEnvelope<String>> input = new ArrayList<>();
for (String s : data) {
String passThrough = s;
input.add(JmsPassThrough.create(passThrough));
}
CompletionStage<List<String>> result =
Source.from(input)
.via(jmsProducer)
.map(JmsEnvelope::passThrough)
.runWith(Sink.seq(), system);
Producer Settings¶
The Apache Pekko Connectors JMS producer is configured via default settings in the HOCON config file section pekko.connectors.jms.producer
in your application.conf
, and settings may be tweaked in the code using the withXyz
methods.
The JmsProducerSettings
factories allow for passing the actor system to read from the default pekko.connectors.jms.producer
section, or you may pass a Config
instance which is resolved to a section of the same structure.
sourceval producerConfig: Config = system.settings.config.getConfig(JmsProducerSettings.configPath)
val settings = JmsProducerSettings(producerConfig, connectionFactory)
.withTopic("target-topic")
.withCredentials(Credentials("username", "password"))
.withSessionCount(1)
sourceConfig producerConfig = config.getConfig(JmsProducerSettings.configPath());
JmsProducerSettings settings =
JmsProducerSettings.create(producerConfig, new ActiveMQConnectionFactory("broker-url"))
.withTopic("target-topic")
.withCredentials(Credentials.create("username", "password"))
.withConnectionRetrySettings(retrySettings)
.withSendRetrySettings(sendRetrySettings)
.withSessionCount(10)
.withTimeToLive(Duration.ofHours(1));
The producer can be configured with the following settings. On the second tab, the section from reference.conf
shows the structure to use for configuring multiple set-ups.
source# Jms Producer Settings
# sets default values
producer {
# Configure connection retrying by providing settings for ConnectionRetrySettings.
connection-retry = ${pekko.connectors.jms.connection-retry}
# Configure re-sending by providing settings for SendRetrySettings.
send-retry = ${pekko.connectors.jms.send-retry}
# Credentials to connect to the JMS broker.
# credentials {
# username = "some text"
# password = "some text"
# }
# "off" to not use any credentials.
credentials = off
# Number of parallel sessions to use for sending JMS messages.
# Increasing the number of parallel sessions increases throughput at the cost of message ordering.
# While the messages may arrive out of order on the JMS broker, the producer flow outputs messages
# in the order they are received.
session-count = 1
# Time messages should be kept on the JMS broker.
# This setting can be overridden on individual messages.
# "off" to not let messages expire.
time-to-live = off
# How long the stage should preserve connection status events for the first subscriber before discarding them
connection-status-subscription-timeout = 5 seconds
}
Connection Retries¶
When a connection to a broker cannot be established and errors out, or is timing out being established or started, the connection can be retried. All JMS publishers, consumers, and browsers are configured with connection retry settings. On the second tab the section from reference.conf
shows the structure to use for configuring multiple set-ups.
source# Connection Retry Settings
# these set the defaults for Consumer, Producer, and Browse settings
connection-retry {
# Time allowed to establish and start a connection.
connect-timeout = 10 seconds
# Wait time before retrying the connection the first time.
initial-retry = 100 millis
# Back-off factor for subsequent retries.
backoff-factor = 2
# Back-off factor for subsequent retries.
max-backoff = 1 minute
# Maximum number of retries allowed.
# "infinite", or positive integer
max-retries = 10
}
The retry time is calculated by:
initialRetry * retryNumberbackoffFactor
With the default settings, we’ll see retries after 100ms, 400ms, 900ms pauses, until the pauses reach 1 minute and will stay with 1 minute intervals for any subsequent retries.
Consumers, producers and browsers try to reconnect with the same retry characteristics if a connection fails mid-stream.
All JMS settings support setting the connectionRetrySettings
field using .withConnectionRetrySettings(retrySettings)
on the given settings. The followings show how to create ConnectionRetrySettings
:
source// reiterating defaults from reference.conf
val retrySettings = ConnectionRetrySettings(system)
.withConnectTimeout(10.seconds)
.withInitialRetry(100.millis)
.withBackoffFactor(2.0d)
.withMaxBackoff(1.minute)
.withMaxRetries(10)
sourceConfig connectionRetryConfig = config.getConfig("pekko.connectors.jms.connection-retry");
// reiterating the values from reference.conf
ConnectionRetrySettings retrySettings =
ConnectionRetrySettings.create(connectionRetryConfig)
.withConnectTimeout(Duration.ofSeconds(10))
.withInitialRetry(Duration.ofMillis(100))
.withBackoffFactor(2.0)
.withMaxBackoff(Duration.ofMinutes(1))
.withMaxRetries(10);
Send Retries¶
When a connection to a broker starts failing, sending JMS messages will also fail. Those failed messages can be retried at the cost of potentially duplicating the failed messages. Send retries can be configured as follows:
source# Send Retry Settings
# these set the defaults for Producer settings
send-retry {
# Wait time before retrying the first time.
initial-retry = 20 millis
# Back-off factor for subsequent retries.
backoff-factor = 1.5
# Maximum back-off time allowed, after which all retries will happen after this delay.
max-backoff = 500 millis
# Maximum number of retries allowed.
# "infinite", or positive integer
max-retries = 10
}
The retry time is calculated by:
initialRetry * retryNumberbackoffFactor
With the default settings, we’ll see retries after 20ms, 57ms, 104ms pauses, until the pauses reach 500 ms and will stay with 500 ms intervals for any subsequent retries.
JMS producer settings support configuring retries by using .withSendRetrySettings(retrySettings)
. The followings show how to create SendRetrySettings
:
source// reiterating defaults from reference.conf
val sendRetrySettings = SendRetrySettings(system)
.withInitialRetry(20.millis)
.withBackoffFactor(1.5d)
.withMaxBackoff(500.millis)
.withMaxRetries(10)
sourceimport com.typesafe.config.Config;
import scala.Option;
Config sendRetryConfig = config.getConfig("pekko.connectors.jms.send-retry");
// reiterating the values from reference.conf
SendRetrySettings sendRetrySettings =
SendRetrySettings.create(sendRetryConfig)
.withInitialRetry(Duration.ofMillis(20))
.withBackoffFactor(1.5d)
.withMaxBackoff(Duration.ofMillis(500))
.withMaxRetries(10);
If a send operation finally fails, the stage also fails unless a different supervision strategy is applied. The producer stage honours stream supervision.
Observing connectivity and state of a JMS producer¶
All JMS producer’s materialized values are of type JmsProducerStatus
. This provides a connectorState
method returning a Source
of JmsConnectorState
updates that publishes connection attempts, disconnections, completions and failures. The source is completed after the JMS producer completes or fails.