Spark基础
0. 多说几句
写这个post只是为了自己有所遗忘的时候,方便快速回忆上手。
Spark现在提供很多的版本:Java、Python、R、Scala,本文主要针对Python和Scala版本的进行记录, 大概先从公共的一些操作方法开始,之后记一下spark-submit是怎么用的,以及工程上的一些东西。
现在网上有很多Spark的教程,本文 = 我学习网上的资源 + 自己的理解 + 自己遇到的坑, 网络上的主要学习来源是子雨大数据之Spark入门教程,这个教程真的只是入门。
以Spark 2.1.0,Python2.7,Scala 2.11 版本进行描述
1. RDD编程
- 基本RDD“转换”运算
- map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
- filter(func):筛选出满足函数func的元素,并返回一个新的数据集
- flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
- distinct(去重运算)
- randomSplit(根据指定的比例随机分为N各RDD)
- reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
- groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
- union(两个RDD取并集)
- intersection(两个RDD取交集)
- subtract(两个RDD取差集)
- cartesian(两个RDD进行笛卡尔积运算)
- 基本RDD“动作”运算
- count() 返回数据集中的元素个数
- collect() 以数组的形式返回数据集中的所有元素
- first() 返回数据集中的第一个元素
- take(n) 以数组的形式返回数据集中的前n个元素
- reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
- foreach(func) 将数据集中的每个元素传递到函数func中运行
- takeOrdered(排序后取前N条数据)
- Key-Value形式 RDD“转换”运算
- filter(过滤符合条件的数据)
- mapValues(对value值进行转换)
- sortByKey(根据key值进行排序)
- reduceByKey(合并相同key值的数据)
- join(内连接两个KDD)
- leftOuterJoin(左外连接两个KDD)
- rightOuterJoin(右外连接两个RDD)
- subtractByKey(相当于key值得差集运算)
- Key-Value形式 RDD“动作”运算
- first(取第一条数据)
- take(取前几条数据)
- countByKey(根据key值分组统计)
- lookup(根据key值查找value值)
- RDD持久化
- persist用于对RDD进行持久化
- unpersist取消RDD的持久化,注意持久化的存储等级
2. text文本数据(需要自己解析)
spark 1.6.1
1 | val sparkConf = new SparkConf() |
读取文本数据后,使用map进行自定义的解析
spark 2.1.0
1 | val warehouseLocation = "spark-warehouse" |
3. 读Hive表数据
spark 1.6.1
1 | val sparkConf = new SparkConf() |
spark 2.1.0
1 | val warehouseLocation = "spark-warehouse" |
4. 提交到集群中运行
pyspark
1 |
|
scala 2.11
spark 2.1.0
1 |
|
scala建议使用:Idea开发 + Maven依赖打包 + Scopt参数解析
5. Spark多路输出
version:
spark 2.1.0
scala 2.11.8
1 | class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { |
结果应该是在output_path路径下生成3个文件w
, b
, c
。其中文件w
中的内容为www\nbt
, 文件b
中的内容为blog
, 文件c
中的内容为com
。
6. PySpark中自定义环境
6.1 背景
不同用户的应用程序使用的Python版本及需要的Python依赖可能存在不同,若每次缺少依赖都请 op
去update所有节点,对于大集群的维护来说相对比较困难,而且走变更流程等还影响 Spark 用户的工作效率。
为解决上述问题,我们探索一种用户可自定义Python环境的方法,用户可依据自己的需要,定制自己的Python并在spark-submit时做简单指定即可
。
6.2 具体操作流程
下面以构造一个Python 3.6.10的依赖包为例
6.2.1 构建Python
如果系统中没有安装 wget
的话,请先安装,具体方法自行搜索。
1 | 下载Python源码 |
6.2.2 安装依赖
根据个人需要安装Spark环境中需要的依赖包,例如 pyspark
、google
等。其中google
包主要是因为在Spark中需要解析protobuf。
1 | install pyspark package,-t是指定安装路径 |
6.2.3 打包Python环境
1 | 进入python根目录,注意打包路径决定下面的spark配制方式 |
6.2.4 其他打包方法
也可以不是用编译Python的方式进行操作得到Python打包环境,上面的打包方式了解以后,conda创建的方式大同小异。具体:
- 使用conda创建python虚拟环境,对应
6.2.1
- 进入创建的虚拟环境,安装需要第三方库,对应
6.2.2
- 打包整个虚拟环境,对应
6.2.3
6.3 Spark配置使用
因客户机存在公用的可能,且每个应用程序的需求不同,为了降低不同用户之间的影响,我们推荐在提交命令中配制的作法。
- client模式
Driver在用户提交宿主机运行,提交机和线上集群环境可能存在差异,因此,区分配制两端的 Python 环境 (若相同只须指定“spark.pyspark.python”即可)。
1 | time spark-submit \ |
- cluster模式
Driver运行在ApplicationMaster, 而ApplicationMaster运行在Executor(container)中,因此,可视为Driver和Executor环境统一,只需要配制spark.pyspark.python即可。
1 | time spark-submit \ |
6.4 缺点
上述方案虽可实现用户自定义python环境,但执行过程中每个Executor从HDFS下载一次python环境,增加RPC等开销,在开启动态资源伸缩功能时,下载次数会更多……
7. Spark的一些技巧
技巧包括:
- broadcast
- map_join
- 大表 join 小表
- 局部聚合+全局聚合
- ……