跳到主要内容

确认客户端接收的消息

消息 API 为通过流接收的保证消息向 PubSub+ 事件代理提供确认。下图展示了应用程序通过流接收的保证消息的确认过程。在此过程中,客户端应用程序还可以为格式错误的消息或无法处理的消息发送负面确认(NACK)。

确认已接收的保证消息

API确认

保证消息窗口大小限制了 API 在必须向事件代理返回确认以表明已接收窗口中的消息之前可以接收的消息数量。API 发送此确认后,保证消息窗口将重新打开,以便进一步的消息可以发送到 API。

应用程序可以通过更改通过流属性设置的默认确认计时器和阈值参数来调整窗口确认(参阅重要流(消息消费者)属性)。通常不需要更改这些默认值,这将改变流的性能特征。

应用程序确认

可以使用以下两种确认模式之一来确认消息:

  • 自动确认
  • 客户端确认

要使用的确认模式通过以下列出的流属性之一设置。默认情况下,使用自动确认模式。

下表展示了如何为不同 API 执行确认:

PubSub+ 消息 API使用方法
JCSMPConsumerFlowProperties.setAckMode(String ackMode)
可能的值包括:
  • JCSMPProperties.SUPPORTED_MESSAGE_ACK_AUTO
  • JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT | | Java RTO | FlowHandle.PROPERTIES.ACKMODE
    SolEnum.AckMode 中可能的值包括:
  • AckMode.AUTO
  • AckMode.CLIENT | | C | SOLCLIENT_FLOW_PROP_ACKMODE。可能的值包括:
  • SOLCLIENT_FLOW_PROP_ACKMODE_CLIENT
  • SOLCLIENT_FLOW_PROP_ACKMODE_AUTO(默认值) | | .NET | FlowProperties.AckMode
    可能的值包括:
  • AutoAck
  • ClientAck | | JavaScript 和 Node.js | solace.MessageConsumerProperties.acknowledgeMode
    可能的值包括:
  • solace.MessageConsumerAcknowledgeMode.AUTO
  • solace.MessageConsumerAcknowledgeMode.CLIENT |

自动确认模式

当使用自动确认模式时,API 自动生成应用程序级确认。

对于 JCSMP,确认发送的时间取决于消息是异步接收还是同步接收:

  • 当异步接收时,确认在消息回调完成之后发送。
  • 当同步接收时,确认在 receive() 方法期间从 API 的内部队列中移除消息后发送。重要的是要意识到,在控制权返回到应用程序之前(即在 receive() 方法完成后),确认已经发送。

客户端确认模式

当使用客户端确认模式时,客户端必须显式为每条接收到的消息的 ID 发送确认。您还可以选择使用负面确认。有关更多信息,请参阅特定消息的负面确认。

要显式发送客户端确认,请调用以下列出的方法之一。

避免未确认消息的数量变得过大(例如,10,000 条或更多),因为事件代理的出站消息速率可能会开始下降。

PubSub+ 消息 API使用方法
JCSMPXMLMessage.ackMessage()
Java RTOFlowHandle.ack(...)
有关信息,请参阅 Java RTO 的 PubSub+ 消息 API。
CsolClient_flow_sendAck(...)
.NETIFlow.Ack(...)
JavaScript 和 Node.jssolace.Message.acknowledge()
确认是在消息对象上调用的。

如何发送显式客户端确认

当使用 C API 的客户端确认模式时,可以通过 SOLCLIENT_FLOW_PROP_MAX_UNACKED_MESSAGES 流属性设置 API 可以通过流向应用程序传递的消息数量上限,而无需接收客户端确认。默认情况下,此属性的值为 -1,表示 API 不限制可以传递的未确认消息的最大数量。(要更改现有流可以通过其接收的未确认消息的最大数量,请调用 solClient_flow_setMaxUnacked()。)

特定消息的负面确认

如果已为客户端确认配置应用程序(参阅客户端确认模式),则可以使用负面确认(NACK)。使用 NACK 时,您可以发送结算结果,以告知事件代理已接收的保证消息的处理结果。根据结算结果,事件代理知道如何在其队列中处理该消息。您可以使用以下结算结果:

  • ACCEPTED — 此确认通知事件代理您的客户端应用程序已成功处理保证消息。当事件代理收到此结果时,它会从其队列中移除该消息。
    • 当您使用结果为 ACCEPTED 的结算函数/方法时,与在客户端确认模式下调用确认函数/方法相同。
  • FAILED — 此负面确认通知事件代理您的客户端应用程序未处理该消息。当事件代理收到此负面确认时,它会尝试在遵守投递次数限制的情况下重新投递该消息。
  • REJECTED — 此负面确认通知事件代理您的客户端应用程序已处理该消息,但未接受(例如,验证失败)。当事件代理收到此负面确认时,它会从其队列中移除该消息,然后如果已配置,则将该消息移至死信队列(DMQ)。

在使用 NACK 之前,您必须在创建流时将 FAILED、REJECTED 或两者都作为 NACK 类型添加,以便为流准备使用负面确认。您不需要添加 ACCEPTED 结果,因为它始终可用。如果尝试使用未添加的结果,您将收到 Required Settlement Outcome Not Supported 错误。

  • NACK 在传输过程中可能会丢失(例如,由于意外的网络问题)。在开发应用程序时,将这一事实纳入处理消息的逻辑中。
  • NACK 在 10.2.1 及更高版本的事件代理上受支持。如果事件代理不支持 NACK,则在流绑定请求期间指定结果时会发生 InvalidOperationException
PubSub+ 消息 API调用方法
JCSMP启用并准备流以使用负面确认:
ConsumerFlowProperties.addRequiredSettlementOutcomes(Outcome outcometoadd)
您需要添加以下一个或两个:
  • Outcome.FAILED
  • Outcome.REJECTED
    Outcome.ACCEPTED 不需要设置且始终受支持。
    您可以使用以下方法进行确认或负面确认:
    XMLMessage.settle(settlement_outcome)
    可能使用的值包括:
  • Outcome.ACCEPTED
  • Outcome.FAILED
  • Outcome.REJECTED | | Java RTO | 要启用并准备流以使用负面确认,请将 REQUIRED_OUTCOME_FAILEDREQUIRED_OUTCOME_REJECTED 或两者都设置为 ENABLE
    | JavaScript 和 Node.js | 在创建流时,启用并在消息消费者上设置以下属性以使用负面确认:
    requiredSettlementOutcomes: [MessageOutcome.FAILED, MessageOutcome.REJECTED]
    MessageOutcome.ACCEPTED 不需要与 requiredSettlementOutcomes 一起设置且始终受支持。
    您可以使用以下方法发送确认或负面确认:
    Message.settle(messageOutcome)
    对于 messageOutcome,您使用以下值之一:
  • MessageOutcome.ACCEPTED
  • MessageOutcome.REJECTED
  • MessageOutcome.FAILED |

使用 Java RTO API 的 NACK 示例

以下示例代码展示了如何使用 PubSub+ Java RTO 消息 API 发送 NACK:

// session 必须具有 SolEnum.CapabilityName.AD_APP_ACK_FAILED 能力,才能支持负面 MessageOutcomes
// MessageOutcome.ACCEPTED 始终受支持

FlowHandle flowHandle = Solclient.Allocator.newFlowHandle();

// 设置流属性
int flowProps = 0;
String[] flowProperties = new String[10];
flowProperties[flowProps++] = FlowHandle.PROPERTIES.ACKMODE;
flowProperties[flowProps++] = SolEnum.AckMode.CLIENT;
flowProperties[flowProps++] = FlowHandle.PROPERTIES.REQUIRED_OUTCOME_FAILED;
flowProperties[flowProps++] = SolEnum.BooleanValue.ENABLE;
flowProperties[flowProps++] = FlowHandle.PROPERTIES.REQUIRED_OUTCOME_REJECTED;
flowProperties[flowProps++] = SolEnum.BooleanValue.ENABLE;

sessionHandle.createFlowForHandle(flowHandle, flowProperties, queue, null,
new MessageCallbackSample() {
@Override
public void onMessage(Handle handle) {
FlowHandle flowHandle = (FlowHandle) handle;
MessageHandle msgHandle = flowHandle.getRxMessage();
long msgId = msgHandle.getGuaranteedMessageId();

/* 示例 try-catch 块,用于处理和结算消息 */
try {
processMessage(msgHandle);
flowHandle.settle(msgId, MessageOutcome.ACCEPTED);
} catch (Exception e) {
flowHandle.settle(msgId, MessageOutcome.FAILED);
}
}
}, flowEventCallback);

使用 JCSMP API 的 NACK 示例

以下示例代码展示了如何使用 PubSub+ JCSMP 消息 API 发送 NACK:

...
...
// 已使用 JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT 创建会话
final ConsumerFlowProperties cfp = new ConsumerFlowProperties().setEndpoint(q);

// 添加结算结果 — Outcome.ACCEPTED 不需要添加,因为它始终包含
// 消费者可以添加多个结果,例如 cfp.addRequiredSettlementOutcomes(Outcome.FAILED, Outcome.REJECTED);
cfp.addRequiredSettlementOutcomes(Outcome.FAILED);

final FlowReceiver flowReceiver = session.createFlow(null, cfp);
BytesXMLMessage msg = flowReceiver.receive(1000);
try {
processMessage(msg);
msg.settle(Outcome.ACCEPTED); // 与 msg.ackMessage() 相同
} catch (Exception e) {
msg.settle(Outcome.FAILED); // 未能处理消息,结算为 FAILED
}
...
...

使用 C API 的 NACK 示例

对于 PubSub+ 消息 API,要使用 NACK,您必须将 SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILEDSOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_REJECTED 或两者都设置为 SOLCLIENT_PROP_ENABLE_VAL

以下示例代码使用虚构的 processMessage() 函数处理保证消息,该函数提供处理状态,以便您可以使用 solClient_flow_settleMsg() 函数发送结算结果:

/* 假设您已经有一个会话和端点(队列) */
/* 已被客户端接受的消息计数 */
static int msgCount = 0;
static int msgCountSuccess = 0;
static int msgCountFailed = 0;
static int msgCountRejected = 0;

/* 消息接收回调 */
static solClient_rxMsgCallback_returnCode_t flowMessageReceiveCallback(solClient_opaqueFlow_pt flow_p,
solClient_opaqueMsg_pt msg_p,
void *user_p) {
solClient_msgId_t msgId;
int status = 0;

/* 处理消息 */
printf("Received message:\n");
// 某个函数用于处理消息并返回状态
status = processMessage(msg_p);
solClient_msg_getMsgId(msg_p, &msgId);
msgCount++;

/* 如果状态良好 */
if (status == 0) {
printf("Message passed validation\n");
solClient_flow_settleMsg(flow_p, msgId, SOLCLIENT_OUTCOME_ACCEPTED);
msgCountSuccess++;
} else if (status == 1) {
printf("Message failed validation\n");
solClient_flow_settleMsg(flow_p, msgId, SOLCLIENT_OUTCOME_REJECTED);
msgCountRejected++;
} else {
printf("Failed to process message.\n");
solClient_flow_settleMsg(flow_p, msgId, SOLCLIENT_OUTCOME_FAILED);
msgCountFailed++;
}
}
...
...
int main(int argc, char *argv[]) {
/* 假设已经创建了会话 */
/* 配置流功能信息 */
flowFuncInfo.rxMsgInfo.callback_p = flowMessageReceiveCallback;
flowFuncInfo.eventInfo.callback_p = flowEventCallback;

if (!solClient_session_isCapable(session_p, SOLCLIENT_SESSION_CAPABILITY_AD_APP_ACK_FAILED)) {
exit(1);
}

/* 必须添加这两个属性,如果想使用 FAIL 和 REJECT 作为结果,这将准备流以使用负面确认 */
props[propIndex++] = SOLCLIENT_FLOW_PROP_ACKMODE;
props[propIndex++] = SOLCLIENT_FLOW_PROP_ACKMODE_CLIENT;
props[propIndex++] = SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILED;
props[propIndex++] = SOLCLIENT_PROP_ENABLE_VAL;
props[propIndex++] = SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_REJECTED;
props[propIndex++] = SOLCLIENT_PROP_ENABLE_VAL;
props[propIndex++] = NULL;

...
...
solClientRc = solClient_session_createFlow(props,
session_p,
&flow_p,
&flowFuncInfo,
sizeof(flowFuncInfo));

/* 等待消息 */
printf("Waiting for messages......\n");
fflush(stdout);
while (msgCount < 1) {
SLEEP(1);
}
...
...
}

使用 .NET API 的 NACK 示例

对于 PubSub+ .NET (C#) API,您必须将 RequiredOutcomeFailedRequiredOutcomeRejected 或两者都设置为 true 才能使用 NACK。以下示例代码展示了如何使用虚构的 messageProcessingObject.process() 方法发送 NACK,该方法确定通过 IFlow.Settle() 方法发送的结算结果:

// 假设您已经有一个会话和端点(队列)
// 使用对象实例化设置使用 NACK 所需的属性
FlowProperties flowProps = new FlowProperties()
{
AckMode = MessageAckMode.ClientAck,
RequiredOutcomeFailed = true,
RequiredOutcomeRejected = true
};

IFlow flow = session.CreateFlow(flowProps, queue, null,
new EventHandler<MessageEventArgs>(delegate (object source, MessageEventArgs args) {
// 收到消息并进行处理,基于结果使用虚构的 messageProcessingObject
if (messageProcessingObject.process(args.Message) == 1) {
// 接受消息
flow.Settle(args.Message.ADMessageId, MessageOutcome.Accepted);
} else if (messageProcessingObject.process(args.Message) == 3) {
// 拒绝消息
flow.Settle(args.Message.ADMessageId, MessageOutcome.Rejected);
} else {
// 未能处理消息
flow.Settle(args.Message.ADMessageId, MessageOutcome.Failed);
args.Message.Dispose();
}
}),
new EventHandler<FlowEventArgs>(delegate (object source, FlowEventArgs args) {
// 收到事件
}));
...
...

使用 JavaScript API 的 NACK 示例

以下示例代码展示了如何使用 PubSub+ JavaScript 消息 API 发送 NACK:

try {
// 创建支持 NACK 的消息消费者
const messageConsumer = session.createMessageConsumer({
// solace.MessageConsumerProperties
queueDescriptor: { name: 'QueueName', type: solace.QueueType.QUEUE, durable: true },
acknowledgeMode: solace.MessageConsumerAcknowledgeMode.CLIENT, // 启用客户端确认
// 会话已使用 SUPPORTED_MESSAGE_ACK_CLIENT 创建,以支持负面确认结果
// MessageOutcome.ACCEPTED 不需要设置且始终受支持
requiredSettlementOutcomes: [MessageOutcome.FAILED, MessageOutcome.REJECTED], // 为流设置结算结果。流类型将为 0x03
});

// 定义消息消费者事件监听器
messageConsumer.on(solace.MessageConsumerEventName.UP, function () {
// 消费者已启动
});

messageConsumer.on(solace.MessageConsumerEventName.CONNECT_FAILED_ERROR, function (error)
{
// 错误:消息消费者无法绑定到队列
});

messageConsumer.on(solace.MessageConsumerEventName.DOWN, function () {
// 消息消费者已关闭
});

messageConsumer.on(solace.MessageConsumerEventName.DOWN_ERROR, function (details) {
console.log('Received "DOWN_ERROR" event - details: ' + details);
});

// 定义消息接收事件监听器
messageConsumer.on(solace.MessageConsumerEventName.MESSAGE, function (message) {
console.log('Received message: "' + message.getBinaryAttachment() + '", details:\n' + message.dump());
// 使用新的 settle 方法从路由器确认/NACK 消息
message.settle(solace.MessageOutcome.ACCEPTED); // 与 message.acknowledge() 相同

// 在支持负面确认结果的代理会话上,也可以使用 FAILED 和 REJECTED 结果进行结算
// message.settle(solace.MessageOutcome.FAILED); // 与 message.acknowledge() 相同
// message.settle(solace.MessageOutcome.REJECTED); // 与 message.acknowledge() 相同
});

// 连接消息消费者
messageConsumer.connect();
} catch (error) {
// 创建消息消费者时发生错误
console.log(error.toString());
}