跳到主要内容

使用Java API消费持久消息

无法容忍消息丢失的接收应用程序可以使用持久消息(在本文档的其他部分称为“保证消息”)而不是直接消息。使用持久消息时,消息会存储在事件代理上的队列中。只有在订阅应用程序(称为消息接收器)消费并确认消息后,消息才会从事件代理中删除。Java 的 PubSub+ API 只能从队列中消费持久消息,而不能从主题端点中消费。

要消费持久消息,您必须首先在事件代理上设置一个消息队列。有关在事件代理上创建和配置持久队列的信息,请参阅配置队列。或者,可以在创建持久消息接收器(PersistentMessageReceiver)时创建一个非持久队列。

要使用持久消息接收器消费持久消息,请按照以下步骤操作:

  1. 创建一个 PersistentMessageReceiver 对象。
  2. 同步接收持久消息。
  3. 异步接收持久消息。
  4. 从入站消息中提取属性。
  5. 消息确认。
  6. 特定消息的否定确认(NACKs)。
  7. 使用 Java API 创建队列。
  8. 使用 Java API 浏览队列。

如果您的消费者应用程序在接收消息的速度超过处理消息的速度时,可能会发生反向压力。消息会继续在内部缓冲,直到达到高水位线,此时 API 会告知事件代理停止发送消息,以防止消息丢失。

有关订阅持久消息的应用程序示例,请参阅 Solace 开发者中心。

创建PersistentMessageReceiver对象

在使用 MessagingService 对象与事件代理建立连接后,使用 PersistentMessageReceiver 对象从事件代理上的队列中消费持久消息。要使您的 PersistentMessageReceiver 开始接收消息,请在其上调用 start()

以下是可以用来配置如何从事件代理消费持久消息的对象和方法:

  • PersistentMessageReceiverBuilder

    • createPersistentMessageReceiverBuilder()
    • withMissingResourcesCreationStrategy(MissingResourcesCreationConfiguration.MissingResourcesCreationStrategy strategy)
    • build(Queue endpointToConsumeFrom)
  • PersistentMessageReceiver

    • start()
  • Queue

    • durableExclusiveQueue(String queueName)
    • durableNonExclusiveQueue(String queueName)
    • nonDurableExclusiveQueue()
    • nonDurableExclusiveQueue(String queueName)

有关前面方法和接口的更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

以下示例展示了如何使用 PersistentMessageReceiver 对象绑定到队列:

/* 创建一个 PersistentMessageReceiverBuilder 实例,用于创建 PersistentMessageReceiver 对象。 */
final PersistentMessageReceiver receiver = service.createPersistentMessageReceiverBuilder()
.build(Queue.durableExclusiveQueue(QUEUE_NAME)) // 创建一个 PersistentMessageReceiver 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

您可以使用 Java API 在事件代理上创建持久和非持久队列。有关更多信息,请参阅使用 Java API 创建队列。

异步接收器

也可以使用回调监听器启动持久消息接收器,以便在启动操作完成后进行异步通知。

以下示例展示了如何调用 CompletionListener,这是用于监听未来计算结果的回调方法。此处理器还会在启动操作完成后通知应用程序。

final CompletionListener<PersistentMessageReceiver> receiverStartupListener = (receiver, throwable) -> {
if (throwable == null) {
// 启动成功,可以接收消息。
} else {
// 处理启动期间的异常。
}
};
receiverToBeStarted.startAsync(receiverStartupListener);

在调用 start()startAsync() 之前,您的接收器应用程序是无法运行的。

同步消费持久消息

您可以同步消费持久消息。为此,您需要创建一个 PersistentMessageReceiver 对象并将其绑定到队列。成功绑定到队列后,您的对象可以开始使用 receiveMessage() 方法消费持久消息。此方法会阻塞线程,直到收到下一条消息。

当应用程序处理 InboundMessage 时,它可以使用 PersistentMessageReceiver.ack() 向事件代理发送确认。然后,事件代理会从队列中移除 InboundMessage。在消息被确认之前,它保留在代理队列中,并且当应用程序重新连接到队列时可能会重新投递。

有关前面方法的更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

以下示例展示了如何同步消费持久消息:

/* 创建一个 PersistentMessageReceiverBuilder 实例,用于创建 PersistentMessageReceiver 对象。 */
final PersistentMessageReceiver receiver = service.createPersistentMessageReceiverBuilder()
.build(queueToConsumeFrom) // 创建一个 PersistentMessageReceiver 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

final InboundMessage message = receiver.receiveMessage(); // 阻塞请求以接收下一条消息。
// 只要未调用 receiver.terminate(),即可随时确认。
receiver.ack(message); // 确认入站消息。

如果您不调用 receiveMessage() 方法,消息可能会在 API 的内部缓冲区中积累,您可能会遇到反向压力场景。如果发生这种情况,Java API 会自动告知事件代理停止发送消息。

异步消费持久消息

您可以异步消费持久消息。为此,您需要创建一个 PersistentMessageReceiver 对象并像往常一样启动与事件代理的连接,但您需要使用 MessageHandler 对象作为回调方法,以通知您的应用程序何时接收到消息。

以下示例展示了如何异步消费持久消息:

/* 创建一个 PersistentMessageReceiverBuilder 实例,用于创建 PersistentMessageReceiver 对象。 */
final PersistentMessageReceiver receiver = service.createPersistentMessageReceiverBuilder()
.build(queueToConsumeFrom) // 创建一个 PersistentMessageReceiver 实例。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

final MessageHandler messageHandler = (message) -> { // 入站消息处理程序的监听器接口。
if (message != null && message.getPayloadAsBytes() != null) { // 对消息进行一些操作,例如检查它是否不为 null 或无效。
receiver.ack(message); // 确认入站消息。
}
};
receiver.receiveAsync(messageHandler); // 请求注册异步消息处理程序。

暂停和恢复从内部缓冲区的消息消费

当您的应用程序使用 receiveAsync() 方法异步消费消息时,您可以调用 pause()resume() 方法来控制消息流向您应用程序的回调。

如果使用 receiveMessage()pause()resume() 方法将无效。

您可以使用 pause()resume() 方法来控制消息在 API 的内部缓冲区和您的应用程序之间的流动。这个内部缓冲区是从事件代理接收消息的地方。这种流控制在您的应用程序必须暂时停止处理消息以处理其他操作时很有用。pause()resume() 方法不控制事件代理和 API 内部缓冲区之间的消息流动。当您调用 pause() 方法时,消息会继续从事件代理发送。pause()resume() 方法只控制向应用程序的消息投递。从事件代理接收的消息会继续在内部缓冲区中积累。

由于事件代理继续发送消息,可能会发生反向压力场景——即,消息会继续积累直到达到内部高水位线。此时,PersistentMessageReceiver 对象会通知事件代理停止发送消息,直到积累的消息数量低于内部高水位线。这个内部 API 机制为您处理反向压力场景,并确保在事件代理和您的应用程序之间不会丢失任何消息。

以下对象和方法用于暂停和恢复从 API 的内部缓冲区处理消息:

  • ReceiverFlowControl

    • pause()
    • resume()

有关前面方法的更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

以下示例展示了如何使用调度器暂停和恢复从 API 内部队列中的消息处理:

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
scheduler.schedule(() -> receiver.pause(), 10L, TimeUnit.SECONDS); // 在 10 秒后暂停消息投递。
scheduler.schedule(() -> receiver.resume(), 20L, TimeUnit.SECONDS); // 在 20 秒后恢复消息投递。

从入站消息中提取属性

在与事件代理建立连接后,您可以使用 PubSub+ Java API 订阅消息。作为 API 的一部分,它会隐式地在匹配的投递上创建入站消息。

在您的应用程序接收到 InboundMessage 对象后,您可以从该消息中提取许多属性,例如消息正文(作为字节或字符串)、发送者 ID 和服务类别。以下示例展示了如何从消息中提取属性:

/* 使用 messageHandler */
final MessageHandler messageHandler = (inboundMessage) -> {
byte[] bytes = inboundMessage.getPayloadAsBytes(); // 以字节数组的形式获取消息的原始有效载荷。
String senderID = inboundMessage.getSenderId(); // 返回发送者的 ID。
int serviceClass = inboundMessage.getClassOfService(); // 检索给定消息的服务类别级别。
receiver.ack(inboundMessage);
};
receiver.receiveAsync(messageHandler);

/* 使用 receiveMessage() 方法 */
final InboundMessage inboundMessage= receiver.receiveMessage();
String msgPayload = inboundMessage.getPayloadAsString(); // 以 UTF-8 编码的字符串形式获取有效载荷。
String senderID = inboundMessage.getSenderId(); // 返回发送者的 ID。
int serviceClass = inboundMessage.getClassOfService(); // 检索给定消息的服务类别级别。
receiver.ack(inboundMessage);

有关可以从 InboundMessage 中提取属性的方法的完整列表,请参阅 Java 的 PubSub+ 消息 API 参考。

消息确认

可以使用以下两种应用程序确认模式之一来确认消息:

  • 自动确认
  • 客户端确认(默认)

自动确认模式

当使用自动确认模式时,API 会自动生成应用程序级别的确认。要将您的 PersistentMessageReceiver 配置为使用自动确认,请使用 withMessageAutoAcknowledgement() 方法:

final PersistentMessageReceiver receiver = messagingService
.createPersistentMessageReceiverBuilder()
.withMessageAutoAcknowledgement()
.build(Queue.durableExclusiveQueue(QUEUE_NAME));

对于 Java 的 PubSub+ API,根据消息是异步接收还是同步接收,确认发送的时间不同:

  • 当异步接收时,确认会在消息回调完成且没有异常后发送。

  • 当同步接收时,确认会在 receiveMessage() 方法期间从 API 的内部队列中移除消息后发送。重要的是要意识到,在控制权返回给应用程序之前(即在 receiveMessage() 方法完成后),确认已经发送。

客户端确认模式

客户端确认模式是 Java 的 PubSub+ API 的默认行为,意味着客户端必须明确地为收到的每条消息的消息 ID 发送确认。确认是异步的。只要接收器尚未终止,任何线程都可以随时确认消息。重要的是要记住,在 PersistentMessageReceiver 对象向事件代理确认消息后,它会从事件代理的队列中删除该消息。因此,在确认消息之前,重要的是要完成对消息的任何处理和存储。

以下示例展示了如何同步和异步确认持久消息:

/* 阻塞请求以接收消息。 */
final InboundMessage message = receiver.receiveMessage();
// 现在已自动确认。

/* 请求注册异步消息处理程序。 */
receiver.receiveAsync(message -> { // 异步接收器消息回调。
// 在这里处理/存储消息。
});
// 当消息处理回调方法在没有错误的情况下完成时,会自动确认。

特定消息的否定确认

如果您的 PersistentMessageReceiver 未配置为自动确认收到的消息,则可以使用否定确认(NACKs)。使用 NACKs 时,您可以发送一个结算结果,以告知事件代理处理收到的保证消息的结果。根据结算结果,事件代理知道如何在其队列中处理消息。您可以使用以下结算结果:

  • ACCEPTED — 此确认通知事件代理您的客户端应用程序已成功处理保证消息。当事件代理收到此结果时,它会从其队列中移除该消息。
    • 当您使用 ACCEPTED 结果调用 settle() 方法时,与使用 persistentReceiver.ack(message) 相同。
  • FAILED — 此否定确认通知事件代理您的客户端应用程序未处理该消息。当事件代理收到此否定确认时,它会尝试在遵守投递次数限制的情况下重新投递该消息。在重新投递期间,消息保留在代理上,直到代理收到非 FAILED 的确认状态。
  • REJECTED — 此否定确认通知事件代理您的客户端应用程序无法处理该消息。当事件代理收到此否定确认时,它会从其队列中移除该消息,然后如果已配置,将消息移动到死信队列(DMQ)。

在使用 NACKs 之前,您必须使用 withRequiredMessageClientOutcomeOperationSupport() 方法在创建 PersistentMessageReceiver 时添加 Outcome.FAILEDOutcome.REJECTED 或两种结果作为 NACK 类型,以准备它与否定确认一起工作。您不需要添加 Outcome.ACCEPTED 结果,因为它始终可用。如果您尝试对未添加的结果调用 settle(),您将收到一个 Required Settlement Outcome Not Supported 错误。以下代码展示了如何配置 PersistentMessageReceiver 以使用 NACKs:

import com.solace.messaging.config.MessageAcknowledgementConfiguration.Outcome;
// ...
final PersistentMessageReceiver persistentReceiver = messagingService
.createPersistentMessageReceiverBuilder()
.withRequiredMessageClientOutcomeOperationSupport(
new Outcome[]{Outcome.FAILED, Outcome.REJECTED})
.build(Queue.durableExclusiveQueue(QUEUE_NAME));
  • 在传输过程中可能会丢失 NACKs(例如,由于意外的网络问题)。在开发应用程序时,将这一事实作为处理消息的逻辑的一部分。
  • NACKs 在 10.2.1 及更高版本的事件代理上受支持。如果事件代理不支持 NACKs,则在调用配置为使用消息结果的 PersistentMessageReceiver 实例的 start() 时,会发生 InvalidOperationException

以下示例展示了如何结算已接受的消息,这与使用 persistentReceiver.ack(message) 相同:

persistentReceiver.receiveAsync(message -> {  // 异步接收器消息回调。
// 在这里处理/存储消息。
persistentReceiver.settle(message, Outcome.ACCEPTED);
});

以下示例展示了如何结算已拒绝的消息,例如您的应用程序当前无法处理该消息:

persistentReceiver.receiveAsync(message -> {  // 异步接收器消息回调。
// 消息已处理但未被接受,向代理发送 NACK
persistentReceiver.settle(message, Outcome.REJECTED);
});

以下示例展示了如何结算失败的消息,例如消息内容存在问题:

persistentReceiver.receiveAsync(message -> {  // 异步接收器消息回调。
// 处理消息失败,向代理发送 NACK
persistentReceiver.settle(message, Outcome.FAILED);
});