Using IBM MQ
You can use IBM MQ like any other JMS Provider by creating a QueueConnectionFactory
or a TopicConnectionFactory
and creating a JmsConsumerSettings
or JmsProducerSettings
from it. The below snippets have been tested with a default IBM MQ docker image which contains queues and topics for testing. The following command starts MQ 9 using docker:
docker run --env LICENSE=accept --env MQ_QMGR_NAME=QM1 --publish 1414:1414 --publish 9443:9443 ibmcom/mq:9.1.1.0
MQ settings for this image are shown here: https://github.com/ibm-messaging/mq-docker#mq-developer-defaults
Artifacts
- sbt
libraryDependencies ++= Seq( "org.pekko" %% "pekko-connectors-jms" % "1.1.0-M1+161-711cabb5-SNAPSHOT", "javax.jms" % "jms" % "1.1", "com.ibm.mq" % "com.ibm.mq.allclient" % "9.1.1.0" )
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.pekko</groupId> <artifactId>pekko-connectors-jms_${scala.binary.version}</artifactId> <version>1.1.0-M1+161-711cabb5-SNAPSHOT</version> </dependency> <dependency> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>com.ibm.mq</groupId> <artifactId>com.ibm.mq.allclient</artifactId> <version>9.1.1.0</version> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation "org.pekko:pekko-connectors-jms_${versions.ScalaBinary}:1.1.0-M1+161-711cabb5-SNAPSHOT" implementation "javax.jms:jms:1.1" implementation "com.ibm.mq:com.ibm.mq.allclient:9.1.1.0" }
Create a MQConnectionFactory
The MQConnectionFactory
needs a queue manager name and a channel name, the docker command used in the previous section sets up a QM1
queue manager and a DEV.APP.SVRCONN
channel. The IBM MQ client makes it possible to connect to the MQ server over TCP/IP or natively through JNI (when the client and server run on the same machine). In the examples below we have chosen to use TCP/IP, which is done by setting the transport type to CommonConstants.WMQ_CM_CLIENT
.
Depending on the connection target, choose an appropriate implementation for the connection factory.
- Scala
-
source
// Create the IBM MQ MQQueueConnectionFactory val connectionFactory = new MQQueueConnectionFactory() // align to docker image: ibmcom/mq:9.1.1.0 connectionFactory.setHostName("localhost") connectionFactory.setPort(1414) connectionFactory.setQueueManager("QM1") connectionFactory.setChannel("DEV.APP.SVRCONN")
- Java
-
source
// Create the IBM MQ MQQueueConnectionFactory MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory(); // align to docker image: ibmcom/mq:9.1.1.0 connectionFactory.setHostName("localhost"); connectionFactory.setPort(1414); connectionFactory.setQueueManager("QM1"); connectionFactory.setChannel("DEV.APP.SVRCONN");
Create a JmsConsumer and JmsProducer to a Queue
- Scala
-
source
// Connect to IBM MQ over TCP/IP queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT) val queueName = "DEV.QUEUE.1" val jmsSink: Sink[String, Future[Done]] = JmsProducer.textSink( JmsProducerSettings(producerConfig, queueConnectionFactory) .withQueue(queueName)) // Option1: create Source using default factory with just name val jmsSource: Source[TxEnvelope, JmsConsumerControl] = JmsConsumer.txSource( JmsConsumerSettings(consumerConfig, queueConnectionFactory) .withQueue(queueName))
- Java
-
source
// Connect to IBM MQ over TCP/IP queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT); String queueName = "DEV.QUEUE.1"; Sink<String, CompletionStage<Done>> jmsSink = JmsProducer.textSink( JmsProducerSettings.create(system, queueConnectionFactory).withQueue(queueName)); // Option1: create Source using default factory with just name Source<TxEnvelope, JmsConsumerControl> txJmsSource = JmsConsumer.txSource( JmsConsumerSettings.create(system, queueConnectionFactory).withQueue(queueName));
Create a JmsConsumer and JmsProducer to a Topic
The IBM MQ docker container sets up a dev/
topic, which is used in the example below.
- Scala
-
source
// Connect to IBM MQ over TCP/IP topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT) val testTopicName = "dev/" val jmsTopicSink: Sink[String, Future[Done]] = JmsProducer.textSink( JmsProducerSettings .create(system, topicConnectionFactory) .withTopic(testTopicName)) // Option1: create Source using default factory with just name val jmsTopicSource: Source[String, JmsConsumerControl] = JmsConsumer .textSource( JmsConsumerSettings .create(system, topicConnectionFactory) .withTopic(testTopicName))
- Java
-
source
// Connect to IBM MQ over TCP/IP topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT); String testTopicName = "dev/"; Sink<String, CompletionStage<Done>> jmsTopicSink = JmsProducer.textSink( JmsProducerSettings.create(system, topicConnectionFactory).withTopic(testTopicName)); // Option1: create Source using default factory with just name Source<String, JmsConsumerControl> jmsTopicSource = JmsConsumer.textSource( JmsConsumerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));
Create a JmsConsumer and JmsProducer to custom destination
Example with custom queue.
- Scala
-
source
// Option2: create Source using custom factory val customQueue = "DEV.QUEUE.3" val jmsSource: Source[String, JmsConsumerControl] = JmsConsumer.textSource( JmsConsumerSettings .create(system, queueConnectionFactory) .withDestination(CustomDestination("custom", createQueue(customQueue)))) def createQueue(destinationName: String): Session => javax.jms.Queue = { (session: Session) => // cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession val mqSession = session.asInstanceOf[MQQueueSession] mqSession.createQueue(destinationName) }
- Java
-
source
// Option2: create Source using custom factory String customQueue = "DEV.QUEUE.3"; Source<String, JmsConsumerControl> jmsSource = JmsConsumer.textSource( JmsConsumerSettings.create(system, queueConnectionFactory) .withDestination(new CustomDestination("custom", createQueue(customQueue)))); Function<Session, Destination> createQueue(String destinationName) { return (session) -> { // cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession MQQueueSession mqSession = (MQQueueSession) session; try { return mqSession.createQueue(destinationName); } catch (JMSException e) { throw new RuntimeException(e); } }; }