Slick (JDBC)
The Slick connector provides Scala and Java DSLs to create a Source to stream the results of a SQL database query and a Flow/Sink to perform SQL actions (like inserts, updates, and deletes) for each element in a stream. It is built on the Slick library to interact with a long list of supported relational databases.
Project Info: Apache Pekko Connectors Slick/JDBC | |
---|---|
Artifact | org.apache.pekko:pekko-connectors-slick:1.0.2 |
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17 |
Scala versions | 2.13.14, 2.12.20 |
JPMS module name | pekko.stream.connectors.slick |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-slick" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
- Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-slick_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-slick_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
You will also need to add the JDBC driver(s) for the specific relational database(s) to your project. Most of those databases have drivers that are not available from public repositories, so some manual steps will probably be required. The Slick documentation has information on where to download the drivers.
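A missing driver usually surfaces only when the first session is opened. Before that point, you can check quickly whether the driver classes made it onto the classpath. The sketch below is a hypothetical helper: `DriverCheck` is not part of the connector or of Slick. It uses only the JDK's `Class.forName`, and the class names it checks are taken from the configuration examples further down this page.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the Slick connector.
public class DriverCheck {

  // True if the named class can be located on the classpath.
  // initialize = false: only look the class up, do not run its static initializer.
  static boolean isOnClasspath(String className) {
    try {
      Class.forName(className, false, DriverCheck.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  // Returns the subset of the given driver class names that could not be found.
  static List<String> missing(List<String> driverClassNames) {
    return driverClassNames.stream()
        .filter(name -> !isOnClasspath(name))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Driver class names as used in the configuration examples on this page.
    List<String> drivers =
        Arrays.asList("org.h2.Driver", "org.postgresql.Driver", "com.mysql.jdbc.Driver");
    List<String> absent = missing(drivers);
    if (absent.isEmpty()) {
      System.out.println("All JDBC drivers found");
    } else {
      System.out.println("Missing JDBC drivers: " + absent);
    }
  }
}
```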
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Initialization
As always, before we get started we will need an ActorSystem.
- Scala
implicit val system: ActorSystem = ActorSystem()
- Java
final ActorSystem system = ActorSystem.create();
You will also need the following imports:
- Scala
import org.apache.pekko
import pekko.stream.connectors.slick.scaladsl._
import pekko.stream.scaladsl._
import slick.jdbc.GetResult
- Java
import org.apache.pekko.stream.javadsl.*;
import org.apache.pekko.stream.connectors.slick.javadsl.*;
The full examples for using the Source, Sink, and Flow (listed further down) also include all required imports.
Starting a Database Session
All functionality provided by this connector requires the user to first create an instance of SlickSession, which is a thin wrapper around Slick’s database connection management and database profile API.
If you are using Slick in your project, you can create a SlickSession instance by sharing the database configuration:
- Scala
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile

val databaseConfig = DatabaseConfig.forConfig[JdbcProfile]("slick-h2")
implicit val session: SlickSession = SlickSession.forConfig(databaseConfig)
Otherwise, you can configure your database using typesafe-config by adding a named configuration to your application.conf and then referring to that configuration when starting the session:
- Scala
implicit val session: SlickSession = SlickSession.forConfig("slick-h2")
- Java
private static final SlickSession session = SlickSession.forConfig("slick-h2");
Here is an example configuration for the H2 database, which is used for the unit tests of the Slick connector itself:
- Configuration
# Load using SlickSession.forConfig("slick-h2")
slick-h2 {
  profile = "slick.jdbc.H2Profile$"
  db {
    connectionPool = disabled
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.h2.Driver"
      url = "jdbc:h2:"${java.io.tmpdir}"/pekko-connectors-slick-h2-test"
    }
  }
}
You can specify multiple database configurations, as long as each one has a unique name. Each configuration can then be loaded by its fully qualified name using the SlickSession.forConfig() method described above.
The Slick connector supports all the various ways Slick allows you to configure your JDBC database drivers, connection pools, etc., but we strongly recommend using the so-called “DatabaseConfig” method of configuration, which is the only method explicitly tested to work with the Slick connector.
Below are a few configuration examples for other databases. The Slick connector supports all databases supported by Slick (as of Slick 3.2.x).
- Postgres
# Load using SlickSession.forConfig("slick-postgres")
slick-postgres {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://127.0.0.1/slickdemo"
      user = slick
      password = ""
    }
  }
}
- MySQL
# Load using SlickSession.forConfig("slick-mysql")
slick-mysql {
  profile = "slick.jdbc.MySQLProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.mysql.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/"
      user = slick
      password = ""
    }
  }
}
- DB2
# Load using SlickSession.forConfig("slick-db2")
slick-db2 {
  profile = "slick.jdbc.DB2Profile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    connectionTestQuery = "SELECT 1 FROM SYSIBM.SYSDUMMY1"
    properties = {
      driver = "com.ibm.db2.jcc.DB2Driver"
      url = "jdbc:db2://localhost:50000/sample"
      user = "db2inst1"
      password = "db2-admin-password"
    }
  }
}
- Oracle
# Load using SlickSession.forConfig("slick-oracle")
slick-oracle {
  profile = "slick.jdbc.OracleProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "oracle.jdbc.OracleDriver"
      url = "jdbc:oracle:thin:@//localhost:49161/xe"
      user = slick
      password = ""
    }
  }
}
- SQL Server
# Load using SlickSession.forConfig("slick-sqlserver")
slick-sqlserver {
  profile = "slick.jdbc.SQLServerProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      url = "jdbc:sqlserver://localhost:1433"
      user = slick
      password = ""
    }
  }
}
Of course, these are just examples. Please visit the Slick documentation for DatabaseConfig.forConfig for the full list of things to configure.
You also have the option to create a SlickSession from Slick Database and Profile objects.
- Scala
import slick.jdbc.JdbcBackend.Database

val db = Database.forConfig("slick-h2.db")
val profile = slick.jdbc.H2Profile
val slickSessionCreatedForDbAndProfile: SlickSession = SlickSession.forDbAndProfile(db, profile)
This can be useful if you need to share your configuration with code that uses Slick directly, or if the database connection information is only known at runtime, because Slick's Database class offers factory methods for that. This method is only available in the scaladsl: Slick has no Java API, so there is no easy way of creating a Database instance from Java.
Closing a Database Session
Slick requires you to eventually close your database session to free up connection pool resources. You would usually do this when terminating the ActorSystem, by registering a termination handler like this:
- Scala
system.registerOnTermination(() => session.close())
- Java
system.registerOnTermination(session::close);
Using a Slick Source
The Slick connector allows you to perform a SQL query and expose the resulting stream of results as an Apache Pekko Streams Source[T], where T is any type that can be constructed from a database row.
Some database systems, such as PostgreSQL, require statement parameters to be set in a certain way to support streaming. Otherwise the client caches all results in memory at once. See the Slick documentation for details.
// Example for PostgreSQL:
import slick.jdbc.{ResultSetConcurrency, ResultSetType}

query.result
  .withStatementParameters(
    rsType = ResultSetType.ForwardOnly,
    rsConcurrency = ResultSetConcurrency.ReadOnly,
    fetchSize = 128 // must not be left at the default value (0)
  )
  .transactionally
Plain SQL queries
Both the Scala and Java DSLs support the use of plain SQL queries.
The Scala DSL expects you to use the special sql"...", sqlu"...", and sqlt"..." String interpolators provided by Slick to construct queries.
Unfortunately, String interpolation is a Scala language feature that cannot be directly translated to Java. This means that query strings in the Java DSL need to be prepared manually using plain Java Strings (or a StringBuilder).
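Here is a minimal sketch of preparing such a query string by hand in Java. `UserQueryBuilder` and `buildUserQuery` are hypothetical names and are not part of the connector. The helper assumes that only trusted values are concatenated: identifiers are checked against a plain-identifier pattern, and the only value appended is an `int`. Values that come from users are better bound through a PreparedStatement, as the Java Sink and Flow examples do.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the Slick connector.
public class UserQueryBuilder {

  // Accept only plain identifiers so a table or column name cannot inject extra SQL.
  private static String identifier(String name) {
    if (!name.matches("[A-Za-z_][A-Za-z0-9_]*")) {
      throw new IllegalArgumentException("Not a plain SQL identifier: " + name);
    }
    return name;
  }

  // Builds a SELECT string of the kind that is passed to the Java Slick.source(...).
  static String buildUserQuery(String table, List<String> columns, int minId) {
    StringBuilder sql = new StringBuilder("SELECT ");
    for (int i = 0; i < columns.size(); i++) {
      if (i > 0) {
        sql.append(", ");
      }
      sql.append(identifier(columns.get(i)));
    }
    sql.append(" FROM ").append(identifier(table));
    // An int cannot carry SQL syntax, so appending it directly is safe.
    sql.append(" WHERE ID >= ").append(minId);
    return sql.toString();
  }

  public static void main(String[] args) {
    String query =
        buildUserQuery(
            "PEKKO_CONNECTORS_SLICK_JAVADSL_TEST_USERS", Arrays.asList("ID", "NAME"), 10);
    // prints: SELECT ID, NAME FROM PEKKO_CONNECTORS_SLICK_JAVADSL_TEST_USERS WHERE ID >= 10
    System.out.println(query);
  }
}
```

The resulting String takes the place of the literal query string in a call such as Slick.source(session, query, rowMapper).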
The following examples put it all together to perform a simple streaming query.
- Scala
implicit val session: SlickSession = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)

// We need this to automatically transform result rows
// into instances of the User class.
// Please import slick.jdbc.GetResult
// See also: "http://slick.lightbend.com/doc/3.2.1/sql.html#result-sets"
implicit val getUserResult: GetResult[User] = GetResult(r => User(r.nextInt(), r.nextString()))

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See also: "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the results of a query
val done: Future[Done] =
  Slick
    .source(sql"SELECT ID, NAME FROM PEKKO_CONNECTORS_SLICK_SCALADSL_TEST_USERS".as[User])
    .log("user")
    .runWith(Sink.ignore)
- Java
final SlickSession session = SlickSession.forConfig("slick-h2");
system.registerOnTermination(session::close);

final CompletionStage<Done> done =
    Slick.source(
            session,
            "SELECT ID, NAME FROM PEKKO_CONNECTORS_SLICK_JAVADSL_TEST_USERS",
            (SlickRow row) -> new User(row.nextInt(), row.nextString()))
        .log("user")
        .runWith(Sink.ignore(), system);
Typed Queries
The Scala DSL also supports the use of Slick Scala queries, which are more type-safe than their plain SQL equivalents. The code looks very similar to the plain SQL example.
- Scala
implicit val session: SlickSession = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// This import brings everything you need into scope
import session.profile.api._

// The example domain
class Users(tag: Tag) extends Table[(Int, String)](tag, "PEKKO_CONNECTORS_SLICK_SCALADSL_TEST_USERS") {
  def id = column[Int]("ID")
  def name = column[String]("NAME")
  def * = (id, name)
}

// Stream the results of a query
val done: Future[Done] =
  Slick
    .source(TableQuery[Users].result)
    .log("user")
    .runWith(Sink.ignore)
Using a Slick Flow or Sink
If you want to take a stream of elements and turn them into side-effecting actions in a relational database, the Slick connector allows you to perform any DML or DDL statement using either a Sink or a Flow. This includes the typical insert/update/delete statements but also create table, drop table, etc. The unit tests have a couple of good examples of the latter usage.
The following example shows the use of a Slick Sink to take a stream of elements and insert them into the database. An optional parallelism argument specifies how many statements may be sent to the database concurrently. The unit tests for the Slick connector have examples of performing parallel inserts.
- Scala
implicit val session: SlickSession = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(users)
    .runWith(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.sink(user => sqlu"INSERT INTO PEKKO_CONNECTORS_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})"))
- Java
final CompletionStage<Done> done =
    Source.from(users)
        .runWith(
            Slick.<User>sink(
                session,
                // add an optional second argument to specify the parallelism factor (int)
                (user, connection) -> {
                  PreparedStatement statement =
                      connection.prepareStatement(
                          "INSERT INTO PEKKO_CONNECTORS_SLICK_JAVADSL_TEST_USERS VALUES (?, ?)");
                  statement.setInt(1, user.id);
                  statement.setString(2, user.name);
                  return statement;
                }),
            system);
Flow
The Slick connector also exposes a Flow that has exactly the same functionality as the Sink but allows you to continue the stream for further processing. The return value of every executed statement, i.e. the element emitted downstream, is of the fixed type Int, denoting the number of updated/inserted/deleted rows.
- Scala
implicit val session: SlickSession = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(users)
    .via(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.flow(user => sqlu"INSERT INTO PEKKO_CONNECTORS_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})"))
    .log("nr-of-updated-rows")
    .runWith(Sink.ignore)
- Java
final SlickSession session = SlickSession.forConfig("slick-h2");
system.registerOnTermination(session::close);

final List<User> users =
    IntStream.range(0, 42)
        .boxed()
        .map((i) -> new User(i, "Name" + i))
        .collect(Collectors.toList());

int parallelism = 1;

final CompletionStage<Done> done =
    Source.from(users)
        .via(
            Slick.flow(
                session,
                parallelism,
                (user, connection) -> {
                  PreparedStatement statement =
                      connection.prepareStatement(
                          "INSERT INTO PEKKO_CONNECTORS_SLICK_JAVADSL_TEST_USERS VALUES (?, ?)");
                  statement.setInt(1, user.id);
                  statement.setString(2, user.name);
                  return statement;
                }))
        .log("nr-of-updated-rows")
        .runWith(Sink.ignore(), system);
Flow with pass-through
To have a different return type, use the flowWithPassThrough function. For example, when consuming Kafka messages, this lets you keep the Kafka committable offset so that the message can be committed in a later stage of the flow.
- Scala
implicit val session: SlickSession = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())
// DBIOAction.map below needs an implicit ExecutionContext
import system.dispatcher

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))
val messagesFromKafka = users.zipWithIndex.map { case (user, index) => KafkaMessage(user, CommittableOffset(index)) }

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(messagesFromKafka)
    .via(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.flowWithPassThrough { kafkaMessage =>
        val user = kafkaMessage.msg
        sqlu"INSERT INTO PEKKO_CONNECTORS_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})"
          .map { insertCount => // map db result to something else
            // allows to keep the kafka message offset so it can be committed in a next stage
            kafkaMessage.map(user => (user, insertCount))
          }
      })
    .log("nr-of-updated-rows")
    .mapAsync(1) { // in correct order
      kafkaMessage => kafkaMessage.offset.commit // commit kafka messages
    }
    .runWith(Sink.ignore)
- Java
final CompletionStage<Done> done =
    Source.from(messagesFromKafka)
        .via(
            Slick.flowWithPassThrough(
                session,
                system.dispatcher(),
                // add an optional second argument to specify the parallelism factor (int)
                (kafkaMessage, connection) -> {
                  PreparedStatement statement =
                      connection.prepareStatement(
                          "INSERT INTO PEKKO_CONNECTORS_SLICK_JAVADSL_TEST_USERS VALUES (?, ?)");
                  statement.setInt(1, kafkaMessage.msg.id);
                  statement.setString(2, kafkaMessage.msg.name);
                  return statement;
                },
                (kafkaMessage, insertCount) ->
                    kafkaMessage.map(user -> Pair.create(user, insertCount))
                // allows to keep the kafka message offset so it
                // can be committed in a next stage
                ))
        .log("nr-of-updated-rows")
        .mapAsync(
            1,
            kafkaMessage -> kafkaMessage.offset.commit()) // in correct order, commit Kafka message
        .runWith(Sink.ignore(), system);
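KafkaMessage and CommittableOffset in the pass-through examples are not part of this connector, and the Kafka messages there are simulated from the users list. The following hypothetical stand-ins illustrate the idea behind the pass-through. They are simplified types, not a real Kafka client API. `map` transforms the payload while the offset travels along unchanged, so a later stage can still commit it. Here `commit()` only completes immediately with the offset number; a real client would commit to the broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical stand-ins for the Kafka types used in the pass-through examples.
public class PassThroughTypes {

  static final class CommittableOffset {
    final long offset;

    CommittableOffset(long offset) {
      this.offset = offset;
    }

    // Illustration only: completes immediately instead of committing to a broker.
    CompletionStage<Long> commit() {
      return CompletableFuture.completedFuture(offset);
    }
  }

  static final class KafkaMessage<A> {
    final A msg;
    final CommittableOffset offset;

    KafkaMessage(A msg, CommittableOffset offset) {
      this.msg = msg;
      this.offset = offset;
    }

    // Transforms the payload and carries the same offset into the new message.
    <B> KafkaMessage<B> map(Function<A, B> f) {
      return new KafkaMessage<>(f.apply(msg), offset);
    }
  }

  public static void main(String[] args) {
    KafkaMessage<String> in = new KafkaMessage<>("Name1", new CommittableOffset(7L));
    // Stands in for the row count a database insert would return.
    int insertCount = 1;
    KafkaMessage<String> out = in.map(name -> name + " inserted " + insertCount + " row(s)");
    System.out.println(out.msg);
    System.out.println(out.offset.commit().toCompletableFuture().join());
  }
}
```

Defined this way, the Java example's kafkaMessage.msg, kafkaMessage.map(...) and kafkaMessage.offset.commit() calls have concrete types to work with, and commit() returns the CompletionStage that mapAsync expects.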