Processing with Apache Pekko Streams
An Apache Pekko Streams FlowWithContext
can be used instead of a handler for processing the envelopes with at-least-once semantics.
The following example uses the `CassandraProjection`, but the flow would be the same with any other offset storage.
- Scala

```scala
val logger = LoggerFactory.getLogger(getClass)

val flow = FlowWithContext[EventEnvelope[ShoppingCart.Event], ProjectionContext]
  .map(envelope => envelope.event)
  .map {
    case ShoppingCart.CheckedOut(cartId, time) =>
      logger.info2("Shopping cart {} was checked out at {}", cartId, time)
      Done
    case otherEvent =>
      logger.debug2("Shopping cart {} changed by {}", otherEvent.cartId, otherEvent)
      Done
  }

val projection =
  CassandraProjection
    .atLeastOnceFlow(
      projectionId = ProjectionId("shopping-carts", "carts-1"),
      sourceProvider,
      handler = flow)
    .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
```
- Java

```java
Logger logger = LoggerFactory.getLogger("example");

FlowWithContext<
        EventEnvelope<ShoppingCart.Event>, ProjectionContext, Done, ProjectionContext, NotUsed>
    flow =
        FlowWithContext.<EventEnvelope<ShoppingCart.Event>, ProjectionContext>create()
            .map(EventEnvelope::event)
            .map(
                event -> {
                  if (event instanceof ShoppingCart.CheckedOut) {
                    ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
                    logger.info(
                        "Shopping cart {} was checked out at {}",
                        checkedOut.cartId,
                        checkedOut.eventTime);
                  } else {
                    logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
                  }
                  return Done.getInstance();
                });

int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    CassandraProjection.atLeastOnceFlow(
            ProjectionId.of("shopping-carts", "carts-1"), sourceProvider, flow)
        .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
```
The flow should emit a `Done` element for each completed envelope. The offset of the envelope is carried in the context of the `FlowWithContext` and is stored in Cassandra when the corresponding `Done` is emitted. Since the offset is stored after the envelope has been processed, some envelopes may be processed more than once if the projection is restarted from a previously stored offset.
There are a few caveats to be aware of:
- If the flow filters out envelopes, the corresponding offsets will not be stored, and such an envelope will be processed again if the projection is restarted and no later offset was stored.
- The flow should not duplicate emitted envelopes (for example via `mapConcat`) with the same offset, because the first offset may then be stored, and when the projection is restarted that offset is considered completed even though not all of the duplicated envelopes were processed.
- The flow must not reorder elements, because the offsets may be stored in the wrong order, and when the projection is restarted all envelopes up to the latest stored offset are considered completed even though some of them may not have been processed. This is the reason the flow is restricted to `FlowWithContext` rather than an ordinary `Flow`.
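To see why reordering is harmful, consider a simplified model in plain Scala (the names here are hypothetical, not part of the Projection API): offsets are stored in the order the flow emits them, and on restart everything up to the last stored offset is treated as completed.

```scala
// Simplified model of offset storage under at-least-once semantics.
object ReorderingExample {
  // `emitted` is the sequence of offsets the flow emitted (and stored) before
  // a crash; `all` is every offset that was delivered to the flow. On restart
  // the projection resumes after the last stored offset, so any undelivered
  // offset at or below it is silently skipped.
  def lostAfterRestart(emitted: Seq[Long], all: Seq[Long]): Seq[Long] = {
    val resumeFrom = emitted.last // last stored offset
    all.filter(o => o <= resumeFrom && !emitted.contains(o))
  }

  def main(args: Array[String]): Unit = {
    // Envelopes 1..3; a reordering flow emitted 1 then 3, then crashed.
    // Offset 2 is now below the resume point but was never processed.
    println(ReorderingExample.lostAfterRestart(Seq(1L, 3L), Seq(1L, 2L, 3L)))
  }
}
```

With in-order emission (`Seq(1L, 2L, 3L)`) nothing is lost; with the reordered emission above, envelope 2 is skipped forever, which is exactly the failure mode the `FlowWithContext` restriction prevents.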