Using IBM MQ

You can use IBM MQ like any other JMS Provider by creating a QueueConnectionFactory or a TopicConnectionFactory and creating a JmsConsumerSettings or JmsProducerSettings from it. The below snippets have been tested with a default IBM MQ docker image which contains queues and topics for testing. The following command starts MQ 9 using docker:

docker run --env LICENSE=accept --env MQ_QMGR_NAME=QM1 --publish 1414:1414 --publish 9443:9443 ibmcom/mq:9.1.1.0

MQ settings for this image are shown here: https://github.com/ibm-messaging/mq-docker#mq-developer-defaults

Artifacts

sbt
libraryDependencies ++= Seq(
  "org.pekko" %% "pekko-connectors-jms" % "1.1.0-M1+161-711cabb5-SNAPSHOT",
  "javax.jms" % "jms" % "1.1",
  "com.ibm.mq" % "com.ibm.mq.allclient" % "9.1.1.0"
)
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.pekko</groupId>
    <artifactId>pekko-connectors-jms_${scala.binary.version}</artifactId>
    <version>1.1.0-M1+161-711cabb5-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>javax.jms</groupId>
    <artifactId>jms</artifactId>
    <version>1.1</version>
  </dependency>
  <dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.1.1.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.pekko:pekko-connectors-jms_${versions.ScalaBinary}:1.1.0-M1+161-711cabb5-SNAPSHOT"
  implementation "javax.jms:jms:1.1"
  implementation "com.ibm.mq:com.ibm.mq.allclient:9.1.1.0"
}

Create a MQConnectionFactory

The MQConnectionFactory needs a queue manager name and a channel name, the docker command used in the previous section sets up a QM1 queue manager and a DEV.APP.SVRCONN channel. The IBM MQ client makes it possible to connect to the MQ server over TCP/IP or natively through JNI (when the client and server run on the same machine). In the examples below we have chosen to use TCP/IP, which is done by setting the transport type to CommonConstants.WMQ_CM_CLIENT.

Depending on the connection target, choose an appropriate implementation for the connection factory.

Scala
source// Create the IBM MQ MQQueueConnectionFactory
val connectionFactory = new MQQueueConnectionFactory()

// align to docker image: ibmcom/mq:9.1.1.0
connectionFactory.setHostName("localhost")
connectionFactory.setPort(1414)
connectionFactory.setQueueManager("QM1")
connectionFactory.setChannel("DEV.APP.SVRCONN")
Java
source// Create the IBM MQ MQQueueConnectionFactory
MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory();

// align to docker image: ibmcom/mq:9.1.1.0
connectionFactory.setHostName("localhost");
connectionFactory.setPort(1414);
connectionFactory.setQueueManager("QM1");
connectionFactory.setChannel("DEV.APP.SVRCONN");

Create a JmsConsumer and JmsProducer to a Queue

Scala
source// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)
val queueName = "DEV.QUEUE.1"

val jmsSink: Sink[String, Future[Done]] = JmsProducer.textSink(
  JmsProducerSettings(producerConfig, queueConnectionFactory)
    .withQueue(queueName))

// Option1: create Source using default factory with just name
val jmsSource: Source[TxEnvelope, JmsConsumerControl] = JmsConsumer.txSource(
  JmsConsumerSettings(consumerConfig, queueConnectionFactory)
    .withQueue(queueName))
Java
source// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String queueName = "DEV.QUEUE.1";

Sink<String, CompletionStage<Done>> jmsSink =
    JmsProducer.textSink(
        JmsProducerSettings.create(system, queueConnectionFactory).withQueue(queueName));

// Option1: create Source using default factory with just name
Source<TxEnvelope, JmsConsumerControl> txJmsSource =
    JmsConsumer.txSource(
        JmsConsumerSettings.create(system, queueConnectionFactory).withQueue(queueName));

Create a JmsConsumer and JmsProducer to a Topic

The IBM MQ docker container sets up a dev/ topic, which is used in the example below.

Scala
source// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)
val testTopicName = "dev/"

val jmsTopicSink: Sink[String, Future[Done]] = JmsProducer.textSink(
  JmsProducerSettings
    .create(system, topicConnectionFactory)
    .withTopic(testTopicName))

// Option1: create Source using default factory with just name
val jmsTopicSource: Source[String, JmsConsumerControl] = JmsConsumer
  .textSource(
    JmsConsumerSettings
      .create(system, topicConnectionFactory)
      .withTopic(testTopicName))
Java
source// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String testTopicName = "dev/";

Sink<String, CompletionStage<Done>> jmsTopicSink =
    JmsProducer.textSink(
        JmsProducerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));

// Option1: create Source using default factory with just name
Source<String, JmsConsumerControl> jmsTopicSource =
    JmsConsumer.textSource(
        JmsConsumerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));

Create a JmsConsumer and JmsProducer to custom destination

Example with custom queue.

Scala
source  // Option2: create Source using custom factory
  val customQueue = "DEV.QUEUE.3"
  val jmsSource: Source[String, JmsConsumerControl] = JmsConsumer.textSource(
    JmsConsumerSettings
      .create(system, queueConnectionFactory)
      .withDestination(CustomDestination("custom", createQueue(customQueue))))

def createQueue(destinationName: String): Session => javax.jms.Queue = { (session: Session) =>
  // cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession
  val mqSession = session.asInstanceOf[MQQueueSession]
  mqSession.createQueue(destinationName)
}
Java
source  // Option2: create Source using custom factory
  String customQueue = "DEV.QUEUE.3";
  Source<String, JmsConsumerControl> jmsSource =
      JmsConsumer.textSource(
          JmsConsumerSettings.create(system, queueConnectionFactory)
              .withDestination(new CustomDestination("custom", createQueue(customQueue))));

Function<Session, Destination> createQueue(String destinationName) {
  return (session) -> {
    // cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession
    MQQueueSession mqSession = (MQQueueSession) session;
    try {
      return mqSession.createQueue(destinationName);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    }
  };
}