Spark

1. TBDS Authentication

Because the suite adds its own authentication system, you need to export the authentication-related environment variables before running bin/spark-submit:

  export hadoop_security_authentication_tbds_username=hdfs
  export hadoop_security_authentication_tbds_secureid=F3QdVfxbQkNHVkn1OzLA3yK3In0bL6HgX
  export hadoop_security_authentication_tbds_securekey=o8AnGFYQ2lIB0AJ78TIeoJ0Uu1nkph12

The admin user can create the secure keys of all users in the portal. Ordinary users who need a secure key should request one from an administrator. See the authentication documentation for details.

After running the export statements above, Spark jobs can be submitted normally.
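
For example, a submission after the exports might look like the following sketch (the --master value, main class, and jar path are placeholders, not values prescribed by the suite):

  # Assumes the three TBDS environment variables above have already been exported
  bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class com.example.MyApp \
    /path/to/my-app.jar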

2. Spark Streaming Consuming Kafka

Accessing the suite's Kafka also requires authentication, so this part differs from the open-source version as well. First, in your Spark Streaming application, replace the community Maven dependency with the suite version:

      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0-TBDS-4.0.4</version>
      </dependency>

Kafka authentication supports two modes, described in turn below.

2.1 Access with TBDS Authentication

Add the following entries to the Kafka parameters:

       "security.protocol" -> "SASL_TBDS",
       "sasl.mechanism" -> "TBDS",
       “sasl.tbds.secure.id” -> “F3QdVfxbQkNHVkn1OzLA3yK3In0bL6HgX”,
       “sasl.tbds.secure.key” -> “o8AnGFYQ2lIB0AJ78TIeoJ0Uu1nkph12”

The secure id and secure key here are the same credentials as in the exports above; the entries go into the same kafkaParams map used in the full example below.

2.2 Access with SASL_PLAIN Authentication

Add the following entries to the Kafka parameters:

        "security.protocol" -> "SASL_PLAINTEXT",
        "sasl.mechanism" -> "PLAIN"

The complete example code is as follows:

  package org.apache.spark.examples.streaming

  import org.apache.kafka.common.serialization.StringDeserializer

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.kafka010._
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

  object DirectKafka010WordCount {
    def main(args: Array[String]) {
      if (args.length < 2) {
        System.err.println(s"""
          |Usage: DirectKafka010WordCount <brokers> <topics>
          |  <brokers> is a list of one or more Kafka brokers
          |  <topics> is a list of one or more kafka topics to consume from
          |
          """.stripMargin)
        System.exit(1)
      }

      // StreamingExamples (a helper in the same Spark examples package) just
      // sets a sensible default logging level for the example
      StreamingExamples.setStreamingLogLevels()

      val Array(brokers, topics) = args

      // Create context with 2 second batch interval
      val sparkConf = new SparkConf().setAppName("DirectKafka010WordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))

      // Create direct kafka stream with brokers and topics
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> brokers,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "tbds_spark_streaming_group",
        "security.protocol" -> "SASL_PLAINTEXT",
        "sasl.mechanism" -> "PLAIN"
      )
      val messages = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics.split(","), kafkaParams)
      )

      // Get the lines, split them into words, count the words and print
      val lines = messages.map(record => record.value)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
      wordCounts.print()

      // Start the computation
      ssc.start()
      ssc.awaitTermination()
    }
  }

In addition, the following two options must be added when running bin/spark-submit:

  --driver-java-options -Djava.security.auth.login.config=/etc/hadoop/conf/kafka_client_for_ranger_yarn_jaas.conf
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/etc/hadoop/conf/kafka_client_for_ranger_yarn_jaas.conf"

This ensures that both the Spark driver and the executors can reach the JAAS configuration file required for authentication.
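
As a sketch, the full example above could be submitted as follows, assuming it has been packaged into a jar (the jar path, broker list, and topic name are placeholders):

  bin/spark-submit \
    --master yarn \
    --deploy-mode client \
    --class org.apache.spark.examples.streaming.DirectKafka010WordCount \
    --driver-java-options -Djava.security.auth.login.config=/etc/hadoop/conf/kafka_client_for_ranger_yarn_jaas.conf \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/etc/hadoop/conf/kafka_client_for_ranger_yarn_jaas.conf" \
    /path/to/spark-examples.jar \
    broker1:9092,broker2:9092 test_topic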

3. Deploying a Client Outside the Cluster

(1) On any node inside the cluster that has the Spark client installed, package the Spark installation path /usr/hdp/2.2.0.0-2041/spark/ and copy it to the target client node outside the cluster (see the sketch after this list).
(2) If a suite yum repository is available, install on the target node outside the cluster with yum install.
(3) If a Spark rpm package is available, install it on the target node outside the cluster with rpm -ivh.
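
A minimal sketch of option (1), assuming SSH access to a hypothetical external client node named client01 (the host name, archive name, and temporary paths are placeholders):

  # On a cluster node that already has the Spark client installed
  tar -czf /tmp/spark-client.tar.gz -C /usr/hdp/2.2.0.0-2041/ spark

  # Copy the archive to the node outside the cluster
  scp /tmp/spark-client.tar.gz client01:/tmp/

  # On the external node, unpack it to the original path
  ssh client01 "mkdir -p /usr/hdp/2.2.0.0-2041 && tar -xzf /tmp/spark-client.tar.gz -C /usr/hdp/2.2.0.0-2041/"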
