Producer
The Apache Pekko Connectors JMS connector offers three ways to produce JMS messages to topics or queues:
- JVM types to an Apache Pekko Streams Sink
- JmsMessage sub-types to an Apache Pekko Streams Sink or Flow (using JmsProducer.sink or JmsProducer.flow)
- JmsEnvelope sub-types to an Apache Pekko Streams Flow (using JmsProducer.flexiFlow) to support pass-throughs
The JMS message model supports several types of message bodies (see javax.jms.Message), which may be created directly from the Apache Pekko Stream elements, or in wrappers to access more advanced features.
Stream element type | Apache Pekko Connectors producer |
---|---|
String | JmsProducer.textSink |
Array[Byte] (Scala), byte[] (Java) | JmsProducer.bytesSink |
Map[String, AnyRef] (Scala), Map<String, Object> (Java) | JmsProducer.mapSink |
Object (java.io.Serializable) | JmsProducer.objectSink |
JmsTextMessage | JmsProducer.sink or JmsProducer.flow |
JmsByteMessage | JmsProducer.sink or JmsProducer.flow |
JmsByteStringMessage | JmsProducer.sink or JmsProducer.flow |
JmsMapMessage | JmsProducer.sink or JmsProducer.flow |
JmsObjectMessage | JmsProducer.sink or JmsProducer.flow |
JmsEnvelope[PassThrough] (Scala), JmsEnvelope<PassThrough> (Java) with instances JmsPassThrough, JmsTextMessagePassThrough, JmsByteMessagePassThrough, JmsByteStringMessagePassThrough, JmsMapMessagePassThrough, JmsObjectMessagePassThrough | JmsProducer.flexiFlow |
Configure JMS producers
To connect to the JMS broker, first define an appropriate javax.jms.ConnectionFactory. Here we're using ActiveMQ.
- Scala
-
source
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
- Java
-
source
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();
The created ConnectionFactory is then used to create the different JMS sinks or sources (see below).
A JmsMessage sub-type sink
Use a case class with the subtype of JmsMessage to wrap the messages you want to send and optionally set message-specific properties or headers. JmsProducer contains factory methods to facilitate the creation of sinks according to the message type.
- Scala
-
source
val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink(
  JmsProducerSettings(producerConfig, connectionFactory).withQueue("numbers"))

val finished: Future[Done] =
  Source(immutable.Seq("Message A", "Message B"))
    .map(JmsTextMessage(_))
    .runWith(jmsSink)
- Java
-
source
Sink<JmsTextMessage, CompletionStage<Done>> jmsSink =
    JmsProducer.sink(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

CompletionStage<Done> finished =
    Source.from(Arrays.asList("Message A", "Message B"))
        .map(JmsTextMessage::create)
        .runWith(jmsSink, system);
Setting JMS message properties
For every JmsMessage you can set JMS message properties.
- Scala
-
source
val msgsIn = (1 to 10).toList.map { n =>
  org.apache.pekko.stream.connectors.jms
    .JmsTextMessage(n.toString)
    .withProperty("Number", n)
    .withProperty("IsOdd", n % 2 == 1)
    .withProperty("IsEven", n % 2 == 0)
}
- Java
-
source
JmsTextMessage message =
    org.apache.pekko.stream.connectors.jms.JmsTextMessage.create(n.toString())
        .withProperty("Number", n)
        .withProperty("IsOdd", n % 2 == 1)
        .withProperty("IsEven", n % 2 == 0);
Setting JMS message header attributes
For every JmsMessage you can also set JMS message headers.
- Scala
-
source
val msgsIn = (1 to 10).toList.map { n =>
  JmsTextMessage(n.toString)
    .withHeader(JmsType("type"))
    .withHeader(JmsCorrelationId("correlationId"))
    .withHeader(JmsReplyTo.queue("test-reply"))
    .withHeader(JmsTimeToLive(FiniteDuration(999, TimeUnit.SECONDS)))
    .withHeader(JmsPriority(2))
    .withHeader(JmsDeliveryMode(DeliveryMode.NON_PERSISTENT))
}
- Java
-
source
List<JmsTextMessage> msgsIn =
    createTestMessageList().stream()
        .map(
            jmsTextMessage ->
                jmsTextMessage
                    .withHeader(JmsType.create("type"))
                    .withHeader(JmsCorrelationId.create("correlationId"))
                    .withHeader(JmsReplyTo.queue("test-reply"))
                    .withHeader(JmsTimeToLive.create(999, TimeUnit.SECONDS))
                    .withHeader(JmsPriority.create(2))
                    .withHeader(JmsDeliveryMode.create(DeliveryMode.NON_PERSISTENT)))
        .collect(Collectors.toList());
Raw JVM type sinks
Text sinks
Create a sink that accepts strings and forwards them as JmsTextMessages to the JMS provider:
- Scala
-
source
val connectionFactory: javax.jms.ConnectionFactory =
  new org.apache.activemq.ActiveMQConnectionFactory(url)

val jmsSink: Sink[String, Future[Done]] = JmsProducer.textSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test"))

val in = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val streamCompletion: Future[Done] =
  Source(in)
    .runWith(jmsSink)
- Java
-
source
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();

Sink<String, CompletionStage<Done>> jmsSink =
    JmsProducer.textSink(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
CompletionStage<Done> finished = Source.from(in).runWith(jmsSink, system);
Byte array sinks
Create a sink that accepts byte arrays and forwards them as JmsByteMessages to the JMS provider:
- Scala
-
source
val jmsSink: Sink[Array[Byte], Future[Done]] = JmsProducer.bytesSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test"))

val in: Array[Byte] = "ThisIsATest".getBytes(StandardCharsets.UTF_8)
val streamCompletion: Future[Done] =
  Source
    .single(in)
    .runWith(jmsSink)
- Java
-
source
ConnectionFactory connectionFactory = server.createConnectionFactory();

Sink<byte[], CompletionStage<Done>> jmsSink =
    JmsProducer.bytesSink(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

byte[] in = "ThisIsATest".getBytes(StandardCharsets.UTF_8);
CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);
Map message sink
Create a sink that accepts maps and forwards them as JmsMapMessages to the JMS provider:
- Scala
-
source
val jmsSink: Sink[Map[String, Any], Future[Done]] = JmsProducer.mapSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test"))

val input = List(
  Map[String, Any](
    "string" -> "value",
    "int value" -> 42,
    "double value" -> 43.toDouble,
    "short value" -> 7.toShort,
    "boolean value" -> true,
    "long value" -> 7.toLong,
    "bytearray" -> "AStringAsByteArray".getBytes(StandardCharsets.UTF_8),
    "byte" -> 1.toByte))

val streamCompletion: Future[Done] =
  Source(input)
    .runWith(jmsSink)
- Java
-
source
ConnectionFactory connectionFactory = server.createConnectionFactory();

Sink<Map<String, Object>, CompletionStage<Done>> jmsSink =
    JmsProducer.mapSink(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

Map<String, Object> in = new HashMap<>();
in.put("string value", "value");
in.put("int value", 42);
in.put("double value", 43.0);
in.put("short value", (short) 7);
in.put("boolean value", true);
in.put("long value", 7L);
in.put("bytearray", "AStringAsByteArray".getBytes(StandardCharsets.UTF_8));
in.put("byte", (byte) 1);

CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);
Object sinks
Create and configure the ActiveMQ connection factory to support serialization. See ActiveMQ Security for more information on this. Then create a sink that accepts serializable objects and forwards them as JmsObjectMessages to the JMS provider:
- Scala
-
source
val connectionFactory = connFactory.asInstanceOf[ActiveMQConnectionFactory]
connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava)

val jmsSink: Sink[Serializable, Future[Done]] = JmsProducer.objectSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test"))

val in = DummyObject("ThisIsATest")
val streamCompletion: Future[Done] =
  Source
    .single(in)
    .runWith(jmsSink)
- Java
-
source
ActiveMQConnectionFactory connectionFactory =
    (ActiveMQConnectionFactory) server.createConnectionFactory();
connectionFactory.setTrustedPackages(
    Arrays.asList(DummyJavaTests.class.getPackage().getName()));

Sink<java.io.Serializable, CompletionStage<Done>> jmsSink =
    JmsProducer.objectSink(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

java.io.Serializable in = new DummyJavaTests("javaTest");
CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);
Sending messages as a Flow
The producer can also act as a flow, in order to publish messages in the middle of stream processing. For example, you can ensure that a message is persisted to the queue before subsequent processing.
- Scala
-
source
val flow: Flow[JmsMessage, JmsMessage, JmsProducerStatus] =
  JmsProducer.flow(
    JmsProducerSettings(system, connectionFactory)
      .withQueue("test"))

val input: immutable.Seq[JmsTextMessage] =
  (1 to 100).map(i => JmsTextMessage(i.toString))

val result: Future[Seq[JmsMessage]] = Source(input)
  .via(flow)
  .runWith(Sink.seq)
- Java
-
source
ConnectionFactory connectionFactory = server.createConnectionFactory();

Flow<JmsTextMessage, JmsTextMessage, JmsProducerStatus> flow =
    JmsProducer.flow(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

List<JmsTextMessage> input = createTestMessageList();

CompletionStage<List<JmsTextMessage>> result =
    Source.from(input).via(flow).runWith(Sink.seq(), system);
Sending messages with per-message destinations
It is also possible to define message destinations per message:
- Scala
-
source
val flowSink: Flow[JmsMessage, JmsMessage, JmsProducerStatus] =
  JmsProducer.flow(
    JmsProducerSettings(system, connectionFactory).withQueue("test"))

val input = (1 to 100).map { i =>
  val queueName = if (i % 2 == 0) "even" else "odd"
  JmsTextMessage(i.toString).toQueue(queueName)
}
Source(input).via(flowSink).runWith(Sink.ignore)
- Java
-
source
Flow<JmsTextMessage, JmsTextMessage, JmsProducerStatus> flowSink =
    JmsProducer.flow(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

List<JmsTextMessage> input = new ArrayList<>();
for (Integer n : Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) {
  String queueName = (n % 2 == 0) ? "even" : "odd";
  input.add(JmsTextMessage.create(n.toString()).toQueue(queueName));
}

Source.from(input).via(flowSink).runWith(Sink.seq(), system);
When no destination is defined on the message, the destination given in the producer settings is used.
Passing context through the producer
In some use cases, it is useful to pass through context information when producing (e.g. for acknowledging or committing messages after sending to JMS). For this, JmsProducer.flexiFlow accepts implementations of JmsEnvelope, which it will pass through:
- JmsPassThrough
- JmsTextMessagePassThrough
- JmsByteMessagePassThrough
- JmsByteStringMessagePassThrough
- JmsMapMessagePassThrough
- JmsObjectMessagePassThrough
- Scala
-
source
val jmsProducer: Flow[JmsEnvelope[String], JmsEnvelope[String], JmsProducerStatus] =
  JmsProducer.flexiFlow[String](
    JmsProducerSettings(system, connectionFactory).withQueue("test"))

val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val in: immutable.Seq[JmsTextMessagePassThrough[String]] =
  data.map(t => JmsTextMessage(t).withPassThrough(t))

val result = Source(in)
  .via(jmsProducer)
  .map(_.passThrough) // extract the value passed through
  .runWith(Sink.seq)
- Java
-
source
Flow<JmsEnvelope<String>, JmsEnvelope<String>, JmsProducerStatus> jmsProducer =
    JmsProducer.flexiFlow(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
List<JmsEnvelope<String>> input = new ArrayList<>();
for (String s : data) {
  String passThrough = s;
  input.add(JmsTextMessage.create(s, passThrough));
}

CompletionStage<List<String>> result =
    Source.from(input)
        .via(jmsProducer)
        .map(JmsEnvelope::passThrough)
        .runWith(Sink.seq(), system);
There are two implementations: one envelope type containing a message to send to JMS, and one envelope type containing only values to pass through. This allows messages to flow without producing any new messages to JMS. This is primarily useful when committing offsets back to Kafka, or when acknowledging JMS messages after sending the outcome of processing them back to JMS.
- Scala
-
source
val jmsProducer = JmsProducer.flexiFlow[String](
  JmsProducerSettings(producerConfig, connectionFactory).withQueue("topic"))

val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val in = data.map(t => JmsPassThrough(t))

val result = Source(in).via(jmsProducer).map(_.passThrough).runWith(Sink.seq)
- Java
-
source
Flow<JmsEnvelope<String>, JmsEnvelope<String>, JmsProducerStatus> jmsProducer =
    JmsProducer.flexiFlow(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
List<JmsEnvelope<String>> input = new ArrayList<>();
for (String s : data) {
  String passThrough = s;
  input.add(JmsPassThrough.create(passThrough));
}

CompletionStage<List<String>> result =
    Source.from(input)
        .via(jmsProducer)
        .map(JmsEnvelope::passThrough)
        .runWith(Sink.seq(), system);
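The difference between the two envelope kinds can be modelled in plain Java without the connector. The sketch below is an illustration only: `PassThroughSketch`, `Envelope`, `withMessage`, `passThroughOnly` and `process` are hypothetical names, not part of the Apache Pekko Connectors API, and the `sent` list stands in for the JMS broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class PassThroughSketch {

    // Hypothetical envelope: an optional payload to send plus a value carried along.
    static final class Envelope<P> {
        final Optional<String> message;
        final P passThrough;

        private Envelope(Optional<String> message, P passThrough) {
            this.message = message;
            this.passThrough = passThrough;
        }

        // Envelope that should be sent and also carries a pass-through value.
        static <P> Envelope<P> withMessage(String message, P passThrough) {
            return new Envelope<>(Optional.of(message), passThrough);
        }

        // Envelope that only carries a pass-through value; nothing is sent.
        static <P> Envelope<P> passThroughOnly(P passThrough) {
            return new Envelope<>(Optional.empty(), passThrough);
        }
    }

    // "Sends" the payload when one is present and emits every pass-through value
    // in input order, whether or not a message was sent for it.
    static <P> List<P> process(List<Envelope<P>> in, List<String> sent) {
        List<P> out = new ArrayList<>();
        for (Envelope<P> envelope : in) {
            envelope.message.ifPresent(sent::add);
            out.add(envelope.passThrough);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Envelope<Long>> in = new ArrayList<>();
        in.add(Envelope.withMessage("result-1", 101L));
        in.add(Envelope.passThroughOnly(102L)); // e.g. an offset with nothing to publish
        in.add(Envelope.withMessage("result-3", 103L));

        List<String> sent = new ArrayList<>();
        List<Long> passedThrough = process(in, sent);
        System.out.println("sent: " + sent);
        System.out.println("passed through: " + passedThrough);
    }
}
```

Every pass-through value reaches the downstream stage, so an acknowledgement or commit step placed there sees all elements, including those for which nothing was published.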
Producer Settings
The Apache Pekko Connectors JMS producer is configured via default settings in the HOCON config file section pekko.connectors.jms.producer in your application.conf, and settings may be tweaked in the code using the withXyz methods.
The JmsProducerSettings factories allow for passing the actor system to read from the default pekko.connectors.jms.producer section, or you may pass a Config instance which is resolved to a section of the same structure.
- Scala
-
source
val producerConfig: Config = system.settings.config.getConfig(JmsProducerSettings.configPath)
val settings = JmsProducerSettings(producerConfig, connectionFactory)
  .withTopic("target-topic")
  .withCredentials(Credentials("username", "password"))
  .withSessionCount(1)
- Java
-
source
Config producerConfig = config.getConfig(JmsProducerSettings.configPath());
JmsProducerSettings settings =
    JmsProducerSettings.create(producerConfig, new ActiveMQConnectionFactory("broker-url"))
        .withTopic("target-topic")
        .withCredentials(Credentials.create("username", "password"))
        .withConnectionRetrySettings(retrySettings)
        .withSendRetrySettings(sendRetrySettings)
        .withSessionCount(10)
        .withTimeToLive(Duration.ofHours(1));
The producer can be configured with the following settings. On the second tab, the section from reference.conf shows the structure to use for configuring multiple set-ups.
- Table
-
Setting | Defaults | Description |
---|---|---|
connectionFactory | mandatory | Factory to use for creating JMS connections |
destination | mandatory | Destination (queue or topic) to send JMS messages to |
credentials | optional | JMS broker credentials |
connectionRetrySettings | default settings | Retry characteristics if the connection failed to be established or is taking a long time. Please see default values under Connection Retries |
sendRetrySettings | default settings | Retry characteristics if message sending failed. Please see default values under Send Retries |
sessionCount | defaults to 1 | Number of parallel sessions to use for sending JMS messages. Increasing the number of parallel sessions increases throughput at the cost of message ordering. While the messages may arrive out of order on the JMS broker, the producer flow outputs messages in the order they are received |
timeToLive | optional | Time messages should be kept on the JMS broker. This setting can be overridden on individual messages. If not set, messages will never expire |
connectionStatusSubscriptionTimeout | 5 seconds | Time to wait for subscriber of connection status events before starting to discard them |
- reference.conf
-
source
# Jms Producer Settings
# sets default values
producer {
  # Configure connection retrying by providing settings for ConnectionRetrySettings.
  connection-retry = ${pekko.connectors.jms.connection-retry}
  # Configure re-sending by providing settings for SendRetrySettings.
  send-retry = ${pekko.connectors.jms.send-retry}
  # Credentials to connect to the JMS broker.
  # credentials {
  #   username = "some text"
  #   password = "some text"
  # }
  # "off" to not use any credentials.
  credentials = off
  # Number of parallel sessions to use for sending JMS messages.
  # Increasing the number of parallel sessions increases throughput at the cost of message ordering.
  # While the messages may arrive out of order on the JMS broker, the producer flow outputs messages
  # in the order they are received.
  session-count = 1
  # Time messages should be kept on the JMS broker.
  # This setting can be overridden on individual messages.
  # "off" to not let messages expire.
  time-to-live = off
  # How long the stage should preserve connection status events for the first subscriber before discarding them
  connection-status-subscription-timeout = 5 seconds
}
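The session-count trade-off (sends on parallel sessions may reach the broker out of order, while the flow still emits elements in input order) can be illustrated with a plain-Java simulation. This is a sketch under stated assumptions, not the connector's implementation: `OrderedParallelSend`, `sendAll` and the `arrivals` list (a stand-in for the broker) are hypothetical, and a thread pool plays the role of the parallel sessions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

final class OrderedParallelSend {

    // Simulates sending each message on one of `sessionCount` worker threads.
    // `arrivals` must be thread-safe; it records messages in the order the
    // simulated sends finish, which may differ from the input order.
    static List<String> sendAll(List<String> messages, int sessionCount, List<String> arrivals) {
        ExecutorService sessions = Executors.newFixedThreadPool(sessionCount);
        try {
            List<CompletableFuture<String>> inFlight = new ArrayList<>();
            for (String message : messages) {
                inFlight.add(CompletableFuture.supplyAsync(() -> {
                    pause(ThreadLocalRandom.current().nextInt(5)); // simulated latency
                    arrivals.add(message);
                    return message;
                }, sessions));
            }
            // Joining the futures in submission order restores the input order downstream.
            List<String> emitted = new ArrayList<>();
            for (CompletableFuture<String> send : inFlight) {
                emitted.add(send.join());
            }
            return emitted;
        } finally {
            sessions.shutdown();
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> arrivals = new CopyOnWriteArrayList<>();
        List<String> emitted = sendAll(Arrays.asList("a", "b", "c", "d", "e", "f"), 3, arrivals);
        System.out.println("arrived at broker:  " + arrivals); // order may vary between runs
        System.out.println("emitted downstream: " + emitted);
    }
}
```

With a session count of 1 the two orders coincide, which is why the default keeps broker-side ordering intact.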
Connection Retries
When a connection to a broker cannot be established and errors out, or is timing out being established or started, the connection can be retried. All JMS publishers, consumers, and browsers are configured with connection retry settings. On the second tab, the section from reference.conf shows the structure to use for configuring multiple set-ups.
- Table
-
Setting | Description | Default Value |
---|---|---|
connectTimeout | Time allowed to establish and start a connection | 10 s |
initialRetry | Wait time before retrying the first time | 100 ms |
backoffFactor | Back-off factor for subsequent retries | 2.0 |
maxBackoff | Maximum back-off time allowed, after which all retries will happen after this delay | 1 minute |
maxRetries | Maximum number of retries allowed (negative value is infinite) | 10 |
- reference.conf
-
source
# Connection Retry Settings
# these set the defaults for Consumer, Producer, and Browse settings
connection-retry {
  # Time allowed to establish and start a connection.
  connect-timeout = 10 seconds
  # Wait time before retrying the connection the first time.
  initial-retry = 100 millis
  # Back-off factor for subsequent retries.
  backoff-factor = 2
  # Maximum back-off time allowed, after which all retries will happen after this delay.
  max-backoff = 1 minute
  # Maximum number of retries allowed.
  # "infinite", or positive integer
  max-retries = 10
}
The retry time is calculated by:
initialRetry * retryNumber^backoffFactor
With the default settings, retries follow pauses of 100 ms, 400 ms, 900 ms and so on; once the computed pause reaches 1 minute (maxBackoff), all subsequent retries use 1-minute intervals.
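The retry schedule can be reproduced with a few lines of plain Java. This is a minimal sketch of the formula as stated in the text (the retry number raised to the back-off factor, capped at maxBackoff); `RetryBackoff` and `delay` are hypothetical names, and rounding to whole milliseconds is an assumption of the sketch, not something the connector documents.

```java
import java.time.Duration;

final class RetryBackoff {

    // Pause before the given retry (1-based):
    // initialRetry * retryNumber^backoffFactor, capped at maxBackoff.
    static Duration delay(Duration initialRetry, double backoffFactor,
                          Duration maxBackoff, int retryNumber) {
        double millis = initialRetry.toMillis() * Math.pow(retryNumber, backoffFactor);
        long capped = Math.min(Math.round(millis), maxBackoff.toMillis());
        return Duration.ofMillis(capped);
    }

    public static void main(String[] args) {
        // Defaults from the connection-retry section.
        Duration initialRetry = Duration.ofMillis(100);
        Duration maxBackoff = Duration.ofMinutes(1);
        double backoffFactor = 2.0;

        for (int retry = 1; retry <= 10; retry++) {
            long pause = delay(initialRetry, backoffFactor, maxBackoff, retry).toMillis();
            System.out.println("retry " + retry + ": pause " + pause + " ms");
        }
    }
}
```

Under this reading of the formula, the 1-minute cap is first reached at retry 25 (100 ms * 25^2 = 62.5 s), so with the default maxRetries of 10 the longest pause would be 10 s.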
Consumers, producers and browsers try to reconnect with the same retry characteristics if a connection fails mid-stream.
All JMS settings support setting the connectionRetrySettings field using .withConnectionRetrySettings(retrySettings) on the given settings. The following shows how to create ConnectionRetrySettings:
- Scala
-
source
// reiterating defaults from reference.conf
val retrySettings = ConnectionRetrySettings(system)
  .withConnectTimeout(10.seconds)
  .withInitialRetry(100.millis)
  .withBackoffFactor(2.0d)
  .withMaxBackoff(1.minute)
  .withMaxRetries(10)
- Java
-
source
Config connectionRetryConfig = config.getConfig("pekko.connectors.jms.connection-retry");
// reiterating the values from reference.conf
ConnectionRetrySettings retrySettings =
    ConnectionRetrySettings.create(connectionRetryConfig)
        .withConnectTimeout(Duration.ofSeconds(10))
        .withInitialRetry(Duration.ofMillis(100))
        .withBackoffFactor(2.0)
        .withMaxBackoff(Duration.ofMinutes(1))
        .withMaxRetries(10);
Send Retries
When a connection to a broker starts failing, sending JMS messages will also fail. Those failed messages can be retried at the cost of potentially duplicating the failed messages. Send retries can be configured as follows:
- Table
-
Setting | Description | Default Value |
---|---|---|
initialRetry | Wait time before retrying the first time | 20 ms |
backoffFactor | Back-off factor for subsequent retries | 1.5 |
maxBackoff | Maximum back-off time allowed, after which all retries will happen after this delay | 500 ms |
maxRetries | Maximum number of retries allowed (negative value is infinite) | 10 |
- reference.conf
-
source
# Send Retry Settings
# these set the defaults for Producer settings
send-retry {
  # Wait time before retrying the first time.
  initial-retry = 20 millis
  # Back-off factor for subsequent retries.
  backoff-factor = 1.5
  # Maximum back-off time allowed, after which all retries will happen after this delay.
  max-backoff = 500 millis
  # Maximum number of retries allowed.
  # "infinite", or positive integer
  max-retries = 10
}
The retry time is calculated by:
initialRetry * retryNumber^backoffFactor
With the default settings, retries follow pauses of 20 ms, 57 ms, 104 ms and so on; once the computed pause reaches 500 ms (maxBackoff), all subsequent retries use 500 ms intervals.
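When tuning send retries it can help to estimate how long a failing send may be held up before the retries are exhausted. The sketch below sums the pauses from the formula above for the default send-retry values. `SendRetryBudget` and its methods are hypothetical names, and the sketch assumes that each retry waits its full pause, that pauses are rounded to whole milliseconds, and that the time spent in the send attempts themselves is ignored.

```java
final class SendRetryBudget {

    // Pause before the given retry (1-based), capped at maxBackoffMillis.
    static long delayMillis(long initialRetryMillis, double backoffFactor,
                            long maxBackoffMillis, int retryNumber) {
        double uncapped = initialRetryMillis * Math.pow(retryNumber, backoffFactor);
        return Math.min(Math.round(uncapped), maxBackoffMillis);
    }

    // Sum of all pauses across maxRetries retries.
    static long totalPauseMillis(long initialRetryMillis, double backoffFactor,
                                 long maxBackoffMillis, int maxRetries) {
        long total = 0;
        for (int retry = 1; retry <= maxRetries; retry++) {
            total += delayMillis(initialRetryMillis, backoffFactor, maxBackoffMillis, retry);
        }
        return total;
    }

    public static void main(String[] args) {
        // Defaults from the send-retry section: 20 ms, factor 1.5, cap 500 ms, 10 retries.
        for (int retry = 1; retry <= 10; retry++) {
            System.out.println("retry " + retry + ": pause "
                + delayMillis(20, 1.5, 500, retry) + " ms");
        }
        System.out.println("total pause: " + totalPauseMillis(20, 1.5, 500, 10) + " ms");
    }
}
```

Under these assumptions the default settings add up to roughly 2.7 seconds of pauses, which gives a rough lower bound on how long a message can be delayed before the send is reported as failed.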
JMS producer settings support configuring retries by using .withSendRetrySettings(retrySettings). The following shows how to create SendRetrySettings:
- Scala
-
source
// reiterating defaults from reference.conf
val sendRetrySettings = SendRetrySettings(system)
  .withInitialRetry(20.millis)
  .withBackoffFactor(1.5d)
  .withMaxBackoff(500.millis)
  .withMaxRetries(10)
- Java
-
source
import com.typesafe.config.Config;
import scala.Option;

Config sendRetryConfig = config.getConfig("pekko.connectors.jms.send-retry");
// reiterating the values from reference.conf
SendRetrySettings sendRetrySettings =
    SendRetrySettings.create(sendRetryConfig)
        .withInitialRetry(Duration.ofMillis(20))
        .withBackoffFactor(1.5d)
        .withMaxBackoff(Duration.ofMillis(500))
        .withMaxRetries(10);
If a send operation finally fails, the stage also fails unless a different supervision strategy is applied. The producer stage honours stream supervision.
Observing connectivity and state of a JMS producer
The materialized value of every JMS producer is of type JmsProducerStatus. It provides a connectorState method returning a Source of JmsConnectorState updates that publishes connection attempts, disconnections, completions and failures. The source is completed after the JMS producer completes or fails.