跳到主要内容

使用Go API发布持久消息

当您的应用程序需要确认处理和“恰好一次”投递时,我们建议您使用持久消息而不是直接消息。要在 PubSub+ Go API 中发布持久消息,您需要先在 PubSub+ 事件代理上设置一个消息队列。

有关在事件代理上创建和配置队列的信息,请参阅配置队列。

要使用 PubSub+ Go API 处理持久消息,请按照以下步骤操作:

  1. 创建一个 PersistentMessagePublisher
  2. 配置并创建一个 OutboundMessage
  3. 处理发布持久消息时的反向压力。
  4. 发布一个持久消息。
  5. 确认消息和处理错误。
  6. 用户上下文。

在某些用例中,您的应用程序发送消息的速度可能比消息传输的速度更快。这可能会导致消息在 API 内部缓冲区中积累,造成反向压力。如果这种情况可能发生,请考虑更改反向压力设置以满足应用程序的要求。有关更多信息,请参阅处理发布持久消息时的反向压力。

有关发布持久消息的应用程序示例,请参阅 Solace GitHub 页面上的 guaranteed_publisher.go

创建持久消息发布者

MessagingService 实例与事件代理建立连接后,您使用 PersistentMessagePublisher 发布持久消息。您可以使用 PersistentMessagePublisherBuilder 来配置您的 PersistentMessagePublisher 使用 API 的某些功能,例如反向压力策略。在您的 PersistentMessagePublisherBuilder 上调用 Build() 函数,以返回一个 PersistentMessagePublisher 实例。要使您的 PersistentMessagePublisher 开始发布消息,请在其上调用 Start()

以下示例展示了如何使用 PersistentMessagePublisher 通过 MessagingService 实例连接到事件代理:

/* 创建一个 PersistentMessagePublisherBuilder 实例,用于创建 PersistentMessagePublishers。 */
persistentPublisher, builderError := messagingService.
CreatePersistentMessagePublisherBuilder(). // 创建一个 PersistentMessagePublisherBuilder,可用于配置持久消息发布者实例。
Build() // 根据配置的属性创建一个新的 PersistentMessagePublisher 实例。
if builderErr != nil {
panic(builderErr)
}

startErr := persistentPublisher.Start() // Start 同步启动发布者。在调用此函数之前,发布者被视为非工作状态。
if startErr != nil {
panic(startErr)
}

异步发布者

也可以使用回调监听器启动持久消息发布者,以便在启动操作完成后进行异步通知。

以下示例展示了如何异步启动一个 PersistentMessagePublisher

func PublisherCallback(publisher solace.PersistentMessagePublisher, err error) {
if err != nil {
panic(err) // 发布者启动时出现错误。
} else {
// 发布者启动成功。
}
}
// ...
// ...
persistentPublisher.StartAsyncCallback(PublisherCallback) // 异步启动发布者。启动时如果出现错误则调用回调,否则返回 nil。

在调用 Start()StartAsyncCallback() 之前,您的发布者应用程序是无法运行的。

配置和创建出站消息

您的客户端应用程序需要显式创建要发布的出站消息。在 PubSub+ Go API 中,发布消息时您需要使用 OutboundMessage 实例。要配置和创建 OutboundMessage 实例,请按照以下步骤操作:

  1. 调用 messagingService.MessageBuilder() 以返回一个 OutboundMessageBuilder 实例。为了获得更好的性能,我们建议您使用单个 OutboundMessageBuilder 来创建多个 OutboundMessage 实例。
messageBuilder := messagingService.MessageBuilder()
  1. 使用 OutboundMessageBuilder 配置您的消息,然后调用 Build() 函数以返回一个消息实例。您可以使用以下方法配置消息属性。

    • 使用 config.MessagePropertyMapconfig.MessageProperty* 函数。以下示例展示了如何为消息 ID、发送者 ID 和消息类型常量设置 MessagePropertyMap 值,然后使用 FromConfigurationProvider() 函数配置消息:

      messageProperties := config.MessagePropertyMap{
      config.MessagePropertyApplicationMessageID: "消息 ID",
      config.MessagePropertySenderID: "发送者 ID",
      config.MessagePropertyApplicationMessageType: "消息类型",
      // 有关 MessageProperty 常量的完整列表,请参阅 Go 的 PubSub+ 消息 API 参考。
      }
      // ...
      // ...
      message, err := messageBuilder.FromConfigurationProvider(messageProperties).
      BuildWithStringPayload("我的消息正文")
      if err != nil {
      panic(err)
      }

    • 使用 OutboundMessageBuilder 接口和 WithProperty(propertyName,propertyValue) 函数。Solace 定义的 config.MessageProperty 键以及任意用户定义的属性键均被接受。以下示例展示了如何为消息设置自定义键值属性:

      messageBuilder := messagingService.MessageBuilder().
      WithProperty("propertyName", "propertyValue")
      // ...
      // ...
      message, err := messageBuilder.BuildWithStringPayload("我的消息正文")
      if err != nil {
      panic(err)
      }

以下代码示例展示了如何创建消息构建器、设置消息属性并创建消息:

/* 用于创建类似配置消息的构建器 */
messageBuilder := messagingService.MessageBuilder()
message, err := messageBuilder.
FromConfigurationProvider(messageProperties). // 用于发送者 ID、序列号等。
WithExpiration(time.Now().Add(10 * time.Second)). // 10 秒后过期该消息。
BuildWithStringPayload("我的消息正文") // 构建消息。

有关这些函数的更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

设置分区键

您可以设置分区键以使用分区队列。分区队列是配置在 PubSub+ 事件代理上的一个功能,允许您轻松扩展绑定到队列的消费者应用程序的数量。可以在发布应用程序中的每条消息上设置分区键,以确保所有具有相同分区键的消息都无需在消费者应用程序中进行额外逻辑即可投递到同一个消费者。有关更多信息,请参阅分区队列。

使用 WithProperty(property,value) 函数为 Go API 消息设置属性值对。

  • property—常量 config.QueuePartitionKey 或字符串值 JMSXGroupID

  • value—一个字符串,表示您的分区键的值。客户端应用程序在发布时设置该值。

以下示例展示了如何设置常量 config.QueuePartitionKey 的值:

// 使用 `with_property()` 构造器方法在出站消息上设置队列分区键。
func SetQueuePartitionKeyUsingWithProperty(queuePartitionKeyValue string) {
payload := "my_payload"

outboundMessage := messaging.
MessagingService.
MessageBuilder().
WithProperty(config.QueuePartitionKey, queuePartitionKeyValue).
BuildWithStringPayload(payload)
}

// 您也可以使用 `from_properties()` 构造器方法在出站消息上设置队列分区键。
func SetQueuePartitionKeyUsingFromConfigurationProvider(queuePartitionKeyValue string) {
payload := "my_payload"
messageConfig := config.MessagePropertyMap{
config.QueuePartitionKey: queuePartitionKeyValue,
}

outboundMessage := messaging.
MessagingService.
MessageBuilder().
FromConfigurationProvider(messageConfig).
Build(payload)
}

处理发布持久消息时的反向压力

使用持久消息时,消息会通过主题发送到 PubSub+ 事件代理,并可能会被排队到任何具有匹配主题订阅的队列上。事件代理随后会异步地将消息投递给订阅该队列的任何消费者。在客户端应用程序发布持久消息时,API 会在将消息发送到事件代理之前,将消息排队到内部缓冲区。在理想情况下,应用程序发布消息后,API 会立即将该消息发送到网络上,最终由事件代理接收该消息。由于网络拥塞或连接问题,客户端应用程序可能会比 API 更快地发布消息。这种延迟可能会导致内部缓冲区积累消息,直到达到其容量,阻止 API 存储更多消息。这种情况称为 反向压力。配置应用程序以处理反向压力发生的情况非常重要。

在 PubSub+ Go API 中,PersistentMessagePublisherBuilder 有两个主要机制来处理反向压力,您可以使用:

  • 当达到指定限制时拒绝消息
  • 当达到指定限制时限制发布者

当达到指定限制时拒绝消息

当发生反向压力时,您可以选择在内部缓冲区达到指定限制时拒绝来自客户端应用程序的消息。您可以使用 OnBackPressureReject(bufferSize uint) 函数为要积累的消息数量指定一个定义的缓冲区容量。达到指定容量后,将不再可能发布新消息,直到缓冲区再次有容量之前会返回错误。在发布调用中,bufferSize 必须大于或等于零。

/* 创建一个 PersistentMessagePublisherBuilder 实例,用于创建 PersistentMessagePublishers。 */
persistentPublisher, builderError := messagingService.
CreatePersistentMessagePublisherBuilder(). // 创建一个 PersistentMessagePublisherBuilder,可用于配置持久消息发布者实例。
OnBackPressureReject(500). // 设置压力策略,当缓冲区大小达到 bufferSize 时拒绝消息。
Build() // 根据配置的属性创建一个新的 PersistentMessagePublisher 实例。
if builderErr != nil {
panic(builderErr)
}

startErr := persistentPublisher.Start() // Start 同步启动发布者。在调用此函数之前,发布者被视为非工作状态。
if startErr != nil {
panic(startErr)
}

使用发布者就绪监听器

当您使用 OnBackPressureReject() 函数时,我们建议您使用 PublisherReadinessListener,因为它可以通知您的应用程序缓冲区何时有容量可用,可以恢复发布消息。

以下是一个注册事件处理器 PublisherReadinessListener 的示例:

func PublisherReadinessListener(){
// 当可以再次发布消息时,您的应用程序要做的事情。
}
// ...
// ...
persistentPublisher, builderError := messagingService.
CreatePersistentMessagePublisherBuilder(). // 创建一个 PersistentMessagePublisherBuilder,可用于配置持久消息发布者实例。
OnBackPressureReject(500). // 设置压力策略,当缓冲区大小达到 bufferSize 时拒绝消息。
Build() // 根据配置的属性创建一个新的 PersistentMessagePublisher 实例。
if builderErr != nil {
panic(builderErr)
}

startErr := persistentPublisher.Start() // Start 同步启动发布者。在调用此函数之前,发布者被视为非工作状态。
if startErr != nil {
panic(startErr)
}
persistentPublisher.SetPublisherReadinessListener(canPublishListener); // 注册一个监听器,当发布者可以发送消息时调用。

为了获得最佳性能,我们建议您使用 OnBackPressureReject(0)PublisherReadinessListenerbufferSize 为零意味着发布者应用程序在将消息发送到代理之前不会缓冲消息,这可以减少延迟并提高性能。这可以防止大量消息在发布者的内部缓冲区中积累,是处理反向压力的最具性能的方法。

当达到指定限制时限制发布者

如果内部缓冲区达到指定限制,您可以选择限制发布应用程序。使用限制可以释放其内部缓冲区的容量。您可以使用 OnBackPressureWait(bufferSize uint) 函数设置缓冲区中可以积累的最大消息数量。PubSub+ Go API 的默认设置是发布者应用程序在 bufferSize 为 50 时等待。当达到此最大容量 (bufferSize) 时,发布者例程会暂停并等待内部缓冲区有可用容量,然后才允许应用程序发布更多消息。

当您希望发布应用程序在缓冲区容量达到后等待空间时,应使用此函数。使用此机制可以为 API 清空内部缓冲区腾出时间。使用此配置进行持久化消息传递的另一个好处是,API 不会丢弃任何消息。

以下示例展示了如何配置内部缓冲区以容纳多达一千条消息:

persistentPublisher, builderError := messagingService.
CreatePersistentMessagePublisherBuilder(). // 创建一个 PersistentMessagePublisherBuilder,可用于配置持久消息发布者实例。
OnBackPressureWait(1000). // 设置反向压力策略,等待直到缓冲区有空间,缓冲区大小为 bufferSize。
Build() // 根据配置的属性创建一个新的 PersistentMessagePublisher 实例。
if builderErr != nil {
panic(builderErr)
}

startErr := persistentPublisher.Start() // Start 同步启动发布者。在调用此函数之前,发布者被视为非工作状态。
if startErr != nil {
panic(startErr)
}

发布持久消息

在使用 MessagingService 实例与事件代理建立连接后,您可以使用 PersistentMessagePublisher 发布持久消息。

持久消息具有以下组成部分:

  • 要发布的主题(必需)
  • 消息正文(可选)

持久消息发布涉及接收确认或 发布收据。根据您的需求,您的客户端应用程序可以作为以下方式发布:

  • 非阻塞,允许您的应用程序在 PublishReceiptListener 等待确认的同时执行其他功能
  • 阻塞,等待直到收到确认;确认表示消息已被代理接收并持久化

您不能发布大于事件代理的最大持久消息大小(在大多数情况下为 30MB)的持久消息。如果您尝试发布超过最大大小的消息,PubSub+ Go API 将返回一个带有子代码 MESSAGE_TOO_LARGE 的错误。为了避免中断,请设计您的应用程序以控制消息大小并确保它们保持在支持的限制内。

非阻塞函数

以下是非阻塞函数:

  • PublishBytes(message []byte, destination *resource.Topic)
  • PublishString(message string, destination *resource.Topic)
  • Publish(message message.OutboundMessage, destination *resource.Topic, properties config.MessagePropertiesConfigurationProvider, context interface{})

有关这些函数的更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

阻塞函数

如果您希望发布者在从 Publish() 调用返回之前等待代理的确认,则可以使用以下阻塞函数:

  • PublishAwaitAcknowledgement(message message.OutboundMessage, destination *resource.Topic, timeout time.Duration, properties config.MessagePropertiesConfigurationProvider)

前面的函数可以与 PersistentMessagePublisher 一起使用,以使用主题将 OutboundMessage 发布到代理。此函数会阻塞主线程,直到:

  • 发布者 API 收到代理的确认
  • 超时时间段过去
messageBody := "来自 Go 持久发布者示例的问候"
messageBuilder := messagingService.MessageBuilder(). // 创建一个 OutboundMessageBuilder,可用于构建消息。
WithProperty("application", "samples"). // 在结果消息上设置单个消息属性。
WithProperty("language", "go") // 既接受 Solace 定义的 config.MessageProperty 键,也接受用户定义的属性键。

go func() {
for persistentPublisher.IsReady() { // 检查发布者是否可以发布消息。
message, err := messageBuilder.BuildWithStringPayload(messageBody)
if err != nil {
panic(err)
}

topic := resource.TopicOf("go/persistent/publisher/")

publishErr := persistentPublisher.PublishAwaitAcknowledgement(message, topic, 2*time.Second, nil) // 发送消息并等待确认。在 OutboundMessage 上
if publishErr != nil { // 可选地,您可以在 OutboundMessageProperties 的形式中提供属性,以覆盖
panic(publishErr) // 在 OutboundMessage 上设置的任何属性
}
}
}()

确认消息和处理错误

发布收据是投递确认,指示事件代理是否成功处理了消息。这些发布收据可以指示成功或失败,并由 MessagePublisReceiptListener 实例处理。您可以使用 SetMessagePublishReceiptListener() 函数设置您的 MessagePublishReceiptListener

以下示例展示了如何使用 MessagePublishReceiptListener 监听发布收据:

/* 处理所有消息的投递确认/超时的监听器 */
func PublishReceiptListener(receipt solace.PublishReceipt) {
fmt.Println("从代理收到发布收据\n")
fmt.Println("是否已持久化:", receipt.IsPersisted())
fmt.Println("消息:", receipt.GetMessage())
if receipt.GetError() != nil {
fmt.Println("消息未在代理上持久化!收到 NACK")
fmt.Println("错误是:", receipt.GetError())
// 可能需要在这里做一些事情。一些错误处理的可能性:
// - 再次发送消息
// - 将其发送到其他地方(错误处理队列?)
// - 记录并继续
// - 暂停并重试(退避) - 可能设置一个标志以减慢发布者的速度
}
}

persistentPublisher.SetMessagePublishReceiptListener(PublishReceiptListener) // 监听所有发送消息的投递确认。
publishErr := persistentPublisher.PublishString("Hello world!", topicDestination) // 发布一个带有字符串正文的消息。
if publishErr != nil {
panic(publishErr)
}

处理发布收据错误的策略

以下是可以用于处理发布时收据错误的应用程序特定策略。

等待并重试等待几秒钟后再尝试重新发送消息。例如,使用 time.sleep(1000) 在尝试重新发布之前等待 1 秒。重试预定义次数在放弃之前尝试重新发布消息的预定义次数。丢弃消息简单地丢弃具有失败发布收据的消息。如果您的应用程序不能容忍消息丢失,我们不推荐此策略。

要接收没有匹配订阅时的失败发布收据,必须为事件代理或事件代理服务设置此选项。有关更多信息,请参阅处理没有匹配项的持久消息(针对设备和软件事件代理)或 在没有订阅匹配时将消息拒绝给发送方以丢弃(针对 PubSub+ Cloud)。

用户上下文

可选地,您可以使用 用户上下文 将持久消息与发布收据相关联的信息相关联到您的应用程序中。此信息是用户特定的,仅对您的发布应用程序有意义,并且不会发送到代理。用户上下文