Azure Storage Queue
The Azure Storage Queue connector provides an Apache Pekko Stream Source and Sinks for Azure Storage Queue integration.
Azure Storage Queue is a queuing service similar to Amazon’s SQS. It is designed mostly for long-running and non-time-critical tasks. For more information on Azure Storage Queue see the Azure docs.
Project Info: Apache Pekko Connectors Azure Storage Queue | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-azure-storage-queue
1.1.0
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.azure.storagequeue |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts¶
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-azure-storage-queue" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
<properties>
<pekko.version>1.1.3</pekko.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-azure-storage-queue_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
</dependencies>
def versions = [
PekkoVersion: "1.1.3",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-azure-storage-queue_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Init Azure Storage API¶
import com.microsoft.azure.storage._
import com.microsoft.azure.storage.queue._
val storageConnectionString = "DefaultEndpointsProtocol=http;AccountName=<YourAccountName>;AccountKey=<YourKey>"
val queueFactory = () => { // Since azure storage JDK is not guaranteed to be thread-safe
val storageAccount = CloudStorageAccount.parse(storageConnectionString)
val queueClient = storageAccount.createCloudQueueClient
queueClient.getQueueReference("myQueue")
}
For more details, see Microsoft Azure Storage Docs.
Queuing a message¶
import one.aleph.akkzure.queue._
import one.aleph.akkzure.queue.scaladsl._
// Create an example message
val message = new CloudQueueMessage("Hello Azure")
Source.single(message).runWith(AzureQueueSink(queueFactory))
Processing and deleting messages¶
AzureQueueSource(queueFactory).take(10)
.map({ msg: CloudQueueMessage =>
println(msg.getMessageContentAsString) // Print the messages content
msg // Return message to the flow for deletion
}).runWith(AzureQueueDeleteSink(queueFactory))
1.1.0