跳到主要内容

在PubSub+ JCSMP API中检测重复消息

在 PubSub+ JCSMP API 中,保证消息包含一个复制组消息 ID。此消息 ID 可用于检测重复消息投递,以便应用程序尝试避免多次处理相同的消息。

要获取消息的复制组消息 ID,请使用 XMLMessage.getReplicationGroupMessageId()

特定消息的 ID 仅在复制组内的特定队列或主题端点上的该消息副本中保证相同。同一复制组内不同队列或主题端点上的相同消息可能具有相同的复制组消息 ID,也可能没有。通过网桥、DMR 链接、MNR 链接或应用程序的多次发布在不同复制组中传播的相同消息将具有不同的复制组消息 ID。因此,ID 的预期用途是供从复制组内的特定队列或主题端点消费消息的应用程序避免从该队列或主题端点多次处理相同的消息。请注意,如果消息从队列或主题端点移动到其关联的死信队列,消息将获得不同的复制组消息 ID。

可以使用以下技术来检测重复消息,每种技术都有其优缺点。您决定使用的技术将取决于应用程序的性质。

  • 在数据库中查找复制组消息 ID

  • 比较复制组消息 ID

这两种技术都需要将复制组消息 ID 转换为字符串,应用程序可以将其持久存储。

要将复制组消息 ID 转换为字符串,请使用 ReplicationGroupMessageId.toString()

有关更多信息,请参阅 PubSub+ JCSMP 消息传递 API 参考。

在数据库中查找复制组消息ID

只有维护数据库中处理的消息历史记录的应用程序才适合使用此技术。使用此技术的应用程序将遵循以下步骤:

  1. 接收消息。

  2. 将消息的复制组消息 ID 转换为字符串。

  3. 在数据库中查找复制组消息 ID 的字符串形式。

  4. 如果找到,则该消息已被处理。确认该消息以将其从队列中删除。

  5. 如果未找到,请处理该消息,并将字符串存储在数据库中,确保所有这些操作作为一个原子事务完成。

比较复制组消息ID

此技术要求仅存储最后处理的消息 ID。但是,存在比较两个 ID 可能失败的情况。尽管这些失败被认为是罕见的,但如果正在比较的消息源自不同的复制站点,则可能会发生。另一方面,这些失败在检测复制故障转移后的重复项时很有用,因为故障转移后重复项的最大可能性涉及源自新激活复制站点的配对站点的消息。

由于复制组消息 ID 对象的比较可能会失败,因此该对象的比较函数故意未实现某些 API 本地语言中存在的标准比较接口。这些接口假设失败的唯一原因是应用程序错误,并且不允许指定由应用程序处理的检查异常。

此技术最有可能无法检测到重复项的情况是,如果在客户端消费完上一次复制故障转移中源自配对复制站点的所有消息之前发生了复制故障转移。

使用此技术的应用程序将遵循以下步骤:

  1. 在启动时,应用程序将从持久存储中检索最后处理的消息的复制组消息 ID,并将此字符串转换为应用程序最后处理的复制组消息 ID 对象。此字符串必须等于通过使用 API 将消息的复制组消息 ID 转换为字符串而先前获得的字符串。

  2. 接收消息。

  3. 检索消息的复制组消息 ID。

  4. 将消息的复制组消息 ID 与最后处理的消息进行比较。

  5. 如果最后处理的消息不是更新的,则将该消息视为重复项并丢弃。

  6. 如果比较失败,应用程序必须决定如何继续。大多数应用程序将希望确保没有丢失消息的风险,并希望将该消息视为“未处理”,因此它将像处理更新的消息一样处理该消息。

  7. 如果最后处理的消息是更新的,则应用程序将消息的复制组消息 ID 转换为字符串,然后处理该消息,并将字符串持久存储为最后处理的消息作为事务。此持久存储将在下一次应用程序启动时如步骤 1 中所述读取。然后,应用程序的最后处理的复制组消息 ID 更新为该消息的复制组消息 ID 对象,然后接收下一条消息。

要将字符串转换为复制组消息 ID,请使用 JCSMPFactory.createReplicationGroupMessageId(…)

要比较复制组消息 ID 与另一个复制组消息 ID,请使用 ReplicationGroupMessageId.compare(…)

有关更多信息,请参阅 PubSub+ JCSMP 消息传递 API 参考。