📖
tbds
  • 前言
  • 部署
    • 安装步骤
  • 平台管理
    • 用户管理
    • 项目管理
    • 系统设置
  • 组件访问开发
    • kafka
    • hadoop
    • hbase
    • accessKey
    • hive
    • spark
    • ftp
    • portalApi
    • hermes
    • 代码访问组件demos
      • hbase
      • hdfs
      • kafka
      • hive
  • 数据接入
    • 数据接入
      • TDBank产品化介绍及使用指引
      • TDBank数据接入指引
      • TDBank采集接口详解
      • OGG Flume Adapter 部署文档
      • DB Demo
      • HDFS Demo
    • Hippo管理
    • 进度监控
    • 血缘分析
  • 实时计算
    • 任务管理
    • 库表模板管理
  • 数据分析
    • 数据交互
    • ideX
  • 任务调度
    • 操作指引
    • 工作流
      • 基本概念
      • 任务基本信息
      • 任务调度设置
      • 任务参数配置
        • shell 脚本
        • ftp导入hdfs
        • kafka导入hbase
        • kafka导入hdfs
        • hdfs导出hbase
        • hive sql 脚本
        • hive导入hdfs
        • hdfs导出DB(mysql,postgreSQL,sql server)
        • tstorm任务
        • spark任务
        • mapreduce任务
        • db导入hdfs
        • hdfs导出hive
        • hdfs导出hive-tdsort
        • hive导出mysql
        • mysql导入hive
      • Demo
        • FTP导入HDFS
        • HDFS导出HBASE
        • HDFS导出Mysql
        • HDFS导入HIVE
        • Hive SQL脚本
        • Hive导出MySQL
        • Hive导入HDFS
        • KAFKA导出HBASE
        • KAFKA导出HDFS
        • MYSQL导入HDFS
        • MySQL导入Hive
        • Shell脚本
        • SPARK任务
      • 升级
        • 集成代码更新
        • rpm升级
      • 补充
        • 手动迁移base组件
        • 手动安装runner组件
        • 自定义任务开发文档
        • 时间隐式变量说明
        • 下发并发数说明和调整操作
        • Issues版本说明
        • 设置分组
        • 跨工作流依赖
      • 常见问题定位和解决方式
        • 常用操作
        • 实时接入任务hdfs2hive (tdsort)
        • 实例日志获取失败
        • 实例日志中提示下载文件失败
        • taskSchedule指标为空定位方法
        • 实例依赖失效确认方式
        • 任务实例诊断按钮无调度信息
        • 诊断和定位操作参考
        • 实例一直等待终止定位
        • mongodb 常见问题和处理办法
    • 任务管理
      • 工作流列表
      • 任务管理
      • 任务运行管理
      • 其他
    • 服务器配置
      • 基本概念
      • 操作指南
  • Tstorm
    • Tstorm介绍
    • 开发实例
      • wordcount
  • 数据展现
    • 自助报表
  • 数据资产
    • 库表管理
      • 可管理库表
      • 可读写库表
      • 无归属库表
      • 维表管理
      • 新建表
    • 数据血缘
    • 数据提取
      • 数据地图
      • 任务列表
      • 架构关联
  • 运维中心
    • 系统运维
      • 组件部署
      • 链接归集
      • 诊断
      • 备份
    • 访问管理
    • 文件管理
    • 监控告警
      • 监控
      • 告警
  • 机器学习
    • 系统简介
    • TDInsight功能介绍
      • 工作流
        • 新建工程
        • 新建工作流
        • 创建和配置节点
        • 运行
        • 日志查看
      • 数据输入输出
      • 组件
        • Spark组件
        • Sparkstreaming组件
        • pySpark组件
        • xgboost组件
    • 多实例并发
      • 3种方式驱动实例
      • 实例查询
      • 历史实例
    • TDInsight模型与在线推理
      • 数据流
      • 模型的训练与使用
      • 模型在线服务
    • TDInsight算法/组件说明
      • 数据说明
      • 特征工程
        • 数据预处理
        • 特征提取
        • 特征转换
        • 特征选择
        • 特征评估
      • 机器学习
        • 分类算法
        • 聚类算法
        • 回归算法
        • 推荐算法
        • 关联规则
      • 模型评估
        • Binary Evaluator
        • Multi Evaluator
        • Regression Evaluator
      • 可视化
        • 关系
        • 分布
        • 对比
        • 组合
      • 深度学习算法简介
        • 计算机视觉
        • 自然语言处理
        • 表示学习
Powered by GitBook
On this page
  • 背景
  • 使用
  • 1.无认证访问(低版本kafka访问方式)
  • 2.SASL_PLAIN认证访问
  • 3.TBDS认证访问
  • 4.集群外客户端部署
  • 5.客户端代码编译打包
  • 6.运行

Was this helpful?

  1. 组件访问开发

kafka

背景

套件kafka基于社区的0.10.0.1版本开发。主要扩展了认证授权。 认证的修改包括: 1)增加自定义TBDS认证 2)修改社区认证SASL_PLAIN的校验逻辑:增加白名单,非集群内用户或白名单配置项之外的用户不能访问kafka

使用

1.无认证访问(低版本kafka访问方式)

大数据套件已经关闭无认证端。不支持无认证的访问

2.SASL_PLAIN认证访问

命令行访问(console-consumer,console-producer)

生产示例:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker_ip:6668 --topic first_topic --producer.config ./config/client_sasl.properties

消费示例:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server broker_ip:6668 --topic first_topic --new-consumer --consumer.config ./config/client_sasl.properties --from-beginning

其中,一定要用新版的消费API即加参数--new-consumer ,client_sasl.properties文件位置随意,内容必须包含如下配置:

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

java客户端

步骤1:用新消费者API去访问,在consumer的参数中多加两个认证相关参数,代码片段:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(security.protocol,"SASL_PLAIN");
props.put(sasl.mechanism, "PLAIN");
KafkaConsumer<string, string="string"> consumer = new KafkaConsumer<string,string>(props);

producer代码类似,也只需要给producer的参数加上红色部分的配置

步骤2:启动client java程序,加jvm参数:

-Djava.security.auth.login.config=/data/home/tbds/kafka/kafka010/config/kafka_client_jaas.conf

kafka_client_jaas.conf内容:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
                username="kafka"
                password="kafka@Tbds.com";
            };

username和password的具体值需要参考broker安装路径下的config/kafka_jaas.conf文件中的值,从JAAS的KafkaServer段中选择一个用户

python客户端

python只支持pykafka访问,API很简单,示例片段:

consumer =KafkaConsumer('monitor_metrics_topic',
                    group_id='metrics-group',
                    sasl_plain_username='{{sasl_plain_username}}',
                    sasl_plain_password='{{sasl_plain_password}}',
                    sasl_mechanism='{{sasl_mechanism}}',
                    security_protocol='{{security_protocol}}',
                    bootstrap_servers=[{{kafka_broker_list}}])

其中各个参数含义很直观。

3.TBDS认证访问

命令行访问

生产示例:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker_ip:6667 --topic first_topic --producer.config ./config/client_sasl.properties

消费示例:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server broker_ip:6667 --topic first_topic --new-consumer --consumer.config ./config/client_sasl.properties --from-beginning

需要注意的是端口,端口号跟SASL_PLAIN认证的端口是不一样的,即每种认证都有独立的端口。client_sasl.properties文件中的内容:

security.protocol=SASL_TBDS
sasl.mechanism=TBDS
sasl.tbds.secure.id=xxxx
sasl.tbds.secure.key=xxx

其中id和key是用户对应kafka模块的accesskey相关信息,记住一定是kafka模块的,然后一定是在portal里处于enabled状态(默认申请的是disabled,管理员控制使能)

java客户端

跟SASL_PLAIN类似,只是把client_sasl.properties中的内容作为key-val加到客户端类(KafkaConsumer,KafkaProducer)的构造函数properties参数中中.示例代码片段:

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("security.protocol","SASL_TBDS");
    props.put("sasl.mechanism", "TBDS");
    props.put("sasl.tbds.secure.id","U6hXsHKVGwwBc0dZRBGPjaby1a3VCf0NfMnV");
    props.put("sasl.tbds.secure.key", "riglLTEAk2fBCWZdmjvhskKpsvXAn7rj");
    KafkaConsumer<string, string="string"> consumer = new KafkaConsumer<string,string>(props);

python客户端

不支持

4.集群外客户端部署

方案1:

1)通过yum源或rpm安装套件版本的kafka到集群外机器:
    yum  install  kafka_2_2_0_0_2041
  或
    rpm  -ivh  kafka_2_2_0_0_2041-0.10.0.1-xxxx.xxxx.x86_64.rpm
2)拷贝集群内任一kafka客户端机器/etc/kafka/config目录的内容,替换集群外客户端           对应路径:
集群内客户端节点:
    cd  /etc/kafka/conf
    tar  zcvf  kafka.tar.gz  /etc/kafka/conf/*
集群外客户端节点:
    cd  /etc/kafka/conf
    tar  zxvf  kafka.tar.gz

方案2:

1.打包集群内任一kafka客户端所在节点的/usr/hdp/2.2.0.0-2041/kafka/,并拷贝到集群外目标客户端节点
2.同方案1的步骤2

5.客户端代码编译打包

1)套件版kafka jar命名规则

套件的kafka是基于社区二次开发命名规则采用"社区版本号-TBDS-套件版本号"的方式命名.例:我们现在基于社区0.10.0.1版本的kafka进行开发,套件版本是4.0.3.3,则我们打出的kafka jar版本为0.10.0.1-TBDS-4.0.3.3,完整的kafka client maven jar文件名为:kafka-clients-0.10.0.1-TBDS-4.0.3.3.jar

2)基于套件提供的maven库开发

(1)拷贝或部署套件提供的maven库到开发者可访问的本地仓库或远程仓库 (2)在客户端maven工程pom引入对应的套件版kafka依赖,以套件4.0.3.3版本为例, 需要在pom中加入的依赖片段(其他版本依次类推):

        <dependency> 
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1-TBDS-4.0.3.3</version>
        </dependency>
        <dependency> <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1-TBDS-4.0.3.3</version>
        </dependency>

3)基于开源maven库开发 (强烈不建议使用) 这种方式建议不使用。 (1)在客户端maven工程引入对应的社区版本kafka依赖。

        <dependency> 
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency> 
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>

(2)编译完成之后,在运行客户端程序运行之前,把套件版的kafka jar加入classpath 中:

            kafka_2.11-0.10.0.1-TBDS-4.0.3.3.jar,  
            kafka-clients-0.10.0.1-TBDS-4.0.3.3.jar

6.运行

运行客户端代码与社区方式无区别

Previous组件访问开发Nexthadoop

Last updated 4 years ago

Was this helpful?