Skip to content

Commit 9fc4c28

Browse files
fixes
1 parent f25c71d commit 9fc4c28

File tree

2 files changed

+50
-0
lines changed

2 files changed

+50
-0
lines changed

datadog_lambda/dsm.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,21 @@ def _get_dsm_context_from_lambda(message):
6969
Lambda-specific message formats:
7070
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
7171
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
72+
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
7273
"""
7374
context_json = None
7475
message_body = message
7576

77+
if "kinesis" in message:
78+
try:
79+
kinesis_data = json.loads(
80+
base64.b64decode(message["kinesis"]["data"]).decode()
81+
)
82+
return kinesis_data.get("_datadog")
83+
except (ValueError, TypeError, KeyError):
84+
logger.debug("Unable to parse kinesis data for lambda message")
85+
return None
86+
7687
if "Sns" in message:
7788
message_body = message["Sns"]
7889

tests/test_dsm.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,45 @@ def test_sns_to_lambda_format(self):
448448
assert result["x-datadog-parent-id"] == "222222222"
449449
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
450450

451+
def test_kinesis_to_lambda_format(self):
452+
"""Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)"""
453+
trace_context = {
454+
"x-datadog-trace-id": "555444333",
455+
"x-datadog-parent-id": "888777666",
456+
"dd-pathway-ctx": "test-pathway-ctx",
457+
}
458+
459+
# Create the kinesis data payload
460+
kinesis_payload = {
461+
"_datadog": trace_context,
462+
"actualData": "some business data",
463+
}
464+
encoded_kinesis_data = base64.b64encode(
465+
json.dumps(kinesis_payload).encode("utf-8")
466+
).decode("utf-8")
467+
468+
kinesis_lambda_record = {
469+
"eventSource": "aws:kinesis",
470+
"eventSourceARN": (
471+
"arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
472+
),
473+
"kinesis": {
474+
"data": encoded_kinesis_data,
475+
"partitionKey": "partition-key-1",
476+
"sequenceNumber": (
477+
"49590338271490256608559692538361571095921575989136588898"
478+
),
479+
},
480+
}
481+
482+
result = _get_dsm_context_from_lambda(kinesis_lambda_record)
483+
484+
assert result is not None
485+
assert result == trace_context
486+
assert result["x-datadog-trace-id"] == "555444333"
487+
assert result["x-datadog-parent-id"] == "888777666"
488+
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
489+
451490
def test_no_message_attributes(self):
452491
"""Test message without MessageAttributes returns None."""
453492
message = {

0 commit comments

Comments
 (0)