# Configuration

# SPDX-License-Identifier: Apache-2.0

######################################
# Pekko Stream Reference Config File #
######################################

# Eager creation of the system-wide materializer: the SystemMaterializer entry is
# appended (`+=`) to the `pekko.library-extensions` list, so entries already
# present in that list are kept.
pekko.library-extensions += "org.apache.pekko.stream.SystemMaterializer$"
pekko {
  stream {

    # Default materializer settings
    materializer {

      # Initial size of buffers used in stream elements
      initial-input-buffer-size = 4
      # Maximum size of buffers used in stream elements
      max-input-buffer-size = 16

      # Fully qualified config path which holds the dispatcher configuration
      # or full dispatcher configuration to be used by ActorMaterializer when creating Actors.
      dispatcher = "pekko.actor.default-dispatcher"

      # Fully qualified config path which holds the dispatcher configuration
      # or full dispatcher configuration to be used by stream operators that
      # perform blocking operations
      blocking-io-dispatcher = "pekko.actor.default-blocking-io-dispatcher"

      # Cleanup leaked publishers and subscribers when they are not used within a given
      # deadline
      subscription-timeout {
        # When the subscription timeout is reached, one of the following strategies
        # is applied to the "stale" publisher:
        # cancel - cancel it (via `onError` or subscribing to the publisher and
        #          `cancel()`ing the subscription right away)
        # warn   - log a warning statement about the stale element (then drop the
        #          reference to it)
        # noop   - do nothing (not recommended)
        mode = cancel

        # Time after which a subscriber / publisher is considered stale and eligible
        # for cancellation (see `pekko.stream.materializer.subscription-timeout.mode`)
        timeout = 5s
      }

      # Enable additional troubleshooting logging at DEBUG log level
      debug-logging = off

      # Maximum number of elements emitted in batch if downstream signals large demand
      output-burst-limit = 1000

      # Enable automatic fusing of all graphs that are run. For short-lived streams
      # this may cause an initial runtime overhead, but most of the time fusing is
      # desirable since it reduces the number of Actors that are created.
      # Deprecated since Akka 2.5.0: this setting does not have any effect.
      auto-fusing = on

      # Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered,
      # buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed
      # buffer upon stream materialization if the requested buffer size is less than this
      # configuration parameter. The default is very high because failing early is better
      # than failing under load.
      #
      # Buffers sized larger than this will dynamically grow/shrink and consume more memory
      # per element than the fixed size buffers.
      max-fixed-buffer-size = 1000000000

      # Maximum number of sync messages that an actor can process for stream to substream communication.
      # This parameter allows synchronous processing to be interrupted in order to receive upstream/downstream messages.
      # It accelerates message processing that happens within the same actor while keeping the system responsive.
      sync-processing-limit = 1000

      debug {
        # Enables the fuzzing mode which increases the chance of race conditions
        # by aggressively reordering events and making certain operations more
        # concurrent than usual.
        # This setting is for testing purposes, NEVER enable this in a production
        # environment!
        # To get the best results, try combining this setting with a throughput
        # of 1 on the corresponding dispatchers.
        fuzzing-mode = off
      }

      io.tcp {
        # The outgoing bytes are accumulated in a buffer while waiting for acknowledgment
        # of pending write. This improves throughput for small messages (frames) without
        # sacrificing latency. While waiting for the ack the stage will eagerly pull
        # from upstream until the buffer exceeds this size. That means that the buffer may hold
        # slightly more bytes than this limit (at most one element more). It can be set to 0
        # to disable the usage of the buffer.
        write-buffer-size = 16 KiB

        # In addition to the buffering described for property write-buffer-size, try to collect
        # more consecutive writes from the upstream stream producers.
        #
        # The rationale is to increase write efficiency by avoiding separate small
        # writes to the network which is expensive to do. Merging those writes together
        # (up to `write-buffer-size`) improves throughput for small writes.
        #
        # The idea is that a running stream may produce multiple small writes consecutively
        # in one go without waiting for any external input. To probe the stream for
        # data, this feature delays sending a write immediately by probing the stream
        # for more writes. This works by rescheduling the TCP connection stage via the
        # actor mailbox of the underlying actor. Thus, before the stage is reactivated
        # the upstream gets another opportunity to emit writes.
        #
        # When the stage is reactivated and if new writes are detected another round-trip
        # is scheduled. The loop repeats until either the number of round trips given in this
        # setting is reached, the buffer reaches `write-buffer-size`, or no new writes
        # were detected during the last round-trip.
        #
        # This mechanism ensures that a write is guaranteed to be sent when the remaining stream
        # becomes idle waiting for external signals.
        #
        # In most cases, the extra latency this mechanism introduces should be negligible,
        # but depending on the stream setup it may introduce a noticeable delay,
        # if the upstream continuously produces small amounts of writes in a
        # blocking (CPU-bound) way.
        #
        # In that case, the feature can either be disabled, or the producing CPU-bound
        # work can be taken off-stream to avoid excessive delays (e.g. using `mapAsync` instead of `map`).
        #
        # A value of 0 disables this feature.
        coalesce-writes = 10
      }

      # Time to wait for async materializer creation before throwing an exception
      creation-timeout = 20 seconds

      //#stream-ref
      # configure defaults for SourceRef and SinkRef
      stream-ref {
        # Buffer of a SinkRef that is used to batch Request elements from the other side of the stream ref
        #
        # The buffer will be attempted to be filled eagerly even while the local stage did not request elements,
        # because the delay of requesting over network boundaries is much higher.
        buffer-capacity = 32

        # Demand is signalled by sending a cumulative demand message ("requesting messages until the n-th sequence number").
        # Using a cumulative demand model allows us to re-deliver the demand message in case of message loss (which should
        # be very rare in any case, yet possible -- mostly under connection break-down and re-establishment).
        #
        # The semantics of handling and updating the demand however are in-line with what Reactive Streams dictates.
        #
        # In normal operation, demand is signalled in response to arriving elements, however if no new elements arrive
        # within `demand-redelivery-interval` a re-delivery of the demand will be triggered, assuming that it may have gotten lost.
        demand-redelivery-interval = 1 second

        # Subscription timeout, during which the "remote side" MUST subscribe (materialize) the handed out stream ref.
        # This timeout does not have to be very low in normal situations, since the remote side may also need to
        # prepare things before it is ready to materialize the reference. However the timeout is needed to avoid leaking
        # inactive streams which are never subscribed to.
        subscription-timeout = 30 seconds

        # In order to guard the receiving end of a stream ref from never terminating (since awaiting a Completion or Failed
        # message) after / before a Terminated is seen, a special timeout is applied once Terminated is received by it.
        # This allows us to terminate stream refs that have been targeted to other nodes which are Downed, and as such the
        # other side of the stream ref would never send the "final" terminal message.
        #
        # The timeout specifically means the time between the Terminated signal being received and when the local SourceRef
        # determines to fail itself, assuming there was message loss or a complete partition of the completion signal.
        final-termination-signal-deadline = 2 seconds
      }
      //#stream-ref
    }

    # Deprecated, left here to not break Pekko HTTP which refers to it
    blocking-io-dispatcher = "pekko.actor.default-blocking-io-dispatcher"

    # Deprecated, will not be used unless user code refers to it, use 'pekko.stream.materializer.blocking-io-dispatcher'
    # instead, or if from code, prefer the 'ActorAttributes.IODispatcher' attribute
    default-blocking-io-dispatcher = "pekko.actor.default-blocking-io-dispatcher"
  }

  # configure overrides to ssl-configuration here (to be used by pekko-streams, and pekko-http – i.e. when serving https connections)
  # The only override set by default is the protocol, TLSv1.2.
  ssl-config {
    protocol = "TLSv1.2"
  }

  # Serialization setup for stream refs: registers the StreamRefSerializer under the
  # name `pekko-stream-ref`, binds SinkRef, SourceRef and StreamRefsProtocol to it,
  # and assigns the serializer the identifier 30.
  actor {

    serializers {
      pekko-stream-ref = "org.apache.pekko.stream.serialization.StreamRefSerializer"
    }

    serialization-bindings {
      "org.apache.pekko.stream.SinkRef"                           = pekko-stream-ref
      "org.apache.pekko.stream.SourceRef"                         = pekko-stream-ref
      "org.apache.pekko.stream.impl.streamref.StreamRefsProtocol" = pekko-stream-ref
    }

    serialization-identifiers {
      "org.apache.pekko.stream.serialization.StreamRefSerializer" = 30
    }
  }
}

# SSL configuration at the top-level `ssl-config` path (separate from the
# `pekko.ssl-config` overrides section).
# Folded in from the former ssl-config-pekko module.
# NOTE(review): `logger` names the PekkoLoggerBridge class; presumably this routes
# ssl-config logging through Pekko's logging -- confirm.
ssl-config {
  logger = "com.typesafe.sslconfig.pekko.util.PekkoLoggerBridge"
}