跳到主要内容

Kafka桥接示例

以下各节提供了一个示例,说明如何使用Kafka桥接在Solace事件代理和Kafka集群之间交换消息。

在独立部署中使用BETA版本(早于10.6.1)进行的任何Kafka桥接配置将在升级到10.6.1及更高版本时被丢弃。此外,在尝试升级到10.6.1或更高版本之前,您必须移除在冗余(HA)部署中使用的任何BETA Kafka桥接配置。

开始之前

在开始此示例之前,您必须具备以下条件:

  • 一个配置了基本用户名和密码认证的Kafka集群。
  • 一个已配置的消息VPN。
  • 在消息VPN中配置了适当主题订阅的队列(在本例中为q2)。
  • 已配置适当的系统和消息VPN扩展参数以启用Kafka桥接。有关更多信息,请参见“Kafka代理连接的最大数量”和“配置Kafka代理连接的最大数量”。

步骤1:配置Kafka接收器

在此示例中,第一步是在事件代理上配置一个Kafka接收器,以便它可以从远程Kafka集群消费事件。

要配置Kafka接收器:

  1. 创建Kafka接收器。在本例中,Kafka接收器receiver1配置在kafka-bridging-vpn消息VPN上。

    solace> enable
    solace# configure
    solace(configure)# message-vpn kafka-bridging-vpn
    solace(configure/message-vpn)# kafka
    solace(configure/message-vpn/kafka)# create kafka-receiver receiver1
  2. 配置引导地址列表。在本例中,至少可以在kafka.example.com找到远程Kafka集群中的一个Kafka代理。

    solace(...gure/message-vpn/kafka/kafka-receiver)# bootstrap-addresses kafka.example.com
  3. 选择认证方案。此示例使用基本用户名和密码组合来与远程Kafka集群进行认证。

    solace(...gure/message-vpn/kafka/kafka-receiver)# authentication auth-scheme basic
  4. 配置认证方案。

    solace(...gure/message-vpn/kafka/kafka-receiver)# authentication basic username example-user1 password example-password1
  5. 可选。配置消费者组。此示例将Kafka接收器的消费者组ID设置为cg1

    solace(...gure/message-vpn/kafka/kafka-receiver)# group id cg1
  6. 创建主题绑定。在本例中,Kafka接收器从远程Kafka集群上配置的主题topic1消费事件。

    solace(...gure/message-vpn/kafka/kafka-receiver)# create topic-binding topic1
  7. 配置主题绑定。在本例中,Kafka接收器从远程Kafka集群消费的所有事件都将发布到事件代理上的主题rectopic1

    solace(...pn/kafka/kafka-receiver/topic-binding)# local topic rectopic1
  8. 启用主题绑定。

    solace(...pn/kafka/kafka-receiver/topic-binding)# no shutdown
  9. 启用Kafka接收器。

    solace(...pn/kafka/kafka-receiver/topic-binding)# exit
    solace(...gure/message-vpn/kafka/kafka-receiver)# no shutdown

步骤2:配置Kafka发送器

接下来,我们将在事件代理上配置一个Kafka发送器,以便它可以向远程Kafka集群发送消息。

要配置Kafka发送器:

  1. 创建Kafka发送器。在本例中,Kafka发送器sender1配置在kafka-bridging-vpn消息VPN上。

    solace> enable
    solace# configure
    solace(configure)# message-vpn kafka-bridging-vpn
    solace(configure/message-vpn)# kafka
    solace(configure/message-vpn/kafka)# create kafka-sender sender1
  2. 配置引导地址列表。

    solace(...gure/message-vpn/kafka/kafka-sender)# bootstrap-addresses kafka.example.com
  3. 选择认证方案。

    solace(...gure/message-vpn/kafka/kafka-sender)# authentication auth-scheme basic
  4. 配置认证方案。

    solace(...gure/message-vpn/kafka/kafka-sender)# authentication basic username example-user1 password example-password1
  5. 可选。配置幂等性。

    solace(configure/message-vpn/kafka/kafka-sender)# idempotence
  6. 创建队列绑定。在本例中,Kafka发送器绑定到事件代理上配置的队列q2

    solace(configure/message-vpn/kafka/kafka-sender)# create queue-binding q2
  7. 配置队列绑定。在本例中,Kafka发送器期望从Kafka集群的所有副本获得确认,包含一个一致的分区方案,该方案根据绑定生成的分区键的CRC哈希选择Kafka分区(在此例中为key0)。此外,所有消息都发送到远程Kafka集群上的主题topic2

    solace(...-vpn/kafka/kafka-sender/queue-binding)# ack-mode all
    solace(...-vpn/kafka/kafka-sender/queue-binding)# partition scheme consistent
    solace(...-vpn/kafka/kafka-sender/queue-binding)# partition consistent hash crc
    solace(...-vpn/kafka/kafka-sender/queue-binding)# remote key key0
    solace(...-vpn/kafka/kafka-sender/queue-binding)# remote topic topic2
  8. 启用队列绑定。

    solace(...-vpn/kafka/kafka-sender/queue-binding)# no shutdown
  9. 启用Kafka发送器。

    solace(...-vpn/kafka/kafka-sender/queue-binding)# exit
    solace(configure/message-vpn/kafka/kafka-sender)# no shutdown

通过此配置,您现在可以向Kafka集群发送消息并从Kafka集群接收事件。