Weiguo's Station

  • 博客首页

  • 文章归档

  • 分类专栏

  • 各种标签

  • 站点搜索

Spark基础

发表于 2019-03-07 更新于 2021-03-22 分类于 大数据

0. 多说几句

写这个post只是为了自己有所遗忘的时候,方便快速回忆上手。

Spark现在提供很多的版本:Java、Python、R、Scala,本文主要针对Python和Scala版本的进行记录, 大概先从公共的一些操作方法开始,之后记一下spark-submit是怎么用的,以及工程上的一些东西。

现在网上有很多Spark的教程,本文 = 我学习网上的资源 + 自己的理解 + 自己遇到的坑, 网络上的主要学习来源是子雨大数据之Spark入门教程,这个教程真的只是入门。

以Spark 2.1.0,Python2.7,Scala 2.11 版本进行描述

1. RDD编程

  • 基本RDD“转换”运算
    • map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
    • filter(func):筛选出满足函数func的元素,并返回一个新的数据集
    • flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
    • distinct(去重运算)
    • randomSplit(根据指定的比例随机分为N各RDD)
    • reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
    • groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
    • union(两个RDD取并集)
    • intersection(两个RDD取交集)
    • subtract(两个RDD取差集)
    • cartesian(两个RDD进行笛卡尔积运算)
  • 基本RDD“动作”运算
    • count() 返回数据集中的元素个数
    • collect() 以数组的形式返回数据集中的所有元素
    • first() 返回数据集中的第一个元素
    • take(n) 以数组的形式返回数据集中的前n个元素
    • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
    • foreach(func) 将数据集中的每个元素传递到函数func中运行
    • takeOrdered(排序后取前N条数据)
  • Key-Value形式 RDD“转换”运算
    • filter(过滤符合条件的数据)
    • mapValues(对value值进行转换)
    • sortByKey(根据key值进行排序)
    • reduceByKey(合并相同key值的数据)
    • join(内连接两个KDD)
    • leftOuterJoin(左外连接两个KDD)
    • rightOuterJoin(右外连接两个RDD)
    • subtractByKey(相当于key值得差集运算)
  • Key-Value形式 RDD“动作”运算
    • first(取第一条数据)
    • take(取前几条数据)
    • countByKey(根据key值分组统计)
    • lookup(根据key值查找value值)
  • RDD持久化
    • persist用于对RDD进行持久化
    • unpersist取消RDD的持久化,注意持久化的存储等级

2. text文本数据(需要自己解析)

spark 1.6.1

1
2
3
4
5
val sparkConf = new SparkConf()
sparkConf.setAppName("...")
sparkConf.set("key", "value")
sparkContext = new SparkContext(sparkConf)
sparkContext.textFile("text_path").map(...)

读取文本数据后,使用map进行自定义的解析

spark 2.1.0

1
2
3
4
5
6
7
8
9
10
11
val warehouseLocation = "spark-warehouse"
val sparkConf = new SparkConf()
sparkConf.setAppName("...")
sparkConf.set("key", "value")
val sparkSession = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
sparkSession.read.textFile("text_path").map(...)

3. 读Hive表数据

spark 1.6.1

1
2
3
4
5
6
val sparkConf = new SparkConf()
sparkConf.setAppName("...")
sparkConf.set("key", "value")
sparkContext = new SparkContext(sparkConf)
hiveContext = new HiveContext(sparkContext)
hiveContext.sql("select * from table where condition").map(...)

spark 2.1.0

1
2
3
4
5
6
7
8
9
10
11
val warehouseLocation = "spark-warehouse"
val sparkConf = new SparkConf()
sparkConf.setAppName("...")
sparkConf.set("key", "value")
val sparkSession = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
sparkSession.sql("select * from table where condition").map(...)

4. 提交到集群中运行

pyspark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env bash
spark_submit=${spark_bin}/spark-submit
spark_master=yarn-client

time \
$spark_submit \
--queue whichQueue \
--master $spark_master \
--driver-memory 10g \
--executor-memory 40g \
--num-executors 24 \
--executor-cores 8 \
--jars SomeThirdPartyJarFile.jar \
--conf spark.yarn.executor.memoryOverhead=600 \
your_script.py params

scala 2.11

spark 2.1.0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env bash

spark_submit=${spark_bin}/spark-submit
jar_file=packageAccordingMavenOrSbt.jar

time \
$spark_submit \
--class "package.className" \
--master yarn \
--deploy-mode client \
--num-executors 8 \
--executor-cores 3 \
--driver-memory 8g \
--executor-memory 24g \
--queue whichQueue \
--conf spark.classpath.userClassPathFirst=true \
${jar_file} \
--param ${param}

scala建议使用:Idea开发 + Maven依赖打包 + Scopt参数解析


5. Spark多路输出

version:
spark 2.1.0
scala 2.11.8

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
// 不输出Key
override def generateActualKey(key: Any, value: Any): Any =
NullWritable.get()
// 文件名用Key表示
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
key.toString
}

object MultiOutputPathTest {

val warehouseLocation = "spark-warehouse"

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setAppName("MultiOutputPathTest")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.yarn.executor.memoryOverhead", "800")
sparkConf.set("spark.sql.hive.convertMetastoreParquet", "false")
// sparkConf.setMaster("local[*]")

val sparkSession = SparkSession
.builder()
.config(sparkConf)
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()

// 定义数据
val data = sparkSession.parallelize(List(("w", "www"), ("b", "blog"), ("c", "com"), ("w", "bt")))
// 将同一个Key的数据聚合到一起
.reduceByKey((x, y) => x + "\n" + y)
// 重新划分partition数量,3表示按照key的数量分成相同的partition数量
.partitionBy(new HashPartitioner(3))
// 多路输出,第一个参数为输出路径,第二个参数为Key的类型,第三个参数为Value的类型,第四个参数是输出格式类
.saveAsHadoopFile(output_path, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
}
}

结果应该是在output_path路径下生成3个文件w, b, c。其中文件w中的内容为www\nbt, 文件b中的内容为blog, 文件c中的内容为com。


6. PySpark中自定义环境

6.1 背景

不同用户的应用程序使用的Python版本及需要的Python依赖可能存在不同,若每次缺少依赖都请 op 去update所有节点,对于大集群的维护来说相对比较困难,而且走变更流程等还影响 Spark 用户的工作效率。 为解决上述问题,我们探索一种用户可自定义Python环境的方法,用户可依据自己的需要,定制自己的Python并在spark-submit时做简单指定即可。

6.2 具体操作流程

下面以构造一个Python 3.6.10的依赖包为例

6.2.1 构建Python

如果系统中没有安装 wget 的话,请先安装,具体方法自行搜索。

1
2
3
4
5
6
# 下载Python源码
wget https://www.python.org/ftp/python/3.6.10/Python-3.6.10.tgz
# 解压
tar -zxvf Python-3.6.10.tgz
# 编译 Python --prefix指定编译输出路径
./Python-3.6.10/configure --prefix=/search/odin/zhaoweiguo/temp/python-3.6.10 && make && make install

6.2.2 安装依赖

根据个人需要安装Spark环境中需要的依赖包,例如 pyspark、google等。其中google包主要是因为在Spark中需要解析protobuf。

1
2
3
4
# install pyspark package,-t是指定安装路径
/search/odin/zhaoweiguo/temp/python-3.6.10/bin/pip3 install -t /search/odin/zhaoweiguo/temp/python-3.6.10/lib/python3.6/site-packages pyspark
# install google package
/search/odin/zhaoweiguo/temp/python-3.6.10/bin/pip3 install -t /search/odin/zhaoweiguo/temp/python-3.6.10/lib/python3.6/site-packages google

6.2.3 打包Python环境

1
2
3
# 进入python根目录,注意打包路径决定下面的spark配制方式
cd /search/odin/zhaoweiguo/temp/python-3.6.10
tar -zcf python-3.6.10.tgz *

6.2.4 其他打包方法

也可以不是用编译Python的方式进行操作得到Python打包环境,上面的打包方式了解以后,conda创建的方式大同小异。具体:

  1. 使用conda创建python虚拟环境,对应 6.2.1
  2. 进入创建的虚拟环境,安装需要第三方库,对应 6.2.2
  3. 打包整个虚拟环境,对应6.2.3

6.3 Spark配置使用

因客户机存在公用的可能,且每个应用程序的需求不同,为了降低不同用户之间的影响,我们推荐在提交命令中配制的作法。

  • client模式

Driver在用户提交宿主机运行,提交机和线上集群环境可能存在差异,因此,区分配制两端的 Python 环境 (若相同只须指定“spark.pyspark.python”即可)。

1
2
3
4
5
6
7
8
9
10
11
time spark-submit \
--conf ...其他配置... \
# 上传该tgz压缩文件, 压缩的tgz文件会被提取到executor的工作目录下, 后面到#python-3.6.10表示压缩文件被解压成到文件名称
--conf spark.yarn.dist.archives=file:///search/odin/zhaoweiguo/temp/python-3.6.10/python-3.6.10.tgz#python-3.6.10 \
# 指定driver的工作环境
# spark-submit执行的及其就是driver机器,直接选择python在本机的绝对路径
--conf spark.pyspark.driver.python=/search/odin/zhaoweiguo/temp/python-3.6.10/bin/python3.6 \
# 指定executor的工作Python环境
# ./python-3.6.10指的是executor的工作目录下上面python-3.6.10.tgz解压的python-3.6.10文件夹
--conf spark.pyspark.python=./python-3.6.10/bin/python3.6 \
script.py ${params}
  • cluster模式

Driver运行在ApplicationMaster, 而ApplicationMaster运行在Executor(container)中,因此,可视为Driver和Executor环境统一,只需要配制spark.pyspark.python即可。

1
2
3
4
5
6
7
8
time spark-submit \
--conf ...其他配置... \
# 上传该tgz压缩文件, 压缩的tgz文件会被提取到executor的工作目录下, 后面到#python-3.6.10表示压缩文件被解压成到文件名称
--conf spark.yarn.dist.archives=file:///search/odin/zhaoweiguo/temp/python-3.6.10/python-3.6.10.tgz#python-3.6.10 \
# 指定executor的工作Python环境
# ./python-3.6.10指的是executor的工作目录下上面python-3.6.10.tgz解压的python-3.6.10文件夹
--conf spark.pyspark.python=./python-3.6.10/bin/python3.6 \
script.py ${params}

6.4 缺点

上述方案虽可实现用户自定义python环境,但执行过程中每个Executor从HDFS下载一次python环境,增加RPC等开销,在开启动态资源伸缩功能时,下载次数会更多……

  1. Spark on Yarn 之Python环境定制
  2. spark-python版本依赖与三方模块方案

7. Spark的一些技巧

技巧包括:

  1. broadcast
  2. map_join
  3. 大表 join 小表
  4. 局部聚合+全局聚合
  5. ……

wwcom614/Spark

# 语言基础
pig基础
python常用实例
  • 文章目录
  • 站点概览
WeiguoZHAO

WeiguoZHAO

Welcome to my blog~
87 日志
13 分类
49 标签
GitHub E-Mail
大牛们
  • colah's blog
  • 王喆的Github
  • 刘建平的Github
  • 美团技术团队
  1. 0. 多说几句
  2. 1. RDD编程
  3. 2. text文本数据(需要自己解析)
  4. 3. 读Hive表数据
  5. 4. 提交到集群中运行
  6. 5. Spark多路输出
  7. 6. PySpark中自定义环境
    1. 6.1 背景
    2. 6.2 具体操作流程
      1. 6.2.1 构建Python
      2. 6.2.2 安装依赖
      3. 6.2.3 打包Python环境
      4. 6.2.4 其他打包方法
    3. 6.3 Spark配置使用
    4. 6.4 缺点
  8. 7. Spark的一些技巧
© 2021 WeiguoZHAO
0%