跳到主要内容

使用Go API发布直接消息

直接消息在需要高吞吐量和低延迟时非常有用。如果可以容忍消息丢失而不会对客户端应用程序产生负面影响,我们建议您使用直接消息发布事件。消息丢失可能由外部因素导致,例如网络拥塞或客户端断开连接。如果您的应用程序需要保证消息投递和消息确认,那么我们建议使用持久消息。

要使用 PubSub+ Go API 处理直接消息,请按照以下步骤操作:

  1. 创建一个 DirectMessagePublisher
  2. 配置并创建一个 OutboundMessage
  3. 处理发布直接消息时的反向压力。
  4. 发布一个直接消息。
  5. 处理错误。

在某些用例中,您的应用程序发送消息的速度可能比消息传输的速度更快。消息可能会填满 API 的内部缓冲区,导致 反向压力。如果这种情况可能发生,我们建议您考虑更改反向压力设置以满足应用程序的要求。有关更多信息,请参阅处理发布直接消息时的反向压力。

有关发布直接消息的应用程序示例,请参阅 Solace GitHub 页面上的 direct_publisher.go

创建 DirectMessagePublisher

MessagingService 实例与事件代理建立连接后,使用 DirectMessagePublisher 发布直接消息。您可以使用 DirectMessagePublisherBuilder 来配置您的 DirectMessagePublisher 使用 API 的某些功能,例如反向压力策略。在您的 DirectMessagePublisherBuilder 上调用 Build() 函数,以返回一个 DirectMessagePublisher 实例。要使您的 DirectMessagePublisher 开始发布消息,请在其上调用 Start()

有关更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

以下示例展示了如何使用直接消息发布者使您的应用程序能够向事件代理发布消息:

/* 创建一个 DirectMessagePublisherBuilder 实例,用于创建 DirectMessagePublishers。 */
directPublisher, builderError := messagingService.
CreateDirectMessagePublisherBuilder(). // 创建一个 DirectMessagePublisherBuilder,可用于配置直接消息发布者实例。
OnBackPressureReject().
Build() // 根据配置的属性创建一个新的 DirectMessagePublisher 实例。
if builderErr != nil {
panic(builderErr)
}

startErr := directPublisher.Start() // Start 同步启动发布者。在调用此函数之前,发布者被视为非工作状态。
if startErr != nil {
panic(startErr)
}

异步发布者

也可以使用回调监听器启动直接消息发布者,以便在启动操作完成后进行异步通知。

以下示例展示了如何异步启动一个 DirectMessagePublisher

func PublisherCallback(publisher solace.DirectMessagePublisher, err error) {
if err != nil {
panic(err) // 发布者启动时出现错误。
} else {
// 发布者启动成功。
}
}
// ...
// ...
directPublisher.StartAsyncCallback(PublisherCallback) // 异步启动发布者。启动时如果出现错误则调用回调,否则返回 nil。

在调用 Start()StartAsyncCallback() 之前,您的发布者应用程序是无法运行的。

配置和创建出站消息

您的客户端应用程序需要显式创建要发布的出站消息。在 PubSub+ Go API 中,发布消息时您需要使用 OutboundMessage 实例。要配置和创建 OutboundMessage 实例,请按照以下步骤操作:

  1. 调用 messagingService.MessageBuilder() 以返回一个 OutboundMessageBuilder 实例。为了获得更好的性能,我们建议您使用单个 OutboundMessageBuilder 来创建多个 OutboundMessage 实例。
messageBuilder := messagingService.MessageBuilder()
  1. 使用 OutboundMessageBuilder 配置您的消息,然后调用 Build() 函数以返回一个消息实例。您可以使用以下方法配置消息属性。

    • 使用 config.MessagePropertyMapconfig.MessageProperty* 函数。以下示例展示了如何为消息 ID、发送者 ID 和消息类型常量设置 MessagePropertyMap 值,然后使用 FromConfigurationProvider() 函数配置消息:

      messageProperties := config.MessagePropertyMap{
      config.MessagePropertyApplicationMessageID: "消息 ID",
      config.MessagePropertySenderID: "发送者 ID",
      config.MessagePropertyApplicationMessageType: "消息类型",
      // 有关 MessageProperty 常量的完整列表,请参阅 Go 的 PubSub+ 消息 API 参考。
      }
      // ...
      // ...
      message, err := messageBuilder.FromConfigurationProvider(messageProperties).
      BuildWithStringPayload("我的消息正文")
      if err != nil {
      panic(err)
      }
    • 使用 OutboundMessageBuilder 接口和 WithProperty(propertyName,propertyValue) 函数。Solace 定义的 config.MessageProperty 键以及任意用户定义的属性键均被接受。以下示例展示了如何为消息设置自定义键值属性:

      messageBuilder := messagingService.MessageBuilder().
      WithProperty("propertyName", "propertyValue")
      // ...
      // ...
      message, err := messageBuilder.BuildWithStringPayload("我的消息正文")
      if err != nil {
      panic(err)
      }

以下代码示例展示了如何创建消息构建器、设置消息属性并创建消息:

/* 用于创建类似配置消息的构建器 */
messageBuilder := messagingService.MessageBuilder()
message, err := messageBuilder.
FromConfigurationProvider(messageProperties). // 用于发送者 ID、序列号等。
WithExpiration(time.Now().Add(10 * time.Second)). // 10 秒后过期该消息。
BuildWithStringPayload("我的消息正文") // 构建消息。

有关这些函数的更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

处理发布直接消息时的反向压力

使用直接消息时,消息会通过主题发送到 PubSub+ 事件代理。事件代理会将消息转发给订阅该主题的任何消费者。

在客户端应用程序发布直接消息时,API 会在将消息发送到事件代理之前,将消息排队到内部缓冲区。在理想情况下,应用程序发布消息后,API 会立即将该消息发送到网络上,最终由事件代理接收该消息。由于网络拥塞或连接问题,客户端应用程序可能会比 API 更快地发布消息。这种延迟可能会导致内部缓冲区积累消息,直到达到其容量,阻止 API 存储更多消息。这种情况称为 反向压力。配置应用程序以处理反向压力发生的情况非常重要。

在 PubSub+ Go API 中,DirectMessagePublisher 具有以下机制来处理反向压力:

  • 当达到指定限制时拒绝消息
  • 当达到指定限制时限制应用程序

当达到指定限制时拒绝消息

当发生反向压力时,您可以选择在内部缓冲区达到指定限制时拒绝来自客户端应用程序的消息。您可以使用 OnBackPressureReject(bufferSize uint) 函数为要积累的消息数量指定一个定义的缓冲区容量。达到指定容量后,将不再可能发布新消息,API 会在缓冲区再次有容量之前返回错误。在发布调用中,bufferSize 的值必须大于或等于零。

/* 创建一个 DirectMessagePublisherBuilder 实例,用于创建 DirectMessagePublishers。 */
directPublisher, builderError := messagingService.
CreateDirectMessagePublisherBuilder(). // 创建一个 DirectMessagePublisherBuilder,可用于配置直接消息发布者实例。
OnBackPressureReject(500). // 设置压力策略,当缓冲区大小达到 bufferSize 时拒绝消息。
Build() // 根据配置的属性创建一个新的 DirectMessagePublisher 实例。
if builderErr != nil {
panic(builderErr)
}

startErr := directPublisher.Start() // Start 同步启动发布者。在调用此函数之前,发布者被视为非工作状态。
if startErr != nil {
panic(startErr)
}

使用发布者就绪监听器

当您使用 OnBackPressureReject() 函数时,我们建议您使用 PublisherReadinessListener,因为它可以通知您的应用程序缓冲区何时有容量可用,可以恢复发布消息。反向压力策略 OnBackPressureWait() 不会 调用 PublisherReadinessListener

以下是一个注册事件处理器 PublisherReadinessListener 实例的示例:

func PublisherReadinessListener(){
// 当可以再次发布消息时,您的应用程序要做的事情。
}
// ...
// ...
directPublisher, builderError := messagingService.
CreateDirectMessagePublisherBuilder(). // 创建一个 DirectMessagePublisherBuilder,可用于配置直接消息发布者实例。
OnBackPressureReject(500). // 设置压力策略,当缓冲区大小达到 bufferSize 时拒绝消息。
Build() // 根据配置的属性创建一个新的 DirectMessagePublisher 实例。
if builderErr != nil {
panic(builderErr)
}

startErr := directPublisher.Start() // Start 同步启动发布者。在调用此函数之前,发布者被视为非工作状态。
if startErr != nil {
panic(startErr)
}
directPublisher.SetPublisherReadinessListener(PublisherReadinessListener); // 注册一个监听器,当发布者可以发送消息时调用。

为了获得最佳性能,我们建议您使用 OnBackPressureReject(0)PublisherReadinessListenerbufferSize 为零意味着发布者应用程序在将消息发送到代理之前不会缓冲消息,这可以减少延迟并提高性能。这可以防止大量消息在发布者的内部缓冲区中积累,是处理反向压力的最具性能的方法。

当达到指定限制时限制发布者

如果内部缓冲区达到指定限制,您可以选择限制发布应用程序。使用限制可以释放其内部缓冲区的容量。您可以使用 OnBackPressureWait(bufferSize uint) 函数设置缓冲区中可以积累的最大消息数量。Go API 的默认设置是发布者应用程序在 bufferSize 为 50 时等待。当达到此最大容量 (bufferSize) 时,发布者例程会暂停并等待内部缓冲区有可用容量,然后才允许应用程序发布更多消息。

当您希望在缓冲区容量达到后应用程序请求花费更长时间时,应使用此函数。使用此机制可以有效地为 API 清空内部缓冲区腾出时间。使用持久化发布时,API 不会丢弃任何消息,这是另一个好处。

以下示例展示了如何配置内部缓冲区以容纳多达一千条消息:

/* 创建一个 DirectMessagePublisherBuilder 实例,用于创建 DirectMessagePublishers。 */
directPublisher, builderError := messagingService.
CreateDirectMessagePublisherBuilder(). // 创建一个 DirectMessagePublisherBuilder,可用于配置直接消息发布者实例。
OnBackPressureWait(1000). // 设置反向压力策略,等待直到缓冲区有空间,缓冲区大小为 bufferSize。
Build() // 根据配置的属性创建一个新的 DirectMessagePublisher 实例。
if builderErr != nil {
panic(builderErr)
}

startErr := directPublisher.Start() // Start 同步启动发布者。在调用此函数之前,发布者被视为非工作状态。
if startErr != nil {
panic(startErr)
}

发布直接消息

创建 DirectMessagePublisher 后,您可以开始发送消息。发送消息时,有两个主要组成部分:

  • 要发布的主题(了解主题)
  • 消息正文(可选)

主题必须是遵循 Solace 层次结构格式的 resource.Topic 类的实例,例如:solace/messaging/direct/pub。发布函数目前支持简单字符串消息、字节数组以及可以通过 MessagingService.MessageBuilder() 获得的 OutboundMessage 实例。以下是发布直接消息的函数:

  • Publish(message message.OutboundMessage, destination *resource.Topic)
  • PublishWithProperties(message message.OutboundMessage, destination *resource.Topic, properties config.MessagePropertiesConfigurationProvider)
  • PublishBytes(message []byte, destination *resource.Topic)
  • PublishString(message string, destination *resource.Topic)

有关使用 DirectMessagePublisher 时可用的发布函数的信息,请参阅 Go 的 PubSub+ 消息 API 参考。

以下示例展示了如何发布直接消息:

messageBody := "来自 Go 直接发布者示例的问候"
messageBuilder := messagingService.MessageBuilder(). // 创建一个 OutboundMessageBuilder,可用于构建消息。
WithProperty("application", "samples"). // 在结果消息上设置单个消息属性。
WithProperty("language", "go") // 既接受 Solace 定义的 config.MessageProperty 键,也接受用户定义的属性键。

go func() {
for directPublisher.IsReady() { // 检查发布者是否可以发布消息。
message, err := messageBuilder.BuildWithStringPayload(messageBody)
if err != nil {
panic(err)
}

topic := resource.TopicOf("go/persistent/publisher/")

publishErr := directPublisher.Publish(message, topic) // 将类型为 OutboundMessage 的消息发布到指定的目的地
if publishErr != nil {
panic(publishErr)
}
}
}()

处理错误

Go API 提供了 SetPublishFailureListener() 函数,如果 API 无法发布消息,将通知客户端。发布失败事件可能是由于诸如无效主题或服务终止等问题导致的。请参阅以下示例:

func PublishFailureListener(){
// 处理发布失败事件
}
// ...
// ...
directPublisher, builderError := messagingService.
CreateDirectMessagePublisherBuilder(). // 创建一个 DirectMessagePublisherBuilder,可用于配置直接消息发布者实例。
Build() // 根据配置的属性创建一个新的 DirectMessagePublisher 实例。
if builderErr != nil {
panic(builderErr)
}
directPublisher.SetPublishFailureListener(PublishFailureListener) // 设置一个监听器,如果直接消息发布失败则调用。