Producer

The Apache Pekko Connectors JMS connector offers producing JMS messages to topics or queues in three ways

  • JVM types to an Apache Pekko Streams Sink
  • JmsMessage sub-types to a Apache Pekko Streams Sink or Flow (using JmsProducer.sink or JmsProducer.flow)
  • JmsEnvelope sub-types to a Apache Pekko Streams Flow (using JmsProducer.flexiFlow) to support pass-throughs

The JMS message model supports several types of message bodies in (see javax.jms.Message), which may be created directly from the Apache Pekko Stream elements, or in wrappers to access more advanced features.

Stream element type Apache Pekko Connectors producer
String JmsProducer.textSink
Array[Byte]byte[] JmsProducer.bytesSink
Map[String, AnyRef]Map<String, Object> JmsProducer.mapSink
Object (java.io.Serializable) JmsProducer.objectSink
JmsTextMessage JmsProducer.sink or JmsProducer.flow
JmsByteMessage JmsProducer.sink or JmsProducer.flow
JmsByteStringMessage JmsProducer.sink or JmsProducer.flow
JmsMapMessage JmsProducer.sink or JmsProducer.flow
JmsObjectMessage JmsProducer.sink or JmsProducer.flow
JmsEnvelope[PassThrough]JmsEnvelope<PassThrough> with instances JmsPassThrough, JmsTextMessagePassThrough, JmsByteMessagePassThrough, JmsByteStringMessagePassThrough, JmsMapMessagePassThrough, JmsObjectMessagePassThrough JmsProducer.flexiFlow

Configure JMS producers

To connect to the JMS broker, first define an appropriate javax.jms.ConnectionFactory. Here we’re using Active MQ.

Scala
sourceval connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
Java
sourcejavax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();

The created ConnectionFactory is then used for the creation of the different JMS sinks or sources (see below).

A JmsMessage sub-type sink

Use a case class with the subtype of JmsMessageJmsMessage to wrap the messages you want to send and optionally set message specific properties or headers. JmsProducerJmsProducer contains factory methods to facilitate the creation of sinks according to the message type.

Scala
sourceval jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink(
  JmsProducerSettings(producerConfig, connectionFactory).withQueue("numbers"))

val finished: Future[Done] =
  Source(immutable.Seq("Message A", "Message B"))
    .map(JmsTextMessage(_))
    .runWith(jmsSink)
Java
sourceSink<JmsTextMessage, CompletionStage<Done>> jmsSink =
    JmsProducer.sink(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

CompletionStage<Done> finished =
    Source.from(Arrays.asList("Message A", "Message B"))
        .map(JmsTextMessage::create)
        .runWith(jmsSink, system);

Setting JMS message properties

For every JmsMessageJmsMessage you can set JMS message properties.

Scala
sourceval msgsIn = (1 to 10).toList.map { n =>
  org.apache.pekko.stream.connectors.jms
    .JmsTextMessage(n.toString)
    .withProperty("Number", n)
    .withProperty("IsOdd", n % 2 == 1)
    .withProperty("IsEven", n % 2 == 0)
}
Java
sourceJmsTextMessage message =
    org.apache.pekko.stream.connectors.jms.JmsTextMessage.create(n.toString())
        .withProperty("Number", n)
        .withProperty("IsOdd", n % 2 == 1)
        .withProperty("IsEven", n % 2 == 0);

Setting JMS message header attributes

For every JmsMessageJmsMessage you can set also JMS message headers.

Scala
sourceval msgsIn = (1 to 10).toList.map { n =>
  JmsTextMessage(n.toString)
    .withHeader(JmsType("type"))
    .withHeader(JmsCorrelationId("correlationId"))
    .withHeader(JmsReplyTo.queue("test-reply"))
    .withHeader(JmsTimeToLive(FiniteDuration(999, TimeUnit.SECONDS)))
    .withHeader(JmsPriority(2))
    .withHeader(JmsDeliveryMode(DeliveryMode.NON_PERSISTENT))
}
Java
sourceList<JmsTextMessage> msgsIn =
    createTestMessageList().stream()
        .map(
            jmsTextMessage ->
                jmsTextMessage
                    .withHeader(JmsType.create("type"))
                    .withHeader(JmsCorrelationId.create("correlationId"))
                    .withHeader(JmsReplyTo.queue("test-reply"))
                    .withHeader(JmsTimeToLive.create(999, TimeUnit.SECONDS))
                    .withHeader(JmsPriority.create(2))
                    .withHeader(JmsDeliveryMode.create(DeliveryMode.NON_PERSISTENT)))
        .collect(Collectors.toList());

Raw JVM type sinks

Text sinks

Create a sink, that accepts and forwards JmsTextMessageJmsTextMessages to the JMS provider:

Scala
sourceval connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)

val jmsSink: Sink[String, Future[Done]] = JmsProducer.textSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test"))

val in = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val streamCompletion: Future[Done] =
  Source(in)
    .runWith(jmsSink)
Java
sourcejavax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();

Sink<String, CompletionStage<Done>> jmsSink =
    JmsProducer.textSink(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
CompletionStage<Done> finished = Source.from(in).runWith(jmsSink, system);

Byte array sinks

Create a sink, that accepts and forwards JmsByteMessageJmsByteMessages to the JMS provider.

Scala
sourceval jmsSink: Sink[Array[Byte], Future[Done]] = JmsProducer.bytesSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test"))
val in: Array[Byte] = "ThisIsATest".getBytes(StandardCharsets.UTF_8)
val streamCompletion: Future[Done] =
  Source
    .single(in)
    .runWith(jmsSink)
Java
sourceConnectionFactory connectionFactory = server.createConnectionFactory();

Sink<byte[], CompletionStage<Done>> jmsSink =
    JmsProducer.bytesSink(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

byte[] in = "ThisIsATest".getBytes(StandardCharsets.UTF_8);
CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);

Map message sink

Create a sink, that accepts and forwards JmsMapMessageJmsMapMessages to the JMS provider:

Scala
sourceval jmsSink: Sink[Map[String, Any], Future[Done]] = JmsProducer.mapSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test"))

val input = List(
  Map[String, Any](
    "string" -> "value",
    "int value" -> 42,
    "double value" -> 43.toDouble,
    "short value" -> 7.toShort,
    "boolean value" -> true,
    "long value" -> 7.toLong,
    "bytearray" -> "AStringAsByteArray".getBytes(StandardCharsets.UTF_8),
    "byte" -> 1.toByte))

val streamCompletion: Future[Done] =
  Source(input)
    .runWith(jmsSink)
Java
sourceConnectionFactory connectionFactory = server.createConnectionFactory();

Sink<Map<String, Object>, CompletionStage<Done>> jmsSink =
    JmsProducer.mapSink(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

Map<String, Object> in = new HashMap<>();
in.put("string value", "value");
in.put("int value", 42);
in.put("double value", 43.0);
in.put("short value", (short) 7);
in.put("boolean value", true);
in.put("long value", 7L);
in.put("bytearray", "AStringAsByteArray".getBytes(StandardCharsets.UTF_8));
in.put("byte", (byte) 1);

CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);

Object sinks

Create and configure ActiveMQ connection factory to support serialization. See ActiveMQ Security for more information on this. Create a sink, that accepts and forwards JmsObjectMessageJmsObjectMessages to the JMS provider:

Scala
sourceval connectionFactory = connFactory.asInstanceOf[ActiveMQConnectionFactory]
connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava)

val jmsSink: Sink[Serializable, Future[Done]] = JmsProducer.objectSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test"))
val in = DummyObject("ThisIsATest")
val streamCompletion: Future[Done] =
  Source
    .single(in)
    .runWith(jmsSink)
Java
sourceActiveMQConnectionFactory connectionFactory =
    (ActiveMQConnectionFactory) server.createConnectionFactory();
connectionFactory.setTrustedPackages(
    Arrays.asList(DummyJavaTests.class.getPackage().getName()));

Sink<java.io.Serializable, CompletionStage<Done>> jmsSink =
    JmsProducer.objectSink(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

java.io.Serializable in = new DummyJavaTests("javaTest");
CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);

Sending messages as a Flow

The producer can also act as a flow, in order to publish messages in the middle of stream processing. For example, you can ensure that a message is persisted to the queue before subsequent processing.

Scala
source
val flow: Flow[JmsMessage, JmsMessage, JmsProducerStatus] = JmsProducer.flow( JmsProducerSettings(system, connectionFactory) .withQueue("test")) val input: immutable.Seq[JmsTextMessage] = (1 to 100).map(i => JmsTextMessage(i.toString)) val result: Future[Seq[JmsMessage]] = Source(input) .via(flow) .runWith(Sink.seq)
Java
sourceConnectionFactory connectionFactory = server.createConnectionFactory();

Flow<JmsTextMessage, JmsTextMessage, JmsProducerStatus> flow =
    JmsProducer.flow(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

List<JmsTextMessage> input = createTestMessageList();

CompletionStage<List<JmsTextMessage>> result =
    Source.from(input).via(flow).runWith(Sink.seq(), system);

Sending messages with per-message destinations

It is also possible to define message destinations per message:

Scala
sourceval flowSink: Flow[JmsMessage, JmsMessage, JmsProducerStatus] =
  JmsProducer.flow(
    JmsProducerSettings(system, connectionFactory).withQueue("test"))

val input = (1 to 100).map { i =>
  val queueName = if (i % 2 == 0) "even" else "odd"
  JmsTextMessage(i.toString).toQueue(queueName)
}
Source(input).via(flowSink).runWith(Sink.ignore)
Java
sourceFlow<JmsTextMessage, JmsTextMessage, JmsProducerStatus> flowSink =
    JmsProducer.flow(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

List<JmsTextMessage> input = new ArrayList<>();
for (Integer n : Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) {
  String queueName = (n % 2 == 0) ? "even" : "odd";
  input.add(JmsTextMessage.create(n.toString()).toQueue(queueName));
}

Source.from(input).via(flowSink).runWith(Sink.seq(), system);

When no destination is defined on the message, the destination given in the producer settings is used.

Passing context through the producer

In some use cases, it is useful to pass through context information when producing (e.g. for acknowledging or committing messages after sending to Jms). For this, the JmsProducer.flexiFlow accepts implementations of JmsEnvelope, which it will pass through:

  • JmsPassThrough
  • JmsTextMessagePassThrough
  • JmsByteMessagePassThrough
  • JmsByteStringMessagePassThrough
  • JmsMapMessagePassThrough
  • JmsObjectMessagePassThrough
Scala
sourceval jmsProducer: Flow[JmsEnvelope[String], JmsEnvelope[String], JmsProducerStatus] =
  JmsProducer.flexiFlow[String](
    JmsProducerSettings(system, connectionFactory).withQueue("test"))

val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val in: immutable.Seq[JmsTextMessagePassThrough[String]] =
  data.map(t => JmsTextMessage(t).withPassThrough(t))

val result = Source(in)
  .via(jmsProducer)
  .map(_.passThrough) // extract the value passed through
  .runWith(Sink.seq)
Java
sourceFlow<JmsEnvelope<String>, JmsEnvelope<String>, JmsProducerStatus> jmsProducer =
    JmsProducer.flexiFlow(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
List<JmsEnvelope<String>> input = new ArrayList<>();
for (String s : data) {
  String passThrough = s;
  input.add(JmsTextMessage.create(s, passThrough));
}

CompletionStage<List<String>> result =
    Source.from(input)
        .via(jmsProducer)
        .map(JmsEnvelope::passThrough)
        .runWith(Sink.seq(), system);

There are two implementations: One envelope type containing a messages to send to Jms, and one envelope type containing only values to pass through. This allows messages to flow without producing any new messages to Jms. This is primarily useful when committing offsets back to Kakfa, or when acknowledging Jms messages after sending the outcome of processing them back to Jms.

Scala
sourceval jmsProducer = JmsProducer.flexiFlow[String](
  JmsProducerSettings(producerConfig, connectionFactory).withQueue("topic"))

val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val in = data.map(t => JmsPassThrough(t))

val result = Source(in).via(jmsProducer).map(_.passThrough).runWith(Sink.seq)
Java
sourceFlow<JmsEnvelope<String>, JmsEnvelope<String>, JmsProducerStatus> jmsProducer =
    JmsProducer.flexiFlow(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
List<JmsEnvelope<String>> input = new ArrayList<>();
for (String s : data) {
  String passThrough = s;
  input.add(JmsPassThrough.create(passThrough));
}

CompletionStage<List<String>> result =
    Source.from(input)
        .via(jmsProducer)
        .map(JmsEnvelope::passThrough)
        .runWith(Sink.seq(), system);

Producer Settings

The Apache Pekko Connectors JMS producer is configured via default settings in the HOCON config file section pekko.connectors.jms.producer in your application.conf, and settings may be tweaked in the code using the withXyz methods.

The JmsProducerSettings factories allow for passing the actor system to read from the default pekko.connectors.jms.producer section, or you may pass a Config instance which is resolved to a section of the same structure.

Scala
sourceval producerConfig: Config = system.settings.config.getConfig(JmsProducerSettings.configPath)
val settings = JmsProducerSettings(producerConfig, connectionFactory)
  .withTopic("target-topic")
  .withCredentials(Credentials("username", "password"))
  .withSessionCount(1)
Java
sourceConfig producerConfig = config.getConfig(JmsProducerSettings.configPath());
JmsProducerSettings settings =
    JmsProducerSettings.create(producerConfig, new ActiveMQConnectionFactory("broker-url"))
        .withTopic("target-topic")
        .withCredentials(Credentials.create("username", "password"))
        .withConnectionRetrySettings(retrySettings)
        .withSendRetrySettings(sendRetrySettings)
        .withSessionCount(10)
        .withTimeToLive(Duration.ofHours(1));

The producer can be configured with the following settings. On the second tab, the section from reference.conf shows the structure to use for configuring multiple set-ups.

Table
Setting Defaults Description
connectionFactory mandatory Factory to use for creating JMS connections
destination mandatory Destination (queue or topic) to send JMS messages to
credentials optional JMS broker credentials
connectionRetrySettings default settings Retry characteristics if the connection failed to be established or taking a long time. Please see default values under Connection Retries
sendRetrySettings default settings Retry characteristics if message sending failed. Please see default values under Send Retries
sessionCount defaults to 1 Number of parallel sessions to use for sending JMS messages. Increasing the number of parallel sessions increases throughput at the cost of message ordering. While the messages may arrive out of order on the JMS broker, the producer flow outputs messages in the order they are received
timeToLive optional Time messages should be kept on the Jms broker. This setting can be overridden on individual messages. If not set, messages will never expire
connectionStatusSubscriptionTimeout 5 seconds Time to wait for subscriber of connection status events before starting to discard them
reference.conf
source# Jms Producer Settings
# sets default values
producer {
  # Configure connection retrying by providing settings for ConnectionRetrySettings.
  connection-retry = ${pekko.connectors.jms.connection-retry}
  # Configure re-sending by providing settings for SendRetrySettings.
  send-retry = ${pekko.connectors.jms.send-retry}
  # Credentials to connect to the JMS broker.
  # credentials {
  #   username = "some text"
  #   password = "some text"
  # }
  # "off" to not use any credentials.
  credentials = off
  # Number of parallel sessions to use for sending JMS messages.
  # Increasing the number of parallel sessions increases throughput at the cost of message ordering.
  # While the messages may arrive out of order on the JMS broker, the producer flow outputs messages
  # in the order they are received.
  session-count = 1
  # Time messages should be kept on the JMS broker.
  # This setting can be overridden on individual messages.
  # "off" to not let messages expire.
  time-to-live = off
  # How long the stage should preserve connection status events for the first subscriber before discarding them
  connection-status-subscription-timeout = 5 seconds
}

Connection Retries

When a connection to a broker cannot be established and errors out, or is timing out being established or started, the connection can be retried. All JMS publishers, consumers, and browsers are configured with connection retry settings. On the second tab the section from reference.conf shows the structure to use for configuring multiple set-ups.

Table
Setting Description Default Value
connectTimeout Time allowed to establish and start a connection 10 s
initialRetry Wait time before retrying the first time 100 ms
backoffFactor Back-off factor for subsequent retries 2.0
maxBackoff Maximum back-off time allowed, after which all retries will happen after this delay 1 minute
maxRetries Maximum number of retries allowed (negative value is infinite) 10
reference.conf
source# Connection Retry Settings
# these set the defaults for Consumer, Producer, and Browse settings
connection-retry {
  # Time allowed to establish and start a connection.
  connect-timeout = 10 seconds
  # Wait time before retrying the connection the first time.
  initial-retry = 100 millis
  # Back-off factor for subsequent retries.
  backoff-factor = 2
  # Back-off factor for subsequent retries.
  max-backoff = 1 minute
  # Maximum number of retries allowed.
  # "infinite", or positive integer
  max-retries = 10
}

The retry time is calculated by:

initialRetry * retryNumberbackoffFactor

With the default settings, we’ll see retries after 100ms, 400ms, 900ms pauses, until the pauses reach 1 minute and will stay with 1 minute intervals for any subsequent retries.

Consumers, producers and browsers try to reconnect with the same retry characteristics if a connection fails mid-stream.

All JMS settings support setting the connectionRetrySettings field using .withConnectionRetrySettings(retrySettings) on the given settings. The followings show how to create ConnectionRetrySettings:

Scala
source// reiterating defaults from reference.conf
val retrySettings = ConnectionRetrySettings(system)
  .withConnectTimeout(10.seconds)
  .withInitialRetry(100.millis)
  .withBackoffFactor(2.0d)
  .withMaxBackoff(1.minute)
  .withMaxRetries(10)
Java
sourceConfig connectionRetryConfig = config.getConfig("pekko.connectors.jms.connection-retry");
// reiterating the values from reference.conf
ConnectionRetrySettings retrySettings =
    ConnectionRetrySettings.create(connectionRetryConfig)
        .withConnectTimeout(Duration.ofSeconds(10))
        .withInitialRetry(Duration.ofMillis(100))
        .withBackoffFactor(2.0)
        .withMaxBackoff(Duration.ofMinutes(1))
        .withMaxRetries(10);

Send Retries

When a connection to a broker starts failing, sending JMS messages will also fail. Those failed messages can be retried at the cost of potentially duplicating the failed messages. Send retries can be configured as follows:

Table
Setting Description Default Value
initialRetry Wait time before retrying the first time 20 ms
backoffFactor Back-off factor for subsequent retries 1.5
maxBackoff Maximum back-off time allowed, after which all retries will happen after this delay 500 ms
maxRetries Maximum number of retries allowed (negative value is infinite) 10
reference.conf
source# Send Retry Settings
# these set the defaults for Producer settings
send-retry {
  # Wait time before retrying the first time.
  initial-retry = 20 millis
  # Back-off factor for subsequent retries.
  backoff-factor = 1.5
  # Maximum back-off time allowed, after which all retries will happen after this delay.
  max-backoff = 500 millis
  # Maximum number of retries allowed.
  # "infinite", or positive integer
  max-retries = 10
}

The retry time is calculated by:

initialRetry * retryNumberbackoffFactor

With the default settings, we’ll see retries after 20ms, 57ms, 104ms pauses, until the pauses reach 500 ms and will stay with 500 ms intervals for any subsequent retries.

JMS producer settings support configuring retries by using .withSendRetrySettings(retrySettings). The followings show how to create SendRetrySettings:

Scala
source// reiterating defaults from reference.conf
val sendRetrySettings = SendRetrySettings(system)
  .withInitialRetry(20.millis)
  .withBackoffFactor(1.5d)
  .withMaxBackoff(500.millis)
  .withMaxRetries(10)
Java
sourceimport com.typesafe.config.Config;
import scala.Option;

Config sendRetryConfig = config.getConfig("pekko.connectors.jms.send-retry");
// reiterating the values from reference.conf
SendRetrySettings sendRetrySettings =
    SendRetrySettings.create(sendRetryConfig)
        .withInitialRetry(Duration.ofMillis(20))
        .withBackoffFactor(1.5d)
        .withMaxBackoff(Duration.ofMillis(500))
        .withMaxRetries(10);

If a send operation finally fails, the stage also fails unless a different supervision strategy is applied. The producer stage honours stream supervision.

Observing connectivity and state of a JMS producer

All JMS producer’s materialized values are of type JmsProducerStatus. This provides a connectorState method returning a Source of JmsConnectorState updates that publishes connection attempts, disconnections, completions and failures. The source is completed after the JMS producer completes or fails.