跳到主要内容

在PubSub+ JCSMP API中创建流

要接收保证消息,客户端必须在会话中创建一个消费者流,并将该流绑定到 PubSub+ 事件代理上的一个端点,消息会发布或吸引到该端点。一个会话中可以创建一个或多个流。

流是一个 API 对象,允许客户端从端点接收保证消息。在一个会话中最多可以创建 1,000 个流。

要在已连接的会话中创建流,请调用列出的适当创建流方法或函数,并提供以下内容:

  • 流属性

创建流时,必须提供流属性。重要流(消息消费者)属性表列出许多对消息传递 API 共有的重要流属性。有关所需流属性的完整列表、有效语法和参数以及默认值,请参阅相应消息传递 API 的 PubSub+ 消息传递 API。

  • 主题订阅

绑定到主题端点时使用主题订阅。此参数仅对 Java RTO 和 .NET API 必需。对于 Java、JavaScript、Node.js 和 C API,主题订阅在流属性中提供。

  • 消息处理器

对于每种消息传递 API,必须提供一个回调来处理流上接收的消息:

  • 对于 JCSMP,传入一个 XMLMessageListener 回调接口以异步接收消息。如果没有设置 XMLMessageListener(即,将消息监听器设置为 null),则以同步方式接收消息。
  • 流事件处理器

如果启用了活动流指示属性,还必须提供一个流事件回调、事件监听器或处理器,以便处理指示客户端是否有一个活跃流到独占队列的事件。参阅活动流指示。

  • 绑定到临时端点时的端点属性

有关端点属性列表及其默认值,请参阅相应消息传递 API 的 PubSub+ 消息传递 API。

由于直接消息是通过会话接口直接接收的,因此不使用消费者流来接收直接消息。

要通过逻辑流接收消息,客户端应用程序必须首先获取一个 FlowReceiver 实例。然后,要开始在流上接收消息,请调用 FlowReceiver.start()。要停止在流上接收消息,请调用 stop() 方法。

FlowReceiver 保持打开状态,直到调用 FlowReceiver.close() 方法。

要创建保证消息流:

  • 调用 JCSMPSession.createFlow(...) 以获取 FlowReceiver 实例。

  • 获取 FlowReceiver 后,调用 start() 以使用指定的异步 XMLMessageListener 回调接口或同步 receive(...) 调用来从底层连接开始接收消息。

  • 当使用同步接收模式时,在为 FlowReceiver 调用 start() 后,使用以下方法接收消息:

    • receive() — 接收下一个可用消息,并等待消息可用。
    • receive(int timeoutInMillis) — 接收下一个可用消息。如果没有消息可用,该方法将阻塞,直到设置的时间量到期。
    • receiveNoWait() — 接收下一个可用消息。
属性描述
活动流指示启用时,此属性允许生成活动流事件,以指示绑定到独占队列的客户端是否有一个活动流(即,正在投递消息的流)。\n\n有关详细信息,请参阅活动流指示。
消息确认模式设置 API 是否为每个接收的消息自动生成应用程序级确认(默认值),还是客户端必须明确确认每个接收的消息。\n\n当客户端提供确认时,相应的存储消息可以从事件代理上的端点中删除。\n\n此参数不影响保证消息窗口。\n\n有关消息确认模式的详细信息,请参阅 PubSub+ JCSMP API 中的客户端接收消息确认。
消息确认阈值发送窗口消息确认的阈值(设置为窗口大小的百分比)。\n\n这会影响流控制窗口确认。API 每 N 条消息发送一次传输确认。如果端点在绑定时的 max-delivered-unacked-msgs-per-flow 设置大于或等于传输窗口大小,则 N 为该百分比的流窗口大小;否则,N 为该百分比的端点在绑定时的 max-delivered-unacked-msgs-per-flow 设置。此阈值不控制应用程序级消息确认。
消息确认计时器在通过流接收保证消息后,必须发送确认的最大时间量(以毫秒为单位)。\n\n此参数用于保证消息已接收但未达到消息确认窗口阈值的情况。
端点要绑定到的端点。
主题订阅如果绑定到主题端点,要设置在端点上的主题订阅。
选择器用于选择消息进行投递的可选 SQL-92 选择器(参阅 PubSub+ JCSMP API 中的选择器使用)。
保证消息窗口大小通过流接收消息的最大数量,然后 API 必须向事件代理发送确认,确认已接收消息。\n\n有关详细信息,请参阅 PubSub+ JCSMP API 中的客户端接收消息确认。\n\n保证消息窗口大小不应超过在事件代理上配置的队列的 max-delivered-unacked-msgs-per-flow 值,否则直到消息确认时间值被超过,API 接收的消息将不会被确认。
启动状态创建时流是否应处于启动状态。在启动状态,流可以立即开始接收消息。
无本地启用时,即使订阅匹配发布消息的主题,同一会话中创建的活动流也无法接收在该会话上发布的消息。有关详细信息,请参阅无本地投递。
重新连接重试次数在收到原因代码为重放开始或服务不可用的错误响应后,流重新连接尝试的最大次数。有关详细信息,请参阅流重新连接。
重新连接重试间隔流重新连接尝试之间的等待时间。有关详细信息,请参阅流重新连接。

重要流(消息消费者)属性

相关示例

有关如何配置流属性和绑定到流的示例,请参阅 Solace 开发者中心上的 SimpleFlowToQueueSimpleFlowToTopic 示例。

活动流指示

如果队列具有独占访问类型(参阅 PubSub+ JCSMP API 中的定义端点属性),多个客户端可以绑定到队列,但一次只有一个客户端可以积极从队列中接收消息。因此,当客户端创建流并绑定到独占队列时,如果其他客户端已绑定到队列,则该客户端的绑定流可能不是活动流。

如果启用了活动流指示属性,则当客户端的绑定流变为活动流时,会向客户端返回流活动事件。每当客户端失去活动流时(例如,如果流断开连接),客户端也会收到流非活动事件。

客户端必须传入一个 FlowEventHandler,用于处理流活动和流非活动事件。

当流绑定到非独占队列时,忽略活动流指示属性,除非该队列是分区的。在分区队列的情况下,客户端逻辑上绑定到父(非独占)队列,但事件代理实际上将它们绑定到分区(独占)队列。因此,活动流指示可以与分区队列一起使用。

无本地投递

客户端可以发布非持久或持久消息到主题,并且如果同一客户端具有匹配的主题订阅,则可以在同一会话上的活动流中接收它们。要防止客户端接收其发布的任何非持久或持久消息,可以为客户端用于绑定独占队列(即,只允许一个绑定客户端接收消息的队列)或主题端点的流启用无本地属性,并且该流必须是端点的活动流。(当流绑定到非独占队列时,忽略无本地属性。)有关访问类型的更多信息,请参阅 PubSub+ JCSMP API 中的定义端点属性。

  • 无本地属性仅防止客户端发布的消息为其自身排队,前提是它已建立了活动流。如果客户端没有建立活动流,则发布到其订阅主题的消息仍然可以被排队。
  • 无本地属性也可以为会话启用。当为会话启用时,无本地属性仅影响在同一会话上发布的直接消息向客户端的投递。它不影响非持久或持久消息的投递。(有关详细信息,请参阅 PubSub+ JCSMP API 中的为会话启用无本地投递。)

当仅为会话启用无本地属性或仅为流启用时,事件代理可能会更改发布消息的投递模式,以便仍然可以将其投递到发布客户端。例如,如果客户端向主题“A”发布消息,则当为会话启用无本地时,该消息不能作为直接消息投递到该客户端,但如果未为绑定到具有主题订阅“A”的主题端点的流启用无本地,则该消息可以作为非持久消息在该客户端的流上投递。

有关在主题匹配时如何自动修改消息投递模式的信息,请参阅主题匹配和消息投递模式。

要为流启用无本地订阅,请使用 ConsumerFlowProperties.setNoLocal(boolean noLocal)

流重新连接

启用流重新连接时,所有 API 在收到错误响应时都会自动尝试重新连接流,只要错误响应字符串中给出的原因是以下之一:

  • 重放开始
  • 服务不可用 — 表示队列或主题端点已关闭或删除,或者代理上的保证投递服务已被禁用。

此外,当报告这些错误之一时,它们会伴随每个 API 中定义的 RECONNECTING 事件。如果重新连接成功,API 会生成 RECONNECTED 事件。如果重新连接失败,则会发生适当的 DOWN 错误事件。

此行为仅适用于已建立的流。如果消息代理因任何原因拒绝初始绑定,则将其报告为绑定失败事件,不会尝试重试。

流重新连接在 PubSub+ JCSMP API 10.7.0 及更高版本中受支持并默认启用。

启用时,所有 API 每三秒尝试重新连接一次,无限期进行。

可以通过在创建流时提供适当的属性来配置最大流重新连接尝试次数以及尝试之间的间隔。

要禁用流重新连接,请将重新连接重试次数设置为 0。

要设置流的重新连接重试次数,请使用 ConsumerFlowProperties.setReconnectTries

要设置流的重新连接重试间隔,请使用 ConsumerFlowProperties.setReconnectRetryIntervalInMsecs