跳到主要内容

检测重复消息

保证消息包含一个复制组消息 ID。此消息 ID 可用于检测重复消息投递,以便应用程序尝试避免多次处理相同的消息。

PubSub+ 消息 API使用方法
JCSMPXMLMessage.getReplicationGroupMessageId()
Java RTOMessageHandle.getReplicationGroupMessageIdForHandle(…)
CsolClient_msg_getReplicationGroupMessageId(…)
.NETIMessage.ReplicationGroupMessageId
JavaScript 和 Node.jsMessage.getReplicationGroupMessageId()

如何获取消息的复制组消息 ID

特定消息的 ID 仅保证在复制组内的特定队列或主题端点上,该消息的副本具有相同的 ID。在同一个复制组内的不同队列或主题端点上,相同的消息可能具有相同的复制组消息 ID,也可能没有。通过桥接、DMR 链接、MNR 链接或应用程序的多次发布,将相同的消息传播到不同的复制组时,这些消息将具有不同的复制组消息 ID。因此,ID 的预期用途是供从复制组内的特定队列或主题端点消费消息的应用程序使用,以避免从该队列或主题端点多次处理相同的消息。请注意,如果消息从队列或主题端点移至其关联的死信队列,消息将获得不同的复制组消息 ID。

以下是用于检测重复消息的技术,每种技术都有其优缺点。您选择使用的技术将取决于应用程序的性质。

  • 在数据库中查找复制组消息 ID
  • 比较复制组消息 ID

这两种技术都需要将复制组消息 ID 转换为字符串,以便应用程序可以持久存储。

PubSub+ 消息 API使用方法
JCSMPReplicationGroupMessageId.toString()
Java RTOReplicationGroupMessageIdHandle.toString()
CsolClient_replicationGroupMessageId_toString(…)
.NETIReplicationGroupMessageId.ToString()
JavaScript 和 Node.jsReplicationGroupMessageId.toString()

如何将复制组消息 ID 转换为字符串

在数据库中查找复制组消息ID

此技术仅适用于维护数据库中处理消息历史记录的应用程序。使用此技术的应用程序将遵循以下步骤:

  1. 接收一条消息。
  2. 将消息的复制组消息 ID 转换为字符串。
  3. 在数据库中查找复制组消息 ID 的字符串形式。
  4. 如果找到,则该消息已被处理。确认该消息以从队列中删除。
  5. 如果未找到,请处理该消息,并将字符串存储在数据库中,确保所有操作作为一个原子事务完成。

比较复制组消息ID

此技术要求仅存储最后处理的消息 ID。然而,存在比较两个 ID 时可能会失败的情况。虽然这些失败被认为是罕见的,但如果要比较的消息源自不同的复制站点,则可能会发生。另一方面,这些失败在检测复制故障切换后的重复消息时非常有用,因为故障切换后重复消息的最大可能性涉及源自新激活复制站点的配对站点的消息。

由于复制组消息 ID 对象的比较可能会失败,因此对象的比较函数故意没有实现某些 API 原生语言中存在的标准比较接口。这些接口假设失败的唯一原因是应用程序错误,并且不允许指定由应用程序处理的检查异常。

使用此技术的应用程序将遵循以下步骤:

  1. 在启动时,应用程序将从持久存储中检索最后处理的消息的复制组消息 ID,并将此字符串转换为应用程序的最后处理的复制组消息 ID 对象。此字符串必须等于通过 API 将消息的复制组消息 ID 转换为字符串时之前获得的字符串。
  2. 接收一条消息。
  3. 检索消息的复制组消息 ID。
  4. 将消息的复制组消息 ID 与最后处理的消息进行比较。
  5. 如果最后处理的消息不是更新的,则将该消息视为重复消息并丢弃。
  6. 如果比较失败,应用程序必须决定如何继续。大多数应用程序希望确保没有丢失消息的机会,并希望将消息视为“未处理”,因此它将像处理更新的消息一样处理该消息。
  7. 如果最后处理的消息是更新的,应用程序将消息的复制组消息 ID 转换为字符串,然后处理该消息,并将字符串持久存储为最后处理的消息作为事务。此持久存储将在下次应用程序启动时读取,如步骤 1 中所述。然后在接收下一条消息之前,将应用程序的最后处理的复制组消息 ID 更新为该消息的复制组消息 ID 对象。
PubSub+ 消息 API使用方法
JCSMPJCSMPFactory.createReplicationGroupMessageId(…)
Java RTOSolclient.createReplicationGroupMessageIdForHandle(…)
CsolClient_replicationGroupMessageId_fromString(…)
.NETContextFactory.CreateReplicationGroupMessageId(…)
JavaScript 和 Node.jsSolclientFactory.createReplicationGroupMessageId(…)

如何将字符串转换为复制组消息ID

PubSub+ 消息 API使用方法
JCSMPReplicationGroupMessageId.compare(…)
Java RTOReplicationGroupMessageIdHandle.compare(…)
CsolClient_replicationGroupMessageId_compare(…)
.NETIReplicationGroupMessageId.Compare(…)
JavaScript 和 Node.jsReplicationGroupMessageId.compare(…)

如何比较复制组消息ID与另一个复制组消息ID