Browse

Browsing messages

The browse source streams the messages in a queue without consuming them.

Unlike the other sources, the browse source will complete after browsing all the messages currently on the queue.

Scala
val browseSource: Source[javax.jms.Message, NotUsed] = JmsConsumer.browse(
  JmsBrowseSettings(system, connectionFactory)
    .withQueue("test"))

val result: Future[immutable.Seq[javax.jms.Message]] =
  browseSource.runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<javax.jms.Message, NotUsed> browseSource =
    JmsConsumer.browse(
        JmsBrowseSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<List<javax.jms.Message>> result =
    browseSource.runWith(Sink.seq(), system);

A JMS selector can be used to filter the messages. Without a selector, the source browses the entire content of the queue.
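As a minimal sketch, the selector can presumably also be supplied through configuration, since the pekko.connectors.jms.browse section described under "Configure JMS browse" has a selector key. The property name Important is only an illustration and assumes that producers set a boolean message property of that name.

```hocon
# application.conf
pekko.connectors.jms.browse {
  # Browse only messages whose (hypothetical) boolean property
  # `Important` is true. An empty string means no selector.
  selector = "Important = TRUE"
}
```

With this in place, settings created from the actor system would pick up the selector without any change in code.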

Notes:

  • Messages may arrive and expire while the scan is in progress.
  • The JMS API does not require the content of an enumeration to be a static snapshot of the queue content. Whether such arrivals and expirations are visible depends on the JMS provider.
  • A message must not be returned by a QueueBrowser before its delivery time has been reached.

Configure JMS browse

To connect to the JMS broker, first define an appropriate javax.jms.ConnectionFactory. The Apache Pekko Connectors tests and all examples use ActiveMQ.

Scala
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
Java
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();

The created ConnectionFactory is then used for the creation of the different JMS sources.

The JmsBrowseSettings factories accept either the actor system, in which case the defaults are read from the pekko.connectors.jms.browse section, or a Config instance that resolves to a section of the same structure.

Scala
val browseConfig: Config = system.settings.config.getConfig(JmsBrowseSettings.configPath)
val settings = JmsBrowseSettings(browseConfig, connectionFactory)
  .withQueue("target-queue")
  .withCredentials(Credentials("username", "password"))
  .withConnectionRetrySettings(retrySettings)
Java
Config browseConfig = config.getConfig(JmsBrowseSettings.configPath());
JmsBrowseSettings settings =
    JmsBrowseSettings.create(browseConfig, new ActiveMQConnectionFactory("broker-url"))
        .withQueue("target-queue")
        .withCredentials(Credentials.create("username", "password"))
        .withConnectionRetrySettings(retrySettings);

The Apache Pekko Connectors JMS browse source takes its default settings from the HOCON config section pekko.connectors.jms.browse in your application.conf, and individual settings may be adjusted in code using the withXyz methods. The reference.conf tab shows the section structure to use when configuring multiple set-ups.

Table
| Setting | Description | Default Value |
|---|---|---|
| connectionFactory | Factory to use for creating JMS connections | Must be set in code |
| destination | The queue to browse | Must be set in code |
| credentials | JMS broker credentials | Empty |
| connectionRetrySettings | Retry characteristics if the connection failed to be established or is taking a long time. | See Connection Retries |
reference.conf
# Jms Browse Settings
# sets default values
browse {
  # Configure connection retrying by providing settings for ConnectionRetrySettings.
  connection-retry = ${pekko.connectors.jms.connection-retry}
  # Credentials to connect to the JMS broker.
  # credentials {
  #   username = "some text"
  #   password = "some text"
  # }
  # "off" to not use any credentials.
  credentials = off
  # JMS selector expression.
  # See https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
  # empty string for unset
  selector = "" # optional
  # Set an explicit acknowledge mode.
  # See eg. javax.jms.Session.AUTO_ACKNOWLEDGE
  # Allowed values: "auto", "client", "duplicates-ok", "session", integer value
  acknowledge-mode = auto
}
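
One way to configure an additional set-up, sketched here under assumptions, is to define a separate section that inherits the defaults and overrides only what differs. The section name orders-browse and the selector expression are hypothetical. The credentials values are the placeholders from reference.conf.

```hocon
# application.conf
# Hypothetical section: starts from the default browse settings and
# overrides selected keys, so it keeps the same structure.
orders-browse = ${pekko.connectors.jms.browse} {
  # Replaces the default `credentials = off`.
  credentials {
    username = "some text"
    password = "some text"
  }
  # JMSPriority is a standard JMS header field usable in selectors.
  selector = "JMSPriority > 4"
}
```

Such a section could then be loaded with getConfig("orders-browse") and passed to the JmsBrowseSettings factory that takes a Config, in the same way as the default section in the examples above.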