Elasticsearch
The Apache Pekko Connectors Elasticsearch connector provides Apache Pekko Streams integration for Elasticsearch.
For more information about Elasticsearch, please visit the Elasticsearch documentation.
Project Info: Apache Pekko Connectors Elasticsearch | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-elasticsearch
1.1.0
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.elasticsearch |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts¶
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-elasticsearch" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
<properties>
<pekko.version>1.1.3</pekko.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-elasticsearch_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
</dependencies>
def versions = [
PekkoVersion: "1.1.3",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-elasticsearch_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Elasticsearch connection¶
The connection and credentials to authenticate with are configured with ElasticsearchConnectionSettings
.
sourceval connectionSettings = ElasticsearchConnectionSettings("http://localhost:9200")
.withCredentials("user", "password")
sourceElasticsearchConnectionSettings connectionSettings =
ElasticsearchConnectionSettings.create("http://localhost:9200")
.withCredentials("user", "password");
Parameter | Default | Description |
---|---|---|
baseUrl | Empty | The base URL of Elasticsearch. Should not include a trailing slash. |
username | None | The username to authenticate with |
password | None | The password to authenticate with |
headers | None | List of headers that should be sent with the http request. |
connectionContext | None | The connectionContext that will be used with the http request. This can be used for TLS Auth instead of basic auth (username/password) by setting the SSLContext within the connectionContext. |
Elasticsearch parameters¶
Any API method that allows reading from and writing to Elasticsearch takes an instance of ElasticsearchParams
.
ElasticsearchParams
has be constructed based on the ElasticSearch API version that you’re targeting:
sourceval elasticsearchParamsV5 = ElasticsearchParams.V5("index", "_doc")
val elasticsearchParamsV7 = ElasticsearchParams.V7("index")
sourceElasticsearchParams elasticsearchParamsV5 = ElasticsearchParams.V5("source", "_doc");
ElasticsearchParams elasticsearchParamsV7 = ElasticsearchParams.V7("source");
Elasticsearch as Source and Sink¶
You can stream messages from or to Elasticsearch using the ElasticsearchSource
, ElasticsearchFlow
or the ElasticsearchSink
.
sourceimport spray.json._
import DefaultJsonProtocol._
case class Book(title: String, shouldSkip: Option[Boolean] = None, price: Int = 10)
implicit val format: JsonFormat[Book] = jsonFormat3(Book.apply)
sourcepublic static class Book {
public String title;
public Book() {}
public Book(String title) {
this.title = title;
}
}
With typed source¶
Use ElasticsearchSource.typed
and ElasticsearchSink.create
to create source and sink. The data is converted to and from JSON by Jackson’s ObjectMapper.
sourceval copy = ElasticsearchSource
.typed[Book](
constructElasticsearchParams("source", "_doc", ApiVersion.V5),
query = """{"match_all": {}}""",
settings = baseSourceSettings)
.map { (message: ReadResult[Book]) =>
WriteMessage.createIndexMessage(message.id, message.source)
}
.runWith(
ElasticsearchSink.create[Book](
constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
settings = baseWriteSettings))
sourceElasticsearchSourceSettings sourceSettings =
ElasticsearchSourceSettings.create(connectionSettings).withApiVersion(ApiVersion.V5);
ElasticsearchWriteSettings sinkSettings =
ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5);
Source<ReadResult<ElasticsearchTestBase.Book>, NotUsed> source =
ElasticsearchSource.typed(
constructElasticsearchParams("source", "_doc", ApiVersion.V5),
"{\"match_all\": {}}",
sourceSettings,
ElasticsearchTestBase.Book.class);
CompletionStage<Done> f1 =
source
.map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
.runWith(
ElasticsearchSink.create(
constructElasticsearchParams("sink2", "_doc", ApiVersion.V5),
sinkSettings,
new ObjectMapper()),
system);
With JSON source¶
Use ElasticsearchSource.create
and ElasticsearchSink.create
to create source and sink.
sourceval copy = ElasticsearchSource
.create(
constructElasticsearchParams("source", "_doc", ApiVersion.V5),
query = """{"match_all": {}}""",
settings = baseSourceSettings)
.map { (message: ReadResult[spray.json.JsObject]) =>
val book: Book = jsonReader[Book].read(message.source)
WriteMessage.createIndexMessage(message.id, book)
}
.runWith(
ElasticsearchSink.create[Book](
constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
settings = baseWriteSettings))
sourceElasticsearchSourceSettings sourceSettings =
ElasticsearchSourceSettings.create(connectionSettings).withApiVersion(ApiVersion.V5);
ElasticsearchWriteSettings sinkSettings =
ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5);
Source<ReadResult<Map<String, Object>>, NotUsed> source =
ElasticsearchSource.create(
constructElasticsearchParams("source", "_doc", ApiVersion.V5),
"{\"match_all\": {}}",
sourceSettings);
CompletionStage<Done> f1 =
source
.map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
.runWith(
ElasticsearchSink.create(
constructElasticsearchParams("sink1", "_doc", ApiVersion.V5),
sinkSettings,
new ObjectMapper()),
system);
Writing to Elasticsearch¶
In the above examples, WriteMessage
is used as the input to ElasticsearchSink
and ElasticsearchFlow
. This means requesting index
operation to Elasticsearch. It’s possible to request other operations using following message types:
Message factory | Description |
---|---|
WriteMessage.createIndexMessage | Create a new document. If id is specified and it already exists, replace the document and increment its version. |
WriteMessage.createCreateMessage | Create a new document. If id already exists, the WriteResult will contain an error. |
WriteMessage.createUpdateMessage | Update an existing document. If there is no document with the specified id , do nothing. |
WriteMessage.createUpsertMessage | Update an existing document. If there is no document with the specified id , create a new document. |
WriteMessage.createDeleteMessage | Delete an existing document. If there is no document with the specified id , do nothing. |
sourceval requests = List[WriteMessage[Book, NotUsed]](
WriteMessage.createIndexMessage(id = "00001", source = Book("Book 1")),
WriteMessage.createUpsertMessage(id = "00002", source = Book("Book 2")),
WriteMessage.createUpsertMessage(id = "00003", source = Book("Book 3")),
WriteMessage.createUpdateMessage(id = "00004", source = Book("Book 4")),
WriteMessage.createCreateMessage(id = "00005", source = Book("Book 5")),
WriteMessage.createDeleteMessage(id = "00002"))
val writeResults = Source(requests)
.via(
ElasticsearchFlow.create[Book](
constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
baseWriteSettings))
.runWith(Sink.seq)
source// Create, update, upsert and delete documents in sink8/book
List<WriteMessage<Book, NotUsed>> requests =
Arrays.asList(
WriteMessage.createIndexMessage("00001", new Book("Book 1")),
WriteMessage.createUpsertMessage("00002", new Book("Book 2")),
WriteMessage.createUpsertMessage("00003", new Book("Book 3")),
WriteMessage.createUpdateMessage("00004", new Book("Book 4")),
WriteMessage.createDeleteMessage("00002"));
Source.from(requests)
.via(
ElasticsearchFlow.create(
constructElasticsearchParams("sink8", "_doc", ApiVersion.V5),
ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5),
new ObjectMapper()))
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get();
Source configuration¶
We can configure the source by ElasticsearchSourceSettings
.
sourceval sourceSettings = ElasticsearchSourceSettings(connectionSettings)
.withBufferSize(10)
.withScrollDuration(5.minutes)
sourceElasticsearchSourceSettings sourceSettings =
ElasticsearchSourceSettings.create(connectionSettings).withBufferSize(10);
Parameter | Default | Description |
---|---|---|
connection | The connection details and credentials to authenticate against ElasticSearch. See ElasticsearchConnectionSettings |
|
bufferSize | 10 | ElasticsearchSource retrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size. |
includeDocumentVersion | false | Tell Elasticsearch to return the documents _version property with the search results. See Version and Optimistic Concurrenct Control to know about this property. |
scrollDuration | 5 min | ElasticsearchSource retrieves messages from Elasticsearch by scroll scan. This parameter is used as a scroll value. See Time units for supported units. |
apiVersion | V7 | Currently supports V5 and V7 (see below) |
Sink and flow configuration¶
Sinks and flows are configured with ElasticsearchWriteSettings
.
sourceval sinkSettings =
ElasticsearchWriteSettings(connectionSettings)
.withBufferSize(10)
.withVersionType("internal")
.withRetryLogic(RetryAtFixedRate(maxRetries = 5, retryInterval = 1.second))
.withApiVersion(ApiVersion.V5)
sourceElasticsearchWriteSettings settings =
ElasticsearchWriteSettings.create(connectionSettings)
.withBufferSize(10)
.withVersionType("internal")
.withRetryLogic(RetryAtFixedRate.create(5, Duration.ofSeconds(1)))
.withApiVersion(ApiVersion.V5);
Parameter | Default | Description |
---|---|---|
connection | The connection details and credentials to authenticate against ElasticSearch. See ElasticsearchConnectionSettings |
|
bufferSize | 10 | Flow and Sink batch messages to bulk requests when back-pressure applies. |
versionType | None | If set, ElasticsearchSink uses the chosen versionType to index documents. See Version types for accepted settings. |
retryLogic | No retries | See below |
apiVersion | V7 | Currently supports V5 and V7 (see below) |
allowExplicitIndex | True | When set to False, the index name will be included in the URL instead of on each document (see below) |
Retry logic¶
A bulk request might fail partially for some reason. To retry failed writes to Elasticsearch, a RetryLogic
can be specified.
The provided implementations are:
RetryAtFixedRate
Parameter | Description |
---|---|
maxRetries | The stage fails, if it gets this number of consecutive failures. |
retryInterval | Failing writes are retried after this duration. |
RetryWithBackoff
Parameter | Description |
---|---|
maxRetries | The stage fails, if it gets this number of consecutive failures. |
minBackoff | Initial backoff for failing writes. |
maxBackoff | Maximum backoff for failing writes. |
In case of write failures the order of messages downstream is guaranteed to be preserved.
Supported API versions¶
To support reading and writing to multiple versions of Elasticsearch, an ApiVersion
can be specified.
This will be used to: 1. transform the bulk request into a format understood by the corresponding Elasticsearch server. 2. determine whether to include the index type mapping in the API calls. See removal of types
Currently V5
and V7
are supported specifically but this parameter does not need to match the server version exactly (for example, either V5
or V7
should work with Elasticsearch 6.x).
Allow explicit index¶
When using the _bulk
API, Elasticsearch will reject requests that have an explicit index in the request body if explicit index names are not allowed. See URL-based access control
Elasticsearch as Flow¶
You can also build flow stages with ElasticsearchFlow
. The API is similar to creating Sinks.
sourceval copy = ElasticsearchSource
.typed[Book](
constructElasticsearchParams("source", "_doc", ApiVersion.V5),
query = """{"match_all": {}}""",
settings = baseSourceSettings)
.map { (message: ReadResult[Book]) =>
WriteMessage.createIndexMessage(message.id, message.source)
}
.via(
ElasticsearchFlow.create[Book](
constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
settings = baseWriteSettings))
.runWith(Sink.seq)
sourceCompletionStage<List<WriteResult<Book, NotUsed>>> f1 =
ElasticsearchSource.typed(
constructElasticsearchParams("source", "_doc", ApiVersion.V5),
"{\"match_all\": {}}",
ElasticsearchSourceSettings.create(connectionSettings)
.withApiVersion(ApiVersion.V5)
.withBufferSize(5),
Book.class)
.map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
.via(
ElasticsearchFlow.create(
constructElasticsearchParams("sink3", "_doc", ApiVersion.V5),
ElasticsearchWriteSettings.create(connectionSettings)
.withApiVersion(ApiVersion.V5)
.withBufferSize(5),
new ObjectMapper()))
.runWith(Sink.seq(), system);
Storing documents from Strings¶
Elasticsearch requires the documents to be properly formatted JSON. If your data is available as JSON in Strings, you may use the pre-defined StringMessageWriter
to avoid any conversions. For any other JSON technologies, implement a MessageWriter<T>
.
sourceval write: Future[immutable.Seq[WriteResult[String, NotUsed]]] = Source(
immutable.Seq(
WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.compactPrint),
WriteMessage.createIndexMessage("2", Book("Faust").toJson.compactPrint),
WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.compactPrint))).via(
ElasticsearchFlow.create(
constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
settings = baseWriteSettings,
StringMessageWriter))
.runWith(Sink.seq)
sourceString indexName = "sink3-0";
CompletionStage<List<WriteResult<String, NotUsed>>> write =
Source.from(
Arrays.asList(
WriteMessage.createIndexMessage("1", "{\"title\": \"Das Parfum\"}"),
WriteMessage.createIndexMessage("2", "{\"title\": \"Faust\"}"),
WriteMessage.createIndexMessage(
"3", "{\"title\": \"Die unendliche Geschichte\"}")))
.via(
ElasticsearchFlow.create(
constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
ElasticsearchWriteSettings.create(connectionSettings)
.withApiVersion(ApiVersion.V5)
.withBufferSize(5),
StringMessageWriter.getInstance()))
.runWith(Sink.seq(), system);
Passing data through ElasticsearchFlow¶
When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to Elastic.
source// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka
case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)
val messagesFromKafka = List(
KafkaMessage(Book("Book 1"), KafkaOffset(0)),
KafkaMessage(Book("Book 2"), KafkaOffset(1)),
KafkaMessage(Book("Book 3"), KafkaOffset(2)))
var committedOffsets = Vector[KafkaOffset]()
def commitToKafka(offset: KafkaOffset): Unit =
committedOffsets = committedOffsets :+ offset
val indexName = "sink6"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title
// Transform message so that we can write to elastic
WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset)
}
.via( // write to elastic
ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset](
constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
settings = baseWriteSettings))
.map { result =>
if (!result.success) throw new Exception("Failed to write message to elastic")
// Commit to kafka
commitToKafka(result.message.passThrough)
}
.runWith(Sink.ignore)
kafkaToEs.futureValue shouldBe Done
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka
case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)
val messagesFromKafka = List(
KafkaMessage(Book("Book 1"), KafkaOffset(0)),
KafkaMessage(Book("Book 2"), KafkaOffset(1)),
KafkaMessage(Book("Book 3"), KafkaOffset(2)))
var committedOffsets = Vector[KafkaOffset]()
def commitToKafka(offset: KafkaOffset): Unit =
committedOffsets = committedOffsets :+ offset
val indexName = "sink6-bulk"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title
// Transform message so that we can write to elastic
WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset)
}
.grouped(2)
.via( // write to elastic
ElasticsearchFlow.createBulk[Book, KafkaOffset](
constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
settings = baseWriteSettings))
.map(_.map { result =>
if (!result.success) throw new Exception("Failed to write message to elastic")
// Commit to kafka
commitToKafka(result.message.passThrough)
})
.runWith(Sink.ignore)
kafkaToEs.futureValue shouldBe Done
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka
case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)
val messagesFromKafka = List(
KafkaMessage(Book("Book A", shouldSkip = Some(true)), KafkaOffset(0)),
KafkaMessage(Book("Book 1"), KafkaOffset(1)),
KafkaMessage(Book("Book 2"), KafkaOffset(2)),
KafkaMessage(Book("Book B", shouldSkip = Some(true)), KafkaOffset(3)),
KafkaMessage(Book("Book 3"), KafkaOffset(4)),
KafkaMessage(Book("Book C", shouldSkip = Some(true)), KafkaOffset(5)))
var committedOffsets = Vector[KafkaOffset]()
def commitToKafka(offset: KafkaOffset): Unit =
committedOffsets = committedOffsets :+ offset
val indexName = "sink6-nop"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title
// Transform message so that we can write to elastic
if (book.shouldSkip.getOrElse(false))
WriteMessage.createNopMessage[Book]().withPassThrough(kafkaMessage.offset)
else
WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset)
}
.via( // write to elastic
ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset](
constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
settings = baseWriteSettings))
.map { result =>
if (!result.success) throw new Exception("Failed to write message to elastic")
// Commit to kafka
commitToKafka(result.message.passThrough)
}
.runWith(Sink.ignore)
kafkaToEs.futureValue shouldBe Done
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka
case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)
val messagesFromKafka = List(
KafkaMessage(Book("Book 1", shouldSkip = Some(true)), KafkaOffset(0)),
KafkaMessage(Book("Book 2", shouldSkip = Some(true)), KafkaOffset(1)),
KafkaMessage(Book("Book 3", shouldSkip = Some(true)), KafkaOffset(2)))
var committedOffsets = Vector[KafkaOffset]()
def commitToKafka(offset: KafkaOffset): Unit =
committedOffsets = committedOffsets :+ offset
val indexName = "sink6-none"
register(connectionSettings, indexName, "dummy", 10) // need to create index else exception in reading below
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title
// Transform message so that we can write to elastic
if (book.shouldSkip.getOrElse(false))
WriteMessage.createNopMessage[Book]().withPassThrough(kafkaMessage.offset)
else
WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset)
}
.via( // write to elastic
ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset](
constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
settings = baseWriteSettings))
.map { result =>
if (!result.success) throw new Exception("Failed to write message to elastic")
// Commit to kafka
commitToKafka(result.message.passThrough)
}
.runWith(Sink.ignore)
kafkaToEs.futureValue shouldBe Done
source// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka
List<KafkaMessage> messagesFromKafka =
Arrays.asList(
new KafkaMessage(new Book("Book 1"), new KafkaOffset(0)),
new KafkaMessage(new Book("Book 2"), new KafkaOffset(1)),
new KafkaMessage(new Book("Book 3"), new KafkaOffset(2)));
final KafkaCommitter kafkaCommitter = new KafkaCommitter();
CompletionStage<Done> kafkaToEs =
Source.from(messagesFromKafka) // Assume we get this from Kafka
.map(
kafkaMessage -> {
Book book = kafkaMessage.book;
String id = book.title;
// Transform message so that we can write to elastic
return WriteMessage.createIndexMessage(id, book)
.withPassThrough(kafkaMessage.offset);
})
.via( // write to elastic
ElasticsearchFlow.createWithPassThrough(
constructElasticsearchParams("sink6", "_doc", ApiVersion.V5),
ElasticsearchWriteSettings.create(connectionSettings)
.withApiVersion(ApiVersion.V5)
.withBufferSize(5),
new ObjectMapper()))
.map(
result -> {
if (!result.success())
throw new RuntimeException("Failed to write message to elastic");
// Commit to kafka
kafkaCommitter.commit(result.message().passThrough());
return NotUsed.getInstance();
})
.runWith(Sink.ignore(), system);
Specifying custom index-name for every document¶
When working with index-patterns using wildcards, you might need to specify a custom index-name for each document:
sourceval customIndexName = "custom-index"
val writeCustomIndex = ElasticsearchSource
.typed[Book](
constructElasticsearchParams("source", "_doc", ApiVersion.V5),
query = """{"match_all": {}}""",
settings = baseSourceSettings)
.map { (message: ReadResult[Book]) =>
WriteMessage
.createIndexMessage(message.id, message.source)
.withIndexName(customIndexName) // Setting the index-name to use for this document
}
.runWith(
ElasticsearchSink.create[Book](
constructElasticsearchParams("this-is-not-the-index-we-are-using", "_doc", ApiVersion.V5),
settings = baseWriteSettings))
sourceWriteMessage<String, NotUsed> msg =
WriteMessage.createIndexMessage(doc).withIndexName("my-index");
Specifying custom metadata for every document¶
In some cases you might want to specify custom metadata per document you are inserting, for example a pipeline
, this can be done like so:
sourceval msg = WriteMessage
.createIndexMessage(doc)
.withCustomMetadata(Map("pipeline" -> "myPipeline"))
sourceMap<String, String> metadata = new HashMap<>();
metadata.put("pipeline", "myPipeline");
WriteMessage<String, NotUsed> msgWithMetadata =
WriteMessage.createIndexMessage(doc).withCustomMetadata(metadata);
More custom searching¶
The easiest way of using Elasticsearch-source, is to just specify the query-param. Sometimes you need more control, like specifying which fields to return and so on. In such cases you can instead use ‘searchParams’ instead:
sourcecase class TestDoc(id: String, a: String, b: Option[String], c: String)
// Search for docs and ask elastic to only return some fields
val readWithSearchParameters = ElasticsearchSource
.typed[TestDoc](
constructElasticsearchParams(indexName, typeName, ApiVersion.V5),
searchParams = Map(
"query" -> """ {"match_all": {}} """,
"_source" -> """ ["id", "a", "c"] """),
baseSourceSettings)
.map { message =>
message.source
}
.runWith(Sink.seq)
source// Search for docs and ask elastic to only return some fields
Map<String, String> searchParams = new HashMap<>();
searchParams.put("query", "{\"match_all\": {}}");
searchParams.put("_source", "[\"id\", \"a\", \"c\"]");
List<TestDoc> result =
ElasticsearchSource.<TestDoc>typed(
constructElasticsearchParams(indexName, typeName, ApiVersion.V5),
searchParams, // <-- Using searchParams
ElasticsearchSourceSettings.create(connectionSettings)
.withApiVersion(ApiVersion.V5),
TestDoc.class,
new ObjectMapper())
.map(
o -> {
return o.source(); // These documents will only have property id, a and c (not
})
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get();
Routing¶
Support for custom routing is available through the routing
key. Add this key and the respective value in ‘searchParams’ map, to route your search directly to the shard that holds the document you are looking for and enjoy improved response times.
Sort¶
Support for sort is available through the sort
key in searchParams
map. If no sort is given, the source will use sort=_doc
to maximize performance, as indicated by elasticsearch documentation.