跳到主要内容

使用Python API消费持久消息

无法容忍消息丢失的订阅应用程序可以使用持久消息(在本文档的其他部分也称为“保证消息”)而不是直接消息。使用持久消息时,消息会被存储在事件代理上的队列中。只有在订阅应用程序(称为消息接收器)消费并确认消息后,消息才会从事件代理中删除。PubSub+ Python API 只能从队列中消费持久消息,而不能从主题端点中消费。

要消费持久消息,您必须首先在事件代理上设置消息队列。有关在事件代理上创建和配置持久队列的信息,请参阅配置队列。或者,可以在创建持久消息接收器(PersistentMessageReceiver)时创建非持久队列。

使用 PubSub+ 消息 API for Python 消费持久消息的步骤如下:

  1. 创建一个 PersistentMessageReceiver
  2. 同步接收持久消息。
  3. 异步接收持久消息。
  4. 从入站消息中提取属性。
  5. 消息确认。
  6. 使用 PubSub+ Python API 创建队列。

如果您的消费者(或订阅)应用程序遇到无法处理消息的速度快于从事件代理接收消息的情况,则可能会发生内部反向压力。消息会继续在内部缓冲,直到达到高水位线,此时 API 会告知事件代理停止发送消息,以防止消息丢失。

有关消费持久消息的应用程序示例,请参阅 Solace GitHub 页面上的 guaranteed_subscriber.py

创建PersistentMessageReceiver

使用 MessagingService 实例与事件代理建立连接后,使用 PersistentMessageReceiver 从事件代理的队列中消费持久消息。要创建 PersistentMessageReceiver 对象,请执行以下操作:

  1. MessagingService 对象上调用 create_persistent_message_receiver_builder() 函数。这将返回一个 PersistentMessageReceiverBuilder 对象。

  2. PersistentMessageReceiverBuilder 对象为您提供了一系列函数,以便您可以自定义 PersistentMessageReceiver 对象。这些包括以下内容:

    • with_message_replay(replay_strategy: ReplayStrategy) — 向持久接收器添加消息回放策略。
    • with_message_selector(selector_query_expression: str) — 基于消息头参数和消息属性值启用消息选择支持。
  3. 在您的 PersistentMessageReceiverBuilder 上调用 build() 函数以返回一个 PersistentMessageReceiver 对象:

    • build(endpoint_to_consume_from: Queue) — 返回一个 PersistentMessageReceiver 对象。此函数以要从中消费的队列为参数。
  4. 要使您的 PersistentMessageReceiver 开始接收消息,请在其上调用 start() 函数。

要使您的 PersistentMessageReceiver 开始接收消息,请在它上调用 start()

有关 Python API 中使用的函数的更多信息,请参阅 PubSub+ Python 消息 API 参考。

以下是如何使用 PersistentMessageReceiver 绑定到队列的示例:

# 定义主题订阅
topics_sub = [TopicSubscription.of("solace/sample/1")]

# 队列名称。这假设代理上已经存在一个具有正确主题订阅的持久队列
durable_exclusive_queue = Queue.durable_exclusive_queue("sample-queue")

# 创建一个 PersistentMessageReceiver Builder,允许您创建一个 PersistentMessageReceiver 并启动它
persistent_receiver = messaging_service.create_persistent_message_receiver_builder() \
.build(durable_exclusive_queue)

# 启动会同步启动配置好的 PersistentMessageReceiver。在调用此函数之前,接收器被视为非工作状态
persistent_receiver.start()

# 为您的接收器添加任何额外的订阅
persistent_receiver.add_subscription(topics_sub)

您可以使用 Python API 在事件代理上创建持久和非持久队列。有关更多信息,请参阅使用 Python API 创建队列。

异步接收器

也可以使用回调监听器启动持久消息接收器,以便在启动操作完成后接收异步通知。

在 PubSub+ Python API 中,您可以使用 start_async() 函数异步启动 PersistentMessageReceiver。这允许接收器对象在单独的后台线程中启动,这意味着您的代码可以继续执行,而无需等待函数返回。

import concurrent.futures
# ...
receivers = [persistent_message_receiver_builder.build() for _ in range(10)]
futures_to_receiver = {receiver.start_async(): receiver for receiver in receivers}

for future in concurrent.futures.as_completed(futures_to_receiver):
receiver = futures_to_receiver[future]
try:
# start async 没有返回值,但在启动失败时会抛出异常
future.result()
print(f'接收器{id(receiver)}启动成功')
except Exception as err:
print(f'接收器{id(receiver)}产生错误:{err}')

在调用 start()start_async() 之前,您的接收器应用程序是不可操作的。

同步消费持久消息

您可以同步消费持久消息。为此,您需要创建一个 PersistentMessageReceiver 并将其绑定到队列。成功绑定到队列后,您的接收器应用程序可以开始消费持久消息。

当您使用 receive_message(timeout: int) 函数时,它会阻塞例程,直到:

  • 您的接收器收到下一条消息。
  • 根据提供的 timeout 参数发生超时。
  • 或者出现服务中断,例如接收器终止、MessagingService 断开连接或不可恢复的服务中断。

当应用程序处理 InboundMessage 时,它可以使用 PersistentMessageReceiver.ack() 向事件代理发送确认。事件代理随后会从队列中删除 InboundMessage。在确认消息之前,它保留在代理队列中,并且当应用程序重新连接到队列时可能会重新传递。

有关上述函数的更多信息,请参阅 PubSub+ Python 消息 API 参考。

以下示例展示了如何同步消费持久消息:

persistent_receiver = messaging_service.create_persistent_message_receiver_builder() \
.build(durable_exclusive_queue)
persistent_receiver.start()

# 阻塞请求以接收下一条消息。通常在循环中使用,以下示例接收 1000 条消息。
for _ in range(1000):
received_message: 'InboundMessage' = persistent_receiver.receive_message(1000) # 阻塞 1000 毫秒以等待消息到达。
persistent_receiver.ack(received_message)

如果您不调用 receive_message() 函数,消息可能会在 API 的内部缓冲区中累积,您可能会遇到反向压力场景。如果发生这种情况,Python API 会自动告知事件代理停止发送消息。

异步消费持久消息

您可以异步消费持久消息。为此,您需要创建一个 PersistentMessageReceiver 并像往常一样启动与事件代理的连接,但您需要使用 MessageHandler 作为回调函数,以便在收到消息时通知您的应用程序。

receive_async(message_handler: MessageHandler) 函数不会阻塞您的接收应用程序,这允许它同时执行其他代码。当您的接收器需要一次处理许多消息或处理连续消息流时,这非常有用。

receive_async() 不是异步协程或生成器,也不兼容 asyncioreceive_async() 立即返回,并在后台使用原生线程。此函数为每条消息在新 Python 线程中调用回调。

当应用程序处理 InboundMessage 时,它可以使用 PersistentMessageReceiver.ack() 向事件代理发送确认。事件代理随后会从队列中删除 InboundMessage。在确认消息之前,它保留在代理队列中,并且当应用程序重新连接到队列时可能会重新传递。

有关上述函数的更多信息,请参阅 PubSub+ Python 消息 API 参考。

以下示例展示了如何异步消费和确认持久消息:

class MessageHandlerExample(MessageHandler):
def __init__(self, persistent_receiver: PersistentMessageReceiver):
self.receiver: PersistentMessageReceiver = persistent_receiver

def on_message(self, message: InboundMessage):
# 检查负载是否为字符串或字节,如果是后者则解码
payload = message.get_payload_as_string() if message.get_payload_as_string() is not None else message.get_payload_as_bytes()
if isinstance(payload, bytearray):
print(f"收到的消息类型为:{type(payload)}。正在解码为字符串")
payload = payload.decode()
self.receiver.ack(message)

# 在 PersistentMessageReceiver 实例上注册异步消息接收器。
persistent_receiver.receive_async(MessageHandlerExample(persistent_receiver))

暂停和恢复从内部缓冲区的消息消费

当您的应用程序使用 receive_async() 函数异步消费消息时,您可以调用 pause()resume() 函数来控制消息流到应用程序的回调。

调用 pause() 函数时,同步 receive_message(timeout: int) 函数中使用的任何超时值仍然有效。这意味着如果您调用 pause() 函数,超时可能会发生。

您可以使用 pause()resume() 函数来控制消息从事件代理到应用程序的内部缓冲区的流动。此内部缓冲区是消息从事件代理接收的地方。这种流量控制非常有用,如果您的应用程序需要暂时停止处理消息以处理其他操作。pause()resume() 函数不控制事件代理和 API 内部缓冲区之间的消息流动。调用 pause() 函数时,消息继续从事件代理发送。pause()resume() 函数仅控制消息传递到应用程序。从事件代理接收的消息继续在内部缓冲区中累积。

由于事件代理继续发送消息,可能会发生反向压力场景,即消息继续累积,直到达到内部高水位线。此时,PersistentMessageReceiver 会通知事件代理停止发送消息,直到累积消息的数量低于内部低水位线。这种内部 API 机制为您处理反向压力场景,并确保在事件代理和您的应用程序之间不会丢失任何消息。

以下函数用于暂停和恢复从 API 内部缓冲区的消息处理:

  • pause()

  • resume()

有关上述函数的更多信息,请参阅 PubSub+ Python 消息 API 参考。

以下示例展示了如何使用调度器暂停和恢复从 API 内部队列的消息处理:

persistent_receiver.pause() # 暂停接收器向异步消息处理器的消息传递。
# 在这里执行任何操作,例如等待 60 秒:time.sleep(60)
persistent_receiver.resume() # 恢复接收器向异步消息处理器的消息传递。

从入站消息中提取属性

与事件代理建立连接后,您的接收器应用程序可以订阅主题。每当您的应用程序从代理接收到具有匹配主题的消息时,就会将 InboundMessage 实例返回给应用程序。您可以从 InboundMessage 中提取许多属性,例如发送者 ID。以下示例展示了如何从消息中提取属性。

  • 使用 MessageHandler 回调异步接收消息时:
class MessageHandlerExample(MessageHandler):
def __init__(self, persistent_receiver: PersistentMessageReceiver):
self.receiver: PersistentMessageReceiver = persistent_receiver

def on_message(self, message: InboundMessage):
topic = message.get_destination_name()
payload_as_bytes = message.get_payload_as_bytes()
payload_as_string = message.get_payload_as_string()
sender_id = message.get_sender_id()
custom_property = message.get_property("custom_prop_name")
self.receiver.ack(message)

# 在 PersistentMessageReceiver 实例上注册异步消息接收器。
persistent_receiver.receive_async(MessageHandlerExample(persistent_receiver))
  • 使用 receive_message() 函数同步接收消息时:
# 阻塞请求以接收下一条消息。通常在循环中使用,以下示例接收 1000 条消息。
for _ in range(1000):
received_message: 'InboundMessage' = persistent_receiver.receive_message(1000) # 阻塞 1000 毫秒以等待消息到达。
payload_as_bytes = received_message.get_payload_as_bytes()
payload_as_string = received_message.get_payload_as_string()
sender_id = received_message.get_sender_id()
custom_property = received_message.get_property("custom_prop_name")
persistent_receiver.ack(received_message)

有关可以从 InboundMessage 中提取属性的完整函数列表,请参阅 PubSub+ Python 消息 API 参考。

消息确认

可以使用以下两种应用程序确认模式之一来确认消息:

  • 客户端确认(默认)

  • 自动确认

客户端确认模式

客户端确认模式是 PubSub+ Python API 的默认行为,意味着客户端必须明确地为每条收到的消息的消息 ID 发送确认。ACK 是异步的。只要接收器尚未终止,任何例程都可以在任何时候确认消息。重要的是要记住,在 PersistentMessageReceiver 向事件代理确认消息后,它会从事件代理的队列中删除该消息。因此,在确认消息之前,重要的是要完成对消息的任何处理和存储。

以下示例展示了如何同步和异步确认持久消息:

  • 使用 MessageHandler 回调异步接收消息时:
class MessageHandlerExample(MessageHandler):
def __init__(self, persistent_receiver: PersistentMessageReceiver):
self.receiver: PersistentMessageReceiver = persistent_receiver

def on_message(self, message: InboundMessage):
# 处理消息。
self.receiver.ack(message)

# 在 PersistentMessageReceiver 实例上注册异步消息接收器。
persistent_receiver.receive_async(MessageHandlerExample(persistent_receiver))
  • 使用 receive_message() 函数同步接收消息时:
# 阻塞请求以接收下一条消息。通常在循环中使用,以下示例接收 1000 条消息。
for _ in range(1000):
received_message: 'InboundMessage' = persistent_receiver.receive_message(1000) # 阻塞 1000 毫秒以等待消息到达。
# 处理消息。
persistent_receiver.ack(received_message)

自动确认模式

使用自动确认模式时,API 会自动生成应用程序级确认。要将您的 PersistentMessageReceiver 配置为使用自动确认,请使用 with_message_auto_acknowledgement() 方法:

persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder() \
.with_message_auto_acknowledgement() \
.build(durable_exclusive_queue)

对特定消息进行负确认

如果您的 PersistentMessageReceiver 未配置为自动确认收到的消息,则可以使用负确认(NACK)。使用 NACK 时,您可以发送一个结算结果,以告知事件代理处理收到的保证消息的结果。根据结算结果,事件代理知道如何在其队列上处理消息。您可以使用以下结算结果:

  • ACCEPTED — 此确认通知事件代理您的客户端应用程序已成功处理保证消息。当事件代理收到此结果时,它会从其队列中删除该消息。
    • 使用 settle() 函数并设置 ACCEPTED 结果时,与使用 persistent_receiver.ack(received_message) 是相同的。
  • FAILED — 此 NACK 通知事件代理您的客户端应用程序未处理该消息。当事件代理收到此 NACK 时,它会尝试重新传递该消息,同时遵守传递次数限制。在重新传递期间,消息保留在代理上,直到代理收到非 FAILED 的确认状态。
  • REJECTED — 此 NACK 通知事件代理您的客户端应用程序无法处理该消息。当事件代理收到此 NACK 时,它会从其队列中删除该消息,如果已配置,则将其移动到死信队列(DMQ)。

在使用 NACK 之前,您必须使用 with_required_message_outcome_support() 函数在创建 PersistentMessageReceiver 时添加 Outcome.FAILEDOutcome.REJECTED 或两者作为 NACK 类型,以准备其与负确认一起工作。您不需要添加 Outcome.ACCEPTED 结果,因为它始终可用。如果您尝试对未添加的结果调用 settle(),您会收到 Required Settlement Outcome Not Supported 错误。以下代码展示了如何配置 PersistentMessageReceiver 以使用 NACK:

from solace.messaging.config.message_acknowledgement_configuration import Outcome
# ...
nacking_receiver = messaging_service.create_persistent_message_receiver_builder() \
.with_required_message_outcome_support(Outcome.FAILED, Outcome.REJECTED) \
.build(Queue.durable_exclusive_queue("queue_name"))

或者,您可以使用属性映射而不是设置器:

from solace.messaging.config.solace_properties.receiver_properties import PERSISTENT_REQUIRED_MESSAGE_OUTCOME_SUPPORT
from solace.messaging.config.solace_constants.receiver_constants import PERSISTENT_RECEIVER_OUTCOME_FAILED, PERSISTENT_RECEIVER_OUTCOME_REJECTED
# ...
nacking_receiver = messaging_service.create_persistent_message_receiver_builder() \
.from_properties({PERSISTENT_REQUIRED_MESSAGE_OUTCOME_SUPPORT: f"{PERSISTENT_RECEIVER_OUTCOME_FAILED},{PERSISTENT_RECEIVER_OUTCOME_REJECTED}"}) \
.build(Queue.durable_exclusive_queue("queue_name"))
  • NACK 在传输过程中可能会丢失(例如由于意外的网络问题)。在开发应用程序时,将这一事实纳入处理消息的逻辑中。
  • NACK 在事件代理 10.2.1 及更高版本上受支持。如果事件代理不支持 NACK,则在调用已配置为使用消息结果的 PersistentMessageReceiver 实例的 start() 时,会发生 InvalidOperationException

以下示例展示了如何结算消息,其中包含占位符用户定义的函数,以确定消息结果应为 ACCEPTEDREJECTED 还是 FAILED。决定在收到消息时使用哪种 NACK 结果应基于您的应用程序需求。

  • 同步接收消息:
nacking_receiver.start()
nacking_receiver.add_subscription(TopicSubscription.of(topic_name))
msg: InboundMessage = receiver.receive_message()
if isMsgOK(msg):
# 消息正常,此处处理并确认它
self.receiver.settle(msg, Outcome.ACCEPTED)
elif isMsgPossiblySalvageableLater(msg):
# 例如,您的应用程序目前无法处理该消息,稍后重新传递
self.receiver.settle(msg, Outcome.FAILED)
else:
# 例如,消息内容存在问题,从端点中移除
self.receiver.settle(msg, Outcome.REJECTED)
  • 异步接收消息:
class MessageHandlerExample(MessageHandler):
def __init__(self, nacking_receiver: PersistentMessageReceiver):
super().__init__()
self.receiver: PersistentMessageReceiver = nacking_receiver

def on_message(self, msg: InboundMessage):
try:
# 处理消息。
if isMsgOK(msg):
# 消息正常,此处处理并确认它
self.receiver.settle(msg, Outcome.ACCEPTED)
elif isMsgPossiblySalvageableLater(msg):
# 例如,您的应用程序目前无法处理该消息
self.receiver.settle(msg, Outcome.FAILED)
else:
# 例如,消息内容存在问题
self.receiver.settle(msg, Outcome.REJECTED)
except Exception as e:
# 适当地记录或处理异常
print(f"处理消息时出错:{e}")
# 考虑在发生错误时如何结算消息
self.receiver.settle(msg, Outcome.FAILED)

nacking_receiver.start()
nacking_receiver.add_subscription(TopicSubscription.of(topic_name))
nacking_receiver.receive_async(MessageHandlerExample(nacking_receiver))