Slick (JDBC)

The Slick connector provides Scala and Java DSLs to create a Source to stream the results of a SQL database query and a Flow/Sink to perform SQL actions (like inserts, updates, and deletes) for each element in a stream. It is built on the Slick library to interact with a long list of supported relational databases.

Project Info: Apache Pekko Connectors Slick/JDBC
Artifact: org.apache.pekko, pekko-connectors-slick, 1.0.2
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 2.13.14, 2.12.20
JPMS module name: pekko.stream.connectors.slick
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-slick" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-slick_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-slick_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

You will also need to add the JDBC driver(s) for the specific relational database(s) to your project. Most of those databases have drivers that are not available from public repositories so unfortunately some manual steps will probably be required. The Slick documentation has information on where to download the drivers.

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Initialization

As always, before we get started we will need an ActorSystem.

Scala
implicit val system: ActorSystem = ActorSystem()
Java
system = ActorSystem.create();

You will also always need the following important imports:

Scala
import org.apache.pekko
import pekko.stream.connectors.slick.scaladsl._
import pekko.stream.scaladsl._
import slick.jdbc.GetResult
Java
import org.apache.pekko.stream.javadsl.*;
import org.apache.pekko.stream.connectors.slick.javadsl.*;

The full examples for using the Source, Sink, and Flow (listed further down) also include all required imports.

Starting a Database Session

All functionality provided by this connector requires the user to first create an instance of SlickSession, which is a thin wrapper around Slick’s database connection management and database profile API.

If you are using Slick in your project, you can create a SlickSession instance by sharing the database configuration:

Scala
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile
val databaseConfig = DatabaseConfig.forConfig[JdbcProfile]("slick-h2")
implicit val session: SlickSession = SlickSession.forConfig(databaseConfig)

Otherwise, you can configure your database using typesafe-config by adding a named configuration to your application.conf and then referring to that configuration when starting the session:

Scala
implicit val session: SlickSession = SlickSession.forConfig("slick-h2")
Java
private static final SlickSession session = SlickSession.forConfig("slick-h2");

Here is an example configuration for the H2 database, which is used for the unit tests of the Slick connector itself:

Configuration
# Load using SlickSession.forConfig("slick-h2")
slick-h2 {
  profile = "slick.jdbc.H2Profile$"
  db {
    connectionPool = disabled
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.h2.Driver"
      url = "jdbc:h2:"${java.io.tmpdir}"/pekko-connectors-slick-h2-test"
    }
  }
}

You can specify multiple different database configurations, as long as you use unique names. These can then be loaded by fully qualified configuration name using the SlickSession.forConfig() method described above.

The Slick connector supports all the various ways Slick allows you to configure your JDBC database drivers, connection pools, etc., but we strongly recommend using the so-called “DatabaseConfig” method of configuration, which is the only method explicitly tested to work with the Slick connector.

Below are a few configuration examples for other databases. The Slick connector supports all databases supported by Slick (as of Slick 3.2.x).

Postgres
# Load using SlickSession.forConfig("slick-postgres")
slick-postgres {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://127.0.0.1/slickdemo"
      user = slick
      password = ""
    }
  }
}
MySQL
# Load using SlickSession.forConfig("slick-mysql")
slick-mysql {
  profile = "slick.jdbc.MySQLProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.mysql.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/"
      user = slick
      password = ""
    }
  }
}
DB2
# Load using SlickSession.forConfig("slick-db2")
slick-db2 {
  profile = "slick.jdbc.DB2Profile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    connectionTestQuery = "SELECT 1 FROM SYSIBM.SYSDUMMY1"
    properties = {
      driver = "com.ibm.db2.jcc.DB2Driver"
      url = "jdbc:db2://localhost:50000/sample"
      user = "db2inst1"
      password = "db2-admin-password"
    }
  }
}
Oracle
# Load using SlickSession.forConfig("slick-oracle")
slick-oracle {
  profile = "slick.jdbc.OracleProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "oracle.jdbc.OracleDriver"
      url = "jdbc:oracle:thin:@//localhost:49161/xe"
      user = slick
      password = ""
    }
  }
}
SQL Server
# Load using SlickSession.forConfig("slick-sqlserver")
slick-sqlserver {
  profile = "slick.jdbc.SQLServerProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      url = "jdbc:sqlserver://localhost:1433"
      user = slick
      password = ""
    }
  }
}

Of course, these are just examples. Please see the Slick documentation for DatabaseConfig.forConfig for the full list of configurable settings.

You also have the option to create a SlickSession from Slick Database and Profile objects.

Scala
import slick.jdbc.JdbcBackend.Database

val db = Database.forConfig("slick-h2.db")
val profile = slick.jdbc.H2Profile
val slickSessionCreatedForDbAndProfile: SlickSession = SlickSession.forDbAndProfile(db, profile)

This can be useful if you need to share your configuration with code that uses Slick directly, or if the database information only becomes available at runtime, since Slick's Database class offers factory methods for that case. This method is only available in the scaladsl, because Slick has no Java API and therefore no easy way of creating a Database instance from Java.

Closing a Database Session

Slick requires you to eventually close your database session to free up connection pool resources. You would usually do this when terminating the ActorSystem, by registering a termination handler like this:

Scala
system.registerOnTermination(() => session.close())
Java
system.registerOnTermination(session::close);

Using a Slick Source

The Slick connector allows you to perform a SQL query and expose the resulting stream of results as an Apache Pekko Streams Source[T], where T is any type that can be constructed from a database row.

Warning

Some database systems, such as PostgreSQL, require certain session parameters to be set in order to stream results without first caching the entire result set in client memory. See the Slick documentation for details.

// Example for PostgreSQL:
query.result.withStatementParameters(
  rsType = ResultSetType.ForwardOnly,
  rsConcurrency = ResultSetConcurrency.ReadOnly,
  fetchSize = 128, // not to be left at default value (0)
).transactionally

Plain SQL queries

Both the Scala and Java DSLs support the use of plain SQL queries.

The Scala DSL expects you to use the special sql"...", sqlu"...", and sqlt"..." String interpolators provided by Slick to construct queries.

Unfortunately, String interpolation is a Scala language feature that cannot be directly translated to Java. This means that query strings in the Java DSL will need to be manually prepared using plain Java Strings (or a StringBuilder).
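As a minimal sketch of that manual preparation, the helper below assembles a SELECT statement with a StringBuilder. The class QueryStrings, the method selectFrom, and the WHERE ID >= filter are hypothetical names chosen for illustration; they are not part of the connector. Since a concatenated string does not get the bind-variable treatment that Slick's interpolators give to ${...} values, the sketch only accepts plain identifiers and an int value.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the Slick connector.
class QueryStrings {
  // Accept only simple SQL identifiers, so that no SQL fragment can be smuggled in.
  private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

  // Builds: SELECT <columns> FROM <table> WHERE ID >= <minId>
  static String selectFrom(String table, List<String> columns, int minId) {
    if (columns.isEmpty()) {
      throw new IllegalArgumentException("At least one column is required");
    }
    requireIdentifier(table);
    StringBuilder sql = new StringBuilder("SELECT ");
    for (int i = 0; i < columns.size(); i++) {
      requireIdentifier(columns.get(i));
      if (i > 0) {
        sql.append(", ");
      }
      sql.append(columns.get(i));
    }
    // minId is an int, so appending it cannot introduce SQL syntax.
    sql.append(" FROM ").append(table).append(" WHERE ID >= ").append(minId);
    return sql.toString();
  }

  private static void requireIdentifier(String name) {
    if (!IDENTIFIER.matcher(name).matches()) {
      throw new IllegalArgumentException("Not a plain SQL identifier: " + name);
    }
  }

  public static void main(String[] args) {
    String sql =
        selectFrom("PEKKO_CONNECTORS_SLICK_JAVADSL_TEST_USERS", Arrays.asList("ID", "NAME"), 10);
    System.out.println(sql);
  }
}
```

A string built this way could then be passed as the query argument of Slick.source in the Java DSL.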

The following examples put it all together to perform a simple streaming query.

Scala
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)

// We need this to automatically transform result rows
// into instances of the User class.
// Please import slick.jdbc.GetResult
// See also: "http://slick.lightbend.com/doc/3.2.1/sql.html#result-sets"
implicit val getUserResult = GetResult(r => User(r.nextInt(), r.nextString()))

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See also: "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the results of a query
val done: Future[Done] =
  Slick
    .source(sql"SELECT ID, NAME FROM PEKKO_CONNECTORS_SLICK_SCALADSL_TEST_USERS".as[User])
    .log("user")
    .runWith(Sink.ignore)
Java
final SlickSession session = SlickSession.forConfig("slick-h2");
system.registerOnTermination(session::close);

final CompletionStage<Done> done =
    Slick.source(
            session,
            "SELECT ID, NAME FROM PEKKO_CONNECTORS_SLICK_JAVADSL_TEST_USERS",
            (SlickRow row) -> new User(row.nextInt(), row.nextString()))
        .log("user")
        .runWith(Sink.ignore(), system);

Typed Queries

The Scala DSL also supports the use of Slick Scala queries, which are more type-safe than their plain SQL equivalents. The code will look very similar to the plain SQL example.

Scala
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// This import brings everything you need into scope
import session.profile.api._

// The example domain
class Users(tag: Tag) extends Table[(Int, String)](tag, "PEKKO_CONNECTORS_SLICK_SCALADSL_TEST_USERS") {
  def id = column[Int]("ID")
  def name = column[String]("NAME")
  def * = (id, name)
}

// Stream the results of a query
val done: Future[Done] =
  Slick
    .source(TableQuery[Users].result)
    .log("user")
    .runWith(Sink.ignore)

Using a Slick Flow or Sink

If you want to take a stream of elements and turn them into side-effecting actions in a relational database, the Slick connector allows you to perform any DML or DDL statement using either a Sink or a Flow. This includes the typical insert/update/delete statements but also create table, drop table, etc. The unit tests have a couple of good examples of the latter usage.

The following example shows the use of a Slick Sink to take a stream of elements and insert them into the database. There is an optional parallelism argument to specify how many concurrent streams will be sent to the database. The unit tests for the Slick connector have examples of performing parallel inserts.

Scala
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(users)
    .runWith(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.sink(user =>
        sqlu"INSERT INTO PEKKO_CONNECTORS_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})"))
Java
final CompletionStage<Done> done =
    Source.from(users)
        .runWith(
            Slick.<User>sink(
                session,
                // add an optional second argument to specify the parallelism factor (int)
                (user, connection) -> {
                  PreparedStatement statement =
                      connection.prepareStatement(
                          "INSERT INTO PEKKO_CONNECTORS_SLICK_JAVADSL_TEST_USERS VALUES (?, ?)");
                  statement.setInt(1, user.id);
                  statement.setString(2, user.name);
                  return statement;
                }),
            system);
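
The statement function passed to the Java sink can also be written as a named static method, which makes it easier to reuse and to exercise without a database. The following is a sketch under assumptions: UserStatements, its User class, the EXAMPLE_USERS table, and the recordingConnection helper are all hypothetical and not part of the connector. The fake connection is built with java.lang.reflect.Proxy and only supports the calls made in this sketch.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the Slick connector.
class UserStatements {
  // Hypothetical table name, used only in this sketch.
  static final String TABLE = "EXAMPLE_USERS";

  static final class User {
    final int id;
    final String name;

    User(int id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  // DDL: the stream element is ignored, the same statement is prepared for every element.
  static PreparedStatement createTable(Object ignored, Connection connection)
      throws SQLException {
    return connection.prepareStatement(
        "CREATE TABLE " + TABLE + " (ID INTEGER, NAME VARCHAR(50))");
  }

  // DML: one parameterized insert per stream element.
  static PreparedStatement insertUser(User user, Connection connection) throws SQLException {
    PreparedStatement statement =
        connection.prepareStatement("INSERT INTO " + TABLE + " VALUES (?, ?)");
    statement.setInt(1, user.id);
    statement.setString(2, user.name);
    return statement;
  }

  // A fake Connection that records method calls instead of talking to a database.
  static Connection recordingConnection(List<String> calls) {
    PreparedStatement statement =
        (PreparedStatement)
            Proxy.newProxyInstance(
                UserStatements.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                  calls.add(method.getName() + Arrays.toString(args));
                  return null; // only void setters are called in this sketch
                });
    return (Connection)
        Proxy.newProxyInstance(
            UserStatements.class.getClassLoader(),
            new Class<?>[] {Connection.class},
            (proxy, method, args) -> {
              calls.add(method.getName() + Arrays.toString(args));
              return statement; // only prepareStatement is called in this sketch
            });
  }

  public static void main(String[] args) throws SQLException {
    List<String> calls = new ArrayList<>();
    Connection connection = recordingConnection(calls);
    createTable("any element", connection);
    insertUser(new User(1, "Name1"), connection);
    calls.forEach(System.out::println);
  }
}
```

Assuming the method shapes match the function type expected by the Java DSL, a method reference such as UserStatements::insertUser could take the place of the lambda in the sink example above.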

Flow

The Slick connector also exposes a Flow that has exactly the same functionality as the Sink but lets you continue the stream for further processing. The return value of every executed statement, i.e. the element emitted downstream, has the fixed type Int and denotes the number of updated, inserted, or deleted rows.

Scala
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(users)
    .via(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.flow(user =>
        sqlu"INSERT INTO PEKKO_CONNECTORS_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})"))
    .log("nr-of-updated-rows")
    .runWith(Sink.ignore)
Java
final SlickSession session = SlickSession.forConfig("slick-h2");
system.registerOnTermination(session::close);

final List<User> users =
    IntStream.range(0, 42)
        .boxed()
        .map((i) -> new User(i, "Name" + i))
        .collect(Collectors.toList());

int parallelism = 1;

final CompletionStage<Done> done =
    Source.from(users)
        .via(
            Slick.flow(
                session,
                parallelism,
                (user, connection) -> {
                  PreparedStatement statement =
                      connection.prepareStatement(
                          "INSERT INTO PEKKO_CONNECTORS_SLICK_JAVADSL_TEST_USERS VALUES (?, ?)");
                  statement.setInt(1, user.id);
                  statement.setString(2, user.name);
                  return statement;
                }))
        .log("nr-of-updated-rows")
        .runWith(Sink.ignore(), system);

Flow with pass-through

To have a different return type, use the flowWithPassThrough function. For example, when consuming Kafka messages, this allows you to keep the Kafka committable offset so that the message can be committed in a later stage of the flow.

Scala
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))
val messagesFromKafka = users.zipWithIndex.map { case (user, index) => KafkaMessage(user, CommittableOffset(index)) }

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(messagesFromKafka)
    .via(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.flowWithPassThrough { kafkaMessage =>
        val user = kafkaMessage.msg
        sqlu"INSERT INTO PEKKO_CONNECTORS_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})"
          .map { insertCount => // map db result to something else
            // keeps the Kafka message offset so it can be committed in a later stage
            kafkaMessage.map(user => (user, insertCount))
          }
      })
    .log("nr-of-updated-rows")
    .mapAsync(1) { // in correct order
      kafkaMessage =>
        kafkaMessage.offset.commit // commit kafka messages
    }
    .runWith(Sink.ignore)
Java
final CompletionStage<Done> done =
    Source.from(messagesFromKafka)
        .via(
            Slick.flowWithPassThrough(
                session,
                system.dispatcher(),
                // add an optional argument here to specify the parallelism factor (int)
                (kafkaMessage, connection) -> {
                  PreparedStatement statement =
                      connection.prepareStatement(
                          "INSERT INTO PEKKO_CONNECTORS_SLICK_JAVADSL_TEST_USERS VALUES (?, ?)");
                  statement.setInt(1, kafkaMessage.msg.id);
                  statement.setString(2, kafkaMessage.msg.name);
                  return statement;
                },
                // keeps the Kafka message offset so it can be committed in a later stage
                (kafkaMessage, insertCount) ->
                    kafkaMessage.map(user -> Pair.create(user, insertCount))))
        .log("nr-of-updated-rows")
        .mapAsync(
            1,
            kafkaMessage ->
                kafkaMessage.offset.commit()) // in correct order, commit Kafka message
        .runWith(Sink.ignore(), system);
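
The pass-through examples above use KafkaMessage and CommittableOffset types whose definitions are not shown. A minimal stand-in is sketched below to show what the examples rely on: a map method that transforms the payload while carrying the offset along unchanged. Everything here is an assumption for illustration; the wrapper class PassThroughTypes is hypothetical, commit() completes immediately instead of talking to Kafka, and its Integer result type is an arbitrary choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical stand-ins for the types used in the pass-through examples.
class PassThroughTypes {

  static final class CommittableOffset {
    final int offset;

    CommittableOffset(int offset) {
      this.offset = offset;
    }

    // A real implementation would commit to Kafka; this one completes immediately.
    CompletionStage<Integer> commit() {
      return CompletableFuture.completedFuture(offset);
    }
  }

  static final class KafkaMessage<A> {
    final A msg;
    final CommittableOffset offset;

    KafkaMessage(A msg, CommittableOffset offset) {
      this.msg = msg;
      this.offset = offset;
    }

    // Transforms the payload and keeps the same offset, so it can still be committed later.
    <B> KafkaMessage<B> map(Function<A, B> f) {
      return new KafkaMessage<>(f.apply(msg), offset);
    }
  }

  public static void main(String[] args) {
    KafkaMessage<String> in = new KafkaMessage<>("alice", new CommittableOffset(7));
    KafkaMessage<Integer> out = in.map(String::length);
    System.out.println(out.msg + " @ offset " + out.offset.offset); // prints "5 @ offset 7"
  }
}
```

Because map returns a new message with the original offset attached, the database result can replace the payload while the stage after the flow still has the offset it needs to commit.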