StreamConverters.fromInputStream

Create a source that wraps an InputStream.

Additional Sink and Source converters

Signature

StreamConverters.fromInputStreamStreamConverters.fromInputStream

Description

Creates a Source from a java.io.InputStream created by the given function. Emitted elements are up to chunkSize sized ByteStringByteStrings elements. The actual size of the emitted elements depends on how much data the underlying java.io.InputStream returns on each read invocation. Such chunks will never be larger than chunkSize though.

You can configure the default dispatcher for this Source by changing the pekko.stream.materializer.blocking-io-dispatcher or set it for a given Source by using org.apache.pekko.stream.ActorAttributes.

It materializes a CompletionStageFuture of IOResult containing the number of bytes read from the source file upon completion, and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does not give any guarantee that the bytes were seen by downstream stages.

The created InputStream will be closed when the Source is cancelled.

See also fromOutputStream

Example

Here is an example using both fromInputStream and fromOutputStream to read from a java.io.InputStream, uppercase the read content and write back out into a java.io.OutputStream.

Scala
sourceval bytes = "Some random input".getBytes
val inputStream = new ByteArrayInputStream(bytes)
val outputStream = new ByteArrayOutputStream()

val source: Source[ByteString, Future[IOResult]] = StreamConverters.fromInputStream(() => inputStream)

val toUpperCase: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString].map(_.map(_.toChar.toUpper.toByte))

val sink: Sink[ByteString, Future[IOResult]] = StreamConverters.fromOutputStream(() => outputStream)

val eventualResult = source.via(toUpperCase).runWith(sink)
Java
sourcejava.io.InputStream inputStream = new ByteArrayInputStream(bytes);
Source<ByteString, CompletionStage<IOResult>> source =
    StreamConverters.fromInputStream(() -> inputStream);

// Given a ByteString produces a ByteString with the upperCase representation
// Removed from the sample for brevity.
// Flow<ByteString, ByteString, NotUsed> toUpperCase = ...

java.io.OutputStream outputStream = new ByteArrayOutputStream();
Sink<ByteString, CompletionStage<IOResult>> sink =
    StreamConverters.fromOutputStream(() -> outputStream);

CompletionStage<IOResult> ioResultCompletionStage =
    source.via(toUpperCase).runWith(sink, system);
// When the ioResultCompletionStage completes, the byte array backing the outputStream
// will contain the uppercase representation of the bytes read from the inputStream.