跳到主要内容

在Python API中使用请求-应答消息传递

请求-应答消息传递是一种数据传输方法,其中应用程序使用单独的点对点通道:一个用于请求,另一个用于应答。在请求-应答消息传递中,每个从“消息请求者”发送的请求都需要来自“消息应答者”的一个应答。当消息应答者消费了一个请求消息时,它会向请求者发送一个应答。此消息模式在您的应用程序组件之间发送的每条消息都需要一个应答时非常有用,例如在执行身份验证或金融交易时。

img

PubSub+ 消息 API 发布请求消息时会在消息头字段中包含一个唯一的、自动生成的 ReplyTo 目标主题。此 ReplyTo 主题充当应答应发送到的返回地址。因为 ReplyTo 主题目标由 PubSub+ 消息 API 处理,所以它允许用户在执行请求-应答操作时无需担心注册适当的主题订阅以接收应答。

请求-应答消息传递只能在 PubSub+ Python API 中与直接消息一起使用。

要使用 Python API 的请求-应答消息模式,请按照以下步骤操作:

  1. 创建一个 RequestReplyMessagePublisher。
  2. 发送请求。
  3. 创建一个 RequestReplyMessageReceiver。
  4. 接收请求并发送应答。

创建 RequestReplyMessagePublisher

要发送消息请求,请创建一个 MessagingService 对象(参见消息服务)。创建 MessagingService 对象并将其连接到事件代理后,使用 request_reply() 函数构建一个 RequestReplyMessagePublisher 对象:

direct_requestor: RequestReplyMessagePublisher = messaging_service.request_reply() \
.create_request_reply_message_publisher_builder() \
.build()
direct_requestor.start()

接下来创建一个 OutboundMessage 实例。这是您的发布者发送给接收者实例的请求。有关创建 OutboundMessage 对象的信息,请参阅配置和创建 OutboundMessage。

发送请求

发送请求时,可以是阻塞的或非阻塞的。阻塞请求会阻塞您的应用程序,直到收到应答。非阻塞请求允许您的应用程序在收到任何应答之前发送多个请求。

  • 发送阻塞请求

  • 发送非阻塞请求

发送阻塞请求

PubSub+ Python API 提供同步请求-应答消息传递,它会阻塞每个请求,直到收到应答。这在同步、点对点通信中非常有用,例如在处理金融交易时,事件的顺序很重要。要发送阻塞请求,请使用您的 RequestReplyMessagePublisher 调用 publish_await_response() 函数。publish_await_response() 函数接受以下参数:

  • request_message — 要发送的 OutboundMessage 请求
  • request_destination — 请求消息的目标 Topic
  • reply_timeout — 表示等待应答消息的最大时间(以毫秒为单位)的 Int
  • additional_message_properties — (可选)包含额外消息属性的 Dict(参见配置和创建 OutboundMessage)。如果您没有额外的消息属性要设置,请省略此参数。

有关更多信息,请参阅 PubSub+ Python 消息 API 参考。

以下示例展示了如何发送阻塞消息请求并将应答分配给 InboundMessage 对象:

inbound_message_response = direct_requestor.publish_await_response(request_message=outbound_msg, \
request_destination=Topic.of('my/sample/topic'), \
reply_timeout=3000)

有关完整示例,请参阅 Solace 开发者中心上的 direct_requestor_blocking.py。

发送非阻塞请求

PubSub+ Python API 提供非阻塞请求-应答消息传递,允许您的应用程序在收到应答之前发送多个请求。这在事件顺序不重要的异步通信中非常有用。要发送非阻塞请求,请使用您的 RequestReplyMessagePublisher 调用 publish() 函数。publish() 函数接受以下参数:

  • request_message — 要发送的 OutboundMessage 请求。
  • request_destination — 请求消息的目标 Topic
  • additional_message_properties — (可选)包含额外消息属性的 Dict(参见配置和创建 OutboundMessage)。如果您没有额外的消息属性要设置,请省略此参数。
  • reply_timeout — 表示等待应答消息的最大时间(以毫秒为单位)的 Int

有关更多信息,请参阅 PubSolace Python 消息 API 参考。

以下代码片段展示了如何发送非阻塞消息请求并将应答分配给异步 publish_async() 函数的结果:

publish_async = direct_requestor.publish(request_message=outbound_msg, \
request_destination=Topic.of('my/sample/topic'), \
reply_timeout=3000)

if publish_async.exception() is not None:
logger.warn(f"Got exception from request reply publish: {publish_async.exception()}")

response = publish_async.result()

有关完整示例,请参阅 Solace 开发者中心上的 direct_requestor.py。

创建 RequestReplyMessageReceiver

要发送消息应答,请创建一个 MessagingService 对象(参见消息服务)。创建 MessagingService 对象并将其连接到事件代理后,使用 request_reply() 函数构建一个 RequestReplyMessageReceiver 对象:

direct_replier: RequestReplyMessageReceiver = messaging_service.request_reply() \
.create_request_reply_message_receiver_builder() \
.build(TopicSubscription.of('my/sample/topic'))
direct_replier.start()

接下来创建一个 OutboundMessage 实例。这是您的接收者发送给请求者实例的应答。有关创建 OutboundMessage 对象的信息,请参阅配置和创建 OutboundMessage。

接收请求并发送应答

您的 RequestReplyMessageReceiver 可以作为 InboundMessage 对象同步或异步接收请求。

  • 同步接收请求并发送应答

  • 异步接收请求并发送应答

同步接收请求并发送应答

PubSub+ Python API 提供同步请求-应答消息传递,它会阻塞您的应用程序,直到 receive_message() 函数返回。这在同步、点对点通信中非常有用,例如在处理金融交易时,事件的顺序很重要。要接收同步请求,请使用您的 RequestReplyMessageReceiver 对象调用 receive_message() 函数。receive_message() 函数返回接收到的消息和一个应答者对象。应答者对象允许您的 RequestReplyMessageReceiver 向请求者发送应答。receive_message() 函数接受以下参数:

  • timeout — (可选)表示在退出阻塞函数之前等待的时间(以毫秒为单位)的 Int 值。值应大于 0。如果您不想设置超时值,请省略此属性。

有关更多信息,请参阅 PubSub+ Python 消息 API 参考。

以下代码展示了如何接收阻塞消息请求,将应答分配给 OutboundMessage 对象,并使用 reply() 函数发送应答:

msg, replier = direct_replier.receive_message(5000)

if replier is not None:
outbound_msg = service.message_builder().build("my reply")
replier.reply(outbound_msg)

有关完整示例,请参阅 Solace 开发者中心上的 how_to_use_request_reply_pattern.py 中的 receive_request_and_send_response_message() 函数。

异步接收请求并发送应答

PubSub+ Python API 提供异步请求-应答消息传递,它允许您的应用程序使用 receive_async() 函数异步接收多个消息请求。这在事件顺序不重要的点对点通信中非常有用。要接收异步请求,请使用一个 RequestReplyMessageReceiver 对象调用 receive_async() 函数。receive_async() 函数接受以下参数:

  • message_handlerRequestMessageHandler 的一个实例,这是一个回调处理器,用于处理传入的请求消息和应答者对象。此回调允许 receive_async() 函数接收一个 inboundMessage(请求)和一个 RequestReplyMessageReceiver.Replier 的实例。应答者对象允许您的 RequestReplyMessageReceiver 向请求者发送应答。

有关更多信息,请参阅 PubSub+ Python 消息 API 参考。

以下是一个 RequestMessageHandler 的示例实现,它接收一个非阻塞消息请求,将应答分配给 OutboundMessage 对象,并使用 reply() 函数发送它:

class RequestMessageHandlerImpl(RequestMessageHandler):
def __init__(self, message_builder):
self.message_builder = message_builder

def on_message(self, request: InboundMessage, replier: Replier):
# 检查负载是否为字符串或字节,如果是后者则解码
payload = request.get_payload_as_string() if request.get_payload_as_string() is not None else request.get_payload_as_bytes()
if isinstance(payload, bytearray):
print(f"收到的消息类型为:{type(payload)}。正在解码为字符串")
payload = payload.decode()
# 处理消息负载,例如保存它、打印它等...
# 准备响应负载
response = "这是我的应答消息"

if replier is not None:
outbound_msg = self.message_builder \
.build(response)
replier.reply(outbound_msg)
else:
print(f'无效的请求,未设置 reply_to')
# ...
# 准备出站消息构建器
outbound_msg_builder = messaging_service.message_builder()
direct_replier.receive_async(RequestMessageHandlerImpl(outbound_msg_builder))

有关完整示例,请参阅 Solace 开发者中心上的 direct_replier.py。