跳到主要内容

配置Kafka桥接

Kafka桥接使Solace PubSub+软件事件代理能够将放置在一个或多个队列中的Solace消息转换为Kafka事件,并将其发布到远程Kafka集群,以及消费Kafka事件并将它们发布到Solace事件代理的主题上。有关更多信息,请参见Kafka桥接。

要配置Kafka桥接,您需要创建以下内容:

  1. Kafka接收器——从一个或多个Kafka主题接收事件,将事件转换为Solace消息,并以Solace消息格式(SMF)发布到PubSub+事件代理的主题上。

  2. Kafka发送器——从一个或多个队列中获取SMF消息,将消息转换为Kafka事件,并将其发布到远程Kafka集群的Kafka主题上。

此外,您可能希望更改事件代理支持的当前消息VPN的Kafka代理连接的最大数量。有关更多信息,请参见配置Kafka代理连接的最大数量。

以下部分描述了如何配置Kafka接收器和Kafka发送器。有关详细示例,展示了设置工作Kafka桥接场景所需的所有步骤,请参见Kafka桥接示例。

有关在PubSub+ Broker Manager中配置Kafka桥接的说明,请参见配置Kafka桥接。

在独立部署中使用的BETA版本(10.6.1之前)中进行的任何Kafka桥接配置在升级到10.6.1及更高版本时将被丢弃。此外,在尝试升级到10.6.1或更高版本之前,您必须删除在冗余(HA)部署中使用的任何BETA Kafka桥接配置。

配置Kafka接收器

在事件代理上,Kafka事件到Solace消息的累积和转换由Kafka接收器启用,该接收器在消息VPN级别进行配置。您可以为接收器指定任何名称。当您为消息VPN创建Kafka接收器时,事件代理会自动创建以下对象:

  • 每个Kafka接收器的单个客户端。此客户端将从其所有主题绑定的Kafka主题接收到的消息发布到Solace消息总线。此客户端的名称为#kafka/rx/<rx-name>,并使用客户端用户名#kafka/rx/<rx-name>。此客户端用户名使用#kafka客户端配置文件和#acl-profile ACL配置文件。

  • 单个客户端配置文件,称为#kafka,这是消息VPN中所有Kafka发送器和接收器所需的。当创建第一个Kafka发送器或接收器时创建此配置文件,并在删除最后一个Kafka发送器或接收器时删除。

img

有关更多信息,请参见Kafka接收器。

要创建Kafka接收器,请输入以下命令:

solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# create kafka-receiver <name>

要配置现有的Kafka接收器:

solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# kafka-receiver <name>

要启用已创建的Kafka接收器:

solace(configure/message-vpn/kafka/kafka-receiver)# no shutdown

其中:

message-vpn <name> 是您希望创建Kafka接收器的消息VPN的名称。

kafka-receiver <name> 是Kafka接收器的名称。

您可以为Kafka接收器执行的配置任务包括:

  • 配置Kafka接收器的身份验证方案

  • 配置Kafka接收器的消息批处理

  • 配置Kafka接收器的引导地址列表

  • 配置Kafka接收器的消费者组

  • 配置Kafka接收器的主题元数据处理

  • 配置Kafka接收器的主题绑定

  • 为Kafka接收器连接启用TLS/SSL加密

配置Kafka接收器的身份验证方案

要配置Kafka接收器将用于与远程Kafka集群建立连接的身份验证方案,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver)# authentication
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# auth-scheme {none | basic | scram | client-certificate | kerberos | oauth-client}

其中:

none 指定不使用身份验证进行登录。有关更多信息,请参见无身份验证。

basic 指定使用用户名和密码进行登录。有关更多信息,请参见基本身份验证。

scram 指定使用SCRAM(Salted Challenge Response Authentication)进行登录。有关更多信息,请参见SCRAM身份验证。

client-certificate 指定使用客户端TLS证书进行登录。有关更多信息,请参见客户端证书身份验证。

kerberos 指定使用Kerberos机制进行登录。有关更多信息,请参见Kerberos身份验证。

oauth-client 指定使用OAuth 2.0客户端凭据进行登录。有关更多信息,请参见OAuth客户端身份验证。

无身份验证

如果未配置Kafka接收器的身份验证方案,则接收器不会使用身份验证方案。这可能适用于匿名连接或Kafka接收器不需要身份验证的情况。

基本身份验证

使用基本身份验证方案,Kafka接收器可以使用用户名和密码组合进行身份验证。凭据可以使用明文传输或使用SSL加密。

要配置基本身份验证方案的设置,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/authentication)# basic
solace(configure/message-vpn/kafka/kafka-receiver/authentication/basic)# username <name> [password <password>]

其中:

<name> 是用于与远程Kafka集群进行身份验证的客户端用户名。用户名可以包含最多255个字符。

<password> 是与指定用户名一起使用的密码。密码可以包含最多255个字符。

此命令的否定版本no username,将移除任何配置的用户名和密码。

SCRAM身份验证

当使用SCRAM身份验证(RFC 5802)时,Kafka接收器使用挑战/响应机制与远程Kafka集群进行用户名和密码的身份验证。

要配置SCRAM身份验证方案的设置,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/authentication)# scram
solace(configure/message-vpn/kafka/kafka-receiver/authentication/scram)# hash {sha-256 | sha-512}
solace(configure/message-vpn/kafka/kafka-receiver/authentication/scram)# username <name> [password <password>]

其中:

sha-256 指定使用SHA-2 256位哈希进行SCRAM身份验证。

sha-512 指定使用SHA-2 512位哈希进行SCRAM身份验证。这是默认设置。

<name> 是用于与远程Kafka集群进行身份验证的客户端用户名。用户名可以包含最多255个字符。

<password> 是与指定用户名一起使用的密码。密码可以包含最多255个字符。

hash命令的否定形式no hash,将值重置为默认值。

username命令的否定形式no username,将移除任何配置的用户名和密码。

客户端证书身份验证

当使用客户端证书身份验证时,Kafka接收器提供证书文件以验证其身份。客户端证书身份验证仅适用于使用TLS/SSL的连接(请参见为Kafka接收器连接启用TLS/SSL加密)。

在此处安装的客户端证书也可以在Kafka集群请求其他身份验证方案时使用。如果您为Kafka接收器配置了客户端证书身份验证方案,接收器仅提供此客户端证书作为其身份识别的手段。然而,在使用基本或SCRAM身份验证方案时也可以提供客户端证书。

要配置客户端证书身份验证方案的设置,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/authentication)# client-certificate
solace(configure/message-vpn/kafka/kafka-receiver/authentication/client-certificate)# certificate <filename>

其中:

<filename> 是证书文件的文件名。证书文件必须位于事件代理的certs文件夹中。一旦配置了证书,其副本将被内部保存。certs目录中的文件不再需要。

此命令的否定版本no certificate-file,将移除当前Kafka接收器的证书关联。

Kerberos身份验证

当使用Kerberos身份验证时,Kafka接收器必须首先与Kerberos认证服务器(AS)进行身份验证,该服务器授予Kafka接收器一个票据授予票据(TGT)。有了有效的TGT,Kafka接收器可以尝试使用从票据授予服务(TGS)获得的服务票据与远程Kafka代理进行身份验证。AS和TGS(密钥分发中心(KDC)的组件)托管在外部服务器上,而不是在事件代理上。

要为Kafka接收器实现Kerberos身份验证,您必须配置以下内容:

  1. 远程Kafka代理的服务名称。

  2. Kafka接收器的用户主体名称。

  3. Kafka接收器的keytab文件。

此Kafka接收器和远程Kafka代理所属的Kerberos领域(包括KDC地址)必须在启用Kerberos身份验证之前进行配置。有关更多信息,请参见管理Kerberos领域。

要配置这些设置,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/authentication)# kerberos
solace(configure/message-vpn/kafka/kafka-receiver/authentication/kerberos)# service-name <value>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/kerberos)# user-principal-name <value> <keytab-file>

其中:

service-name <value> 是远程Kafka代理的Kerberos服务名称,不包括/hostname@<REALM>

user-principal-name <value> 是Kafka接收器的Kerberos用户主体名称。这必须包括@<REALM>后缀。

<keytab-file> 是Kerberos keytab文件的文件名。keytab文件必须位于事件代理的keytabs文件夹中。这些keytab与用于客户端身份验证的keytab不同。用于Kafka桥接身份验证的keytab包含用户主体名称而不是服务主体名称,并适用于特定的消息VPN而不是全局。

OAuth客户端身份验证

当使用OAuth客户端身份验证时,Kafka接收器使用客户端凭据授权流程(RFC 6749 §4.4),也称为两腿OAuth,从授权服务器获取访问令牌。在此流程中,使用基本身份验证向授权服务器发送简单请求以获取访问令牌。然后,Kafka接收器可以使用返回的访问令牌对远程Kafka集群进行认证请求,直到令牌过期(令牌通常有效一小时)。当令牌即将过期时,Kafka消费者将自动请求另一个。

要启用OAuth客户端身份验证,您必须配置Kafka接收器将用于请求访问令牌的客户端ID、客户端秘密和令牌端点。可选地,您还可以配置OAuth范围。

要配置这些设置,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/authentication)# oauth-client
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# client-id <client-id>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# client-secret <client-secret>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# token-endpoint <token-endpoint>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# scope <scope>

其中:

client-id <client-id> 是Kafka接收器在请求访问令牌时用于登录授权服务器的OAuth客户端ID。OAuth客户端ID可以包含最多200个字符。

client-secret <client-secret> 是Kafka接收器在请求访问令牌时用于登录授权服务器的OAuth客户端秘密。OAuth客户端秘密可以包含最多512个字符。

token-endpoint <token-endpoint> 是Kafka接收器用于请求登录远程Kafka集群的令牌的OAuth令牌端点URL。OAuth令牌端点可以包含最多2048个字符。此外,令牌端点参数必须使用TLS,即必须以https://开头(不区分大小写)。

(可选)scope <scope> 是OAuth范围。OAuth范围可以包含最多200个字符。

配置Kafka接收器的消息批处理

要配置事件代理在从Kafka集群累积一批消息之前等待的延迟,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver)# batch
solace(configure/message-vpn/kafka/kafka-receiver/batch)# delay <ms>

其中:

<ms> 指定在从Kafka集群累积一批消息之前等待的延迟(以毫秒为单位)。有效值范围为0-300000。默认值为500。这对应于Kafka消费者API参数fetch.wait.max.ms

此命令的否定版本no delay,将值重置为默认值。

要配置消息批的最大大小,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver)# batch
solace(configure/message-vpn/kafka/kafka-receiver/batch)# max-size <bytes>

其中:

<bytes> 指定消息批的最大大小(以字节为单位)。有效值范围为1到100000000。默认值为1。这对应于Kafka消费者API参数fetch.min.bytes

此命令的否定版本no max-size,将值重置为默认值。

配置Kafka接收器的引导地址列表

引导地址是Kafka集群中一个Kafka代理的完全限定域名(FQDN)或IP地址和可选端口,Kafka接收器可以从该地址获取整个集群的状态。您可以为Kafka接收器配置这些地址的列表,以便在尝试连接到一个地址失败时尝试连接。

要配置Kafka接收器的引导地址列表,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver)# bootstrap-addresses <address-list>

其中:

<address-list> 是Kafka集群中代理的FQDN或地址(和可选端口)的逗号分隔列表,可以从这些地址获取整个集群的状态。IPv4地址必须以点分十进制表示法形式指定,nnn.nnn.nnn.nnn。IPv6地址必须用方括号括起来。端口指定为0到65535的十进制值。例如,正确格式化的IPv4地址是:192.168.100.1:9092。相同的IPv6格式地址是[::ffff:c0a8:6401]:9092。这对应于Kafka消费者API参数bootstrap.servers

如果未提供端口,则默认为9092。

此命令的否定形式no bootstrap-addresses,将从Kafka接收器中移除引导地址列表。

配置Kafka接收器的消费者组

消费者组允许Kafka消费者协同工作,并从主题中并行处理Kafka事件。同一组中的每个消费者都被分配了来自Kafka主题或主题集的不同分区子集。根据您的部署,您可能希望指定Kafka接收器所属消费者组的某些详细信息。

要配置消费者组,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver)# group

您可以为Kafka接收器消费者组执行的配置任务包括:

  • 配置Kafka消费者组ID

  • 配置Kafka消费者组保持活动设置

  • 配置Kafka消费者组成员类型

  • 配置Kafka消费者组分区方案

配置Kafka消费者组ID

要配置Kafka接收器的Kafka消费者组ID,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/group)# id <value>

其中:

<value> 指定ID。这对应于Kafka消费者API参数group.id

此命令的否定形式no id,将移除消费者组ID的指定。

配置Kafka消费者组保持活动设置

要配置Kafka消费者组的保持活动间隔,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/group)# interval <ms>

其中:

<ms> 指定发送保持活动到组之间的时间(以毫秒为单位)。有效值范围为1到3600000。默认值为3000。这对应于Kafka消费者API参数heartbeat.interval.ms

此命令的否定形式no interval,将值重置为默认值。

要配置Kafka消费者组的保持活动超时,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/group)# timeout <ms>

其中:

<ms> 指定直到无响应的组成员被移除的时间(以毫秒为单位),这将触发组中其他成员之间的分区重新平衡。有效值范围为1到3600000。默认值为45000。这对应于Kafka消费者API参数session.timeout.ms

此命令的否定形式no timeout,将值重置为默认值。

配置Kafka消费者组成员类型

要配置Kafka接收器消费者组的成员类型,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/group)# membership-type {dynamic | static}

其中:

dynamic 指定动态成员。这是默认值。如果组成员类型为dynamic,则Kafka消费者API参数group.instance.id为空。 static 指定静态成员。静态成员可以在保持活动超时值内离开并重新加入组,而不会触发组重新平衡。如果组成员类型为static,则Kafka消费者API参数group.instance.id设置为字符串<broker-name>/<vpn-name>/<receiver-name>,其中<broker-name>是AD启用的路由器名称,<vpn-name>是消息VPN名称,<receiver-name>是Kafka接收器名称。

此命令的否定版本no membership-type,将值重置为默认值。

配置Kafka消费者组分区方案

要配置Kafka接收器消费者组的分区方案,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/group)# partition-scheme <partition-scheme-list>

其中:

<partition-scheme-list> 是此接收器消费者组分区分配的有序、逗号分隔的方案列表。支持Eager("range, roundrobin")和Cooperative("cooperative-sticky")方案。当选定的组领导者将选择所有组成员提供的第一个共同策略。Eager和Cooperative方案不得混合使用。有关这些方案的更多信息,请参见您的Kafka实现的文档。默认分区方案是("range, roundrobin")。这对应于Kafka消费者API参数partition.assignment.strategy

此命令的否定版本no partition-scheme,将值重置为默认值。

配置Kafka接收器的主题元数据处理

要排除在刷新期间的某些主题元数据,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver)# metadata
solace(configure/message-vpn/kafka/kafka-receiver/metadata)# topic
solace(configure/message-vpn/kafka/kafka-receiver/metadata/topic)# exclude <regex-list>

其中:

<regex-list> 是正则表达式(包括POSIX.2正则表达式)的逗号分隔列表。任何匹配的主题名称将在代理元数据中被忽略。请注意,每个正则表达式都需要以^字符开头,否则将被解释为字面量主题名称。

此命令的否定形式no exclude,将移除所有配置的表达式。

要配置从Kafka集群刷新主题元数据之间的时间间隔,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver)# metadata
solace(configure/message-vpn/kafka/kafka-receiver/metadata)# topic
solace(configure/message-vpn/kafka/kafka-receiver/metadata/topic)# refresh-interval <ms>

其中:

<ms> 指定从Kafka集群刷新主题元数据之间的时间(以毫秒为单位)。有效值范围为1000到3600000。默认值为30000。这对应于Kafka消费者API参数topic.metadata.refresh.interval.ms

此命令的否定形式no refresh-interval,将值重置为默认值。

配置Kafka接收器的主题绑定

每个主题绑定指定了从中获取消息的Kafka主题,并包括决定如何将该Kafka主题的消息发送到Solace PubSub+软件事件代理的属性。

要创建主题绑定以从远程Kafka主题接收消息,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver)# create topic-binding <kafka-topic-or-regex>

其中:

<kafka-topic-or-regex> 是您希望此接收器从中获取消息的Kafka主题的名称,或用于从多个Kafka主题接收的正则表达式模式(包括POSIX.2正则表达式)。请注意,每个正则表达式都需要以^字符开头,否则将被解释为字面量主题名称。

要启用主题绑定,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# no shutdown

您可以为Kafka接收器主题绑定执行的配置任务包括:

  • 配置从Kafka主题消费的初始偏移量

  • 配置分区键生成

  • 配置SMF主题生成

配置从Kafka主题消费的初始偏移量

您可以配置Kafka接收器从Kafka主题消费的初始偏移量,如果消费者组的任何成员尚未消费并提交任何偏移量,或者最后一个提交的偏移量已被删除。偏移量在Kafka分区中是唯一的。

要配置Kafka接收器从Kafka主题消费的初始偏移量,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# initial-offset {beginning | end}

其中:

beginning 指定从最早的可用偏移量开始。

end 指定仅从新偏移量开始。这是默认值。

初始偏移量对应于Kafka消费者API参数auto.offset.reset

此命令的否定形式no initial-offset,将值重置为默认值。

配置分区键生成

当从Kafka主题接收到消息时,您可以配置主题绑定为每条消息生成分区键。这对于确定消息发送到哪个队列分区非常有用。有关更多信息,请参见使用分区队列的消息分发。

要配置主题绑定如何为从Kafka主题接收到的每条消息生成分区键,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# local
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding/local)# key <key-expression>

其中:

<key-expression> 指定用于为从Kafka主题接收到的每条消息生成分区键的替换表达式。有关更多信息,请参见替换表达式概述。如果没有配置值,则在消息发布到事件代理时不会为每条消息包含键。

配置SMF主题生成

当从Kafka主题接收到消息时,它们会根据您在创建主题绑定时配置的表达式发布到SMF主题。

要配置主题绑定如何为从Kafka主题接收到的每条消息生成SMF主题,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# local
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding/local)# topic <topic-expression>

其中:

<topic-expression> 指定用于为从Kafka主题接收到的每条消息生成SMF主题的替换表达式。有关更多信息,请参见替换表达式概述。此表达式可以包括从Kafka主题接收到的每条单独消息的元数据中提取的数据。如果没有配置值,则主题绑定将无法运行。

为Kafka接收器连接启用TLS/SSL加密

要为Kafka接收器连接启用TLS/SSL加密,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-receiver)# transport
solace(configure/message-vpn/kafka/kafka-receiver/transport)# ssl

下表描述了TLS/SSL加密和身份验证方案设置的组合如何对应于Kafka消费者API参数security.protocol

TLS/SSL加密身份验证方案security.protocol参数的值
no sslnoneclient-certificateplaintext
sslnoneclient-certificatessl
no sslbasicscramoauth-clientsasl_plaintext
sslbasicscramoauth-clientsasl_ssl

此命令的否定形式no ssl,将禁用TLS/SSL加密。

配置Kafka发送器

在事件代理上,Solace消息到Kafka事件的转换以及这些事件到远程Kafka集群的传播由Kafka发送器启用,该发送器在消息VPN级别进行配置。您可以为发送器指定任何名称。当您为消息VPN创建Kafka发送器时,事件代理会自动创建以下对象:

  • 每个Kafka发送器的单个客户端。此客户端绑定到其发送器的队列绑定所需的队列。此客户端的名称为#kafka/tx/<tx-name>,并使用客户端用户名#kafka/tx/<tx-name>。此客户端用户名使用#kafka客户端配置文件和#acl-profile acl配置文件。

  • 单个客户端配置文件,称为#kafka,这是消息VPN中所有Kafka发送器和接收器所需的。当创建第一个Kafka发送器或接收器时创建此配置文件,并在删除最后一个Kafka发送器或接收器时删除。

img

有关更多信息,请参见Kafka发送器。

要创建Kafka发送器,请输入以下命令:

solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# create kafka-sender <name>

要配置现有的Kafka发送器:

solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# kafka-sender <name>

要启用已创建的Kafka发送器:

solace(configure/message-vpn/kafka/kafka-sender)# no shutdown

其中:

message-vpn <name> 是您希望创建Kafka发送器的消息VPN的名称。

kafka-sender <name> 是Kafka发送器的名称。

您可以为Kafka发送器执行的配置任务包括:

  • 配置Kafka发送器的身份验证方案

  • 配置Kafka发送器的消息批处理

  • 配置Kafka发送器的引导地址列表

  • 为Kafka发送器配置压缩

  • 为Kafka发送器启用幂等性

  • 配置Kafka发送器的队列绑定

  • 为Kafka发送器连接启用TLS/SSL加密

配置Kafka发送器的身份验证方案

要配置Kafka发送器将用于与远程Kafka集群建立连接的身份验证方案,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-sender)# authentication
solace(configure/message-vpn/kafka/kafka-sender/authentication)# auth-scheme {none | basic | scram | client-certificate | kerberos | oauth-client}

其中:

none 指定不使用身份验证进行登录。有关更多信息,请参见无身份验证。

basic 指定使用用户名和密码进行登录。有关更多信息,请参见基本身份验证。

scram 指定使用SCRAM(Salted Challenge Response Authentication)进行登录。有关更多信息,请参见SCRAM身份验证。

client-certificate 指定使用客户端TLS证书进行登录。有关更多信息,请参见客户端证书身份验证。

kerberos 指定使用Kerberos机制进行登录。有关更多信息,请参见Kerberos身份验证。

oauth-client 指定使用OAuth 2.0客户端凭据进行登录。有关更多信息,请参见OAuth客户端身份验证。

无身份验证

如果未配置Kafka发送器的身份验证方案,则发送器不会使用身份验证方案。这可能适用于匿名连接或Kafka发送器不需要身份验证的情况。

基本身份验证

使用基本身份验证方案,Kafka发送器可以使用用户名和密码组合进行身份验证。凭据可以使用明文传输或使用SSL加密。

要配置基本身份验证方案的设置,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/authentication)# basic
solace(configure/message-vpn/kafka/kafka-sender/authentication/basic)# username <name> [password <password>]

其中:

<name> 是用于与远程Kafka集群进行身份验证的客户端用户名。用户名可以包含最多255个字符。

<password> 是与指定用户名一起使用的密码。密码可以包含最多255个字符。

此命令的否定版本no username,将移除任何配置的用户名和密码。

SCRAM身份验证

当使用SCRAM身份验证(RFC 5802)时,Kafka发送器使用挑战/响应机制与远程Kafka集群进行用户名和密码的身份验证。

要配置SCRAM身份验证方案的设置,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/authentication)# scram
solace(configure/message-vpn/kafka/kafka-sender/authentication/scram)# hash {sha-256 | sha-512}
solace(configure/message-vpn/kafka/kafka-sender/authentication/scram)# username <name> [password <password>]

其中:

sha-256 指定使用SHA-2 256位哈希进行SCRAM身份验证。

sha-512 指定使用SHA-2 512位哈希进行SCRAM身份验证。这是默认设置。

<name> 是用于与远程Kafka集群进行身份验证的客户端用户名。用户名可以包含最多255个字符。

<password> 是与指定用户名一起使用的密码。密码可以包含最多255个字符。

hash命令的否定形式no hash,将值重置为默认值。

username命令的否定形式no username,将移除任何配置的用户名和密码。

客户端证书身份验证

当使用客户端证书身份验证时,Kafka发送器提供证书文件以验证其身份。客户端证书身份验证仅适用于使用TLS/SSL的连接(请参见为Kafka接收器连接启用TLS/SSL加密)。

在此处安装的客户端证书也可以在Kafka集群请求其他身份验证方案时使用。如果您为Kafka发送器配置了客户端证书身份验证方案,发送器仅提供此客户端证书作为其身份识别的手段。然而,在使用基本或SCRAM身份验证方案时也可以提供客户端证书。

要配置客户端证书身份验证方案的设置,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/authentication)# client-certificate
solace(configure/message-vpn/kafka/kafka-sender/authentication/client-certificate)# certificate <filename>

其中:

<filename> 是证书文件的文件名。证书文件必须位于事件代理的certs文件夹中。一旦配置了证书,其副本将被内部保存。certs目录中的文件不再需要。

此命令的否定版本no certificate-file,将移除当前Kafka发送器的证书关联。

Kerberos身份验证

当使用Kerberos身份验证时,Kafka发送器必须首先与Kerberos认证服务器(AS)进行身份验证,该服务器授予Kafka发送器一个票据授予票据(TGT)。有了有效的TGT,Kafka发送器可以尝试使用从票据授予服务(TGS)获得的服务票据与远程Kafka代理进行身份验证。AS和TGS(密钥分发中心(KDC)的组件)托管在外部服务器上,而不是在事件代理上。

要为Kafka发送器实现Kerberos身份验证,您必须配置以下内容:

  1. 远程Kafka代理的服务名称。

  2. Kafka发送器的用户主体名称。

  3. Kafka发送器的keytab文件。

此Kafka发送器和远程Kafka代理所属的Kerberos领域(包括KDC地址)必须在启用Kerberos身份验证之前进行配置。有关更多信息,请参见管理Kerberos领域。

要配置这些设置,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/authentication)# kerberos
solace(configure/message-vpn/kafka/kafka-sender/authentication/kerberos)# service-name <value>
solace(configure/message-vpn/kafka/kafka-sender/authentication/kerberos)# user-principal-name <value> <keytab-file>

其中:

service-name <value> 是远程Kafka代理的Kerberos服务名称,不包括/hostname@<REALM>

user-principal-name <value> 是Kafka发送器的Kerberos用户主体名称。这必须包括@<REALM>后缀。

<keytab-file> 是Kerberos keytab文件的文件名。keytab文件必须位于事件代理的keytabs文件夹中。这些keytab与用于客户端身份验证的keytab不同。用于Kafka桥接身份验证的keytab包含用户主体名称而不是服务主体名称,并适用于特定的消息VPN而不是全局。

OAuth客户端身份验证

当使用OAuth客户端身份验证时,Kafka发送器使用客户端凭据授权流程(RFC 6749 §4.4),也称为两腿OAuth,从授权服务器获取访问令牌。在此流程中,使用基本身份验证向授权服务器发送简单请求以获取访问令牌。然后,Kafka发送器可以使用返回的访问令牌对远程Kafka集群进行认证请求,直到令牌过期(令牌通常有效一小时)。当令牌即将过期时,Kafka发送器将自动请求另一个。

要启用OAuth客户端身份验证,您必须配置Kafka发送器将用于请求访问令牌的客户端ID、客户端秘密和令牌端点。可选地,您还可以配置OAuth范围。

要配置这些设置,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/authentication)# oauth-client
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# client-id <client-id>
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# client-secret <client-secret>
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# token-endpoint <token-endpoint>
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# scope <scope>

其中:

client-id <client-id> 是Kafka发送器在请求访问令牌时用于登录授权服务器的OAuth客户端ID。OAuth客户端ID可以包含最多200个字符。

client-secret <client-secret> 是Kafka发送器在请求访问令牌时用于登录授权服务器的OAuth客户端秘密。OAuth客户端秘密可以包含最多512个字符。

token-endpoint <token-endpoint> 是Kafka发送器用于请求登录远程Kafka集群的令牌的OAuth令牌端点URL。OAuth令牌端点可以包含最多2048个字符。此外,令牌端点参数必须使用TLS,即必须以https://开头(不区分大小写)。

(可选)scope <scope> 是OAuth范围。OAuth范围可以包含最多200个字符。

配置Kafka发送器的消息批处理

要配置Kafka发送器在累积一批消息发送到Kafka集群之前等待的延迟,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-sender)# batch
solace(configure/message-vpn/kafka/kafka-sender/batch)# delay <ms>

其中:

<ms> 指定在累积一批消息发送到Kafka集群之前等待的延迟(以毫秒为单位)。有效值范围为0到90000。默认值为5。这对应于Kafka生产者API参数queue.buffering.max.ms

此命令的否定版本no delay,将值重置为默认值。

要配置批中消息的最大数量,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-sender)# batch
solace(configure/message-vpn/kafka/kafka-sender/batch)# max-messages <count>

其中:

<count> 指定在单个批中发送到Kafka集群的消息的最大数量。有效值范围为1到1000000。默认值为10000。这对应于Kafka生产者API参数batch.num.messages

此命令的否定版本no max-messages,将值重置为默认值。

要配置消息批的最大大小,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-sender)# batch
solace(configure/message-vpn/kafka/kafka-sender/batch)# max-size <bytes>

其中:

<bytes> 指定消息批的最大消息大小(以字节为单位)。有效值范围为1到2147483647。默认值为1000000。这对应于Kafka生产者API参数batch.size

此命令的否定版本no max-size,将值重置为默认值。

配置Kafka发送器的引导地址列表

引导地址是Kafka集群中一个Kafka代理的完全限定域名或IP地址和可选端口,Kafka发送器可以从该地址获取整个集群的状态。您可以为Kafka发送器配置这些地址的列表,以便在尝试连接到一个地址失败时尝试连接。

要配置Kafka发送器的引导地址列表,请输入以下命令:

solace(configure/message-vpn/kafka/kafka-sender)# bootstrap-addresses <address-list>

其中:

<address-list> 是Kafka集群中代理的FQDN或地址(和可选端口)的逗号分隔列表,可以从这些地址获取整个集群的状态。IPv4地址必须以点分十进制表示法形式指定,nnn.nnn.nnn.nnn。IPv6地址必须用方括号括起来。端口指定为0到65535的十进制值。例如,正确格式化的IPv4地址是:192.168.100.1:9092。相同的IPv6格式地址是[::ffff:c0a8:6401]:9092。这对应于Kafka生产者API参数bootstrap.servers

如果未提供端口,则默认为9092。

此命令的否定形式no bootstrap-addresses,将从Kafka发送器中移除引导地址列表。

为Kafka发送器配置压缩

要配置Kafka发送方在传播消息时使用的压缩类型,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender)# compression
solace(configure/message-vpn/kafka/kafka-sender/compression)# type {gzip | snappy | lz4 | zstd}

其中:

gzip 指定使用Gzip压缩。这是默认设置。

snappy 指定使用Snappy压缩。

lz4 指定使用LZ4压缩。

zstd 指定使用Zstandard压缩。

压缩类型对应于Kafka生产者API参数compression.codec

该命令的无操作版本no compression,将值重置为默认值。

要配置Kafka发送方在传播消息时使用的压缩级别,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender)# compression
solace(configure/message-vpn/kafka/kafka-sender/compression)# level

其中:

<value> 指定压缩级别。有效值范围取决于压缩类型:

压缩级别对应于Kafka生产者API参数compression.level

该命令的无操作版本no compression,将值重置为默认值。

  • -1 表示使用编解码器依赖的默认压缩级别,始终有效。这是默认设置。

  • 0-9gzip压缩编解码器有效。

  • 0snappy压缩编解码器有效。

  • 0-12lz4压缩编解码器有效。

  • 0-22zstd压缩编解码器有效。

要启用压缩,请参见“启用Kafka发送方连接的压缩”。

如果配置Kafka发送方同时使用加密和压缩,而crime-exploit-protection已启用(请参见“配置CRIME漏洞保护”),则Kafka发送方将无法操作。

为Kafka发送方启用幂等性

幂等性保证向远程Kafka主题按顺序至少一次传递消息,但会牺牲性能。它默认是禁用的。如果启用幂等性,则为Kafka发送方配置的每个队列绑定必须将确认模式设置为all才能操作。

要为Kafka发送方启用幂等性,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender)# idempotence

这对应于Kafka生产者API参数enable.idempotence

该命令的无操作版本no idempotence将禁用它。

当启用幂等性时:

  • Kafka发送方会随着每条消息发送一个递增的序列号。

  • 远程Kafka代理确认每条消息。

  • Kafka代理记住它为每个Kafka发送方写入的最大序列号。

  • Kafka代理丢弃任何收到的序列号小于最大写入序列号的消息。

如果禁用幂等性,Kafka发送方可以自由地因超时、领导者变更等原因重新发送消息给Kafka代理。在这种情况下,可能会出现消息重复和/或重新排序。

为Kafka发送方配置队列绑定

每个绑定命名了从中抽取消息的队列,并包括指示如何将这些消息发送到Kafka的属性。

要为Kafka发送方创建队列绑定,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender)# create queue-binding <queue-name>

要启用队列绑定,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# no shutdown

其中:

<queue-name> 是给定消息VPN中Kafka发送方绑定的队列的名称。

您可以为Kafka发送方队列绑定执行的配置任务包括:

  • 配置确认模式

  • 配置Kafka分区选择方案

  • 配置Kafka分区键生成

  • 配置SMF到Kafka主题映射

配置确认模式

要配置此队列绑定需要来自远程Kafka集群的确认数量,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# ack-mode {none | one | all}

其中:

none 指定不需要来自远程Kafka集群的确认。如果配置,消息最多传递一次。

one 指定需要来自远程Kafka集群的一条确认。如果配置,消息至少传递一次,但可能会重新排序。

all 指定需要远程Kafka集群的所有副本确认消息。如果配置,消息至少传递一次,但可能会重新排序。这是默认设置。

确认模式对应于Kafka生产者API参数request.required.acks

该命令的无操作版本no ack-mode,将值重置为默认值。

配置Kafka分区选择方案

要配置给定队列绑定在发布到远程Kafka集群时使用的分区选择方案,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# scheme {consistent | explicit | random}

其中:

consistent 指定使用一致的分区选择方案。有关更多信息,请参见一致分区选择。

explicit 指定使用显式分区选择方案。有关更多信息,请参见显式分区选择。

random 指定使用随机分区选择方案。有关更多信息,请参见随机分区选择。

一致分区选择

当使用一致的分区选择方案时,队列绑定根据Kafka发送方生成的Kafka分区键的哈希值(请参见配置Kafka分区键生成)来选择Kafka分区。如果没有为消息提供键,默认使用随机选择方案作为后备。您可以禁用此后备方案,如果您这样做,将为所有未键消息选择单个分区(请参见随机分区选择)。

要配置用于选择分区的哈希算法,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# consistent
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/consistent)# hash {crc | murmur2 | fnv1a}

其中:

crc 指定使用CRC哈希。这是默认设置。

murmur2 指定使用Murmur2哈希。

fnv1a 指定使用Fowler-Noll-Vo 1a哈希。

该命令的无操作版本no hash,将值重置为默认值。

显式分区选择

当使用显式分区选择方案时,队列绑定根据您指定的数字显式选择Kafka分区。

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# explicit
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/explicit)# number <value>

其中:

<value> 是您希望队列绑定在发布消息时使用的Kafka分区号。

该命令的无操作版本no number,将值重置为默认值。

随机分区选择

当使用随机分区选择方案时,队列绑定选择一个随机的Kafka分区。默认情况下,这种分区选择方案被用作后备,在一致分区选择方案被使用但没有为消息提供分区键的情况下。如果您禁用此后备方案,将为所有未键消息选择单个分区。

要禁用随机分区选择作为后备,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# random
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/random)# no fallback

要启用如果已禁用的后备方案,请进入以下命令。

solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/random)# fallback

配置Kafka分区键生成

要配置队列绑定为发送到远程Kafka集群的每条消息生成Kafka分区键的方式,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# remote
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/remote)# key <key-expression>

其中:

<key-expression> 指定用于生成发送到Kafka的每条消息的Kafka分区键的替换表达式。有关替换表达式的更多信息,请参见替换表达式概述。此表达式可以包括从每个SMF消息的元数据中提取的字段,当它从队列中取出时。如果为空,则在消息发布到Kafka时不会包含键。

配置SMF到Kafka主题映射

要配置此队列绑定发送的每条消息的Kafka集群主题,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# remote
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/remote)# topic <kafka-topic>

其中:

<kafka-topic> 是队列绑定从队列中取出的每条消息要发送到的Kafka集群主题。如果没有配置主题,则队列绑定将无法操作。

为Kafka发送方连接启用TLS/SSL加密

要为Kafka发送方连接启用TLS/SSL加密,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender)# transport
solace(configure/message-vpn/kafka/kafka-sender/transport)# ssl

有关TLS/SSL加密和认证方案设置组合如何对应于Kafka生产者API参数security.protocol的更多信息,请参见“为Kafka接收方连接启用TLS/SSL加密”。

该命令的无操作版本no ssl,将禁用TLS/SSL加密。

为Kafka发送方连接启用压缩

默认情况下,Kafka发送方连接不启用压缩。您可以根据为发送方配置的类型和级别启用压缩(请参见“为Kafka发送方配置压缩”)。

要为Kafka发送方连接启用压缩,请进入以下命令:

solace(configure/message-vpn/kafka/kafka-sender)# transport
solace(configure/message-vpn/kafka/kafka-sender/transport)# compressed

该命令的无操作版本no compressed,将禁用压缩。

配置Kafka代理连接的最大数量

默认情况下,消息VPN支持的Kafka代理连接的最大数量与系统支持的最大数量相同。根据您的部署,您可能希望更改此行为,以防止一个消息VPN消耗整个系统支持的所有连接。

要配置此消息VPN允许的Kafka代理连接的最大数量,请进入以下命令:

solace(configure/message-vpn/kafka)# max-broker-connections <value>

其中:

<value> 是指定消息VPN允许的Kafka代理连接的最大总数的整数值。

该命令的无操作版本no max-broker-connections,将值重置为默认值(即平台支持的最大值)。

  • 要查看Solace PubSub+事件代理可以支持的Kafka代理连接的最大总数,请输入show kafka用户执行命令。
  • 这是一个全局/读写参数,由系统管理员选择,而不是消息VPN管理员。