跳到主要内容

使用Python API消费直接消息

直接消息在需要高吞吐量和低延迟的场景中非常有用。由于外部因素(如网络拥塞或偶尔的客户端断开连接)的影响,使用直接消息可能会导致某些消息丢失。直接消息适用于需要最新信息但不一定需要每条消息的应用程序。例如,天气应用程序、价格检查器、GPS 跟踪等。

直接消息不需要额外的事件代理配置。如果您的应用程序无法容忍消息丢失,我们建议您使用持久消息。

要使用 PubSub+ 消息 API for Python 消费直接消息,请按照以下步骤操作:

  1. 创建一个 DirectMessageReceiver
  2. 处理订阅直接消息时的反向压力。
  3. 同步接收直接消息。
  4. 异步接收直接消息。
  5. 从入站消息中提取属性。
  6. 使用 PubSub+ 缓存与 Python API。

在某些用例中,API 从事件代理接收消息的速度可能比您的应用程序处理消息的速度更快。消息可能会填满 API 的内部缓冲区,导致反向压力。如果这种情况可能发生,您可能需要考虑更改默认的反向压力设置以满足您的需求。有关更多信息,请参阅处理订阅直接消息时的反向压力。

有关消费直接消息的应用程序示例,请参阅 Solace GitHub 页面上的 direct_subscriber.py

创建DirectMessageReceiver

MessagingService 实例与事件代理建立连接后,使用 DirectMessageReceiver 从事件代理消费直接消息。要创建一个 DirectMessageReceiver 对象,请执行以下操作:

  1. MessagingService 对象上调用 create_direct_message_receiver_builder() 函数。这将返回一个 DirectMessageReceiverBuilder 对象。

  2. 现在,您可以使用 DirectMessageReceiverBuilder 接口中的函数来配置 DirectMessageReceiver 使用 API 的某些功能,例如主题订阅和反向压力策略。

  3. 在您的 DirectMessageReceiverBuilder 上调用 build() 函数以返回一个 DirectMessageReceiver 对象。

  4. 要使您的 DirectMessageReceiver 开始接收消息,请在其上调用 start() 函数。

有关更多信息,请参阅 PubSub+ 消息 API for Python 参考。

以下示例展示了如何向 DirectMessageReceiver 添加主题订阅列表并连接到事件代理:

# 定义主题订阅
topics_sub = [TopicSubscription.of("solace/sample/1")]

# 创建一个 DirectMessageReceiver Builder,允许您创建一个 DirectMessageReceiver 并启动它
direct_receiver = messaging_service.create_direct_message_receiver_builder() \
.with_subscriptions(topics_sub) \
.build()

# 同步启动配置好的 DirectMessageReceiver。在调用此函数之前,接收器被视为非工作状态
direct_receiver.start()

异步接收器

或者,可以使用回调监听器启动直接消息接收器,以便在启动操作完成后接收异步通知。

在 PubSub+ Python API 中,您可以使用 start_async() 函数异步启动 DirectMessageReceiver。这允许接收器对象在单独的后台线程中启动,这意味着您的代码可以继续执行,而无需等待函数返回。

import concurrent.futures
# ...
receivers = [direct_message_receiver_builder.build() for _ in range(10)]
futures_to_receiver = {receiver.start_async(): receiver for receiver in receivers}

for future in concurrent.futures.as_completed(futures_to_receiver):
receiver = futures_to_receiver[future]
try:
# start async 没有返回值,但在启动失败时会抛出异常
future.result()
print(f'接收器{id(receiver)}启动成功')
except Exception as err:
print(f'接收器{id(receiver)}产生错误:{err}')

在调用 start()start_async() 之前,您的接收器应用程序是不可操作的。

处理订阅直接消息时的反向压力

当订阅直接消息时,API 使用内部缓冲区来存储从事件代理接收的消息。在理想情况下,消息一被接收,应用程序就会处理该消息。根据处理速度和其他因素,可能会收到比订阅应用程序处理速度更快的消息(例如,高容量的消息突发)。如果消息未被处理并允许累积,内部缓冲区可能会达到其容量,这种情况称为反向压力。

在 PubSub+ Python API 中,DirectMessageReceiver 具有以下机制来处理反向压力:

  • 忽略限制(默认)
  • 丢弃最新消息
  • 丢弃最旧消息

配置无限制的内部缓冲区

此 API 的默认配置是使用无限制大小的内部缓冲区来存储消息。这种配置不适合内存受限的环境,因为缓冲区被允许无限增长,可能会导致内存不足错误(或潜在的未定义错误)。当您的基础设施由多个短生命周期的微服务组成时,这种配置很有用。使用这种配置也很有用,因为您无需编写代码来处理反向压力场景。

当您使用无限制缓冲区时,Python API 会持续将从事件代理接收的消息放入其内部缓冲区。以下示例展示了对 on_back_pressure_elastic() 函数的显式调用,这不是必需的,因为它是默认行为:

# 创建一个 DirectMessageReceiverBuilder,允许您创建一个 DirectMessageReceiver 并启动它
direct_receiver = messaging_service.create_direct_message_receiver_builder() \
.with_subscriptions(topics_sub) \
.on_back_pressure_elastic() \
.build()

# 同步启动配置好的 DirectMessageReceiver。在调用此函数之前,接收器被视为非工作状态
direct_receiver.start()

丢弃最新消息

当 API 的内部缓冲区达到指定容量时,您可以配置 API 丢弃最新消息。当达到此容量时,最新的消息不会放入内部缓冲区,因为它是满的,而是被丢弃(丢失)。

要配置不同的缓冲区大小,请在 DirectMessageReceiverBuilder 上调用 on_back_pressure_drop_latest(buffer_capacity: int) 函数,然后设置可以在内部缓冲区中累积的最大消息数量(buffer_capacity),在达到此数量之前不会丢弃消息。

以下示例展示了如何配置应用程序,如果 API 的内部缓冲区中有 1000 条消息排队,则丢弃消息:

# 创建一个 DirectMessageReceiverBuilder,允许您创建一个 DirectMessageReceiver 并启动它
direct_receiver = messaging_service.create_direct_message_receiver_builder() \
.with_subscriptions(topics_sub) \
.on_back_pressure_drop_latest(1000) \
.build()

# 同步启动配置好的 DirectMessageReceiver。在调用此函数之前,接收器被视为非工作状态
direct_receiver.start()

丢弃最旧消息

当 API 的内部缓冲区达到指定容量时,您可以配置 API 丢弃最旧消息。当达到此容量时,从内部队列中移除最旧的项目,以为接收更新的消息腾出空间。当达到指定容量时,从接收器的缓冲区中移除最旧的消息,以便为更新的消息腾出空间。

要配置此机制,请在 DirectMessageReceiverBuilder 上调用 on_back_pressure_drop_oldest(buffer_capacity: int) 函数,然后设置可以在内部缓冲区中累积的最大消息数量(buffer_capacity),在达到此数量之前不会移除最旧的消息。

以下示例展示了如何配置应用程序,如果 API 的内部缓冲区中有 1000 条消息排队,则丢弃最旧的消息:

# 创建一个 DirectMessageReceiverBuilder,允许您创建一个 DirectMessageReceiver 并启动它
direct_receiver = messaging_service.create_direct_message_receiver_builder() \
.with_subscriptions(topics_sub) \
.on_back_pressure_drop_oldest(1000) \
.build()

# 同步启动配置好的 DirectMessageReceiver。在调用此函数之前,接收器被视为非工作状态
direct_receiver.start()

同步接收直接消息

在使用 MessagingService 实例与事件代理建立连接后,您可以使用 DirectMessageReceiver 订阅消息。DirectMessageReceiver 必须订阅至少一个主题,才能开始接收消息。

以下示例展示了 InboundMessage 是如何被 DirectMessageReceiver 接收的。使用 receive_message(timeout: int) 函数,该函数会阻塞例程,直到:

  • 您的接收器收到下一条消息。
  • 根据提供的 timeout 参数发生超时。
  • 出现服务中断,例如接收器终止、MessagingService 断开连接或不可恢复的服务中断。
direct_receiver = messaging_service.create_direct_message_receiver_builder() \
.with_subscriptions(topics_sub) \
.build()
direct_receiver.start()

# 阻塞请求以接收下一条消息。通常在循环中使用,以下示例接收 1000 条消息。
for _ in range(1000):
inbound_message: 'InboundMessage' = direct_receiver.receive_message(1000) # 阻塞 1000 毫秒以等待消息到达。

异步接收直接消息

在使用 MessagingService 实例与事件代理建立连接后,您可以使用 DirectMessageReceiver 消费直接消息并异步处理它们。要异步处理直接消息,您需要使用 MessageHandler 作为回调函数,以便在收到消息时通知应用程序。抽象的 MessageHandler 类包含您用于处理 InboundMessage 实例的 on_message() 函数。

以下示例展示了 MessageHandler 的示例实现以及您用于异步接收消息的 on_message() 函数:

class MessageHandlerExample(MessageHandler):
def on_message(self, message: InboundMessage):
# 检查负载是否为字符串或字节,如果是后者则解码
payload = message.get_payload_as_string() if message.get_payload_as_string() is not None else message.get_payload_as_bytes()
if isinstance(payload, bytearray):
print(f"收到的消息类型为:{type(payload)}。正在解码为字符串")
payload = payload.decode()

# 在 DirectMessageReceiver 实例上注册异步消息接收器。
direct_receiver.receive_async(MessageHandlerExample())

从入站消息中提取属性

在与事件代理建立连接后,您的接收器应用程序可以订阅主题。每当您的应用程序从代理接收到具有匹配主题的消息时,就会将 InboundMessage 实例返回给应用程序。您可以从 InboundMessage 中提取许多属性,例如发送者 ID。以下示例展示了如何从消息中提取属性。

  • 使用 MessageHandler 回调异步接收消息时:
class MessageHandlerExample(MessageHandler):
def __init__(self, direct_receiver: DirectMessageReceiver):
self.receiver: DirectMessageReceiver = direct_receiver

def on_message(self, message: InboundMessage):
topic = message.get_destination_name()
payload_as_bytes = message.get_payload_as_bytes()
payload_as_string = message.get_payload_as_string()
sender_id = message.get_sender_id()
custom_property = message.get_property("custom_prop_name")

# 在 DirectMessageReceiver 实例上注册异步消息接收器。
direct_receiver.receive_async(MessageHandlerExample())
  • 使用 receive_message() 函数同步接收消息时:
# 阻塞请求以接收下一条消息。通常在循环中使用,以下示例接收 1000 条消息。
for _ in range(1000):
inbound_message: 'InboundMessage' = direct_receiver.receive_message(1000) # 阻塞 1000 毫秒以等待消息到达。
payload_as_bytes = inbound_message.get_payload_as_bytes()
payload_as_string = inbound_message.get_payload_as_string()
sender_id = inbound_message.get_sender_id()
custom_property = inbound_message.get_property("custom_prop_name")

有关可以从 InboundMessage 中提取属性的完整函数列表,请参阅 PubSub+ 消息 API for Python 参考。