Skip to content

Commit 7be23f8

Browse files
Merge pull request #1480 from bradovitt/elasticmq
Elasticmq
2 parents edd8f27 + cce3657 commit 7be23f8

File tree

5 files changed

+350
-0
lines changed

5 files changed

+350
-0
lines changed

build.sbt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,12 @@ lazy val scala_libraries = (project in file("scala-libraries"))
439439
"nl.gn0s1s" %% "elastic4s-core" % elastic4sVersion,
440440
logback
441441
),
442+
libraryDependencies ++= Seq(
443+
"org.elasticmq" %% "elasticmq-core" % "1.6.5",
444+
"org.elasticmq" %% "elasticmq-server" % "1.6.5",
445+
"org.elasticmq" %% "elasticmq-rest-sqs" % "1.6.5",
446+
"software.amazon.awssdk" % "sqs" % "2.26.24"
447+
),
442448
Defaults.itSettings
443449
)
444450

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# What is the outside visible address of this ElasticMQ node
2+
# Used to create the queue URL (may be different from bind address!)
3+
node-address {
4+
protocol = http
5+
host = localhost
6+
port = 9324
7+
context-path = ""
8+
}
9+
10+
rest-sqs {
11+
enabled = true
12+
bind-port = 9324
13+
bind-hostname = "0.0.0.0"
14+
// Possible values: relaxed, strict
15+
sqs-limits = strict
16+
}
17+
18+
rest-stats {
19+
enabled = true
20+
bind-port = 9325
21+
bind-hostname = "0.0.0.0"
22+
}
23+
24+
// Should the node-address be generated from the bind port/hostname
25+
// Set this to true e.g. when assigning port automatically by using port 0.
26+
generate-node-address = false
27+
28+
queues {
29+
queue1 {
30+
defaultVisibilityTimeout = 10 seconds
31+
delay = 0 seconds
32+
receiveMessageWait = 0 seconds
33+
deadLettersQueue {
34+
name = "queue1-dead-letters"
35+
maxReceiveCount = 3 // from 1 to 1000
36+
}
37+
fifo = false
38+
contentBasedDeduplication = false
39+
tags {
40+
tag1 = "tagged1"
41+
tag2 = "tagged2"
42+
}
43+
}
44+
queue1-dead-letters { }
45+
}
46+
47+
elastic-mq {
48+
region = "elasticMQ"
49+
endPoint = "http://localhost:9325"
50+
access-key-id = "your aws access key id"
51+
secret-access-key = "secret-access-token"
52+
}
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
package com.baeldung.elasticmq
2+
3+
import software.amazon.awssdk.auth.credentials.{
4+
AwsBasicCredentials,
5+
AwsCredentialsProviderChain,
6+
StaticCredentialsProvider
7+
}
8+
import software.amazon.awssdk.regions.Region
9+
import software.amazon.awssdk.services.sqs.model.*
10+
import software.amazon.awssdk.services.sqs.{
11+
SqsAsyncClient,
12+
SqsAsyncClientBuilder
13+
}
14+
15+
import java.net.URI
16+
import java.util.UUID
17+
18+
import scala.concurrent.{ExecutionContext, Future}
19+
import scala.jdk.FutureConverters.*
20+
import scala.jdk.CollectionConverters.*
21+
22+
class SQSAsyncClient(
23+
queueURL: String,
24+
region: String,
25+
endpoint: String
26+
)(implicit executionContext: ExecutionContext):
27+
28+
private val sqsAsyncClient: SqsAsyncClient =
29+
SqsAsyncClient
30+
.builder()
31+
.region(Region.of(region))
32+
.credentialsProvider(
33+
AwsCredentialsProviderChain
34+
.builder()
35+
.credentialsProviders(
36+
StaticCredentialsProvider.create(
37+
AwsBasicCredentials.create(
38+
ElasticMQConfig.ELASTIC_MQ_ACCESS_KEY,
39+
ElasticMQConfig.ELASTIC_MQ_SECRET_ACCESS_KEY
40+
)
41+
)
42+
)
43+
.build()
44+
)
45+
.endpointOverride(URI.create(endpoint))
46+
.build()
47+
48+
def createStandardQueue(queueName: String): Future[CreateQueueResponse] =
49+
val request = CreateQueueRequest.builder.queueName(queueName).build
50+
51+
sqsAsyncClient.createQueue(request).asScala
52+
53+
final lazy val createFIFOQueueAttributes = Map(
54+
(QueueAttributeName.FIFO_QUEUE, "true")
55+
).asJava
56+
57+
def createFIFOQueue(queueName: String): Future[CreateQueueResponse] =
58+
val createQueueRequest = CreateQueueRequest.builder
59+
.queueName(queueName)
60+
.attributes(createFIFOQueueAttributes)
61+
.build
62+
63+
sqsAsyncClient.createQueue(createQueueRequest).asScala
64+
65+
def deleteQueue(): Future[DeleteQueueResponse] =
66+
val request = DeleteQueueRequest.builder().queueUrl(queueURL).build()
67+
68+
sqsAsyncClient.deleteQueue(request).asScala
69+
70+
def sendMessage(message: String): Future[SendMessageResponse] =
71+
val request = SendMessageRequest
72+
.builder()
73+
.messageBody(message)
74+
.queueUrl(queueURL)
75+
.build()
76+
77+
sqsAsyncClient.sendMessage(request).asScala
78+
79+
def sendMessagesInBatch(
80+
messages: List[String]
81+
): Future[SendMessageBatchResponse] =
82+
val batchRequestEntry = messages
83+
.map(
84+
SendMessageBatchRequestEntry
85+
.builder()
86+
.messageBody(_)
87+
.id(UUID.randomUUID().toString)
88+
.build()
89+
)
90+
.asJava
91+
val sendMessageBatchRequest = SendMessageBatchRequest
92+
.builder()
93+
.queueUrl(queueURL)
94+
.entries(batchRequestEntry)
95+
.build()
96+
97+
sqsAsyncClient.sendMessageBatch(sendMessageBatchRequest).asScala
98+
99+
// maxNumberOfMessages must be less than 10.
100+
def receiveMessages(
101+
maxNumberOfMessages: Int
102+
): Future[ReceiveMessageResponse] =
103+
val receiveMessageRequest =
104+
ReceiveMessageRequest
105+
.builder()
106+
.maxNumberOfMessages(maxNumberOfMessages)
107+
.queueUrl(queueURL)
108+
.waitTimeSeconds(10)
109+
.build()
110+
111+
sqsAsyncClient.receiveMessage(receiveMessageRequest).asScala
112+
113+
def deleteMessage(receiptHandle: String): Future[DeleteMessageResponse] =
114+
val deleteMessageRequest = DeleteMessageRequest
115+
.builder()
116+
.queueUrl(queueURL)
117+
.receiptHandle(receiptHandle)
118+
.build()
119+
120+
sqsAsyncClient.deleteMessage(deleteMessageRequest).asScala
121+
122+
def deleteMessageInBatch(
123+
messages: List[Message]
124+
): Future[DeleteMessageBatchResponse] =
125+
val listDeleteMessageBatchRequestEntry = messages
126+
.map(message =>
127+
DeleteMessageBatchRequestEntry
128+
.builder()
129+
.receiptHandle(message.receiptHandle())
130+
.build()
131+
)
132+
.asJava
133+
val deleteMessageBatchRequest = DeleteMessageBatchRequest
134+
.builder()
135+
.queueUrl(queueURL)
136+
.entries(listDeleteMessageBatchRequestEntry)
137+
.build()
138+
139+
sqsAsyncClient.deleteMessageBatch(deleteMessageBatchRequest).asScala
140+
141+
def getQueueURL(queueName: String): Future[GetQueueUrlResponse] =
142+
val getQueueUrlRequest =
143+
GetQueueUrlRequest.builder().queueName(queueName).build()
144+
145+
sqsAsyncClient.getQueueUrl(getQueueUrlRequest).asScala
146+
147+
def listQueues(): Future[ListQueuesResponse] =
148+
sqsAsyncClient.listQueues().asScala
149+
150+
def listQueuesStartingFromPrefix(prefix: String): Future[ListQueuesResponse] =
151+
val listQueueStartingFromPrefixRequest =
152+
ListQueuesRequest.builder().queueNamePrefix(prefix).build()
153+
154+
sqsAsyncClient.listQueues(listQueueStartingFromPrefixRequest).asScala
155+
156+
def changeMessageVisibility(
157+
message: Message
158+
): Future[ChangeMessageVisibilityResponse] =
159+
val changeMessageVisibilityRequest = ChangeMessageVisibilityRequest
160+
.builder()
161+
.queueUrl(queueURL)
162+
.receiptHandle(message.receiptHandle())
163+
.visibilityTimeout(30)
164+
.build()
165+
166+
sqsAsyncClient
167+
.changeMessageVisibility(changeMessageVisibilityRequest)
168+
.asScala
169+
170+
def changeMessageVisibilityOfBatch(
171+
messages: List[Message]
172+
): Future[ChangeMessageVisibilityBatchResponse] =
173+
val changeMessageVisibilityBatchRequestEntry = messages
174+
.map(message =>
175+
ChangeMessageVisibilityBatchRequestEntry
176+
.builder()
177+
.receiptHandle(message.receiptHandle())
178+
.visibilityTimeout(30)
179+
.build()
180+
)
181+
.asJava
182+
val changeMessageVisibilityRequest = ChangeMessageVisibilityBatchRequest
183+
.builder()
184+
.queueUrl(queueURL)
185+
.entries(changeMessageVisibilityBatchRequestEntry)
186+
.build()
187+
188+
sqsAsyncClient
189+
.changeMessageVisibilityBatch(changeMessageVisibilityRequest)
190+
.asScala
191+
192+
final lazy val purgeQueueRequest =
193+
PurgeQueueRequest.builder().queueUrl(queueURL).build()
194+
def purgeQueue(): Future[PurgeQueueResponse] =
195+
sqsAsyncClient.purgeQueue(purgeQueueRequest).asScala
196+
197+
def setQueueAttributes(
198+
attributes: Map[QueueAttributeName, String]
199+
): Future[SetQueueAttributesResponse] =
200+
val setQueueAttributesRequest = SetQueueAttributesRequest
201+
.builder()
202+
.queueUrl(queueURL)
203+
.attributes(attributes.asJava)
204+
.build()
205+
206+
sqsAsyncClient.setQueueAttributes(setQueueAttributesRequest).asScala
207+
208+
def tagQueue(tags: Map[String, String]): Future[TagQueueResponse] =
209+
val tagQueueRequest =
210+
TagQueueRequest.builder().queueUrl(queueURL).tags(tags.asJava).build()
211+
212+
sqsAsyncClient.tagQueue(tagQueueRequest).asScala
213+
214+
def untagQueue(listOfTagsToRemove: List[String]): Future[UntagQueueResponse] =
215+
val untagQueueRequest = UntagQueueRequest
216+
.builder()
217+
.queueUrl(queueURL)
218+
.tagKeys(listOfTagsToRemove.asJava)
219+
.build()
220+
221+
sqsAsyncClient.untagQueue(untagQueueRequest).asScala
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.baeldung.elasticmq
2+
3+
import com.typesafe.config.{Config, ConfigFactory}
4+
5+
object ElasticMQConfig:
6+
7+
private final val config: Config = ConfigFactory.load("elasticmq.conf")
8+
9+
final val ELASTIC_MQ_ACCESS_KEY: String =
10+
config.getString("elastic-mq.access-key-id")
11+
final val ELASTIC_MQ_SECRET_ACCESS_KEY: String =
12+
config.getString("elastic-mq.secret-access-key")
13+
14+
final val ELASTIC_MQ_REGION = config.getString("elastic-mq.region")
15+
final val ELASTIC_MQ_ENDPOINT = config.getString("elastic-mq.endPoint")
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.baeldung.elasticmq
2+
3+
import org.elasticmq.rest.sqs.SQSRestServerBuilder
4+
import org.elasticmq.server.ElasticMQServer
5+
import org.elasticmq.server.config.ElasticMQServerConfig
6+
7+
import com.typesafe.config.ConfigFactory
8+
9+
import org.apache.pekko.actor.{Actor, ActorRef, ActorSystem}
10+
import org.apache.pekko.event.LoggingAdapter
11+
12+
import scala.util.{Failure, Success}
13+
14+
object ElasticMQService extends App:
15+
16+
implicit val actorSystem: ActorSystem = ActorSystem.create()
17+
implicit val executionContext: concurrent.ExecutionContextExecutor =
18+
actorSystem.dispatcher
19+
implicit val m_logger: LoggingAdapter = actorSystem.log
20+
21+
final val ElasticMQ_URL = s"http://localhost:9324/000000000000/"
22+
23+
val endpoint = "http://localhost:9325"
24+
val region = "elasticmq"
25+
26+
val server = SQSRestServerBuilder
27+
.withPort(9325)
28+
.withInterface("localhost")
29+
.start()
30+
31+
val elasticMQClient = new SQSAsyncClient(ElasticMQ_URL, region, endpoint)
32+
33+
val uselessWorkflow =
34+
for
35+
_ <- elasticMQClient.createStandardQueue("standardQueueForTest")
36+
testQueueClient = new SQSAsyncClient(
37+
ElasticMQ_URL + "standardQueueForTest",
38+
region,
39+
endpoint
40+
)
41+
_ <- testQueueClient.createFIFOQueue("fifoQueue.fifo")
42+
_ <- testQueueClient.listQueues()
43+
_ <- testQueueClient.sendMessage("Hi")
44+
_ <- testQueueClient.sendMessagesInBatch(
45+
List("Follow", "Baeldung", "on", "LinkedIn")
46+
)
47+
_ <- testQueueClient.receiveMessages(5)
48+
_ <- testQueueClient.purgeQueue()
49+
yield ()
50+
51+
uselessWorkflow
52+
.andThen(_ => server.stopAndWait())
53+
.onComplete:
54+
case Success(_) => m_logger.info("queue created")
55+
case Failure(exception) =>
56+
m_logger.error(exception, "exception in uselessWorkflow")

0 commit comments

Comments
 (0)