kafka

背景

套件kafka基于社区的0.10.0.1版本开发。主要扩展了认证授权。 认证的修改包括: 1)增加自定义TBDS认证 2)修改社区认证SASL_PLAIN的校验逻辑:增加白名单,非集群内用户或白名单配置项之外的用户不能访问kafka

使用

1.无认证访问(低版本kafka访问方式)

大数据套件已经关闭无认证端。不支持无认证的访问

2.SASL_PLAIN认证访问

命令行访问(console-consumer,console-producer)

生产示例:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker_ip:6668 --topic first_topic --producer.config ./config/client_sasl.properties

消费示例:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server broker_ip:6668 --topic first_topic --new-consumer --consumer.config ./config/client_sasl.properties --from-beginning

其中,一定要用新版的消费API即加参数--new-consumer ,client_sasl.properties文件位置随意,内容必须包含如下配置:

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

java客户端

步骤1:用新消费者API去访问,在consumer的参数中多加两个认证相关参数,代码片段:

producer代码类似,也只需要给producer的参数加上红色部分的配置

步骤2:启动client java程序,加jvm参数:

kafka_client_jaas.conf内容:

username和password的具体值需要参考broker安装路径下的config/kafka_jaas.conf文件中的值,从JAAS的KafkaServer段中选择一个用户

python客户端

python只支持pykafka访问,API很简单,示例片段:

其中各个参数含义很直观。

3.TBDS认证访问

命令行访问

生产示例:

消费示例:

需要注意的是端口,端口号跟SASL_PLAIN认证的端口是不一样的,即每种认证都有独立的端口。client_sasl.properties文件中的内容:

其中id和key是用户对应kafka模块的accesskey相关信息,记住一定是kafka模块的,然后一定是在portal里处于enabled状态(默认申请的是disabled,管理员控制使能)

java客户端

跟SASL_PLAIN类似,只是把client_sasl.properties中的内容作为key-val加到客户端类(KafkaConsumer,KafkaProducer)的构造函数properties参数中中.示例代码片段:

python客户端

不支持

4.集群外客户端部署

方案1:

方案2:

5.客户端代码编译打包

1)套件版kafka jar命名规则

套件的kafka是基于社区二次开发命名规则采用"社区版本号-TBDS-套件版本号"的方式命名.例:我们现在基于社区0.10.0.1版本的kafka进行开发,套件版本是4.0.3.3,则我们打出的kafka jar版本为0.10.0.1-TBDS-4.0.3.3,完整的kafka client maven jar文件名为:kafka-clients-0.10.0.1-TBDS-4.0.3.3.jar

2)基于套件提供的maven库开发

(1)拷贝或部署套件提供的maven库到开发者可访问的本地仓库或远程仓库 (2)在客户端maven工程pom引入对应的套件版kafka依赖,以套件4.0.3.3版本为例, 需要在pom中加入的依赖片段(其他版本依次类推):

3)基于开源maven库开发 (强烈不建议使用) 这种方式建议不使用。 (1)在客户端maven工程引入对应的社区版本kafka依赖。

(2)编译完成之后,在运行客户端程序运行之前,把套件版的kafka jar加入classpath 中:

6.运行

运行客户端代码与社区方式无区别

Last updated

Was this helpful?