跳到主要内容

Java API中的请求-回复消息传递

请求-回复消息传递是一种数据传输方法,应用程序通过该方法模拟单独的点对点通道:一个用于请求,另一个用于回复。在请求-回复消息传递中,从 消息请求者 发送的每个请求都需要来自 消息回复者 的回复。当消息回复者消费请求消息时,它会将回复发送回请求者。当您的应用程序中的组件之间发送的每条消息都需要回复时,例如在执行身份验证或金融交易时,这种消息模式非常有用。

img

Java 的 PubSub+ 消息 API 在消息头字段中发布带有唯一、自动生成的 ReplyTo 目的主题的请求消息。这个 ReplyTo 主题作为回复应该发送到的回退地址。由于 ReplyTo 目的主题由 Java 的 PubSub+ 消息 API 处理,它允许用户在无需担心注册适当的主题订阅以接收回复的情况下执行请求-回复操作。

要使用 Java 的 PubSub+ API 使用请求-回复消息模式,请按照以下步骤操作:

  1. 创建一个 RequestReplyMessagePublisher
  2. 发送请求
  3. 创建一个 RequestReplyMessageReceiver
  4. 接收请求并发送回复

创建 RequestReplyMessagePublisher

要发送消息请求,请创建一个 MessagingService 对象(有关说明,请参阅消息服务)。创建 MessagingService 对象并将其连接到事件代理后,使用 RequestReply() 方法构建一个 RequestReplyMessagePublisher 对象:

final RequestReplyMessagePublisher requestReplyMessagePublisher =
messagingService.requestReply()
.createRequestReplyMessagePublisherBuilder()
.build()
.start();

接下来创建一个 OutboundMessage 实例。这是您的发布者发送给接收器实例的请求。有关创建 OutboundMessage 对象的信息,请参阅配置和创建出站消息。

发送请求

发送请求时,可以是同步的也可以是异步的。同步请求会阻塞您的应用程序,直到收到回复。异步请求允许您的应用程序在收到任何回复之前发送多个请求。

  • 发送同步请求
  • 发送异步请求

发送同步请求

Java 的 PubSub+ API 提供同步请求-回复消息传递,它会阻塞每个请求,直到收到回复。这对于同步、点对点通信非常有用,其中事件顺序很重要,例如在处理金融交易时。要发送同步请求,请使用您的 RequestReplyMessagePublisher 调用 publishAwaitResponse() 方法。publishAwaitResponse() 方法接受以下参数:

  • requestMessage — 要发送的 OutboundMessage 请求
  • additionalMessageProperties — (可选)一个包含额外消息属性的 Property 对象(参阅配置和创建出站消息)。如果没有要设置的额外消息属性,请省略此参数。
  • requestDestination — 请求消息的 Topic 目的地
  • replyTimeout — 一个 Long 值,表示等待回复消息的最大时间(以毫秒为单位)

有关更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

以下示例展示了如何同步发送请求消息并将回复分配给一个 InboundMessage 对象:

final InboundMessage inboundMessage = requestReplyMessagePublisher.publishAwaitResponse(outboundMessage, Topic.of("my/sample/topic"), 1000);

有关完整示例,请参阅 Solace 开发者中心的 DirectRequestorBlocking.java

发送异步请求

Java 的 PubSub+ API 提供异步请求-回复消息传递,允许您的应用程序在收到任何回复之前发送多个请求。这对于事件顺序不重要的异步通信非常有用。要发送异步请求,请使用您的 RequestReplyMessagePublisher 调用 publish() 方法。publish() 方法接受以下参数:

  • requestMessage — 要发送的 OutboundMessage 请求
  • additionalMessageProperties — (可选)一个包含额外消息属性的 Property 对象(参阅配置和创建出站消息)。如果没有要设置的额外消息属性,请省略此参数。
  • replyMessageHandler — 一个 RequestReplyMessagePublisher.ReplyMessageHandler 实例,这是一个回调处理器,用于处理到达的回复消息或发生超时时。
  • userContext — (可选)一个 userContextObject,在回复消息处理期间可用(参阅用户上下文)。如果没有要设置的上下文,请省略此属性。
  • requestDestination — 请求消息的 Topic 目的地
  • replyTimeout — 一个 Long 值,表示等待回复消息的最大时间(以毫秒为单位)

有关更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

以下展示了 ReplyMessageHandler 的示例实现,它异步发送请求消息并将回复分配给一个 InboundMessage 对象:

requestReplyMessagePublisher.publish(outboundMessage,
(inboundMessage, userContext, pubSubPlusClientException) -> {
if (pubSubPlusClientException == null) { // Message received
System.out.println("The reply inboundMessage payload being logged is : " + inboundMessage.getPayloadAsString());
} else { // Error occurred
if (userContext != null) {
System.out.println(String.format("Error for Message %s - %s", userContext, pubSubPlusClientException));
}
if (pubSubPlusClientException instanceof PubSubPlusClientException.TimeoutException) {
// requestor application did not receive a reply for the published message within specified timeout
// good location for implementing resiliency or retry mechanisms
System.out.printf("Publishing action timed out without any reply. Error : : %s%n", pubSubPlusClientException);
System.out.println("Publish timed-out for message with payload :" + outboundMessage.getPayloadAsString());
} else {
throw new RuntimeException(pubSubPlusClientException);
}
}
}
, Topic.of("my/sample/topic"), 1000);

有关完整示例,请参阅 Solace 开发者中心的 DirectRequestorNonBlocking.java

创建 RequestReplyMessageReceiver

要发送消息回复,请创建一个 MessagingService 对象(有关说明,请参阅消息服务)。创建 MessagingService 对象并将其连接到事件代理后,使用 RequestReply() 方法构建一个 RequestReplyMessageReceiver 对象:

final RequestReplyMessageReceiver requestReplyMessageReceiver =
messagingService.requestReply()
.createRequestReplyMessageReceiverBuilder()
.build(topicSubscription)
.start();

接下来创建一个 OutboundMessage 实例。这是您的接收器发送给请求者实例的回复。有关创建 OutboundMessage 对象的信息,请参阅配置和创建出站消息。

接收请求并发送回复

您的 RequestReplyMessageReceiver 可以同步或异步接收请求,作为 InboundMessage 对象。

  • 同步接收请求并发送回复
  • 异步接收请求并发送回复

同步接收请求并发送回复

Java 的 PubSub+ API 提供同步请求-回复消息传递,它会阻塞您的应用程序,直到 receiveMessage() 方法返回。这对于同步、点对点通信非常有用,其中事件顺序很重要,例如在处理金融交易时。要同步接收请求,请使用您的 RequestReplyMessageReceiver 对象调用 receiveMessage() 方法。receiveMessage() 方法接受以下参数:

  • requestMessageHandler — 一个 RequestReplyMessageReceiver.RequestMessageHandler 实例,这是一个回调处理器,用于处理传入的请求消息和回复者对象。这个回调允许 receiveMessage() 方法接收一个 inboundMessage(请求)和一个 RequestReplyMessageReceiver.Replier 实例。回复者对象允许您的 RequestReplyMessageReceiver 将回复发送回请求者。
  • timeOut — (可选)一个 Long 值,表示在退出同步方法之前等待的时间(以毫秒为单位)。值应大于 0。如果不想设置超时值,请省略此属性。

有关更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

以下展示了 RequestMessageHandler 的示例实现,它同步接收请求消息,将回复分配给一个 OutboundMessage 对象,并使用 reply() 方法发送它:

final RequestReplyMessageReceiver.RequestMessageHandler messageHandler = (inboundMessage, replier) -> {
final String stringPayload = inboundMessage.getPayloadAsString();
// 处理 inboundMessage 请求,保存它,打印它等。
// 创建出站消息:
final String outboundMessageStringPayload = "This is my reply!";
final OutboundMessage outboundMessage = outboundMessageBuilder.build(outboundMessageStringPayload);
// 使用 replier 对象将出站消息发送回请求者:
replier.reply(outboundMessage);
};

requestReplyMessageReceiver.receiveMessage(messageHandler, 1000);

有关完整示例,请参阅 Solace 开发者中心的 DirectReplierBlocking.java

异步接收请求并发送回复

Java 的 PubSub+ API 提供异步请求-回复消息传递,允许您的应用程序使用 receiveAsync() 方法异步接收多个消息请求。这对于事件顺序不重要的点对点通信非常有用。要异步接收请求,请使用 RequestReplyMessageReceiver 对象调用 receiveAsync() 方法。receiveAsync() 方法接受以下参数:

  • requestMessageHandler — 一个 RequestReplyMessageReceiver.RequestMessageHandler 实例,这是一个回调处理器,用于处理传入的请求消息和回复者对象。这个回调允许 receiveAsync() 方法接收一个 inboundMessage(请求)和一个 RequestReplyMessageReceiver.Replier 实例。回复者对象允许您的 RequestReplyMessageReceiver 将回复发送回请求者。
  • executorService — (可选)一个用户提供的 ExecutorService 实例,用于消息调度。Executor 服务的关闭或任何其他维护工作由开发人员负责。当需要保留消息顺序时,需要一个基于单线程的执行器。如果不想使用执行器服务,请省略此属性。

有关更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

以下展示了 RequestMessageHandler 的示例实现,它异步接收请求消息,将回复分配给一个 OutboundMessage 对象,并使用 reply() 方法发送它:

final RequestReplyMessageReceiver.RequestMessageHandler messageHandler = (inboundMessage, replier) -> {
// 处理 inboundMessage 请求:
final String stringPayload = inboundMessage.getPayloadAsString();
// 创建带有标题和正文的出站消息:
final String outboundMessageStringPayload = "Hello World!";
final OutboundMessage outboundMessage = outboundMessageBuilder.build(outboundMessageStringPayload.toString());
// 使用 replier 对象将出站消息发送回请求者:
replier.reply(outboundMessage);
};

// 创建一个调度器变量,并使用所需的配置来处理消息:
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
requestReplyMessageReceiver.receiveAsync(messageHandler, executorService);

有关完整示例,请参阅 Solace 开发者中心的 DirectReplierNonBlocking.java