diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 427f5e47..e9924c21 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -1,38 +1,121 @@ -from datadog_lambda import logger +import logging +import json +import base64 + from datadog_lambda.trigger import EventTypes +logger = logging.getLogger(__name__) -def set_dsm_context(event, event_source): +def set_dsm_context(event, event_source): if event_source.equals(EventTypes.SQS): _dsm_set_sqs_context(event) + elif event_source.equals(EventTypes.SNS): + _dsm_set_sns_context(event) + elif event_source.equals(EventTypes.KINESIS): + _dsm_set_kinesis_context(event) def _dsm_set_sqs_context(event): - from datadog_lambda.wrapper import format_err_with_traceback - from ddtrace.internal.datastreams import data_streams_processor - from ddtrace.internal.datastreams.processor import DsmPathwayCodec - from ddtrace.internal.datastreams.botocore import ( - get_datastreams_context, - calculate_sqs_payload_size, - ) + records = event.get("Records") + if records is None: + return + + for record in records: + arn = record.get("eventSourceARN", "") + _set_dsm_context_for_record(record, "sqs", arn) + +def _dsm_set_sns_context(event): records = event.get("Records") if records is None: return - processor = data_streams_processor() for record in records: - try: - queue_arn = record.get("eventSourceARN", "") + sns_data = record.get("Sns") + if not sns_data: + return + arn = sns_data.get("TopicArn", "") + _set_dsm_context_for_record(sns_data, "sns", arn) + + +def _dsm_set_kinesis_context(event): + records = event.get("Records") + if records is None: + return + + for record in records: + arn = record.get("eventSourceARN", "") + _set_dsm_context_for_record(record, "kinesis", arn) - contextjson = get_datastreams_context(record) - payload_size = calculate_sqs_payload_size(record) - ctx = DsmPathwayCodec.decode(contextjson, processor) - ctx.set_checkpoint( - ["direction:in", f"topic:{queue_arn}", "type:sqs"], - payload_size=payload_size, +def _set_dsm_context_for_record(record, type, arn): + from ddtrace.data_streams import set_consume_checkpoint + + try: + context_json = _get_dsm_context_from_lambda(record) + if not context_json: + logger.debug("DataStreams skipped lambda message: %r", record) + return + + carrier_get = _create_carrier_get(context_json) + set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False) + except Exception as e: + logger.error(f"Unable to set dsm context: {e}") + + +def _get_dsm_context_from_lambda(message): + """ + Lambda-specific message formats: + - message.messageAttributes._datadog.stringValue (SQS -> lambda) + - message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda) + - message.kinesis.data.decode()._datadog (Kinesis -> lambda) + """ + context_json = None + message_body = message + + if "kinesis" in message: + try: + kinesis_data = json.loads( + base64.b64decode(message["kinesis"]["data"]).decode() ) - except Exception as e: - logger.error(format_err_with_traceback(e)) + return kinesis_data.get("_datadog") + except (ValueError, TypeError, KeyError): + logger.debug("Unable to parse kinesis data for lambda message") + return None + + if "Sns" in message: + message_body = message["Sns"] + + message_attributes = message_body.get("MessageAttributes") or message_body.get( + "messageAttributes" + ) + + if not message_attributes: + logger.debug("DataStreams skipped lambda message: %r", message) + return None + + if "_datadog" not in message_attributes: + logger.debug("DataStreams skipped lambda message: %r", message) + return None + + datadog_attr = message_attributes["_datadog"] + + if message_body.get("Type") == "Notification": + # SNS -> lambda notification + if datadog_attr.get("Type") == "Binary": + context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode()) + elif "stringValue" in datadog_attr: + # SQS -> lambda + context_json = json.loads(datadog_attr["stringValue"]) + else: + logger.debug("DataStreams did not handle lambda message: %r", message) + + return context_json + + +def _create_carrier_get(context_json): + def carrier_get(key): + return context_json.get(key) + + return carrier_get diff --git a/layers/datadog_lambda_py-amd64-3.13.zip b/layers/datadog_lambda_py-amd64-3.13.zip new file mode 100644 index 00000000..1206306e Binary files /dev/null and b/layers/datadog_lambda_py-amd64-3.13.zip differ diff --git a/layers/datadog_lambda_py-arm64-3.13.zip b/layers/datadog_lambda_py-arm64-3.13.zip new file mode 100644 index 00000000..ad4c535a Binary files /dev/null and b/layers/datadog_lambda_py-arm64-3.13.zip differ diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 544212d8..4099fa81 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -1,44 +1,46 @@ import unittest -from unittest.mock import patch, MagicMock +import base64 +import json +from unittest.mock import patch -from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context +from datadog_lambda.dsm import ( + set_dsm_context, + _dsm_set_sqs_context, + _dsm_set_sns_context, + _dsm_set_kinesis_context, + _get_dsm_context_from_lambda, +) from datadog_lambda.trigger import EventTypes, _EventSource -class TestDsmSQSContext(unittest.TestCase): +class TestSetDSMContext(unittest.TestCase): def setUp(self): patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context") self.mock_dsm_set_sqs_context = patcher.start() self.addCleanup(patcher.stop) - patcher = patch("ddtrace.internal.datastreams.data_streams_processor") - self.mock_data_streams_processor = patcher.start() + patcher = patch("ddtrace.data_streams.set_consume_checkpoint") + self.mock_set_consume_checkpoint = patcher.start() self.addCleanup(patcher.stop) - patcher = patch("ddtrace.internal.datastreams.botocore.get_datastreams_context") - self.mock_get_datastreams_context = patcher.start() - self.mock_get_datastreams_context.return_value = {} + patcher = patch("datadog_lambda.dsm._get_dsm_context_from_lambda") + self.mock_get_dsm_context_from_lambda = patcher.start() self.addCleanup(patcher.stop) - patcher = patch( - "ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size" - ) - self.mock_calculate_sqs_payload_size = patcher.start() - self.mock_calculate_sqs_payload_size.return_value = 100 + patcher = patch("datadog_lambda.dsm._dsm_set_sns_context") + self.mock_dsm_set_sns_context = patcher.start() self.addCleanup(patcher.stop) - patcher = patch("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode") - self.mock_dsm_pathway_codec_decode = patcher.start() + patcher = patch("datadog_lambda.dsm._dsm_set_kinesis_context") + self.mock_dsm_set_kinesis_context = patcher.start() self.addCleanup(patcher.stop) def test_non_sqs_event_source_does_nothing(self): """Test that non-SQS event sources don't trigger DSM context setting""" event = {} - # Use Unknown Event Source event_source = _EventSource(EventTypes.UNKNOWN) set_dsm_context(event, event_source) - # DSM context should not be set for non-SQS events self.mock_dsm_set_sqs_context.assert_not_called() def test_sqs_event_with_no_records_does_nothing(self): @@ -51,7 +53,7 @@ def test_sqs_event_with_no_records_does_nothing(self): for event in events_with_no_records: _dsm_set_sqs_context(event) - self.mock_data_streams_processor.assert_not_called() + self.mock_set_consume_checkpoint.assert_not_called() def test_sqs_event_triggers_dsm_sqs_context(self): """Test that SQS event sources trigger the SQS-specific DSM context function""" @@ -77,36 +79,452 @@ def test_sqs_multiple_records_process_each_record(self): { "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue1", "body": "Message 1", + "messageAttributes": { + "_datadog": { + "stringValue": json.dumps( + {"dd-pathway-ctx-base64": "context1"} + ), + "dataType": "String", + } + }, }, { "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue2", "body": "Message 2", + "messageAttributes": { + "_datadog": { + "stringValue": json.dumps( + {"dd-pathway-ctx-base64": "context2"} + ), + "dataType": "String", + } + }, }, { "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue3", "body": "Message 3", + "messageAttributes": { + "_datadog": { + "stringValue": json.dumps( + {"dd-pathway-ctx-base64": "context3"} + ), + "dataType": "String", + } + }, }, ] } - mock_context = MagicMock() - self.mock_dsm_pathway_codec_decode.return_value = mock_context + self.mock_get_dsm_context_from_lambda.side_effect = [ + {"dd-pathway-ctx-base64": "context1"}, + {"dd-pathway-ctx-base64": "context2"}, + {"dd-pathway-ctx-base64": "context3"}, + ] _dsm_set_sqs_context(multi_record_event) - self.assertEqual(mock_context.set_checkpoint.call_count, 3) + self.assertEqual(self.mock_set_consume_checkpoint.call_count, 3) - calls = mock_context.set_checkpoint.call_args_list + calls = self.mock_set_consume_checkpoint.call_args_list expected_arns = [ "arn:aws:sqs:us-east-1:123456789012:queue1", "arn:aws:sqs:us-east-1:123456789012:queue2", "arn:aws:sqs:us-east-1:123456789012:queue3", ] + expected_contexts = ["context1", "context2", "context3"] + + for i, call in enumerate(calls): + args, kwargs = call + service_type = args[0] + arn = args[1] + carrier_get_func = args[2] + + self.assertEqual(service_type, "sqs") + + self.assertEqual(arn, expected_arns[i]) + + pathway_ctx = carrier_get_func("dd-pathway-ctx-base64") + self.assertEqual(pathway_ctx, expected_contexts[i]) + + def test_sns_event_with_no_records_does_nothing(self): + """Test that events where Records is None don't trigger DSM processing""" + events_with_no_records = [ + {}, + {"Records": None}, + {"someOtherField": "value"}, + ] + + for event in events_with_no_records: + _dsm_set_sns_context(event) + self.mock_set_consume_checkpoint.assert_not_called() + + def test_sns_event_triggers_dsm_sns_context(self): + """Test that SNS event sources trigger the SNS-specific DSM context function""" + sns_event = { + "Records": [ + { + "EventSource": "aws:sns", + "Sns": { + "TopicArn": "arn:aws:sns:us-east-1:123456789012:my-topic", + "Message": "Hello from SNS!", + }, + } + ] + } + + event_source = _EventSource(EventTypes.SNS) + set_dsm_context(sns_event, event_source) + + self.mock_dsm_set_sns_context.assert_called_once_with(sns_event) + + def test_sns_multiple_records_process_each_record(self): + """Test that each record in an SNS event gets processed individually""" + multi_record_event = { + "Records": [ + { + "EventSource": "aws:sns", + "Sns": { + "TopicArn": "arn:aws:sns:us-east-1:123456789012:topic1", + "Message": "Message 1", + "MessageAttributes": { + "_datadog": { + "Type": "Binary", + "Value": base64.b64encode( + json.dumps( + {"dd-pathway-ctx-base64": "context1"} + ).encode("utf-8") + ).decode("utf-8"), + } + }, + }, + }, + { + "EventSource": "aws:sns", + "Sns": { + "TopicArn": "arn:aws:sns:us-east-1:123456789012:topic2", + "Message": "Message 2", + "MessageAttributes": { + "_datadog": { + "Type": "Binary", + "Value": base64.b64encode( + json.dumps( + {"dd-pathway-ctx-base64": "context2"} + ).encode("utf-8") + ).decode("utf-8"), + } + }, + }, + }, + { + "EventSource": "aws:sns", + "Sns": { + "TopicArn": "arn:aws:sns:us-east-1:123456789012:topic3", + "Message": "Message 3", + "MessageAttributes": { + "_datadog": { + "Type": "Binary", + "Value": base64.b64encode( + json.dumps( + {"dd-pathway-ctx-base64": "context3"} + ).encode("utf-8") + ).decode("utf-8"), + } + }, + }, + }, + ] + } + + self.mock_get_dsm_context_from_lambda.side_effect = [ + {"dd-pathway-ctx-base64": "context1"}, + {"dd-pathway-ctx-base64": "context2"}, + {"dd-pathway-ctx-base64": "context3"}, + ] + + _dsm_set_sns_context(multi_record_event) + + self.assertEqual(self.mock_set_consume_checkpoint.call_count, 3) + + calls = self.mock_set_consume_checkpoint.call_args_list + expected_arns = [ + "arn:aws:sns:us-east-1:123456789012:topic1", + "arn:aws:sns:us-east-1:123456789012:topic2", + "arn:aws:sns:us-east-1:123456789012:topic3", + ] + expected_contexts = ["context1", "context2", "context3"] + + for i, call in enumerate(calls): + args, kwargs = call + service_type = args[0] + arn = args[1] + carrier_get_func = args[2] + + self.assertEqual(service_type, "sns") + + self.assertEqual(arn, expected_arns[i]) + + pathway_ctx = carrier_get_func("dd-pathway-ctx-base64") + self.assertEqual(pathway_ctx, expected_contexts[i]) + + def test_kinesis_event_with_no_records_does_nothing(self): + """Test that events where Records is None don't trigger DSM processing""" + events_with_no_records = [ + {}, + {"Records": None}, + {"someOtherField": "value"}, + ] + + for event in events_with_no_records: + _dsm_set_kinesis_context(event) + self.mock_set_consume_checkpoint.assert_not_called() + + def test_kinesis_event_triggers_dsm_kinesis_context(self): + """Test that Kinesis event sources trigger the Kinesis-specific DSM context function""" + kinesis_event = { + "Records": [ + { + "eventSource": "aws:kinesis", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream", + "kinesis": { + "data": "SGVsbG8gZnJvbSBLaW5lc2lzIQ==", + "partitionKey": "partition-key", + }, + } + ] + } + + event_source = _EventSource(EventTypes.KINESIS) + set_dsm_context(kinesis_event, event_source) + + self.mock_dsm_set_kinesis_context.assert_called_once_with(kinesis_event) + + def test_kinesis_multiple_records_process_each_record(self): + """Test that each record in a Kinesis event gets processed individually""" + multi_record_event = { + "Records": [ + { + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream1", + "kinesis": { + "data": base64.b64encode( + json.dumps({"dd-pathway-ctx-base64": "context1"}).encode( + "utf-8" + ) + ).decode("utf-8"), + "partitionKey": "partition-1", + }, + }, + { + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream2", + "kinesis": { + "data": base64.b64encode( + json.dumps({"dd-pathway-ctx-base64": "context2"}).encode( + "utf-8" + ) + ).decode("utf-8"), + "partitionKey": "partition-2", + }, + }, + { + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream3", + "kinesis": { + "data": base64.b64encode( + json.dumps({"dd-pathway-ctx-base64": "context3"}).encode( + "utf-8" + ) + ).decode("utf-8"), + "partitionKey": "partition-3", + }, + }, + ] + } + + self.mock_get_dsm_context_from_lambda.side_effect = [ + {"dd-pathway-ctx-base64": "context1"}, + {"dd-pathway-ctx-base64": "context2"}, + {"dd-pathway-ctx-base64": "context3"}, + ] + + _dsm_set_kinesis_context(multi_record_event) + + self.assertEqual(self.mock_set_consume_checkpoint.call_count, 3) + + calls = self.mock_set_consume_checkpoint.call_args_list + expected_arns = [ + "arn:aws:kinesis:us-east-1:123456789012:stream/stream1", + "arn:aws:kinesis:us-east-1:123456789012:stream/stream2", + "arn:aws:kinesis:us-east-1:123456789012:stream/stream3", + ] + expected_contexts = ["context1", "context2", "context3"] for i, call in enumerate(calls): args, kwargs = call - tags = args[0] - self.assertIn("direction:in", tags) - self.assertIn(f"topic:{expected_arns[i]}", tags) - self.assertIn("type:sqs", tags) - self.assertEqual(kwargs["payload_size"], 100) + service_type = args[0] + arn = args[1] + carrier_get_func = args[2] + + self.assertEqual(service_type, "kinesis") + + self.assertEqual(arn, expected_arns[i]) + + pathway_ctx = carrier_get_func("dd-pathway-ctx-base64") + self.assertEqual(pathway_ctx, expected_contexts[i]) + + +class TestGetDSMContext(unittest.TestCase): + def test_sqs_to_lambda_string_value_format(self): + """Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)""" + trace_context = { + "x-datadog-trace-id": "789123456", + "x-datadog-parent-id": "321987654", + "dd-pathway-ctx": "test-pathway-ctx", + } + + lambda_record = { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185", + }, + "messageAttributes": { + "_datadog": { + "stringValue": json.dumps(trace_context), + "stringListValues": [], + "binaryListValues": [], + "dataType": "String", + }, + "myAttribute": { + "stringValue": "myValue", + "stringListValues": [], + "binaryListValues": [], + "dataType": "String", + }, + }, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2", + } + + result = _get_dsm_context_from_lambda(lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "789123456" + assert result["x-datadog-parent-id"] == "321987654" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + + def test_sns_to_lambda_format(self): + """Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)""" + trace_context = { + "x-datadog-trace-id": "111111111", + "x-datadog-parent-id": "222222222", + "dd-pathway-ctx": "test-pathway-ctx", + } + binary_data = base64.b64encode( + json.dumps(trace_context).encode("utf-8") + ).decode("utf-8") + + sns_lambda_record = { + "EventSource": "aws:sns", + "EventSubscriptionArn": ( + "arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012" + ), + "Sns": { + "Type": "Notification", + "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e", + "TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic", + "Subject": "Test Subject", + "Message": "Hello from SNS!", + "Timestamp": "2023-01-01T12:00:00.000Z", + "MessageAttributes": { + "_datadog": {"Type": "Binary", "Value": binary_data} + }, + }, + } + + result = _get_dsm_context_from_lambda(sns_lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "111111111" + assert result["x-datadog-parent-id"] == "222222222" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + + def test_kinesis_to_lambda_format(self): + """Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)""" + trace_context = { + "x-datadog-trace-id": "555444333", + "x-datadog-parent-id": "888777666", + "dd-pathway-ctx": "test-pathway-ctx", + } + + # Create the kinesis data payload + kinesis_payload = { + "_datadog": trace_context, + "actualData": "some business data", + } + encoded_kinesis_data = base64.b64encode( + json.dumps(kinesis_payload).encode("utf-8") + ).decode("utf-8") + + kinesis_lambda_record = { + "eventSource": "aws:kinesis", + "eventSourceARN": ( + "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream" + ), + "kinesis": { + "data": encoded_kinesis_data, + "partitionKey": "partition-key-1", + "sequenceNumber": ( + "49590338271490256608559692538361571095921575989136588898" + ), + }, + } + + result = _get_dsm_context_from_lambda(kinesis_lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "555444333" + assert result["x-datadog-parent-id"] == "888777666" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + + def test_no_message_attributes(self): + """Test message without MessageAttributes returns None.""" + message = { + "messageId": "test-message-id", + "body": "Test message without attributes", + } + + result = _get_dsm_context_from_lambda(message) + + assert result is None + + def test_no_datadog_attribute(self): + """Test message with MessageAttributes but no _datadog attribute returns None.""" + message = { + "messageId": "test-message-id", + "body": "Test message", + "messageAttributes": { + "customAttribute": {"stringValue": "custom-value", "dataType": "String"} + }, + } + + result = _get_dsm_context_from_lambda(message) + + assert result is None + + def test_empty_datadog_attribute(self): + """Test message with empty _datadog attribute returns None.""" + message = { + "messageId": "test-message-id", + "messageAttributes": {"_datadog": {}}, + } + + result = _get_dsm_context_from_lambda(message) + + assert result is None