Browse
Browsing messages
The browse source streams the messages in a queue without consuming them.
Unlike the other sources, the browse source will complete after browsing all the messages currently on the queue.
Scala
val browseSource: Source[javax.jms.Message, NotUsed] = JmsConsumer.browse(
  JmsBrowseSettings(system, connectionFactory)
    .withQueue("test"))
val result: Future[immutable.Seq[javax.jms.Message]] =
  browseSource.runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();
Source<javax.jms.Message, NotUsed> browseSource =
    JmsConsumer.browse(
        JmsBrowseSettings.create(system, connectionFactory).withQueue("test"));
CompletionStage<List<javax.jms.Message>> result =
    browseSource.runWith(Sink.seq(), system);
A JMS selector can be used to filter the messages. Without a selector, the source browses the entire content of the queue.
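As a minimal sketch of the selector filtering described above, the settings can carry a selector expression through `withSelector`. The queue name `test` and the expression `Important = TRUE` are reused from the other examples on this page; the object name `SelectorBrowse` and the assumption that messages carry a boolean `Important` property are purely illustrative.

```scala
import javax.jms.{ ConnectionFactory, Message }
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.jms.JmsBrowseSettings
import org.apache.pekko.stream.connectors.jms.scaladsl.JmsConsumer
import org.apache.pekko.stream.scaladsl.Source

// Hypothetical helper object, not part of the connector API.
object SelectorBrowse {

  // Browse settings restricted to messages matching the selector.
  // Assumption: messages on the queue carry a boolean property named "Important".
  def settings(system: ActorSystem, connectionFactory: ConnectionFactory): JmsBrowseSettings =
    JmsBrowseSettings(system, connectionFactory)
      .withQueue("test")
      .withSelector("Important = TRUE")

  // The resulting source emits only the matching messages and then completes.
  def source(system: ActorSystem, connectionFactory: ConnectionFactory): Source[Message, NotUsed] =
    JmsConsumer.browse(settings(system, connectionFactory))
}
```

Keeping the settings in a separate method makes it possible to inspect them (for example `settings(...).selector`) without connecting to a broker.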
Notes:
- Messages may arrive and expire while the scan is in progress.
- The JMS API does not require the content of an enumeration to be a static snapshot of queue content. Whether these changes are visible or not depends on the JMS provider.
- A message must not be returned by a QueueBrowser before its delivery time has been reached.
Configure JMS browse
To connect to the JMS broker, first define an appropriate javax.jms.ConnectionFactory. The Apache Pekko Connectors tests and all examples use ActiveMQ.
Scala
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
Java
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();
The created ConnectionFactory is then used to create the different JMS sources.
The JmsBrowseSettings factories accept either the actor system, in which case the settings are read from the default pekko.connectors.jms.browse section, or a Config instance that points to a section of the same structure.
Scala
val browseConfig: Config = system.settings.config.getConfig(JmsBrowseSettings.configPath)
val settings = JmsBrowseSettings(browseConfig, connectionFactory)
  .withQueue("target-queue")
  .withCredentials(Credentials("username", "password"))
  .withConnectionRetrySettings(retrySettings)
Java
// Browse settings are read from the browse section, mirroring the Scala example.
Config browseConfig = config.getConfig(JmsBrowseSettings.configPath());
JmsBrowseSettings settings =
    JmsBrowseSettings.create(browseConfig, new ActiveMQConnectionFactory("broker-url"))
        .withQueue("target-queue")
        .withCredentials(Credentials.create("username", "password"))
        .withConnectionRetrySettings(retrySettings)
        .withAcknowledgeMode(AcknowledgeMode.AutoAcknowledge())
        .withSelector("Important = TRUE");
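Passing a Config instance makes it possible to keep several browse set-ups side by side. The following is a sketch only: the section name `orders-browse`, the queue name `orders`, and the object name `NamedBrowseSetup` are hypothetical, and the HOCON is inlined with `ConfigFactory.parseString` just to keep the example self-contained. The custom section inherits every default from pekko.connectors.jms.browse and overrides the selector.

```scala
import javax.jms.ConnectionFactory
import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.pekko.stream.connectors.jms.JmsBrowseSettings

// Hypothetical helper object, not part of the connector API.
object NamedBrowseSetup {

  // A custom section that copies the default browse section and overrides one key.
  // In an application this would normally live in application.conf.
  val config: Config = ConfigFactory
    .parseString("""
      orders-browse = ${pekko.connectors.jms.browse} {
        selector = "Important = TRUE"
      }
    """)
    .withFallback(ConfigFactory.load()) // supplies the reference.conf defaults
    .resolve()                          // resolves the ${...} substitution

  // Build settings from the custom section instead of the default one.
  def settings(connectionFactory: ConnectionFactory): JmsBrowseSettings =
    JmsBrowseSettings(config.getConfig("orders-browse"), connectionFactory)
      .withQueue("orders")
}
```

Each additional set-up would be another section of the same structure, resolved to its own JmsBrowseSettings instance.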
The Apache Pekko Connectors JMS browse source is configured via default settings in the HOCON config file section pekko.connectors.jms.browse in your application.conf, and settings may be tweaked in the code using the withXyz methods. On the second tab the section from reference.conf shows the structure to use for configuring multiple set-ups.
# Jms Browse Settings
# sets default values
browse {
  # Configure connection retrying by providing settings for ConnectionRetrySettings.
  connection-retry = ${pekko.connectors.jms.connection-retry}
  # Credentials to connect to the JMS broker.
  # credentials {
  #   username = "some text"
  #   password = "some text"
  # }
  # "off" to not use any credentials.
  credentials = off
  # JMS selector expression.
  # See https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
  # empty string for unset
  selector = "" # optional
  # Set an explicit acknowledge mode.
  # See eg. javax.jms.Session.AUTO_ACKNOWLEDGE
  # Allowed values: "auto", "client", "duplicates-ok", "session", integer value
  acknowledge-mode = auto
}
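As an illustration of overriding these defaults, an application.conf could contain a fragment along the following lines. The credential values and the selector expression are placeholders, not recommended settings; any key left out keeps its reference.conf default.

```hocon
# application.conf (illustrative values only)
pekko.connectors.jms.browse {
  # Replaces the default "credentials = off"
  credentials {
    username = "some text"
    password = "some text"
  }
  # Only browse messages matching this JMS selector expression
  selector = "Important = TRUE"
}
```

Settings created from the actor system pick up these values, and the withXyz methods can still adjust them in code.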