Server-sent Events (SSE)

The SSE connector provides a continuous source of server-sent events recovering from connection failure.

Project Info: Apache Pekko Connectors Server-sent events (SSE)
Artifact
org.apache.pekko
pekko-connectors-sse
1.0.2
JDK versions
OpenJDK 8
OpenJDK 11
OpenJDK 17
Scala versions2.13.13, 2.12.19, 3.3.3
JPMS module namepekko.stream.connectors.sse
License
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.2"
val PekkoHttpVersion = "1.0.0"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-sse" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
  "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion
)
Maven
<properties>
  <pekko.version>1.0.2</pekko.version>
  <pekko.http.version>1.0.0</pekko.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-sse_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http_${scala.binary.version}</artifactId>
    <version>${pekko.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.2",
  PekkoHttpVersion: "1.0.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-sse_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
  implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Usage

Define an EventSource by giving a URI, a function for sending HTTP requests, and an optional initial value for Last-Event-ID header:

Scala
sourceimport org.apache.pekko
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model.sse.ServerSentEvent
import pekko.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse, Uri }
import pekko.stream.connectors.sse.scaladsl.EventSource
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec

val send: HttpRequest => Future[HttpResponse] = Http().singleRequest(_)

val eventSource: Source[ServerSentEvent, NotUsed] =
  EventSource(
    uri = Uri(s"http://$host:$port"),
    send,
    initialLastEventId = Some("2"),
    retryDelay = 1.second)
Java
sourceimport java.util.function.Function;
import java.util.concurrent.CompletionStage;

import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.model.*;
import org.apache.pekko.http.javadsl.model.sse.ServerSentEvent;
import org.apache.pekko.stream.connectors.sse.javadsl.EventSource;

final Http http = Http.get(system);
Function<HttpRequest, CompletionStage<HttpResponse>> send =
    (request) -> http.singleRequest(request);

final Uri targetUri = Uri.create(String.format("http://%s:%d", host, port));
final Optional<String> lastEventId = Optional.of("2");
Source<ServerSentEvent, NotUsed> eventSource =
    EventSource.create(targetUri, send, lastEventId, system);

Then happily consume ServerSentEvents:

Scala
sourceval events: Future[immutable.Seq[ServerSentEvent]] =
  eventSource
    .throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
    .take(nrOfSamples)
    .runWith(Sink.seq)
Java
sourceint elements = 1;
Duration per = Duration.ofMillis(500);
int maximumBurst = 1;

eventSource
    .throttle(elements, per, maximumBurst, ThrottleMode.shaping())
    .take(nrOfSamples)
    .runWith(Sink.seq(), system);