Server-sent Events (SSE)
The SSE connector provides a continuous source of server-sent events that recovers from connection failures.
Project Info: Apache Pekko Connectors Server-sent events (SSE) | |
---|---|
Artifact | org.apache.pekko pekko-connectors-sse 1.0.2 |
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 |
Scala versions | 2.13.14, 2.12.20, 3.3.3 |
JPMS module name | pekko.stream.connectors.sse |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
    val PekkoVersion = "1.0.3"
    val PekkoHttpVersion = "1.0.1"
    libraryDependencies ++= Seq(
      "org.apache.pekko" %% "pekko-connectors-sse" % "1.0.2",
      "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
      "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion
    )
- Maven
    <properties>
      <pekko.version>1.0.3</pekko.version>
      <pekko.http.version>1.0.1</pekko.http.version>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-connectors-sse_${scala.binary.version}</artifactId>
        <version>1.0.2</version>
      </dependency>
      <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-stream_${scala.binary.version}</artifactId>
        <version>${pekko.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-http_${scala.binary.version}</artifactId>
        <version>${pekko.http.version}</version>
      </dependency>
    </dependencies>
- Gradle
    def versions = [
      PekkoVersion: "1.0.3",
      PekkoHttpVersion: "1.0.1",
      ScalaBinary: "2.13"
    ]
    dependencies {
      implementation "org.apache.pekko:pekko-connectors-sse_${versions.ScalaBinary}:1.0.2"
      implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
      implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
    }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Usage
Define an EventSource by giving a URI, a function for sending HTTP requests, and an optional initial value for the Last-Event-ID header:
- Scala
    import org.apache.pekko
    import pekko.NotUsed
    import pekko.http.scaladsl.Http
    import pekko.http.scaladsl.model.sse.ServerSentEvent
    import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, Uri }
    import pekko.stream.connectors.sse.scaladsl.EventSource
    import pekko.stream.scaladsl.Source
    import scala.concurrent.Future
    import scala.concurrent.duration._

    // Http() requires an implicit ActorSystem in scope; host and port identify the SSE server.
    val send: HttpRequest => Future[HttpResponse] = Http().singleRequest(_)

    val eventSource: Source[ServerSentEvent, NotUsed] =
      EventSource(
        uri = Uri(s"http://$host:$port"),
        send,
        initialLastEventId = Some("2"),
        retryDelay = 1.second)
- Java
    import java.util.Optional;
    import java.util.concurrent.CompletionStage;
    import java.util.function.Function;
    import org.apache.pekko.NotUsed;
    import org.apache.pekko.http.javadsl.Http;
    import org.apache.pekko.http.javadsl.model.*;
    import org.apache.pekko.http.javadsl.model.sse.ServerSentEvent;
    import org.apache.pekko.stream.connectors.sse.javadsl.EventSource;
    import org.apache.pekko.stream.javadsl.Source;

    // system is an existing ActorSystem; host and port identify the SSE server.
    final Http http = Http.get(system);
    Function<HttpRequest, CompletionStage<HttpResponse>> send =
        (request) -> http.singleRequest(request);

    final Uri targetUri = Uri.create(String.format("http://%s:%d", host, port));
    final Optional<String> lastEventId = Optional.of("2");
    Source<ServerSentEvent, NotUsed> eventSource =
        EventSource.create(targetUri, send, lastEventId, system);
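To make the role of the Last-Event-ID value more concrete, here is a minimal sketch that uses only the Java standard library. It reads lines in the text/event-stream format and remembers the most recent id field, which is the value a client would send as Last-Event-ID when it reconnects. This is not the connector's implementation. The class SseResumeSketch and its methods are hypothetical names, and only the data and id fields are handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Illustrative only: a tiny text/event-stream reader that tracks the last seen event id.
 * Not the connector's implementation; all names here are hypothetical.
 */
final class SseResumeSketch {
    private Optional<String> lastEventId;
    private final StringBuilder data = new StringBuilder();
    private final List<String> received = new ArrayList<>();

    SseResumeSketch(Optional<String> initialLastEventId) {
        this.lastEventId = initialLastEventId;
    }

    /** Feeds one line of the stream (without its line terminator). */
    void onLine(String line) {
        if (line.isEmpty()) {
            // A blank line ends the current event; events without data are not dispatched.
            if (data.length() > 0) {
                data.setLength(data.length() - 1); // drop the trailing '\n'
                received.add(data.toString());
                data.setLength(0);
            }
        } else if (line.startsWith(":")) {
            // Comment line (often used as a heartbeat): ignored.
        } else {
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // a single leading space is not part of the value
            }
            if (field.equals("data")) {
                data.append(value).append('\n');
            } else if (field.equals("id")) {
                lastEventId = Optional.of(value);
            }
            // The "event" and "retry" fields are omitted in this sketch.
        }
    }

    /** The id a reconnecting client would send as Last-Event-ID. */
    Optional<String> lastEventId() {
        return lastEventId;
    }

    /** The data payloads of all events dispatched so far. */
    List<String> received() {
        return received;
    }

    public static void main(String[] args) {
        SseResumeSketch client = new SseResumeSketch(Optional.of("2"));
        String stream = "id: 3\ndata: three\n\n: heartbeat\n\nid: 4\ndata: four\ndata: more\n\n";
        for (String line : stream.split("\n", -1)) {
            client.onLine(line);
        }
        System.out.println(client.received().size() + " events, last id " + client.lastEventId().get());
    }
}
```

After the sample stream has been read, lastEventId() holds the id 4, so a new request made at that point would carry Last-Event-ID: 4 rather than the initial value 2.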
Then happily consume ServerSentEvents:
- Scala
    val events: Future[immutable.Seq[ServerSentEvent]] =
      eventSource
        .throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
        .take(nrOfSamples)
        .runWith(Sink.seq)
- Java
    int elements = 1;
    Duration per = Duration.ofMillis(500);
    int maximumBurst = 1;

    eventSource
        .throttle(elements, per, maximumBurst, ThrottleMode.shaping())
        .take(nrOfSamples)
        .runWith(Sink.seq(), system);
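Taking a fixed number of events, as above, can span several HTTP connections, because the source reconnects when a connection ends. The following standard-library-only sketch pictures that idea as a plain loop. It assumes that each reconnect happens after a retry delay and sends the id of the last received event. It is a simplified model, not the connector's code. ReconnectSketch, collect and flakyServer are hypothetical names, and the fake server simply drops every connection after two events.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Illustrative only; not the connector's code. All names are hypothetical. */
final class ReconnectSketch {

    /** Minimal stand-in for a server-sent event: an id and a payload. */
    static final class Event {
        final String id;
        final String data;

        Event(String id, String data) {
            this.id = id;
            this.data = data;
        }
    }

    /**
     * Collects `limit` events. Each call to `connect` models one HTTP request whose
     * Last-Event-ID header is the given value. When that connection ends, the loop
     * waits `retryDelayMillis` and connects again with the id of the last event seen.
     */
    static List<Event> collect(
            Function<Optional<String>, Iterator<Event>> connect,
            Optional<String> initialLastEventId,
            int limit,
            long retryDelayMillis) {
        List<Event> collected = new ArrayList<>();
        Optional<String> lastEventId = initialLastEventId;
        while (collected.size() < limit) {
            Iterator<Event> connection = connect.apply(lastEventId);
            while (connection.hasNext() && collected.size() < limit) {
                Event event = connection.next();
                lastEventId = Optional.of(event.id);
                collected.add(event);
            }
            if (collected.size() < limit) {
                pause(retryDelayMillis); // back off before reconnecting
            }
        }
        return collected;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to reconnect", e);
        }
    }

    /** Fake server: sends the two events following the requested id, then drops the connection. */
    static Iterator<Event> flakyServer(Optional<String> lastEventId) {
        int next = lastEventId.map(Integer::parseInt).orElse(0) + 1;
        List<Event> batch = new ArrayList<>();
        for (int i = next; i < next + 2; i++) {
            batch.add(new Event(Integer.toString(i), "event " + i));
        }
        return batch.iterator();
    }

    public static void main(String[] args) {
        // Start after id 2, as in the EventSource definition above, and take five events.
        List<Event> events = collect(ReconnectSketch::flakyServer, Optional.of("2"), 5, 10L);
        for (Event e : events) {
            System.out.println(e.id + " -> " + e.data);
        }
    }
}
```

With an initial id of 2 and a limit of five, the loop needs three connections and yields the events with ids 3 through 7, without gaps or repeats, because every reconnect resumes from the last id it saw.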