Azure Storage Queue
The Azure Storage Queue connector provides an Apache Pekko Stream Source and Sinks for Azure Storage Queue integration.
Azure Storage Queue is a queuing service similar to Amazon’s SQS. It is designed mostly for long-running and non-time-critical tasks. For more information on Azure Storage Queue see the Azure docs.
| Project Info: Apache Pekko Connectors Azure Storage Queue | |
|---|---|
| Artifact | org.apache.pekko pekko-connectors-azure-storage-queue 1.0.2 | 
| JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 | 
| Scala versions | 2.13.14, 2.12.20, 3.3.3 | 
| JPMS module name | pekko.stream.connectors.azure.storagequeue | 
| License | |
| API documentation | |
| Forums | |
| Release notes | GitHub releases | 
| Issues | Github issues | 
| Sources | https://github.com/apache/pekko-connectors | 
Artifacts
- sbt
- val PekkoVersion = "1.0.3" libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-connectors-azure-storage-queue" % "1.0.2", "org.apache.pekko" %% "pekko-stream" % PekkoVersion )
- Maven
- <properties> <pekko.version>1.0.3</pekko.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-connectors-azure-storage-queue_${scala.binary.version}</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency> </dependencies>
- Gradle
- def versions = [ PekkoVersion: "1.0.3", ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-connectors-azure-storage-queue_${versions.ScalaBinary}:1.0.2" implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Init Azure Storage API
import com.microsoft.azure.storage._
import com.microsoft.azure.storage.queue._
val storageConnectionString = "DefaultEndpointsProtocol=http;AccountName=<YourAccountName>;AccountKey=<YourKey>"
val queueFactory = () => { // Since azure storage JDK is not guaranteed to be thread-safe
  val storageAccount = CloudStorageAccount.parse(storageConnectionString)
  val queueClient = storageAccount.createCloudQueueClient
  queueClient.getQueueReference("myQueue")
}
For more details, see Microsoft Azure Storage Docs.
Queuing a message
import one.aleph.akkzure.queue._
import one.aleph.akkzure.queue.scaladsl._
// Create an example message
val message = new CloudQueueMessage("Hello Azure")
Source.single(message).runWith(AzureQueueSink(queueFactory))
Processing and deleting messages
AzureQueueSource(queueFactory).take(10)
.map({ msg: CloudQueueMessage =>
  println(msg.getMessageContentAsString) // Print the messages content
  msg                                    // Return message to the flow for deletion
}).runWith(AzureQueueDeleteSink(queueFactory))
1.0.2