Apache Solr
Solr (pronounced “solar”) is an open source enterprise search platform, written in Java, from the Apache Lucene project. Its major features include full-text search, hit highlighting, faceted search, real-time indexing, dynamic clustering, database integration, NoSQL features and rich document (e.g., Word, PDF) handling. Providing distributed search and index replication, Solr is designed for scalability and fault tolerance. Solr is widely used for enterprise search and analytics use cases and has an active development community and regular releases.
Apache Pekko Connectors Solr provides Apache Pekko Stream sources and sinks for Apache Solr.
For more information about Solr please visit the Solr documentation.
Project Info: Apache Pekko Connectors Solr | |
---|---|
Artifact | org.apache.pekko : pekko-connectors-solr : 1.1.0 |
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.solr |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts¶
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-solr" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
<properties>
<pekko.version>1.1.3</pekko.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-solr_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
</dependencies>
def versions = [
PekkoVersion: "1.1.3",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-solr_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Set up a Solr client¶
Sources, flows and sinks provided by this connector need a prepared SolrClient (e.g. CloudSolrClient) to access Solr.
// Scala
final val zookeeperPort = 9984
final val zookeeperHost = s"127.0.0.1:$zookeeperPort/solr"
implicit val solrClient: CloudSolrClient =
new CloudSolrClient.Builder(Arrays.asList(zookeeperHost), Optional.empty()).build
// Java
private static final int zookeeperPort = 9984;
private static final String zookeeperHost = "127.0.0.1:" + zookeeperPort + "/solr";
CloudSolrClient solrClient =
new CloudSolrClient.Builder(Arrays.asList(zookeeperHost), Optional.empty()).build();
Reading from Solr¶
Create a Solr TupleStream (e.g. via CloudSolrStream) and use SolrSource.fromTupleStream to create a source.
// Scala
val factory = new StreamFactory().withCollectionZkHost(collection, zookeeperHost)
val solrClientCache = new SolrClientCache()
val streamContext = new StreamContext()
streamContext.setSolrClientCache(solrClientCache)
val expression =
StreamExpressionParser.parse(s"""search($collection, q=*:*, fl="title,comment", sort="title asc")""")
val stream: TupleStream = new CloudSolrStream(expression, factory)
stream.setStreamContext(streamContext)
val source = SolrSource
.fromTupleStream(stream)
// Java
StreamFactory factory = new StreamFactory().withCollectionZkHost(collection, zookeeperHost);
SolrClientCache solrClientCache = new SolrClientCache();
StreamContext streamContext = new StreamContext();
streamContext.setSolrClientCache(solrClientCache);
String expressionStr =
String.format("search(%s, q=*:*, fl=\"title,comment\", sort=\"title asc\")", collection);
StreamExpression expression = StreamExpressionParser.parse(expressionStr);
TupleStream stream = new CloudSolrStream(expression, factory);
stream.setStreamContext(streamContext);
Source<Tuple, NotUsed> source = SolrSource.fromTupleStream(stream);
Writing to Solr¶
Apache Pekko Connectors Solr batches updates by sending all updates of the same operation type to Solr at once. These batches are extracted from the elements within one collection sent to a Solr flow or sink; a single collection may still contain updates of different types. If a stream has no natural batches of updates, you can use the groupedWithin operator to create count-based or time-based batches.
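To make the batching rule concrete, here is a minimal, library-free Java sketch of one way a mixed collection could be split into consecutive runs of the same operation type. The `Op` enum, the `Msg` class and `splitByOperation` are hypothetical names used only for illustration; they are not part of the connector's API, and the connector's actual splitting strategy may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OperationBatchingSketch {
    /** Hypothetical operation types, loosely mirroring upsert, update and delete messages. */
    enum Op { UPSERT, UPDATE, DELETE }

    /** Hypothetical stand-in for a write message: an operation plus a document id. */
    static final class Msg {
        final Op op;
        final String id;
        Msg(Op op, String id) { this.op = op; this.id = id; }
        @Override public String toString() { return op + ":" + id; }
    }

    /** Splits one batch into consecutive runs that share an operation type, keeping order. */
    static List<List<Msg>> splitByOperation(List<Msg> batch) {
        List<List<Msg>> runs = new ArrayList<>();
        List<Msg> current = new ArrayList<>();
        for (Msg m : batch) {
            // Start a new run whenever the operation type changes.
            if (!current.isEmpty() && current.get(0).op != m.op) {
                runs.add(current);
                current = new ArrayList<>();
            }
            current.add(m);
        }
        if (!current.isEmpty()) runs.add(current);
        return runs;
    }

    public static void main(String[] args) {
        List<Msg> batch = Arrays.asList(
            new Msg(Op.UPSERT, "a"), new Msg(Op.UPSERT, "b"),
            new Msg(Op.DELETE, "c"), new Msg(Op.UPSERT, "d"));
        // In this model, each inner list corresponds to one request sent to Solr.
        System.out.println(splitByOperation(batch));
        // prints [[UPSERT:a, UPSERT:b], [DELETE:c], [UPSERT:d]]
    }
}
```

Under this reading, grouping messages of the same type next to each other in a collection keeps the number of requests per batch low.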
Apache Pekko Connectors Solr offers three styles for writing to Apache Solr:
- Using SolrInputDocument (via SolrSink.documents, SolrFlow.documents and SolrFlow.documentsWithPassThrough)
- Annotated Java Bean classes supported by Solr’s DocumentObjectBinder (via SolrSink.beans, SolrFlow.beans and SolrFlow.beansWithPassThrough)
- Typed streams with document binders to translate to SolrInputDocument (via SolrSink.typeds, SolrFlow.typeds and SolrFlow.typedsWithPassThrough)
In all variations the data is wrapped into WriteMessages.
Committing and configuration for updates¶
Data sent to Solr is not searchable until it has been committed to the index. These are the major options for handling commits:
- The Solr installation can be configured to use auto-commit.
- Specify commit-within in SolrUpdateSettings to trigger commits after every write through Apache Pekko Connectors Solr.
- Use explicit committing via the SolrClient.commit methods on stream completion, as most examples show. As commit is a blocking operation, choose an appropriate execution context (preferably not system.dispatcher).
Configuration of Solr committing is described in UpdateHandlers in SolrConfig.
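The advice about running the blocking commit away from the default dispatcher can be sketched in plain Java with `CompletionStage.thenApplyAsync` and a dedicated executor. This is only an illustration: `blockingCommit` is a hypothetical stand-in for a call such as `SolrClient.commit(collection)`, and the thread name `solr-commit` and the collection name `books` are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingCommitSketch {
    /** Hypothetical stand-in for a blocking call such as SolrClient.commit(collection). */
    static String blockingCommit(String collection) {
        // Report which thread did the work, so the effect of the executor is visible.
        return collection + " committed on " + Thread.currentThread().getName();
    }

    /** Runs the blocking commit on the given executor once the stream has completed. */
    static CompletionStage<String> commitAfter(
            CompletionStage<?> streamDone, String collection, Executor commitExecutor) {
        return streamDone.thenApplyAsync(done -> blockingCommit(collection), commitExecutor);
    }

    public static void main(String[] args) {
        // A small dedicated pool, so commits never occupy the stream's own threads.
        ExecutorService commitExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "solr-commit");
            t.setDaemon(true);
            return t;
        });
        CompletionStage<String> result =
            commitAfter(CompletableFuture.completedFuture("done"), "books", commitExecutor);
        System.out.println(result.toCompletableFuture().join());
        // prints: books committed on solr-commit
        commitExecutor.shutdown();
    }
}
```

In a real stream, the completed future would be replaced by the `CompletionStage` returned from `runWith`, and the executor would be sized for the expected number of concurrent commits.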
Available settings¶
Parameter | Default | Description |
---|---|---|
commitWithin | -1 | Max time (in ms) before a commit will happen, -1 for explicit committing |
// Scala
import org.apache.pekko.stream.connectors.solr.SolrUpdateSettings
val settings = SolrUpdateSettings()
.withCommitWithin(-1)
// Java
import org.apache.pekko.stream.connectors.solr.SolrUpdateSettings;
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(-1);
Writing SolrInputDocuments¶
Use SolrSink.documents to stream SolrInputDocuments to Solr.
Defining mappings¶
// Scala
case class Book(title: String, comment: String = "", routerOpt: Option[String] = None)
val bookToDoc: Book => SolrInputDocument = { b =>
val doc = new SolrInputDocument
doc.setField("title", b.title)
doc.setField("comment", b.comment)
b.routerOpt.foreach { router =>
doc.setField("router", router)
}
doc
}
val tupleToBook: Tuple => Book = { t =>
val title = t.getString("title")
Book(title, t.getString("comment"))
}
// Java
public static class Book {
public String title;
public String comment;
public String router;
public Book() {}
public Book(String title) {
this.title = title;
}
public Book(String title, String comment) {
this.title = title;
this.comment = comment;
}
public Book(String title, String comment, String router) {
this.title = title;
this.comment = comment;
this.router = router;
}
}
Function<Book, SolrInputDocument> bookToDoc =
book -> {
SolrInputDocument doc = new SolrInputDocument();
doc.setField("title", book.title);
doc.setField("comment", book.comment);
if (book.router != null) doc.setField("router", book.router);
return doc;
};
Function<Tuple, Book> tupleToBook =
tuple -> {
String title = tuple.getString("title");
return new Book(title, tuple.getString("comment"));
};
Use SolrSink.documents, SolrFlow.documents or SolrFlow.documentsWithPassThrough to stream SolrInputDocuments to Solr.
A SolrClient must be provided to SolrSink.
// Scala
val copyCollection = SolrSource
.fromTupleStream(stream)
.map { (tuple: Tuple) =>
val book: Book = tupleToBook(tuple)
val doc: SolrInputDocument = bookToDoc(book)
WriteMessage.createUpsertMessage(doc)
}
.groupedWithin(5, 10.millis)
.runWith(
SolrSink.documents(collectionName, SolrUpdateSettings()))
// explicit commit when stream ended
.map { _ =>
solrClient.commit(collectionName)
}(commitExecutionContext)
// Java
CompletionStage<UpdateResponse> copyCollection =
SolrSource.fromTupleStream(stream)
.map(
tuple -> {
Book book = tupleToBook.apply(tuple);
SolrInputDocument doc = bookToDoc.apply(book);
return WriteMessage.createUpsertMessage(doc);
})
.groupedWithin(5, Duration.ofMillis(10))
.runWith(
SolrSink.documents(collectionName, SolrUpdateSettings.create(), solrClient), system)
// explicit commit when stream ended
.thenApply(
done -> {
try {
return solrClient.commit(collectionName);
} catch (Exception e) {
throw new IllegalStateException(e);
}
});
Writing Java beans¶
Firstly, create a POJO.
// Scala
import org.apache.solr.client.solrj.beans.Field
import scala.annotation.meta.field
case class BookBean(@(Field @field) title: String)
// Java
class BookBean {
@Field("title")
public String title;
public BookBean(String title) {
this.title = title;
}
}
Use SolrSink.beans, SolrFlow.beans or SolrFlow.beansWithPassThrough to stream POJOs to Solr.
// Scala
val copyCollection = SolrSource
.fromTupleStream(stream)
.map { (tuple: Tuple) =>
val title = tuple.getString("title")
WriteMessage.createUpsertMessage(BookBean(title))
}
.groupedWithin(5, 10.millis)
.runWith(
SolrSink.beans[BookBean](collectionName, SolrUpdateSettings()))
// explicit commit when stream ended
.map { _ =>
solrClient.commit(collectionName)
}(commitExecutionContext)
// Java
CompletionStage<UpdateResponse> copyCollection =
SolrSource.fromTupleStream(stream)
.map(
tuple -> {
String title = tuple.getString("title");
return WriteMessage.createUpsertMessage(new BookBean(title));
})
.groupedWithin(5, Duration.ofMillis(10))
.runWith(
SolrSink.beans(
collectionName, SolrUpdateSettings.create(), solrClient, BookBean.class),
system)
// explicit commit when stream ended
.thenApply(
done -> {
try {
return solrClient.commit(collectionName);
} catch (Exception e) {
throw new IllegalStateException(e);
}
});
Writing arbitrary classes via custom binding¶
Use SolrSink.typeds, SolrFlow.typeds or SolrFlow.typedsWithPassThrough to stream messages with custom binding to Solr.
// Scala
val copyCollection = SolrSource
.fromTupleStream(stream)
.map { (tuple: Tuple) =>
val book: Book = tupleToBook(tuple)
WriteMessage.createUpsertMessage(book)
}
.groupedWithin(5, 10.millis)
.runWith(
SolrSink
.typeds[Book](
collectionName,
SolrUpdateSettings(),
binder = bookToDoc))
// explicit commit when stream ended
.map { _ =>
solrClient.commit(collectionName)
}(commitExecutionContext)
// Java
CompletionStage<UpdateResponse> copyCollection =
SolrSource.fromTupleStream(stream)
.map(tuple -> WriteMessage.createUpsertMessage(tupleToBook.apply(tuple)))
.groupedWithin(5, Duration.ofMillis(10))
.runWith(
SolrSink.typeds(
collectionName, SolrUpdateSettings.create(), bookToDoc, solrClient, Book.class),
system)
// explicit commit when stream ended
.thenApply(
done -> {
try {
return solrClient.commit(collectionName);
} catch (Exception e) {
throw new IllegalStateException(e);
}
});
Using a flow with custom binding¶
You can also build flow stages with SolrFlow.
// Scala
val copyCollection = SolrSource
.fromTupleStream(stream)
.map { (tuple: Tuple) =>
val book: Book = tupleToBook(tuple)
WriteMessage.createUpsertMessage(book)
}
.groupedWithin(5, 10.millis)
.via(
SolrFlow
.typeds[Book](
collectionName,
SolrUpdateSettings(),
binder = bookToDoc))
.runWith(Sink.seq)
// explicit commit when stream ended
.map { seq =>
solrClient.commit(collectionName)
seq
}(commitExecutionContext)
// Java
CompletionStage<UpdateResponse> copyCollection =
SolrSource.fromTupleStream(stream)
.map(tuple -> WriteMessage.createUpsertMessage(tupleToBook.apply(tuple)))
.groupedWithin(5, Duration.ofMillis(10))
.via(
SolrFlow.typeds(
collectionName, SolrUpdateSettings.create(), bookToDoc, solrClient, Book.class))
.runWith(Sink.ignore(), system)
// explicit commit when stream ended
.thenApply(
done -> {
try {
return solrClient.commit(collectionName);
} catch (Exception e) {
throw new IllegalStateException(e);
}
});
Passing data through SolrFlow¶
All flow types (documents, beans, typeds) exist with pass-through support: use SolrFlow.documentsWithPassThrough, SolrFlow.beansWithPassThrough or SolrFlow.typedsWithPassThrough.
When streaming documents from Kafka, you might want to commit to Kafka only after the document has been written to Solr. This scenario uses implicit committing via the commit-within setting.
// Scala
// Note: This code mimics Pekko Connectors Kafka APIs
val copyCollection = kafkaConsumerSource
.map { (kafkaMessage: CommittableMessage) =>
val book = kafkaMessage.book
// Transform message so that we can write to solr
WriteMessage.createUpsertMessage(book).withPassThrough(kafkaMessage.committableOffset)
}
.groupedWithin(5, 10.millis)
.via( // write to Solr
SolrFlow.typedsWithPassThrough[Book, CommittableOffset](
collectionName,
// use implicit commits to Solr
SolrUpdateSettings().withCommitWithin(5),
binder = bookToDoc))
// check status and collect Kafka offsets
.map { messageResults =>
val offsets = messageResults.map { result =>
if (result.status != 0)
throw new Exception("Failed to write message to Solr")
result.passThrough
}
CommittableOffsetBatch(offsets)
}
.mapAsync(1)(_.commitScaladsl())
.runWith(Sink.ignore)
// Java
// Note: This code mimics Pekko Connectors Kafka APIs
CompletionStage<Done> completion =
kafkaConsumerSource // Assume we get this from Kafka
.map(
kafkaMessage -> {
Book book = kafkaMessage.book;
// Transform message so that we can write to Solr
return WriteMessage.createUpsertMessage(book)
.withPassThrough(kafkaMessage.committableOffset);
})
.groupedWithin(5, Duration.ofMillis(10))
.via(
SolrFlow.typedsWithPassThrough(
collectionName,
// use implicit commits to Solr
SolrUpdateSettings.create().withCommitWithin(5),
bookToDoc,
solrClient,
Book.class))
.map(
messageResults ->
messageResults.stream()
.map(
result -> {
if (result.status() != 0) {
throw new RuntimeException("Failed to write message to Solr");
}
return result.passThrough();
})
.collect(Collectors.toList()))
.map(ConsumerMessage::createCommittableOffsetBatch)
.mapAsync(1, CommittableOffsetBatch::commitJavadsl)
.runWith(Sink.ignore(), system);
Excluding messages¶
Failure to deserialize a Kafka message is a particular case of conditional message processing. When a message fails to deserialize, there is likely nothing to write to Solr, yet the Solr flow will not let us pass the corresponding committable offset through without sending a request to Solr.
Use WriteMessage.createPassThrough to exclude such a message inside a flow without making any change in Solr.
// Scala
// Note: This code mimics Pekko Connectors Kafka APIs
val copyCollection = kafkaConsumerSource
.map { (offset: CommittableOffset) =>
// Transform message so that we can write to solr
WriteMessage.createPassThrough(offset).withSource(new SolrInputDocument())
}
.groupedWithin(5, 10.millis)
.via( // write to Solr
SolrFlow.documentsWithPassThrough[CommittableOffset](
collectionName,
// use implicit commits to Solr
SolrUpdateSettings().withCommitWithin(5)))
// check status and collect Kafka offsets
.map { messageResults =>
val offsets = messageResults.map { result =>
if (result.status != 0)
throw new Exception("Failed to write message to Solr")
result.passThrough
}
CommittableOffsetBatch(offsets)
}
.mapAsync(1)(_.commitScaladsl())
.runWith(Sink.ignore)
// Java
// Note: This code mimics Pekko Connectors Kafka APIs
CompletionStage<Done> completion =
kafkaConsumerSource // Assume we get this from Kafka
.map(
kafkaMessage -> {
// Transform message so that we can write to Solr
return WriteMessage.createPassThrough(kafkaMessage)
.withSource(new SolrInputDocument());
})
.groupedWithin(5, Duration.ofMillis(10))
.via(
SolrFlow.documentsWithPassThrough(
collectionName,
// use implicit commits to Solr
SolrUpdateSettings.create().withCommitWithin(5),
solrClient))
.map(
messageResults ->
messageResults.stream()
.map(
result -> {
if (result.status() != 0) {
throw new RuntimeException("Failed to write message to Solr");
}
return result.passThrough();
})
.collect(Collectors.toList()))
.map(ConsumerMessage::createCommittableOffsetBatch)
.mapAsync(1, CommittableOffsetBatch::commitJavadsl)
.runWith(Sink.ignore(), system);
Update documents¶
With WriteMessage.createUpdateMessage documents can be updated atomically. All flow and sink types (documents, beans, typeds) support atomic updates.
// Scala
val updateCollection = SolrSource
.fromTupleStream(stream2)
.map { (tuple: Tuple) =>
val id = tuple.getFields.get("title").toString
val comment = tuple.getFields.get("comment").toString
WriteMessage.createUpdateMessage[SolrInputDocument](
idField = "title",
idValue = id,
updates = Map(
"comment" ->
Map("set" -> (comment + " It is a good book!!!"))))
}
.groupedWithin(5, 10.millis)
.runWith(
SolrSink.documents(collectionName, SolrUpdateSettings()))
// explicit commit when stream ended
.map { _ =>
solrClient.commit(collectionName)
}(commitExecutionContext)
// Java
CompletionStage<UpdateResponse> updateCollection =
SolrSource.fromTupleStream(stream2)
.map(
t -> {
String id = t.getFields().get("title").toString();
String comment = t.getFields().get("comment").toString();
Map<String, Object> m2 = new HashMap<>();
m2.put("set", (comment + " It is a good book!!!"));
Map<String, Map<String, Object>> updates = new HashMap<>();
updates.put("comment", m2);
return WriteMessage.<SolrInputDocument>createUpdateMessage("title", id, updates);
})
.groupedWithin(5, Duration.ofMillis(10))
.runWith(
SolrSink.documents(collectionName, SolrUpdateSettings.create(), solrClient), system)
// explicit commit when stream ended
.thenApply(
done -> {
try {
return solrClient.commit(collectionName);
} catch (Exception e) {
throw new IllegalStateException(e);
}
});
If a collection contains a router field, use WriteMessage.createUpdateMessage(...).withRoutingFieldValue(..) to set the router field.
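The `updates` argument in the examples above is a nested map from field name to a map of atomic-update modifier to value. The following plain-Java sketch builds such a map with two of Solr's modifiers, `set` and `inc`. The `rating` field and the `buildUpdates` helper are hypothetical and only serve the illustration; the target schema is assumed to contain matching fields.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class AtomicUpdateSketch {
    /** Builds the nested map: field name -> (modifier -> value). */
    static Map<String, Map<String, Object>> buildUpdates(String newComment, int ratingDelta) {
        Map<String, Map<String, Object>> updates = new LinkedHashMap<>();
        // "set" replaces the current value of the field.
        updates.put("comment", Collections.<String, Object>singletonMap("set", newComment));
        // "inc" adds the given amount to a numeric field ("rating" is a hypothetical field).
        updates.put("rating", Collections.<String, Object>singletonMap("inc", ratingDelta));
        return updates;
    }

    public static void main(String[] args) {
        System.out.println(buildUpdates("A good book", 1));
        // prints {comment={set=A good book}, rating={inc=1}}
    }
}
```

A map of this shape could be passed as the third argument of `WriteMessage.createUpdateMessage`, in the same way as the `updates` variable in the Java example above.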
Delete documents by ids¶
With WriteMessage.createDeleteMessage(id) documents may be deleted by ID. All flow and sink types (documents, beans, typeds) support deleting.
// Scala
val deleteDocuments = SolrSource
.fromTupleStream(stream2)
.map { (tuple: Tuple) =>
val id = tuple.getFields.get("title").toString
WriteMessage.createDeleteMessage[SolrInputDocument](id)
}
.groupedWithin(5, 10.millis)
.runWith(
SolrSink.documents(collectionName, SolrUpdateSettings()))
// explicit commit when stream ended
.map { _ =>
solrClient.commit(collectionName)
}(commitExecutionContext)
// Java
CompletionStage<UpdateResponse> deleteDocuments =
SolrSource.fromTupleStream(stream2)
.map(
t -> {
String id = tupleToBook.apply(t).title;
return WriteMessage.<SolrInputDocument>createDeleteMessage(id);
})
.groupedWithin(5, Duration.ofMillis(10))
.runWith(
SolrSink.documents(collectionName, SolrUpdateSettings.create(), solrClient), system)
// explicit commit when stream ended
.thenApply(
done -> {
try {
return solrClient.commit(collectionName);
} catch (Exception e) {
throw new IllegalStateException(e);
}
});
Delete documents by query¶
With WriteMessage.createDeleteByQueryMessage(query) documents matching a query may be deleted. All flow and sink types (documents, beans, typeds) support deleting.
// Scala
val deleteByQuery = SolrSource
.fromTupleStream(stream2)
.map { (tuple: Tuple) =>
val title = tuple.getFields.get("title").toString
WriteMessage.createDeleteByQueryMessage[SolrInputDocument](
s"""title:"$title" """)
}
.groupedWithin(5, 10.millis)
.runWith(
SolrSink.documents(collectionName, SolrUpdateSettings()))
// explicit commit when stream ended
.map { _ =>
solrClient.commit(collectionName)
}(commitExecutionContext)
// Java
CompletionStage<UpdateResponse> deleteByQuery =
SolrSource.fromTupleStream(stream2)
.map(
t -> {
String id = t.getFields().get("title").toString();
return WriteMessage.<SolrInputDocument>createDeleteByQueryMessage(
"title:\"" + id + "\"");
})
.groupedWithin(5, Duration.ofMillis(10))
.runWith(
SolrSink.documents(collectionName, SolrUpdateSettings.create(), solrClient), system)
// explicit commit when stream ended
.thenApply(
done -> {
try {
return solrClient.commit(collectionName);
} catch (Exception e) {
throw new IllegalStateException(e);
}
});
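The delete-by-query examples build the query by placing the title between double quotes, which yields a malformed query if the title itself contains a double quote or a backslash. A minimal sketch of a helper that escapes those two characters follows, assuming Solr's standard query parser; `phraseQuery` is a hypothetical name, not a connector or SolrJ function. For unquoted terms, SolrJ's `ClientUtils.escapeQueryChars` is an existing alternative.

```java
public class DeleteQuerySketch {
    /** Builds field:"value", escaping backslashes and double quotes inside the phrase. */
    static String phraseQuery(String field, String value) {
        // Escape backslashes first, so the backslashes added for quotes are not doubled.
        String escaped = value.replace("\\", "\\\\").replace("\"", "\\\"");
        return field + ":\"" + escaped + "\"";
    }

    public static void main(String[] args) {
        System.out.println(phraseQuery("title", "The \"Best\" Book"));
        // prints title:"The \"Best\" Book"
    }
}
```

The returned string could then be handed to `WriteMessage.createDeleteByQueryMessage` in place of the hand-concatenated query.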