📖
tbds
  • 前言
  • 部署
    • 安装步骤
  • 平台管理
    • 用户管理
    • 项目管理
    • 系统设置
  • 组件访问开发
    • kafka
    • hadoop
    • hbase
    • accessKey
    • hive
    • spark
    • ftp
    • portalApi
    • hermes
    • 代码访问组件demos
      • hbase
      • hdfs
      • kafka
      • hive
  • 数据接入
    • 数据接入
      • TDBank产品化介绍及使用指引
      • TDBank数据接入指引
      • TDBank采集接口详解
      • OGG Flume Adapter 部署文档
      • DB Demo
      • HDFS Demo
    • Hippo管理
    • 进度监控
    • 血缘分析
  • 实时计算
    • 任务管理
    • 库表模板管理
  • 数据分析
    • 数据交互
    • ideX
  • 任务调度
    • 操作指引
    • 工作流
      • 基本概念
      • 任务基本信息
      • 任务调度设置
      • 任务参数配置
        • shell 脚本
        • ftp导入hdfs
        • kafka导入hbase
        • kafka导入hdfs
        • hdfs导出hbase
        • hive sql 脚本
        • hive导入hdfs
        • hdfs导出DB(mysql,postgreSQL,sql server)
        • tstorm任务
        • spark任务
        • mapreduce任务
        • db导入hdfs
        • hdfs导出hive
        • hdfs导出hive-tdsort
        • hive导出mysql
        • mysql导入hive
      • Demo
        • FTP导入HDFS
        • HDFS导出HBASE
        • HDFS导出Mysql
        • HDFS导入HIVE
        • Hive SQL脚本
        • Hive导出MySQL
        • Hive导入HDFS
        • KAFKA导出HBASE
        • KAFKA导出HDFS
        • MYSQL导入HDFS
        • MySQL导入Hive
        • Shell脚本
        • SPARK任务
      • 升级
        • 集成代码更新
        • rpm升级
      • 补充
        • 手动迁移base组件
        • 手动安装runner组件
        • 自定义任务开发文档
        • 时间隐式变量说明
        • 下发并发数说明和调整操作
        • Issues版本说明
        • 设置分组
        • 跨工作流依赖
      • 常见问题定位和解决方式
        • 常用操作
        • 实时接入任务hdfs2hive (tdsort)
        • 实例日志获取失败
        • 实例日志中提示下载文件失败
        • taskSchedule指标为空定位方法
        • 实例依赖失效确认方式
        • 任务实例诊断按钮无调度信息
        • 诊断和定位操作参考
        • 实例一直等待终止定位
        • mongodb 常见问题和处理办法
    • 任务管理
      • 工作流列表
      • 任务管理
      • 任务运行管理
      • 其他
    • 服务器配置
      • 基本概念
      • 操作指南
  • Tstorm
    • Tstorm介绍
    • 开发实例
      • wordcount
  • 数据展现
    • 自助报表
  • 数据资产
    • 库表管理
      • 可管理库表
      • 可读写库表
      • 无归属库表
      • 维表管理
      • 新建表
    • 数据血缘
    • 数据提取
      • 数据地图
      • 任务列表
      • 架构关联
  • 运维中心
    • 系统运维
      • 组件部署
      • 链接归集
      • 诊断
      • 备份
    • 访问管理
    • 文件管理
    • 监控告警
      • 监控
      • 告警
  • 机器学习
    • 系统简介
    • TDInsight功能介绍
      • 工作流
        • 新建工程
        • 新建工作流
        • 创建和配置节点
        • 运行
        • 日志查看
      • 数据输入输出
      • 组件
        • Spark组件
        • Sparkstreaming组件
        • pySpark组件
        • xgboost组件
    • 多实例并发
      • 3种方式驱动实例
      • 实例查询
      • 历史实例
    • TDInsight模型与在线推理
      • 数据流
      • 模型的训练与使用
      • 模型在线服务
    • TDInsight算法/组件说明
      • 数据说明
      • 特征工程
        • 数据预处理
        • 特征提取
        • 特征转换
        • 特征选择
        • 特征评估
      • 机器学习
        • 分类算法
        • 聚类算法
        • 回归算法
        • 推荐算法
        • 关联规则
      • 模型评估
        • Binary Evaluator
        • Multi Evaluator
        • Regression Evaluator
      • 可视化
        • 关系
        • 分布
        • 对比
        • 组合
      • 深度学习算法简介
        • 计算机视觉
        • 自然语言处理
        • 表示学习
Powered by GitBook
On this page
  • PySpark组件
  • 1. 从左侧组件列表里拖拽出一个PySpark节点
  • 2. 单击任务节点,会从右侧弹出配置框
  • PySpark使用建议
  • 使用Spark的DataFrame,而不要使用Pandas的DataFrame
  • 在Task里使用python库,而不是在driver上使用python库

Was this helpful?

  1. 机器学习
  2. TDInsight功能介绍
  3. 组件

pySpark组件

PreviousSparkstreaming组件Nextxgboost组件

Last updated 4 years ago

Was this helpful?

PySpark组件

PySpark组件是面向使用Python的Spark用户,用户通过Python编写Spark应用程序,通过PySpark组件完成部署,也支持pyspark的sql功能,本文有部分使用方法介绍(更多用法请参考社区指引:)。

和标准的Spark相比,pySpark支持上传Python脚本和实时修改,更加的灵活,而且支持SQL功能,所以我们推荐用来作为

1. 从左侧组件列表里拖拽出一个PySpark节点

2. 单击任务节点,会从右侧弹出配置框

  • 执行脚本:通过该配置框上传你的PySpark脚本,必填项

  • 依赖包文件:指定你的PySpark应用程序依赖的包,可以是.py .zip .egg文件,可选项

  • 算法参数:指定你的PySpark应用程序所需的参数,即传给PySpark脚本的参数,可选项

  • 配置资源:指定你的PySpark应用程序用到的配置文件,可选项

其他配置操作与Spark组件类似。

PySpark使用建议

使用PySpark的目的是更好地借助其分布式计算的优势,来解决单机完成不了的计算。如果你在PySpark中仍然是调用常规的Python库做单机计算,那就失去了使用PySpark的意义了。下面举例说明如何编写PySpark分布式计算代码。

使用Spark的DataFrame,而不要使用Pandas的DataFrame

PySpark本身就具有类似pandas.DataFrame的DataFrame,所以直接使用PySpark的DataFrame即可,基于PySpark的DataFrame的操作都是分布式执行的,而pandas.DataFrame是单机执行的,例如:

...
df = spark.read.json("examples/src/main/resources/people.json")
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

pandas_df = df.toPandas()
age = pandas_df['age']
...

上述代码中12行将PySpark的DataFrame转换成pandas.DataFrame,然后获取'age'列,注意df.toPandas()操作会将分布在各节点的数据全部收集到driver上,再转成单机的pandas.DataFrame数据结构,如果数据量很小还可以接受,但是数据量较大时,就不可取了,其实PySpark的DataFrame本身支持很多操作,直接基于它实现后续的业务逻辑即可,例如上述代码可以改成:

age = df.select('age')

在Task里使用python库,而不是在driver上使用python库

下面有段代码,将数据全部collect到driver端,然后使用sklearn进行预处理。

from sklearn import preprocessing
data = np.array(rdd.collect(), dtype=np.float)
normalized = preprocessing.normalize(data)

上述代码其实就退化为单机程序了,如果数据量较大的话,collect操作会把driver的内存填满,甚至OOM,通常基于RDD或DataFrame的API可以满足大多数需求,例如标准化操作:

from pyspark.ml.feature import Normalizer

df = spark.read.format("libsvm").load(path)

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)

就算RDD或DataFrame没有满足你要求的API,你可以自行写一个处理函数,针对每条记录进行处理:

# record -> other record
def process_fn(record):
  # your process logic
  # for example
  # import numpy as np
  # x = np.array(record, type=np.int32)
  # ...

# record -> True or Flase
def judge_fn(record):
  # return True or Flase

processed = rdd.map(process_fn).map(lambda x: x[1:3])
filtered = processed.filter(judge_fn)

process_fn或judge_fn会分发到每个节点上分布式执行,你可以在process_fn或judge_fn中使用任何python库(如numpy, scikit-learn等)

更多关于Spark的使用可以参考Spark官方文档

Spark SQL, DataFrames and Datasets Guide
数据预处理