跳到主要内容

在PubSub+ JCSMP API中的线程使用

PubSub+ JCSMP API 使用上下文来组织客户端应用程序与 PubSub+ 事件代理之间的通信。上下文作为配置一个或多个会话以及处理会话相关事件的容器,并封装了驱动会话的网络 I/O 和消息传递通知的线程。消息传递和接收也可能需要应用程序提供的线程。

发布消息时的线程使用

客户端应用程序提供发布保证消息到事件代理目标所需的处理线程。

当事件代理成功接收到保证消息时,它会向 API 返回确认。它不会为直接消息返回确认。(此规则的例外是使用 JCSMP 的非流式发布模式发布直接消息。)

上下文使用一个处理线程从事件代理读取确认,然后解析窗口确认,为每条消息(通过消息 ID 标识)单独排队确认,然后将其分派到应用程序回调。

JCSMP

使用流式发布模式发布消息时,上下文使用一个线程读取事件代理为一组连续保证消息发送的窗口确认,解析它,然后在生产者通知分发队列上为每条消息单独排队确认。此队列也用于消息发布异常。每个上下文有一个队列。

上下文使用另一个线程用于通知和分发队列中的确认到应用程序回调。

请注意,如果线程尝试将通知追加到队列时队列上没有足够空间,线程可能会阻塞,直到有足够的空间。可以通过 JCSMPGlobalProperties.setProducerDispatcherQueueSize() 修改队列大小。

下图显示了用于发送保证消息的应用程序线程以及用于处理事件代理窗口消息确认的上下文线程。

通过 JCSMP 使用流式模式发布保证消息

img

接收消息时的线程使用

接收发布消息时,上下文使用一个处理线程从套接字读取消息,解析它们,然后根据消息传递 API 和是否使用同步或异步模式,将消息排队以进行传递或消费,或执行客户通知和消息分发。

客户端应用程序可以使用以下模式之一接收发布消息:

  • 异步 客户端应用程序可以使用 JCSMP、Java RTO、C 和 .NET API 异步接收发布消息。参阅异步接收消息。

  • 同步 客户端应用程序也可以使用 JCSMP 同步接收发布消息。参阅同步接收消息。

异步接收消息

客户端应用程序可以使用 JCSMP、Java RTO、C 和 .NET API 以异步方式接收消息。也就是说,当消息可用时,它们会自动被发送(“推送”)到 API 到客户端应用程序的消息回调或消息委托接口。

异步接收消息时,上下文使用一个处理线程用于消费者通知和将队列中的消息分发给消费者;API 到应用程序的所有回调(如消息接收回调、事件回调和计时器回调)都从上下文线程运行。

当客户端应用程序使用异步事务会话时,消息从一个或多个事务会话分发线程中分发。可以使用绑定到上下文的单个事务会话分发线程(这是默认设置),也可以使用绑定到每个事务会话的单独事务会话分发线程。有关详细信息,请参阅在 PubSub+ JCSMP API 中使用本地事务。

JCSMP

默认情况下,JCSMP 的上下文使用一个线程从套接字读取消息,解析它们,然后将它们排队到消费者通知分发队列上。它使用另一个线程进行消费者通知和将队列中的消息分发给消费者应用程序。

上下文在会话中可以排队的最大消息数量(在传递给消费者之前)如下:

  • 直接消息 — 5,000
  • 保证消息 — 每个流的保证消息窗口大小允许的最大消息数量。(保证消息窗口大小限制了 API 在必须向事件代理返回确认之前可以接收的消息数量,以表示它已接收窗口中的消息。)

用于通过 XMLMessageListener 异步通知消费者消息和异常的消费者通知分发队列可以根据需要调整大小。此队列应足够大,以缓冲上下文中所有会话中所有消费者流(保证和直接消息)生成的最大通知数量。如果 XMLMessageListener 不总是快速返回控制权,并且消费者通知分发队列已满,尝试将通知追加到此队列的 API 线程可能会暂时阻塞,并可能导致消息在事件代理上排队。

要调整消费者通知分发队列的大小,请调用 JCSMPGlobalProperties.setConsumerDispatcherQueueSize()

全局属性只能在从 JCSMPFactory 创建任何会话之前设置。

下图显示了用于通过 JCSMP 异步接收消息的上下文线程。

通过 JCSMP 异步接收消息

img

对于超低延迟应用程序,您可以启用 MESSAGE_CALLBACK_ON_REACTOR 会话属性以减少消息延迟。启用此会话属性时,异步传递到 XMLMessageListener 的消息将直接从 I/O 线程传递,而不是从消费者通知和分发线程传递。虽然启用此会话属性可以减少消息延迟,但它也会降低最大消息吞吐量。

使用 MESSAGE_CALLBACK_ON_REACTOR 会话属性的应用程序在 onReceive() 回调中不得调用任何阻塞方法;否则可能会导致应用程序死锁。

下图显示了启用 MESSAGE_CALLBACK_ON_REACTOR 会话属性时,用于通过 JCSMP 异步接收消息的上下文线程。

启用消息回调在反应器上时,通过 JCSMP 异步接收消息

img

同步接收消息

在同步方式接收消息时,客户端应用程序使用明确的接收调用来从 API 用于每个消费者的队列中检索消息。同步接收消息时,客户端应用程序提供“拉取”队列中消息的线程。

JCSMP

要接收消息,客户端应用程序必须使用 start() 启用从事件代理接收消息,然后使用同步 receive(...) 调用来接收下一条可用消息。接收方法可以通过等待消息无限期地管理潜在阻塞,在没有消息时不等待(即立即在没有消息时超时),或在没有消息时设置一定时间后超时。

下图显示了通过 JCSMP 同步接收消息时使用的上下文线程和应用程序线程。

使用 JCSMP 同步接收消息