Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,13 @@ def listen():
"molten.trace_func",
"redis.execute_pipeline",
"redis.command",
"azure.functions.patched_event_hubs",
"azure.functions.patched_route_request",
"azure.functions.patched_service_bus",
"azure.functions.patched_timer",
"azure.servicebus.patched_producer_batch",
"azure.servicebus.patched_producer_schedule",
"azure.servicebus.patched_producer_send",
"psycopg.patched_connect",
"azure.eventhubs.patched_producer_batch",
"azure.eventhubs.patched_producer_send",
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/contrib/internal/azure_eventhubs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ddtrace.internal import core
from ddtrace.internal.utils import get_argument_value
from ddtrace.propagation.http import HTTPPropagator
from ddtrace.trace import Context


def create_context(
Expand Down Expand Up @@ -62,7 +63,7 @@ def handle_event_hubs_event_data_context(
span.link_span(parent_context)


def extract_context(event_data: Union[EventData, AmqpAnnotatedMessage]):
def extract_context(event_data: Union[EventData, AmqpAnnotatedMessage]) -> Context:
msg = event_data if isinstance(event_data, AmqpAnnotatedMessage) else event_data._message
return HTTPPropagator.extract(msg.application_properties)

Expand Down
5 changes: 1 addition & 4 deletions ddtrace/contrib/internal/azure_functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
Enabling
~~~~~~~~

Use :func:`patch()<ddtrace.patch>` to manually enable the integration::

from ddtrace import patch
patch(azure_functions=True)
The azure_functions integration is enabled by default when using :ref:`import ddtrace.auto<ddtraceauto>`.


Global Configuration
Expand Down
26 changes: 20 additions & 6 deletions ddtrace/contrib/internal/azure_functions/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
import functools
import inspect
from typing import Any
from typing import Callable
from typing import Coroutine
from typing import Optional
from typing import Tuple
from typing import Union

from ddtrace import config
from ddtrace._trace.pin import Pin
from ddtrace.contrib.internal.trace_utils import int_service
from ddtrace.ext import SpanTypes
from ddtrace.internal import core
from ddtrace.internal.schema import schematize_cloud_faas_operation


def create_context(context_name, pin, resource=None, headers=None):
def create_context(
context_name: str, pin: Pin, resource: Optional[str] = None, headers: Optional[dict] = None
) -> core.ExecutionContext:
operation_name = schematize_cloud_faas_operation(
"azure.functions.invoke", cloud_provider="azure", cloud_service="functions"
)
Expand All @@ -25,12 +34,17 @@ def create_context(context_name, pin, resource=None, headers=None):
)


def wrap_function_with_tracing(func, context_factory, pre_dispatch=None, post_dispatch=None):
def wrap_function_with_tracing(
func: Callable[..., Any],
context_factory: Callable[[Any], core.ExecutionContext],
pre_dispatch: Optional[Callable[[core.ExecutionContext, Any], Tuple[str, Tuple[Any, ...]]]] = None,
post_dispatch: Optional[Callable[[core.ExecutionContext, Any], Tuple[str, Tuple[Any, ...]]]] = None,
) -> Union[Callable[..., Any], Callable[..., Coroutine[Any, Any, Any]]]:
if inspect.iscoroutinefunction(func):

@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
with context_factory(kwargs) as ctx, ctx.span:
async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
with context_factory(kwargs) as ctx:
if pre_dispatch:
core.dispatch(*pre_dispatch(ctx, kwargs))

Expand All @@ -45,8 +59,8 @@ async def async_wrapper(*args, **kwargs):
return async_wrapper

@functools.wraps(func)
def wrapper(*args, **kwargs):
with context_factory(kwargs) as ctx, ctx.span:
def wrapper(*args: Any, **kwargs: Any) -> Any:
with context_factory(kwargs) as ctx:
if pre_dispatch:
core.dispatch(*pre_dispatch(ctx, kwargs))

Expand Down
5 changes: 1 addition & 4 deletions ddtrace/contrib/internal/azure_servicebus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
Enabling
~~~~~~~~

Use :func:`patch()<ddtrace.patch>` to manually enable the integration::

from ddtrace import patch
patch(azure_servicebus=True)
The azure_servicebus integration is enabled by default when using :ref:`import ddtrace.auto<ddtraceauto>`.


Global Configuration
Expand Down
14 changes: 5 additions & 9 deletions ddtrace/contrib/internal/azure_servicebus/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def _patched_add_message(wrapped, instance, args, kwargs):
fully_qualified_namespace = instance._dd_fully_qualified_namespace
operation_name = f"{azure_servicebusx.CLOUD}.{azure_servicebusx.SERVICE}.{azure_servicebusx.CREATE}"

with create_context("azure.servicebus.patched_producer_batch", pin, operation_name, resource_name) as ctx, ctx.span:
with create_context("azure.servicebus.patched_producer_batch", pin, operation_name, resource_name) as ctx:
dispatch_message_modifier(
ctx, args, kwargs, azure_servicebusx.CREATE, resource_name, fully_qualified_namespace, "message"
)
Expand All @@ -113,7 +113,7 @@ def _patched_send_messages(wrapped, instance, args, kwargs):
fully_qualified_namespace = instance.fully_qualified_namespace
operation_name = f"{azure_servicebusx.CLOUD}.{azure_servicebusx.SERVICE}.{azure_servicebusx.SEND}"

with create_context("azure.servicebus.patched_producer_send", pin, operation_name, resource_name) as ctx, ctx.span:
with create_context("azure.servicebus.patched_producer_send", pin, operation_name, resource_name) as ctx:
dispatch_message_modifier(
ctx, args, kwargs, azure_servicebusx.SEND, resource_name, fully_qualified_namespace, "message"
)
Expand All @@ -129,7 +129,7 @@ async def _patched_send_messages_async(wrapped, instance, args, kwargs):
fully_qualified_namespace = instance.fully_qualified_namespace
operation_name = f"{azure_servicebusx.CLOUD}.{azure_servicebusx.SERVICE}.{azure_servicebusx.SEND}"

with create_context("azure.servicebus.patched_producer_send", pin, operation_name, resource_name) as ctx, ctx.span:
with create_context("azure.servicebus.patched_producer_send", pin, operation_name, resource_name) as ctx:
dispatch_message_modifier(
ctx, args, kwargs, azure_servicebusx.SEND, resource_name, fully_qualified_namespace, "message"
)
Expand All @@ -145,9 +145,7 @@ def _patched_schedule_messages(wrapped, instance, args, kwargs):
fully_qualified_namespace = instance.fully_qualified_namespace
operation_name = f"{azure_servicebusx.CLOUD}.{azure_servicebusx.SERVICE}.{azure_servicebusx.SEND}"

with create_context(
"azure.servicebus.patched_producer_schedule", pin, operation_name, resource_name
) as ctx, ctx.span:
with create_context("azure.servicebus.patched_producer_schedule", pin, operation_name, resource_name) as ctx:
dispatch_message_modifier(
ctx, args, kwargs, azure_servicebusx.SEND, resource_name, fully_qualified_namespace, "messages"
)
Expand All @@ -163,9 +161,7 @@ async def _patched_schedule_messages_async(wrapped, instance, args, kwargs):
fully_qualified_namespace = instance.fully_qualified_namespace
operation_name = f"{azure_servicebusx.CLOUD}.{azure_servicebusx.SERVICE}.{azure_servicebusx.SEND}"

with create_context(
"azure.servicebus.patched_producer_schedule", pin, operation_name, resource_name
) as ctx, ctx.span:
with create_context("azure.servicebus.patched_producer_schedule", pin, operation_name, resource_name) as ctx:
dispatch_message_modifier(
ctx, args, kwargs, azure_servicebusx.SEND, resource_name, fully_qualified_namespace, "messages"
)
Expand Down
57 changes: 45 additions & 12 deletions ddtrace/contrib/internal/azure_servicebus/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
from typing import Any
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
from uuid import UUID

import azure.servicebus as azure_servicebus
from azure.servicebus import ServiceBusMessage
from azure.servicebus import ServiceBusMessageBatch
import azure.servicebus.amqp as azure_servicebus_amqp
from azure.servicebus.amqp import AmqpAnnotatedMessage

from ddtrace import config
from ddtrace._trace.pin import Pin
from ddtrace._trace.span import Span
from ddtrace.contrib.trace_utils import ext_service
from ddtrace.ext import SpanTypes
from ddtrace.ext import azure_servicebus as azure_servicebusx
Expand All @@ -13,7 +22,12 @@
from ddtrace.propagation.http import HTTPPropagator


def create_context(context_name, pin, operation_name, resource=None):
def create_context(
context_name: str,
pin: Pin,
operation_name: str,
resource: Optional[str] = None,
) -> core.ExecutionContext:
return core.context_with_data(
context_name,
span_name=operation_name,
Expand All @@ -24,15 +38,21 @@ def create_context(context_name, pin, operation_name, resource=None):
)


def handle_service_bus_message_context(span, message_arg_value):
if isinstance(message_arg_value, (azure_servicebus.ServiceBusMessage, azure_servicebus_amqp.AmqpAnnotatedMessage)):
def handle_service_bus_message_context(
span: Span,
message_arg_value: Union[
ServiceBusMessage,
AmqpAnnotatedMessage,
List[Union[ServiceBusMessage, AmqpAnnotatedMessage]],
ServiceBusMessageBatch,
],
):
if isinstance(message_arg_value, (ServiceBusMessage, AmqpAnnotatedMessage)):
inject_context(span, message_arg_value)
elif (
isinstance(message_arg_value, list)
and message_arg_value
and isinstance(
message_arg_value[0], (azure_servicebus.ServiceBusMessage, azure_servicebus_amqp.AmqpAnnotatedMessage)
)
and isinstance(message_arg_value[0], (ServiceBusMessage, AmqpAnnotatedMessage))
):
for message in message_arg_value:
inject_context(span, message)
Expand All @@ -43,7 +63,7 @@ def handle_service_bus_message_context(span, message_arg_value):
span.link_span(parent_context)


def inject_context(span, message):
def inject_context(span: Span, message: Union[ServiceBusMessage, AmqpAnnotatedMessage]):
"""
ServiceBusMessage.application_properties is of type Dict[str | bytes, PrimitiveTypes] | None
AmqpAnnotatedMessage.application_properties is of type Dict[str | bytes, Any] | None
Expand All @@ -62,7 +82,14 @@ def inject_context(span, message):
message.application_properties.update(inject_carrier)


def handle_service_bus_message_attributes(message_arg_value):
def handle_service_bus_message_attributes(
message_arg_value: Union[
ServiceBusMessage,
AmqpAnnotatedMessage,
List[Union[ServiceBusMessage, AmqpAnnotatedMessage]],
ServiceBusMessageBatch,
],
) -> Tuple[Union[str, None], Union[str, None]]:
if isinstance(message_arg_value, azure_servicebus.ServiceBusMessage):
batch_count = None
message_id = message_arg_value.message_id
Expand All @@ -81,16 +108,22 @@ def handle_service_bus_message_attributes(message_arg_value):
elif isinstance(message_arg_value, list):
batch_count = str(len(message_arg_value))
message_id = None
else:
message_id = None
batch_count = None
return message_id, batch_count


def dispatch_message_modifier(
ctx, args, kwargs, message_operation, resource_name, fully_qualified_namespace, message_arg
ctx: core.ExecutionContext,
args: Any,
kwargs: Any,
message_operation: str,
resource_name: str,
fully_qualified_namespace: str,
message_arg: str,
):
message_arg_value = get_argument_value(args, kwargs, 0, message_arg, True)
if message_arg_value is None:
return

message_id, batch_count = handle_service_bus_message_attributes(message_arg_value)

if config.azure_servicebus.distributed_tracing:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import azure.functions as func
import requests

from ddtrace import patch


patch(azure_functions=True, azure_servicebus=True, requests=True)
import ddtrace.auto # noqa: F401


app = func.FunctionApp()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import azure.functions as func
import azure.servicebus as azure_servicebus

from ddtrace import patch
import ddtrace.auto # noqa: F401


patch(azure_functions=True, azure_servicebus=True, requests=True)

app = func.FunctionApp()


Expand Down
Loading