跳到主要内容

使用Go API与PubSub+缓存

PubSub+ 缓存是一个可扩展的、内存中的直接消息缓存。它允许客户端应用程序在上线时,或者当它们开始订阅原本未订阅的主题时,检索之前保存的主题消息。根据缓存请求的类型,客户端可以访问缓存请求发起后到达的保存消息和实时消息。有关设置 PubSub+ 缓存实例的信息,请参阅 PubSub+ 缓存。PubSub+ Go API 允许您的应用程序发送缓存请求,以接收您的接收应用程序感兴趣的缓存消息。以下部分描述了如何发送缓存请求和接收缓存消息:

要消费缓存消息,您的应用程序需要访问 ReceiverCacheRequests 接口。DirectMessageReceiver 接口包括 ReceiverCacheRequests 接口,因此需要一个 DirectMessageReceiver 实例来消费缓存消息。有关创建 DirectMessageReceiver 的更多信息,请参阅创建 DirectMessageReceiver。

  1. 创建 CachedMessageSubscriptionRequest 实例
  2. 发送缓存请求
  3. 检查接收消息中的缓存状态和缓存请求 ID
  4. 接收缓存消息时的注意事项

创建 CachedMessageSubscriptionRequest 实例

要发送缓存请求,您首先需要调用 NewCachedMessageSubscriptionRequest() 构造函数。此函数接受四个不同的常量之一,这些常量代表请求缓存消息的不同策略。选择最适合您的应用程序需求的一个:

  • 随时可用 (CacheRequestStrategyAsAvailable)
    • 返回一个 CachedMessageSubscriptionRequest 实例,用于配置缓存数据请求。如果您的接收器尚未订阅传递给函数参数的主题,则请求对象会向该主题添加一个实时数据订阅。这意味着您的应用程序将接收缓存请求的缓存消息和主题订阅的实时消息的混合。
  • 实时取消缓存 (CacheRequestStrategyLiveCancelsCached)
    • 返回一个 CachedMessageSubscriptionRequest 实例,该实例配置缓存请求应用程序以接收最新消息。如果您的接收器尚未订阅传递给函数参数的主题,则请求对象会向该主题添加一个实时数据订阅。如果没有实时(非缓存)消息,则认为与您选择的主题订阅匹配的缓存消息是最新消息。否则,认为与您选择的主题订阅匹配的实时消息是最新消息。
  • 缓存优先 (CacheRequestStrategyCachedFirst)
    • 返回一个 CachedMessageSubscriptionRequest 实例,该实例配置缓存请求以在有可用缓存消息时接收它们,随后接收实时(非缓存)消息。如果您的接收器尚未订阅传递给函数参数的主题,则请求对象会向该主题添加一个实时数据订阅。
  • 仅缓存 (CacheRequestStrategyCachedOnly)
    • 返回一个 CachedMessageSubscriptionRequest 实例,该实例配置您的应用程序仅在缓存请求期间接收缓存消息。
    • 仅当您的接收器没有与缓存请求订阅匹配的实时订阅时,才使用 CacheRequestStrategyCachedOnly,因为您的接收器会在重叠的主题订阅上接收任何缓存消息的重复。当您有与缓存请求主题订阅重叠的现有实时数据订阅时,使用 CacheRequestStrategyAsAvailableCacheRequestStrategyCachedFirst 以避免消息重复。

当您使用 CacheRequestStrategyAsAvailableCacheRequestStrategyLiveCancelsCachedCacheRequestStrategyCachedFirst 常量时,如果您的接收器上尚不存在该主题的实时订阅,则缓存请求会创建一个实时主题订阅。此订阅即使在缓存请求结束后也会持续存在。您可以通过在缓存结果返回后调用接收器对象上的 RemoveSubscription() 函数来移除该订阅,从而取消该订阅上对客户端应用程序的任何实时数据接收。

NewCachedMessageSubscriptionRequest() 函数返回一个 CachedMessageSubscriptionRequest 实例。该函数接受以下参数:

  • cachedMessageSubscriptionStrategy — 表示缓存订阅策略的常量。
  • cacheName — 表示要从中检索的 PubSub+ 缓存实例名称的字符串。
  • subscription — 缓存请求应匹配的主题订阅。
  • cacheAccessTimeout — 表示缓存请求超时(以毫秒为单位)的整数。此值必须在 3,000 到 2,147,483,647 之间,对于大多数应用程序,10,000(10 秒)的值是足够的。有关更多信息,请参阅分配适当的 cacheAccessTimeout 值。
  • maxCachedMessages — 表示从缓存中预期接收的最大消息数量的整数。值为 0 表示从缓存中接收的消息数量没有限制。
  • cachedMessageAge — 表示从缓存中检索的消息的最大年龄(以秒为单位)的整数。值为 0 表示从缓存中接收的消息年龄没有限制。

以下示例展示了如何使用 CacheRequestStrategyAsAvailable 策略创建 CachedMessageSubscriptionRequest 实例:

const (
myCacheInstanceName = "myCacheInstanceName"
cacheAccessTimeout = 4000
maxCachedMessages = 0
cachedMessageAge = 30
)

var myTopicSubscription = TopicSubscriptionOf("myTopic")

cachedMessageSubscriptionRequest, err := NewCachedMessageSubscriptionRequest(
CacheRequestStrategyAsAvailable,
myCacheInstanceName,
myTopicSubscription,
cacheAccessTimeout,
maxCachedMessages,
cachedMessageAge,
)

有关更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

发送缓存请求

要创建缓存请求并开始从 PubSub+ 缓存实例接收缓存消息,您需要使用 ReceiverCacheRequests 接口中的 RequestCachedAsync()RequestCachedAsyncWithCallback() 函数。如果您在接收缓存结果之前发出多个缓存请求,则需要使用唯一的缓存请求 ID,以避免接收重复消息。这些函数从设置为使用 PubSub+ 缓存的事件代理请求之前缓存的消息。

  • 使用通道发送缓存请求
  • 使用回调发送缓存请求
  • 缓存响应

使用通道发送缓存请求

RequestCachedAsync() 函数提供了一种基于通道的缓存消息请求方法。它返回一个 Go 通道,该通道异步接收 CacheResponse 对象。这些响应包含有关缓存请求结果的信息。有关更多信息,请参阅缓存响应。该函数接受以下参数:

RequestCachedAsync(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID message.CacheRequestID) (<- chan CacheResponse, error)

  • cachedMessageSubscriptionRequest — 请求与指定订阅匹配的缓存消息。
  • cacheRequestID — 缓存请求标识符,可用于响应回调关联目的。此 ID 必须在 MessagingService 实例的生命周期内是唯一的,应用程序有责任保证这一点。未能做到这一点将导致未定义的行为。此值作为函数返回的通道中的 CacheResponse 的一部分接收。此标识符是应用程序关联缓存请求与缓存响应和数据消息的方式。实时消息没有缓存请求 ID。应用程序有责任确保标识符是唯一的,以避免关联中的冲突。此值不应与 InboundMessage 类中用于一般消息关联的 correlationID 属性混淆。

以下示例展示了如何使用 RequestCachedAsync() 函数发送缓存请求:

cacheRequestID := 12345

responseChannel, err := directMessageReceiver.RequestCachedAsync(cachedMessageSubscriptionRequest, cacheRequestID)
if err != nil {
fmt.Printf("Failed to request cache: %s\n", err)
return
}

// 等待通道上的响应
cacheResponse := <-responseChannel
if cacheResponse.GetError() != nil {
fmt.Printf("Cache request failed: %s\n", cacheResponse.GetError())
return
}

fmt.Printf("Cache request %d completed with outcome: %d\n",
cacheResponse.GetCacheRequestID(),
cacheResponse.GetCacheRequestOutcome())

使用回调发送缓存请求

RequestCachedAsyncWithCallback() 函数提供了一种基于回调的缓存消息请求方法。它通过提供的回调函数处理缓存响应。该函数接受以下参数:

  • RequestCachedAsyncWithCallback(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID message.CacheRequestID, callback func(CacheResponse)) error
    • cachedMessageSubscriptionRequest — 请求与指定订阅匹配的缓存消息。
    • cacheRequestID — 请求标识符,可用于响应回调关联目的。此 ID 必须在 MessagingService 实例的生命周期内是唯一的,应用程序有责任保证这一点。未能做到这一点将导致未定义的行为。此值作为传递给提供的回调函数的 CacheResponse 的一部分接收。此标识符是应用程序关联缓存请求与缓存响应和数据消息的方式。实时消息没有 cacheRequestID。应用程序有责任确保标识符是唯一的,以避免关联中的冲突。此值不应与 InboundMessage 类中用于一般消息关联的 correlationID 属性混淆。
    • callback — 接受一个回调函数,该函数将为每个接收到的缓存响应调用。回调接收一个 CacheResponse 参数,该参数提供有关从缓存接收到的响应的信息。有关更多信息,请参阅缓存响应。

以下示例展示了如何使用 RequestCachedAsyncWithCallback() 函数发送缓存请求:

cacheRequestID := 12345

cacheCallback := func(cacheResponse CacheResponse) {
if cacheResponse.GetError() != nil {
fmt.Printf("Cache request failed: %s\n", cacheResponse.GetError())
return
}

fmt.Printf("Cache request %d completed with outcome: %d\n",
cacheResponse.GetCacheRequestID(),
cacheResponse.GetCacheRequestOutcome())
}

err := directMessageReceiver.RequestCachedAsyncWithCallback(cachedMessageSubscriptionRequest, cacheRequestID, cacheCallback)
if err != nil {
fmt.Printf("Failed to request cache: %s\n", err)
return
}

缓存响应

CacheResponse 接口提供了有关从缓存接收到的响应的信息:

  • CacheRequestOutcome — 缓存请求的结果,可以是以下常量之一:

    • CacheRequestOutcomeOk — 缓存数据已在缓存回复中返回,或者缓存请求已通过实时数据完成。
    • CacheRequestOutcomeNoData — 缓存回复中没有数据。
    • CacheRequestOutcomeSuspectData — 缓存回复中存在可疑数据。
    • CacheRequestOutcomeFailed — 请求因某种原因失败,例如超时。这是唯一导致 CacheResponse 返回错误的 CacheRequestOutcome
  • CacheRequestID — 与提交的缓存请求相关联的缓存请求标识符。此 ID 可用于响应回调关联目的,以关联缓存请求与其相关消息。

  • error — 如果缓存响应成功,则此值为 nil。如果 CacheRequestOutcomeCacheRequestOutcomeFailed,则 CacheResponse 返回一个错误。

检查接收消息中的缓存状态和缓存请求 ID

发出缓存请求后,您的 DirectMessageReceiver 将开始接收可能是缓存的或实时的消息。您可以使用 GetCacheStatus() 函数检查消息是否为缓存消息。此函数返回一个 CacheStatus 常量,可以是以下三个值之一:

  • Live — 消息是从 PubSub+ 事件代理直接检索的,而不是从缓存实例检索的。

  • Cached — 消息是从缓存实例检索的。

  • Suspect — 消息是从可疑缓存实例检索的。

您可以使用 GetCacheRequestID() 函数从缓存消息中获取缓存请求标识符。以下示例展示了 MessageHandler 的实现,该实现检查入站消息的缓存状态,并从缓存消息中检索缓存请求 ID:

func messageHandler(inboundMessage message.InboundMessage) {
if inboundMessage.GetCacheStatus() == message.Cached {
requestID, ok := inboundMessage.GetCacheRequestId()
if !ok {
fmt.Println("failed to retrieve cache request ID from cached message")
return
}
fmt.Printf("Message is cached from cache response %d", requestID)
} else if inboundMessage.GetCacheStatus() == message.Suspect {
requestID, ok := inboundMessage.GetCacheRequestId()
if !ok {
fmt.Println("failed to retrieve cache request ID from suspect message")
return
}
fmt.Printf("Message is suspect from cache response %d, check on cache", requestID)
} else {
fmt.Println("Message is live, continue")
requestID, ok := inboundMessage.GetCacheRequestId()
if ok {
fmt.Println("unexpected: retrieved cache request ID from live message")
return
}
// 如果缓存请求 ID 不可用,返回的值为 0:
fmt.Printf("As expected, no cache request ID found on live message (returned value: %d)\n", requestID)
}
}

directMessageReceiver.ReceiveAsync(messageHandler)

如果在缓存请求期间发生错误,或者消息接收器或 MessagingService 被终止,PubSub+ Go API 将取消所有未完成的缓存请求。终止过程会阻塞,直到 PubSub+ Go API 将所有缓存响应传递给应用程序。对于使用 RequestCachedAsync() 函数提交的缓存请求,API 会将响应推送到通道,关闭通道,然后完成终止。对于使用 RequestCachedAsyncWithCallback() 提交的缓存请求,应用程序 必须 在 API 能够完成终止之前执行提供给 API 的回调。

有关更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

接收缓存消息时的注意事项

使用 PubSub+ Go API 接收缓存消息时,需要注意以下几点:

  • 最大并发缓存请求
  • 尽量避免主题订阅重叠
  • 分配适当的 cacheAccessTimeout

最大并发缓存请求

应用程序最多可以提交 1024 个并发的、未完成的缓存请求。如果您的 PubSub+ Go API 应用程序超过了这个数量,就会导致错误。

尽量避免主题订阅重叠

当您使用 PubSub+ Go API 消费缓存消息时,我们建议您尽量避免主题订阅重叠。主题订阅重叠是指在连接到同一个 MessagingService 实例的一个或多个消息消费者中使用相同或重叠的主题订阅。如果您有主题订阅重叠,了解缓存请求如何影响您的应用程序非常重要。

多个消息消费者主题订阅重叠

下表解释了当消息消费者发出缓存请求时会发生什么,如果

  • 有多个消息消费者连接到同一个 MessagingService 实例

并且

  • 缓存请求中使用主题订阅与这些消息消费者中的一个或多个使用的主题订阅重叠。
用于配置缓存请求的策略结果
随时可用缓存响应将缓存消息传递给连接到 MessagingService 实例的所有消息消费者,这些消费者具有匹配的主题订阅。
实时取消缓存缓存响应将缓存消息传递给连接到 MessagingService 实例的所有消息消费者,这些消费者具有匹配的主题订阅。
缓存优先缓存响应将缓存消息传递给连接到 MessagingService 实例的所有消息消费者,这些消费者具有匹配的主题订阅。
并且
所有具有实时数据订阅的消费者在接收到任何重叠主题订阅的所有缓存消息之前,停止接收实时数据。
仅缓存缓存响应将缓存消息传递给所有具有匹配主题订阅的消费者。

单个消息消费者主题重叠

如果您的缓存消息请求中使用了 CacheRequestStrategyCachedOnly 策略,并且您的应用程序满足以下两个条件:

  1. 只有一个消息消费者连接到 MessagingService 实例。

  2. 缓存请求中使用主题订阅与消息消费者的任何实时主题订阅匹配。

您的消息消费者将在那些重叠的主题订阅上接收任何缓存消息的重复。为了避免这种情况下的消息重复,使用 CacheRequestStrategyAsAvailableCacheRequestStrategyCachedFirst 策略。

有关更多信息,请参阅 Go 的 PubSub+ 消息 API 参考。

分配适当的 cacheAccessTimeout

cacheAccessTimeout 没有默认值。在大多数情况下,10,000(等于 10 秒)的值是足够的。此值为 PubSub+ Go API 与 PubSub+ 缓存实例之间发生的内部请求指定一个计时器。对 requestCached() 的单次调用可能导致一个或多个这些内部请求。只要这些内部请求在指定的 cacheAccessTimeout 值之前完成,就不会发生超时。

例如,如果您指定 cacheAccessTimeout 为 3000 毫秒,并且有 10 个内部请求,每个请求完成需要 2000 毫秒,那么 RequestCachedAsync()RequestCachedAsyncWithCallback() 函数返回所需的时间是这些请求的总和,即 20,000 毫秒。因为没有单个请求的完成时间超过 3000 毫秒,所以在这个场景中不会发生超时。

在某些场景中,例如网络延迟较高时,可能需要增加 cacheAccessTimeout 的值。