Pravega

Pravega provides a new storage abstraction - a stream - for continuous and unbounded data. A Pravega stream is an elastic set of durable and append-only segments, each segment being an unbounded sequence of bytes. Streams provide exactly-once semantics, and atomicity for groups of events using transactions.

Project Info: Apache Pekko Connectors Pravega
Artifact: org.apache.pekko, pekko-connectors-pravega, 1.1.0-M1+129-4a175fef-SNAPSHOT
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions: 2.13.15, 2.12.20, 3.3.4
JPMS module name: pekko.stream.connectors.pravega
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
libraryDependencies += "org.apache.pekko" %% "pekko-connectors-pravega" % "1.1.0-M1+129-4a175fef-SNAPSHOT"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-pravega_${scala.binary.version}</artifactId>
    <version>1.1.0-M1+129-4a175fef-SNAPSHOT</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-pravega_${versions.ScalaBinary}:1.1.0-M1+129-4a175fef-SNAPSHOT"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Concepts

Pravega stores streams of events, and streams are organized using scopes. A Pravega stream comprises one or more parallel segments, and the set of parallel segments can change over time with auto-scaling. Pravega is designed to operate at scale and can accommodate a large number of segments and streams.

Pravega has an API to write and read events. An application that ingests data writes events to a stream, and an application that consumes data reads events from a stream. In addition to the events API, Pravega has other APIs that let an application read and write bytes rather than events, and read the events of a stream out of order (e.g., when batch processing).

Pravega stores stream data durably, and applications can access the stream data using the same API both when tailing the stream and when processing past data. The system is architected so that the underlying storage is elastic and it is able to accommodate unbounded streams.

When writing an event, Pravega accepts a routing key parameter, and it guarantees order per key even in the presence of auto-scaling.
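
As a rough illustration of why a routing key gives per-key ordering, the sketch below hashes each key to one of a fixed number of segments and appends events in arrival order. The class name, the hashing scheme and the fixed segment count are assumptions made for this example. It is a toy model, not Pravega's implementation, and it does not model auto-scaling.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of per-key ordering. Hypothetical; not part of Pravega or the connector. */
final class RoutingKeySketch {

  /** Maps a routing key to a segment index (assumed hashing scheme, for illustration only). */
  static int segmentFor(String routingKey, int segmentCount) {
    int hash = Arrays.hashCode(routingKey.getBytes(StandardCharsets.UTF_8));
    return Math.floorMod(hash, segmentCount);
  }

  /**
   * Appends each {routingKey, payload} pair to the segment chosen by its key.
   * Events that share a key always land in the same segment, in arrival order.
   */
  static Map<Integer, List<String>> route(List<String[]> keyedEvents, int segmentCount) {
    Map<Integer, List<String>> segments = new TreeMap<>();
    for (String[] keyedEvent : keyedEvents) {
      int segment = segmentFor(keyedEvent[0], segmentCount);
      segments.computeIfAbsent(segment, s -> new ArrayList<>()).add(keyedEvent[1]);
    }
    return segments;
  }

  public static void main(String[] args) {
    List<String[]> events = Arrays.asList(
        new String[] {"sensor-1", "reading-a"},
        new String[] {"sensor-2", "reading-b"},
        new String[] {"sensor-1", "reading-c"});
    // Both "sensor-1" readings end up in one segment, "reading-a" before "reading-c".
    System.out.println(route(events, 4));
  }
}
```

Because the segment is a pure function of the key, two events with the same key can never be reordered relative to each other by being placed in different segments.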

Since version 0.8, Pravega supports write and read access to Key/Value tables.

For more information about Pravega please visit the official documentation.

Configuration

Two categories of properties are used to configure the connector:

  • Pravega internal properties, which are forwarded to the Pravega configuration builders (ClientConfig, ReaderConfig, EventWriterConfig).
  • Apache Pekko Connectors properties (all others).

reference.conf
# SPDX-License-Identifier: Apache-2.0

pekko.connectors.pravega {
  #
  # ClientConfig (Pravega internals)
  defaults.client-config {
    # ControllerURI The controller rpc URI. This can be of 2 types
    #  1. tcp://ip1:port1,ip2:port2,...
    #   This is used if the controller endpoints are static and can be directly accessed.
    #  2. pravega://ip1:port1,ip2:port2,...
    #   This is used to autodiscover the controller endpoints from an initial controller list.
    #controller-uri = "tcp://localhost:9090"

    # An optional property representing whether to enable TLS for client's communication with the Controller.
    # If this property and enable-tls-to-segment-store are not set, and the scheme used in controller-uri
    #  is "tls" or "pravegas", TLS is automatically enabled for both client-to-Controller and
    # client-to-Segment Store communications.
    #enable-tls-to-controller = false

    # An optional property representing whether to enable TLS for client's communication with the Segment Store.
    # If this property and 'enable-tls-to-controller' are not set, and the scheme used in 'controller-uri'
    #   is "tls" or "pravegas", TLS is automatically enabled for both client-to-Controller and
    # client-to-Segment Store communications.
    #enable-tls-to-segment-store = false

    # Maximum number of connections per Segment store to be used by connection pooling.
    #max-connections-per-segment-store=10

    # Path to an optional truststore. If this is null or empty, the default JVM trust store is used.
    # This is currently expected to be a signing certificate for the certification authority.
    #trust-store

    # If the flag 'isEnableTls'  is set, this flag decides whether to enable host name validation or not.
    #validate-host-name=true
  }
  reader {
    client-config = ${pekko.connectors.pravega.defaults.client-config}
    # ReaderConfig (Pravega internals)
    config {
      #disable-time-windows = false
      #initial-allocation-delay = 0
    }
    timeout = 1 second
    # The reader-id must be unique across all instances of a reader group.
    # When a reader-id is not provided one is randomly generated each time a Reader Source is created.
    #reader-id="scala-reader-id"
  }
  writer {
    client-config = ${pekko.connectors.pravega.defaults.client-config}
    maximum-inflight-messages = 10
    # EventWriterConfig (Pravega internals)
    config {
      #automatically-note-time=false
      #backoff-multiple=10
      #enable-connection-pooling=false
      #initial-backoff-millis=1
      #retry-attempts=10
      #
      # The transaction timeout parameter corresponds to the lease renewal period.
      # In every period, the client must send at least one ping to keep the txn alive.
      # If the client fails to do so, then Pravega aborts the txn automatically. The client
      # sends pings internally and requires no application intervention, only that it sets
      # this parameter accordingly.
      #
      # This parameter is additionally used to determine the total amount of time that
      # a txn can remain open. Currently, we set the maximum amount of time for a
      # txn to remain open to be the minimum between 1 day and 1,000 times the value
      # of the lease renewal period. The 1,000 is hardcoded and has been chosen arbitrarily
      # to be a large enough value.
      #
      # The maximum allowed lease time by default is 120s, see:
      #
      # io.pravega.controller.util.Config.PROPERTY_TXN_MAX_LEASE
      #
      # The maximum allowed lease time is a configuration parameter of the controller
      # and can be changed accordingly. Note that being a controller-wide parameter,
      # it affects all transactions.
      #transaction-timeout-time=89999L
    }
  }
  table {
    client-config = ${pekko.connectors.pravega.defaults.client-config}
    maximum-inflight-messages = 10
    # Max entries retrieved by iterator.
    max-entries-at-once = 100 
  }

}
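
The comment on transaction-timeout-time in the reference.conf above says that the maximum time a transaction can remain open is the minimum of 1 day and 1,000 times the lease renewal period. The following is a minimal sketch of that arithmetic only. The class and method names are hypothetical and this is not Pravega code.

```java
import java.time.Duration;

/** Hypothetical helper that restates the rule from the reference.conf comment. */
final class TransactionLeaseSketch {

  static final Duration ONE_DAY = Duration.ofDays(1);
  // The factor 1,000 is the hardcoded multiplier mentioned in the comment.
  static final long LEASE_MULTIPLIER = 1000L;

  /** Returns min(1 day, 1,000 x leaseRenewalPeriod). */
  static Duration maxOpenTime(Duration leaseRenewalPeriod) {
    Duration scaled = leaseRenewalPeriod.multipliedBy(LEASE_MULTIPLIER);
    return scaled.compareTo(ONE_DAY) < 0 ? scaled : ONE_DAY;
  }

  public static void main(String[] args) {
    // 89999 ms x 1,000 is just under 25 hours, so the 1 day cap applies.
    System.out.println(maxOpenTime(Duration.ofMillis(89999))); // PT24H
    // 30 s x 1,000 = 30,000 s, which is below the cap.
    System.out.println(maxOpenTime(Duration.ofSeconds(30))); // PT8H20M
  }
}
```

With the commented-out example value of 89999 milliseconds, the scaled value exceeds one day, so the cap is what limits the transaction.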

The Pravega connector can configure the Pravega client automatically from settings supplied in an application.conf, or the client can be configured programmatically with ReaderSettingsBuilder or WriterSettingsBuilder. See the following sections for examples.

ClientConfig

This configuration holds connection properties (endpoints, protocol) for all communication.

It can be overridden in an application.conf file at the following configuration paths:

  • reader: pekko.connectors.pravega.reader.client-config
  • writer: pekko.connectors.pravega.writer.client-config

It can be customised programmatically, see below.

Streams

EventReader configuration

A Pravega Source needs a ReaderSettings to operate. It can be built from configuration and customized programmatically:

Scala
val readerSettings = ReaderSettingsBuilder(system)
  .clientConfigBuilder(
    _.controllerURI(new URI("pravegas://localhost:9090")) // ClientConfig customization
      .enableTlsToController(true)
      .enableTlsToSegmentStore(true))
  .readerConfigBuilder(_.disableTimeWindows(true)) // ReaderConfig customization
  .withTimeout(3.seconds)
  .withSerializer(new UTF8StringSerializer)
Java
ReaderSettings<String> readerSettings =
    ReaderSettingsBuilder.create(system)
        .clientConfigBuilder(
            builder -> builder.enableTlsToController(true)) // ClientConfig customization
        .readerConfigBuilder(
            builder -> builder.disableTimeWindows(true)) // ReaderConfig customization
        .withTimeout(Duration.ofSeconds(3))
        .withSerializer(new UTF8StringSerializer());

EventWriter configuration

A Pravega Flow or Sink needs a WriterSettings to operate. It can be built from configuration and customized programmatically.

To use a routing key, provide a key extractor function for your message type:

Scala
val writerSettings = WriterSettingsBuilder(system)
  .clientConfigBuilder(_.enableTlsToController(true)) // ClientConfig customization
  .eventWriterConfigBuilder(_.backoffMultiple(5)) // EventWriterConfig customization
  .withMaximumInflightMessages(5)
  .withKeyExtractor((str: String) => str.substring(0, 2))
  .withSerializer(new UTF8StringSerializer)
Java
WriterSettings<String> writerSettings =
    WriterSettingsBuilder.<String>create(system)
        .withKeyExtractor((String str) -> str.substring(0, 1))
        .withSerializer(new UTF8StringSerializer());

ReaderSettingsBuilder and WriterSettingsBuilder produce ReaderSettings and WriterSettings, respectively, once a Serializer is provided.

Writing to Pravega stream

Events are written to a Pravega stream through a Flow or Sink:

Scala
Source(1 to 100)
  .map(i => s"event_$i")
  .runWith(Pravega.sink("an_existing_scope", "an_existing_streamName", writerSettings))

Source(1 to 100)
  .map { i =>
    val routingKey = i % 10
    s"${routingKey}_event_$i"
  }
  .runWith(Pravega.sink("an_existing_scope", "an_existing_streamName", writerSettingsWithRoutingKey))
Java
Sink<String, CompletionStage<Done>> sinkWithRouting =
    Pravega.sink("an_existing_scope", "an_existing_streamName", writerSettings);

CompletionStage<Done> doneWithRouting =
    Source.from(Arrays.asList("One", "Two", "Three")).runWith(sinkWithRouting, system);
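
To see what a key extractor yields for events shaped like those in the Scala example above, the sketch below applies a "first two characters" extractor to the same 100 generated strings and groups them by the resulting routing key. It uses only the JDK. The class and method names are hypothetical, and the grouping merely illustrates which events share a key; it does not call the connector.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical demo of a routing-key extractor; not part of the connector API. */
final class KeyExtractorSketch {

  // Assumed extractor: the first two characters of the event, e.g. "3_" for "3_event_13".
  static final Function<String, String> KEY_EXTRACTOR = str -> str.substring(0, 2);

  /** Builds the events "<i % 10>_event_<i>" for i in 1..100. */
  static List<String> sampleEvents() {
    return IntStream.rangeClosed(1, 100)
        .mapToObj(i -> (i % 10) + "_event_" + i)
        .collect(Collectors.toList());
  }

  /** Groups events by extracted routing key, keeping the original order within each key. */
  static Map<String, List<String>> groupByRoutingKey(List<String> events) {
    return events.stream()
        .collect(Collectors.groupingBy(KEY_EXTRACTOR, TreeMap::new, Collectors.toList()));
  }

  public static void main(String[] args) {
    // Prints the ten events that share the routing key "3_", in the order they were produced.
    System.out.println(groupByRoutingKey(sampleEvents()).get("3_"));
  }
}
```

The extractor has to return the same key for every event that must stay ordered relative to the others, so it should depend only on the stable part of the message.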

Reading from Pravega stream

First you need to build a reader group:

Scala
Using(Pravega.readerGroupManager("an_existing_scope", readerSettings.clientConfig)) { readerGroupManager =>
  readerGroupManager.createReaderGroup("myGroup", "stream1", "stream2")
}
Java
ReaderGroup readerGroup;
try (PravegaReaderGroupManager readerGroupManager =
    Pravega.readerGroup("an_existing_scope", readerSettings.clientConfig())) {
  readerGroup = readerGroupManager.createReaderGroup("my_group", "streamName");
}

Then use this reader group to read from a Source:

Scala
Pravega
  .source(readerGroup, readerSettings)
  .to(Sink.foreach { (event: PravegaEvent[String]) =>
    val message: String = event.message
    processMessage(message)
  })
  .run()
Java
CompletionStage<Done> fut =
    Pravega.<String>source(readerGroup, readerSettings)
        .to(Sink.foreach(e -> processMessage(e.message())))
        .run(system);

It produces a stream of PravegaEvent, a thin wrapper that carries some Pravega metadata along with the message.

Key Value Pair table

Since version 0.8, Pravega exposes Key Value Pair (KVP) tables. Note that this API is still experimental.

Similarly, a Pravega Table Flow or Sink needs a TableWriterSettings to operate:

Scala
val tableWriterSettings = TableWriterSettingsBuilder[Int, String]()
  .clientConfigBuilder(_.enableTlsToController(true)) // ClientConfig customization
  .withMaximumInflightMessages(5)
  .withKeyExtractor(str => new TableKey(intSerializer.serialize(str.hashCode())))
  .build()
Java
TableWriterSettings<Integer, String> tableWriterSettings =
    TableWriterSettingsBuilder.<Integer, String>create(
            system, intSerializer, new UTF8StringSerializer())
        .withKeyExtractor(id -> new TableKey(intSerializer.serialize(id)))
        .build();

To read from a Pravega table, a Flow or Source needs a TableReaderSettings to operate:

Scala
val tableReaderSettings = TableReaderSettingsBuilder[Int, String]()
  .clientConfigBuilder(_.enableTlsToController(true)) // ClientConfig customization
  .withMaximumInflightMessages(5)
  .withMaxEntriesAtOnce(100)
  .withKeyExtractor(str => new TableKey(intSerializer.serialize(str.hashCode())))
  .build()
Java
TableReaderSettings<Integer, String> tableReaderSettings =
    TableReaderSettingsBuilder.<Integer, String>create(
            system, intSerializer, new UTF8StringSerializer())
        .withKeyExtractor(id -> new TableKey(intSerializer.serialize(id)))
        .build();
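
The table settings above use an intSerializer that is not defined on this page. As a hedged sketch of what such a serializer might do, the class below round-trips an int through a 4-byte big-endian ByteBuffer using only the JDK. The class name and the encoding are assumptions for illustration. A serializer used with the connector would implement Pravega's Serializer interface instead of exposing static methods.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for the page's intSerializer; JDK only, for illustration. */
final class IntSerializerSketch {

  /** Encodes the value as 4 bytes (ByteBuffer's default big-endian order), ready for reading. */
  static ByteBuffer serialize(int value) {
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
    buffer.putInt(value);
    buffer.flip(); // switch from writing to reading: position 0, limit 4
    return buffer;
  }

  /** Decodes the value without moving the caller's buffer position. */
  static int deserialize(ByteBuffer serialized) {
    return serialized.duplicate().getInt();
  }

  public static void main(String[] args) {
    ByteBuffer key = serialize(42);
    System.out.println(key.remaining()); // 4
    System.out.println(deserialize(key)); // 42
  }
}
```

Reading through duplicate() leaves the original buffer untouched, so the same serialized key can be read more than once.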

Writing to Pravega KVP Table

Writes to a Pravega table are done through a Flow or Sink:

Scala
// Write through a flow
Source(1 to 10)
  .map(id => (id, Person(id, s"name_$id")))
  .via(PravegaTable.writeFlow("an_existing_scope", "an_existing_tablename", tablewriterSettings))
  .runWith(Sink.ignore)

// Write in a sink
Source(1 to 10)
  .map(id => (id, Person(id, s"name_$id")))
  .runWith(PravegaTable.sink("an_existing_scope", "an_existing_tablename", tablewriterSettings))
Java
final List<Pair<Integer, String>> events =
    Arrays.asList(
        new Pair<Integer, String>(1, "One"),
        new Pair<Integer, String>(2, "Two"),
        new Pair<Integer, String>(3, "Three"),
        new Pair<Integer, String>(4, "Four"));

Sink<Pair<Integer, String>, CompletionStage<Done>> sink =
    PravegaTable.sink("an_existing_scope", "an_existing_tableName", tablewriterSettings);

CompletionStage<Done> done = Source.from(events).toMat(sink, Keep.right()).run(system);

Reading from Pravega KVP Table

Reads from a Pravega table are done through a Source:

Scala
val readingDone = PravegaTable
  .source("an_existing_scope", "an_existing_tablename", tableSettings)
  .to(Sink.foreach(println))
  .run()
Java
final CompletionStage<Done> pair =
    PravegaTable.source("an_existing_scope", "an_existing_tableName", tableReaderSettings)
        .to(Sink.foreach((TableEntry<String> kvp) -> processKVP(kvp)))
        .run(system);

Or through a Flow:

Scala
val tableSettings: TableSettings[String, Int] =
  TableReaderSettingsBuilder[String, Int]()
    .withKeyExtractor(p => new TableKey(stringSerializer.serialize(p)))
    .build()
PravegaTable
  .readFlow[String, Int](
    scope,
    tableName,
    tableSettings)

Support