Skip to content

Commit 61422eb

Browse files
add sns->sqs support
1 parent 5b6d48e commit 61422eb

File tree

2 files changed

+83
-2
lines changed

2 files changed

+83
-2
lines changed

datadog_lambda/dsm.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ def _get_dsm_context_from_lambda(message):
7070
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
7171
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
7272
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
73+
- message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)
74+
- message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)
7375
"""
7476
context_json = None
7577
message_body = message
@@ -83,9 +85,15 @@ def _get_dsm_context_from_lambda(message):
8385
except (ValueError, TypeError, KeyError):
8486
logger.debug("Unable to parse kinesis data for lambda message")
8587
return None
86-
87-
if "Sns" in message:
88+
elif "Sns" in message:
8889
message_body = message["Sns"]
90+
else:
91+
try:
92+
body = message.get("body")
93+
if body:
94+
message_body = json.loads(body)
95+
except (ValueError, TypeError):
96+
logger.debug("Unable to parse lambda message body as JSON, treat as non-json")
8997

9098
message_attributes = message_body.get("MessageAttributes") or message_body.get(
9199
"messageAttributes"
@@ -108,6 +116,11 @@ def _get_dsm_context_from_lambda(message):
108116
elif "stringValue" in datadog_attr:
109117
# SQS -> lambda
110118
context_json = json.loads(datadog_attr["stringValue"])
119+
elif "binaryValue" in datadog_attr:
120+
# SNS -> SQS -> lambda, raw message delivery
121+
context_json = json.loads(
122+
base64.b64decode(datadog_attr["binaryValue"]).decode()
123+
)
111124
else:
112125
logger.debug("DataStreams did not handle lambda message: %r", message)
113126

tests/test_dsm.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,74 @@ def test_kinesis_to_lambda_format(self):
493493
assert result["x-datadog-parent-id"] == "888777666"
494494
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
495495

496+
def test_sns_to_sqs_to_lambda_binary_value_format(self):
497+
"""Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)"""
498+
trace_context = {
499+
"x-datadog-trace-id": "777666555",
500+
"x-datadog-parent-id": "444333222",
501+
"dd-pathway-ctx": "test-pathway-ctx",
502+
}
503+
binary_data = base64.b64encode(
504+
json.dumps(trace_context).encode("utf-8")
505+
).decode("utf-8")
506+
507+
lambda_record = {
508+
"messageId": "test-message-id",
509+
"receiptHandle": "test-receipt-handle",
510+
"body": "Test message body",
511+
"messageAttributes": {
512+
"_datadog": {"binaryValue": binary_data, "dataType": "Binary"}
513+
},
514+
"eventSource": "aws:sqs",
515+
"eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue",
516+
}
517+
518+
result = _get_dsm_context_from_lambda(lambda_record)
519+
520+
assert result is not None
521+
assert result == trace_context
522+
assert result["x-datadog-trace-id"] == "777666555"
523+
assert result["x-datadog-parent-id"] == "444333222"
524+
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
525+
526+
def test_sns_to_sqs_to_lambda_body_format(self):
527+
"""Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)"""
528+
trace_context = {
529+
"x-datadog-trace-id": "123987456",
530+
"x-datadog-parent-id": "654321987",
531+
"x-datadog-sampling-priority": "1",
532+
"dd-pathway-ctx": "test-pathway-ctx",
533+
}
534+
535+
message_body = {
536+
"Type": "Notification",
537+
"MessageId": "test-message-id",
538+
"Message": "Test message from SNS",
539+
"MessageAttributes": {
540+
"_datadog": {
541+
"Type": "Binary",
542+
"Value": base64.b64encode(
543+
json.dumps(trace_context).encode("utf-8")
544+
).decode("utf-8"),
545+
}
546+
},
547+
}
548+
549+
lambda_record = {
550+
"messageId": "lambda-message-id",
551+
"body": json.dumps(message_body),
552+
"eventSource": "aws:sqs",
553+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue",
554+
}
555+
556+
result = _get_dsm_context_from_lambda(lambda_record)
557+
558+
assert result is not None
559+
assert result == trace_context
560+
assert result["x-datadog-trace-id"] == "123987456"
561+
assert result["x-datadog-parent-id"] == "654321987"
562+
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
563+
496564
def test_no_message_attributes(self):
497565
"""Test message without MessageAttributes returns None."""
498566
message = {

0 commit comments

Comments
 (0)