object EventSource
This stream processing stage establishes a continuous source of server-sent events from the given URI.
A single source of server-sent events is obtained from the URI. Once completed, either normally or by failure, a next one is obtained thereby sending a Last-Event-ID header if available. This continues in an endless cycle.
This endless cycle means that if the connection is interrupted, reconnection attempts will continue forever. This can
be problematic if, for example, the connection fails due to an oversized line or event being received from the
server. By default, an oversized SSE line or event will cause pekko-http to fail the stream. If the stream fails,
this connector will establish a new connection and attempt to continue processing using Last-Event-ID. Reconsuming
the oversized payload will fail the stream again, causing an infinite retry loop. This infinite loop can look like
the connector getting stuck at the same point in the stream. Since the opinionated design of this connector is to
retry forever, the connection error will be logged but only at the INFO level. You can use optional pekko-http
configuration settings to define alternate handling of oversized SSE lines and events instead of failing the stream.
See: pekko.http.sse.oversized-line-handling
and oversized-event-handling
.
The shape of this processing stage is a source of server-sent events; to take effect it must be connected and run. Progress (including termination) is controlled by the connected flow or sink, e.g. a retry delay can be implemented by streaming the materialized values of the handler via a throttle.
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +---------------------+ | | trigger | | +----------o----------+ | | | Option[String]| | v | Option[String] +----------o----------+ | +------------------------------->o merge | | | +----------o----------+ | | | | | Option[String]| | | v | +--------o--------+ +----------o----------+ | | lastEventId | | continuousEvents | | +--------o--------+ +----------o----------+ | ^ | | | ServerSentEvent (including delimiters)| | | v | | +----------o----------+ | +--------------------------------o bcast | | ServerSentEvent (incl. delim.) +----------o----------+ | | | ServerSentEvent (including delimiters)| | v | +----------o----------+ | +----------o events | | ServerSentEvent| +---------------------+ | v | - - - - - - - - - - - - - - - - - o - - - - - - - - - - - - - - - - -
- Source
- EventSource.scala
- Alphabetic
- By Inheritance
- EventSource
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Type Members
- type EventSource = Source[ServerSentEvent, NotUsed]
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def apply(uri: Uri, send: (HttpRequest) => Future[HttpResponse], initialLastEventId: Option[String] = None, retryDelay: FiniteDuration = Duration.Zero)(implicit system: ClassicActorSystemProvider): EventSource
- uri
URI with absolute path, e.g. "http://myserver/events
- send
function to send a HTTP request
- initialLastEventId
initial value for Last-Event-ID header,
None
by default- retryDelay
delay for retrying after completion,
0
by default- system
implicit actor system (classic or new API)
- returns
continuous source of server-sent events
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()