MongoDB

The MongoDB connector allows you to read and save documents. You can query a stream of documents from MongoSourceMongoSource or update documents in a collection with MongoSinkMongoSink.

This connector is based on the MongoDB Java Driver, which is compatible with MongoDB versions 2.6 through 4.4.

Alternative connector

Another MongoDB connector is available - ReactiveMongo. It is a Scala driver that provides fully non-blocking and asynchronous I/O operations. Please read more about it in the ReactiveMongo documentation.

Project Info: Apache Pekko Connectors MongoDB
Artifact
org.apache.pekko
pekko-connectors-mongodb
1.0.2
JDK versions
OpenJDK 8
OpenJDK 11
OpenJDK 17
Scala versions2.13.14, 2.12.20
JPMS module namepekko.stream.connectors.mongodb
License
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-mongodb" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-mongodb_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-mongodb_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Initialization

In the code examples below we will be using Mongo’s support for automatic codec derivation for POJOs. For Scala we will be using a case class and a macro based codec derivation. For Java a codec for POJO is derived using reflection.

Scala
sourcecase class Number(_id: Int)
Java
sourcepublic final class Number {
  private Integer _id;

  public Number() {}

  public Number(Integer _id) {
    this._id = _id;
  }

  public void setId(Integer _id) {
    this._id = _id;
  }

  public Integer getId() {
    return _id;
  }

}

For codec support, you first need to setup a CodecRegistry.

Scala
sourceimport org.bson.codecs.configuration.CodecRegistries.{ fromProviders, fromRegistries }
import org.mongodb.scala.MongoClient.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.bson.codecs.Macros._

val codecRegistry =
  fromRegistries(fromProviders(classOf[Number]: @nowarn("msg=match may not be exhaustive")), DEFAULT_CODEC_REGISTRY)
Java
sourcePojoCodecProvider codecProvider = PojoCodecProvider.builder().register(Number.class).build();
CodecRegistry codecRegistry =
    CodecRegistries.fromProviders(codecProvider, new ValueCodecProvider());

Sources provided by this connector need a prepared collection to communicate with the MongoDB server. To get a reference to a collection, let’s initialize a MongoDB connection and access the database.

Scala
sourceprivate val client = MongoClients.create("mongodb://localhost:27017")
private val db = client.getDatabase("MongoSourceSpec")
private val numbersColl = db
  .getCollection("numbers", classOf[Number])
  .withCodecRegistry(codecRegistry)
Java
sourceclient = MongoClients.create("mongodb://localhost:27017");
db = client.getDatabase("MongoSourceTest");
numbersColl = db.getCollection("numbers", Number.class).withCodecRegistry(codecRegistry);

We will also need an ActorSystemActorSystem.

Scala
sourceimplicit val system: ActorSystem = ActorSystem()
Java
sourcesystem = ActorSystem.create();

Source

Let’s create a source from a Reactive Streams Publisher.

Scala
sourceval source: Source[Number, NotUsed] =
  MongoSource(numbersColl.find(classOf[Number]))
Java
sourcefinal Source<Number, NotUsed> source = MongoSource.create(numbersColl.find(Number.class));

And then run it.

Scala
sourceval rows: Future[Seq[Number]] = source.runWith(Sink.seq)
Java
sourcefinal CompletionStage<List<Number>> rows = source.runWith(Sink.seq(), system);

Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.

Flow and Sink

Each of these sink factory methods have a corresponding factory in MongoFlowMongoFlow which will emit the written document or result of the operation downstream.

Insert

We can use a Source of documents to save them to a mongo collection using MongoSink.insertOneMongoSink.insertOne or MongoSink.insertManyMongoSink.insertMany.

Scala
sourceval testRangeObjects = testRange.map(Number)
val source = Source(testRangeObjects)
source.runWith(MongoSink.insertOne(numbersColl)).futureValue
Java
sourceList<Number> testRangeObjects = testRange.stream().map(Number::new).collect(toList());
final CompletionStage<Done> completion =
    Source.from(testRangeObjects).runWith(MongoSink.insertOne(numbersColl), system);

Insert Many

Insert many can be used if you have a collection of documents to insert at once.

Scala
sourceval objects = testRange.map(Number)
val source = Source(objects)
val completion = source.grouped(2).runWith(MongoSink.insertMany[Number](numbersColl))
Java
sourcefinal List<Number> testRangeObjects = testRange.stream().map(Number::new).collect(toList());
final CompletionStage<Done> completion =
    Source.from(testRangeObjects).grouped(2).runWith(MongoSink.insertMany(numbersColl), system);

Update

We can update documents with a Source of DocumentUpdate which is a filter and an update definition. Use either MongoSink.updateOneMongoSink.updateOne or MongoSink.updateManyMongoSink.updateMany if the filter should target one or many documents.

Scala
sourceval source = Source(testRange).map(i =>
  DocumentUpdate(filter = Filters.eq("value", i), update = Updates.set("updateValue", i * -1)))
val completion = source.runWith(MongoSink.updateOne(numbersDocumentColl))
Java
sourcefinal Source<DocumentUpdate, NotUsed> source =
    Source.from(testRange)
        .map(
            i ->
                DocumentUpdate.create(
                    Filters.eq("value", i), Updates.set("updateValue", i * -1)));
final CompletionStage<Done> completion =
    source.runWith(MongoSink.updateOne(numbersDocumentColl), system);

Delete

We can delete documents with a Source of filters. Use either MongoSink.deleteOneMongoSink.deleteOne or MongoSink.deleteManyMongoSink.deleteMany if the filter should target one or many documents.

Scala
sourceval source = Source(testRange).map(i => Filters.eq("value", i))
val completion = source.runWith(MongoSink.deleteOne(numbersDocumentColl))
Java
sourcefinal Source<Bson, NotUsed> source = Source.from(testRange).map(i -> Filters.eq("value", i));
final CompletionStage<Done> completion =
    source.runWith(MongoSink.deleteOne(numbersDocumentColl), system);