跳到主要内容

配置Kafka桥接

Kafka桥接允许您在Kafka集群和Solace PubSub+软件事件代理之间配置双向的消息流. Kafka 桥接在 Solace 事件代理中进行配置,无需外部 Kafka Connect 基础设施即可在 Kafka 之间传递消息.

要配置 Kafka 桥接,您需要配置一个 Kafka 发送器和一个 Kafka 接收器.

  • Kafka 接收器 — 从一个或多个 Kafka 主题接收事件,将事件转换为 Solace 消息格式(SMF),并将其发布到 PubSub+ 事件代理上的主题.

  • Kafka 发送器 — 从一个或多个队列中获取 SMF 消息,将消息转换为 Kafka 事件,并将它们发送到远程 Kafka 集群上的 Kafka 主题.

有关 Kafka 桥接的工作原理的更多信息,请参阅 Kafka 桥接.

本主题包括以下任务:

  • 创建 Kafka 接收器
  • 为 Kafka 接收器配置主题绑定
  • 查看接收器数据
  • 创建 Kafka 发送器
  • 为 Kafka 发送器配置队列绑定
  • 查看发送器数据

创建Kafka接收器

Kafka 接收器允许累积 Kafka 事件并将其转换为 SMF 消息.创建 Kafka 接收器时,事件代理会自动为每个 Kafka 接收器创建一个客户端,该客户端将从 Kafka 主题接收到的消息发布到 Solace 消息总线.客户端名称为 #kafka/rx/<rx-name>,客户端用户名为 #kafka/rx/<rx-name>。客户端用户名使用 #kafka 客户端配置文件和 #acl-profile ACL 配置文件.

如果在消息 VPN 上没有其他 Kafka 接收器或发送器,事件代理还会创建一个名为 #kafka 的客户端配置文件.此配置文件是 Kafka 发送器和接收器所必需的.当您创建第一个 Kafka 发送器或接收器时会创建此配置文件,如果您删除所有 Kafka 发送器和接收器,则会将其删除.

要创建 Kafka 接收器,请执行以下步骤:

  1. 打开事件代理管理器.有关说明,请参阅 PubSub+ 事件代理管理器.

  2. 选择一个消息 VPN.

  3. 在导航栏中点击 桥接.

  4. 选择 Kafka 桥接 选项卡.

  5. 选择 Kafka 接收器 选项卡.

  6. 点击 + Kafka 接收器.

  7. 输入接收器的 名称 并点击 创建.

  8. 为接收器设置以下选项:

    选项描述
    启用指定 Kafka 接收器是否启用.
    启动地址列表指定 Kafka 集群中一个 Kafka 经纪人的完全限定域名(FQDN)或 IP 地址以及可选端口,Kafka 接收器可以从该地址获取整个集群的状态.启动地址必须解析为 Kafka 经纪人上适当配置且与所选身份验证方案兼容的监听端口.如果您不提供端口,则默认端口为 9092.
    您可以为 Kafka 接收器配置一个以逗号分隔的地址列表,以便在尝试连接到第一个地址失败时尝试连接到下一个地址.
    以点分十进制表示法形式指定 IPv4 地址,nnn.nnn.nnn.nnn.您必须将 IPv6 地址用方括号括起来.以 0 到 65535 的数值指定端口.
    例如,正确格式化的 IPv4 地址是:192.168.100.1:9092.相同的 IPv6 地址格式是 [::ffff:c0a8:6401]:9092.该地址对应于 Kafka 消费者 API 参数 bootstrap.servers.
    身份验证方案指定 Kafka 接收器用于建立与远程 Kafka 集群连接的身份验证方案.选项包括:
    • —不需要身份验证.此选项可能适用于匿名连接或 Kafka 接收器不需要身份验证的情况.
    • 基本—使用指定的用户名和密码登录.凭据可以使用明文或使用 SSL 加密传输.
    • Scram—使用带盐的挑战响应身份验证(SCRAM)登录.您必须指定用户名、密码和 SCRAM 哈希.
    • 客户端证书—使用客户端证书登录以验证接收器的身份.您必须指定客户端证书的 PEM 格式内容和证书密码.指定的客户端证书也可以用于其他身份验证方案中识别 Kafka 接收器.
    • Kerberos—使用 Kerberos 机制登录.您必须指定远程 Kafka 经纪人的服务名称、Kafka 接收器的用户主体名称和 Kafka 接收器的 keytab 文件.
    • OAuth 客户端—使用 OAuth 2.0 客户端凭据登录.您必须指定 OAuth 客户端 ID 和令牌端点 URL.您还可以指定 OAuth 范围.| | 启用加密| 指定 Kafka 接收器是否启用 TLS 加密.禁用加密时,启动地址必须解析为 PLAINTEXT 或 SASL_PLAINTEXT 监听端口,启用加密时,必须解析为 SSL 或 SASL_SSL 监听端口.

    下表描述了 TLS/SSL 加密和身份验证方案设置的组合如何对应于 Kafka 消费者 API 参数 security.protocol

    TLS/SSL 加密身份验证方案security.protocol 参数的值
    no ssl客户端证书plaintext
    ssl客户端证书ssl
    no ssl基本ScramOAuth 客户端sasl_plaintext
    ssl基本ScramOAuth 客户端sasl_ssl
  9. (可选)点击 显示高级设置 并为接收器设置以下选项中的任何一个:

    选项描述
    延迟指定在从 Kafka 集群累积一批消息之前要等待的延迟,以毫秒为单位.
    此设置对应于 Kafka 消费者 API 参数 fetch.wait.max.ms.
    最大大小指定消息批次的最大大小,以字节为单位.
    此设置对应于 Kafka 消费者 API 参数 fetch.min.bytes.
    组 ID指定接收器的 Kafka 消费者组 ID.
    消费者组允许 Kafka 消费者协同工作并并行处理 Kafka 主题中的事件.同一组中的每个消费者都被分配来自 Kafka 主题或主题集的不同分区子集.根据您的部署,您可能需要指定 Kafka 接收器所属消费者组的某些详细信息.
    此设置对应于 Kafka 消费者 API 参数 group.id.
    保活间隔指定向消费者组成员发送保活消息之间的时间,以毫秒为单位.
    此设置对应于 Kafka 消费者 API 参数 heartbeat.interval.ms.
    保活超时指定在移除无响应的消费者组成员之前的时间,以毫秒为单位,这会触发组中其他成员之间的分区重新平衡.
    此设置对应于 Kafka 消费者 API 参数 session.timeout.ms.
    成员资格类型指定接收器的 Kafka 消费者组的成员资格类型.选项包括:
    • 动态—指定动态组成员资格.此选项对应于 Kafka 消费者 API 参数 group.instance.id 为空值.
    • 静态—指定静态组成员资格.静态成员可以在不触发重新平衡的情况下离开并重新加入组(在保活超时期间内).
      此选项对应于 Kafka 消费者 API 参数 group.instance.id 的字符串值 <broker-name>/<vpn-name>/<receiver-name>,其中 <broker-name> 是事件代理名称,<vpn-name> 是消息 VPN 名称,<receiver-name> 是 Kafka 接收器名称.| | 分区方案列表| 指定此接收器的消费者组的分区分配方案的有序、以逗号分隔的列表.支持急切("range.roundrobin")和协作("cooperative-sticky")方案.当选定的组领导者选择所有组成员提供的第一个共同策略时,请不要混合急切和协作方案.有关这些方案的更多信息,请参阅您的 Kafka 实现的文档.
      此设置对应于 Kafka 消费者 API 参数 partition.assignment.strategy.| | 主题排除列表| 指定要忽略的 Kafka 主题的以逗号分隔的列表.您必须将主题指定为正则表达式,包括 POSIX.2 正则表达式.每个正则表达式必须以 ^ 字符开头,否则将被解释为字面主题名称.| | 主题刷新间隔| 指定从 Kafka 集群刷新主题元数据之间的时间,以毫秒为单位.
      此设置对应于 Kafka 消费者 API 参数 topic.metadata.refresh.interval.ms.|
  10. 点击 应用.

为Kafka接收器配置主题

主题绑定指定事件代理接收消息的 Kafka 主题,并指定 Kafka 主题的消息如何发送到 PubSub+ 事件代理.

要配置主题绑定,请执行以下步骤:

  1. 打开事件代理管理器.有关说明,请参阅 PubSub+ 事件代理管理器.

  2. 选择一个消息 VPN.

  3. 在导航栏中点击 桥接.

  4. 选择 Kafka 桥接 选项卡.

  5. 选择 Kafka 接收器 选项卡.

  6. 选择您要为其设置主题绑定的接收器.

  7. 选择 主题绑定 选项卡.

  8. 点击 + 主题绑定.

  9. 输入主题绑定的 名称 并点击 创建.

  10. 为主题绑定设置以下选项:

    选项描述
    启用指定主题绑定是否启用.
    本地主题表达式指定用于生成从 Kafka 接收到的每条消息的 SMF 主题的替换表达式.此表达式可以包括从 Kafka 主题接收到的每条单独 Kafka 消息的元数据中提取的数据.有关更多信息,请参阅替换表达式概述.
    本地键表达式指定用于为从 Kafka 接收到的每条消息生成分区键的替换表达式,以确定消息发送到哪个队列分区.此表达式可以包括从 Kafka 主题接收到的每条单独 Kafka 消息的元数据中提取的字段.有关更多信息,请参阅替换表达式概述.
    初始偏移量指定如果组中没有成员已消费并提交任何偏移量,或者最后提交的偏移量已被删除,则从 Kafka 主题消费的初始偏移量.偏移量在每个分区中是唯一的.
    此设置对应于 Kafka 消费者 API auto.offset.reset 配置设置.选项包括:
    • 开始—从最早可用的偏移量开始
    • 结束—仅从新偏移量开始|
  11. 点击 应用.

查看接收器数据

要查看有关 Kafka 接收器的信息,请执行以下步骤:

  1. 打开事件代理管理器.有关说明,请参阅 PubSub+ 事件代理管理器.
  2. 选择一个消息 VPN.
  3. 在导航栏中点击 桥接.
  4. 选择 Kafka 桥接 选项卡.
  5. 选择 Kafka 接收器 选项卡.
  6. 选择您要查看信息的接收器.
  7. 点击 概览 选项卡以显示有关接收器的信息,包括消息速率、运行时间和连接数.
  8. 点击 统计 选项卡以显示接收器的其他统计信息.
  9. 点击 远程代理 选项卡以显示连接的 Kafka 经纪人的信息.

创建Kafka发送器

Kafka 发送器将 Solace 消息转换为 Kafka 事件,并将这些事件传播到远程 Kafka 集群.创建 Kafka 发送器时,事件代理会自动为每个 Kafka 发送器创建一个客户端,该客户端绑定到与 Kafka 发送器相同的队列.客户端名称为 #kafka/tx/<tx-name>,客户端用户名为 #kafka/tx/<tx-name>。客户端用户名使用 #kafka 客户端配置文件和 #acl-profile acl 配置文件.

如果在消息 VPN 上没有其他 Kafka 发送器或接收器,事件代理还会创建一个名为 #kafka 的客户端配置文件.此配置文件是 Kafka 发送器和接收器所必需的.当您创建第一个 Kafka 发送器或接收器时会创建此配置文件,如果您删除所有 Kafka 发送器和接收器,则会将其删除.

要创建 Kafka 发送器,请执行以下步骤:

  1. 打开事件代理管理器.有关说明,请参阅 PubSub+ 事件代理管理器.

  2. 选择一个消息 VPN.

  3. 在导航栏中点击 桥接.

  4. 选择 Kafka 桥接 选项卡.

  5. 选择 Kafka 发送器 选项卡.

  6. 点击 + Kafka 发送器.

  7. 输入发送器的 名称 并点击 创建.

  8. 为发送器设置以下选项:

    选项描述
    启用指定 Kafka 发送器是否启用.
    启动地址列表指定 Kafka 集群中一个 Kafka 经纪人的完全限定域名(FQDN)或 IP 地址以及可选端口,Kafka 发送器可以从该地址获取整个集群的状态.启动地址必须解析为 Kafka 经纪人上适当配置且与所选身份验证方案兼容的监听端口.如果您不提供端口,则默认端口为 9092.
    您可以为 Kafka 发送器配置一个以逗号分隔的地址列表,以便在尝试连接到第一个地址失败时尝试连接到下一个地址.
    以点分十进制表示法形式指定 IPv4 地址,nnn.nnn.nnn.nnn.您必须将 IPv6 地址用方括号括起来.以 0 到 65535 的数值指定端口.
    例如,正确格式化的 IPv4 地址是:192.168.100.1:9092.相同的 IPv6 地址格式是 [::ffff:c0a8:6401]:9092.该地址对应于 Kafka 消费者 API 参数 bootstrap.servers.
    启用幂等性指定是否启用幂等性.幂等性保证按顺序至少一次将消息传递到远程 Kafka 主题,但会牺牲性能.如果您启用此选项,则为 Kafka 发送器配置的每个队列绑定的确认模式必须设置为 all 才能正常工作.
    此设置对应于 Kafka 生产者 API 参数 enable.idempotence.
    启用幂等性时:
    • Kafka 发送器会为每条消息发送一个递增的序列号.
    • 远程 Kafka 经纪人会确认每条消息.
    • Kafka 经纪人会记住为每个 Kafka 发送器写入的最大序列号.
    • Kafka 经纪人会丢弃收到的序列号小于已写入的最大序列号的任何消息.
      如果禁用幂等性,Kafka 发送器可以自由地因超时、领导者更改等原因重新发送消息到 Kafka 经纪人.在这种情况下,可能会发生消息重复和/或重新排序.| | 身份验证方案| 指定 Kafka 发送器用于建立与远程 Kafka 集群连接的身份验证方案.选项包括:
    • —不需要身份验证
    • 基本—使用指定的用户名和密码登录
    • Scram—使用带盐的挑战响应身份验证(SCRAM)登录.您必须指定用户名、密码和 SCRAM 哈希
    • 客户端证书—使用客户端证书登录.您必须指定客户端证书的 PEM 格式内容和证书密码.指定的客户端证书也可以用于其他身份验证方案中识别 Kafka 发送器.
    • Kerberos—使用 Kerberos 机制登录.您必须指定远程 Kafka 经纪人的服务名称、Kafka 发送器的用户主体名称和 Kafka 发送器的 keytab 文件.
    • OAuth 客户端—使用 OAuth 2.0 客户端凭据登录.您必须指定 OAuth 客户端 ID、令牌端点 URL 和 OAuth 范围.| | 启用压缩| 指定 Kafka 发送器是否启用压缩.如果您启用压缩,则必须指定压缩类型和级别.
      压缩类型对应于 Kafka 生产者 API 参数 compression.codec,选项包括:
    • Gzip—使用 Gzip 压缩
    • Snappy—使用 Snappy 压缩
    • Lz4—使用 LZ4 压缩
    • Zstd—使用 Zstandard 压缩
      压缩级别对应于 Kafka 生产者 API 参数 compression.level,选项包括:
    • -1—使用编解码器依赖的默认压缩级别
    • 0- 9 适用于 Gzip 压缩
    • 0 适用于 Snappy 压缩
    • 0- 12 适用于 LZ4 压缩
    • 0- 22 适用于 Zstandard 压缩| | 启用加密| 指定 Kafka 发送器是否启用 TLS 加密.禁用加密时,启动地址必须解析为 PLAINTEXT 或 SASL_PLAINTEXT 监听端口,启用加密时,必须解析为 SSL 或 SASL_SSL 监听端口.|
  9. (可选)点击 显示高级设置 并为接收器设置以下选项中的任何一个:

    选项描述
    延迟指定在向 Kafka 集群累积一批消息之前要等待的延迟,以毫秒为单位。此设置对应于 Kafka 生产者 API 参数 queue.buffering.max.ms.
    最大消息数指定在单个批次中向 Kafka 集群发送的最大消息数。此设置对应于 Kafka 生产者 API 参数 batch.num.messages.
    最大大小指定消息批次的最大大小,以字节为单位。此设置对应于 Kafka 生产者 API 参数 batch.size.
  10. 点击 应用.

为Kafka发送器配置队列绑定

队列绑定指定 PubSub+ 事件代理上的一个队列,并指定队列中的消息如何发送到 Kafka 主题.

要配置队列绑定,请执行以下步骤:

  1. 打开事件代理管理器.有关说明,请参阅 PubSub+ 事件代理管理器.

  2. 选择一个消息 VPN.

  3. 在导航栏中点击 桥接.

  4. 选择 Kafka 桥接 选项卡.

  5. 选择 Kafka 发送器 选项卡.

  6. 选择您要为其设置队列绑定的发送器.

  7. 选择 队列绑定 选项卡.

  8. 点击 + 队列绑定.

  9. 选择一个队列并点击 创建.

  10. 为队列绑定设置以下选项:

    选项描述
    启用指定队列绑定是否启用.
    远程主题指定要将队列中的每条消息发送到的 Kafka 集群上的主题.
    远程键表达式指定用于为发送到 Kafka 的每条消息生成分区键的替换表达式,以确定消息发送到哪个队列分区.此表达式可以包括从 Kafka 主题接收到的每条单独 Kafka 消息的元数据中提取的字段.有关更多信息,请参阅替换表达式概述.
    确认模式指定此队列绑定从远程 Kafka 集群需要的确认数量.
    确认模式对应于 Kafka 生产者 API 参数 request.required.acks.选项包括:
    • —指定不需要从远程 Kafka 集群确认.如果您选择此选项,消息将最多传递一次.
    • 一个—指定需要从远程 Kafka 集群确认一个.如果选择此选项,消息将至少传递一次,但可能会重新排序.
    • 全部—指定远程 Kafka 集群上的所有副本必须确认消息.如果选择此选项,消息将至少传递一次,但可能会重新排序.| | 分区方案| 指定队列绑定在发布到远程 Kafka 集群时使用的分区选择方案.选项包括:
    • 一致—队列绑定根据 Kafka 发送器生成的 Kafka 分区键的哈希选择 Kafka 分区
    • 特定—队列绑定根据指定的分区号选择 Kafka 分区
    • 随机—队列绑定选择一个随机的 Kafka 分区.默认情况下,此分区选择方案用作一致分区选择方案的回退,在一致分区选择方案使用但消息没有分区键时使用.| | 哈希算法| 如果您选择 一致 分区方案,则指定用于选择分区的哈希算法.选项包括:
    • CRC
    • Murmur2
    • Fowler-Noll-Vo 1a (Fnv1A)| | 启用随机回退| 如果您选择 一致 分区方案,则指定如果没有可用的消息键,是否使用随机选择方案作为回退.如果您禁用此选项,则为所有无键消息选择一个单一分区.| | 编号| 如果您选择 显式 分区方案,则指定分区号.|
  11. 点击 应用.

查看发送器数据

要查看有关 Kafka 发送器的信息,请执行以下步骤:

  1. 打开事件代理管理器.有关说明,请参阅 PubSub+ 事件代理管理器.
  2. 选择一个消息 VPN.
  3. 在导航栏中点击 桥接.
  4. 选择 Kafka 桥接 选项卡.
  5. 选择 Kafka 发送器 选项卡.
  6. 选择您要查看信息的接收器.
  7. 点击 概览 选项卡以显示有关发送器的信息,包括消息速率、桥接状态和连接数.
  8. 点击 统计 选项卡以显示接收器的其他统计信息.
  9. 点击 远程代理 选项卡以显示连接的 Kafka 经纪人的信息.