跳到主要内容

在Python API中使用本地事务

PubSub+ Python API 中的本地事务允许您的应用程序将多个保证消息发送和/或保证消息接收操作组合在一起,作为一个称为事务的单一原子单元。本地事务确保事务内的所有操作要么全部提交,要么全部回滚。本地事务用于维护数据的完整性和一致性,在订单处理和金融交易等场景中非常有用。有关更多信息,请参阅使用本地事务。以下部分展示了如何在 PubSub+ Python API 中使用本地事务。

  1. 创建 TransactionalMessagingService。
  2. 在本地事务中发布消息。
  3. 在本地事务中接收消息。

创建 TransactionalMessagingService 对象

TransactionalMessagingService 对象允许 API 建立与事件代理的连接并创建事务。要创建 TransactionalMessagingService 对象,请执行以下操作:

  1. 调用 MessagingService 类的 create_transactional_service_builder() 函数以返回一个 TransactionalMessagingServiceBuilder 对象。

  2. TransactionalMessagingServiceBuilder 对象为您提供了一系列函数,用于自定义 TransactionalMessagingService 对象。这些包括以下内容:

    • from_properties(configuration: dict) — 将必要的代理属性字典传递给 TransactionalMessagingServiceBuilder
    • set_transaction_request_timeout(timeout: int = 10000) — 等待响应的超时时间(以毫秒为单位)。默认值为 10000。
  3. TransactionalMessagingServiceBuilder 对象上调用 build() 函数以返回一个 TransactionalMessagingService 对象。

  4. 在您的 TransactionalMessagingService 对象上调用 connect() 函数以连接到事件代理。

有关更多信息,请参阅 PubSub+ Python 消息 API 参考。

以下示例代码展示了如何创建一个简单的 TransactionalMessagingService 实例并将其连接到事件代理:

transactional_service = messaging_service.create_transactional_service_builder().build().connect()

在本地事务中发布消息

TransactionalMessagingService 实例与事件代理建立连接后,使用 TransactionalMessagePublisher 发布保证消息。要创建 TransactionalMessagePublisher 对象,请执行以下操作:

  1. TransactionalMessagingService 对象上调用 create_transactional_message_publisher_builder() 函数。这将返回一个 TransactionalMessagePublisherBuilder 对象。

  2. 在您的 TransactionalMessagePublisherBuilder 上调用 build() 函数以返回一个 TransactionalMessagePublisher 对象。

  3. 要使您的 TransactionalMessagePublisher 开始发布消息,请在其上调用 start() 函数。

有关更多信息,请参阅 PubSub+ Python 消息 API 参考。

以下示例展示了如何使用事务性消息发布者使您的应用程序能够向事件代理发布消息:

transactional_publisher = transactional_service.create_transactional_message_publisher_builder().build().start()

本地事务 publish() 示例

创建 TransactionalMessagingServiceTransactionalMessagePublisher 后,您现在可以使用 publish() 函数在本地事务中发布消息。下面的示例将尝试向 topics 数组中的每个主题发布十条消息。在每次循环中,应用程序尝试将十条消息发布到一个主题,作为一个事务使用 commit() 函数。如果在代码执行过程中出现问题,事务将回滚,该循环迭代的消息将不会被发布。

topics = [Topic.of("my/topic/1"), Topic.of("my/topic/2"), Topic.of("my/topic/3")]

for topic in topics:
try:
for i in range(10):
message = messaging_service.message_builder().build(f"Message #{i} for topic {topic.get_name()}")
transactional_publisher.publish(message, topic)
# 将所有消息发布到一个主题,或者根本不发布:
transactional_service.commit()
# 只有提交成功后,才能确定消息是否实际发布。
except TransactionRollbackError:
# 提交失败。
print("提交失败。")
except UnknownTransactionStateError:
# 提交结果未知。这种情况应该非常罕见。
print("提交结果未知。")
transactional_service.disconnect()

在本地事务中接收消息

TransactionalMessagingService 实例与事件代理建立连接后,使用 TransactionalMessageReceiver 接收保证消息。要创建 TransactionalMessageReceiver 对象,请执行以下操作:

  1. TransactionalMessagingService 对象上调用 create_transactional_message_receiver_builder() 函数。这将返回一个 TransactionalMessageReceiverBuilder 对象。

  2. 现在,您可以使用 TransactionalMessageReceiverBuilder 接口中的函数来配置 TransactionalMessageReceiver 使用 API 的某些功能:

    • with_message_replay(replay_strategy: ReplayStrategy) — 向持久接收器添加消息回放策略。
    • with_message_selector(selector_query_expression: str) — 基于消息头参数和消息属性值启用消息选择支持。
    • with_missing_resources_creation_strategy(strategy: MissingResourcesCreationStrategy) — 添加缺失队列创建策略,定义 API 可能采取的操作。
  3. 在您的 TransactionalMessageReceiverBuilder 上调用 build(endpoint_to_consume_from: Queue) 函数以返回一个 TransactionalMessageReceiver 对象。build() 函数以要从中消费的队列为参数。

  4. 要使您的 TransactionalMessageReceiver 开始接收消息,请在其上调用 start() 函数。

有关更多信息,请参阅 PubSub+ Python 消息 API 参考。

以下示例展示了如何使用事务性消息接收者使您的应用程序能够从事件代理接收消息:

transactional_receiver = transactional_service.create_transactional_message_receiver_builder().build(my_queue).start()

在本地事务中同步接收消息

创建 TransactionalMessagingServiceTransactionalMessageReceiver 后,您现在可以在事务中接收消息。下面的示例将尝试使用阻塞的 receive_message() 函数从 queues 数组中的每个队列接收十条消息,使用 for 循环。在每次循环中,应用程序尝试从一个队列接收十条消息,作为一个事务使用 commit() 函数。如果在代码执行过程中出现问题,事务将回滚,该循环迭代的消息将不会从队列中消费并从事件代理中删除。

# 在事务中从每个队列接收消息,`count` 次。每次计数是一个单独的事务:从每个队列中取出一条消息,或者一条也不取。
count = 10
queues = [queue1, queue2, queue3]
messages = []
receivers = []
# 为每个队列启动一个接收器
for queue in queues:
transactional_receiver = transactional_service.create_transactional_message_receiver_builder().build(queue).start()
receivers.append(transactional_receiver)

for _ in range(count):
# 从每个接收器/队列中获取一条消息
messages_in_this_transaction = []
for receiver in receivers:
message = receiver.receive_message()
if message is None:
if messages_in_this_transaction:
# 其中一个队列为空,回滚部分事务:
transactional_service.rollback()
break
messages_in_this_transaction.append(message)
else: # no break
try:
# 从所有队列中消费一条消息,或者一条也不消费:
transactional_service.commit()
messages.extend(messages_in_this_transaction)
except TransactionRollbackError:
# 提交失败。
print("提交错误")
except UnknownTransactionStateError:
# 提交结果未知。这种情况应该非常罕见。
print("未知提交错误")
transactional_service.disconnect()

使用消息处理器在本地事务中接收消息

创建 TransactionalMessagingServiceTransactionalMessageReceiver 后,您现在可以使用非阻塞的 receive_async() 函数在事务中接收消息。

非阻塞接收器函数名为 receive_async(),但它不是异步协程或生成器,也不兼容 asyncioreceive_async() 立即返回,并在底层使用原生线程。此函数为每条消息在新的 Python 线程中调用回调。

receive_async() 函数接受一个消息处理器作为参数。PubSub+ Python API 提供了一个专为本地事务设计的 TransactionalMessageHandler。此接口包含抽象的 on_message() 函数,您需要实现该函数以处理入站消息的处理,例如打印、修改、重新发布或提交操作。下面的 on_message() 函数实现是阻塞的,这是一种控制入站消息流的方法。

不要从消息处理器外部或其他线程对同一事务服务执行操作,因为它不是线程安全的。

下面的示例将尝试使用非阻塞的 receive_async() 函数从传递给 build() 函数的队列接收十条消息。如果事务成功并使用 commit() 函数提交,则所有消息将从事件代理队列中接收;如果在代码执行的任何点出现问题,事务将回滚,任何队列中的消息都不会被消费。

import threading
from solace.messaging.receiver.transactional_message_receiver import TransactionalMessageHandler
from solace.messaging.receiver.inbound_message import InboundMessage
from solace.messaging.resources.topic_subscription import TopicSubscription

transactional_service = messaging_service.create_transactional_service_builder().build().connect()
queue = Queue.durable_exclusive_queue("myQueue")
count = 11
messages_processed = 0
finished = threading.Event()

# 消息分发函数包装在一个类中
class MsgHandler(TransactionalMessageHandler):
def on_message(self, message: InboundMessage):
# 处理消息,例如:打印、修改、重新发布、提交。
# 在此方法中阻塞是一种控制入站消息流的方法。
# 不要从其他地方对同一事务服务执行操作。
global transactional_service, messages_processed, finished
print(f'事务性消息回调正在处理消息:{message.get_payload_as_string()}')

transactional_service.commit()
messages_processed += 1
if messages_processed >= count:
# 记住从其他线程(包括主线程)对事务服务执行操作是不安全的。
finished.set()

msgHandler = MsgHandler()
receiver_builder = transactional_service.create_transactional_message_receiver_builder()
receiver = receiver_builder.build(queue)

# 必须在启动接收器之前决定接收器的操作模式(阻塞与非阻塞)。
receiver.receive_async(msgHandler)
receiver.start()
finished.wait(10)
transactional_service.disconnect()