跳到主要内容

使用Python API发布直接消息

当需要高吞吐量和低延迟时,直接消息非常有用。如果可以容忍一些消息丢失而不会对客户端应用程序产生负面影响,我们建议您使用直接消息发布事件。消息丢失可能是由于外部因素,如网络拥塞或客户端断开连接。如果您的应用程序需要保证消息传递和消息确认,那么我们建议您改用持久消息。

要使用Solace PubSub+ Python消息API发布直接消息,请按照以下步骤操作:

  1. 创建一个DirectMessagePublisher
  2. 配置并创建一个OutboundMessage
  3. 处理发布直接消息时的反压。
  4. 发布直接消息。
  5. 处理错误。

在某些用例中,您的应用程序可能会比消息传输更快地发送消息。消息可能会填满API的内部缓冲区,导致反压。如果这种场景可能发生,我们建议您考虑更改反压设置以满足应用程序的要求。有关详细信息,请参阅发布直接消息时的反压处理。

有关发布直接消息的应用程序示例,请参阅Solace GitHub页面上的direct_publisher.py

创建DirectMessagePublisher

MessagingService实例与事件代理建立连接后,使用DirectMessagePublisher发布直接消息。要创建DirectMessagePublisher对象,请执行以下操作:

  1. MessagingService对象上调用create_direct_message_publisher_builder()函数。这将返回一个DirectMessagePublisherBuilder对象。

  2. 现在,您可以使用DirectMessagePublisherBuilder接口中的函数配置DirectMessagePublisher以使用API的某些功能,例如反压策略。

  3. DirectMessagePublisherBuilder上调用build()函数以返回一个DirectMessagePublisher对象。

  4. 要使DirectMessagePublisher开始发布消息,请在其上调用start()函数。

有关详细信息,请参阅Solace PubSub+ Python消息API参考。

以下示例展示了如何使用直接消息发布者使您的应用程序能够向事件代理发布消息:

# 创建一个DirectMessagePublisherBuilder,允许您创建DirectMessagePublisher并启动它
direct_publisher = messaging_service.create_direct_message_publisher_builder() \
.on_back_pressure_reject(1000) \
.build()

# 同步启动配置的DirectMessagePublisher。在调用start()函数之前,发布者被视为非工作状态
direct_publisher.start()

异步发布者

也可以使用回调监听器启动直接消息发布者,以便在启动操作完成后进行异步通知。

在Solace PubSub+ Python API中,您可以使用start_async()函数异步启动DirectMessagePublisher。这允许发布者对象在一个单独的后台线程中启动,这意味着您可以同时启动多个发布者,或者您的代码可以继续执行,而无需等待函数返回。

以下示例展示了异步启动1000个发布者。as_completed()函数允许您的应用程序在发布者可用时使用它们,而不是等待所有发布者都处于运行状态。

import concurrent.futures
# ...
publishers = [direct_message_publisher_builder.build() for _ in range(10)]
futures_to_publisher = {publisher.start_async() : publisher for publisher in publishers}

for future in concurrent.futures.as_completed(futures_to_publisher):
publisher = futures_to_publisher[future]
try:
# start_async没有返回值,但如果启动失败会抛出异常
future.result()
print(f'Publisher {id(publisher)} started successfully')
except Exception as err:
print(f'Publisher {id(publisher)} generated an error: {err}')

在调用start()start_async()之前,您的发布者应用程序不处于运行状态。

配置和创建OutboundMessage

您的客户端应用程序显式创建要发布的出站消息。在Solace PubSub+ Python API中,发布消息时使用OutboundMessage实例。要配置和创建OutboundMessage实例,请按照以下步骤操作:

  1. 在消息服务对象上调用message_builder()以返回一个OutboundMessageBuilder实例。为了获得更好的性能,我们建议您使用一个OutboundMessageBuilder来创建多个OutboundMessage实例。
outbound_msg_builder = messaging_service.message_builder()
  1. 使用OutboundMessageBuilder配置消息,然后调用build()函数以返回消息实例。您可以使用以下任一方法配置消息属性。

    • 使用OutboundMessageBuilder接口和with_property(propertyName,propertyValue)函数。接受Solace定义的message_properties键以及任意用户定义的属性键。以下示例展示了如何为消息ID、发送者ID和消息类型设置Solace定义的属性,以及在消息上设置自定义键值属性:
message_builder = messaging_service.message_builder()   \
.with_application_message_id("myID") \
.with_sender_id("senderID") \
.with_application_message_type("messageType") \
.with_property("key","value") \
.build(message_body)
  • 使用带有message_properties.*键的消息属性字典。以下示例展示了如何创建一个带有消息ID、发送者ID和消息类型常量值的消息属性字典,然后使用from_properties()函数配置消息:
from solace.messaging.config.solace_properties import message_properties
# ...
message_props = {
message_properties.APPLICATION_MESSAGE_ID: "myID",
message_properties.SENDER_ID: "senderID",
message_properties.APPLICATION_MESSAGE_TYPE: "messageType"
# 有关message_properties常量的完整列表,请参阅Solace PubSub+ Python消息API参考。
}
# ...
outbound_msg = outbound_msg_builder \
.from_properties(message_props) \
.build(message_body)

以下代码示例展示了如何创建消息构建器、设置消息属性并创建消息:

# 用于创建配置相似的消息的构建器。
message_builder = messaging_service.message_builder() \
.with_application_message_id(message_id) \
.from_properties(message_props) \
.with_property("key","value") \
.build(message_body)

有关这些函数的详细信息,请参阅Solace PubSub+ Python消息API参考。

处理发布直接消息时的反压

当您使用直接消息时,消息通过主题发送到Solace PubSub+事件代理。事件代理将消息转发给订阅该主题的任何消费者。

在客户端应用程序发布直接消息时,API会在将消息发送到事件代理之前将其排队到内部缓冲区。在理想条件下,一旦应用程序发布消息,API就会通过网络发送该消息,并且该消息最终被事件代理接收。由于网络拥塞或连接问题,客户端应用程序可能会比API更快地发布消息。这种延迟可能导致内部缓冲区积压消息,直到达到其容量,阻止API存储更多消息。这种情况称为反压。配置应用程序以处理反压发生的情况非常重要。

在Solace PubSub+ Python API中,DirectMessagePublisher具有以下机制来处理反压:

  • 使用无限制的内部发布缓冲区(默认)
  • 达到指定限制时拒绝消息
  • 达到指定限制时限制应用程序

配置无限制的内部缓冲区

API的默认配置是使用无限制大小的内部缓冲区来存储消息。当您使用无限制缓冲区时,Python API会持续将客户端应用程序发布的消息放入内部缓冲区。这种配置适用于您的发布者需要快速发送大量数据的情况,即使网络无法跟上而不阻塞。这种配置(称为on_back_pressure_elastic())也很有用,因为您无需编写代码来告诉应用程序如何处理内部缓冲区达到容量的情况。

当处理的数据大小不会过度消耗系统资源时,无论是小数据还是大数据,我们都推荐这种配置。重要的是要注意,数据大小在确定传输带宽方面起着重要作用,影响应用程序向套接字写入数据的速度。大量小消息或少量大消息都可能使应用程序的资源紧张。如果使用弹性反压时应用程序稳定性不足,我们建议您使用“等待”或“拒绝”等替代反压策略来管理应用程序的流量控制。

这种配置不适合内存受限的环境,因为缓冲区被允许无限增长,可能会导致内存不足错误(或潜在的未定义错误)。当您的基础设施由多个短生命周期的微服务组成时,这种配置很有用,它可以为内部队列遇到内存不足情况这一不太可能的事件提供发布冗余。

以下示例展示了对on_back_pressure_elastic()函数的显式调用,这不是必需的,因为它是默认行为:

# 创建一个DirectMessagePublisherBuilder,允许您创建DirectMessagePublisher并启动它
direct_publisher = messaging_service.create_direct_message_publisher_builder() \
.on_back_pressure_elastic() \
.build()

# 同步启动配置的DirectMessagePublisher。在调用start()函数之前,发布者被视为非工作状态
direct_publisher.start()

达到指定限制时拒绝消息

当发生反压时,您可以选择在内部缓冲区达到指定限制时拒绝客户端应用程序的消息。您可以使用on_back_pressure_reject(buffer_capacity: int)函数指定内部缓冲区可以积压的消息数量上限。达到指定容量后,将无法发布新消息,API将返回错误,直到缓冲区再次有容量为止。如果buffer_capacity设置为零,Python API不会缓冲任何出站消息,并且在无法将消息写入本地库时立即拒绝。

# 创建一个DirectMessagePublisherBuilder,允许您创建DirectMessagePublisher并启动它
direct_publisher = messaging_service.create_direct_message_publisher_builder() \
.on_back_pressure_reject(1000) \
.build()

# 同步启动配置的DirectMessagePublisher。在调用start()函数之前,发布者被视为非工作状态
direct_publisher.start()

使用发布者就绪监听器

我们建议您在使用on_back_pressure_reject()函数时使用PublisherReadinessListener,因为它可以让您的应用程序知道缓冲区何时有容量可用,并且可以恢复发布消息。

以下是一个注册PublisherReadinessListener实例的示例:

class CanPublishListener(PublisherReadinessListener):
def ready(self):
# 当发布者再次准备好发布消息时,将执行此方法
# 您可以使用此回调在缓解反压时处理消息发布
pass
# ...
# ...
direct_publisher.set_publisher_readiness_listener(CanPublishListener())

# 回调发起者
some_condition_is_true = True
while some_condition_is_true:
# 在发布消息之前准备/处理一些数据...
try:
message_publisher.publish("can't send, buffer over capacity", my_topic)
except PublisherOverflowError as e:
# 处理溢出异常(反压)
# 您可以在这里做一些工作或发出信号以减缓消息发布
pass

达到指定限制时限制发布者

如果内部缓冲区达到指定限制,您可以选择限制发布应用程序。当应用程序可以阻塞并等待缓冲区中出现容量时,限制的使用非常有用。您可以使用on_back_pressure_wait(buffer_capacity: int)函数设置缓冲区可以积压的消息的最大数量。当达到此最大容量(buffer_capacity)时,发布例程将暂停,并等待内部缓冲区中出现可用容量,然后才允许应用程序发布更多消息。on_back_pressure_wait()的反压策略不会调用PublisherReadinessListener

当您不希望应用程序在缓冲区容量达到后抛出异常时,应使用此函数。有效使用此机制可以为API清空内部缓冲区提供时间。

以下示例展示了如何配置内部缓冲区以容纳多达一千条消息:

# 创建一个DirectMessagePublisherBuilder,允许您创建DirectMessagePublisher并启动它
direct_publisher = messaging_service.create_direct_message_publisher_builder() \
.on_back_pressure_wait(1000) \
.build()

# 同步启动配置的DirectMessagePublisher。在调用start()函数之前,发布者被视为非工作状态
direct_publisher.start()

发布直接消息

创建DirectMessagePublisher后,您可以开始发送消息。发送消息时,有两个主要组成部分:

  • 要发布的主题(了解主题)
  • 消息的有效载荷(可选)

主题必须是遵循Solace层次结构格式的Topic类对象,例如:solace/messaging/direct/pub。发布函数目前支持简单字符串消息、字节数组以及通过messaging_service.MessageBuilder()获得的OutboundMessage实例。以下函数发布直接消息:

  • publish(message: bytearray | str | OutboundMessage, destination: Topic, additional_message_properties: Dict[str, str | int | float | bool | dict | list | tuple | bytearray | None] | None = None)
    • message—出站消息,可以是OutboundMessage对象、bytearraystr。如果有效载荷是bytearraystr,API将创建一个OutboundMessage对象以发送。
    • destination—要发布的Topic
    • additional_properties—用于自定义消息的附加键值属性。每个键可以由客户 Kimi: 方提供,也可以是solace.messaging.config.solace_properties.message_properties类型的键。

有关使用DirectMessagePublisher时可用的发布函数的信息,请参阅Solace PubSub+ Python消息API参考。

以下示例展示了如何发布直接消息:

topic = Topic.of("topic/to/publish/to")
outbound_message = outbound_msg_builder \
.from_properties(message_props) \
.build("my message body")
direct_publisher.publish(outbound_message, topic)

处理错误

Python API提供了set_publish_failure_listener()函数,如果API无法发布消息,将通知客户端。发布失败事件可能是由于无效主题或服务终止等问题引起的。请参阅以下示例:

# PublishFailureListener的一个示例实现
class PublishFailureListenerImpl(PublishFailureListener):
def on_failed_publish(self, failed_publish_event: 'FailedPublishEvent'):
print(f"fail_destination name:{failed_publish_event.get_destination()}\n"
f"fail_message:{failed_publish_event.get_message()}\n"
f"fail_timestamp:{failed_publish_event.get_timestamp()}\n"
f"fail_exception:{failed_publish_event.get_exception()}\n")

direct_publisher.set_publish_failure_listener(PublishFailureListenerImpl())