跳到主要内容

使用Java API发布直接消息

当需要高吞吐量和低延迟时,直接消息非常有用。如果可以容忍消息丢失而不会对客户端应用程序产生负面影响,我们建议您使用直接消息发布事件。消息丢失可能由外部因素导致,例如网络拥塞或客户端断开连接。如果您的应用程序需要保证消息投递和消息确认,那么我们建议使用持久消息。

要使用 Java API 处理直接消息,请按照以下步骤操作:

  1. 创建一个 DirectMessagePublisher 对象。
  2. 配置并创建一个 OutboundMessage
  3. 发布直接消息。
  4. 处理错误。

在某些用例中,您的应用程序发送消息的速度可能比消息传输的速度更快。消息可能会填满 API 的内部缓冲区,导致 反向压力。如果这种情况可能发生,我们建议您考虑更改反向压力设置以满足应用程序的要求。有关更多信息,请参阅处理发布直接消息时的反向压力。

有关发布直接消息的应用程序示例,请参阅 Solace 开发者中心。

创建DirectMessagePublisher对象

MessagingService 对象与事件代理建立连接后,使用 DirectMessagePublisher 对象发布直接消息。与 MessagingService 对象一样,DirectMessagePublisher 对象允许您配置要在 API 中使用的功能。您可以使用以下方法设置 DirectMessagePublisherBuilder 对象的反向压力处理方式。这些方法包括 build() 方法,该方法返回一个 DirectMessagePublisher 对象。然后,您可以在 DirectMessagePublisher 对象上调用 DirectMessagePublisher 方法以连接到事件代理。

  • DirectMessagePublisherBuilder

    • createDirectMessagePublisherBuilder()

    • onBackPressureWait(int bufferCapacity)

    • onBackPressureReject(int bufferCapacity)

    • onBackPressureElastic()

    • build()

  • DirectMessagePublisher

    • start()

有关更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

以下示例展示了如何使用直接消息发布者使您的应用程序能够向事件代理发布消息:

/* 创建一个 DirectMessagePublisherBuilder 实例 */
/* 用于创建 DirectMessagePublisher 对象。 */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
.onBackPressureWait(1000) // 如果尝试发布超出指定缓冲区容量(此例中为 1000)的消息,则使发布者线程暂停。
.build() // 根据提供的配置构建一个 DirectMessagePublisher 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

异步接收器

也可以使用回调监听器启动直接消息发布者,以便在启动操作完成后进行异步通知。

以下示例展示了如何调用 CompletionListener,这是用于监听未来计算结果的回调方法。此处理器还会在启动操作完成后通知应用程序。

final CompletionListener<DirectMessagePublisher> receiverStartupListener = (receiver, throwable) -> {
if (throwable == null) {
// 启动成功,可以接收消息。
} else {
// 处理启动期间的异常。
}
};
receiverToBeStarted.startAsync(receiverStartupListener);

在调用 start()startAsync() 之前,您的发布者应用程序是无法运行的。

配置和创建出站消息

您的客户端应用程序需要显式创建要发布的出站消息。在 Java 的 PubSub+ API 中,发布消息时您需要使用 OutboundMessage 对象。要配置和创建 OutboundMessage 对象,请按照以下步骤操作:

  1. 调用 messagingService.messageBuilder() 以返回一个 OutboundMessageBuilder 对象。为了获得更好的性能,我们建议您使用单个 OutboundMessageBuilder 来创建多个 OutboundMessage 对象。
final OutboundMessageBuilder messageBuilder = messagingService.messageBuilder()
  1. 使用 OutboundMessageBuilder 配置您的消息,然后调用 build() 方法以返回一个消息对象。您可以使用以下任一方法配置消息属性。

    • 使用 Properties 接口和 setProperty(name,value) 方法。以下示例展示了如何为 Properties 对象设置自定义名称-值属性,然后使用 fromProperties() 方法配置消息:

      final Properties messageProperties = new Properties();
      messageProperties.setProperty("PropertyName", "PropertyValue");
      final OutboundMessage message = messageBuilder
      .fromProperties(messageProperties)
      .build(messagePayload);
    • 使用 OutboundMessageBuilder 接口和 with*() 方法。以下示例展示了如何为消息设置发送者 ID 和自定义键值属性:

      final OutboundMessage myCustomMessage = messageBuilder
      .withSenderId("mySenderId")
      // 有关 with*() 方法的完整列表,请参阅 Java 的 PubSub+ 消息 API 参考
      .withProperty("PropertyKey", "PropertyValue")
      .build(messagePayload);

以下代码示例展示了如何创建消息构建器、设置消息属性并创建消息:

/* 用于创建类似配置消息的构建器 */
final OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
final OutboundMessage message = messageBuilder
.fromProperties(messageProperties) // 例如 TTL、发送者 ID、序列号等。
.withExpiration(Instant.now() // 使用当前系统时间作为起始点设置过期时间。
.toEpochMilli() + 10000L) // 10 秒后过期该消息。
.build("My_Message"); // 构建消息。

有关这些方法的更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

处理发布直接消息时的反向压力

使用直接消息时,消息会通过主题发送到 PubSub+ 事件代理。事件代理会将消息转发给订阅该主题的任何消费者。

在客户端应用程序发布直接消息时,API 会在将消息发送到事件代理之前,将消息排队到内部缓冲区。在理想情况下,应用程序发布消息后,API 会立即将该消息发送到网络上,最终由事件代理接收该消息。由于网络拥塞或连接问题,客户端应用程序可能会比 API 更快地发布消息。这种延迟可能会导致内部缓冲区积累消息,直到达到其容量,阻止 API 存储更多消息。这种情况称为 反向压力。配置应用程序以处理反向压力发生的情况非常重要。

在我们的 Java API 中,DirectMessagePublisher 有以下机制来处理反向压力:

  • 当达到指定限制时拒绝消息
  • 当达到指定限制时限制应用程序
  • 使用无限制的内部发布缓冲区(默认)

当达到指定限制时拒绝消息

当发生反向压力时,您可以选择在内部缓冲区达到指定限制时拒绝来自客户端应用程序的消息。您可以使用 onBackPressureReject(int bufferCapacity) 方法为要积累的消息数量指定一个定义的缓冲区容量。达到指定容量后,将不再可能发布新消息,API 会抛出异常,直到缓冲区再次有容量。在发布调用中,bufferCapacity 的值必须大于零。

/* 创建一个 DirectMessagePublisherBuilder 实例,用于创建 DirectMessagePublisher 对象。 */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
.onBackPressureReject(1000) // 创建一个消息缓冲区,空间可容纳 1000 条消息,达到该容量时将拒绝消息,直到有空间为止。
.build() // 根据提供的配置构建一个 DirectMessagePublisher 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

使用发布者就绪监听器

当您使用 onBackPressureReject() 方法时,我们建议您使用 PublisherReadinessListener 接口,因为它可以通知您的应用程序缓冲区何时有容量可用,可以恢复发布消息。

以下是一个注册事件处理器 PublisherReadinessListener 接口的示例:

/* 创建一个 DirectMessagePublisherBuilder 实例,用于创建 DirectMessagePublisher 对象。 */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
.onBackPressureReject(1000) // 创建一个消息缓冲区,空间可容纳 1000 条消息,达到该容量时将拒绝消息,直到有空间为止。
.build() // 根据提供的配置构建一个 DirectMessagePublisher 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

final PublisherReadinessListener canPublishListener = () -> {
// 当发布者再次准备好发布消息时,执行此代码。
};
publisher.setPublisherReadinessListener(canPublishListener); // 注册监听器,通常只需设置一次。

当达到指定限制时限制发布者

如果内部缓冲区达到指定限制,您可以选择限制发布应用程序。使用限制可以释放其内部缓冲区的容量。您可以使用 onBackPressureWait() 方法设置缓冲区中可以积累的最大消息数量。当达到此最大容量 (bufferCapacity) 时,发布者线程会暂停,并等待内部缓冲区有可用容量,然后才允许应用程序发布更多消息。

当您希望应用程序的发布请求在缓冲区容量达到后等待空间时,应使用此方法。使用此机制可以有效地为 API 清空内部缓冲区。使用持久化发布时的一个额外好处是,API 不会丢弃任何消息。

以下示例展示了如何配置内部缓冲区以容纳多达一千条消息:

/* 创建一个 DirectMessagePublisherBuilder 实例,用于创建 DirectMessagePublisher 对象。 */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
.onBackPressureWait(1000) // 创建一个消息缓冲区,空间可容纳 1000 条消息,达到该容量时线程将暂停,直到有空间为止。
.build() // 根据提供的配置构建一个 DirectMessagePublisher 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

配置无限制的内部缓冲区

API 的默认配置是使用无限制大小的内部缓冲区来存储消息。这种配置不适合内存受限的环境,因为缓冲区被允许无限增长,可能会导致内存不足错误(或潜在的未定义错误)。当您的基础设施由多个短生命周期的微服务组成,并且可以在内部队列遇到内存不足场景的不太可能的情况下提供发布冗余时,使用这种配置很有用。使用这种配置也很有用,因为您无需编写代码来处理反向压力场景。

当您使用无限制缓冲区时,Java API 会持续将客户端应用程序发布的消息放入内部缓冲区。以下示例展示了对 onBackPressureElastic() 方法的显式调用,这不是必需的,因为它是默认行为:

/* 创建一个 DirectMessagePublisherBuilder 实例,用于创建 DirectMessagePublisher 对象。 */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
.onBackPressureElastic() // 创建一个无限制容量的消息缓冲区。
.build() // 根据提供的配置构建 DirectMessagePublisher 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

发布直接消息

创建 DirectPublisher 对象后,您可以开始发送消息。发送消息时,有两个主要组成部分:

  • 要发布的主题(了解主题)
  • 消息正文(可选)

主题必须是遵循 Solace 层次结构格式的 Topic 类的实例,例如:solace/messaging/direct/pub。发布函数目前支持简单字符串消息、字节数组以及可以通过 MessagingService.messageBuilder() 获得的 OutboundMessage 类的实例。以下是发布直接消息的方法:

  • publish(byte[] message, Topic destination)
  • publish(OutboundMessage message, Topic destination)
  • publish(OutboundMessage message, Topic destination, Properties additionalMessageProperties)
  • publish(String message, Topic destination)

有关 DirectMessagePublisher 接口可用的发布方法的信息,请参阅 Java 的 PubSub+ 消息 API 参考。

以下示例展示了如何发布直接消息:

while (System.in.available() == 0 && !isShutdown) {
try {
String message = "This is a message from Solace"; // 消息的内容或正文。
String topicString = "solace/samples/direct/pub"; // 消息在事件代理上发布的主题。
publisher.publish(message, Topic.of(topicString)); // 此方法将消息发布到主题,直到 while 循环中的条件停止。
} catch (RuntimeException e) {
System.out.printf("### Caught while trying to publisher.publish(): %s%n", e);
isShutdown = true;
} finally {
try {
Thread.sleep(1000 / APPROX_MSG_RATE_PER_SEC);
} catch (InterruptedException e) {
isShutdown = true;
}
}
}

处理错误

PubSub+ Java API 提供了 setPublishFailureListener() 方法,如果 API 无法发布消息,将通知客户端。发布失败事件可能是由于无效主题或服务终止等问题导致的。请参阅以下示例:

/* 创建一个 DirectMessagePublisherBuilder 实例,用于创建 DirectMessagePublisher 对象。 */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
.onBackPressureWait(1000) // 创建一个无限制容量的消息缓冲区。
.build() // 根据提供的配置构建一个 DirectMessagePublisher 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

publisher.setPublishFailureListener(e -> { // 可用于访问控制列表违规。
System.out.println("### FAILED PUBLISH " + e); // 在此示例中,打印错误消息。
});