跳到主要内容

使用Go API消费持久消息

无法容忍消息丢失的订阅应用程序可以使用持久消息(在本文档的其他部分称为“保证消息”)而不是直接消息。使用持久消息时,消息会在事件代理上的队列中存储。只有在订阅应用程序(称为消息接收器)消费并确认消息后,消息才会从事件代理中删除。PubSub+ Go API 只能从队列中消费持久消息,而不能从主题端点中消费。

要消费持久消息,您必须首先在事件代理上设置一个消息队列。有关在事件代理上创建和配置持久队列的信息,请参阅配置队列。或者,可以在创建持久消息接收器(PersistentMessageReceiver)时创建一个非持久队列。

要使用持久消息接收器消费持久消息,请按照以下步骤操作:

  1. 创建一个 PersistentMessageReceiver
  2. 同步接收持久消息。
  3. 异步接收持久消息。
  4. 从入站消息中提取属性。
  5. 消息确认。
  6. 特定消息的否定确认。
  7. 使用 PubSub+ Go API 创建队列。

如果您的消费者(或订阅)应用程序在接收消息的速度超过处理消息的速度时,可能会发生反向压力。消息会继续在内部缓冲,直到达到高水位线,此时 API 会告知事件代理停止发送消息,以防止消息丢失。

有关消费持久消息的应用程序示例,请参阅 Solace GitHub 页面上的 guaranteed_subscriber.go

创建持久消息接收者

在使用 MessagingService 实例与事件代理建立连接后,使用 PersistentMessageReceiver 从事件代理上的队列中消费持久消息。

您可以使用 PersistentMessageReceiverBuilder 来配置您的 PersistentMessageReceiver 使用 API 的某些功能,例如主题订阅。然后,您在 PersistentMessageReceiverBuilder 上调用 Build(),它返回一个 PersistentMessageReceiver 实例。要使您的 PersistentMessageReceiver 开始接收消息,请在其上调用 Start()

有关 Go API 中使用的函数的更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

以下是如何使用 PersistentMessageReceiver 绑定到队列的示例:

nonDurableExclusiveQueue := resource.QueueNonDurableExclusive(queueName) // 创建一个具有指定名称的独占、非持久队列的引用
topic := resource.TopicSubscriptionOf("go/persistentReceiver/sample/topic") // 创建指定主题字符串的 TopicSubscription

/* 创建一个 PersistentMessageReceiverBuilder 实例,用于创建 PersistentMessageReceivers。 */
persistentReceiver, builderError := messagingService.
CreatePersistentMessageReceiverBuilder(). // 创建一个 PersistentMessageReceiverBuilder,可用于配置持久消息接收者实例。
WithSubscriptions(topic). // 设置启动接收器时要订阅的主题订阅列表。
Build(nonDurableExclusiveQueue) // 根据配置的属性创建一个新的 PersistentMessageReceiver 实例。如果队列为空,则返回 *IllegalArgumentError。

if builderError != nil {
panic(builderError)
}

startErr := persistentReceiver.Start() // Start 同步启动接收器。在调用此函数之前,接收器被视为非工作状态。
if startErr != nil {
panic(startErr)
}

您可以使用 Go API 在事件代理上创建持久和非持久队列。有关更多信息,请参阅使用 PubSub+ Go API 配置和取消配置队列。

异步接收器

也可以使用回调监听器异步启动持久消息接收器,以便在启动操作完成后进行异步通知。

以下示例展示了如何异步启动一个 PersistentMessageReceiver

func ReceiverCallback(receiver solace.PersistentMessageReceiver, err error) {
if err != nil {
panic(err) // 接收器启动时出现错误
} else {
// 接收器启动成功
}
}
// ...
persistentReceiver.StartAsyncCallback(ReceiverCallback) // 异步启动接收器。启动时如果出现错误则调用回调,否则返回 nil。

在调用 Start()StartAsyncCallback() 之前,您的接收器应用程序是无法运行的。

同步消费持久消息

您可以同步消费持久消息。为此,您需要创建一个 PersistentMessageReceiver 并将其绑定到队列。成功绑定到队列后,您的接收器应用程序可以开始消费持久消息。

当您使用 ReceiveMessage(message.InboundMessage) 函数时,它会阻塞例程,直到收到下一条消息。

当应用程序处理 InboundMessage 时,它可以使用 PersistentMessageReceiver.Ack() 向事件代理发送确认。然后,事件代理将从队列中移除 InboundMessage。在消息被确认之前,它保留在代理队列中,并且当应用程序重新连接到队列时可能会重新投递。

有关前面函数的更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

以下示例展示了如何同步消费持久消息:

persistentReceiver, err := messagingService.
CreatePersistentMessageReceiverBuilder(). // 创建一个 PersistentMessageReceiverBuilder,可用于配置持久消息接收者实例。
WithSubscriptions(topicsSub...). // 设置启动接收器时要订阅的主题订阅列表。
Build() // 根据指定的属性创建一个 PersistentMessageReceiver。

if err != nil {
panic(err)
}

if err := persistentReceiver.Start(); err != nil { // 同步启动接收器。在调用此函数之前,接收器被视为非工作状态。
panic(err)
}

var receivedMessage message.InboundMessage
var regErr solace.Error
if receivedMessage, regErr := persistentReceiver.ReceiveMessage(1 * time.Second); regErr != nil { // 等待指定超时时间以接收消息
panic(regErr)
} else {
// 处理接收到的消息
persistentReceiver.Ack(message) // 确认已接收消息。
}

如果您不调用 ReceiveMessage() 函数,消息可能会在 API 的内部缓冲区中积累,您可能会遇到反向压力场景。如果发生这种情况,Go API 会自动告知事件代理停止发送消息。

异步消费持久消息

您可以异步消费持久消息。为此,您需要创建一个 PersistentMessageReceiver 并像往常一样启动与事件代理的连接,但您需要使用一个 MessageHandler 作为回调函数,以通知您的应用程序何时接收到消息。

以下示例展示了如何异步消费持久消息:

// 消息处理器
messageHandler := func(message message.InboundMessage) {
var messageBody string
if payload, ok := message.GetPayloadAsString(); ok {
messageBody = payload
persistentReceiver.Ack(message) // 确认接收到的消息
} else if payload, ok := message.GetPayloadAsBytes(); ok {
messageBody = string(payload)
persistentReceiver.Ack(message) // 确认接收到的消息
}
fmt.Printf("Received Message Body %s \n", messageBody)
}

// ...
// 将消息回调处理器注册到消息接收器
if regErr := persistentReceiver.ReceiveAsync(messageHandler); regErr != nil {
panic(regErr)
}

暂停和恢复从内部缓冲区的消息消费

当您的应用程序使用 ReceiveAsync() 函数异步消费消息时,您可以调用 Pause()Resume() 函数来控制消息流到您应用程序的回调。

Pause()Resume() 函数对 ReceiveMessage() 没有影响。

您可以使用 Pause()Resume() 函数来控制消息在 API 的内部缓冲区和您应用程序之间的流动。这个内部缓冲区是从事件代理接收消息的地方。这种流控制在您的应用程序必须暂时停止处理消息以处理其他操作时很有用。Pause()Resume() 函数不会控制事件代理和 API 的内部缓冲区之间的消息流。当您调用 Pause() 函数时,事件代理继续发送消息。Pause()Resume() 函数只控制消息传递到应用程序。从事件代理接收到的消息继续在内部缓冲区中积累。

由于事件代理继续发送消息,可能会发生反向压力场景——即,消息继续积累直到达到内部高水位线。此时,PersistentMessageReceiver 通知事件代理停止发送消息,直到积累的消息数量低于内部低水位线。这个内部 API 机制为您处理反向压力场景,并确保在事件代理和您的应用程序之间不会丢失任何消息。

以下函数用于暂停和恢复从 API 的内部缓冲区处理消息:

  • Pause()
  • Resume()

有关前面函数的更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

以下示例展示了如何使用调度器暂停和恢复从内部队列中的消息处理:

persistentReceiver.Pause() // 暂停接收器的消息传递到异步消息处理器。
// 在这里执行任何操作,例如等待 60 秒:time.Sleep(60 * time.Second)
persistentReceiver.Resume() // 恢复接收器的消息传递到异步消息处理器。

从入站消息中提取属性

在与事件代理建立连接后,您的接收器应用程序可以订阅主题。每当您的应用程序从代理接收到具有匹配主题的消息时,就会将一个 InboundMessage 实例返回给应用程序。您可以从 InboundMessage 中提取许多属性,例如消息正文(作为字节或字符串)和发送者 ID。以下示例展示了如何从消息中提取属性。

  • 在异步接收消息时使用 MessageHandler 回调:
func MessageHandler(message message.InboundMessage) {
var messagePayload string
var senderID string
if payload, ok := receivedMessage.GetPayloadAsString(); ok { // 从接收到的消息中提取正文。
messagePayload = payload
}
if senderID, ok := receivedMessage.GetSenderID(); ok { // 从接收到的消息中提取发送者 ID。
senderID = senderID
}
persistentReceiver.Ack(message) // 确认已接收消息。
}
  • 在同步接收消息时使用 ReceiveMessage() 函数:
var receivedMessage message.InboundMessage
var regErr solace.Error
if receivedMessage, regErr = persistentReceiver.ReceiveMessage(1 * time.Second); regErr != nil {
panic(regErr)
} else {
var messagePayload string
var senderID string
if payload, ok := receivedMessage.GetPayloadAsString(); ok { // 从接收到的消息中提取正文。
messagePayload = payload
}
if senderID, ok := receivedMessage.GetSenderID(); ok { // 从接收到的消息中提取发送者 ID。
senderID = senderID
}
persistentReceiver.Ack(receivedMessage) // 确认已接收消息。
}

有关可以从 InboundMessage 中提取属性的函数的完整列表,请参阅 Go 的 PubSub+ 消息 API 参考。

消息确认

确认是异步的。只要接收器尚未终止,任何例程都可以随时确认消息。重要的是要记住,在 PersistentMessageReceiver 向事件代理确认消息后,它会从事件代理上的队列中删除该消息。因此,在确认消息之前,重要的是要完成对消息的任何处理和存储。

以下示例展示了如何确认持久消息:

// 消息处理器
messageHandler := func(message message.InboundMessage) {
var messageBody string
if payload, ok := message.GetPayloadAsString(); ok {
messageBody = payload
persistentReceiver.Ack(message) // 确认接收到的消息
} else if payload, ok := message.GetPayloadAsBytes(); ok {
messageBody = string(payload)
persistentReceiver.Ack(message) // 确认接收到的消息
}
fmt.Printf("Received Message Body %s \n", messageBody)
}
// ...
// ...
if regErr := persistentReceiver.ReceiveAsync(messageHandler); regErr != nil { // 将消息回调处理器注册到消息接收器
panic(regErr)
}

特定消息的否定确认

如果您的 PersistentMessageReceiver 未配置为自动确认收到的消息,您可以使用否定确认(NACK)。使用 NACK 时,您可以发送一个结算结果,以告知事件代理处理收到的保证消息的结果。根据结算结果,事件代理知道如何在其队列中处理消息。您可以使用以下结算结果:

  • ACCEPTED — 此确认通知事件代理您的客户端应用程序已成功处理保证消息。当事件代理收到此结果时,它会从其队列中移除该消息。
    • 当您使用 ACCEPTED 结果调用 Settle() 函数时,与使用 persistentReceiver.Ack(receivedMessage) 是相同的。
  • FAILED — 此否定确认通知事件代理您的客户端应用程序未处理该消息。当事件代理收到此否定确认时,它会尝试在遵守投递次数限制的情况下重新投递该消息。在重新投递期间,消息保留在代理上,直到代理收到非 FAILED 的确认状态为止。
  • REJECTED — 此否定确认通知事件代理您的客户端应用程序无法处理该消息。当事件代理收到此否定确认时,它会从其队列中移除该消息,然后如果已配置,则将消息移动到死信队列(DMQ)。

在使用 NACK 之前,您必须使用 WithRequiredMessageOutcomeSupport() 函数在创建 PersistentMessageReceiver 时添加 config.PersistentReceiverFailedOutcomeconfig.PersistentReceiverRejectedOutcome 或两种结果作为 NACK 类型,以准备它与否定确认一起工作。您不需要添加 config.PersistentReceiverAcceptedOutcome 结果,因为它始终可用。如果您尝试对未添加的结果调用 Settle(),您将收到一个 Required Settlement Outcome Not Supported 错误。以下代码展示了如何配置一个 PersistentMessageReceiver 以使用 NACK:

messagingService.
CreatePersistentMessageReceiverBuilder().
WithMessageClientAcknowledgement().
WithRequiredMessageOutcomeSupport(
config.PersistentReceiverFailedOutcome,
config.PersistentReceiverRejectedOutcome,
).
Build(durableExclusiveQueue)

或者,您可以使用属性映射而不是设置器:

messagingService.
CreatePersistentMessageReceiverBuilder().
WithMessageClientAcknowledgement().
FromConfigurationProvider(config.ReceiverPropertyMap{
config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf(
"%s,%s",
config.PersistentReceiverFailedOutcome,
config.PersistentReceiverRejectedOutcome,
),
}).
Build(durableExclusiveQueue)
  • NACK 在传输过程中可能会丢失(例如由于意外的网络问题)。在开发应用程序时,将这一事实作为处理消息的逻辑的一部分。
  • NACK 在 10.2.1 及更高版本的事件代理上受支持。如果事件代理不支持 NACK,则在调用配置为使用消息结果的 PersistentMessageReceiver 实例的 start() 时,会发生 InvalidOperationException

以下示例展示了如何结算消息,其中包含占位符用户定义的函数,以确定消息结果应该是 ACCEPTED、REJECTED 还是 FAILED。您收到消息时使用哪种 NACK 结果的决定应基于您的应用程序需求。

  • 同步接收消息:
message, err := persistentReceiver.ReceiveMessage()
if err != nil {
return // 在错误时退出
}

var messageBody string

// 将有效载荷解码为字符串或字节
if payload, ok := message.GetPayloadAsString(); ok {
messageBody = payload
} else if payload, ok := message.GetPayloadAsBytes(); ok {
messageBody = string(payload)
}
fmt.Printf("Received Message Body: %s\n", messageBody)

// 确定适当的结算结果
if isMsgOK(message) {
// 消息良好,在此处处理,然后确认
persistentReceiver.Settle(message, config.PersistentReceiverAcceptedOutcome)
persistentReceiver.Ack(message)
} else if isMsgPossiblySalvageableLater(message) {
// 应用程序目前无法处理该消息,稍后重新投递
persistentReceiver.Settle(message, config.PersistentReceiverFailedOutcome)
} else {
// 消息内容存在问题,从端点移除
persistentReceiver.Settle(message, config.PersistentReceiverRejectedOutcome)
}
  • 异步接收消息:
messageHandler := func(message message.InboundMessage) {
var messageBody string

if payload, ok := message.GetPayloadAsString(); ok {
messageBody = payload
} else if payload, ok := message.GetPayloadAsBytes(); ok {
messageBody = string(payload)
}
fmt.Printf("Received Message Body: %s\n", messageBody)

// 确定适当的结算结果
var messageSettlementError error
if isMsgOK(message) {
// 消息良好,在此处处理,然后确认
messageSettlementError = persistentReceiver.Settle(message, config.PersistentReceiverAcceptedOutcome)
persistentReceiver.Ack(message)
} else if isMsgPossiblySalvageableLater(message) {
// 应用程序目前无法处理该消息,稍后重新投递
messageSettlementError = persistentReceiver.Settle(message, config.PersistentReceiverFailedOutcome)
} else {
// 消息内容存在问题,从端点移除
messageSettlementError = persistentReceiver.Settle(message, config.PersistentReceiverRejectedOutcome)
}
}

if regErr := persistentReceiver.ReceiveAsync(messageHandler); regErr != nil {
panic(regErr)
}