跳到主要内容

Go API中的请求-回复消息传递

请求-回复消息传递是一种数据传输方法,应用程序通过该方法模拟单独的点对点通道:一个用于请求,另一个用于回复。在请求-回复消息传递中,从 消息请求者 发送的每个请求都需要来自 消息回复者 的回复。当消息回复者消费请求消息时,它会将回复发送回请求者。当您的应用程序中的组件之间发送的每条消息都需要回复时,例如在执行身份验证或金融交易时,这种消息模式非常有用。

img

PubSub+ 消息 API 发布带有唯一、自动生成的 ReplyTo 目的主题的请求消息,该主题位于消息头字段中。这个 ReplyTo 主题作为回复应该发送到的回退地址。由于 ReplyTo 主题目的地由 PubSub+ 消息 API 处理,因此它允许用户执行请求-回复操作,而无需担心注册适当的主题订阅以接收回复。

在 PubSub+ Go API 中,请求-回复消息传递只能与直接消息一起使用。

要使用 Go API 使用请求-回复消息模式,请按照以下步骤操作:

  1. 创建一个 RequestReplyMessagePublisher
  2. 发送请求
  3. 创建一个 RequestReplyMessageReceiver
  4. 接收请求并发送回复

创建 RequestReplyMessagePublisher

要发送消息请求,请创建一个 MessagingService 实例(有关说明,请参阅消息服务)。创建 MessagingService 实例并将其连接到事件代理后,使用 RequestReply() 函数构建一个 RequestReplyMessagePublisher 实例:

requestReplyPublisher, builderErr := messagingService.RequestReply().CreateRequestReplyMessagePublisherBuilder().Build()
if builderErr != nil {
panic(builderErr)
}

startErr := requestReplyPublisher.Start()
if startErr != nil {
panic(startErr)
}

接下来创建一个 OutboundMessage 实例。这是您的发布者发送给接收器实例的请求。有关创建 OutboundMessage 实例的信息,请参阅配置和创建出站消息。

发送请求

发送请求时,可以是同步的也可以是异步的。同步请求会阻塞您的应用程序,直到收到回复。异步请求允许您的应用程序在收到任何回复之前发送多个请求。

  • 发送同步请求
  • 发送异步请求

发送同步请求

PubSub+ Go API 提供同步请求-回复消息传递,它会阻塞每个请求,直到收到回复。这适用于同步、点对点通信,其中事件顺序很重要,例如在处理金融交易时。要发送同步请求,请使用您的 RequestReplyMessagePublisher 调用 PublishAwaitResponse() 函数。PublishAwaitResponse() 函数接受以下参数:

  • requestMessage — 要发送的 OutboundMessage 请求
  • requestDestination — 请求消息的 Topic 目的地
  • replyTimeout — 一个 time.Duration 值,表示等待回复消息的最大时间
  • properties — (可选)一个包含额外消息属性的 MessagePropertyMap(参阅配置和创建出站消息)。如果没有要设置的额外消息属性,请将此参数设置为 nil

有关更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

以下示例展示了如何同步发送消息请求并将回复分配给一个 InboundMessage 实例:

replyMsg, publishErr := requestReplyPublisher.PublishAwaitResponse(message, topic, 5*time.Second, nil)

if publishErr == nil {
fmt.Printf("The reply inbound payload: %s\n", replyMsg.GetPayloadAsString())
} else if terr, ok := publishErr.(*solace.TimeoutError); ok {
// No reply received, can implement resiliency or retry mechanisms here
fmt.Printf("The reply timed out with %s with msg number : %d\n", terr, msgSeqNum)
} else {
panic(publishErr)
}

发送异步请求

PubSub+ Go API 提供异步请求-回复消息传递,允许您的应用程序在收到任何回复之前发送多个请求。这适用于事件顺序不重要的异步通信。要发送异步请求,请使用您的 RequestReplyMessagePublisher 调用 Publish() 函数。Publish() 函数接受以下参数:

  • requestMessage — 要发送的 OutboundMessage 请求
  • replyMessageHandler — 一个 RequestReplyMessagePublisher.ReplyMessageHandler 实例,这是一个回调处理器,用于处理到达的回复消息或发生超时时。
  • requestDestination — 请求消息的 Topic 目的地
  • replyTimeout — 一个 time.Duration 值,表示等待回复消息的最大时间
  • properties — (可选)一个包含额外消息属性的 MessagePropertyMap(参阅配置和创建出站消息)。如果没有要设置的额外消息属性,请将此参数设置为 nil
  • userContext — (可选)一个 userContext 接口,用于在回复消息处理期间使用(参阅用户上下文)。如果没有要设置的上下文,请将此参数设置为 nil

有关更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

以下示例展示了 ReplyMessageHandler 的实现,它异步发送消息请求并将回复分配给一个 InboundMessage 实例:

func ReplyHandler(message message.InboundMessage, userContext interface{}, err error) {
if err == nil {
fmt.Printf("The reply inbound payload: %s\n", message.GetPayloadAsString())
} else if terr, ok := err.(*solace.TimeoutError); ok {
// No reply received, can implement resiliency or retry mechanisms here
fmt.Printf("The reply timed out with %s with user context : %s\n", terr, userContext)
} else {
panic(err)
}
}

// ...

publishErr := requestReplyPublisher.Publish(message, ReplyHandler, topic, 5*time.Second, nil, nil)
if publishErr != nil {
panic(publishErr)
}

创建 RequestReplyMessageReceiver

要发送消息回复,请创建一个 MessagingService 实例(有关说明,请参阅消息服务)。创建 MessagingService 实例并将其连接到事件代理后,使用 RequestReply() 函数构建一个 RequestReplyMessageReceiver 实例:

requestReplyReceiver, builderErr := messagingService.RequestReply().CreateRequestReplyMessageReceiverBuilder().Build(topicSubscription)
if builderErr != nil {
panic(builderErr)
}

startErr := requestReplyReceiver.Start()
if startErr != nil {
panic(startErr)
}

接下来创建一个 OutboundMessage 实例。这是您的接收器发送给请求者实例的回复。有关创建 OutboundMessage 实例的信息,请参阅配置和创建出站消息。

接收请求并发送回复

您的 RequestReplyMessageReceiver 可以同步或异步接收请求,作为 InboundMessage 实例。

  • 同步接收请求并发送回复
  • 异步接收请求并发送回复

同步接收请求并发送回复

PubSub+ Go API 提供同步请求-回复消息传递,它会阻塞您的应用程序,直到 ReceiveMessage() 函数返回。这适用于同步、点对点通信,其中事件顺序很重要,例如在处理金融交易时。要同步接收请求,请使用您的 RequestReplyMessageReceiver 实例调用 ReceiveMessage() 函数。ReceiveMessage() 函数接收请求消息和 RequestReplyMessageReceiver 的回复实例。该函数接受以下参数:

  • Timeout — 一个 time.Duration 值,表示函数等待请求消息的时间。如果此值为负,则函数将永远等待。

有关更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

以下示例展示了 RequestMessageHandler 的实现,它同步接收请求消息,将回复分配给一个 OutboundMessage 实例,并使用 Reply() 函数发送它:

message, replier, receiveErr := requestReplyReceiver.ReceiveMessage(5 * time.Second)
if receiveErr != nil { // receive pull was not successful
if terr, ok := receiveErr.(*solace.TimeoutError); ok { // A timeout occurred and no request message was received
// good location for implementing resiliency or retry mechanisms.
fmt.Printf("request message pull from the receiver timed out with %s with last msg number : %d\n", terr, msgSeqNum)
continue
} else {
fmt.Println("Receiver error while trying to pull request message. Error: ", receiveErr)
}
}

if replier != nil { // the replier is only set when received message is request message that can be replied to
// build reply message
replyMsg, replyMsgBuildErr := messageBuilder.BuildWithStringPayload(messageBody + "\nReply from: " + message.GetPayloadAsString())
if replyMsgBuildErr != nil {
panic(replyMsgBuildErr)
}
// send reply msg
replyErr := replier.Reply(replyMsg)
if replyErr != nil {
fmt.Println("error on send reply. Error: ", replyErr)
}
} else {
// messages received on the topic subscription without a repliable destination will return a nil replier
fmt.Printf("Received message: %d on topic %s that was not a request message\n", msgSeqNum, topicSubscription.GetName())
}

异步接收请求并发送回复

PubSub+ Go API 提供异步请求-回复消息传递,允许您的应用程序使用 ReceiveAsync() 函数异步接收多个消息请求。这适用于事件顺序不重要的点对点通信。要异步接收请求,请使用 RequestReplyMessageReceiver 实例调用 ReceiveAsync() 函数。ReceiveAsync() 函数接受以下参数:

  • requestMessageHandler — 一个 RequestReplyMessageReceiver.RequestMessageHandler 实例,这是一个回调处理器,用于处理传入的请求消息和回复实例。这个回调允许 ReceiveAsync() 函数接收一个 inboundMessage(请求)和一个 RequestReplyMessageReceiver.Replier 实例。回复实例允许您的 RequestReplyMessageReceiver 将回复发送回请求者。

有关更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

以下示例展示了 RequestMessageHandler 的实现,它异步接收请求消息,将回复分配给一个 OutboundMessage 实例,并使用 Reply() 函数发送它:

requestMessageHandler := func(message message.InboundMessage, replier solace.Replier) {

if replier == nil { // the replier is only set when received message is a request message that can be replied to
// messages received on the topic subscription without a reply destination will return a nil replier
fmt.Printf("Received message: %d on topic %s that was not a request message\n", msgSeqNum, topicSubscription.GetName())
return
}
replyMsg, replyMsgBuildErr := messageBuilder.BuildWithStringPayload(messageBody + "\nReply from: " + message.GetPayloadAsString())
if replyMsgBuildErr != nil {
panic(replyMsgBuildErr)
}
replyErr := replier.Reply(replyMsg)
if replyErr != nil {
fmt.Println("Error on send reply: ", replyErr)
}
}

requestReplyReceiver.ReceiveAsync(requestMessageHandler)