跳到主要内容

接收消息

本节描述了客户端如何使用 JMS API 接收消息。

相关事件代理配置信息

为了让 JMS 客户端能够消费其连接的消息 VPN 中的消息,它们必须被分配适当配置的客户端配置文件和/或访问控制列表(ACL)配置文件:

  • 要以保证传输模式接收消息,客户端必须被分配启用了 allow‑guaranteed‑message‑receive 参数的客户端配置文件。此外,如果客户端将使用非持久化消息消费者,分配的客户端配置文件还必须启用 allow‑guaranteed‑message‑create 参数,因为这些客户端会自动创建临时队列/主题端点。
  • 要允许客户端订阅主题(或仅订阅特定主题集),客户端必须被分配具有适当访问控制的 ACL。

有关在事件代理上配置客户端配置文件和 ACL 的更多信息,请参阅管理与 Shell 用户。

相关示例

有关如何接收消息的示例,请参阅 SolJMSConsumer 示例。

在接收消息时使用预接收钩子

您可以使用 JMS API 中的消费者拦截器(或预接收钩子)来实现应用程序的核心关注点,例如解密或日志记录。此预接收钩子用于在消息被接收时拦截 JMS 消息,并允许您实现核心关注点。要使用它,您必须实现 MessageReceiverInterceptor 接口,并使用 SupportedProperty.SOLACE_JMS_MESSAGE_CONSUMER_INTERCEPTOR_CLASS_NAME 激活该接口,该属性指定实现 MessageReceiverInterceptor 接口的消息消费者拦截器的完全限定名称。当您调用任何接收方法(包括异步接收器)时,您的实现将在消息被接收后立即运行,并在消息传递给您之前。有关更多信息,请参阅 Solace JMS API 参考中的以下内容:

  • MessageReceiverInterceptor
  • MessageReceiverInterceptor.ReceiverInterceptingContext
  • SupportedProperty.SOLACE_JMS_MESSAGE_CONSUMER_INTERCEPTOR_CLASS_NAME

以下是在 MyMessageConsumerInterceptor.java 文件中对收到的消息执行操作的示例实现:

package com.myexample.MyMessageConsumerInterceptor;
import com.solacesystems.jms.interceptors.MessageReceiverInterceptor;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

/**
* 一个可用于透明消息解密的示例消费者拦截器
*/
public class MyMessageConsumerInterceptor implements MessageReceiverInterceptor {
@Override
public void onPreReceive(ReceiverInterceptingContext interceptingContext) throws JMSException {
Message msg = interceptingContext.getMessage();
if (msg instanceof TextMessage) {
String text = ((TextMessage)msg).getText();
// 实现核心关注点的示例。这里我们简单地向消息添加了一些文本。
// 要在接收时更改消息内容,请先调用 msg.clearBody()。
msg.clearBody();
((TextMessage)msg).setText(text + ": intercepted by consumer and you can implement your core concern, such as logging, decryption, etc.");
}
}
}

以下是如何启用消费者拦截器的示例:

import com.solacesystems.jms.SolConnectionFactory;
import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.SupportedProperty;
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class ConfigureReceiverInterceptor {
private static final String SOLJMS_INITIAL_CONTEXT_FACTORY = "com.solacesystems.jndi.SolJNDIInitialContextFactory";
private static final String CONSUMER_INTERCEPTOR_CLASS_NAME = MyMessageConsumerInterceptor.class.getName();

/**
* 使用初始上下文和 JNDI 配置拦截器
*
* @throws NamingException 当 JNDI 查找失败时可能会抛出
* @throws JMSException 由于内部 API 原因可能会抛出
*/
public void exampleConfigureInterceptorWithInitialContextAndJNDI() throws NamingException, JMSException {
// 默认连接工厂的 JNDI 名称。
// SolConnectionFactory 需要通过 JNDI 配置用于默认连接工厂
final Hashtable<String, Object> env = new Hashtable<String, Object>();
env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLJMS_INITIAL_CONTEXT_FACTORY);
//...
//... 添加其他配置属性
//...
//... 更多其他配置
// 在此示例中未定义用户名和密码,仅作为示例变量提供
env.put(Context.SECURITY_PRINCIPAL, username);
env.put(Context.SECURITY_CREDENTIALS, password);
// 启用 SMFs 的使用,无需指定信任库
// 仅用于说明目的;不建议在生产中使用
env.put(SupportedProperty.SOLACE_JMS_SSL_VALIDATE_CERTIFICATE, false);
// 在初始上下文中启用拦截器
env.put(SupportedProperty.SOLACE_JMS_MESSAGE_CONSUMER_INTERCEPTOR_CLASS_NAME, CONSUMER_INTERCEPTOR_CLASS_NAME);
final InitialContext initialContext = new InitialContext(env);
final ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("cf/default");
final Connection connection = cf.createConnection();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createTemporaryQueue();
consumeInterceptedMessages(session, queue);
}

// 消费者拦截器可以在消息被获取时透明地修改消息
private void consumeInterceptedMessages(Session session, Queue queue) throws JMSException {
final MessageConsumer consumer = session.createConsumer(queue);
// 等待消息。
while (true) {
// 在消费者拦截器运行后接收消息,拦截器在消息被接收时运行,并对消息应用了任何实现。
// 您可以在消息被拦截后对其进行操作。例如,如果拦截器解密了消息,
// 您接收到的消息是解密后的消息。
final Message nextMessage = consumer.receive();

// ...
}
}
}