File tree Expand file tree Collapse file tree 1 file changed +4
-3
lines changed Expand file tree Collapse file tree 1 file changed +4
-3
lines changed Original file line number Diff line number Diff line change @@ -51,15 +51,14 @@ def _dsm_set_kinesis_context(event):
51
51
52
52
def _set_dsm_context_for_record (record , type , arn ):
53
53
from ddtrace .data_streams import set_consume_checkpoint
54
-
55
54
try :
56
55
context_json = _get_dsm_context_from_lambda (record )
57
56
if not context_json :
58
57
logger .debug ("DataStreams skipped lambda message: %r" , record )
59
58
return
60
59
61
60
carrier_get = _create_carrier_get (context_json )
62
- set_consume_checkpoint (type , arn , carrier_get , manual_checkpoint = False )
61
+ set_consume_checkpoint (type , arn , carrier_get )
63
62
except Exception as e :
64
63
logger .error (f"Unable to set dsm context: { e } " )
65
64
@@ -91,7 +90,9 @@ def _get_dsm_context_from_lambda(message):
91
90
try :
92
91
body = message .get ("body" )
93
92
if body :
94
- message_body = json .loads (body )
93
+ parsed_body = json .loads (body )
94
+ if "MessageAttributes" in parsed_body :
95
+ message_body = parsed_body
95
96
except (ValueError , TypeError ):
96
97
logger .debug ("Unable to parse lambda message body as JSON, treat as non-json" )
97
98
You can’t perform that action at this time.
0 commit comments