跳到主要内容

使用Python API发布持久消息

当您的应用程序需要确认处理和至少一次的投递时,我们建议您使用持久消息而不是直接消息。要在 PubSub+ Python API 中发布持久消息,您需要先在 PubSub+ 事件代理上设置一个消息队列。

关于如何在事件代理上创建和配置队列的信息,请参阅配置队列。

要使用 PubSub+ Python 消息 API 发布持久消息,请按照以下步骤操作:

  1. 创建一个 PersistentMessagePublisher。
  2. 配置并创建一个 OutboundMessage。
  3. 处理发布持久消息时的反向压力。
  4. 发布一个持久消息。
  5. 确认消息和处理错误。
  6. 用户上下文。

在某些用例中,您的应用程序可能会比消息能够传输的速度更快地发送消息。这可能会导致消息在 API 内部缓冲区中累积,从而导致反向压力。如果这种情况可能发生,请考虑更改反向压力设置以满足您的应用程序需求。有关更多信息,请参阅处理发布持久消息时的反向压力。

有关发布持久消息的应用程序示例,请参阅 Solace GitHub 页面上的 guaranteed_publisher.py。

创建PersistentMessagePublisher

MessagingService 实例与事件代理建立连接后,使用 PersistentMessagePublisher 发布持久消息。要创建一个 PersistentMessagePublisher 对象,请执行以下操作:

  1. MessagingService 对象上调用 create_persistent_message_publisher_builder() 函数。这将返回一个 PersistentMessagePublisherBuilder 对象。

  2. 现在,您可以使用 PersistentMessagePublisherBuilder 接口中的函数来配置 PersistentMessagePublisher 使用 API 的某些功能,例如反向压力策略。

  3. 在您的 PersistentMessagePublisherBuilder 上调用 build() 函数以返回一个 PersistentMessagePublisher 对象。

  4. 要使您的 PersistentMessagePublisher 开始发布消息,请在其上调用 start() 函数。

有关更多信息,请参阅 PubSub+ Python 消息 API 参考。

以下示例展示了如何使用持久消息发布者使您的应用程序能够向事件代理发布消息:

# 创建一个 PersistentMessagePublisherBuilder,允许您创建一个 PersistentMessagePublisher 并启动它
persistent_publisher = messaging_service.create_persistent_message_publisher_builder() \
.on_back_pressure_reject(1000) \
.build()

# 同步启动配置好的 PersistentMessagePublisher。在调用 start() 函数之前,发布者被视为非工作状态
persistent_publisher.start()

异步发布者

也可以使用回调监听器启动持久消息发布者,以便在启动操作完成后接收异步通知。

在 PubSub+ Python API 中,您可以使用 start_async() 函数异步启动一个 PersistentMessagePublisher。这允许发布者对象在单独的后台线程中启动,这意味着您可以同时启动多个发布者,或者您的代码可以继续执行,而无需等待函数返回。

以下示例展示了如何异步启动 1000 个发布者。wait() 函数会阻塞应用程序,直到所有发布者都已启动。

import concurrent.futures
# ...
publishers = [persistent_message_publisher_builder.build() for _ in range(10)]
futures_to_publisher = { publisher.start_async() : publisher for publisher in publishers}

for future in concurrent.futures.as_completed(futures_to_publisher):
publisher = futures_to_publisher[future]
try:
# start async 没有返回值,但在启动失败时会抛出异常
future.result()
print(f'发布者 {id(publisher)} 启动成功')
except Exception as err:
print(f'发布者 {id(publisher)} 产生错误:{err}')

在调用 start()start_async() 之前,您的发布者应用程序是不可操作的。

配置和创建出站消息

您的客户端应用程序需要显式创建要发布的出站消息。在 PubSub+ Python API 中,发布消息时使用 OutboundMessage 实例。要配置和创建 OutboundMessage 实例,请按照以下步骤操作:

  1. 在消息服务对象上调用 message_builder(),返回一个 OutboundMessageBuilder 实例。为了提高性能,我们建议您使用单个 OutboundMessageBuilder 创建多个 OutboundMessage 实例。
outbound_msg_builder = messaging_service.message_builder()
  1. 使用 OutboundMessageBuilder 配置消息,然后调用 build() 函数返回消息实例。您可以使用以下两种方法配置消息属性。

    • 使用 OutboundMessageBuilder 接口和 with_property(propertyName,propertyValue) 函数。接受 Solace 定义的 message_properties 键以及任意用户定义的属性键。以下示例展示了如何为消息 ID、发送者 ID 和消息类型设置 Solace 定义的属性,以及在消息上设置自定义键值属性:

      message_builder = messaging_service.message_builder()   \
      .with_application_message_id("myID") \
      .with_sender_id("senderID") \
      .with_application_message_type("messageType") \
      .with_property("key","value") \
      .build(message_body)
    • 使用带有 message_properties.* 键的消息属性字典。以下示例展示了如何创建一个带有消息 ID、发送者 ID 和消息类型常量值的消息属性字典,然后使用 from_properties() 函数配置消息:

      from solace.messaging.config.solace_properties import message_properties
      # ...
      message_props = {
      message_properties.APPLICATION_MESSAGE_ID: "myID",
      message_properties.SENDER_ID: "senderID",
      message_properties.APPLICATION_MESSAGE_TYPE: "messageType"
      # 有关 message_properties 常量的完整列表,请参阅 PubSub+ Python 消息 API 参考。
      }
      # ...
      outbound_msg = outbound_msg_builder \
      .from_properties(message_props) \
      .build(message_body)

以下代码示例展示了如何创建消息构建器、设置消息属性并创建消息:

# 用于创建配置相似消息的构建器。
message_builder = messaging_service.message_builder() \
.with_application_message_id(message_id) \
.from_properties(message_props) \
.with_property("key","value") \
.build(message_body)

有关这些函数的更多信息,请参阅 PubSub+ Python 消息 API 参考。

设置分区键

您可以设置分区键以使用分区队列。分区队列是 PubSub+ 事件代理上的一项功能,允许您轻松扩展绑定到队列的消费者应用程序数量。在发布应用程序中为每条消息设置分区键,可以确保所有具有相同分区键的消息都传递到同一个消费者,而无需在消费者应用程序中添加额外逻辑。有关更多信息,请参阅分区队列。

分区键是发布消息的属性,因此请使用 with_property(property,value) 设置 Python API 消息上的属性值对。

  • property—常量 message_user_property_constants.QUEUE_PARTITION_KEY 或字符串值 JMSXGroupID

  • value—表示分区键值的字符串。客户端应用程序在发布时设置该值。

以下示例展示了如何创建消息并将其发布到队列分区:

# 使用 with_property() 构造器方法在出站消息上设置队列分区键。
def set_queue_partition_key_using_with_property(queue_partition_key_value: str):
payload = "my_payload"
outbound_message = MessagingService \
.message_builder() \
.with_property(message_user_property_constants.QUEUE_PARTITION_KEY, queue_partition_key_value) \
.build(payload)

# 您也可以使用 from_properties() 构造器方法在出站消息上设置队列分区键。
def set_queue_partition_key_using_from_properties(queue_partition_key_value: str):
payload = "my_payload"
additional_properties = {message_user_property_constants.QUEUE_PARTITION_KEY, queue_partition_key_value}
outbound_message = MessagingService \
.message_builder() \
.from_properties(additional_properties) \
.build(my_payload)

处理发布持久消息时的反向压力

当您使用持久消息时,消息会与主题一起发送到 PubSub+ 事件代理,并且可能会在任何具有匹配主题订阅的队列上入队。然后,事件代理会异步地将消息传递给订阅该队列的任何消费者。在客户端应用程序发布持久消息时,API 会将消息放入内部缓冲区,然后再将其发送到事件代理。在理想情况下,应用程序发布消息后,API 会立即将该消息发送到网络,并且该消息最终会被事件代理接收。由于网络拥塞或连接问题,客户端应用程序可能会比 API 更快地发布消息。这种延迟可能会导致内部缓冲区累积消息,直到达到其容量,从而阻止 API 存储更多消息。这种情况称为反向压力。配置您的应用程序以处理反向压力发生的情况非常重要。

在 PubSub+ Python API 中,PersistentMessagePublisher 具有以下机制来处理反向压力:

  • 使用无限制的内部发布缓冲区(默认)
  • 达到指定限制时拒绝消息
  • 达到指定限制时限制应用程序

配置无限制的内部缓冲区锚点

API 的默认配置是使用无限制大小的内部缓冲区来存储消息。当您使用无限制缓冲区时,Python API 会持续将客户端应用程序发布的消息放入内部缓冲区。当您的发布者需要快速发送大量数据,即使网络无法跟上也不会阻塞时,这种配置很有用。这种配置(称为 on_back_pressure_elastic())也很有用,因为您无需编写代码来告诉您的应用程序如何处理内部缓冲区达到容量的情况。

当处理的数据大小不会过度消耗您的系统资源(无论是小数据还是大数据)时,我们建议使用此配置。需要注意的是,数据大小在确定传输带宽方面起着重要作用,影响您的应用程序将数据写入套接字的速度。大量小消息或少量大消息都可能对您的应用程序资源造成压力。如果您的应用程序在使用弹性反向压力时稳定性不足,我们建议您使用其他反向压力策略,如“等待”或“拒绝”,以管理应用程序的流量控制。

如果您的基础设施由多个短生命周期的微服务组成,并且可以在内部队列遇到内存不足的情况时提供发布冗余,那么这种配置很有用。

以下示例展示了对 on_back_pressure_elastic() 函数的显式调用,这不是必需的,因为它是默认行为:

# 创建一个 PersistentMessagePublisherBuilder,允许您创建一个 PersistentMessagePublisher 并启动它
persistent_publisher = messaging_service.create_persistent_message_publisher_builder() \
.on_back_pressure_elastic() \
.build()

# 同步启动配置好的 PersistentMessagePublisher。在调用 start() 函数之前,发布者被视为非工作状态
persistent_publisher.start()

达到指定限制时拒绝消息锚点

当发生反向压力时,您可以选择在内部缓冲区达到指定限制时从客户端应用程序中拒绝消息。您可以使用 on_back_pressure_reject(buffer_capacity: int) 函数为内部缓冲区中累积的消息数量指定一个定义的缓冲区容量。达到指定容量后,将不再可能发布新消息,API 将返回 PublisherOverflowError,直到缓冲区再次有容量为止。如果 buffer_capacity 设置为零,则 Python API 不会缓冲任何出站消息,并且在无法将消息写入本地库时立即拒绝。

# 创建一个 PersistentMessagePublisherBuilder,允许您创建一个 PersistentMessagePublisher 并启动它
persistent_publisher = messaging_service.create_persistent_message_publisher_builder() \
.on_back_pressure_reject(1000) \
.build()

# 同步启动配置好的 PersistentMessagePublisher。在调用 start() 函数之前,发布者被视为非工作状态
persistent_publisher.start()

使用发布者准备就绪监听器锚点

当您使用 on_back_pressure_reject() 函数时,我们建议您使用 PublisherReadinessListener,因为它可以让您的应用程序知道缓冲区何时有容量可用,并且可以恢复发布消息。

以下示例展示了如何注册一个 PublisherReadinessListener 实例:

class CanPublishListener(PublisherReadinessListener):
def ready(self):
# 当发布者准备好再次发布时,将执行此方法
# 您可以使用此回调来处理反向压力缓解时的消息发布
pass
# ...
# ...
persistent_publisher.set_publisher_readiness_listener(CanPublishListener())

# 回调的发起者
some_condition_is_true = True
while some_condition_is_true:
# 准备/处理发布消息之前的一些数据...
try:
message_publisher.publish("无法发送,缓冲区超出容量", my_topic)
except PublisherOverflowError as e:
# 处理溢出异常(反向压力)
# 您可以在这里做一些工作,或者发出信号以减慢消息发布的速度
pass

达到指定限制时限制发布者锚点

如果内部缓冲区达到指定限制,您可以选择限制发布应用程序。当应用程序可以阻塞并等待缓冲区中出现容量时,限制的使用很有用。您可以使用 on_back_pressure_wait(buffer_capacity: int) 函数设置缓冲区中可以累积的最大消息数量。当达到此最大容量(buffer_capacity)时,发布例程将阻塞并等待内部缓冲区中有可用容量,然后再允许应用程序发布更多消息。on_back_pressure_wait() 的反向压力策略不会调用 PublisherReadinessListener

当您希望应用程序请求在缓冲区容量达到后花费更长时间时,应使用此函数。有效使用此机制可以为 API 提供时间来清空内部缓冲区。

以下示例展示了如何配置内部缓冲区以容纳多达一千条消息:

# 创建一个 PersistentMessagePublisherBuilder,允许您创建一个 PersistentMessagePublisher 并启动它
persistent_publisher = messaging_service.create_persistent_message_publisher_builder() \
.on_back_pressure_wait(1000) \
.build()

# 同步启动配置好的 PersistentMessagePublisher。在调用 start() 函数之前,发布者被视为非工作状态
persistent_publisher.start()

发布持久消息

在使用 MessagingService 实例与事件代理建立连接后,您可以使用 PersistentMessagePublisher 发布持久消息。

持久消息具有以下组成部分:

  • 要发布的主题(必需)
  • 消息正文(可选)

持久消息发布涉及接收确认或发布收据。根据您的需求,您的客户端应用程序可以以以下方式发布消息:

  • 异步,允许您的应用程序在 PublishReceiptListener 等待确认的同时执行其他功能
  • 同步,等待直到收到确认;确认表明消息已被代理接收并持久化

您不能发布超过事件代理最大持久消息大小(在大多数情况下为 30MB)的持久消息。如果您尝试发布超过最大大小的消息,PubSub+ Python API 将引发带有子代码 MESSAGE_TOO_LARGE 的错误。为了避免中断,请设计您的应用程序以控制消息大小并确保其保持在支持的限制范围内。

异步发布函数锚点

以下是异步发布函数:

  • publish(message: bytearray | str | OutboundMessage, destination: Topic, user_context: Any | None = None, additional_message_properties: Dict[str, str | int | float | bool | dict | list | tuple | bytearray | None] | None = None)
    • message—出站消息,可以是 OutboundMessage 对象、bytearraystr。如果正文是 bytearraystr,API 将创建一个 OutboundMessage 对象以发送。
    • destination—要发布的 Topic
    • user-context—与在 MessagePublishReceiptListener.on_publish_receipt() 中确认消息传递到代理时执行的操作相关联的上下文。当不需要提供用户上下文时,省略 user_context 参数。有关更多信息,请参阅用户上下文。
    • additional_message_properties—额外的键值对属性,用于自定义消息。每个键可以由客户方提供,也可以是 solace.messaging.config.solace_properties.message_properties 类型的键。
# 异步发送持久消息
persistent_publisher.publish(my_message, topic_to_publish_to)

有关这些函数的更多信息,请参阅 PubSub+ Python 消息 API 参考。

同步发布函数锚点

同步发布函数在消息被接收、写入持久存储并由事件代理确认之前不会返回。PubSub+ Python API 提供了以下同步函数用于发布消息:

  • publish_await_acknowledgement(message: bytearray | str | OutboundMessage, destination: Topic, time_out: int | None = None, additional_message_properties: Dict[str, str | int | float | bool | dict | list | tuple | bytearray | None] | None = None)
    • message—出站消息,可以是 OutboundMessage 对象、bytearraystr。如果正文是 bytearraystr,API 将创建一个 OutboundMessage 对象以发送。
    • destination—要发布的 Topic
    • time_out—等待消息确认的最大时间(以毫秒为单位)。
    • additional_message_properties—额外的键值对属性,用于自定义消息。每个键可以由客户方提供,也可以是 solace.messaging.config.solace_properties.message_properties 类型的键。

前面的函数可以与 PersistentMessagePublisher 一起使用,以使用主题将 OutboundMessage 发布到代理。此函数会阻塞主线程,直到:

  • 发布者 API 收到来自代理的确认
  • 超时时间过去
# 发送持久消息,直到收到发布确认或超时发生才会阻塞。
persistent_publisher.publish_await_acknowledgement(my_message, topic_to_publish_to, 1000, None)

确认消息和处理错误

发布收据是确认消息是否成功由事件代理处理的交付确认。这些发布收据可以表明成功或失败,并由 MessagePublisReceiptListener 实例处理。您可以使用 set_message_publish_receipt_listener() 函数设置您的 MessagePublishReceiptListener

以下示例展示了如何使用 MessagePublishReceiptListener 监听发布收据:

# `MessagePublishReceiptListener` 接口的示例实现,用于处理代理消息发布通知。
class MessageReceiptListener(MessagePublishReceiptListener):
def __init__(self):
self._receipt_count = 0
@property
def receipt_count(self):
return self._receipt_count

def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
self._receipt_count += 1
print(f"\t消息:{publish_receipt.message}\n"
f"\t是否持久化:{publish_receipt.is_persisted}\n"
f"\t时间戳:{publish_receipt.time_stamp}\n"
f"\t异常:{publish_receipt.exception}\n")

# 将消息传递监听器设置到发布者
receipt_listener = MessageReceiptListener()
persistent_publisher.set_message_publish_receipt_listener(receipt_listener)
# ...
persistent_publisher.publish(my_message, topic_to_publish_to)

处理发布收据错误的策略锚点

以下是可以用于处理发布消息时收据错误的应用程序特定策略。根据您的应用程序需求,您可能需要结合使用所有三种策略:

  • 等待并重试:在尝试再次发送消息之前等待几秒钟。例如,如果代理暂时无法接受消息,请使用 time.sleep() 暂停您的消息发布者。
  • 重试预定义次数:在丢弃消息之前尝试重新发布消息预定义次数。这有助于避免无法恢复的传输问题,这些问题可能导致无限重试循环。
  • 丢弃消息:丢弃发布收据失败的消息。如果消息因任何原因无法传递到端点,这可能很有用。如果您的应用程序无法容忍消息丢失,我们不推荐此策略。

要接收没有匹配订阅时的失败发布收据,必须为事件代理或事件代理服务设置此选项。有关更多信息,请参阅配置客户端配置文件(针对设备和软件事件代理)或 PubSub+ Cloud 的在没有订阅匹配时将消息发送给发送方以丢弃

用户上下文锚点

可选地,您可以使用用户上下文将持久消息与发布收据相关联。此信息是用户特定的,仅对您的发布应用程序有意义,并且不会发送到代理。用户上下文允许您将数据附加到发布调用中,稍后可以从发布收据监听器中检索该数据。

当您使用用户上下文时,它允许您处理多种场景。它还允许您的应用程序根据上下文决定对发布收据采取什么操作或如何处理发布收据。

例如,如果非阻塞应用程序有多个例程发布持久消息,每个例程都可以在其发布持久消息时将其标识符作为用户上下文包含在内。PubSub+ Python API 会跟踪为每条消息指定的用户上下文,并在消息被代理确认或拒绝时,将用户上下文作为发布收据的一部分返回。然后,发布应用程序可以根据用户上下文将发布收据发送到发送消息的正确例程。

您可以在发布消息时设置用户上下文。例如,您可以使用 publish(message: bytearray | str | OutboundMessage, destination: Topic, user_context: Any | None = None, additional_message_properties: Dict[str, str | int | float | bool | dict | list | tuple | bytearray | None] | None = None) 函数,其中用户上下文可以是任何数据类型。

以下示例展示了如何从发布收据中获取用户上下文:

# `MessagePublishReceiptListener` 的示例实现
class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
print(f"\t消息:{publish_receipt.message}\n"
f"\t是否持久化:{publish_receipt.is_persisted}\n"
f"\t时间戳:{publish_receipt.time_stamp}\n"
f"\t异常:{publish_receipt.exception}\n")
if publish_receipt.user_context:
print(f'\t收到的用户上下文:{publish_receipt.user_context.get_custom_message}')

如果您的应用程序是非阻塞的,您还可以在回调中使用持久消息发布者与发布收据来记录信息。例如,您可以使用非阻塞消息发布,然后发送警报以通知应用程序已发布消息的状态,例如:

  • 事件代理成功接收并处理消息
  • 访问控制违规(ACL)
  • 队列超出配额
  • 无效主题/没有订阅者的话题

以下代码展示了 MessagePublishReceiptListener 的示例:

# `MessagePublishReceiptListener` 接口的示例实现,用于处理代理消息发布通知。
class MessageReceiptListener(MessagePublishReceiptListener):
def __init__(self):
self._receipt_count = 0
@property
def receipt_count(self):
return self._receipt_count

def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
self._receipt_count += 1
print(f"\t消息:{publish_receipt.message}\n"
f"\t是否持久化:{publish_receipt.is_persisted}\n"
f"\t时间戳:{publish_receipt.time_stamp}\n"
f"\t异常:{publish_receipt.exception}\n")
if publish_receipt.user_context:
print(f'\t收到的用户上下文:{publish_receipt.user_context.get_custom_message}')

# 将消息传递监听器设置到发布者
receipt_listener = MessageReceiptListener()
persistent_publisher.set_message_publish_receipt_listener(receipt_listener)
# ...
persistent_publisher.publish_await_acknowledgement(my_message, topic_to_publish_to, 1000, None)