InfluxDB
The Apache Pekko Connectors InfluxDB connector provides Apache Pekko Streams integration for InfluxDB.
For more information about InfluxDB, please visit the InfluxDB documentation.
| Project Info: Apache Pekko Connectors InfluxDB | |
|---|---|
| Artifact | org.apache.pekko : pekko-connectors-influxdb : 1.1.0 |
| JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
| Scala versions | 2.13.15, 2.12.20, 3.3.4 |
| JPMS module name | pekko.stream.connectors.influxdb |
| License | |
| API documentation | |
| Forums | |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/apache/pekko-connectors |
InfluxData, the makers of InfluxDB, now offer an Apache Pekko Streams-aware client library at https://github.com/influxdata/influxdb-client-java/tree/master/client-scala:
“The reference Scala client that allows query and write for the InfluxDB 2.0 by Apache Pekko Streams.”
Apache Pekko Connectors InfluxDB is marked as “API may change”. Please try it out and suggest improvements.
Furthermore, the major InfluxDB update to version 2.0 is expected to bring API and dependency changes to Apache Pekko Connectors InfluxDB.
Artifacts
sbt
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-influxdb" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
<pekko.version>1.1.3</pekko.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-influxdb_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
</dependencies>
Gradle
def versions = [
PekkoVersion: "1.1.3",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-influxdb_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
Set up InfluxDB client
Sources, Flows and Sinks provided by this connector need a prepared org.influxdb.InfluxDB client to access InfluxDB.
Scala
influxDB = InfluxDBFactory.connect(INFLUXDB_URL, USERNAME, PASSWORD)
influxDB.setDatabase(DatabaseName)
influxDB.query(new Query("CREATE DATABASE " + DatabaseName, DatabaseName))
Java
final InfluxDB influxDB = InfluxDBFactory.connect(INFLUXDB_URL, USERNAME, PASSWORD);
influxDB.setDatabase(databaseName);
influxDB.query(new Query("CREATE DATABASE " + databaseName, databaseName));
return influxDB;
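The connector does not manage the lifecycle of this client. As a minimal sketch, assuming the plain influxdb-java API and that your application knows when all streams that use the client have completed, it can be released like this:

Java
// Flush any pending writes and release the HTTP resources held by the
// influxdb-java client; the connector never closes the client itself.
influxDB.close();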
InfluxDB as Source and Sink
Now we can stream messages from or to InfluxDB by providing the InfluxDB client to the InfluxDbSource or the InfluxDbSink. The measurement classes below are annotated with @Measurement; the first is used by the Scala examples and the second by the Java examples.
source@Measurement(name = "cpu", database = "InfluxDbSpec")
public class InfluxDbSpecCpu extends Cpu {
public InfluxDbSpecCpu() {
}
public InfluxDbSpecCpu(Instant time, String hostname, String region, Double idle, Boolean happydevop, Long uptimeSecs) {
super(time, hostname, region, idle, happydevop, uptimeSecs);
}
public InfluxDbSpecCpu cloneAt(Instant time) {
return new InfluxDbSpecCpu(time, getHostname(), getRegion(), getIdle(), getHappydevop(), getUptimeSecs());
}
}
source@Measurement(name = "cpu", database = "InfluxDbTest")
public class InfluxDbCpu extends Cpu {
public InfluxDbCpu() {}
public InfluxDbCpu(
Instant time,
String hostname,
String region,
Double idle,
Boolean happydevop,
Long uptimeSecs) {
super(time, hostname, region, idle, happydevop, uptimeSecs);
}
public InfluxDbCpu cloneAt(Instant time) {
return new InfluxDbCpu(
time, getHostname(), getRegion(), getIdle(), getHappydevop(), getUptimeSecs());
}
}
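Both measurement classes extend a Cpu base class that is not shown on this page. A hypothetical minimal version, assuming influxdb-java's @Column / @TimeColumn annotations and column names chosen to line up with the getters used in the examples:

Java
import java.time.Instant;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.TimeColumn;

// Hypothetical base class for the measurement POJOs above. The column and
// tag names are assumptions for illustration; InfluxDBMapper maps query
// result columns onto these annotated fields.
public class Cpu {
  @TimeColumn
  @Column(name = "time")
  private Instant time;

  @Column(name = "hostname", tag = true)
  private String hostname;

  @Column(name = "region", tag = true)
  private String region;

  @Column(name = "idle")
  private Double idle;

  @Column(name = "happydevop")
  private Boolean happydevop;

  @Column(name = "uptimesecs")
  private Long uptimeSecs;

  public Cpu() {}

  public Cpu(Instant time, String hostname, String region, Double idle,
      Boolean happydevop, Long uptimeSecs) {
    this.time = time;
    this.hostname = hostname;
    this.region = region;
    this.idle = idle;
    this.happydevop = happydevop;
    this.uptimeSecs = uptimeSecs;
  }

  public Instant getTime() { return time; }
  public String getHostname() { return hostname; }
  public String getRegion() { return region; }
  public Double getIdle() { return idle; }
  public Boolean getHappydevop() { return happydevop; }
  public Long getUptimeSecs() { return uptimeSecs; }
}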
With typed source
Use InfluxDbSource.typed and InfluxDbSink.typed to create the source and sink. The data is converted by InfluxDBMapper.
Scala
val f1 = InfluxDbSource
.typed(classOf[InfluxDbSpecCpu], InfluxDbReadSettings(), influxDB, query)
.map { (cpu: InfluxDbSpecCpu) =>
{
val clonedCpu = cpu.cloneAt(cpu.getTime.plusSeconds(60000))
List(InfluxDbWriteMessage(clonedCpu))
}
}
.runWith(InfluxDbSink.typed(classOf[InfluxDbSpecCpu]))
Java
CompletionStage<Done> completionStage =
InfluxDbSource.typed(InfluxDbCpu.class, InfluxDbReadSettings.Default(), influxDB, query)
.map(
cpu -> {
InfluxDbCpu clonedCpu = cpu.cloneAt(cpu.getTime().plusSeconds(60000L));
return InfluxDbWriteMessage.create(clonedCpu, NotUsed.notUsed());
})
.groupedWithin(10, Duration.of(50L, ChronoUnit.MILLIS))
.runWith(InfluxDbSink.typed(InfluxDbCpu.class, influxDB), system);
With QueryResult source
Use InfluxDbSource.create and InfluxDbSink.create to create the source and sink.
Scala
val query = new Query("SELECT * FROM cpu", DatabaseName)
val f1 = InfluxDbSource(influxDB, query)
.map(resultToPoints)
.runWith(InfluxDbSink.create())
Java
Query query = new Query("SELECT * FROM cpu", DATABASE_NAME);
CompletionStage<Done> completionStage =
InfluxDbSource.create(influxDB, query)
.map(queryResult -> points(queryResult))
.mapConcat(i -> i)
.groupedWithin(10, Duration.of(50L, ChronoUnit.MILLIS))
.runWith(InfluxDbSink.create(influxDB), system);
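The resultToPoints / points helper used above is not shown in the snippets. A hypothetical sketch of such a helper that turns every row of the QueryResult back into a Point write message; the import path of InfluxDbWriteMessage, the time handling and the string conversion of field values are assumptions for illustration:

Java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.connectors.influxdb.InfluxDbWriteMessage;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;

// Hypothetical helper: wraps every series row of a QueryResult in an
// InfluxDbWriteMessage carrying a Point. A real mapping depends on the
// schema of the measurement being queried.
private static List<InfluxDbWriteMessage<Point, NotUsed>> points(QueryResult queryResult) {
  List<InfluxDbWriteMessage<Point, NotUsed>> messages = new ArrayList<>();
  for (QueryResult.Result result : queryResult.getResults()) {
    if (result.getSeries() == null) continue;
    for (QueryResult.Series series : result.getSeries()) {
      for (List<Object> row : series.getValues()) {
        Point.Builder point =
            Point.measurement(series.getName())
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        // columns and values are positionally aligned; index 0 is assumed to be "time"
        for (int i = 1; i < series.getColumns().size(); i++) {
          Object value = row.get(i);
          if (value != null) {
            point.addField(series.getColumns().get(i), value.toString());
          }
        }
        messages.add(InfluxDbWriteMessage.create(point.build(), NotUsed.notUsed()));
      }
    }
  }
  return messages;
}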
Writing to InfluxDB
You can also build flow stages with InfluxDbFlow. The API is similar to creating Sinks.
Scala
val result = Source(
List(
List(validMessage))).via(InfluxDbFlow.create())
.runWith(Sink.seq)
.futureValue
Java
CompletableFuture<List<List<InfluxDbWriteResult<Point, NotUsed>>>> completableFuture =
Source.single(Collections.singletonList(influxDbWriteMessage))
.via(InfluxDbFlow.create(influxDB))
.runWith(Sink.seq(), system)
.toCompletableFuture();
Passing data through InfluxDbFlow
When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to InfluxDB.
Scala
// We're going to pretend we got messages from kafka.
// After we've written them to InfluxDB, we want
// to commit the offset to Kafka
case class KafkaOffset(offset: Int)
case class KafkaMessage(cpu: InfluxDbFlowCpu, offset: KafkaOffset)
val messagesFromKafka = List(
KafkaMessage(new InfluxDbFlowCpu(Instant.now().minusSeconds(1000), "local_1", "eu-west-2", 1.4d, true, 123L),
KafkaOffset(0)),
KafkaMessage(new InfluxDbFlowCpu(Instant.now().minusSeconds(2000), "local_2", "eu-west-1", 2.5d, false, 125L),
KafkaOffset(1)),
KafkaMessage(new InfluxDbFlowCpu(Instant.now().minusSeconds(3000), "local_3", "eu-west-4", 3.1d, false, 251L),
KafkaOffset(2)))
var committedOffsets = List[KafkaOffset]()
def commitToKafka(offset: KafkaOffset): Unit =
committedOffsets = committedOffsets :+ offset
val f1 = Source(messagesFromKafka)
.map { (kafkaMessage: KafkaMessage) =>
val cpu = kafkaMessage.cpu
println("hostname: " + cpu.getHostname)
InfluxDbWriteMessage(cpu).withPassThrough(kafkaMessage.offset)
}
.groupedWithin(10, 50.millis)
.via(
InfluxDbFlow.typedWithPassThrough(classOf[InfluxDbFlowCpu]))
.map { (messages: Seq[InfluxDbWriteResult[InfluxDbFlowCpu, KafkaOffset]]) =>
messages.foreach { message =>
commitToKafka(message.writeMessage.passThrough)
}
}
.runWith(Sink.ignore)
Java
// We're going to pretend we got metrics from kafka.
// After we've written them to InfluxDb, we want
// to commit the offset to Kafka
/** Just clean the previous data */
influxDB.query(new Query("DELETE FROM cpu"));
List<Integer> committedOffsets = new ArrayList<>();
List<MessageFromKafka> messageFromKafka =
Arrays.asList(
new MessageFromKafka(
new InfluxDbCpu(
Instant.now().minusSeconds(1000), "local_1", "eu-west-2", 1.4d, true, 123L),
new KafkaOffset(0)),
new MessageFromKafka(
new InfluxDbCpu(
Instant.now().minusSeconds(2000), "local_2", "eu-west-1", 2.5d, false, 125L),
new KafkaOffset(1)),
new MessageFromKafka(
new InfluxDbCpu(
Instant.now().minusSeconds(3000), "local_3", "eu-west-4", 3.1d, false, 251L),
new KafkaOffset(2)));
Consumer<KafkaOffset> commitToKafka =
kafkaOffset -> committedOffsets.add(kafkaOffset.getOffset());
Source.from(messageFromKafka)
.map(
kafkaMessage -> {
return InfluxDbWriteMessage.create(
kafkaMessage.influxDbCpu, kafkaMessage.kafkaOffset);
})
.groupedWithin(10, Duration.ofMillis(10))
.via(InfluxDbFlow.typedWithPassThrough(InfluxDbCpu.class, influxDB))
.map(
messages -> {
messages.stream()
.forEach(
message -> {
KafkaOffset kafkaOffset = message.writeMessage().passThrough();
commitToKafka.accept(kafkaOffset);
});
return NotUsed.getInstance();
})
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get(10, TimeUnit.SECONDS);