跳到主要内容

在PubSub+ JCSMP API中接收消息时的线程使用

使用 PubSub+ JCSMP API 接收发布消息时,上下文使用一个处理线程从套接字读取消息,解析它们,然后,根据是否使用同步或异步模式,将消息排队以进行传递或消费,或执行客户通知和消息分发。

客户端应用程序可以使用以下模式之一接收发布消息:

  • 异步 客户端应用程序可以使用 Java、Java RTO、C 和 .NET API 异步接收发布消息。参阅异步接收消息。

  • 同步 客户端应用程序也可以使用 JCSMP 同步接收发布消息。参阅同步接收消息。

异步接收消息

客户端应用程序可以使用 PubSub+ JCSMP API 以异步方式接收消息。也就是说,当消息可用时,它们会自动被发送(“推送”)到 API 到客户端应用程序的消息回调或消息委托接口。

异步接收消息时,上下文使用一个处理线程用于消费者通知和将队列中的消息分发给消费者;API 到应用程序的所有回调(如消息接收回调、事件回调和计时器回调)都从上下文线程运行。

当客户端应用程序使用异步事务会话时,消息从一个或多个事务会话分发线程中分发。可以使用绑定到上下文的单个事务会话分发线程(这是默认设置),也可以使用绑定到每个事务会话的单独事务会话分发线程。有关详细信息,请参阅在 PubSub+ JCSMP API 中使用本地事务。

默认情况下,PubSub+ JCSMP API 的上下文使用一个线程从套接字读取消息,解析它们,然后将它们排队到消费者通知分发队列上。它使用另一个线程进行消费者通知和将队列中的消息分发给消费者应用程序。

上下文在会话中可以排队的最大消息数量(在传递给消费者之前)如下:

  • 直接消息 — 5,000
  • 保证消息 — 每个流的保证消息窗口大小允许的最大消息数量。(保证消息窗口大小限制了 API 在必须向事件代理返回确认之前可以接收的消息数量,以表示它已接收窗口中的消息。)

用于通过 XMLMessageListener 异步通知消费者消息和异常的消费者通知分发队列可以根据需要调整大小。此队列应足够大,以缓冲上下文中所有会话中所有消费者流(保证和直接消息)生成的最大通知数量。如果 XMLMessageListener 不总是快速返回控制权,并且消费者通知分发队列已满,尝试将通知追加到此队列的 API 线程可能会暂时阻塞,并可能导致消息在事件代理上排队。

要调整消费者通知分发队列的大小,请调用 JCSMPGlobalProperties.setConsumerDispatcherQueueSize()

全局属性只能在从 JCSMPFactory 创建任何会话之前设置。

下图显示了通过 PubSub+ JCSMP API 异步接收消息时使用的上下文线程。

img

对于超低延迟应用程序,您可以启用 MESSAGE_CALLBACK_ON_REACTOR 会话属性以减少消息延迟。启用此会话属性时,异步传递到 XMLMessageListener 的消息将直接从 I/O 线程传递,而不是从消费者通知和分发线程传递。虽然启用此会话属性可以减少消息延迟,但它也会降低最大消息吞吐量。

使用 MESSAGE_CALLBACK_ON_REACTOR 会话属性的应用程序在 onReceive() 回调中不得调用任何阻塞方法;否则可能会导致应用程序死锁。

下图显示了启用 MESSAGE_CALLBACK_ON_REACTOR 会话属性时,通过 PubSub+ JCSMP API 异步接收消息时使用的上下文线程。

img

同步接收消息

在同步方式接收消息时,客户端应用程序使用明确的接收调用来从 API 用于每个消费者的队列中检索消息。同步接收消息时,客户端应用程序提供“拉取”队列中消息的线程。

PubSub+ JCSMP API

要接收消息,客户端应用程序必须使用 start() 启用从事件代理接收消息,然后使用同步 receive(...) 调用来接收下一条可用消息。接收方法可以通过等待消息无限期地管理潜在阻塞,在没有消息时不等待(即立即在没有消息时超时),或在没有消息时设置一定时间后超时。

下图显示了通过 PubSub+ JCSMP API 同步接收消息时使用的上下文线程和应用程序线程。

img