Server-sent Events (SSE)
The SSE connector provides a continuous source of server-sent events recovering from connection failure.
Project Info: Apache Pekko Connectors Server-sent events (SSE) | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-sse
1.1.0
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.sse |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts¶
val PekkoVersion = "1.1.3"
val PekkoHttpVersion = "1.1.0"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-sse" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion,
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion
)
<properties>
<pekko.version>1.1.3</pekko.version>
<pekko.http.version>1.1.0</pekko.http.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-sse_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-http_${scala.binary.version}</artifactId>
<version>${pekko.http.version}</version>
</dependency>
</dependencies>
def versions = [
PekkoVersion: "1.1.3",
PekkoHttpVersion: "1.1.0",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-sse_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Usage¶
Define an EventSource
by giving a URI, a function for sending HTTP requests, and an optional initial value for Last-Event-ID header:
sourceimport org.apache.pekko
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model.sse.ServerSentEvent
import pekko.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse, Uri }
import pekko.stream.connectors.sse.scaladsl.EventSource
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
val send: HttpRequest => Future[HttpResponse] = Http().singleRequest(_)
val eventSource: Source[ServerSentEvent, NotUsed] =
EventSource(
uri = Uri(s"http://$host:$port"),
send,
initialLastEventId = Some("2"),
retryDelay = 1.second)
sourceimport java.util.function.Function;
import java.util.concurrent.CompletionStage;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.model.*;
import org.apache.pekko.http.javadsl.model.sse.ServerSentEvent;
import org.apache.pekko.stream.connectors.sse.javadsl.EventSource;
final Http http = Http.get(system);
Function<HttpRequest, CompletionStage<HttpResponse>> send =
(request) -> http.singleRequest(request);
final Uri targetUri = Uri.create(String.format("http://%s:%d", host, port));
final Optional<String> lastEventId = Optional.of("2");
Source<ServerSentEvent, NotUsed> eventSource =
EventSource.create(targetUri, send, lastEventId, system);
Then happily consume ServerSentEvent
s:
sourceval events: Future[immutable.Seq[ServerSentEvent]] =
eventSource
.throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
.take(nrOfSamples)
.runWith(Sink.seq)
sourceint elements = 1;
Duration per = Duration.ofMillis(500);
int maximumBurst = 1;
eventSource
.throttle(elements, per, maximumBurst, ThrottleMode.shaping())
.take(nrOfSamples)
.runWith(Sink.seq(), system);
1.1.0