Integration Patterns
Many Enterprise Integration Patterns can be implemented with Apache Pekko Streams (see Apache Pekko Streams documentation).
PassThrough
Use PassThroughFlow when you have a message that should be sent through a flow that transforms it, but you also want to keep the original message available for a subsequent flow. For example, when consuming messages from Kafka (CommittableMessage), the message can be used inside a flow (transform it, save it to a database, …) and afterwards you need the original message again to commit the offset.
It can be used whenever you have two flows:
- Flow1 that takes a message A and returns B
- Flow2 that takes a message A and returns C

If you want to execute first Flow1 and then Flow2, you need a way to maintain/pass through message A.
```
              a=>transform=>a1
             /                \
            /                  \
a=>(a, a)=>unzip              zip=>(a1, a)=> a
            \                  /
             \                /
              --------a--------
```
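In code, the situation looks roughly like this (a minimal sketch with hypothetical types `A`, `B`, and `C`, introduced only for illustration):

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Flow

// Hypothetical message types for illustration.
case class A(id: Int)
case class B(payload: String)
case class C(ack: Boolean)

val flow1: Flow[A, B, NotUsed] = Flow[A].map(a => B(s"processed-${a.id}"))
val flow2: Flow[A, C, NotUsed] = Flow[A].map(_ => C(ack = true))

// flow1.via(flow2) does not compile: after flow1 the original A is gone.
// This is exactly the problem PassThroughFlow solves.
```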
Scala:
```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.{ FlowShape, Graph }
import org.apache.pekko.stream.scaladsl.{ Broadcast, Flow, GraphDSL, Keep, ZipWith }

object PassThroughFlow {
  def apply[A, T](processingFlow: Flow[A, T, NotUsed]): Graph[FlowShape[A, (T, A)], NotUsed] =
    apply[A, T, (T, A)](processingFlow, Keep.both)

  def apply[A, T, O](
      processingFlow: Flow[A, T, NotUsed],
      output: (T, A) => O): Graph[FlowShape[A, O], NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // Duplicate each incoming element: one copy goes through the
      // processing flow, the other bypasses it.
      val broadcast = builder.add(Broadcast[A](2))
      // Re-join the processed result with the original element.
      val zip = builder.add(ZipWith[T, A, O]((left, right) => output(left, right)))

      // format: off
      broadcast.out(0) ~> processingFlow ~> zip.in0
      broadcast.out(1)         ~>           zip.in1
      // format: on

      FlowShape(broadcast.in, zip.out)
    })
}
```
Java:
```java
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.function.Function2;
import org.apache.pekko.stream.FanInShape2;
import org.apache.pekko.stream.FlowShape;
import org.apache.pekko.stream.Graph;
import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.javadsl.Broadcast;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.GraphDSL;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.ZipWith;

class PassThroughFlow {
  public static <A, T> Graph<FlowShape<A, Pair<T, A>>, NotUsed> create(Flow<A, T, NotUsed> flow) {
    return create(flow, Keep.both());
  }

  public static <A, T, O> Graph<FlowShape<A, O>, NotUsed> create(
      Flow<A, T, NotUsed> flow, Function2<T, A, O> output) {
    return Flow.fromGraph(
        GraphDSL.create(
            builder -> {
              // Duplicate each element: one copy is processed, one bypasses.
              UniformFanOutShape<A, A> broadcast = builder.add(Broadcast.create(2));
              // Re-join the processed result with the original element.
              FanInShape2<T, A, O> zip = builder.add(ZipWith.create(output));
              builder.from(broadcast.out(0)).via(builder.add(flow)).toInlet(zip.in0());
              builder.from(broadcast.out(1)).toInlet(zip.in1());
              return FlowShape.apply(broadcast.in(), zip.out());
            }));
  }
}
```
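Note that the graph broadcasts each element and zips it back with the processed result, so the processing flow must emit exactly one element per input element; a flow that filters or expands elements would stall the `zip`. For comparison, here is a minimal sketch of the same idea without the GraphDSL (a hypothetical helper, not part of the connector's API; it re-materializes the inner flow per element and processes elements one at a time instead of pipelining them):

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{ Flow, Source }

def passThroughViaFlatMap[A, T, O](
    processingFlow: Flow[A, T, NotUsed],
    output: (T, A) => O): Flow[A, O, NotUsed] =
  Flow[A].flatMapConcat { a =>
    // Run the processing flow for this single element and
    // re-attach the original element to its result.
    Source.single(a).via(processingFlow).map(t => output(t, a))
  }
```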
A sample usage:
Scala:
```scala
// Sample Source
val source = Source(List(1, 2, 3))

// Pass through this flow maintaining the original message
val passThroughMe =
  Flow[Int]
    .map(_ * 10)

val ret = source
  .via(PassThroughFlow(passThroughMe))
  .runWith(Sink.seq)

// Verify results
ret.futureValue should be(Vector((10, 1), (20, 2), (30, 3)))
```
Java:
```java
// Sample Source
Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 2, 3));

// Pass through this flow maintaining the original message
Flow<Integer, Integer, NotUsed> passThroughMe = Flow.of(Integer.class).map(i -> i * 10);

CompletionStage<List<Pair<Integer, Integer>>> ret =
    source.via(PassThroughFlow.create(passThroughMe)).runWith(Sink.seq(), system);

// Verify results
List<Pair<Integer, Integer>> list = ret.toCompletableFuture().get();
assert list.equals(
    Arrays.asList(
        new Pair<Integer, Integer>(10, 1),
        new Pair<Integer, Integer>(20, 2),
        new Pair<Integer, Integer>(30, 3)));
```
Using `Keep` you can choose what the return value is:

- `PassThroughFlow(passThroughMe, Keep.right)`: to only output the original message
- `PassThroughFlow(passThroughMe, Keep.both)`: to output both values as a tuple
- `Keep.left`/`Keep.none`: are not very useful in this use case, since there would be no pass-through
You can also write your own output function to combine the two outputs in different ways; a sketch follows the examples below.
Scala:
```scala
// Sample Source
val source = Source(List(1, 2, 3))

// Pass through this flow maintaining the original message
val passThroughMe =
  Flow[Int]
    .map(_ * 10)

val ret = source
  .via(PassThroughFlow(passThroughMe, Keep.right))
  .runWith(Sink.seq)

// Verify results
ret.futureValue should be(Vector(1, 2, 3))
```
Java:
```java
// Sample Source
Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 2, 3));

// Pass through this flow maintaining the original message
Flow<Integer, Integer, NotUsed> passThroughMe = Flow.of(Integer.class).map(i -> i * 10);

CompletionStage<List<Integer>> ret =
    source.via(PassThroughFlow.create(passThroughMe, Keep.right())).runWith(Sink.seq(), system);

// Verify results
List<Integer> list = ret.toCompletableFuture().get();
assert list.equals(Arrays.asList(1, 2, 3));
```
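For instance, a minimal sketch of a custom output function, reusing the `source` and `passThroughMe` definitions above (the `Enriched` case class is hypothetical, introduced only for illustration):

```scala
// Hypothetical wrapper type, not part of the connector's API.
case class Enriched(result: Int, original: Int)

val enriched = source
  .via(PassThroughFlow(passThroughMe, (result: Int, original: Int) => Enriched(result, original)))
  .runWith(Sink.seq)
// yields Enriched(10, 1), Enriched(20, 2), Enriched(30, 3)
```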
This pattern is useful when integrating several Apache Pekko Connectors with each other. In the following Kafka example, the CommittableMessage is passed through the write flow so that its offset is committed only after the write has been processed:
Scala:
```scala
val writeFlow = Flow[ConsumerMessage.CommittableMessage[String, Array[Byte]]].map(_ => ???)

val consumerSettings = ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
val committerSettings = CommitterSettings(system)
Consumer
  .committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .via(PassThroughFlow(writeFlow, Keep.right))
  .map(_.committableOffset)
  .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
  .run()
```
Java:
```java
// Decode the record value (the original used byte[].toString(),
// which would only print the array reference).
Flow<ConsumerMessage.CommittableMessage<String, byte[]>, String, NotUsed> writeFlow =
    Flow.fromFunction(i -> new String(i.record().value(), StandardCharsets.UTF_8));

ConsumerSettings<String, byte[]> consumerSettings =
    ConsumerSettings.create(system, new StringDeserializer(), new ByteArrayDeserializer());
CommitterSettings committerSettings = CommitterSettings.create(system);
Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
        .via(PassThroughFlow.create(writeFlow, Keep.right()))
        .map(i -> i.committableOffset())
        .toMat(Committer.sink(committerSettings), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(system);
```
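If the write result is needed downstream, a hedged variant of the Scala pipeline above with `Keep.both` (assuming `writeFlow` produces a real result instead of `???`) could look like this:

```scala
// Sketch only: Keep.both emits (writeResult, originalMessage), so the
// write result can be inspected (e.g. logged) before the offset is committed.
Consumer
  .committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .via(PassThroughFlow(writeFlow, Keep.both))
  .map { case (writeResult, message) =>
    // use writeResult here, then commit the original message's offset
    message.committableOffset
  }
  .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
  .run()
```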