跳到主要内容

创建消息消费者

MessageConsumer 对象可用于从队列接收消息或从特定主题接收消息。通用的 MessageConsumer 接口与其各自的子接口 QueueReceiverTopicSubscriber(分别是 PTP 和发布/订阅特定的接口)的行为方式相同。

要在会话中创建消息消费者,请调用 session.createConsumer(...) 方法并传入队列或主题目标。

要在 XA 会话(而不是非事务性本地会话)中创建消息消费者,请调用 XASession.createConsumer(...) 方法并传入队列或主题。

创建消息消费者后,它将变得活跃,并可用于在启动的连接上接收消息。调用 close() 关闭消息消费者。

消息消费者持久性

当从队列消费消息时,消息消费者的持久性取决于队列的持久性。例如,如果队列是持久的,那么消息消费者也是持久的。有关目标持久性的更多信息,请参阅使用目标。

当为特定主题消费消息时,创建的主题订阅者始终是非持久的。也就是说,隐式创建的订阅者只能在连接时接收发布到主题的消息;当订阅者断开连接时,不会对消息进行转储。

如果要创建持久主题订阅者,可以使用 createDurableSubscriber(...) 方法(参阅创建持久主题订阅者)。

对于持久消费者和持久主题订阅者,客户端绑定到持久队列和主题端点。

对于非持久消费者(即绑定到临时队列和非持久主题订阅者的消费者),会自动为消费者创建临时主题端点和队列。

选择性接收消息

消息消费者还可以使用以下功能来微调其将接收的消息:

  • messageSelector — 在使用保证传输模式时,可以传入消息选择器以限制传递的消息。消息选择器使用标头字段引用和属性引用指定消息消费者感兴趣的消息。有关更多信息,请参阅选择器。
  • noLocal — 可以启用 noLocal 属性,以防止传递在同一连接上由消费者发布的消息。例如,在单个连接上,客户端可能会向其正在消费消息的同一主题发布消息。

要创建具有消息选择器和 noLocal 属性的持久 MessageConsumer,请调用 createConsumer(Destination destination, java.lang.String messageSelector, boolean noLocal) 方法。

当使用直接传输模式时,在创建非持久消费者时不能使用消息选择器或 NoLocal 属性。将返回配置异常。

接收独占队列消费者的活跃/非活跃指示

当队列具有独占访问类型时,可以将多个消息消费者绑定到该队列,但只有一个消息消费者可以积极接收消息。

如果您希望接收有关绑定到队列的消息消费者是否积极接收消息(即,它具有活跃流)或是否不积极接收消息(即,它具有非活跃流)的通知,您可以使用 Solace 特有的 com.solacesystems.jms.SolEventSource 接口,该接口允许 JMS 应用程序为感兴趣的事件注册事件监听器,例如活跃流指示。

从该接口调用 addSolEventListener(...) 并传入一个 SolEventListener,客户端应用程序可以使用它来处理任何返回的 SolEvents,这些事件指示消息消费者是否具有到队列的活跃或非活跃流。

com.solacesystems.jms.SolEventListener 接口依赖于以下方法来处理 SolEvents:

com.solacesystems.jms.SolEventListener.handleEvent(SolEventSource source, SolEvent event)

其中:

SolEventSource 是 SolEvents 的来源。对于活跃流指示,这是消息消费者。

SolEvent 是返回的 Solace 特有事件。对于活跃流指示,它是一个 ActiveFlowIndicationEvent,返回以下常量之一,指示消息消费者是否具有活跃流:

  • 0 — 流非活跃
  • 1 — 流活跃

要注销或移除 SolEventListener 实例,请调用 removeSolEventListener()

以下代码片段展示了如何使用 SolEvent JMS 接口接收独占队列消费者的活跃/非活跃流指示:

// session 是一个有效的 javax.jms.Session,queue 是一个有效的 javax.jms.Queue
consumer = session.createConsumer(queue);

try {
SolEventSource evtSource = (SolEventSource)consumer;
evtSource.addSolEventListener(new SolEventListener() {
public void handleEvent(SolEventSource source, SolEvent evt) {
ActiveFlowIndicationEvent afiEvent = (ActiveFlowIndicationEvent) evt;
if (evt.getFlowState() == ActiveFlowIndicationEvent.FLOW_ACTIVE) {
// 在这里采取行动
}
}
}, SolEvent.SOLEVENT_TYPE_ACTIVE_FLOW_INDICATION);
} catch (ClassCastException ex) {
ex.printStackTrace();
}

相关示例

有关如何使用 SolEvent JMS 接口接收独占队列消费者的活跃/非活跃流指示的更完整示例,请参阅 SolJMSActiveFlowIndication 示例。