跳到主要内容

使用Java API发布持久消息

当您的应用程序需要确认处理和“恰好一次”投递时,我们建议您使用持久消息而不是直接消息。要在 Java 的 PubSub+ API 中发布持久消息,您需要先在 PubSub+ 事件代理上设置一个消息队列。

有关在事件代理上创建和配置队列的信息,请参阅配置队列。

要使用 Java 的 PubSub+ API 处理持久消息,请按照以下步骤操作:

  1. 创建一个 PersistentMessagePublisher 对象。
  2. 配置并创建一个 OutboundMessage
  3. 发布持久消息。
  4. 确认消息和处理错误。

在某些用例中,您的应用程序发送消息的速度可能比消息传输的速度更快。这可能会导致消息在 API 内部缓冲区中积累,造成反向压力。如果这种情况可能发生,请考虑更改反向压力设置以满足应用程序的要求。有关更多信息,请参阅处理发布持久消息时的反向压力。

有关发布持久消息的应用程序示例,请参阅 Solace 开发者中心。

创建PersistentMessagePublisher对象

MessagingService 对象与事件代理建立连接后,使用 PersistentMessagePublisher 对象发布持久消息。与 MessagingService 对象一样,PersistentMessagePublisher 对象允许您配置要在 API 中使用的功能。您可以使用以下方法设置 PersistentMessagePublisherBuilder 对象的反向压力处理方式。这些方法包括 build() 方法,该方法返回一个 PersistentMessagePublisher 对象。要使您的 PersistentMessagePublisher 开始发布消息,请在其上调用 start()

  • PersistentMessagePublisherBuilder

    • createPersistentMessagePublisherBuilder()

    • onBackPressureReject()

    • onBackPressureWait()

    • onBackPressureElastic()

    • build()

  • PersistentMessagePublisher

    • start()

以下示例展示了如何使用 PersistentMessagePublisher 对象通过 MessagingService 对象连接到事件代理:

/* 创建一个 PersistentMessagePublisherBuilder 实例,用于创建 PersistentMessagePublisher 对象。 */
final PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build() // 根据提供的配置构建一个 PersistentMessagePublisher 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

异步接收器

也可以使用回调监听器启动持久消息发布者,以便在启动操作完成后进行异步通知。

以下示例展示了如何调用 CompletionListener,这是用于监听未来计算结果的回调方法。此处理器还会在启动操作完成后通知应用程序。

final CompletionListener<PersistentMessagePublisher> receiverStartupListener = (receiver, throwable) -> {
if (throwable == null) {
// 启动成功,可以接收消息。
} else {
// 处理启动期间的异常。
}
};
receiverToBeStarted.startAsync(receiverStartupListener);

在调用 start()startAsync() 之前,您的发布者应用程序是无法运行的。

配置和创建出站消息

您的客户端应用程序需要显式创建要发布的出站消息。在 Java 的 PubSub+ API 中,发布消息时您需要使用 OutboundMessage 对象。要配置和创建 OutboundMessage 对象,请按照以下步骤操作:

  1. 调用 messagingService.messageBuilder() 以返回一个 OutboundMessageBuilder 对象。为了获得更好的性能,我们建议您使用单个 OutboundMessageBuilder 来创建多个 OutboundMessage 对象。
final OutboundMessageBuilder messageBuilder = messagingService.messageBuilder()
  1. 使用 OutboundMessageBuilder 配置您的消息,然后调用 build() 方法以返回一个消息对象。您可以使用以下任一方法配置消息属性。

    • 使用 Properties 接口和 setProperty(name,value) 方法。以下示例展示了如何为 Properties 对象设置自定义名称-值属性,然后使用 fromProperties() 方法配置消息:

      final Properties messageProperties = new Properties();
      messageProperties.setProperty("PropertyName", "PropertyValue");
      final OutboundMessage message = messageBuilder
      .fromProperties(messageProperties)
      .build(messagePayload);
    • 使用 OutboundMessageBuilder 接口和 with*() 方法。以下示例展示了如何为消息设置生存时间、发送者 ID 和自定义键值属性:

      final OutboundMessage myCustomMessage = messageBuilder
      .withTimeToLive(1000)
      .withSenderId("mySenderId")
      // 有关 with*() 方法的完整列表,请参阅 Java 的 PubSub+ 消息 API 参考
      .withProperty("PropertyKey", "PropertyValue")
      .build(messagePayload);

以下代码示例展示了如何创建消息构建器、设置消息属性并创建消息:

/* 用于创建类似配置消息的构建器 */
final OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
final OutboundMessage message = messageBuilder
.fromProperties(messageProperties) // 例如 TTL、发送者 ID、序列号等。
.withExpiration(Instant.now() // 使用当前系统时间作为起始点设置过期时间。
.toEpochMilli() + 10000L) // 10 秒后过期该消息。
.build("My_Message"); // 构建消息。

有关这些方法的更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

设置分区键

您可以设置分区键以使用分区队列。分区队列是配置在 PubSub+ 事件代理上的一个功能,允许您轻松扩展绑定到队列的消费者应用程序的数量。可以在发布应用程序中的每条消息上设置分区键,以确保所有具有相同分区键的消息都无需在消费者应用程序中进行额外逻辑即可投递到同一个消费者。有关更多信息,请参阅分区队列。

您可以使用 setProperty(name,value)withProperty(key,value) 在消息上设置键值对,使用 OutboundMessageBuilder

  • name/key — 常量 MessageUserPropertyConstants.QUEUE_PARTITION_KEY 或字符串值 "JMSXGroupID"。

  • value — 一个字符串,表示您的分区键的值。客户端应用程序在发布时设置该值。

以下示例将分区键 value 设置为 "Group-0":

// 使用 fromProperties() 方法
final Properties additionalProperties = new Properties();
additionalProperties.setProperty(MessageUserPropertyConstants.QUEUE_PARTITION_KEY, "Group-0");
final OutboundMessage message = messageBuilder
.fromProperties(additionalProperties)
.build(payload);

// 使用 withProperty() 方法
final OutboundMessage myCustomMessage = messagingService.messageBuilder()
.withProperty(MessageUserPropertyConstants.QUEUE_PARTITION_KEY, "Group-0")
.build(payload);

处理发布持久消息时的反向压力

使用持久消息时,消息会通过主题发送到 PubSub+ 事件代理,并可能会被排队到任何具有匹配主题订阅的队列上。事件代理随后会异步地将消息投递给订阅该队列的任何消费者。在客户端应用程序发布持久消息时,API 会在将消息发送到事件代理之前,将消息排队到内部缓冲区。在理想情况下,应用程序发布消息后,API 会立即将该消息发送到网络上,最终由事件代理接收该消息。由于网络拥塞或连接问题,客户端应用程序可能会比 API 更快地发布消息。这种延迟可能会导致内部缓冲区积累消息,直到达到其容量,阻止 API 存储更多消息。这种情况称为 反向压力。配置应用程序以处理反向压力发生的情况非常重要。

在 Java 的 PubSub+ API 中,PersistentMessagePublisherBuilder 有以下三种主要机制来处理反向压力,您可以使用:

  • 当达到指定限制时拒绝消息
  • 当达到指定限制时限制发布者
  • 使用无限制的内部缓冲区(默认)

当达到指定限制时拒绝消息

当发生反向压力时,您可以选择在内部缓冲区达到指定限制时拒绝来自客户端应用程序的消息。您可以使用 onBackPressureReject(int bufferCapacity) 方法为要积累的消息数量指定一个定义的缓冲区容量。达到指定容量后,将不再可能发布新消息,直到缓冲区再次有容量之前会抛出异常。在发布调用中,bufferCapacity 必须大于零。

/* 创建一个 PersistentMessagePublisherBuilder 实例,用于创建 PersistentMessagePublisher 对象。 */
final PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.onBackPressureReject(1000) // 创建一个消息缓冲区,空间可容纳 1000 条消息,达到该容量时将拒绝消息,直到有空间为止。
.build() // 根据提供的配置构建一个 PersistentMessagePublisher 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

使用发布者就绪监听器

当您使用 onBackPressureReject() 方法时,我们建议您使用 PublisherReadinessListener 接口,因为它可以通知您的应用程序缓冲区何时有容量可用,可以恢复发布消息。

以下是一个注册事件处理器 PublisherReadinessListener 接口的示例:

/* 用于确保 'ready' 事件在 'publish' 处理后且异常处理后发出的对象 */
final Object[] lock = new Object[0];
final PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.onBackPressureReject(1000) // 创建一个消息缓冲区,空间可容纳 1000 条消息,达到该容量时将拒绝消息,直到有空间为止。
.build() // 根据提供的配置构建一个 PersistentMessagePublisher 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

final PublisherReadinessListener canPublishListener = () -> {
// 当发布者再次准备好发布消息时,执行此代码。
};
publisher.setPublisherReadinessListener(canPublishListener); // 注册监听器,通常只需设置一次。

当达到指定限制时限制发布者

如果内部缓冲区达到指定限制,您可以选择限制发布应用程序。使用限制可以释放其内部缓冲区的容量。您可以使用 onBackPressureWait(int bufferCapacity) 方法设置缓冲区中可以积累的最大消息数量。当达到此最大容量 (bufferCapacity) 时,发布者线程会暂停,并等待内部缓冲区有可用容量,然后才允许应用程序发布更多消息。

当您希望应用程序的发布请求在缓冲区容量达到后等待空间时,应使用此方法。使用此机制可以有效地为应用程序清空内部缓冲区。使用持久化发布时的一个额外好处是,API 不会丢弃任何消息。

以下示例展示了如何配置内部缓冲区以容纳多达一千条消息:

/* 创建一个 PersistentMessagePublisherBuilder 实例,用于创建 PersistentMessagePublisher 对象。 */
final PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.onBackPressureWait(1000) // 创建一个消息缓冲区,空间可容纳 1000 条消息,达到该容量时线程将暂停,直到有空间为止。
.build() // 根据提供的配置构建一个 PersistentMessagePublisher 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

配置无限制的内部缓冲区

API 的默认配置是使用无限制大小的内部缓冲区来存储消息。这种配置不适合内存受限的环境,因为缓冲区被允许无限增长,可能会导致内存不足错误(或潜在的未定义错误)。当您的基础设施由多个短生命周期的微服务组成,并且可以在内部队列遇到内存不足场景的不太可能的情况下提供发布冗余时,使用这种配置很有用。使用这种配置也很有用,因为您无需编写代码来处理反向压力场景。

当您使用无限制缓冲区时,Java API 会持续将客户端应用程序发布的消息放入内部缓冲区。以下示例展示了对 onBackPressureElastic() 方法的显式调用,这不是必需的,因为它是默认行为:

/* 创建一个 PersistentMessagePublisherBuilder 实例,用于创建 PersistentMessagePublisher 对象。 */
final PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.onBackPressureElastic() // 创建一个无限制容量的消息缓冲区。
.build() // 根据提供的配置构建一个 PersistentMessagePublisher 对象。
.start(); // 使服务恢复常规职责。在调用此方法之前,服务被视为非工作状态。

发布持久消息

在使用 MessagingService 对象与事件代理建立连接后,您可以使用 PersistentMessagePublisher 对象发布持久消息。

持久消息具有以下组成部分:

  • 要发布的主题(必需)
  • 消息正文(可选)

持久消息发布涉及接收确认或 发布收据。根据您的需求,您的客户端应用程序可以作为:

  • 非阻塞,允许您的应用程序在 PublishReceiptListener 等待确认的同时执行其他功能
  • 阻塞,等待直到收到确认;确认表示消息已被代理接收并持久化

您不能发布大于事件代理的最大持久消息大小(在大多数情况下为 30MB)的持久消息。如果尝试发布超过最大大小的消息,Java 的 PubSub+ API 将抛出一个带有子代码 MESSAGE_TOO_LARGE 的异常。为了避免中断,请设计您的应用程序以控制消息大小并确保它们保持在支持的限制内。

非阻塞方法

以下是非阻塞方法:

  • publish(byte[] message, Topic destination)
  • publish(byte[] message, Topic destination, Object userContext)
  • publish(OutboundMessage message, Topic destination)
  • publish(OutboundMessage message, Topic destination, Object userContext)
  • publish(OutboundMessage message, Topic destination, Object userContext, Properties additionalMessageProperties)
  • publish(String message, Topic destination)
  • publish(String message, Topic destination, Object userContext)

有关这些方法的更多信息,请参阅 Java 的 PubSub+ 消息 API 参考。

阻塞方法

如果您希望发布者在从 publish() 调用返回之前等待代理的确认,则可以使用以下阻塞方法:

  • publishAwaitAcknowledgement(OutboundMessage message, Topic destination, long timeout)
  • publishAwaitAcknowledgement(OutboundMessage message, Topic destination, long timeout, Properties additionalMessageProperties)

上述任一方法都可以与 PersistentMessagePublisher 对象一起使用,以使用主题将 OutboundMessage 发布到代理。此方法会阻塞主线程,直到:

  • 发布者 API 收到代理的确认
  • 超时时间段过去
/* 阻塞发布持久消息的简单示例。 */
final long deliveryConfirmationTimeOutInMilliseconds = 20000L; // 最多等待 20 秒,然后认为消息未投递到代理
OutboundMessage message = messageBuilder.build("Hello World!");

try {
publisher.publishAwaitAcknowledgement(messageBuilder.build("Hello World!"), // 发布消息,阻塞。
toDestination, deliveryConfirmationTimeOutInMilliseconds); // 使用超时值发布到主题 "toDestination"。
} catch (PubSubPlusClientException.TimeoutException e) {
logger.warn(String.format("消息 %s 超时 - %s", message, e)); // 发生超时。
} catch (PubSubPlusClientException e) {
logger.warn(String.format("消息 %s 未被代理队列持久化,收到 NACK - %s", message, e)); // 消息未被代理队列持久化,收到 NACK。
} catch (InterruptedException e) {
logger.info("在等待发布确认时被中断,可能正在关闭",e); // 在等待发布确认时被中断。
}

用户上下文

可选地,您可以使用 用户上下文 将持久消息与发布收据相关联的信息相关联到您的应用程序中。此信息是用户特定的,仅对您的发布应用程序有意义,并且不会发送到代理。用户上下文允许您将数据附加到 publish() 调用中,稍后可以从发布收据监听器中检索这些数据。

当使用用户上下文时,它允许您处理多种场景。它还允许您的应用程序根据上下文决定采取什么行动或如何处理发布收据。

例如,如果非阻塞应用程序有多个线程用于发布持久消息,每个线程可以在发布持久消息时将其标识符作为用户上下文包含在其中。Java API 会跟踪每个消息指定的用户上下文,并在消息被代理确认或拒绝时,将用户上下文作为发布收据的一部分返回。然后,发布应用程序可以根据用户上下文将发布收据发送到发送消息的正确线程。

您可以在发布消息时设置用户上下文。例如,您使用 publish(OutboundMessage message,Topic destination, Object userContext) 方法,其中用户上下文指定为 Object 类型。

以下示例展示了如何从发布收据中获取用户上下文:

/* 持久消息发布收据的简单示例 */
final OutboundMessage acknowledgedMessage = publishReceipt.getMessage(); // 检索与收据相关联的消息。
final Object processingContext = publishReceipt.getUserContext(); // 当在消息发布期间提供时,可以从发布收据中以这种方式检索对应的上下文。

如果您的应用程序是非阻塞的,您还可以在回调中使用持久消息发布者与发布收据来记录信息。例如,您可以使用非阻塞消息发布,然后发送警报以通知应用程序已发布消息的状态,例如:

  • 代理成功接收并处理消息
  • 访问控制违规(ACL)
  • 队列超出配额
  • 无效主题/没有订阅者的话题

以下示例展示了如何使用 Log4j2 与 logger.warn()logger.debug() 等命令记录发布收据。

/* 处理所有消息的投递确认/超时的监听器 */
final MessagePublishReceiptListener deliveryConfirmationListener = (publishReceipt) -> {
final PubSubPlusClientException e = publishReceipt.getException();
if (e == null) { // 没有异常,代理已确认成功的发布收据。
OutboundMessage outboundMessage = publishReceipt.getMessage();
logger.debug(String.format("消息 %s 的 ACK", outboundMessage)); // 代理已收到消息,记录一个“ACK”。
} else { // 负面确认,代理未收到消息。
OutboundMessage outboundMessage = publishReceipt.getMessage(); // 哪条消息未被收到。
logger.warn(String.format("消息 %s 的 NACK - %s", outboundMessage, e)); // 记录“NACK”或负面确认。
}
};
publisher.setMessagePublishReceiptListener(deliveryConfirmationListener); // 监听所有发送消息的投递确认。
publisher.publish("Hello world!", topicDestination); // 发布一个带有字符串正文的消息。

确认消息和处理错误

发布收据是投递确认,指示事件代理是否成功将消息持久化到队列上。这些发布收据可以指示成功或失败,并由 MessagePublisReceiptListener 对象处理。您可以通过调用 setMessagePublishReceiptListener() 方法创建 MessagePublishReceiptListener 对象。

以下示例展示了如何使用 MessagePublishReceiptListener 监听发布收据:

/* 处理所有消息的投递确认/超时的监听器 */
final MessagePublishReceiptListener deliveryConfirmationListener = (publishReceipt) -> {
final PubSubPlusClientException e = publishReceipt.getException();
if (e == null) { // 没有异常,代理已确认收据。
// 记录、处理成功的发布收据的代码。
} else { // 负面确认,代理未收到消息。
// 记录、处理失败的发布收据的代码。
}
};
publisher.setMessagePublishReceiptListener(deliveryConfirmationListener); // 监听所有发送消息的投递确认。
publisher.publish("Hello world!", topicDestination); // 发布一个带有字符串正文的消息。

处理发布收据错误的策略

以下是可以用于处理发布时收据错误的应用程序特定策略。

  • 等待并重试:在尝试重新发送消息之前等待几秒钟。例如,使用 Thread.sleep(1000) 在尝试重新发布之前等待 1 秒。
  • 重试预定义次数:在放弃之前尝试重新发布消息预定义次数。
  • 丢弃消息:简单地丢弃具有失败发布收据的消息。如果您的应用程序不能容忍消息丢失,我们不推荐此策略。

要接收没有匹配订阅时的失败发布收据,必须为事件代理或事件代理服务设置此选项。有关更多信息,请参阅处理没有匹配项的保证消息(针对设备和软件事件代理)或 在没有订阅匹配时拒绝消息给发送方以丢弃(针对 PubSub+ Cloud)。