跳到主要内容

在Python API中使用分布式追踪的情境传播

分布式追踪允许您的企业应用程序追踪消息在从发布者通过事件网格到接收应用程序的流动过程。有关详细概述,请参阅分布式追踪。有关版本要求的信息,请参阅分布式追踪版本兼容性。

  • PubSub+ OpenTelemetry API 库仅支持 W3C 传播器。
  • 有关配置 OpenTelemetry SDK 环境变量的信息,请参阅 OpenTelemetry SDK 配置。
  • 默认情况下,追踪包括后端应用程序(如 Jaeger)可见的命令行参数。 Solace 建议出于安全原因禁用此功能,因为这些参数可能包含敏感信息,如您的用户名和密码。有关说明,请参阅 GitHub 上 OpenTelemetry 文档中的禁用自动资源提供程序。

为分布式追踪对 Python 进行工具化

手动工具化涉及对您的企业应用程序的源代码进行更改,并允许您将额外的情境(如行李和追踪状态)注入和从消息中提取。情境传播使调试和优化您的应用程序变得容易。有关 Solace 事件消息中情境传播的更多信息,请参阅分布式追踪情境传播。以下示例展示了如何使用 OpenTelemetry API 创建跨度。

理解情境传播如何在 Python PubSub+ API 中启用分布式追踪

在您的客户端应用程序中,您可以使用 OpenTelemetry API 创建一个跨度,其中包含分布式系统中操作的元数据。此跨度与一个情境相关联,其中包含一个唯一的 TraceID。接下来,当您使用 PubSub+ 消息生产者发布消息时,Solace OTel 集成包将包含 TraceID 的情境注入到消息中。当消息通过事件代理并被消费应用程序接收时,每个步骤都会生成跨度,并且原始消息情境中存在的相同 TraceID。当发布或消费应用程序中的每个跨度关闭时,Python OpenTelemetry API 将其发送到 OpenTelemetry 收集器,该收集器收集、处理并将跨度导出到使用其唯一 TraceID 对跨度进行关联的后端应用程序。后端应用程序使用相关跨度创建一个 trace,这是一个端到端的快照,详细说明了消息如何通过分布式系统传输。如果您不使用情境传播,那么后端应用程序将无法使用唯一的 TraceID 将跨度链接起来,这使得追踪消息通过分布式系统的流动变得困难。

依赖项

要启用分布式追踪的情境传播,您必须首先将 Solace PubSub+ OpenTelemetry Python 集成包作为依赖项添加到您的应用程序中。您也可以使用以下命令添加此包:

pip install pubsubplus-opentelemetry-integration

然后使用以下命令添加所需的情境传播的 OpenTelemetry API 和 SDK 库:

pip install opentelemetry-api
pip install opentelemetry-sdk

有关 OpenTelemetry 版本兼容性的信息,请参阅分布式追踪版本兼容性。添加这些库将使您的应用程序能够访问以下内容:

  • InboundMessageCarrier — OpenTelemetry 消息载体和 Solace 入站消息包装器
  • InboundMessageGetter — 允许 API 从 InboundMessageCarrier 提取传播字段
  • OutboundMessageCarrier — OpenTelemetry 消息载体和 Solace 出站消息包装器
  • OutboundMessageSetter — 允许 API 将传播字段注入到 OutboundMessageCarrier
  • OutboundMessageGetter — 允许 API 从 OutboundMessageCarrier 提取传播字段

本指南假定您熟悉配置 OpenTelemetry 类的实例。有关配置 OpenTelemetry 对象的说明,请参阅 OpenTelemetry 文档中的 OpenTelemetry 手动工具化。

要在 Python PubSub+ API 中使用情境传播,请在您的应用程序中包含以下包:

from opentelemetry.trace import StatusCode, Status  # 追踪状态
from opentelemetry import context # 情境功能
from opentelemetry import propagate # 追踪情境传播
from opentelemetry import trace, baggage # 追踪和行李功能
from opentelemetry.sdk.resources import Resource # 在追踪中定义资源
from opentelemetry.sdk.trace import TracerProvider # 追踪功能
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter # 使用 gRPC 的 OTLP 服务器的跨度导出器
from opentelemetry.sdk.trace.export import ( # 通用跨度导出器
BatchSpanProcessor,
ConsoleSpanExporter,
SimpleSpanProcessor
)
from opentelemetry.semconv.trace import ( # 追踪的标准语义约定
SpanAttributes,
MessagingDestinationKindValues,
MessagingOperationValues
)
from solace_otel.messaging.trace.propagation import ( # Solace 消息载体用于追踪传播
OutboundMessageCarrier,
OutboundMessageGetter,
OutboundMessageSetter,
InboundMessageCarrier,
InboundMessageGetter
)

在消息发布时生成发送跨度

您的发布应用程序可以生成发送跨度并将其导出到 OpenTelemetry 收集器。要将情境注入消息并为发布的消息生成发送跨度,请执行以下步骤:

  1. 创建一个 OutboundMessageSetter 实例,允许 API 将传播字段注入到 Solace 出站消息载体中。接下来,使用 get_global_textmap() 函数创建一个传播器实例。此函数返回一个全局文本映射传播器,使 TraceContextBaggage 结构在您的应用程序中可用。
default_setter = OutboundMessageSetter()
propagator = propagate.get_global_textmap()
  1. 使用 get_tracer() 函数创建一个追踪器实例。追踪器实例允许您的应用程序创建、启动和结束跨度。
tracer = trace.get_tracer("my_tracer")
  1. 创建您的 OutboundMessage 以发布,并使用您的追踪器实例使用 start_as_current_span() 函数创建和启动一个跨度。当 with 块中的代码完成时,跨度会自动结束:
outbound_message = message_builder.build('my message body', additional_message_properties=additional_properties)
with tracer.start_as_current_span(f"{my_topic}_publish") as span:
  1. 使用 set_attribute() 函数设置跨度属性,允许您以键值对的形式附加额外的情境和元数据到跨度:
    span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "PubSub+")
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.TOPIC.value)
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, my_topic)
span.set_attribute(SpanAttributes.MESSAGING_OPERATION, "publish")
span.set_attribute(SpanAttributes.MESSAGING_PROTOCOL, "SMF")
span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, outbound_message.get_message_id())
  1. (可选)使用 set_baggage() 函数将行李附加到跨度。如果未明确提供情境,attach() 函数会隐式检索当前情境。此函数使用键值对:
    BAGGAGE_KEY = "my_key"
BAGGAGE_VALUE = "my_value"
context.attach(baggage.set_baggage(BAGGAGE_KEY, BAGGAGE_VALUE))
  1. 创建一个 OutboundMessageCarrier 实例,并传入您的出站消息对象。然后使用 inject() 函数将情境注入到您的消息中:
    carrier = OutboundMessageCarrier(outbound_message)
propagator.inject(carrier=carrier, setter=default_setter)
  1. 发布消息:
    try:
direct_publisher.publish(
destination=Topic.of("my/sample/topic"),
message=outbound_message)
span.set_status(Status(StatusCode.OK))
except Exception as ex:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(ex)

在上面的代码片段中,当 with 块中的代码完成时,跨度会结束。如果您不使用第 3 步中演示的 with 代码块,则必须显式调用 span.end() 来结束跨度。

在消息接收时生成接收跨度

您的消费应用程序可以生成接收跨度,然后将其导出到 OpenTelemetry 收集器。要从消息中提取追踪情境并为接收到的消息生成接收跨度,请执行以下步骤:

  1. 创建一个 InboundMessageGetter 实例,允许 API 从 Solace InboundMessageCarrier 提取传播字段。
default_getter = InboundMessageGetter()
  1. 创建一个 MessageHandler 以接收消息。在您的处理器中,使用 get_tracer() 函数创建一个追踪器实例。追踪器实例允许您的应用程序创建、启动和结束跨度。然后,使用 get_global_textmap() 函数创建一个传播器实例。此函数返回一个全局文本映射传播器,使 TraceContextBaggage 结构在您的应用程序中可用。
class MessageHandlerImpl(MessageHandler):
def on_message(self, message: 'InboundMessage'):
tracer = trace.get_tracer("my.tracer")
propagator = propagate.get_global_textmap()
  1. 创建一个 InboundMessageCarrier 实例,并传入您的入站消息对象。然后使用 extract() 函数从接收到的消息中提取情境:
carrier = InboundMessageCarrier(message)
extracted_ctx = propagator.extract(carrier=carrier, getter=default_getter)
  1. (可选)从跨度中提取行李:
baggage_entries = baggage.get_all(extracted_ctx)
  1. 使用 attach() 函数将提取的追踪情境链接到当前情境。这允许您在应用程序的分布式组件之间传播追踪情境。将此分配给一个令牌实例,稍后在接收消息操作完成时,您将使用它将追踪情境从当前情境中分离。这使得对当前情境所做的更改仅限于每个单独的消息接收操作,这意味着后续操作不会受到早期操作的追踪情境的影响。
token = context.attach(extracted_ctx)
  1. 使用您的追踪器实例使用 start_as_current_span() 函数创建和启动一个跨度。为每条消息创建一个新跨度。当 with 块中的代码完成时,跨度会自动结束:
try:
with tracer.start_as_current_span("{topicName} process".format(topicName=message.get_destination_name())) as span:
  1. 使用 set_attribute() 函数设置跨度属性,允许您以键值对的形式附加额外的情境和元数据到跨度:
        span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "PubSub+")
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.TOPIC.value)
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, message.get_destination_name())
span.set_attribute(SpanAttributes.MESSAGING_OPERATION, MessagingOperationValues.PROCESS.value)
span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, message.get_application_message_id())
  1. 处理接收到的消息:
    try:
# 在这里处理接收到的消息
span.set_status(Status(StatusCode.OK))
except Exception as ex:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(ex)
  1. 调用 detach() 函数以结束当前情境,这标志着追踪过程此步骤的范围结束。detach() 将情境恢复到第 5 步调用 attach() 之前的状态,这意味着在 attach()detach() 之间所做的任何修改不再属于活动情境的一部分。这允许后续操作在不受此期间所做更改的影响下继续进行。
    finally:
context.detach(token)

在上面的代码片段中,当 with 块中的代码完成时,跨度会结束。如果您不使用第 6 步中演示的 with 代码块,则必须显式调用 span.end() 来结束跨度。