From 5b5473ce7ba936734326ea9829ae250e737f91fb Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 13 Feb 2026 10:15:06 -0500 Subject: [PATCH] support durable function tracing features --- datadog_lambda/tracing.py | 161 ++++++++++++++++++++++++++++++++++++++ datadog_lambda/wrapper.py | 5 +- 2 files changed, 165 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index e7dca1f3..9e9c2589 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -621,6 +621,157 @@ def get_injected_authorizer_data(event, is_http_api) -> dict: logger.debug("Failed to check if invocated by an authorizer. error %s", e) +def is_durable_execution_replay(event): + """ + Check if this Lambda invocation is a durable execution replay. + + A replay occurs when there are existing operations in InitialExecutionState, + meaning this invocation is resuming from a previous checkpoint rather than + starting fresh. + + For replay invocations, we should skip creating inferred spans because: + - The trace context is being continued from the checkpoint + - Creating an inferred span would create a duplicate + + Returns: + True if this is a replay invocation (should skip inferred span) + False if this is first invocation or not a durable execution + """ + if not isinstance(event, dict): + return False + + if "DurableExecutionArn" not in event: + return False + + initial_state = event.get("InitialExecutionState", {}) + operations = initial_state.get("Operations", []) + + # If there are existing operations, this is a replay + is_replay = len(operations) > 0 + + if is_replay: + print(f"[DD-DURABLE] Detected replay invocation with {len(operations)} existing operations") + else: + print("[DD-DURABLE] Detected first invocation (no existing operations)") + + return is_replay + + +def extract_context_from_durable_execution(event, lambda_context): + """ + Extract Datadog trace context from AWS Lambda Durable Execution event. + + Durable executions persist trace context in operation payloads across + Lambda invocations. This function extracts the trace context from the + first available operation in the InitialExecutionState. + """ + try: + # Check if this is a durable execution invocation + if not isinstance(event, dict): + return None + + if "DurableExecutionArn" not in event or "InitialExecutionState" not in event: + return None + + print("[DD-DURABLE] Detected AWS Lambda Durable Execution event") + + initial_state = event.get("InitialExecutionState", {}) + operations = initial_state.get("Operations", []) + + print(f"[DD-DURABLE] Found {len(operations)} operations in InitialExecutionState") + + # Look for trace context in operation payloads + for idx, operation in enumerate(operations): + operation_id = operation.get("Id") + operation_type = operation.get("Type") + operation_status = operation.get("Status") + + print(f"[DD-DURABLE] Checking operation {idx}: type={operation_type}, status={operation_status}") + + # Extract payload based on operation type + payload_str = None + + if operation_type == "STEP": + # For STEP operations, result is in StepDetails.Result + step_details = operation.get("StepDetails", {}) + payload_str = step_details.get("Result") + if payload_str: + print(f"[DD-DURABLE] Found Result in StepDetails for {operation_id}") + elif operation_type == "CALLBACK": + # For CALLBACK operations, check CallbackDetails + callback_details = operation.get("CallbackDetails", {}) + payload_str = callback_details.get("Result") + if payload_str: + print(f"[DD-DURABLE] Found Result in CallbackDetails for {operation_id}") + elif operation_type == "WAIT": + # For WAIT operations, check WaitDetails (likely no result, but check anyway) + wait_details = operation.get("WaitDetails", {}) + payload_str = wait_details.get("Result") + if payload_str: + print(f"[DD-DURABLE] Found Result in WaitDetails for {operation_id}") + else: + # Fall back to top-level Payload field for unknown operation types + payload_str = operation.get("Payload") + if payload_str: + print(f"[DD-DURABLE] Found Payload (top-level) for {operation_id}") + + if not payload_str: + continue + + print(f"[DD-DURABLE] Payload length: {len(payload_str)}") + + try: + payload = json.loads(payload_str) + if not isinstance(payload, dict): + print(f"[DD-DURABLE] Payload is not a dict: {type(payload)}") + continue + + # Try new namespaced format first (safer, avoids collisions) + dd_trace_ctx = None + instrumentation_data = payload.get("__dd_instrumentation__") + if instrumentation_data and isinstance(instrumentation_data, dict): + dd_trace_ctx = instrumentation_data.get("trace_context") + print(f"[DD-DURABLE] Found trace context in namespaced format for {operation_id}") + else: + # Fall back to legacy format for backward compatibility + dd_trace_ctx = payload.get("_dd_trace_context") + if dd_trace_ctx: + print(f"[DD-DURABLE] Found trace context in legacy format for {operation_id}") + + if dd_trace_ctx: + trace_id = dd_trace_ctx.get("trace_id") + span_id = dd_trace_ctx.get("span_id") + sampling_priority = dd_trace_ctx.get("sampling_priority") + + # sampling_priority can be None, 0, 1, etc. - any value is valid + if trace_id and span_id: + context = Context( + trace_id=trace_id, + span_id=span_id, + sampling_priority=sampling_priority if sampling_priority is not None else 1, + ) + print(f"[DD-DURABLE] ✅ Extracted trace context from operation {operation_id}") + print(f"[DD-DURABLE] trace_id={trace_id}, span_id={span_id}, sampling_priority={sampling_priority}") + logger.debug( + "Extracted Datadog trace context from durable execution operation %s: %s", + operation_id, + context, + ) + return context + else: + print(f"[DD-DURABLE] No trace context found in payload for operation {operation_id}") + except (json.JSONDecodeError, TypeError) as e: + print(f"[DD-DURABLE] Failed to parse operation payload: {e}") + logger.debug("Failed to parse operation payload: %s", e) + continue + + print("[DD-DURABLE] No trace context found in durable execution operations") + except Exception as e: + logger.debug("Failed to extract trace context from durable execution: %s", e) + + return None + + def extract_dd_trace_context( event, lambda_context, extractor=None, decode_authorizer_context: bool = True ): @@ -634,6 +785,16 @@ def extract_dd_trace_context( trace_context_source = None event_source = parse_event_source(event) + # Check for AWS Lambda Durable Execution events first (before other checks) + # This ensures trace context is properly continued across durable invocations + durable_context = extract_context_from_durable_execution(event, lambda_context) + if _is_context_complete(durable_context): + logger.debug("Extracted Datadog trace context from durable execution") + dd_trace_context = durable_context + trace_context_source = TraceContextSource.EVENT + logger.debug("extracted dd trace context from durable execution: %s", dd_trace_context) + return dd_trace_context, trace_context_source, event_source + if extractor is not None: context = extract_context_custom_extractor(extractor, event, lambda_context) elif isinstance(event, (set, dict)) and "request" in event: diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index b2f83f13..2d3d1039 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -39,6 +39,7 @@ create_inferred_span, InferredSpanInfo, is_authorizer_response, + is_durable_execution_replay, tracer, propagator, ) @@ -264,7 +265,9 @@ def _before(self, event, context): if config.trace_enabled: set_dd_trace_py_root(trace_context_source, config.merge_xray_traces) - if config.make_inferred_span: + # Skip inferred span for durable execution replays to avoid duplicates + # For replays, trace context comes from checkpoint, not from event trigger + if config.make_inferred_span and not is_durable_execution_replay(event): self.inferred_span = create_inferred_span( event, context, event_source, config.decode_authorizer_context )