跳到主要内容

使用Java API消费直接消息

在需要高吞吐量和低延迟的场景中,直接消息非常有用。由于外部因素(如网络拥塞或偶尔的客户端断开连接)的影响,直接消息可能会导致一些消息丢失。直接消息适用于需要最新信息但不一定需要每条消息的应用程序。例如天气应用程序、价格检查器、GPS 跟踪等。

直接消息不需要额外的事件代理配置。如果您的应用程序不能容忍消息丢失,我们建议您使用持久消息。

要使用 Java API 消费直接消息,请按照以下步骤操作:

  1. 创建一个 DirectMessageReceiver 对象。
  2. 同步接收直接消息。
  3. 异步接收直接消息。
  4. 从入站消息中提取属性。

在某些用例中,API 从事件代理接收消息的速度可能比您的应用程序处理它们的速度更快。消息可能会填满 API 的内部缓冲区,导致 反向压力。如果这种情况可能发生,您可能需要考虑更改默认的反向压力设置以满足您的要求。有关更多信息,请参阅处理订阅直接消息时的反向压力。

有关消费直接消息的应用程序示例,请参阅 Solace 开发者中心。

创建DirectMessageReceiver对象

MessagingService 对象与事件代理建立连接后,使用 DirectMessageReceiver 对象从事件代理消费直接消息。

MessagingService 对象一样,DirectMessageReceiver 对象可以配置为使用 API 的某些功能。以下是可以用来配置 DirectMessageReceiverBuilder 对象的方法列表。build() 方法返回一个 DirectMessageReceiver 对象。要使您的 DirectMessageReceiver 开始接收消息,请在其上调用 start()

  • DirectMessageReceiverBuilder

    • createDirectMessageReceiverBuilder()
    • withSubscriptions(TopicSubscriptions subscriptions)
    • build()
  • DirectMessageReceiver

    • start()

有关前面方法的更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

以下示例展示了如何向 DirectMessageReceiver 对象添加主题订阅并连接到事件代理:

/* 创建一个 DirectMessageReceiverBuilder 实例,用于创建 DirectMessageReceiver 对象。 */
final DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder()
.withSubscriptions(TopicSubscription.of("Solace/Example/direct/>")) // 允许将 SMF 主题添加到消息接收器
// <--如果您需要,可以在此处添加更多订阅。
.build(); // 根据提供的配置创建一个 DirectMessageReceiver 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

异步接收器

也可以使用回调监听器启动直接消息接收器,以便在启动操作完成后进行异步通知。

以下示例展示了如何调用 CompletionListener,这是用于监听未来计算结果的回调方法。此处理器还会在启动操作完成后通知应用程序。

final CompletionListener<DirectMessageReceiver> receiverStartupListener = (receiver, throwable) -> {
if (throwable == null) {
// 启动成功,可以接收消息。
} else {
// 处理启动期间的异常。
}
};
receiverToBeStarted.startAsync(receiverStartupListener);

在调用 start()startAsync() 之前,您的接收器应用程序是无法运行的。

处理订阅直接消息时的反向压力

订阅直接消息时,API 使用内部缓冲区来存储从事件代理接收的消息。在理想情况下,应用程序会立即处理收到的消息。根据处理速度和其他因素,可能会收到比订阅应用程序处理速度更快的消息(例如高容量的消息突发)。如果消息未被处理并被允许积累,内部缓冲区可能会达到其容量,这被称为 反向压力

在我们的 Java API 中,DirectMessageReceiver 有以下机制来处理反向压力:

  • 丢弃最新消息
  • 丢弃最旧消息
  • 忽略限制(默认)

丢弃最新消息

您可以配置 API,在 API 的内部缓冲区达到指定容量时丢弃最新到达的消息。达到此容量后,由于缓冲区已满,最新消息不会被放入内部缓冲区,而是被丢弃(丢失)。

要配置此机制,请在 DirectMessageReceiver 对象上调用 onBackPressureDropLatest(int bufferCapacity) 方法,然后设置可以在缓冲区中积累的最大消息数量 (bufferCapacity),在达到此数量之前不会丢弃消息。

以下示例展示了如何配置应用程序在 API 的内部缓冲区中有 1000 条消息排队时丢弃消息:

/* 创建一个 DirectMessageReceiverBuilder 实例,用于创建 DirectMessageReceiver 对象。 */
final DirectMessageReceiver receiver = service.createDirectMessageReceiverBuilder()
.fromProperties(receiverConfiguration) // 使用通过命令行传递或硬编码到程序中的身份验证属性。
.onBackPressureDropLatest(1000) // 设置反向压力容量为 1000 条消息,达到此容量时缓冲区中最新的消息将被丢弃。
.build() // 根据提供的配置构建一个 DirectMessageReceiver 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

丢弃最旧消息

您可以配置 API,在 API 的内部缓冲区达到指定容量时丢弃最旧消息。达到此容量后,从内部缓冲区中移除最旧的项目,以为接收更新的消息腾出空间。达到指定容量后,从接收器缓冲区中移除最旧的消息,以便为更近期的消息排队。

要配置此机制,请在 DirectMessageReceiver 对象上调用 onBackPressureDropOldest(int bufferCapacity) 方法,然后设置可以在缓冲区中积累的最大消息数量 (bufferCapacity),在达到此数量之前不会从内部缓冲区中移除最旧的消息。

以下示例展示了如何配置应用程序在 API 的内部缓冲区中有 1000 条消息排队时丢弃最旧的消息:

/* 创建一个 DirectMessageReceiverBuilder 实例,用于创建 DirectMessageReceiver 对象。 */
final DirectMessageReceiver receiver = service.createDirectMessageReceiverBuilder()
.fromProperties(receiverConfiguration) // 使用通过命令行传递或硬编码到程序中的身份验证属性。
.onBackPressureDropOldest(1000) // 设置反向压力容量为 1000 条消息,达到此容量时缓冲区中最旧的消息将被丢弃。
.build() // 根据提供的配置构建一个 DirectMessageReceiver 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

配置无限制的内部缓冲区

此 API 的默认配置是使用无限制大小的内部缓冲区来存储消息。这种配置不适合内存受限的环境,因为缓冲区被允许无限增长,可能会导致内存不足错误(或潜在的未定义错误)。当您的基础设施由多个短生命周期的微服务组成时,这种配置很有用。使用这种配置也很有用,因为您无需编写代码来处理反向压力场景。

当您使用无限制缓冲区时,Java API 会持续将从事件代理接收的消息放入其内部缓冲区。以下示例展示了对 onBackPressureElastic() 方法的显式调用,这不是必需的,因为它是默认行为:

/* 创建一个 DirectMessageReceiverBuilder 实例,用于创建 DirectMessageReceiver 对象。 */
final DirectMessageReceiver receiver = service.createDirectMessageReceiverBuilder()
.fromProperties(receiverConfiguration) // 使用通过命令行传递或硬编码到程序中的身份验证属性。
.onBackPressureElastic() // 为接收器对象设置无限制的缓冲区容量。
.build() // 根据提供的配置构建一个 DirectMessageReceiver 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

同步接收直接消息

在使用 MessagingService 对象与事件代理建立连接后,您可以使用 DirectMessageReceiver 对象订阅消息。DirectMessageReceiver 对象必须至少订阅一个主题,才能开始接收消息。

以下示例展示了如何接收 InboundMessage 对象,以及 receiveMessage() 方法如何阻塞线程,直到收到下一条消息:

final DirectMessageReceiver receiver = messagingService
.createDirectMessageReceiverBuilder()
.withSubscriptions(TopicSubscription.of("setSubscriptionExpressionHere"))
.build().start(); .

final InboundMessage message = receiver.receiveMessage();// 接收一个 InboundMessage 对象。

有关这些方法的更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

异步接收直接消息

在使用 MessagingService 对象与事件代理建立连接后,您可以使用 DirectMessageReceiver 对象异步消费直接消息并处理它们。要异步处理直接消息,您需要使用 MessageHandler 对象作为回调方法,以告知应用程序何时接收到消息。

以下示例展示了如何异步接收消息:

/* 创建一个接收器对象并添加订阅。 */
final DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder()
.withSubscriptions(TopicSubscription.of("setSubscriptionExpressionHere")) // 允许将 SMF 主题添加到消息接收器。
.build() // 根据提供的配置构建一个 DirectMessageReceiver 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

final MessageHandler messageHandler = (message) -> { // 入站消息处理程序的监听器接口。
byte[] bytes = message.getPayloadAsBytes(); // 对消息进行一些操作,例如访问原始有效载荷。
};
receiver.receiveAsync(messageHandler); // 此方法表示基于推送的非阻塞接口。
// 消息处理程序的回调方法在内部 API 线程上执行。

从入站消息中提取属性

在与事件代理建立连接后,您可以使用 PubSub+ Java API 订阅消息。作为 API 的一部分,它会隐式地在匹配的投递上创建入站消息。

在您的应用程序接收到 InboundMessage 对象后,您可以从该消息中提取许多属性,例如消息正文(作为字节或字符串)、发送者 ID 和服务类别。以下示例展示了如何从消息中提取属性:

/* 使用 messageHandler */
final MessageHandler messageHandler = (inboundMessage) -> {
byte[] bytes = inboundMessage.getPayloadAsBytes(); // 以字节数组的形式获取消息的原始有效载荷。
String senderID = inboundMessage.getSenderId(); // 返回发送者的 ID。
int serviceClass = inboundMessage.getClassOfService(); // 检索给定消息的服务类别级别。
};
receiver.receiveAsync(messageHandler);

/* 使用 receiveMessage() 方法 */
final InboundMessage inboundMessage= receiver.receiveMessage();
String msgPayload = inboundMessage.getPayloadAsString(); // 以 UTF-8 编码的字符串形式获取有效载荷。
String senderID = inboundMessage.getSenderId(); // 返回发送者的 ID。
int serviceClass = inboundMessage.getClassOfService(); // 检索给定消息的服务类别级别。

有关可以从 InboundMessage 中提取属性的方法的完整列表,请参阅 Java 的 PubSub+ 消息 API 参考。