@@ -13,51 +13,50 @@ def set_dsm_context(event, event_source):
13
13
14
14
15
15
def _dsm_set_context_helper (
16
- event , service_type , arn_extractor , payload_size_calculator
16
+ record , service_type , arn , payload_size
17
17
):
18
18
"""
19
19
Common helper function for setting DSM context.
20
20
21
21
Args:
22
22
event: The Lambda event containing records
23
23
service_type: The service type string (example: sqs', 'sns')
24
- arn_extractor: Function to extract the ARN from the record
25
- payload_size_calculator: Function to calculate payload size
24
+ arn: ARN from the record
25
+ payload_size: payload size of the record
26
26
"""
27
27
from datadog_lambda .wrapper import format_err_with_traceback
28
28
from ddtrace .internal .datastreams import data_streams_processor
29
29
from ddtrace .internal .datastreams .processor import DsmPathwayCodec
30
30
31
- records = event .get ("Records" )
32
- if records is None :
33
- return
34
31
processor = data_streams_processor ()
35
32
36
- for record in records :
37
- try :
38
- arn = arn_extractor (record )
39
- context_json = _get_dsm_context_from_lambda (record )
40
- payload_size = payload_size_calculator (record , context_json )
41
-
42
- ctx = DsmPathwayCodec .decode (context_json , processor )
43
- ctx .set_checkpoint (
44
- ["direction:in" , f"topic:{ arn } " , f"type:{ service_type } " ],
45
- payload_size = payload_size ,
46
- )
47
- except Exception as e :
48
- logger .error (format_err_with_traceback (e ))
33
+ try :
34
+ context_json = _get_dsm_context_from_lambda (record )
35
+
36
+ ctx = DsmPathwayCodec .decode (context_json , processor )
37
+ ctx .set_checkpoint (
38
+ ["direction:in" , f"topic:{ arn } " , f"type:{ service_type } " ],
39
+ payload_size = payload_size ,
40
+ )
41
+ except Exception as e :
42
+ logger .error (format_err_with_traceback (e ))
49
43
50
44
51
45
def _dsm_set_sqs_context (event ):
52
46
from ddtrace .internal .datastreams .botocore import calculate_sqs_payload_size
47
+ from datadog_lambda .wrapper import format_err_with_traceback
53
48
54
- def sqs_payload_calculator (record , context_json ):
55
- return calculate_sqs_payload_size (record )
56
-
57
- def sqs_arn_extractor (record ):
58
- return record .get ("eventSourceARN" , "" )
49
+ records = event .get ("Records" )
50
+ if records is None :
51
+ return
59
52
60
- _dsm_set_context_helper (event , "sqs" , sqs_arn_extractor , sqs_payload_calculator )
53
+ for record in records :
54
+ try :
55
+ arn = record .get ("eventSourceARN" , "" )
56
+ payload_size = calculate_sqs_payload_size (record )
57
+ _dsm_set_context_helper (record , "sqs" , arn , payload_size )
58
+ except Exception as e :
59
+ logger .error (format_err_with_traceback (e ))
61
60
62
61
63
62
def _get_dsm_context_from_lambda (message ):
0 commit comments