# kafka

## 背景

套件kafka基于社区的0.10.0.1版本开发。主要扩展了认证授权。\
认证的修改包括：\
1）增加自定义TBDS认证\
2）修改社区认证SASL\_PLAIN的校验逻辑：增加白名单，非集群内用户或白名单配置项之外的用户不能访问kafka

## 使用

### 1.无认证访问（低版本kafka访问方式）

大数据套件已经关闭无认证端。不支持无认证的访问

### 2.SASL\_PLAIN认证访问

**命令行访问（console-consumer，console-producer）**

**生产示例：**

```
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker_ip:6668 --topic first_topic --producer.config ./config/client_sasl.properties
```

**消费示例：**

```
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server broker_ip:6668 --topic first_topic --new-consumer --consumer.config ./config/client_sasl.properties --from-beginning
```

其中，一定要用新版的消费API即加参数--new-consumer ，client\_sasl.properties文件位置随意，内容必须包含如下配置：

```
security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN
```

**java客户端**

步骤1：用新消费者API去访问，在consumer的参数中多加两个认证相关参数，代码片段：

```
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(security.protocol,"SASL_PLAIN");
props.put(sasl.mechanism, "PLAIN");
KafkaConsumer<string, string="string"> consumer = new KafkaConsumer<string,string>(props);
```

producer代码类似，也只需要给producer的参数加上红色部分的配置

步骤2：启动client java程序，加jvm参数：

```
-Djava.security.auth.login.config=/data/home/tbds/kafka/kafka010/config/kafka_client_jaas.conf
```

kafka\_client\_jaas.conf内容：

```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
                username="kafka"
                password="kafka@Tbds.com";
            };
```

username和password的具体值需要参考broker安装路径下的config/kafka\_jaas.conf文件中的值，从JAAS的KafkaServer段中选择一个用户

&#x20;**python客户端**&#x20;

python只支持pykafka访问，API很简单，示例片段：

```
consumer =KafkaConsumer('monitor_metrics_topic',
                    group_id='metrics-group',
                    sasl_plain_username='{{sasl_plain_username}}',
                    sasl_plain_password='{{sasl_plain_password}}',
                    sasl_mechanism='{{sasl_mechanism}}',
                    security_protocol='{{security_protocol}}',
                    bootstrap_servers=[{{kafka_broker_list}}])
```

其中各个参数含义很直观。

### 3.TBDS认证访问

**命令行访问**

**生产示例：**

```
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker_ip:6667 --topic first_topic --producer.config ./config/client_sasl.properties
```

**消费示例：**

```
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server broker_ip:6667 --topic first_topic --new-consumer --consumer.config ./config/client_sasl.properties --from-beginning
```

需要注意的是端口，端口号跟SASL\_PLAIN认证的端口是不一样的，即每种认证都有独立的端口。client\_sasl.properties文件中的内容：

```
security.protocol=SASL_TBDS
sasl.mechanism=TBDS
sasl.tbds.secure.id=xxxx
sasl.tbds.secure.key=xxx
```

其中id和key是用户对应kafka模块的accesskey相关信息，记住一定是kafka模块的，然后一定是在portal里处于enabled状态（默认申请的是disabled，管理员控制使能）

**java客户端**

跟SASL\_PLAIN类似，只是把client\_sasl.properties中的内容作为key-val加到客户端类(KafkaConsumer,KafkaProducer)的构造函数properties参数中中.示例代码片段：

```
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("security.protocol","SASL_TBDS");
    props.put("sasl.mechanism", "TBDS");
    props.put("sasl.tbds.secure.id","U6hXsHKVGwwBc0dZRBGPjaby1a3VCf0NfMnV");
    props.put("sasl.tbds.secure.key", "riglLTEAk2fBCWZdmjvhskKpsvXAn7rj");
    KafkaConsumer<string, string="string"> consumer = new KafkaConsumer<string,string>(props);
```

**python客户端**

不支持

### 4.集群外客户端部署

**方案1：**

```
1）通过yum源或rpm安装套件版本的kafka到集群外机器：
    yum  install  kafka_2_2_0_0_2041
  或
    rpm  -ivh  kafka_2_2_0_0_2041-0.10.0.1-xxxx.xxxx.x86_64.rpm
2）拷贝集群内任一kafka客户端机器/etc/kafka/config目录的内容，替换集群外客户端           对应路径:
集群内客户端节点：
    cd  /etc/kafka/conf
    tar  zcvf  kafka.tar.gz  /etc/kafka/conf/*
集群外客户端节点：
    cd  /etc/kafka/conf
    tar  zxvf  kafka.tar.gz
```

**方案2：**

```
1.打包集群内任一kafka客户端所在节点的/usr/hdp/2.2.0.0-2041/kafka/，并拷贝到集群外目标客户端节点
2.同方案1的步骤2
```

### 5.客户端代码编译打包

**1）套件版kafka jar命名规则**

套件的kafka是基于社区二次开发命名规则采用"社区版本号-TBDS-套件版本号"的方式命名.例：我们现在基于社区0.10.0.1版本的kafka进行开发，套件版本是4.0.3.3，则我们打出的kafka jar版本为0.10.0.1-TBDS-4.0.3.3，完整的kafka client maven jar文件名为：kafka-clients-0.10.0.1-TBDS-4.0.3.3.jar

**2）基于套件提供的maven库开发**

（1）拷贝或部署套件提供的maven库到开发者可访问的本地仓库或远程仓库\
（2）在客户端maven工程pom引入对应的套件版kafka依赖，以套件4.0.3.3版本为例， 需要在pom中加入的依赖片段（其他版本依次类推）：

```
        <dependency> 
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1-TBDS-4.0.3.3</version>
        </dependency>
        <dependency> <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1-TBDS-4.0.3.3</version>
        </dependency>
```

3）基于开源maven库开发 （强烈不建议使用）\
这种方式建议不使用。\
（1）在客户端maven工程引入对应的社区版本kafka依赖。

```
        <dependency> 
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency> 
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
```

（2）编译完成之后，在运行客户端程序运行之前，把套件版的kafka jar加入classpath 中：

```
            kafka_2.11-0.10.0.1-TBDS-4.0.3.3.jar,  
            kafka-clients-0.10.0.1-TBDS-4.0.3.3.jar
```

### 6.运行

运行客户端代码与社区方式无区别


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://tbds-book.gitbook.io/tbds/zu-jian-fang-wen-kai-fa/kafka.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
