跳到主要内容

使用Go API消费直接消息

在需要高吞吐量和低延迟的场景中,直接消息非常有用。由于外部因素(如网络拥塞或偶尔的客户端断开连接)的影响,直接消息可能会导致一些消息丢失。直接消息适用于需要最新信息但不一定需要每条消息的应用程序。例如天气应用程序、价格检查器、GPS 跟踪等。

直接消息不需要额外的事件代理配置。如果您的应用程序不能容忍消息丢失,我们建议您使用持久消息。

要使用 PubSub+ Go API 消费直接消息,请按照以下步骤操作:

  1. 创建一个 DirectMessageReceiver
  2. 处理订阅直接消息时的反向压力。
  3. 同步接收直接消息。
  4. 异步接收直接消息。
  5. 从入站消息中提取属性。
  6. 使用 Go API 与 PubSub+ 缓存

在某些用例中,API 从事件代理接收消息的速度可能比您的应用程序处理它们的速度更快。消息可能会填满 API 的内部缓冲区,导致 反向压力。如果这种情况可能发生,您可能需要考虑更改默认的反向压力设置以满足您的要求。有关更多信息,请参阅处理订阅直接消息时的反向压力。

有关消费直接消息的应用程序示例,请参阅 Solace GitHub 页面上的 direct_subscriber.go

创建直接消息接收者

MessagingService 实例与事件代理建立连接后,使用 DirectMessageReceiver 从事件代理消费直接消息。

您可以使用 DirectMessageReceiverBuilder 来配置您的 DirectMessageReceiver 使用 API 的某些功能,例如主题订阅和反向压力策略。在您的 DirectMessageReceiverBuilder 上调用 Build() 函数,以返回一个 DirectMessageReceiver 实例。要使您的 DirectMessageReceiver 开始接收消息,请在其上调用 Start()

有关更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

以下示例展示了如何向 DirectMessageReceiver 添加主题订阅并连接到事件代理:

directReceiver, err := messagingService.
CreateDirectMessageReceiverBuilder(). // 创建一个 DirectMessageReceiverBuilder,可用于配置直接消息接收者实例。
WithSubscriptions(topicsSub...). // 设置在启动接收者时要订阅的主题订阅列表。
Build() // 根据指定的属性创建一个 DirectMessageReceiver。

if err != nil {
panic(err)
}

if err := directReceiver.Start(); err != nil { // 同步启动接收者。在调用此函数之前,接收者被视为非工作状态。
panic(err)
}

异步接收者

或者,也可以使用回调监听器启动直接消息接收者,以便在启动操作完成后进行异步通知。

以下示例展示了如何异步启动 DirectMessageReceiver

func ReceiverCallback(receiver solace.DirectMessageReceiver, err error) {
if err != nil {
panic(err) // 接收者启动时出现错误
} else {
// 接收者启动成功
}
}
// ...
// ...
directReceiver.StartAsyncCallback(ReceiverCallback) // 异步启动接收者。启动时如果出现错误则调用回调,否则返回 nil。

在调用 Start()StartAsyncCallback() 之前,您的接收者应用程序是无法运行的。

处理订阅直接消息时的反向压力

订阅直接消息时,API 使用内部缓冲区来存储从事件代理接收的消息。在理想情况下,消息一被接收,应用程序就会处理该消息。根据处理速度和其他因素,可能会收到比订阅应用程序处理速度更快的消息(例如,高容量的消息突发)。如果消息未被处理并被允许积累,内部缓冲区可能会达到其容量,这被称为 反向压力

在 PubSub+ Go API 中,DirectMessageReceiver 有以下机制来处理反向压力:

  • 丢弃最新消息
  • 丢弃最旧消息

丢弃最新消息

Go API 的默认行为是,当 API 的内部缓冲区中积累了 50 条消息时,丢弃最新消息。达到此容量后,由于内部缓冲区已满,不会将最新消息放入内部缓冲区,而是将其丢弃(丢失)。

要配置不同的缓冲区大小,请在 DirectMessageReceiverBuilder 上调用 OnBackPressureDropLatest(bufferSize uint) 函数,然后设置可以在丢弃消息之前积累的最大消息数量 (bufferSize)。

以下示例展示了如何配置应用程序在 API 的内部缓冲区中有 1000 条消息排队时丢弃消息:

/* 创建一个 DirectMessageReceiverBuilder 实例,用于创建 DirectMessageReceivers。 */
directReceiver, builderError := messagingService.
CreateDirectMessageReceiverBuilder(). // 创建一个 DirectMessageReceiverBuilder,可用于配置直接消息接收者实例。
OnBackPressureDropLatest(1000). // 如果缓冲区已满且有消息到达,则丢弃传入的消息。
Build() // 根据配置的属性创建一个新的 DirectMessageReceiver 实例。

if builderError != nil {
panic(builderError)
}

startErr := directReceiver.Start() // Start 同步启动接收者。在调用此函数之前,接收者被视为非工作状态。
if startErr != nil {
panic(startErr)
}

丢弃最旧消息

您可以配置 API 在 API 的内部缓冲区达到指定容量时丢弃最旧消息。达到此容量后,将从内部队列中移除最旧的项目,以为接收更新的消息腾出空间。达到指定容量后,将移除接收者接收到的最旧项目,以便将更近期的消息排队。

要配置此机制,请在 DirectMessageReceiverBuilder 上调用 OnBackPressureDropOldest(bufferSize uint) 函数,然后设置可以在从内部缓冲区中移除最旧消息之前积累的最大消息数量 (bufferSize)。

以下示例展示了如何配置应用程序在 API 的内部缓冲区中有 1000 条消息排队时丢弃最旧消息:

/* 创建一个 DirectMessageReceiverBuilder 实例,用于创建 DirectMessageReceivers。 */
directReceiver, builderError := messagingService.
CreateDirectMessageReceiverBuilder(). // 创建一个 DirectMessageReceiverBuilder,可用于配置直接消息接收者实例。
OnBackPressureDropOldest(1000). // 如果缓冲区已满且有消息到达,则丢弃缓冲区中最旧的消息。
Build() // 根据配置的属性创建一个新的 DirectMessageReceiver 实例。

if builderError != nil {
panic(builderError)
}

startErr := directReceiver.Start() // Start 同步启动接收者。在调用此函数之前,接收者被视为非工作状态。
if startErr != nil {
panic(startErr)
}

同步接收直接消息

在使用 MessagingService 实例与事件代理建立连接后,您可以使用 DirectMessageReceiver 订阅消息。DirectMessageReceiver 必须至少订阅一个主题,才能开始接收消息。

以下示例展示了 DirectMessageReceiver 如何接收 InboundMessage,以及 ReceiveMessage() 函数如何阻塞例程,直到收到下一条消息:

// ReceiveMessage 会等待指定的超时时间以接收消息,如果指定的超时时间是负值,则会永远等待
var receivedMessage message.InboundMessage
var regErr solace.Error
if receivedMessage, regErr = directReceiver.ReceiveMessage(1 * time.Second); regErr != nil {
panic(regErr)
} else {
// 处理接收到的消息
}

异步接收直接消息

在使用 MessagingService 实例与事件代理建立连接后,您可以使用 DirectMessageReceiver 消费直接消息,并异步处理它们。要异步处理直接消息,您需要使用 MessageHandler 作为回调函数,以告知应用程序何时接收到了消息。

以下示例展示了如何异步接收消息:

// 将消息回调处理器注册到消息接收器
if regErr := directReceiver.ReceiveAsync(MessageHandler); regErr != nil {
panic(regErr)
}

// 消息处理器
func MessageHandler(message message.InboundMessage) {
// 处理接收到的消息
}

从入站消息中提取属性

在与事件代理建立连接后,您的接收者应用程序可以订阅主题。每当您的应用程序从代理接收到具有匹配主题的消息时,就会将一个 InboundMessage 实例返回给应用程序。您可以从 InboundMessage 中提取许多属性,例如消息正文(作为字节或字符串)和发送者 ID。以下示例展示了如何从消息中提取属性。

  • 在异步接收消息时使用 MessageHandler 回调:
func MessageHandler(message message.InboundMessage) {
var messagePayload string
var senderID string

if payload, ok := message.GetPayloadAsString(); ok { // 从接收到的消息中提取正文。
messagePayload = payload
}

if senderID, ok := message.GetSenderID(); ok { // 从接收到的消息中提取发送者 ID。
senderID = senderID
}
}
  • 在同步接收消息时使用 ReceiveMessage() 函数:
var receivedMessage message.InboundMessage
var regErr solace.Error
if receivedMessage, regErr = directReceiver.ReceiveMessage(1 * time.Second); regErr != nil {
panic(regErr)
} else {
var messagePayload string
var senderID string
if payload, ok := receivedMessage.GetPayloadAsString(); ok { // 从接收到的消息中提取正文。
messagePayload = payload
}
if senderID, ok := receivedMessage.GetSenderID(); ok { // 从接收到的消息中提取发送者 ID。
senderID = senderID
}
}

有关可以从 InboundMessage 中提取属性的函数的完整列表,请参阅 Go 的 PubSub+ 消息 API 参考。