跳到主要内容

创建流

为了接收保证消息,客户端必须在会话中创建一个消费者流,并将该流绑定到 Solace PubSub+ 事件代理上的一个端点,消息将被发布或吸引到该端点。在一个会话中可以创建一个或多个流。

流是一个 API 对象,允许客户端从端点接收保证消息。在一个会话中最多可以创建 1000 个流。

JavaScript 和 Node.js API 使用 MessageConsumer 的概念来表示流。

要在已连接的会话中创建流,请调用以下列出的适当创建流方法或函数,并提供以下内容:

  • 流属性
    创建流时,必须提供流属性。表“重要流(消息消费者)属性”列出了许多对消息 API 共同的重要流属性。有关所需流属性的完整列表、有效语法和参数以及默认值,请参阅适用于相应消息 API 的 PubSub+ 消息 API。JavaScript 和 Node.js API 使用 solace.MessageConsumerProperties 作为流属性。

  • 要绑定的端点
    仅 Java RTO 和 .NET API 需要此参数。对于 JavaScript、Node.js 和 C API,要绑定的端点在流属性中提供。

  • 主题订阅
    绑定到主题端点时使用。仅 Java RTO 和 .NET API 需要此参数。对于 JavaScript、Node.js 和 C API,主题订阅在流属性中提供。

  • 消息处理器
    对于每种消息 API,必须提供一个用于处理流上接收消息的回调:

    • 对于 Java RTO API,传入一个 MessageCallback
    • 对于 C API,传入一个指向结构体的指针,该结构体提供接收消息和事件的回调函数信息。
    • 对于 .NET API,传入一个 messageEventHandler
    • 对于 JavaScript 和 Node.js API,定义一个监听器来处理 solace.MessageConsumerEventName.MESSAGE 事件。
  • 流事件处理器
    如果启用了活动流指示属性,则还必须提供一个流事件回调、事件监听器或处理器,以便处理指示客户端是否有一个活动流绑定到独占队列的事件。有关更多信息,请参阅活动流指示。

  • 绑定到临时端点时的端点属性
    有关端点属性及其默认值的列表,请参阅适用于相应消息 API 的 PubSub+ 消息 API。

消费者流不用于接收直接消息,因为直接消息是通过会话接口直接接收的。

PubSub+ 消息 API调用方法
Java RTOSessionHandle.createFlowForHandle(...)
CsolClient_session_createFlow(...)
.NETISession.createFlow(...)
JavaScript 和 Node.jssolace.Session.createMessageConsumer(...)

如何创建保证消息流

属性描述
活动流指示启用后,此属性会生成活动流事件,以指示绑定到独占队列的客户端是否有一个活动流(即,正在投递消息的流)。有关更多信息,请参阅活动流指示。
消息确认模式设置 API 是否为每条接收的消息自动生成应用程序级确认(默认值),还是客户端必须显式确认每条接收的消息。当客户端提供确认时,相应的存储消息可以从事件代理上的端点移除。此参数不影响保证消息窗口。有关消息确认模式的更多信息,请参阅确认客户端接收的消息。
消息确认阈值发送窗口消息确认的阈值(设置为窗口大小的百分比)。这影响流控制窗口确认。API 每 N 条消息发送一次传输确认。如果端点在绑定时的 max-delivered-unacked-msgs-per-flow 设置大于或等于传输窗口大小,则 N 按此百分比计算为流窗口大小;否则,N 按此百分比计算为端点在绑定时的 max-delivered-unacked-msgs-per-flow 设置。此阈值不控制应用程序级消息确认。
消息确认计时器在通过流接收保证消息后,必须在确认消息之前经过的最大时间(以毫秒为单位)。此参数用于保证消息已接收但未达到消息确认窗口阈值的情况。
端点(Java RTO、C、.NET API)要绑定的端点。
队列描述符(JavaScript 和 Node.js API)定义要从中消费(绑定到)的队列:
  • 类型(队列或主题端点)
  • 名称(对于主题端点可选)
  • 持久性(如果未指定则为持久化) | | 队列属性(JavaScript 和 Node.js API) | 远程队列的属性。
  • 对于非持久化队列和主题端点,这些属性定义了要创建的队列。可能的值,请参阅在线 API 文档。
  • 对于持久化队列,这些属性在消费者创建时必须未设置。队列连接后将填充这些值。 | | 主题订阅 | 如果绑定到主题端点,则在端点上设置的主题订阅。 | | 选择器 | 一个可选的 SQL-92 选择器,用于选择要投递的消息(参阅使用选择器)。 | | 保证消息窗口大小 | 通过流接收消息的最大数量,然后 API 必须向事件代理发送确认消息已接收。有关更多信息,请参阅确认客户端接收的消息。
    保证消息窗口大小不应超过事件代理上为队列配置的 max-delivered-unacked-msgs-per-flow 值,否则 API 接收到的消息将不会被确认,直到超过消息确认时间值。 | | 启动状态 | 是否在创建时将流置于启动状态。在启动状态下,流可以立即开始接收消息。 | | 本地禁用 | 启用后,同一会话中发布的非持久化或持久化消息不能通过在同一会话中创建的活动流接收,即使订阅匹配发布消息的主题。有关更多信息,请参阅本地禁用投递。 | | 重新连接重试次数 | 在收到原因码为“重放已开始”或“服务不可用”的错误响应后,流重新连接的最大尝试次数。有关更多信息,请参阅流重新连接。 | | 重新连接重试间隔 | 流重新连接尝试之间的时间间隔。有关更多信息,请参阅流重新连接。 |

重要流(消息消费者)属性

相关示例

有关如何配置流属性并绑定到流的示例,请参考适用于相应消息 API 的 SimpleFlowToQueueSimpleFlowToTopic 示例。对于 JavaScript 和 Node.js API,请参考 QueueConsumerDTEConsumer 示例。

活动流指示

如果队列具有独占访问类型(参阅定义端点属性),多个客户端可以绑定到队列,但一次只有一个客户端可以从中接收消息。因此,当客户端创建一个流并绑定到独占队列时,如果其他客户端已绑定到队列,则该客户端的流可能不会处于活动状态。

如果启用了活动流指示属性,则当绑定的流变为活动流时,会向客户端返回一个流活动事件。客户端还会在失去活动流时收到一个流非活动事件(例如,如果流断开连接)。

客户端在创建流时必须提供以下内容:

  • 使用 Java 或 Java RTO API 时,传入一个 FlowEventHandler 以处理流活动和流非活动事件。
  • 使用 C API 时,传入一个指向 functInfo_p 结构体的指针,以提供接收流活动和流非活动事件的回调函数信息。
  • 使用 .NET API 时,传入一个 flowEventHandler 以处理流活动和流非活动事件。

当流绑定到非独占队列时,活动流指示属性将被忽略,除非该队列被分区。在分区队列的情况下,客户端逻辑上绑定到父(非独占)队列,但事件代理实际上将它们绑定到分区(独占)队列。因此,活动流指示可以与分区队列一起使用。

本地禁用投递

客户端可以发布非持久化或持久化消息到主题,如果同一客户端具有匹配的主题订阅,它可以在同一会话的活动流上接收这些消息。为了防止客户端接收其发布的任何非持久化或持久化消息,可以为客户端用于绑定独占队列(即,仅允许单个绑定客户端接收消息的队列)或主题端点的流启用本地禁用属性,并且该流必须是端点的活动流。(当流绑定到非独占队列时,本地禁用属性将被忽略。)有关访问类型的更多信息,请参阅定义端点属性。

  • 本地禁用属性仅防止客户端发布的消息被存储到该客户端,如果它同时建立了一个活动流。如果客户端没有建立活动流,则发布到其订阅主题的消息仍然可以被存储。
  • 本地禁用 属性也可以为会话启用。当为会话启用时,本地禁用属性仅影响在同一会话上发布的直接消息向客户端的投递。它不影响非持久化或持久化消息的投递。(有关更多信息,请参阅为会话启用本地禁用投递。)

当仅为会话或仅为流启用本地禁用属性时,事件代理可能会更改已发布消息的传输模式,以便该消息仍然可以投递给其发布客户端。例如,如果客户端向主题“A”发布消息,则当会话启用了本地禁用时,该消息不能作为直接消息投递给该客户端,但如果未为绑定到主题“A”的主题端点的流启用本地禁用,则该消息可能会作为非持久化消息通过该流投递给该客户端。

有关消息传输模式如何在主题匹配时自动更改的信息,请参阅主题匹配与消息传输模式。

PubSub+ 消息 API属性
Java RTOFlowHandle.PROPERTIES.NO_LOCAL
CSOLCLIENT_FLOW_PROP_NO_LOCAL
.NETFlowProperties.NoLocal
JavaScript 和 Node.jssolace.MessageConsumerProperties.noLocal

如何为流启用本地禁用订阅

流重新连接

当启用流重新连接时,所有 API 在收到错误响应时会自动尝试重新连接流,只要错误响应字符串中给出的原因是以下之一:

  • 重放已开始
  • 服务不可用 — 表示队列或主题端点已被关闭或删除,或者事件代理上的保证投递服务已被禁用。

此外,当报告这些错误时,它们会伴随每个 API 定义的 RECONNECTING 事件。如果重新连接成功,API 会生成一个 RECONNECTED 事件。如果重新连接失败,则会发生相应的 DOWN 错误事件。

此行为仅适用于已建立的流。如果消息事件代理因任何原因拒绝初始绑定,则会报告为绑定失败事件,不会尝试重试。

流重新连接在以下 API 版本中受支持并默认启用:

  • Java RTO API 7.12.0+
  • C API 7.13.0+
  • .NET API 10.8.0+
  • JavaScript 和 Node.js API 10.4.0+

启用时,所有 API 每隔三秒无限尝试重新连接流。

您可以通过在创建流时提供适当的属性来配置流重新连接的最大尝试次数以及尝试之间的间隔(参阅下表)。

要禁用流重新连接,请将重新连接重试次数设置为 0。

PubSub+ 消息 API属性
Java RTOFlowHandle.PROPERTIES.MAX_RECONNECT_TRIES
CSOLCLIENT_FLOW_PROP_MAX_RECONNECT_TRIES
.NETFlowProperties.ReconnectTries
JavaScript 和 Node.jssolace.MessageConsumerProperties.reconnectAttempts

如何设置流的重新连接重试次数

PubSub+ 消息 API属性
Java RTOFlowHandle.PROPERTIES.RECONNECT_RETRY_INTERVAL_MS
CSOLCLIENT_FLOW_PROP_RECONNECT_RETRY_INTERVAL_MS
.NETFlowProperties.ReconnectRetryInterval
JavaScript 和 Node.jssolace.MessageConsumerProperties.reconnectIntervalInMsecs

如何设置流的重新连接重试间隔