Server-Sent Events Support
Server-Sent Events (SSE) is a lightweight and standardized protocol for pushing notifications from an HTTP server to a client. In contrast to WebSocket, which offers bi-directional communication, SSE only allows for one-way communication from the server to the client. If that’s all you need, SSE has the advantages to be much simpler, to rely on HTTP only and to offer retry semantics on broken connections by the browser.
According to the SSE specification clients can request an event stream from the server via HTTP. The server responds with the media type text/event-stream
which has the fixed character encoding UTF-8 and keeps the response open to send events to the client when available. Events are textual structures which carry fields and are terminated by an empty line, e.g.
data: { "username": "John Doe" }
event: added
id: 42
data: another event
Clients can optionally signal the last seen event to the server via the Last-Event-ID
LastEventId
header, e.g. after a reconnect.
Model
Apache Pekko HTTP represents event streams as Source<ServerSentEvent, ?>
Source[ServerSentEvent, _]
where ServerSentEvent
ServerSentEvent
is a case class with the following read-only properties:
data: String
String data
– the actual payload, may span multiple lineseventType: Option[String]
Optional<String> type
– optional qualifier, e.g. “added”, “removed”, etc.id: Option[String]
Optional<String> id
– optional identifierretry: Option[Int]
OptionalInt retry
– optional reconnection delay in milliseconds
In accordance to the SSE specification Apache Pekko HTTP also provides the Last-Event-ID
LastEventId
header and the text/event-stream
TEXT_EVENT_STREAM
media type.
Server-side usage: marshalling
In order to respond to an HTTP request with an event stream, you have to bring the implicit ToResponseMarshaller[Source[ServerSentEvent, \_]]
defined by EventStreamMarshalling
EventStreamMarshalling
into the scope defining the respective routeuse the EventStreamMarshalling.toEventStream
marshaller:
- Scala
-
source
import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.Source import pekko.http.scaladsl.Http import pekko.http.scaladsl.unmarshalling.Unmarshal import pekko.http.scaladsl.model.sse.ServerSentEvent import scala.concurrent.duration._ import java.time.LocalTime import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME def route: Route = { import pekko.http.scaladsl.marshalling.sse.EventStreamMarshalling._ path("events") { get { complete { Source .tick(2.seconds, 2.seconds, NotUsed) .map(_ => LocalTime.now()) .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time))) .keepAlive(1.second, () => ServerSentEvent.heartbeat) } } } }
- Java
-
source
final List<ServerSentEvent> events = new ArrayList<>(); events.add(ServerSentEvent.create("1")); events.add(ServerSentEvent.create("2")); final Route route = completeOK(Source.from(events), EventStreamMarshalling.toEventStream());
Client-side usage: unmarshalling
In order to unmarshal an event stream as Source<ServerSentEvent, ?>
Source[ServerSentEvent, _]
, you have to bring the implicit FromEntityUnmarshaller[Source[ServerSentEvent, _]]
defined by EventStreamUnmarshalling
EventStreamUnmarshalling
into scopeuse the EventStreamUnmarshalling.fromEventsStream
unmarshaller:
- Scala
-
source
import org.apache.pekko.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._ Http() .singleRequest(Get("http://localhost:8000/events")) .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]]) .foreach(_.runForeach(println))
- Java
-
source
List<ServerSentEvent> unmarshalledEvents = EventStreamUnmarshalling.fromEventsStream(system) .unmarshal(entity, system) .thenCompose(source -> source.runWith(Sink.seq(), system)) .toCompletableFuture() .get(3000, TimeUnit.SECONDS);
Notice that if you are looking for a resilient way to permanently subscribe to an event stream, Apache Pekko Connectors provides the EventSource connector which reconnects automatically with the id of the last seen event.