跳到主要内容

使用PubSub+缓存与Python API

PubSub+ 缓存是一种可扩展的、内存中的消息缓存机制,用于直接消息传递。它允许客户端应用程序在上线时或开始订阅它们原本未订阅的主题时,请求感兴趣主题的最新消息。有关设置 PubSub+ 缓存实例的信息,请参阅 PubSub+ 缓存。PubSub+ Python API 允许您的应用程序发送缓存请求以接收您的接收应用程序感兴趣的缓存消息。以下部分描述了如何发送缓存请求和接收缓存消息:

要消费缓存消息,您的应用程序需要访问 ReceiverCacheRequests 接口。DirectMessageReceiver 接口包含了 ReceiverCacheRequests 接口,因此需要一个 DirectMessageReceiver 实例来消费缓存消息。有关创建 DirectMessageReceiver 的更多信息,请参阅创建 DirectMessageReceiver。

  1. 创建 CachedMessageSubscriptionRequest 对象
  2. 发送缓存请求
  3. 使用 CacheRequestOutcomeListener 类创建事件监听器
  4. 接收缓存消息时的注意事项

创建CachedMessageSubscriptionRequest对象

要发送缓存请求,您首先需要创建一个 CachedMessageSubscriptionRequest 类的实例。此类包含四个不同的函数,用于请求缓存消息,每个函数都返回一个 CachedMessageSubscriptionRequest 实例。选择最适合您应用程序需求的函数:

  • as_available(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)
    • 返回一个 CachedMessageSubscriptionRequest 实例,用于配置缓存数据请求。如果您的接收器尚未订阅该主题,则请求对象会向传递给函数参数的主题添加一个实时数据订阅。这意味着您的应用程序将接收缓存请求的缓存消息和该主题订阅的实时消息的混合。
  • live_cancels_cached(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)
    • 返回一个 CachedMessageSubscriptionRequest 实例,该实例配置缓存请求应用程序以接收最新消息。如果您的接收器尚未订阅该主题,则请求对象会向传递给函数参数的主题添加一个实时数据订阅。当没有实时(非缓存)消息时,与您选择的主题订阅匹配的缓存消息被视为最新消息。否则,与您选择的主题订阅匹配的实时消息被视为最新消息。
  • cached_first(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)
    • 返回一个 CachedMessageSubscriptionRequest 实例,该实例配置缓存请求以在有可用时接收缓存消息,随后接收实时(非缓存)消息。如果您的接收器尚未订阅该主题,则请求对象会向传递给函数参数的主题添加一个实时数据订阅。
  • cached_only(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)
    • 返回一个 CachedMessageSubscriptionRequest 实例,该实例配置您的应用程序仅在缓存请求的持续时间内接收缓存消息。
    • 仅当您的接收器没有与缓存请求订阅匹配的实时订阅时,才使用 cached_only(),因为您的接收器将接收任何重叠主题订阅上的缓存消息的重复。当您有与缓存请求主题订阅重叠的现有实时数据订阅时,使用 as_available()cached_first() 以避免消息重复。

当您使用 as_available()live_cancels_cached()cached_first() 函数时,如果您的接收器上尚未存在,则缓存请求会创建一个实时主题订阅。此订阅即使在缓存请求结束后也会持续存在。您可以通过在缓存结果返回后,调用接收器对象上的 remove_subscription(topic_sub) 函数来移除该订阅,从而取消该订阅上向客户端应用程序的任何实时数据传输。

所有四个函数的参数相同:

  • cache_name — 要从中检索的 Solace 缓存的名称。
  • subscription — 匹配的主题订阅。
  • cache_access_timeout — 缓存请求超时时间(以毫秒为单位)。此值必须在 3,000 到 2,147,483,647 之间,对于大多数应用程序,10,000(10 秒)的值是足够的。有关更多信息,请参阅分配适当的 cache_access_timeout 值。
  • max_cached_messages — 从缓存中预期接收的消息的最大数量。默认值为 0,表示从缓存中接收的消息数量没有限制。
  • cached_message_age — 从缓存中检索的消息的最大年龄(以秒为单位)。默认值为 0,表示从缓存中接收的消息年龄没有限制。

以下示例展示了如何使用 as_available() 函数创建一个 CachedMessageSubscriptionRequest 实例:

cached_message_subscription_request = CachedMessageSubscriptionRequest.as_available("my_cache_name",
TopicSubscription.of("my/cache/example"),
10000)

有关更多信息,请参阅 PubSub+ Python 消息 API 参考。

发送缓存请求

要创建缓存请求并开始从 PubSub+ 缓存实例接收缓存消息,您需要使用 ReceiverCacheRequests 类中的 request_cached() 函数。需要注意的是,当您使用 Python API 发送缓存请求时,它使用的是一个非阻塞函数,该函数会立即返回,但实际请求可能会被延迟。如果您在接收缓存结果之前发送多个缓存请求,则需要使用唯一的缓存请求 ID 以避免接收重复消息。此函数请求之前使用 PubSub+ 缓存缓存的消息:

request_cached(cached_message_subscription_request: CachedMessageSubscriptionRequest, cache_request_id: int, completion_listener: CacheRequestOutcomeListener)

  • cached_message_subscription_request — 请求与指定订阅和其他满足条件匹配的缓存消息。
  • cache_request_id — 请求标识符,可用于响应回调关联目的。此 ID 在 MessagingService 实例上每次缓存请求的持续时间内必须是唯一的。此值将在 CacheRequestOutcomeListeneron_completion() 函数的回调中返回。此标识符是应用程序关联缓存请求与缓存响应和数据消息的方式。实时消息没有缓存请求 ID。应用程序有责任确保标识符是唯一的,以避免在您的关联中发生冲突。此值不应与 InboundMessage 类中用于一般消息关联的 CORRELATION_ID 属性混淆。
  • completion_listenerCacheRequestOutcomeListener 的一个实例,这是一个回调,用于通知应用程序缓存请求何时完成。

以下示例展示了如何发送缓存请求:

cache_request_id = 12345

cache_request_outcome_listener = MyCacheRequestOutcomeListener()

direct_message_receiver.requestCached(cached_message_subscription_request, cache_request_id, cache_request_outcome_listener)

在发送缓存请求后,您的 MessageHandler 开始接收消息,这些消息可以是缓存的、实时的或两者兼有,具体取决于用于配置缓存请求的缓存请求函数。您可以使用 get_cache_status() 函数检查消息是否为缓存消息。此函数返回一个 CacheStatus 枚举,可以是以下三个值之一:

  • LIVE — 消息是从 PubSub+ 事件代理直接检索的,而不是从缓存实例中检索的。

  • CACHED — 消息是从缓存实例中检索的。

  • SUSPECT — 消息是从可疑缓存实例中检索的。

您可以使用 get_cache_request_id() 函数从缓存消息中获取缓存请求标识符。以下示例展示了 MessageHandler 的一个实现,该实现检查入站消息的缓存状态,并从缓存消息中检索缓存请求 ID:

class MessageHandlerExample(MessageHandler):
def __init__(self, direct_receiver: DirectMessageReceiver):
self.receiver: DirectMessageReceiver = direct_receiver

def on_message(self, message: InboundMessage):
message_type = message.get_cache_status()
if message_type == CacheStatus.LIVE:
print("消息不是缓存的。")
elif message_type == CacheStatus.CACHED:
print("消息是缓存的。")
cache_request_id = message.get_cache_request_id()
else:
print("消息来自可疑缓存实例。请检查您的缓存实例。")

# 在 DirectMessageReceiver 实例上注册异步消息接收器。
direct_receiver.receive_async(MessageHandlerExample())

如果在缓存请求期间发生错误,或者消息接收器或 MessagingService 被终止,API 将取消所有未完成的缓存请求。此终止过程会阻塞您的应用程序,直到 API 已经向应用程序传递了所有已取消缓存请求的通知。

有关更多信息,请参阅 PubSub+ Python 消息 API 参考。

使用 CacheRequestOutcomeListener 类创建事件监听器

此类是一个回调,用于监听缓存请求的结果,并支持缓存请求标识符。它在缓存请求完成时通知您的应用程序。它必须包含一个函数,on_completion()

on_completion(result: CacheRequestOutcome, cache_request_id: int, exception: Union[Exception, None])

  • result — 缓存请求的结果,可以是以下枚举之一:

    • OK — 缓存数据在缓存回复中返回,或者缓存请求通过实时数据满足。
    • NO_DATA — 缓存回复中没有数据。
    • SUSPECT_DATA — 缓存回复中存在可疑数据。
    • FAILED — 请求因某种原因失败,例如超时。这是唯一一个在 on_completion() 回调中设置异常的 CacheRequestOutcome
  • cache_request_id — 与给定完成计算单元关联的缓存请求标识符。此 ID 可用于响应回调关联目的,以关联缓存请求及其相关消息。

  • exception — 如果缓存请求失败,则为 Exception。如果结果不是 FAILED,则异常为 None

以下示例展示了 CacheRequestOutcomeListener 的一个简单实现:

class MyCacheRequestOutcomeListener(CacheRequestOutcomeListener):
def on_completion(self, result: CacheRequestOutcome, cache_request_id: int, exception: Exception):
print("完成!")

有关更多信息,请参阅 PubSub+ Python 消息 API 参考。

接收缓存消息时的注意事项

使用 PubSub+ Python API 接收缓存消息时,需要注意一些事项。以下两个部分解释了当您有重叠的主题订阅时可能发生的情况,并提供了有关 cache_access_timeout 工作方式的额外信息。

尽量避免重叠的主题订阅

当您使用 PubSub+ Python API 消费缓存消息时,我们建议您不要有重叠的主题订阅。当您在一个或多个连接到同一 MessagingService 实例的消息消费者中使用相同或重叠的主题订阅时,就会发生重叠的主题订阅。如果您有重叠的主题订阅,了解缓存请求如何影响您的应用程序非常重要。

多个消息消费者与主题订阅重叠

下表解释了当一个消息消费者发出缓存请求时会发生什么

  • 多于一个消息消费者连接到同一个 MessagingService 实例

并且

  • 缓存请求中使用主题订阅与其中一个或多个消息消费者使用主题订阅重叠。
用于配置缓存请求的函数结果
as_available()缓存响应将缓存消息传递给连接到 MessagingService 实例的所有消息消费者,这些消费者具有匹配的主题订阅。
live_cancels_cached()缓存响应将缓存消息传递给连接到 MessagingService 实例的所有消息消费者,这些消费者具有匹配的主题订阅。
cached_first()缓存响应将缓存消息传递给连接到 MessagingService 实例的所有消息消费者,这些消费者具有匹配的主题订阅。
并且
所有具有实时数据订阅的消费者将停止接收实时数据,直到收到所有缓存消息,对于任何重叠的主题订阅都是如此。
cached_only()缓存响应将缓存消息传递给所有具有匹配主题订阅的消费者。

单个消息消费者与主题重叠

如果您在缓存消息请求中使用 cached_only() 函数,并且您的应用程序满足以下两个条件:

  1. 只有一个消息消费者连接到 MessagingService 实例。
  2. 缓存请求中使用主题订阅与消息消费者的任何实时主题订阅匹配。

您的消息消费者将在那些重叠的主题订阅上接收任何缓存消息的重复。为了避免这种情况下的消息重复,请使用 as_available()cached_first() 函数。

有关更多信息,请参阅 PubSub+ Python 消息 API 参考。

分配适当的 cache_access_timeout 值

cache_access_timeout 没有默认值。在大多数情况下,10,000(等于 10 秒)的值是足够的。此值为 Python API 和 PubSub+ 缓存实例之间发生的内部请求指定一个计时器。对 request_cached() 的单次调用可能导致一个或多个这样的内部请求。只要这些内部请求中的每一个都在指定的 cache_access_timeout 值之前完成,就不会发生超时。

例如,如果您指定 cache_access_timeout 为 3000 毫秒,并且有 10 个内部请求,每个请求完成需要 2000 毫秒,那么 request_cached() 函数返回所需的时间是这些请求的总和,即 20,000 毫秒。因为没有单个请求的完成时间超过 3000 毫秒,所以在这个场景中不会发生超时。

在某些场景中,例如高网络延迟以及您的应用程序使用的反向压力策略,可能需要增加 cache_access_timeout 的值。例如,如果您使用默认的反向压力策略 on_back_pressure_elastic(),即您有一个无限制的内部消息缓冲区,这种反向压力可能会导致 API 中的延迟,从而影响内部请求完成所需的时间。您可能需要增加 cache_access_timeout 的值以考虑这些延迟。