Skip to content

Commit 623f49e

Browse files
simplify PR
1 parent d7ce695 commit 623f49e

File tree

2 files changed

+2
-184
lines changed

2 files changed

+2
-184
lines changed

datadog_lambda/dsm.py

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import json
2-
import base64
32

43
from ddtrace.internal.logger import get_logger
54
from datadog_lambda import logger
@@ -65,36 +64,9 @@ def _get_dsm_context_from_lambda(message):
6564
"""
6665
Lambda-specific message formats:
6766
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
68-
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
69-
- message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)
70-
- message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)
71-
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
7267
"""
7368
context_json = None
74-
message_body = message
75-
76-
if "kinesis" in message:
77-
try:
78-
kinesis_data = json.loads(
79-
base64.b64decode(message["kinesis"]["data"]).decode()
80-
)
81-
return kinesis_data.get("_datadog")
82-
except (ValueError, TypeError, KeyError):
83-
log.debug("Unable to parse kinesis data for lambda message")
84-
return None
85-
elif "Sns" in message:
86-
message_body = message["Sns"]
87-
else:
88-
try:
89-
body = message.get("body")
90-
if body:
91-
message_body = json.loads(body)
92-
except (ValueError, TypeError):
93-
log.debug("Unable to parse lambda message body as JSON, treat as non-json")
94-
95-
message_attributes = message_body.get("MessageAttributes") or message_body.get(
96-
"messageAttributes"
97-
)
69+
message_attributes = message.get("messageAttributes")
9870
if not message_attributes:
9971
log.debug("DataStreams skipped lambda message: %r", message)
10072
return None
@@ -105,18 +77,9 @@ def _get_dsm_context_from_lambda(message):
10577

10678
datadog_attr = message_attributes["_datadog"]
10779

108-
if message_body.get("Type") == "Notification":
109-
# SNS -> lambda notification
110-
if datadog_attr.get("Type") == "Binary":
111-
context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode())
112-
elif "stringValue" in datadog_attr:
80+
if "stringValue" in datadog_attr:
11381
# SQS -> lambda
11482
context_json = json.loads(datadog_attr["stringValue"])
115-
elif "binaryValue" in datadog_attr:
116-
# SNS -> SQS -> lambda, raw message delivery
117-
context_json = json.loads(
118-
base64.b64decode(datadog_attr["binaryValue"]).decode()
119-
)
12083
else:
12184
log.debug("DataStreams did not handle lambda message: %r", message)
12285

tests/test_dsm.py

Lines changed: 0 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import unittest
22
import json
3-
import base64
43
from unittest.mock import patch, MagicMock
54

65
from datadog_lambda.dsm import (
@@ -165,150 +164,6 @@ def test_sqs_to_lambda_string_value_format(self):
165164
assert result["x-datadog-parent-id"] == "321987654"
166165
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
167166

168-
def test_sns_to_lambda_format(self):
169-
"""Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)"""
170-
trace_context = {
171-
"x-datadog-trace-id": "111111111",
172-
"x-datadog-parent-id": "222222222",
173-
"dd-pathway-ctx": "test-pathway-ctx",
174-
}
175-
binary_data = base64.b64encode(
176-
json.dumps(trace_context).encode("utf-8")
177-
).decode("utf-8")
178-
179-
sns_lambda_record = {
180-
"EventSource": "aws:sns",
181-
"EventSubscriptionArn": (
182-
"arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012"
183-
),
184-
"Sns": {
185-
"Type": "Notification",
186-
"MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
187-
"TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic",
188-
"Subject": "Test Subject",
189-
"Message": "Hello from SNS!",
190-
"Timestamp": "2023-01-01T12:00:00.000Z",
191-
"MessageAttributes": {
192-
"_datadog": {"Type": "Binary", "Value": binary_data}
193-
},
194-
},
195-
}
196-
197-
result = _get_dsm_context_from_lambda(sns_lambda_record)
198-
199-
assert result is not None
200-
assert result == trace_context
201-
assert result["x-datadog-trace-id"] == "111111111"
202-
assert result["x-datadog-parent-id"] == "222222222"
203-
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
204-
205-
def test_sns_to_sqs_to_lambda_binary_value_format(self):
206-
"""Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)"""
207-
trace_context = {
208-
"x-datadog-trace-id": "777666555",
209-
"x-datadog-parent-id": "444333222",
210-
"dd-pathway-ctx": "test-pathway-ctx",
211-
}
212-
binary_data = base64.b64encode(
213-
json.dumps(trace_context).encode("utf-8")
214-
).decode("utf-8")
215-
216-
lambda_record = {
217-
"messageId": "test-message-id",
218-
"receiptHandle": "test-receipt-handle",
219-
"body": "Test message body",
220-
"messageAttributes": {
221-
"_datadog": {"binaryValue": binary_data, "dataType": "Binary"}
222-
},
223-
"eventSource": "aws:sqs",
224-
"eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue",
225-
}
226-
227-
result = _get_dsm_context_from_lambda(lambda_record)
228-
229-
assert result is not None
230-
assert result == trace_context
231-
assert result["x-datadog-trace-id"] == "777666555"
232-
assert result["x-datadog-parent-id"] == "444333222"
233-
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
234-
235-
def test_sns_to_sqs_to_lambda_body_format(self):
236-
"""Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)"""
237-
trace_context = {
238-
"x-datadog-trace-id": "123987456",
239-
"x-datadog-parent-id": "654321987",
240-
"x-datadog-sampling-priority": "1",
241-
"dd-pathway-ctx": "test-pathway-ctx",
242-
}
243-
244-
message_body = {
245-
"Type": "Notification",
246-
"MessageId": "test-message-id",
247-
"Message": "Test message from SNS",
248-
"MessageAttributes": {
249-
"_datadog": {
250-
"Type": "Binary",
251-
"Value": base64.b64encode(
252-
json.dumps(trace_context).encode("utf-8")
253-
).decode("utf-8"),
254-
}
255-
},
256-
}
257-
258-
lambda_record = {
259-
"messageId": "lambda-message-id",
260-
"body": json.dumps(message_body),
261-
"eventSource": "aws:sqs",
262-
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue",
263-
}
264-
265-
result = _get_dsm_context_from_lambda(lambda_record)
266-
267-
assert result is not None
268-
assert result == trace_context
269-
assert result["x-datadog-trace-id"] == "123987456"
270-
assert result["x-datadog-parent-id"] == "654321987"
271-
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
272-
273-
def test_kinesis_to_lambda_format(self):
274-
"""Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)"""
275-
trace_context = {
276-
"x-datadog-trace-id": "555444333",
277-
"x-datadog-parent-id": "888777666",
278-
"dd-pathway-ctx": "test-pathway-ctx",
279-
}
280-
281-
# Create the kinesis data payload
282-
kinesis_payload = {
283-
"_datadog": trace_context,
284-
"actualData": "some business data",
285-
}
286-
encoded_kinesis_data = base64.b64encode(
287-
json.dumps(kinesis_payload).encode("utf-8")
288-
).decode("utf-8")
289-
290-
kinesis_lambda_record = {
291-
"eventSource": "aws:kinesis",
292-
"eventSourceARN": (
293-
"arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
294-
),
295-
"kinesis": {
296-
"data": encoded_kinesis_data,
297-
"partitionKey": "partition-key-1",
298-
"sequenceNumber": (
299-
"49590338271490256608559692538361571095921575989136588898"
300-
),
301-
},
302-
}
303-
304-
result = _get_dsm_context_from_lambda(kinesis_lambda_record)
305-
306-
assert result is not None
307-
assert result == trace_context
308-
assert result["x-datadog-trace-id"] == "555444333"
309-
assert result["x-datadog-parent-id"] == "888777666"
310-
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
311-
312167
def test_no_message_attributes(self):
313168
"""Test message without MessageAttributes returns None."""
314169
message = {

0 commit comments

Comments
 (0)