Skip to content

Commit d7ce695

Browse files
move get dsm context logic into lambda layer code
1 parent 6beb65d commit d7ce695

File tree

2 files changed

+332
-14
lines changed

2 files changed

+332
-14
lines changed

datadog_lambda/dsm.py

Lines changed: 97 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,33 @@
1+
import json
2+
import base64
3+
4+
from ddtrace.internal.logger import get_logger
15
from datadog_lambda import logger
26
from datadog_lambda.trigger import EventTypes
37

8+
log = get_logger(__name__)
49

5-
def set_dsm_context(event, event_source):
610

11+
def set_dsm_context(event, event_source):
    """
    Set the DSM context for an incoming Lambda event.

    Only SQS event sources are handled; for any other source this is a no-op.

    Args:
        event: The Lambda event payload.
        event_source: The parsed event source; must expose ``equals``.
    """
    is_sqs = event_source.equals(EventTypes.SQS)
    if not is_sqs:
        return
    _dsm_set_sqs_context(event)
914

1015

11-
def _dsm_set_sqs_context(event):
16+
def _dsm_set_context_helper(
17+
event, service_type, arn_extractor, payload_size_calculator
18+
):
19+
"""
20+
Common helper function for setting DSM context.
21+
22+
Args:
23+
event: The Lambda event containing records
24+
service_type: The service type string (example: 'sqs', 'sns')
25+
arn_extractor: Function to extract the ARN from the record
26+
payload_size_calculator: Function to calculate payload size
27+
"""
1228
from datadog_lambda.wrapper import format_err_with_traceback
1329
from ddtrace.internal.datastreams import data_streams_processor
1430
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
15-
from ddtrace.internal.datastreams.botocore import (
16-
get_datastreams_context,
17-
calculate_sqs_payload_size,
18-
)
1931

2032
records = event.get("Records")
2133
if records is None:
@@ -24,15 +36,88 @@ def _dsm_set_sqs_context(event):
2436

2537
for record in records:
2638
try:
27-
queue_arn = record.get("eventSourceARN", "")
28-
29-
contextjson = get_datastreams_context(record)
30-
payload_size = calculate_sqs_payload_size(record)
39+
arn = arn_extractor(record)
40+
context_json = _get_dsm_context_from_lambda(record)
41+
payload_size = payload_size_calculator(record, context_json)
3142

32-
ctx = DsmPathwayCodec.decode(contextjson, processor)
43+
ctx = DsmPathwayCodec.decode(context_json, processor)
3344
ctx.set_checkpoint(
34-
["direction:in", f"topic:{queue_arn}", "type:sqs"],
45+
["direction:in", f"topic:{arn}", f"type:{service_type}"],
3546
payload_size=payload_size,
3647
)
3748
except Exception as e:
3849
logger.error(format_err_with_traceback(e))
50+
51+
52+
def _dsm_set_sqs_context(event):
    """
    Set the DSM context for an SQS-triggered Lambda event.

    Delegates to ``_dsm_set_context_helper`` with service type ``"sqs"``,
    reading the ARN from each record's ``eventSourceARN`` (empty string when
    absent) and computing the payload size with ddtrace's
    ``calculate_sqs_payload_size``.

    Args:
        event: The Lambda event payload.
    """
    # Imported lazily, as in the rest of this module's ddtrace usage.
    from ddtrace.internal.datastreams.botocore import calculate_sqs_payload_size

    _dsm_set_context_helper(
        event,
        "sqs",
        lambda record: record.get("eventSourceARN", ""),
        # The SQS size calculation only needs the record; the context is unused.
        lambda record, context_json: calculate_sqs_payload_size(record),
    )
62+
63+
64+
def _get_dsm_context_from_lambda(message):
    """
    Extract the Data Streams Monitoring (DSM) context carried by a Lambda record.

    Lambda-specific message formats:
        - message.messageAttributes._datadog.stringValue (SQS -> lambda)
        - message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
        - message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)
        - message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)
        - message.kinesis.data.decode()._datadog (Kinesis -> lambda)

    Args:
        message: One record (dict) taken from the Lambda event.

    Returns:
        The decoded context, or None when the record carries no ``_datadog``
        attribute or carries it in a shape that is not handled here.

    Raises:
        ValueError, TypeError, KeyError: when a ``_datadog`` attribute is
            present but its value cannot be decoded as (base64-encoded) JSON.
            These are deliberately propagated to the caller.
    """
    if "kinesis" in message:
        try:
            kinesis_data = json.loads(
                base64.b64decode(message["kinesis"]["data"]).decode()
            )
        except (ValueError, TypeError, KeyError):
            log.debug("Unable to parse kinesis data for lambda message")
            return None
        # A JSON array/scalar payload is valid JSON but cannot hold "_datadog".
        if not isinstance(kinesis_data, dict):
            log.debug("DataStreams skipped lambda message: %r", message)
            return None
        return kinesis_data.get("_datadog")

    # By default the attributes live on the record itself.
    message_body = message
    if "Sns" in message:
        message_body = message["Sns"]
    else:
        body = message.get("body")
        parsed_body = None
        if body:
            try:
                parsed_body = json.loads(body)
            except (ValueError, TypeError):
                log.debug(
                    "Unable to parse lambda message body as JSON, treat as non-json"
                )
        # Only switch to the parsed body when it actually carries the context
        # (SNS envelope delivered through SQS). An ordinary JSON payload must
        # not hide the "_datadog" attribute stored on the record, and a
        # non-object JSON body (list, number, string) has no attributes at all.
        if isinstance(parsed_body, dict):
            body_attributes = parsed_body.get(
                "MessageAttributes"
            ) or parsed_body.get("messageAttributes")
            if isinstance(body_attributes, dict) and "_datadog" in body_attributes:
                message_body = parsed_body

    message_attributes = message_body.get("MessageAttributes") or message_body.get(
        "messageAttributes"
    )
    if not message_attributes or "_datadog" not in message_attributes:
        log.debug("DataStreams skipped lambda message: %r", message)
        return None

    datadog_attr = message_attributes["_datadog"]

    if message_body.get("Type") == "Notification":
        # SNS envelope (SNS -> lambda, or SNS -> SQS -> lambda without raw delivery)
        if datadog_attr.get("Type") == "Binary":
            return json.loads(base64.b64decode(datadog_attr["Value"]).decode())
        log.debug("DataStreams did not handle lambda message: %r", message)
        return None

    if "stringValue" in datadog_attr:
        # SQS -> lambda
        return json.loads(datadog_attr["stringValue"])

    if "binaryValue" in datadog_attr:
        # SNS -> SQS -> lambda, raw message delivery
        return json.loads(base64.b64decode(datadog_attr["binaryValue"]).decode())

    log.debug("DataStreams did not handle lambda message: %r", message)
    return None

tests/test_dsm.py

Lines changed: 235 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
import unittest
2+
import json
3+
import base64
24
from unittest.mock import patch, MagicMock
35

4-
from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context
6+
from datadog_lambda.dsm import (
7+
set_dsm_context,
8+
_dsm_set_sqs_context,
9+
_get_dsm_context_from_lambda,
10+
)
511
from datadog_lambda.trigger import EventTypes, _EventSource
612

713

8-
class TestDsmSQSContext(unittest.TestCase):
14+
class TestDSMContext(unittest.TestCase):
915
def setUp(self):
1016
patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context")
1117
self.mock_dsm_set_sqs_context = patcher.start()
@@ -110,3 +116,230 @@ def test_sqs_multiple_records_process_each_record(self):
110116
self.assertIn(f"topic:{expected_arns[i]}", tags)
111117
self.assertIn("type:sqs", tags)
112118
self.assertEqual(kwargs["payload_size"], 100)
119+
120+
121+
class TestGetDSMContext(unittest.TestCase):
    """Exercise _get_dsm_context_from_lambda against each supported record shape.

    Assertions use the unittest assertion methods (rather than bare ``assert``)
    so they are not stripped under ``python -O`` and report useful diffs.
    """

    def test_sqs_to_lambda_string_value_format(self):
        """Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)"""
        trace_context = {
            "x-datadog-trace-id": "789123456",
            "x-datadog-parent-id": "321987654",
            "dd-pathway-ctx": "test-pathway-ctx",
        }

        lambda_record = {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185",
            },
            "messageAttributes": {
                "_datadog": {
                    "stringValue": json.dumps(trace_context),
                    "stringListValues": [],
                    "binaryListValues": [],
                    "dataType": "String",
                },
                "myAttribute": {
                    "stringValue": "myValue",
                    "stringListValues": [],
                    "binaryListValues": [],
                    "dataType": "String",
                },
            },
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2",
        }

        result = _get_dsm_context_from_lambda(lambda_record)

        self.assertIsNotNone(result)
        self.assertEqual(result, trace_context)
        self.assertEqual(result["x-datadog-trace-id"], "789123456")
        self.assertEqual(result["x-datadog-parent-id"], "321987654")
        self.assertEqual(result["dd-pathway-ctx"], "test-pathway-ctx")

    def test_sns_to_lambda_format(self):
        """Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)"""
        trace_context = {
            "x-datadog-trace-id": "111111111",
            "x-datadog-parent-id": "222222222",
            "dd-pathway-ctx": "test-pathway-ctx",
        }
        binary_data = base64.b64encode(
            json.dumps(trace_context).encode("utf-8")
        ).decode("utf-8")

        sns_lambda_record = {
            "EventSource": "aws:sns",
            "EventSubscriptionArn": (
                "arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012"
            ),
            "Sns": {
                "Type": "Notification",
                "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
                "TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic",
                "Subject": "Test Subject",
                "Message": "Hello from SNS!",
                "Timestamp": "2023-01-01T12:00:00.000Z",
                "MessageAttributes": {
                    "_datadog": {"Type": "Binary", "Value": binary_data}
                },
            },
        }

        result = _get_dsm_context_from_lambda(sns_lambda_record)

        self.assertIsNotNone(result)
        self.assertEqual(result, trace_context)
        self.assertEqual(result["x-datadog-trace-id"], "111111111")
        self.assertEqual(result["x-datadog-parent-id"], "222222222")
        self.assertEqual(result["dd-pathway-ctx"], "test-pathway-ctx")

    def test_sns_to_sqs_to_lambda_binary_value_format(self):
        """Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)"""
        trace_context = {
            "x-datadog-trace-id": "777666555",
            "x-datadog-parent-id": "444333222",
            "dd-pathway-ctx": "test-pathway-ctx",
        }
        binary_data = base64.b64encode(
            json.dumps(trace_context).encode("utf-8")
        ).decode("utf-8")

        lambda_record = {
            "messageId": "test-message-id",
            "receiptHandle": "test-receipt-handle",
            "body": "Test message body",
            "messageAttributes": {
                "_datadog": {"binaryValue": binary_data, "dataType": "Binary"}
            },
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue",
        }

        result = _get_dsm_context_from_lambda(lambda_record)

        self.assertIsNotNone(result)
        self.assertEqual(result, trace_context)
        self.assertEqual(result["x-datadog-trace-id"], "777666555")
        self.assertEqual(result["x-datadog-parent-id"], "444333222")
        self.assertEqual(result["dd-pathway-ctx"], "test-pathway-ctx")

    def test_sns_to_sqs_to_lambda_body_format(self):
        """Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)"""
        trace_context = {
            "x-datadog-trace-id": "123987456",
            "x-datadog-parent-id": "654321987",
            "x-datadog-sampling-priority": "1",
            "dd-pathway-ctx": "test-pathway-ctx",
        }

        message_body = {
            "Type": "Notification",
            "MessageId": "test-message-id",
            "Message": "Test message from SNS",
            "MessageAttributes": {
                "_datadog": {
                    "Type": "Binary",
                    "Value": base64.b64encode(
                        json.dumps(trace_context).encode("utf-8")
                    ).decode("utf-8"),
                }
            },
        }

        lambda_record = {
            "messageId": "lambda-message-id",
            "body": json.dumps(message_body),
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue",
        }

        result = _get_dsm_context_from_lambda(lambda_record)

        self.assertIsNotNone(result)
        self.assertEqual(result, trace_context)
        self.assertEqual(result["x-datadog-trace-id"], "123987456")
        self.assertEqual(result["x-datadog-parent-id"], "654321987")
        self.assertEqual(result["dd-pathway-ctx"], "test-pathway-ctx")

    def test_kinesis_to_lambda_format(self):
        """Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)"""
        trace_context = {
            "x-datadog-trace-id": "555444333",
            "x-datadog-parent-id": "888777666",
            "dd-pathway-ctx": "test-pathway-ctx",
        }

        # Create the kinesis data payload
        kinesis_payload = {
            "_datadog": trace_context,
            "actualData": "some business data",
        }
        encoded_kinesis_data = base64.b64encode(
            json.dumps(kinesis_payload).encode("utf-8")
        ).decode("utf-8")

        kinesis_lambda_record = {
            "eventSource": "aws:kinesis",
            "eventSourceARN": (
                "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
            ),
            "kinesis": {
                "data": encoded_kinesis_data,
                "partitionKey": "partition-key-1",
                "sequenceNumber": (
                    "49590338271490256608559692538361571095921575989136588898"
                ),
            },
        }

        result = _get_dsm_context_from_lambda(kinesis_lambda_record)

        self.assertIsNotNone(result)
        self.assertEqual(result, trace_context)
        self.assertEqual(result["x-datadog-trace-id"], "555444333")
        self.assertEqual(result["x-datadog-parent-id"], "888777666")
        self.assertEqual(result["dd-pathway-ctx"], "test-pathway-ctx")

    def test_no_message_attributes(self):
        """Test message without MessageAttributes returns None."""
        message = {
            "messageId": "test-message-id",
            "body": "Test message without attributes",
        }

        result = _get_dsm_context_from_lambda(message)

        self.assertIsNone(result)

    def test_no_datadog_attribute(self):
        """Test message with MessageAttributes but no _datadog attribute returns None."""
        message = {
            "messageId": "test-message-id",
            "body": "Test message",
            "messageAttributes": {
                "customAttribute": {"stringValue": "custom-value", "dataType": "String"}
            },
        }

        result = _get_dsm_context_from_lambda(message)

        self.assertIsNone(result)

    def test_empty_datadog_attribute(self):
        """Test message with empty _datadog attribute returns None."""
        message = {
            "messageId": "test-message-id",
            "messageAttributes": {"_datadog": {}},
        }

        result = _get_dsm_context_from_lambda(message)

        self.assertIsNone(result)

0 commit comments

Comments
 (0)