跳到主要内容

概述:应用程序如何与 PubSub+ 消息组件交互

在本节中,我们将带您了解 PubSub+ 事件代理的消息组件,并在高层次上解释它们如何使数据从生产者流向事件代理,再从事件代理流向消费者。

PubSub+ 事件代理为多协议和标准事件提供基础,包括 Solace 消息格式(SMF)、JMS 1.1、MQTT 3.11、REST 和 AMQP 1.0。当事件通过代理时,它们会从入口消息协议转换为每个接收消息的消费者的出口消息协议。鉴于协议支持的范围,重要的是要记住,您的应用程序可能使用的每个开源协议都会对代理的消息组件的操作方式产生自己的影响。首先,我们将讨论 Solace SMF 协议的工作方式,然后我们将讨论其他协议引入的变化。

在我们的巡览中,我们将从应用程序的角度了解操作情况。因此,让我们从 SMF 开始。

SMF

我们将向您展示 PubSub+ SMF 协议如何利用 PubSub+ 事件代理的消息组件,将数据从生产者传送到代理,再从代理传送到消费者,所有这些都从您的应用程序的角度出发。我们将通过以下两种方式来实现这一点:

  • SMF 流程图 部分包含一个图表,说明了从建立连接到使数据流动的步骤。该图表从您的应用程序开始,经过代理的组件。

  • SMF 组件操作 部分提供了流程图中描述的步骤的更多详细信息。

SMF流程图

在下面的图表中,按照从建立与事件代理的连接到启动消息流动的步骤进行操作。

图表中的带圈数字对应于以下详细描述的编号步骤:

步骤 1 - 5

这些步骤涉及应用程序和事件代理建立连接并为事件消息做好准备的操作。

  1. 应用程序启动 Solace API 和 Solace 上下文(可选,可以使用默认值)。
  2. 应用程序创建并配置会话,包括用户名、密码、代理 DNS:端口、TLS、压缩等。
    session = JCSMPFactory.onlyInstance().createSession(properties)
  3. 应用程序连接会话:
    session.connect();
    • 主机启动 TCP 会话
    • 会话发送 CONNECT 消息
  4. 代理认证连接(在步骤 3 后自动进行)。
  5. 代理授权连接(在步骤 4 后自动进行):事件代理发送 CONNECT ACK。

步骤 6, 7, 9 和 10

这些步骤关注于发送和接收事件消息的操作。

  1. 应用程序 SDK 创建发布者流:
    prod = session.getMessageProducer(..)
    • SDK 发送最后发送的消息 ID 和最后收到的 ACK 消息 ID
    • 代理发送最后收到的消息 ID 和最后发送的 ACK 消息 ID
    • 应用程序维护此状态,以便在重新连接时能够正确地重新传输消息。
  2. 当消息发布时:
    prod.send(msg, topic)
    • 应用程序 SDK 发送消息并将可用窗口减一
    • 事件代理接收、路由并持久化消息
    • 事件代理发送 ACK
    • 应用程序 SDK 收到 ACK 并将可用窗口加一。
  3. 应用程序创建消费者流,包括端点名称、窗口大小等:
    cons = session.createFlow(listener, flow_prop)
    • 应用程序连接流
    • 应用程序 API 对流进行握手并绑定到队列/端点
    • 消息然后能够从代理传送到订阅者客户端。
  4. 应用程序启动流:
    cons.start()
    • 代理发送消息并将可用窗口减一
    • 客户端 API 接收消息并传递给应用程序
    • 客户端 SDK 发送传输 ACK
    • 代理收到 ACK 并将可用窗口加一
    • 应用程序在处理完消息后发送应用 ACK
    • 代理收到应用 ACK 并从队列中删除消息。

步骤 8

此步骤涉及建立应用程序订阅。

  1. (可选,可以是管理员或直接订阅)应用程序通过向队列添加订阅来订阅:
    session.addSubscription(queue, topic,...)
    • 应用程序发送带有主题和端点的 SUBSCRIBE 消息
    • API 发送消息
    • 代理根据订阅端点将订阅添加到连接或后台队列
    • 代理可以发送 SUBACK。

了解更多信息关于涉及的 PubSub+ 组件:

  • Solace API:Solace API 基础知识概览。
  • 生产者 AD 流:客户端应用程序可以发布保证消息,即客户端应用程序分配持久或非持久传输模式到队列或主题目的地的消息。
  • 消费者 AD 流:要接收保证消息,客户端必须在会话内创建消费者流,并将该流绑定到消息发布的事件代理上的端点。
  • 客户端配置文件:客户端配置文件与客户端用户名账户相关联,以便您可以轻松地将通用配置应用于客户端组。
  • ACL 配置文件:客户端成功认证后,将检查分配给客户端使用的客户端用户名的 ACL 配置文件,或客户端所属的 LDAP 授权组(当使用 LDAP 授权时)。
  • 消息 VPN:消息虚拟私有网络(VPN)是 PubSub+ 事件代理上的管理对象,允许主题空间和客户端的隔离。
  • 路由:了解发布和订阅主题的基础知识。
  • 客户端用户名:客户端只能连接到与其分配的客户端用户名相关联的消息 VPN。
  • 队列:保证消息的基本操作。

SMF组件操作

在本节中,我们将详细说明流程图中显示的步骤。

正如我们之前提到的,步骤 1 - 5 涉及应用程序和事件代理建立连接并为事件消息做好准备的操作。以下部分提供了这些步骤的更多信息:

  • 实例化 Solace API 并创建 Solace 会话
  • TCP 会话属性
  • TCP 连接
  • 代理授权连接

实例化 Solace API 并创建 Solace 会话

您的客户端应用程序实例化 Solace API。从这里,它们可以创建 Solace 上下文(创建用于事件处理的工作线程),随后,创建 Solace 会话以连接到代理。您可以灵活地通过决定向上下文中添加多少会话来控制线程模型。您可以为每个上下文拥有多个会话,但每个会话只能有一个上下文。如果您需要更多线程来处理单个会话,您需要将事件处理传递到应用程序工作线程池。此外,单个会话一次只能连接到单个代理实例。

Solace 会话连接在您的应用程序和代理之间打开一个 TCP 流控制的、双向的连接。这种双向会话是所有生产者和消费者发送和接收所有消息交换模式(MEPs)消息的基础。

TCP 会话属性

通过 API 会话属性控制 TCP 会话的基本属性。这包括诸如 TCP(可能还有应用程序)缓冲区大小、连接和重新连接行为等属性。会话属性决定了连接的类型,无论是纯文本、压缩还是加密。最后,会话属性可以控制从客户端到代理的应用程序保持活动状态,这些用于检测连接故障。

TCP 连接

现在我们已经讨论了应用程序如何连接,让我们谈谈连接本身及其行为。

正如前面提到的,每个发送或接收数据的连接都是一个有状态的双向 TCP 连接。在 TCP 连接之上是一个消息层会话,它以连接消息开始,或者是一个包含用于认证的凭据头的数据消息,这在客户端认证中有讨论。根据连接请求中的目标端口和连接头属性,代理将期望会话属性,如压缩、加密等。

一旦会话连接到所需的服务并经过认证,我们就进入授权阶段。根据连接凭据中的客户端用户名,应用两个配置文件:客户端配置文件和 ACL 配置文件。

  • 客户端配置文件 设置客户端可以访问的代理资源。这包括配置选项,如发送和接收持久消息的能力、创建队列的能力以及设置客户端连接可以消耗的系统资源缓冲区的数量。有关更多信息,请参阅配置客户端配置文件。
  • ACL 配置文件 定义客户端可以产生和消费的数据事件。有关更多信息,请参阅使用 ACL 配置文件控制客户端访问。

步骤 6, 7, 9 和 10 关注于发送和接收事件消息的操作。您可以在事件处理详细信息中找到有关这些步骤的背景信息。

事件处理详细信息

现在我们已经有一个经过认证和授权的客户端连接,我们就可以发送和接收消息了。

对于直接事件消息传递,这相当简单,客户端 API 包含一个发布方法,可以直接通过消息会话发送事件,以及一个订阅方法,直接通过消息会话连接接收事件。没有消息层确认、计时器或重新传输,只有 TCP 层确认和重新传输。如果 TCP 连接失败,消息会话丢失,那么事件也会丢失。

对于持久事件消息传递,情况稍有不同,因为存在消息层窗口、计时器、确认和重新传输。这意味着与持久事件发布和消费相关联的另一个状态层。每个消费者状态机封装了客户端 API 中的端点流和代理上的队列/端点绑定。这些流包含端点、窗口、可用窗口、最后发送的消息和最后确认的消息。这些信息允许客户端在会话之间重新连接,并继续从上一个会话停止的地方开始发布/消费。如果客户端配置文件允许发布持久消息,则至少有一个生产者流允许发布客户端发送持久消息。如果客户端配置文件允许消费持久消息,则每个端点有一个消费者流。

MQTT

基于我们对 SMF 操作的理解,我们现在来看看 MQTT 服务如何利用 PubSub+ 事件代理的消息组件,将数据从 MQTT 生产者传送到代理,再从代理传送到 MQTT 消费者。首先,我们将查看 MQTT 和 SMF 之间的差异,然后检查 MQTT 流程图。

MQTT和SMF之间的差异

  • VPN

MQTT 中没有消息 VPN 的概念,因此对于每个唯一的消息 VPN,需要为每种 MQTT 流量变体分配一个新的监听端口,包括 MQTT/TCP、MQTT/WS、MQTT/TLS、MQTT/WSS。

  • QoS

对于 QoS0 和 QoS1 应用发布,系统的行为与 SMF 直接和持久消息传递相同,其中 QoS0 类似于直接消息传递,QoS1 类似于持久消息传递。Qos2 发布的消息由代理作为 QoS1 消息处理。对于 QoS1 订阅,为每个 MQTT 会话创建一个独特且动态创建的队列,并将订阅添加到该队列中。这允许队列吸引事件,一旦 MQTT 消费者连接,就可以检索这些事件。此外,API 和代理之间没有握手来学习最后一条消息,重新连接时也没有收到 ACK,这可能导致额外的重新传输。

MQTT流程图

在下面的图表中,按照从建立与事件代理的连接到启动数据流动的步骤进行操作。

图表中的带圈数字对应于以下详细描述的编号步骤:

步骤 1 - 4

这些步骤涉及应用程序和事件代理建立连接并为事件消息做好准备的操作。

  1. 应用程序启动 MQTT SDK:

    New MqttClient(host, clientName);
  2. 应用程序创建并配置会话,包括代理 DNS:端口、TLS、压缩等。

  3. 应用程序连接会话,包括用户名和密码:

    mqttClient.connect(connOpts);
    • 主机启动 TCP 会话
  4. 代理接收消息层连接。

  5. 代理认证连接。

  6. 代理授权连接。

步骤 5 和 7

这些步骤关注于发送和接收事件消息的操作。

  1. 当消息发布时:
    mqttClient.publish(topic, message);
    • 应用程序发送 PUBLISH 消息
    • SDK 将消息存储在内存或磁盘上
    • SDK 发送消息
    • 代理接收消息,根据 QoS 路由并持久化
    • 代理发送 PUBACK
    • SDK 删除消息
  2. 当代理发送消息时:
    • 代理发送 PUBLISH 消息并将可用窗口减一
    • 客户端 SDK 接收消息并传递给应用程序
    • 应用程序在处理完消息后发送 PUBACK
    • 代理收到 PUBACK 并从队列中删除消息

步骤 6

此步骤涉及建立应用程序订阅。

  1. 当应用程序订阅时:
    mqttClient.subscribe(topic, 1);
    • 应用程序发送 SUBSCRIBE 消息
    • SDK 发送消息
    • 代理根据订阅 QoS 将订阅添加到连接或后台队列
    • 代理发送 SUBACK

了解更多信息关于涉及的 PubSub+ 组件:

  • 客户端配置文件:客户端配置文件与客户端用户名账户相关联,以便您可以轻松地将通用配置应用于客户端组。
  • ACL 配置文件:客户端成功认证后,将检查分配给客户端使用的客户端用户名的 ACL 配置文件,或客户端所属的 LDAP 授权组(当使用 LDAP 授权时)。
  • 消息 VPN:消息虚拟私有网络(VPN)是 PubSub+ 事件代理上的管理对象,允许主题空间和客户端的隔离。
  • 路由:了解发布和订阅主题的基础知识。
  • 客户端用户名:客户端只能连接到与其分配的客户端用户名相关联的消息 VPN。
  • 队列:保证消息的基本操作。
  • MQTT 客户端:在 Eclipse Paho JAVA 文档中了解更多信息。

下一步

有关如何使用 PubSub+ 的 MQTT 的更多信息,请参阅开放 API 和协议部分中的 MQTT 页面。

AMQP

本节中讨论的演练与其他部分显示的略有不同。那些是基于 Java 的。这个是基于 Node.js 的。对于 AMQP Java,JMS 2.0 编程接口似乎是标准的,将是首选选项。首先,我们将查看 Solace 实现的 AMQP 和 SMF 之间的差异,然后检查 AMQP 流程图。

Solace实现的AMQP和SMF之间的差异

  • 持久性

目前,PubSub+ 的 AMQP 解决方案仅处理持久消息。

  • VPN

AMQP 中没有消息 VPN 的概念,因此对于每个唯一的消息 VPN,需要为 AMQP/TCP 和 AMQP/TLS 分配一个新的监听端口。

AMQP流程图

在下面的图表中,按照从建立与事件代理的连接到启动数据流动的步骤进行操作。

图表中的带圈数字对应于以下详细描述的编号步骤:

步骤 1 - 6

这些步骤涉及应用程序和代理建立连接并为事件消息做好准备的操作。

  1. 应用程序创建 AMQ10 客户端:
    new AMQP.Client(AMQP.Policy.merge({defaultSubjects : false}));
  2. 应用程序连接客户端:
    amqpClient.connect(pubsub+broker)
  3. 应用程序节点启动 TCP 会话:
    • 主机启动 TCP 会话
    • 连接为客户端连接打开传入和传出通道
  4. 代理认证连接,(从步骤 3 自动进行)。
  5. 代理授权连接,(从 4 自动进行):
    • 代理发送 CONNECT ACK
  6. 应用程序创建发送者:
    amqpClient.createSender(queueName)
    • SDK 创建发送者会话

步骤 8 和 9

这些步骤关注于发送和接收事件消息的操作。

  1. 应用程序创建接收者。
    amqpClient.createReceiver(queueName)
    • SDK 创建接收者会话
  2. 应用程序启动消息和错误的回调:
    amqpReceiver.on('message',...)
    • 数据放入 AMQP 帧并分配 transferId
    • 数据发送后,其 next-outgoing-id 增加,remote-incoming-window 减少
    • SDK 将使用处置帧 ACK 帧

步骤 7

此步骤涉及建立应用程序订阅。

  1. 当消息发布时:
    amqpSender.send(message)
    • 数据放入 AMQP 帧并分配 transferId
    • 数据发送后,其 next-outgoing-id 增加,remote-incoming-window 减少
    • 代理将使用处置帧 ACK 帧

了解更多信息关于涉及的 PubSub+ 组件:

  • 客户端配置文件:客户端配置文件与客户端用户名账户相关联,以便您可以轻松地将通用配置应用于客户端组。
  • ACL 配置文件:客户端成功认证后,将检查分配给客户端使用的客户端用户名的 ACL 配置文件,或客户端所属的 LDAP 授权组(当使用 LDAP 授权时)。
  • 消息 VPN:消息虚拟私有网络(VPN)是 PubSub+ 事件代理上的管理对象,允许主题空间和客户端的隔离。
  • 路由:了解发布和订阅主题的基础知识。
  • 客户端用户名:客户端只能连接到与其分配的客户端用户名相关联的消息 VPN。
  • 队列:保证消息的基本操作。

下一步

有关如何使用 PubSub+ 的 AMQP 的更多信息,请参阅开放 API 和协议部分中的 AMQP 页面。

JMS

现在我们已经了解了 SMF 的操作,我们可以利用这些知识来理解 JMS 对代理组件操作的影响。首先,我们将查看 JMS 和 SMF 之间的差异,然后检查 JMS 流程图。

需要注意的是,本节中记录的 JMS 过 SMF 的差异也是 JMS 过 AMQP 的差异。

JMS和SMF之间的差异

  • 持久和非持久发布者流

JMS 规范要求同步和异步数据传输。所有作为持久消息发布的消息都在具有窗口大小为一的发布者流上发送,并且是同步的;非持久消息可以在第二个发布者流上在一个可配置的窗口内发送。

  • JNDI

要使用代理作为 JNDI 服务器,JMS API 将懒惰地打开第二个会话来进行 JNDI 查找。虽然这个会话不用于发送数据,但它可能需要解析端点的 JNDI 名称等,以完成数据发送和接收功能。

有关 JNDI 服务的更多信息,请参阅 Solace JNDI 对象。

  • 主题端点

虽然 SMF 支持主题端点,但其主题到队列的映射功能更强大,是推荐的方法。JMS 没有主题到队列映射的编程接口,因此推荐使用主题端点。

有关更多信息,请参阅主题端点。

JMS流程图

在下面的图表中,按照从建立与事件代理的连接到启动数据流动的步骤进行操作。

图表中的带圈数字对应于以下详细描述的编号步骤:

步骤 1 - 5

这些步骤涉及应用程序和代理建立连接并为事件消息做好准备的操作。

  1. 应用程序启动 JMS SDK,开始一个 Solace 上下文。
    Application Creates initialContext , initialContext(env) with username, password, broker URL
  2. (可选,可以编程创建)应用程序进行 JNDI 查找以查找上下文工厂信息:
    initialContext.lookup(CONNECTION_FACTORY_JNDI_NAME)
    • 这提供了诸如消息凭据和默认消息 QoS、TLS、压缩等信息。
  3. 应用程序连接到代理的连接:
    connectionFactory.createConnection()
    • 主机启动 TCP 会话
    • 代理接收消息层连接
  4. 代理认证连接。
  5. 代理授权连接。

步骤 6, 7, 8, 10, 和 11

这些步骤关注于发送和接收事件消息的操作。

  1. 应用程序进行 JNDI 查找以查找端点名称:
    initialContext.lookup(QUEUE_JNDI_NAME)
  2. 应用程序 SDK 创建生产者:
    session.createProducer(..);
    • SDK 发送最后发送的消息 ID 和最后收到的 ACK 消息 ID
    • 代理发送最后收到的消息 ID 和最后发送的 ACK 消息 ID
    • 应用程序维护此状态,以便在重新连接时能够正确地重新传输消息。
  3. 当生产者发布消息时:
    producer.send(topic, message, DeliveryMode, Priority, TTL)
    • 应用程序 SDK 发送消息并将可用窗口减一
    • 事件代理接收、路由并持久化消息
    • 事件代理发送 ACK
    • 应用程序 SDK 收到 ACK 并将可用窗口加一。
  4. (可选,可以轮询消息)应用程序创建监听器:
    consumer.setMessageListener(..)
  5. 应用程序启动连接,这将启动所有消费者:
    connection.start()
    • 代理发送消息并将可用窗口减一
    • 客户端 SDK 接收消息并传递给应用程序
    • 客户端 SDK 发送传输 ACK
    • 代理收到 ACK 并将可用窗口加一
    • 应用程序在处理完消息后发送应用 ACK
    • 代理收到应用 ACK 并从队列中删除消息。

步骤 9

此步骤涉及建立应用程序订阅。

  1. (仅适用于主题端点)应用程序添加订阅:
    session.createConsumer(topic);
    • 应用程序发送带有主题和端点的 SUBSCRIBE 消息
    • SDK 发送消息
    • 代理根据订阅将订阅添加到连接或主题端点
    • 代理可以发送 SUBACK

对于 AMQP,上下文和队列名称是编程创建的,这里展示了如何创建它们的示例。

了解更多信息关于涉及的 PubSub+ 组件:

  • 客户端配置文件:客户端配置文件与客户端用户名账户相关联,以便您可以轻松地将通用配置应用于客户端组。
  • ACL 配置文件:客户端成功认证后,将检查分配给客户端使用的客户端用户名的 ACL 配置文件,或客户端所属的 LDAP 授权组(当使用 LDAP 授权时)。
  • 消息 VPN:消息虚拟私有网络(VPN)是 PubSub+ 事件代理上的管理对象,允许主题空间和客户端的隔离。
  • 路由:了解发布和订阅主题的基础知识。
  • 客户端用户名:客户端只能连接到与其分配的客户端用户名相关联的消息 VPN。
  • 队列:保证消息的基本操作。
  • 主题端点:创建持久主题订阅者。
  • 订阅:您可以创建消息消费者以从队列或特定主题接收消息。
  • 生产者 AD 流:客户端应用程序可以发布保证消息,即客户端应用程序分配持久或非持久传输模式到队列或主题目的地的消息。
  • JMS 上下文:JMSContext 是 JMS 2.0 引入的简化 JMS API 中的主要接口。
  • JMS 连接:连接对象是客户端与其 JMS 提供商的活动连接。它通常在 Java 虚拟机(JVM)之外分配提供商资源。
  • JMS 会话:会话对象是单线程上下文,用于生产和消费消息。

下一步

有关如何使用 PubSub+ 的 JMS 的更多信息,请参阅开放 API 和协议部分中的 JMS API 页面。

REST

在前面的部分中,我们查看了 SMF 的操作以及 JMS 和 MQTT,这些都由消息 API 驱动。我们现在来看看 REST 的基于标准的 HTTP 交换模式如何与事件代理的组件一起工作。首先,我们将查看 REST 和 SMF 之间的差异,然后检查 REST 流程图。

REST和SMF之间的差异

REST 消息不是由消息 API 驱动的,而是由基于标准的 HTTP 交换模式驱动的。然而,在核心中,事件代理以完全相同的方式对待 REST 消息,就认证、授权、路由和持久性而言。以下是基础 SMF 和 REST 之间的显著差异列表:

  • 发布

所有 API 发布都是带有预期 ACK 的 HTTP POST。POST 不是管道化的,即每个都必须携带所需的认证头,并且分别进行认证。这与面向消息的协议不同,在面向消息的协议中,连接通过包含认证凭据的连接消息进行认证,然后消息在连接上管道化。

  • 订阅

HTTP 中没有订阅类型方法。所有订阅都是通过管理操作完成的。

  • 事件传递给消费应用程序

REST 交付端点用于管理从代理到应用程序的连接,并通过标准的 web-hooks HTTP 传递事件。

REST流程图

在下面的图表中,按照从建立与事件代理的连接到启动数据流动的步骤进行操作。

图表中的带圈数字对应于以下详细描述的编号步骤:

步骤 1 - 6

这些步骤涉及应用程序和代理建立连接并为事件消息做好准备的操作。

  1. 应用程序创建 REST 消息:

    • 添加认证头
    • 添加 Content-Type 头
    • 如果需要回复,添加 Solace-Reply-Wait-Time-In-ms
    • 添加消息正文
  2. 应用程序发送 REST 消息:

    • URI 包含 Solace 代理 URI 以及目标主题或队列
  3. 代理认证(从 2 自动进行):

    • URI 包含 Solace 代理 URI 以及目标主题或队列
  4. 代理授权(从 2 自动进行)。

  5. 代理路由(从 2 自动进行):

    • 基于 URI
  6. 代理持久化:

    • 代理 ACK(200 OK),无论是空负载还是回复正文

步骤 8 和 9

这些步骤关注于发送和接收事件消息的操作。

  1. 代理通过 web-hooks 向应用程序打开连接:
    • 代理打开到应用程序的连接。
    • 代理通过 HTTP POST 发送消息。
  2. 应用程序 API ACK 消息:
    • 代理删除消息。

步骤 7

此步骤涉及建立应用程序订阅。

  1. 应用程序订阅:
    • 这不是通过编程完成的。管理员将 RDP 绑定到包含订阅的队列。

了解更多信息关于涉及的 PubSub+ 组件:

  • 客户端配置文件:客户端配置文件与客户端用户名账户相关联,以便您可以轻松地将通用配置应用于客户端组。
  • ACL 配置文件:客户端成功认证后,将检查分配给客户端使用的客户端用户名的 ACL 配置文件,或客户端所属的 LDAP 授权组(当使用 LDAP 授权时)。
  • 消息 VPN:消息虚拟私有网络(VPN)是 PubSub+ 事件代理上的管理对象,允许主题空间和客户端的隔离。
  • 路由:了解发布和订阅主题的基础知识。
  • 客户端用户名:客户端只能连接到与其分配的客户端用户名相关联的消息 VPN。
  • 队列:保证消息的基本操作。

下一步

有关如何使用 PubSub+ 的 REST 的更多信息,请参阅开放 API 和协议部分中的 REST 页面。