Server-sent Events (SSE)
The SSE connector provides a continuous source of server-sent events recovering from connection failure.
| Project Info: Apache Pekko Connectors Server-sent events (SSE) | |
|---|---|
| Artifact | org.apache.pekko
pekko-connectors-sse
1.2.0+3-e195cec2-SNAPSHOT
|
| JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
| Scala versions | 2.13.17, 2.12.20, 3.3.6 |
| JPMS module name | pekko.stream.connectors.sse |
| License | |
| API documentation | |
| Forums | |
| Release notes | GitHub releases |
| Issues | Github issues |
| Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
val PekkoVersion = "1.1.5" val PekkoHttpVersion = "1.1.0" libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-connectors-sse" % "1.2.0+3-e195cec2-SNAPSHOT", "org.apache.pekko" %% "pekko-stream" % PekkoVersion, "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion )- Maven
<properties> <pekko.version>1.1.5</pekko.version> <pekko.http.version>1.1.0</pekko.http.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-connectors-sse_${scala.binary.version}</artifactId> <version>1.2.0+3-e195cec2-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-http_${scala.binary.version}</artifactId> <version>${pekko.http.version}</version> </dependency> </dependencies>- Gradle
def versions = [ PekkoVersion: "1.1.5", PekkoHttpVersion: "1.1.0", ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-connectors-sse_${versions.ScalaBinary}:1.2.0+3-e195cec2-SNAPSHOT" implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}" implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Usage
Define an EventSource by giving a URI, a function for sending HTTP requests, and an optional initial value for Last-Event-ID header:
- Scala
-
source
import org.apache.pekko import pekko.http.scaladsl.Http import pekko.http.scaladsl.model.sse.ServerSentEvent import pekko.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse, Uri } import pekko.stream.connectors.sse.scaladsl.EventSource import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec val send: HttpRequest => Future[HttpResponse] = Http().singleRequest(_) val eventSource: Source[ServerSentEvent, NotUsed] = EventSource( uri = Uri(s"http://$host:$port"), send, initialLastEventId = Some("2"), retryDelay = 1.second) - Java
-
source
import java.util.function.Function; import java.util.concurrent.CompletionStage; import org.apache.pekko.http.javadsl.Http; import org.apache.pekko.http.javadsl.model.*; import org.apache.pekko.http.javadsl.model.sse.ServerSentEvent; import org.apache.pekko.stream.connectors.sse.javadsl.EventSource; final Http http = Http.get(system); Function<HttpRequest, CompletionStage<HttpResponse>> send = (request) -> http.singleRequest(request); final Uri targetUri = Uri.create(String.format("http://%s:%d", host, port)); final Optional<String> lastEventId = Optional.of("2"); Source<ServerSentEvent, NotUsed> eventSource = EventSource.create(targetUri, send, lastEventId, system);
Then happily consume ServerSentEvents:
- Scala
-
source
val events: Future[immutable.Seq[ServerSentEvent]] = eventSource .throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping) .take(nrOfSamples) .runWith(Sink.seq) - Java
-
source
int elements = 1; Duration per = Duration.ofMillis(500); int maximumBurst = 1; eventSource .throttle(elements, per, maximumBurst, ThrottleMode.shaping()) .take(nrOfSamples) .runWith(Sink.seq(), system);
1.2.0+3-e195cec2*