Server-Sent Events Support
Server-Sent Events (SSE) is a lightweight and standardized protocol for pushing notifications from an HTTP server to a client. In contrast to WebSocket, which offers bi-directional communication, SSE only allows for one-way communication from the server to the client. If that is all you need, SSE has the advantage of being much simpler, relying on plain HTTP only, and offering retry semantics on broken connections handled by the browser.
According to the SSE specification, clients can request an event stream from the server via HTTP. The server responds with the media type text/event-stream, which has the fixed character encoding UTF-8, and keeps the response open to send events to the client as they become available. Events are textual structures that carry fields and are terminated by an empty line, e.g.
data: { "username": "John Doe" }

event: added
id: 42
data: another event
Clients can optionally signal the last seen event to the server via the Last-Event-ID header, e.g. after a reconnect.
Model
Apache Pekko HTTP represents event streams as Source[ServerSentEvent, _] (Scala) or Source<ServerSentEvent, ?> (Java), where ServerSentEvent is a case class with the following read-only properties (Scala / Java):
- data: String / String data – the actual payload, may span multiple lines
- eventType: Option[String] / Optional<String> type – optional qualifier, e.g. "added", "removed", etc.
- id: Option[String] / Optional<String> id – optional identifier
- retry: Option[Int] / OptionalInt retry – optional reconnection delay in milliseconds
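To make the model concrete, here is a small Scala sketch constructing events from these properties. It assumes the case class constructor implied by the property list above; the field values are taken from the earlier example or made up:

import org.apache.pekko.http.scaladsl.model.sse.ServerSentEvent

// Only the payload is mandatory; the remaining properties default to empty.
val simple = ServerSentEvent("""{ "username": "John Doe" }""")

// All properties set; these particular values are illustrative only.
val full = ServerSentEvent(
  data = """{ "username": "John Doe" }""",
  eventType = Some("added"),
  id = Some("42"),
  retry = Some(5000)
)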
In accordance with the SSE specification, Apache Pekko HTTP also provides the Last-Event-ID header (LastEventId in the Java API) and the text/event-stream media type (TEXT_EVENT_STREAM in the Java API).
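For illustration, a client could attach the modelled header to a request after a reconnect. A minimal Scala sketch, assuming the scaladsl header model; the id value "42" is made up:

import org.apache.pekko.http.scaladsl.client.RequestBuilding.Get
import org.apache.pekko.http.scaladsl.model.headers.`Last-Event-ID`

// Ask the server to resume after the last event this client has seen.
val request = Get("http://localhost:8000/events").addHeader(`Last-Event-ID`("42"))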
Server-side usage: marshalling
In order to respond to an HTTP request with an event stream, bring the implicit ToResponseMarshaller[Source[ServerSentEvent, _]] defined by EventStreamMarshalling into the scope defining the respective route (Scala), or use the EventStreamMarshalling.toEventStream marshaller (Java):
Scala

import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.server.Directives._
import pekko.http.scaladsl.server.Route
import pekko.http.scaladsl.unmarshalling.Unmarshal
import pekko.http.scaladsl.model.sse.ServerSentEvent

import scala.concurrent.duration._
import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME

def route: Route = {
  // Brings the implicit ToResponseMarshaller[Source[ServerSentEvent, _]] into scope
  import pekko.http.scaladsl.marshalling.sse.EventStreamMarshalling._

  path("events") {
    get {
      complete {
        Source
          .tick(2.seconds, 2.seconds, NotUsed)
          .map(_ => LocalTime.now())
          .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
          .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      }
    }
  }
}
Java

final List<ServerSentEvent> events = new ArrayList<>();
events.add(ServerSentEvent.create("1"));
events.add(ServerSentEvent.create("2"));

final Route route = completeOK(Source.from(events), EventStreamMarshalling.toEventStream());
Client-side usage: unmarshalling
In order to unmarshal an event stream as Source[ServerSentEvent, _] (Scala) or Source<ServerSentEvent, ?> (Java), bring the implicit FromEntityUnmarshaller[Source[ServerSentEvent, _]] defined by EventStreamUnmarshalling into scope (Scala), or use the EventStreamUnmarshalling.fromEventsStream unmarshaller (Java):
Scala

import org.apache.pekko.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._

Http()
  .singleRequest(Get("http://localhost:8000/events"))
  .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
  .foreach(_.runForeach(println))
Java

List<ServerSentEvent> unmarshalledEvents =
  EventStreamUnmarshalling.fromEventsStream(system)
    .unmarshal(entity, system)
    .thenCompose(source -> source.runWith(Sink.seq(), system))
    .toCompletableFuture()
    .get(3000, TimeUnit.SECONDS);
Configuration
Apache Pekko HTTP provides several configuration options for Server-Sent Events handling:
Message Size Limits
The SSE client parser has configurable limits to handle various message sizes:
pekko.http.sse {
# The maximum size for parsing server-sent events. Set to 0 to disable limit entirely (unlimited).
max-event-size = 8192
# The maximum size for parsing lines of a server-sent event. Set to 0 to disable limit entirely (unlimited).
max-line-size = 4096
}
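These settings live in the application's configuration. One way to override them programmatically is to parse a config fragment when creating the actor system, as in this sketch (the larger limits and the system name are arbitrary examples):

import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.ActorSystem

// Illustrative override: raise both SSE parsing limits for this system.
val sseConfig = ConfigFactory.parseString(
  """
    |pekko.http.sse {
    |  max-event-size = 65536
    |  max-line-size = 16384
    |}
  """.stripMargin)

val system = ActorSystem("sse-example", sseConfig.withFallback(ConfigFactory.load()))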
Oversized Message Handling
Apache Pekko HTTP uses a two-stage parsing process for SSE streams, and oversized content can be handled at either stage:
- Line-level parsing: Individual lines are checked against max-line-size
- Event-level parsing: Complete events are limited to max-event-size
When SSE content exceeds the configured limits, Apache Pekko HTTP provides four handling strategies that can be configured separately for lines and events:
- fail-stream (default): Fails the stream with an error message
- log-and-skip: Logs a warning and skips the oversized content, continuing stream processing
- truncate: Logs an info message and truncates the oversized content (lines are cut to max-line-size; events drop lines that would exceed max-event-size), continuing processing
- dead-letter: Sends the oversized content to the dead letter queue, continuing processing
Warning about truncate strategy: For event-level truncation, the strategy drops entire lines that would exceed event size limits rather than truncating field values. This can change event semantics in unexpected ways when non-data fields (like id: or event:) are dropped. For predictable behavior, ensure that id: and event: fields appear before data: fields in your SSE events, or consider using the log-and-skip or dead-letter strategies instead.
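For example, laying an event out with id: and event: ahead of the data: lines keeps those fields intact even if a trailing data line is dropped by event-level truncation:

id: 42
event: added
data: short payload line
data: a much longer payload line that might be the one dropped by event-level truncation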
pekko.http.sse {
# How to handle lines that exceed max-line-size limit
# Options:
# "fail-stream" - Fail the stream with an error message (default)
# "log-and-skip" - Log a warning and skip the oversized line, continuing stream processing
# "truncate" - Log an info message and truncate the line to max-line-size, continuing processing
# "dead-letter" - Send oversized line to the dead letter queue, continuing processing
oversized-line-handling = "fail-stream"
# How to handle events that exceed max-event-size limit
# Options:
# "fail-stream" - Fail the stream with an error message (default)
# "log-and-skip" - Log a warning and skip the oversized event, continuing stream processing
# "truncate" - Log an info message and drop lines that would exceed max-event-size, continuing processing
# "dead-letter" - Send oversized event to the dead letter queue, continuing processing
oversized-event-handling = "fail-stream"
}
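When the dead-letter strategy is selected, one way to observe the discarded content is to subscribe to the actor system's dead letters. A minimal Scala sketch, assuming the oversized messages arrive wrapped in the usual DeadLetter envelope (the exact package of the OversizedSseLine and OversizedSseEvent messages mentioned above is not shown in this document):

import org.apache.pekko.actor.{ Actor, ActorSystem, DeadLetter, Props }

// Logs every dead letter; with oversized-*-handling = "dead-letter", the
// skipped SSE lines/events are expected to show up here as the message payload.
class OversizedSseListener extends Actor {
  def receive: Receive = {
    case DeadLetter(message, sender, recipient) =>
      println(s"dead letter from $sender to $recipient: $message")
  }
}

val system = ActorSystem("sse-dead-letters")
val listener = system.actorOf(Props[OversizedSseListener](), "oversized-sse-listener")
system.eventStream.subscribe(listener, classOf[DeadLetter])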
Line vs Event Handling Examples
Line-level and event-level size limits are imposed separately and their behavior is different:
- Lines are parsed one line at a time. The limits and handling strategy are applied per line. Line length limits include the SSE field names (id:, data:, event:, etc.).
- Events are built from successive data: lines. As each line is added to the event being built, the event size limit is used to short-circuit processing of the current and/or subsequent lines. This limit is mainly meant to prevent runaway memory usage, where a single (possibly erroneous) message from the server could crash the application.
Since line and event strategies can be configured independently, you can have different behaviors for each level. For example:
pekko.http.sse {
oversized-line-handling = "truncate" # Truncate oversized lines
oversized-event-handling = "log-and-skip" # Skip oversized events entirely
}
Example 1: Oversized Line in Multi-line Event
Consider this SSE event with max-line-size = 50:
data: This is a normal line
data: This line is much too long and exceeds the configured max-line-size limit by a lot
data: Another normal line
With the log-and-skip strategy, the oversized line is skipped and the resulting event contains only:
data: This is a normal line
data: Another normal line
With the truncate strategy, the oversized line is truncated to 50 characters and the resulting event contains:
data: This is a normal line
data: This line is much too long and exceeds the c
data: Another normal line
With the dead-letter strategy, the oversized line is sent to the dead letter queue as OversizedSseLine(line: String) and the resulting event contains only:
data: This is a normal line
data: Another normal line
Example 2: Event Exceeds max-event-size
If the complete event (after line processing) exceeds max-event-size, the strategy applies to the entire event:
data: Line 1
data: Line 2
data: Line 3
[... many more lines causing total event size > max-event-size]
With log-and-skip: The entire event is logged and skipped.
With truncate: The event is truncated by dropping the final line(s) that would exceed the limit, and the remaining event is emitted.
With dead-letter: The oversized event is sent to dead letters as OversizedSseEvent(event: ServerSentEvent) and skipped from the stream.
Example 3: Event Size Exceeded During Line Parsing (Memory Protection)
If, while parsing individual lines, the accumulated event size would exceed max-event-size, the parser stops processing additional lines for that event to prevent memory exhaustion:
data: Line 1 (fits)
data: Line 2 (fits)
data: Line 3 (would exceed max-event-size)
data: Line 4 (fits line limit)
With log-and-skip:
- The event with Line 3 is built and logged as oversized
- Lines 3 and 4 are both skipped until the empty line
- No event is emitted to the stream

With truncate:
- The event with lines 1 and 2 (without Line 3) is emitted
- Lines 3 and 4 are both skipped until the empty line

With dead-letter:
- The event with Line 3 is built and sent to dead letters as OversizedSseEvent(event: ServerSentEvent)
- Lines 3 and 4 are both skipped until the empty line
- No event is emitted to the stream
This behavior prevents the application from running out of memory when processing very large events, as the parser immediately stops accumulating data once the size limit would be exceeded.
Processing Order
- First: Individual lines are processed against max-line-size
- Then: Each line is added to the event builder, checking against max-event-size during accumulation
This means an event can have individual lines handled by the line-level strategy, then be subject to event-level handling during building as lines are accumulated.
For applications that need to handle very large messages (like blockchain data or detailed JSON payloads), consider setting max-line-size = 0 and max-event-size = 0 to disable the limits entirely, or use one of the non-failing handling modes.
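For example, a configuration that disables both limits (this also removes the memory protection described above, so only do this for trusted servers):

pekko.http.sse {
  # 0 disables the respective limit entirely
  max-line-size = 0
  max-event-size = 0
}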
Note that if you are looking for a resilient way to permanently subscribe to an event stream, Apache Pekko Connectors provides the EventSource connector, which reconnects automatically with the id of the last seen event.
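As a rough Scala sketch of what that can look like, assuming the EventSource API in org.apache.pekko.stream.connectors.sse.scaladsl (parameter names, defaults, and module coordinates should be checked against the Pekko Connectors documentation; the URL and id are made up):

import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, Uri }
import org.apache.pekko.http.scaladsl.model.sse.ServerSentEvent
import org.apache.pekko.stream.connectors.sse.scaladsl.EventSource
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem("sse-client")

// How the connector performs each (re)connect attempt.
val send: HttpRequest => Future[HttpResponse] = Http().singleRequest(_)

// Reconnects automatically and resends the id of the last seen event.
val events: Source[ServerSentEvent, NotUsed] =
  EventSource(
    uri = Uri("http://localhost:8000/events"),
    send = send,
    initialLastEventId = Some("2"),
    retryDelay = 1.second
  )

events.runWith(Sink.foreach(println))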