Configuration

Connection configuration

Shared configuration for the connection pool is located under pekko.persistence.r2dbc.connection-factory. You have to set at least:

Postgres
# Minimal connection settings for a Postgres database.
# Each literal value acts as a default: the `${?ENV_VAR}` line that follows it
# replaces the value only when that environment variable is defined.
pekko.persistence.r2dbc {
  dialect = "postgres"
  connection-factory {
    driver = "postgres"
    # `port` is not set here, so the value from the reference configuration applies.
    host = "localhost"
    host = ${?DB_HOST}
    database = "postgres"
    database = ${?DB_NAME}
    user = "postgres"
    user = ${?DB_USER}
    password = "postgres"
    password = ${?DB_PASSWORD}

    # Optional TLS settings; uncomment and adjust to enable an encrypted connection.
    # `root-cert` is the certificate used to verify the server.
    # ssl {
    #   enabled = on
    #   mode = "VERIFY_CA"
    #   root-cert = "/path/db_root.crt"
    # }
  }
}
Yugabyte
# Minimal connection settings for a Yugabyte database.
# Each literal value acts as a default: the `${?ENV_VAR}` line that follows it
# replaces the value only when that environment variable is defined.
pekko.persistence.r2dbc {
  dialect = "yugabyte"
  connection-factory {
    # The Yugabyte dialect is used together with the "postgres" driver.
    driver = "postgres"
    host = "localhost"
    host = ${?DB_HOST}
    # Overrides the port from the reference configuration (5432).
    port = 5433
    database = "yugabyte"
    database = ${?DB_NAME}
    user = "yugabyte"
    user = ${?DB_USER}
    password = "yugabyte"
    password = ${?DB_PASSWORD}

    # Optional TLS settings; uncomment and adjust to enable an encrypted connection.
    # `root-cert` is the certificate used to verify the server.
    # ssl {
    #   enabled = on
    #   mode = "VERIFY_CA"
    #   root-cert = "/path/db_root.crt"
    # }
  }
}
MySQL
# Minimal connection settings for a MySQL database.
# Each literal value acts as a default: the `${?ENV_VAR}` line that follows it
# replaces the value only when that environment variable is defined.
pekko.persistence.r2dbc {
  dialect = "mysql"
  connection-factory {
    driver = "mysql"
    host = "localhost"
    host = ${?DB_HOST}
    # Overrides the port from the reference configuration (5432).
    port = 3306
    database = "mysql"
    database = ${?DB_NAME}
    user = "root"
    user = ${?DB_USER}
    password = "root"
    password = ${?DB_PASSWORD}

    # Both timestamp settings are switched on for MySQL.
    # NOTE(review): the reference configuration on this page declares these two
    # settings directly under `pekko.persistence.r2dbc`, not under
    # `connection-factory` - confirm that this is the location the plugin reads.
    db-timestamp-monotonic-increasing = on
    use-app-timestamp = on

    # Optional TLS settings; uncomment and adjust to enable an encrypted connection.
    # `root-cert` is the certificate used to verify the server.
    # ssl {
    #   enabled = on
    #   mode = "VERIFY_CA"
    #   root-cert = "/path/db_root.crt"
    # }
  }
}

Reference configuration

The following configuration can be overridden in your application.conf:

pekko.persistence.r2dbc {

  # The database dialect: postgres, yugabyte or mysql
  dialect = "postgres"

  # Set this to your database schema if applicable; empty by default.
  schema = ""

  connection-factory {
    driver = "postgres"

    # The connection can be configured with a URL, e.g. "r2dbc:postgresql://<host>:5432/<database>"
    url = ""

    # The connection options to be used. Ignored if 'url' is non-empty.
    host = "localhost"
    port = 5432
    database = "postgres"
    user = "postgres"
    password = "postgres"

    ssl {
      enabled = off
      # See PostgresqlConnectionFactoryProvider.SSL_MODE
      # Possible values:
      #  allow - encryption if the server insists on it
      #  prefer - encryption if the server supports it
      #  require - encryption enabled and required, but trust network to connect to the right server
      #  verify-ca - encryption enabled and required, and verify server certificate
      #  verify-full - encryption enabled and required, and verify server certificate and hostname
      #  tunnel - use an SSL tunnel instead of following the Postgres SSL handshake protocol
      mode = ""

      # Can point to either a resource within the classpath or a file.
      root-cert = ""
    }

    # Initial pool size.
    initial-size = 5
    # Maximum pool size.
    max-size = 20
    # Maximum time to create a new connection.
    connect-timeout = 3 seconds
    # Maximum time to acquire a connection from the pool.
    acquire-timeout = 5 seconds
    # Number of retries if the connection acquisition attempt fails.
    # If the database server was restarted, all connections in the pool will
    # be invalid. To recover from that without a failed acquire you can use the
    # same number of retries as the max-size of the pool.
    acquire-retry = 1

    # Maximum idle time of a connection in the pool.
    # The background eviction interval of idle connections is derived from this
    # property and max-life-time.
    max-idle-time = 30 minutes

    # Maximum lifetime of a connection in the pool.
    # The background eviction interval of connections is derived from this
    # property and max-idle-time.
    max-life-time = 60 minutes

    # Configures the statement cache size.
    # 0 means no cache, a negative value selects an unbounded cache, and
    # a positive value configures a bounded cache of that size.
    statement-cache-size = 5000

    # Validate the connection when acquired with this SQL.
    # Enabling this has some performance overhead.
    # A fast query for Postgres is "SELECT 1"
    validation-query = ""

    # FQCN of a ConnectionFactoryOptionsCustomizer. If non-empty, it must be the fully
    # qualified class name of a class implementing the trait ConnectionFactoryOptionsCustomizer.
    # The class must have a constructor with a single parameter of type ActorSystem[_].
    # If this setting is empty, the default no-op customizer will be used.
    connection-factory-options-customizer = ""
  }

  # Fully qualified config path which holds the connection factory configuration.
  # Connection factories are initialized using the config at this path and are identified by the value of this path.
  # All persistence plugins use the same value by default, which allows a single connection factory to be shared
  # between all of the plugins.
  use-connection-factory = "pekko.persistence.r2dbc.connection-factory"

  # If the database timestamp is guaranteed to not move backwards for two subsequent
  # updates of the same persistenceId there might be a performance gain to
  # set this to `on`. Note that many databases use the system clock and that can
  # move backwards when the system clock is adjusted.
  db-timestamp-monotonic-increasing = off

  # Enable this for testing or as a workaround for https://github.com/yugabyte/yugabyte-db/issues/10995
  # FIXME: This property will be removed when the Yugabyte issue has been resolved.
  use-app-timestamp = off

  # Logs database calls that take longer than this duration at INFO level.
  # Set to "off" to disable this logging.
  # Set to 0 to log all calls.
  log-db-calls-exceeding = 300 ms

  # In-memory buffer holding events when reading from the database.
  buffer-size = 1000

  # When live queries return no results or <= 10% of buffer-size, the next query
  # to the database will be delayed for this duration.
  # When the number of rows from the previous query is >= 90% of buffer-size, the next
  # query will be emitted immediately.
  # Otherwise, between 10% - 90% of buffer-size, the next query will be delayed
  # for half of this duration.
  refresh-interval = 3s

  # Live queries read events up to this duration from the current database time.
  behind-current-time = 100 millis

  backtracking {
    enabled = on
    # Backtracking queries will look back for this amount of time. It should
    # not be larger than the pekko.projection.r2dbc.offset-store.time-window.
    window = 2 minutes
    # Backtracking queries read events up to this duration from the current database time.
    behind-current-time = 10 seconds
  }

  persistence-ids {
    # NOTE(review): presumably the in-memory buffer used when reading persistence ids - confirm.
    buffer-size = 1000
  }
}

Plugin configuration at runtime

The plugin implementation supports configuring the plugins at runtime.

The following example demonstrates how the database in which the events and snapshots of an EventSourcedBehavior are stored can be chosen at runtime:

/**
 * Creates a factory for `EventSourcedBehavior`s whose journal and snapshot plugins are
 * configured at runtime to use the given database.
 *
 * The `pekko.persistence.r2dbc` configuration is copied under the key
 * `config-for-<database>` and `connection-factory.database` of that copy is set to
 * `database`. The journal and snapshot sections of the copy are pointed at the copy's
 * own connection factory through `use-connection-factory`.
 *
 * @param database name of the database to store events and snapshots in; it is also
 *                 used to derive the config key of the runtime plugin configuration
 * @return a function that takes a persistence id and returns an `EventSourcedBehavior`
 *         using the runtime journal and snapshot plugin configuration
 */
def eventSourcedBehaviorForDatabase(database: String) = {
  // Root key of the per-database plugin configuration.
  val configKey = s"config-for-$database"

  // `$$` yields a literal `$` in an interpolated string, so `$${...}` below becomes
  // a HOCON substitution `${...}` that is resolved when the config is loaded.
  //
  // Inside the `$configKey` object the connection factory is also made available at
  // `journal.$configKey.connection-factory` and `snapshot.$configKey.connection-factory`.
  // NOTE(review): presumably so that the path given in `use-connection-factory` can be
  // looked up relative to each plugin's own config section - confirm.
  val config: Config =
    ConfigFactory
      .load(
        ConfigFactory
          .parseString(
            s"""
            $configKey = $${pekko.persistence.r2dbc}
            $configKey = {
              connection-factory {
                database = "$database"
              }

              journal.$configKey.connection-factory = $${$configKey.connection-factory}
              journal.use-connection-factory = "$configKey.connection-factory"
              snapshot.$configKey.connection-factory = $${$configKey.connection-factory}
              snapshot.use-connection-factory = "$configKey.connection-factory"
            }
            """
          )
      )

  // The plugin ids refer to sections of `config`, which is handed to the behavior as
  // the journal and snapshot plugin configuration.
  (persistenceId: String) =>
    EventSourcedBehavior[Command, String, String](
      PersistenceId.ofUniqueId(persistenceId),
      emptyState = ???,
      commandHandler = ???,
      eventHandler = ???)
      .withJournalPluginId(s"$configKey.journal")
      .withJournalPluginConfig(Some(config))
      .withSnapshotPluginId(s"$configKey.snapshot")
      .withSnapshotPluginConfig(Some(config))
}

// Behavior factory bound to "database-1"; both actors spawned from it use that database.
val eventSourcedBehaviorForDatabase1 = eventSourcedBehaviorForDatabase("database-1")
context.spawn(eventSourcedBehaviorForDatabase1("persistence-id-1"), "Actor-1")
context.spawn(eventSourcedBehaviorForDatabase1("persistence-id-2"), "Actor-2")

// Behavior factory bound to "database-2". The same persistence ids are reused here,
// this time with the plugin configuration created for the second database.
val eventSourcedBehaviorForDatabase2 = eventSourcedBehaviorForDatabase("database-2")
context.spawn(eventSourcedBehaviorForDatabase2("persistence-id-1"), "Actor-3")
context.spawn(eventSourcedBehaviorForDatabase2("persistence-id-2"), "Actor-4")