Pravega
Pravega provides a new storage abstraction - a stream - for continuous and unbounded data. A Pravega stream is an elastic set of durable and append-only segments, each segment being an unbounded sequence of bytes. Streams provide exactly-once semantics, and atomicity for groups of events using transactions.
Project Info: Apache Pekko Connectors Pravega | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-pravega
1.1.0-M1+154-6981eaa8-SNAPSHOT
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.pravega |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-connectors-pravega" % "1.1.0-M1+154-6981eaa8-SNAPSHOT"
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-connectors-pravega_${scala.binary.version}</artifactId> <version>1.1.0-M1+154-6981eaa8-SNAPSHOT</version> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-connectors-pravega_${versions.ScalaBinary}:1.1.0-M1+154-6981eaa8-SNAPSHOT" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Concepts
Pravega stores streams of events, and streams are organized using scopes. A Pravega stream comprises a one or more parallel segments, and the set of parallel segments can change over time with auto-scaling. Pravega is designed to operate at scale and is able to accommodate a large number of segments and streams.
Pravega has an API to write and read events. An application looking into ingesting data writes events to a stream, while consuming data consists of reading events from a stream. In addition to the events API, Pravega has other APIs that enable an application to read and write bytes rather than events and to read events of a stream out of order (e.g., when batch processing).
Pravega stores stream data durably, and applications can access the stream data using the same API both when tailing the stream and when processing past data. The system is architected so that the underlying storage is elastic and it is able to accommodate unbounded streams.
When writing an event, Pravega accepts a routing key parameter, and it guarantees order per key even in the presence of auto-scaling.
Since version 8.x, Pravega supports write and read access to a Key/Value tables.
For more information about Pravega please visit the official documentation.
Configuration
Two categories of properties can/must be provided to configure the connector.
Pravega internals properties that are forwarded to Pravega configuration builders:
ClientConfig
pekko.connectors.pravega.defaults.client-config
EventWriterConfig
pekko.connectors.pravega.writer.config
ReaderConfig
pekko.connectors.pravega.reader.config
Apache Pekko Connectors Connector properties (all others).
- reference.conf
-
source
# SPDX-License-Identifier: Apache-2.0 pekko.connectors.pravega { # # ClientConfig (Pravega internals) defaults.client-config { # ControllerURI The controller rpc URI. This can be of 2 types # 1. tcp://ip1:port1,ip2:port2,... # This is used if the controller endpoints are static and can be directly accessed. # 2. pravega://ip1:port1,ip2:port2,... # This is used to autodiscovery the controller endpoints from an initial controller list. #controller-uri = "tcp://localhost:9090" # An optional property representing whether to enable TLS for client's communication with the Controller. # If this property and enable-tls-to-segment-store are not set, and the scheme used in controller-uri # is "tls" or "pravegas", TLS is automatically enabled for both client-to-Controller and # client-to-Segment Store communications. #enable-tls-to-controller = false # An optional property representing whether to enable TLS for client's communication with the Controller. # If this property and 'enable-tls-to-controller' are not set, and the scheme used in 'controller-uri' # is "tls" or "pravegas", TLS is automatically enabled for both client-to-Controller and # client-to-Segment Store communications. #enable-tls-to-segment-store = false # Maximum number of connections per Segment store to be used by connection pooling. #max-connections-per-segment-store=10 # Path to an optional truststore. If this is null or empty, the default JVM trust store is used. # This is currently expected to be a signing certificate for the certification authority. #trust-store # If the flag 'isEnableTls' is set, this flag decides whether to enable host name validation or not. #validate-host-name=true } reader { client-config = ${pekko.connectors.pravega.defaults.client-config} # ReaderConfig (Pravega internals) config { #disable-time-windows = false #initial-allocation-delay = 0 } timeout = 1 second # The reader-id must be unique across all instances of a reader group. # When a reader-id is not provided one is randomly generated each time a Reader Source is created. #reader-id="scala-reader-id" } writer { client-config = ${pekko.connectors.pravega.defaults.client-config} maximum-inflight-messages = 10 # EventWriterConfig (Pravega internals) config { #automatically-note-time=false #backoff-multiple=10 #enable-connection-pooling=false #initial-backoff-millis=1 #retry-attempts=10 # # The transaction timeout parameter corresponds to the lease renewal period. # In every period, the client must send at least one ping to keep the txn alive. # If the client fails to do so, then Pravega aborts the txn automatically. The client # sends pings internally and requires no application intervention, only that it sets # this parameter accordingly. # # This parameter is additionally used to determine the total amount of time that # a txn can remain open. Currently, we set the maximum amount of time for a # txn to remain open to be the minimum between 1 day and 1,000 times the value # of the lease renewal period. The 1,000 is hardcoded and has been chosen arbitrarily # to be a large enough value. # # The maximum allowed lease time by default is 120s, see: # # io.pravega.controller.util.Config.PROPERTY_TXN_MAX_LEASE # # The maximum allowed lease time is a configuration parameter of the controller # and can be changed accordingly. Note that being a controller-wide parameter, # it affects all transactions. #transaction-timeout-time=89999L } } table { client-config = ${pekko.connectors.pravega.defaults.client-config} maximum-inflight-messages = 10 # Max entries retrived by iterator. max-entries-at-once = 100 } }
The Pravega connector can automatically configure the Pravega client by supplying configuration in an application.conf, or it can be set programmatically with ReaderSettingsBuilder
ReaderSettingsBuilder
or WriterSettingsBuilder
WriterSettingsBuilder
. See the following sections for examples.
ClientConfig
This configuration holds connection properties (endpoints, protocol) for all communication.
It can be overridden in an application.conf
file at the following configuration paths:
- reader:
pekko.connectors.pravega.reader.client-config
- writer:
pekko.connectors.pravega.writer.client-config
It can be customised programmatically, see below.
Streams
EventReader configuration
A Pravega Source needs a ReaderSettings
ReaderSettings
to operate, it can be built from configuration and programmatically customized:
- Scala
-
source
val readerSettings = ReaderSettingsBuilder(system) .clientConfigBuilder( _.controllerURI(new URI("pravegas://localhost:9090")) // ClientConfig customization .enableTlsToController(true) .enableTlsToSegmentStore(true)) .readerConfigBuilder(_.disableTimeWindows(true)) // ReaderConfig customization .withTimeout(3.seconds) .withSerializer(new UTF8StringSerializer)
- Java
-
source
ReaderSettings<String> readerSettings = ReaderSettingsBuilder.create(system) .clientConfigBuilder( builder -> builder.enableTlsToController(true)) // ClientConfig customization .readerConfigBuilder( builder -> builder.disableTimeWindows(true)) // ReaderConfig customization .withTimeout(Duration.ofSeconds(3)) .withSerializer(new UTF8StringSerializer());
EventWriter configuration
A Pravega Flow or Sink needs a WriterSettings
WriterSettings
to operate, it can be built from configuration and programmatically customized:
You may want to use a routing key, you have to provide a key extractor function
key extractor function
for your message type.
- Scala
-
source
val writerSettinds = WriterSettingsBuilder(system) .clientConfigBuilder(_.enableTlsToController(true)) // ClientConfig customization .eventWriterConfigBuilder(_.backoffMultiple(5)) // EventWriterConfig customization .withMaximumInflightMessages(5) .withKeyExtractor((str: String) => str.substring(0, 2)) .withSerializer(new UTF8StringSerializer)
- Java
-
source
WriterSettings<String> writerSettings = WriterSettingsBuilder.<String>create(system) .withKeyExtractor((String str) -> str.substring(0, 1)) .withSerializer(new UTF8StringSerializer());
ReaderSettingsBuilder
ReaderSettingsBuilder
, ReaderSettingsBuilder
ReaderSettingsBuilder
produce respectively ReaderSettings and ReaderSettings once a Serializer
is provided.
Writing to Pravega stream
Pravega message writes are done through a Flow/Sink like:
- Scala
-
source
Source(1 to 100) .map(i => s"event_$i") .runWith(Pravega.sink("an_existing_scope", "an_existing_streamName", writerSettings)) Source(1 to 100) .map { i => val routingKey = i % 10 s"${routingKey}_event_$i" } .runWith(Pravega.sink("an_existing_scope", "an_existing_streamName", writerSettingsWithRoutingKey))
- Java
-
source
Sink<String, CompletionStage<Done>> sinkWithRouting = Pravega.sink("an_existing_scope", "an_existing_scope", writerSettings); CompletionStage<Done> doneWithRouting = Source.from(Arrays.asList("One", "Two", "Three")).runWith(sinkWithRouting, system);
Reading from Pravega stream
First you need to build a reader group:
- Scala
-
source
Using(Pravega.readerGroupManager("an_existing_scope", readerSettings.clientConfig)) { readerGroupManager => readerGroupManager.createReaderGroup("myGroup", "stream1", "stream2") } - Java
-
source
ReaderGroup readerGroup; try (PravegaReaderGroupManager readerGroupManager = Pravega.readerGroup("an_existing_scope", readerSettings.clientConfig())) { readerGroup = readerGroupManager.createReaderGroup("my_group", "streamName"); }
Then use this reader group to read from a Source:
- Scala
-
source
Pravega .source(readerGroup, readerSettings) .to(Sink.foreach { (event: PravegaEvent[String]) => val message: String = event.message processMessage(message) }) .run() - Java
-
source
CompletionStage<Done> fut = Pravega.<String>source(readerGroup, readerSettings) .to(Sink.foreach(e -> processMessage(e.message()))) .run(system);
It produces a stream of PravegaEvent
PravegaEvent
, a thin wrapper which includes some Pravega metadata along with the message.
Key Value Pair table
Since version 0.8 Pravega exposes Key Value Pair table please note that this API is still experimental.
Similarly a Pravega Table Flow or Sink needs a TableWriterSettings
TableWriterSettings
to operate:
- Scala
-
source
val tableWriterSettings = TableWriterSettingsBuilder[Int, String]() .clientConfigBuilder(_.enableTlsToController(true)) // ClientConfig customization .withMaximumInflightMessages(5) .withKeyExtractor(str => new TableKey(intSerializer.serialize(str.hashCode()))) .build()
- Java
-
source
TableWriterSettings<Integer, String> tableWriterSettings = TableWriterSettingsBuilder.<Integer, String>create( system, intSerializer, new UTF8StringSerializer()) .withKeyExtractor(id -> new TableKey(intSerializer.serialize(id))) .build();
To read from a Pravega Table Flow or Source needs a TableReaderSettings
TableReaderSettings
to operate:
- Scala
-
source
val tableReaderSettings = TableReaderSettingsBuilder[Int, String]() .clientConfigBuilder(_.enableTlsToController(true)) // ClientConfig customization .withMaximumInflightMessages(5) .withMaxEntriesAtOnce(100) .withKeyExtractor(str => new TableKey(intSerializer.serialize(str.hashCode()))) .build()
- Java
-
source
TableReaderSettings<Integer, String> tableReaderSettings = TableReaderSettingsBuilder.<Integer, String>create( system, intSerializer, new UTF8StringSerializer()) .withKeyExtractor(id -> new TableKey(intSerializer.serialize(id))) .build();
Writing to Pravega KVP Table
Pravega message writes are done through a Flow/Sink like:
- Scala
-
source
// Write through a flow Source(1 to 10) .map(id => (id, Person(id, s"name_$id"))) .via(PravegaTable.writeFlow("an_existing_scope", "an_existing_tablename", tablewriterSettings)) .runWith(Sink.ignore) // Write in a sink Source(1 to 10) .map(id => (id, Person(id, s"name_$id"))) .runWith(PravegaTable.sink("an_existing_scope", "an_existing_tablename", tablewriterSettings)) - Java
-
source
final List<Pair<Integer, String>> events = Arrays.asList( new Pair<Integer, String>(1, "One"), new Pair<Integer, String>(2, "Two"), new Pair<Integer, String>(3, "Three"), new Pair<Integer, String>(4, "Four")); Sink<Pair<Integer, String>, CompletionStage<Done>> sink = PravegaTable.sink("an_existing_scope", "an_existing_tableName", tablewriterSettings); CompletionStage<Done> done = Source.from(events).toMat(sink, Keep.right()).run(system);
Reading from Pravega KVP Table
Pravega message reads are from a Source:
- Scala
-
source
val readingDone = PravegaTable .source("an_existing_scope", "an_existing_tablename", tableSettings) .to(Sink.foreach(println)) .run() - Java
-
source
final CompletionStage<Done> pair = PravegaTable.source("an_existing_scope", "an_existing_tableName", tableReaderSettings) .to(Sink.foreach((TableEntry<String> kvp) -> processKVP(kvp))) .run(system);
Or a Flow
- Scala
-
source
val tableSettings: TableSettings[String, Int] = TableReaderSettingsBuilder[String, Int]() .withKeyExtractor(p => new TableKey(stringSerializer.serialize(p))) .build() PravegaTable .readFlow[String, Int]( scope, tableName, tableSettings)