Using IBM MQ
You can use IBM MQ like any other JMS provider by creating a QueueConnectionFactory or a TopicConnectionFactory and then creating a JmsConsumerSettings or JmsProducerSettings from it. The snippets below have been tested with the default IBM MQ Docker image, which contains queues and topics for testing. The following command starts MQ 9 using Docker:
docker run --env LICENSE=accept --env MQ_QMGR_NAME=QM1 --publish 1414:1414 --publish 9443:9443 ibmcom/mq:9.1.1.0
MQ settings for this image are shown here: https://github.com/ibm-messaging/mq-docker#mq-developer-defaults
Artifacts
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-jms" % "1.1.0",
  "javax.jms" % "jms" % "1.1",
  "com.ibm.mq" % "com.ibm.mq.allclient" % "9.1.1.0"
)
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-jms_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>javax.jms</groupId>
    <artifactId>jms</artifactId>
    <version>1.1</version>
  </dependency>
  <dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.1.1.0</version>
  </dependency>
</dependencies>
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-jms_${versions.ScalaBinary}:1.1.0"
  implementation "javax.jms:jms:1.1"
  implementation "com.ibm.mq:com.ibm.mq.allclient:9.1.1.0"
}
Create an MQConnectionFactory
The MQConnectionFactory needs a queue manager name and a channel name. The docker command shown at the top of this page sets up a QM1 queue manager and a DEV.APP.SVRCONN channel. The IBM MQ client can connect to the MQ server over TCP/IP or natively through JNI (when the client and server run on the same machine). The examples below use TCP/IP, which is selected by setting the transport type to CommonConstants.WMQ_CM_CLIENT.
Depending on the connection target, choose an appropriate implementation of the connection factory, for example MQQueueConnectionFactory for queues or MQTopicConnectionFactory for topics.
import com.ibm.mq.jms.MQQueueConnectionFactory
// Create the IBM MQ MQQueueConnectionFactory
val connectionFactory = new MQQueueConnectionFactory()
// align to docker image: ibmcom/mq:9.1.1.0
connectionFactory.setHostName("localhost")
connectionFactory.setPort(1414)
connectionFactory.setQueueManager("QM1")
connectionFactory.setChannel("DEV.APP.SVRCONN")
import com.ibm.mq.jms.MQQueueConnectionFactory;
// Create the IBM MQ MQQueueConnectionFactory
MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory();
// align to docker image: ibmcom/mq:9.1.1.0
connectionFactory.setHostName("localhost");
connectionFactory.setPort(1414);
connectionFactory.setQueueManager("QM1");
connectionFactory.setChannel("DEV.APP.SVRCONN");
Create a JmsConsumer and JmsProducer to a Queue
// Connect to IBM MQ over TCP/IP
// (queueConnectionFactory is assumed to be an MQQueueConnectionFactory configured as in the previous section)
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)
val queueName = "DEV.QUEUE.1"
// producerConfig and consumerConfig are assumed to hold the JMS producer and consumer configuration sections
val jmsSink: Sink[String, Future[Done]] = JmsProducer.textSink(
  JmsProducerSettings(producerConfig, queueConnectionFactory)
    .withQueue(queueName))
// Option1: create Source using default factory with just name
val jmsSource: Source[TxEnvelope, JmsConsumerControl] = JmsConsumer.txSource(
  JmsConsumerSettings(consumerConfig, queueConnectionFactory)
    .withQueue(queueName))
// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String queueName = "DEV.QUEUE.1";
Sink<String, CompletionStage<Done>> jmsSink =
    JmsProducer.textSink(
        JmsProducerSettings.create(system, queueConnectionFactory).withQueue(queueName));
// Option1: create Source using default factory with just name
Source<TxEnvelope, JmsConsumerControl> txJmsSource =
    JmsConsumer.txSource(
        JmsConsumerSettings.create(system, queueConnectionFactory).withQueue(queueName));
Create a JmsConsumer and JmsProducer to a Topic
The IBM MQ docker container sets up a dev/ topic, which is used in the example below.
// Connect to IBM MQ over TCP/IP
// (topicConnectionFactory is assumed to be an MQTopicConnectionFactory configured like the queue factory above)
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)
val testTopicName = "dev/"
val jmsTopicSink: Sink[String, Future[Done]] = JmsProducer.textSink(
  JmsProducerSettings
    .create(system, topicConnectionFactory)
    .withTopic(testTopicName))
// Option1: create Source using default factory with just name
val jmsTopicSource: Source[String, JmsConsumerControl] = JmsConsumer
  .textSource(
    JmsConsumerSettings
      .create(system, topicConnectionFactory)
      .withTopic(testTopicName))
// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String testTopicName = "dev/";
Sink<String, CompletionStage<Done>> jmsTopicSink =
    JmsProducer.textSink(
        JmsProducerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));
// Option1: create Source using default factory with just name
Source<String, JmsConsumerControl> jmsTopicSource =
    JmsConsumer.textSource(
        JmsConsumerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));
Create a JmsConsumer and JmsProducer to a custom destination
Example with a custom queue:
// Option2: create Source using custom factory
val customQueue = "DEV.QUEUE.3"
val jmsSource: Source[String, JmsConsumerControl] = JmsConsumer.textSource(
  JmsConsumerSettings
    .create(system, queueConnectionFactory)
    .withDestination(CustomDestination("custom", createQueue(customQueue))))
def createQueue(destinationName: String): Session => javax.jms.Queue = { (session: Session) =>
  // cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession
  val mqSession = session.asInstanceOf[MQQueueSession]
  mqSession.createQueue(destinationName)
}
// Option2: create Source using custom factory
String customQueue = "DEV.QUEUE.3";
Source<String, JmsConsumerControl> jmsSource =
    JmsConsumer.textSource(
        JmsConsumerSettings.create(system, queueConnectionFactory)
            .withDestination(new CustomDestination("custom", createQueue(customQueue))));
Function<Session, Destination> createQueue(String destinationName) {
  return (session) -> {
    // cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession
    MQQueueSession mqSession = (MQQueueSession) session;
    try {
      return mqSession.createQueue(destinationName);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    }
  };
}
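The Java factory above catches JMSException inside the lambda because java.util.function.Function cannot declare checked exceptions. The following is a minimal, self-contained sketch of that wrapping pattern in isolation. ThrowingFunction, unchecked and FakeSession are hypothetical names introduced only for illustration; they are not part of Pekko Connectors or the IBM MQ client. In real code the function would take a javax.jms.Session and return a javax.jms.Destination.

```java
import java.util.function.Function;

public class UncheckedFactory {

    /** Hypothetical interface: like Function, but allowed to throw a checked exception. */
    @FunctionalInterface
    public interface ThrowingFunction<A, B> {
        B apply(A a) throws Exception;
    }

    /** Adapts a throwing function to a plain Function by rethrowing failures unchecked. */
    public static <A, B> Function<A, B> unchecked(ThrowingFunction<A, B> f) {
        return a -> {
            try {
                return f.apply(a);
            } catch (RuntimeException e) {
                throw e; // already unchecked, pass through unchanged
            } catch (Exception e) {
                throw new RuntimeException(e); // keep the original exception as the cause
            }
        };
    }

    /** Hypothetical stand-in for a JMS session; rejects empty names with a checked exception. */
    public static class FakeSession {
        public String createQueue(String name) throws Exception {
            if (name == null || name.isEmpty()) {
                throw new Exception("queue name must not be empty");
            }
            return "queue://" + name;
        }
    }

    /** Mirrors the shape of createQueue above, using the stand-in types. */
    public static Function<FakeSession, String> createQueue(String destinationName) {
        return unchecked((FakeSession session) -> session.createQueue(destinationName));
    }

    public static void main(String[] args) {
        System.out.println(createQueue("DEV.QUEUE.3").apply(new FakeSession()));
    }
}
```

Whether a shared helper like this is worth having depends on how many custom destination factories a project defines; for a single factory, the inline try/catch shown earlier is arguably simpler.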