📖
tbds
  • 前言
  • 部署
    • 安装步骤
  • 平台管理
    • 用户管理
    • 项目管理
    • 系统设置
  • 组件访问开发
    • kafka
    • hadoop
    • hbase
    • accessKey
    • hive
    • spark
    • ftp
    • portalApi
    • hermes
    • 代码访问组件demos
      • hbase
      • hdfs
      • kafka
      • hive
  • 数据接入
    • 数据接入
      • TDBank产品化介绍及使用指引
      • TDBank数据接入指引
      • TDBank采集接口详解
      • OGG Flume Adapter 部署文档
      • DB Demo
      • HDFS Demo
    • Hippo管理
    • 进度监控
    • 血缘分析
  • 实时计算
    • 任务管理
    • 库表模板管理
  • 数据分析
    • 数据交互
    • ideX
  • 任务调度
    • 操作指引
    • 工作流
      • 基本概念
      • 任务基本信息
      • 任务调度设置
      • 任务参数配置
        • shell 脚本
        • ftp导入hdfs
        • kafka导入hbase
        • kafka导入hdfs
        • hdfs导出hbase
        • hive sql 脚本
        • hive导入hdfs
        • hdfs导出DB(mysql,postgreSQL,sql server)
        • tstorm任务
        • spark任务
        • mapreduce任务
        • db导入hdfs
        • hdfs导出hive
        • hdfs导出hive-tdsort
        • hive导出mysql
        • mysql导入hive
      • Demo
        • FTP导入HDFS
        • HDFS导出HBASE
        • HDFS导出Mysql
        • HDFS导入HIVE
        • Hive SQL脚本
        • Hive导出MySQL
        • Hive导入HDFS
        • KAFKA导出HBASE
        • KAFKA导出HDFS
        • MYSQL导入HDFS
        • MySQL导入Hive
        • Shell脚本
        • SPARK任务
      • 升级
        • 集成代码更新
        • rpm升级
      • 补充
        • 手动迁移base组件
        • 手动安装runner组件
        • 自定义任务开发文档
        • 时间隐式变量说明
        • 下发并发数说明和调整操作
        • Issues版本说明
        • 设置分组
        • 跨工作流依赖
      • 常见问题定位和解决方式
        • 常用操作
        • 实时接入任务hdfs2hive (tdsort)
        • 实例日志获取失败
        • 实例日志中提示下载文件失败
        • taskSchedule指标为空定位方法
        • 实例依赖失效确认方式
        • 任务实例诊断按钮无调度信息
        • 诊断和定位操作参考
        • 实例一直等待终止定位
        • mongodb 常见问题和处理办法
    • 任务管理
      • 工作流列表
      • 任务管理
      • 任务运行管理
      • 其他
    • 服务器配置
      • 基本概念
      • 操作指南
  • Tstorm
    • Tstorm介绍
    • 开发实例
      • wordcount
  • 数据展现
    • 自助报表
  • 数据资产
    • 库表管理
      • 可管理库表
      • 可读写库表
      • 无归属库表
      • 维表管理
      • 新建表
    • 数据血缘
    • 数据提取
      • 数据地图
      • 任务列表
      • 架构关联
  • 运维中心
    • 系统运维
      • 组件部署
      • 链接归集
      • 诊断
      • 备份
    • 访问管理
    • 文件管理
    • 监控告警
      • 监控
      • 告警
  • 机器学习
    • 系统简介
    • TDInsight功能介绍
      • 工作流
        • 新建工程
        • 新建工作流
        • 创建和配置节点
        • 运行
        • 日志查看
      • 数据输入输出
      • 组件
        • Spark组件
        • Sparkstreaming组件
        • pySpark组件
        • xgboost组件
    • 多实例并发
      • 3种方式驱动实例
      • 实例查询
      • 历史实例
    • TDInsight模型与在线推理
      • 数据流
      • 模型的训练与使用
      • 模型在线服务
    • TDInsight算法/组件说明
      • 数据说明
      • 特征工程
        • 数据预处理
        • 特征提取
        • 特征转换
        • 特征选择
        • 特征评估
      • 机器学习
        • 分类算法
        • 聚类算法
        • 回归算法
        • 推荐算法
        • 关联规则
      • 模型评估
        • Binary Evaluator
        • Multi Evaluator
        • Regression Evaluator
      • 可视化
        • 关系
        • 分布
        • 对比
        • 组合
      • 深度学习算法简介
        • 计算机视觉
        • 自然语言处理
        • 表示学习
Powered by GitBook
On this page
  • 1.TBDS认证
  • 2.Spark Streaming 消费 Kafka
  • 2.1 TBDS认证访问
  • 2.2 SASL_PLAIN认证访问
  • 3.集群外客户端部署

Was this helpful?

  1. 组件访问开发

spark

1.TBDS认证

由于套件增加了自己的认证体系,在执行bin/spark-submit之前,需要先export认证相关的环境变量,

  export hadoop_security_authentication_tbds_username=hdfs
  export hadoop_security_authentication_tbds_secureid=F3QdVfxbQkNHVkn1OzLA3yK3In0bL6HgX
  export hadoop_security_authentication_tbds_securekey=o8AnGFYQ2lIB0AJ78TIeoJ0Uu1nkph12

admin用户可以在portal创建所有用户的securekey密钥。普通用户需要securekey密钥则向管理员申请。详情请见认证相关文档。

执行上述export语句之后,就可正常提交spark任务。

2.Spark Streaming 消费 Kafka

由于访问套件Kafka也需要进行认证,因此这个地方也与开源版本不同。首先需要在自己的Spark Streaming App里面将maven依赖由社区版本替换成套件版本,

      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.1.0-TBDS-4.0.4</version>

Kafka认证支持两种方式,接下来分别说明。

2.1 TBDS认证访问

需要在Kafka参数里面增加参数,

       "security.protocol" -> "SASL_TBDS",
       "sasl.mechanism" -> "TBDS",
       “sasl.tbds.secure.id” -> “F3QdVfxbQkNHVkn1OzLA3yK3In0bL6HgX”,
       “sasl.tbds.secure.key” -> “o8AnGFYQ2lIB0AJ78TIeoJ0Uu1nkph12”

其中认证相关的密钥信息同上述。

2.2 SASL_PLAIN认证访问

需要在Kafka参数里面增加参数,

        "security.protocol" -> "SASL_PLAINTEXT",
        "sasl.mechanism" -> "PLAIN"

完整示例代码如下,

  package org.apache.spark.examples.streaming

  import org.apache.kafka.common.serialization.StringDeserializer

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.kafka010._
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

  object DirectKafka010WordCount {
    def main(args: Array[String]) {
      if (args.length < 2) {
        System.err.println(s"""
          |Usage: DirectKafka010WordCount <brokers> <topics>
          |  <brokers> is a list of one or more Kafka brokers
          |  <topics> is a list of one or more kafka topics to consume from
          |
          """.stripMargin)
        System.exit(1)
      }

      StreamingExamples.setStreamingLogLevels()

      val Array(brokers, topics) = args

      // Create context with 2 second batch interval
      val sparkConf = new SparkConf().setAppName("DirectKafka010WordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))

      // Create direct kafka stream with brokers and topics
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> brokers,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "tbds_spark_streaming_group",
        "security.protocol" -> "SASL_PLAINTEXT",
        "sasl.mechanism" -> "PLAIN"
      )
      val messages = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics.split(","), kafkaParams)
      )

      // Get the lines, split them into words, count the words and print
      val lines = messages.map(record => record.value)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
      wordCounts.print()

      // Start the computation
      ssc.start()
      ssc.awaitTermination()
    }
  }
  // scalastyle:on println
```  

并且,在执行bin/spark-submit的时候需要添加以下两个参数,

  --driver-java-options -Djava.security.auth.login.config=/etc/hadoop/conf/kafka_client_for_ranger_yarn_jaas.conf
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/etc/hadoop/conf/kafka_client_for_ranger_yarn_jaas.conf"

这样才能保证Spark的driver和executor都能访问到认证所需要的jaas配置文件。

3.集群外客户端部署

(1)在任一安装spark客户端的集群内节点,打包spark的安装路径: /usr/hdp/2.2.0.0-2041/spark/,并拷贝到集群外目标客户端安装节点 (2)如果有套件的yum源,在集群外目标安装节点使用yum install安装 (3)如果有spark的rpm包,在集群外目标安装节点使用rpm -ivh安装

PrevioushiveNextftp

Last updated 4 years ago

Was this helpful?