怎么两边教程学习顺序出入恁大呢
从数据源创建RDD
- 从本地文件系统中加载数据
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
- 从HDFS文件系统中加载数据
以下三条命令均为等价命令,区别是目录形式不同>>> lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
>>> lines = sc.textFile("user/hadoop/word.txt")
>>> lines = sc.textFile("word.txt")
在使用 Spark 读取文件时:
- 如果使用了本地文件系统加载数据,就必须要保证在所有得 worker 节点上也能用同样的路径访问到该文件
- textFile() 方法的输入参数可以是文件名、目录、压缩文件等
-
textFile() 方法接受第二个可选的输入参数,用来指定分区的数目。默认情况下,Spark 会为 HDFS 的每个 block 创建一个分区(默认128MB),可以提供一个比 blcok 数量更大的值作为分区数目,但是这个值不能小于 block 数量的值
从本地集合中创建RDD
>>> nums = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(nums)
parallelize 方法会将位于单个节点的数据集合转换成为一个并行集合。在创建改并行集合时,还可以显式指定该并行集合的分片数量>>> myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\.split(" ")
>>> words = spark.parallelize(myCollection, 2)
也可以命名一个 RDD 以方便将其展示于 Spark UI 上>>> words.setName("myWords")
>>> words.name()
RDD操作
RDD 被创建好后,后续一般会发生两种操作:
- 转换(Transformation):基于现有的数据集创建一个新的数据集
-
转换操作
转换操作分为两类:指定窄依赖关系的转换操作和指定宽依赖关系的转换操作,对于 RDD 而言,每一次转换操作都会产生不同的 RDD,提供给下一个转换操作使用。转换得到的 RDD 是惰性的,即:整个转换过程只记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,从血缘关系源头开始,进行物理的转换操作
常见的转换操作(Transformation API): - distinct():删除重复项
- filter(func):筛选出满足函数 func 的元素,并返回一个新的数据集
- map(func):将每个元素传递到 func 中,并将结果返回为一个新的数据集
- flatMap(func):与 map() 相似,但每个输入元素都可以映射到0或者多个输出结果
- groupByKey(func):应用于 (K, V)键值对的数据集时,返回一个新的(K, iterable) 形式的数据集
- reduceByKey(func):应用于(K ,V)键值对数据集时,返回一个新的(K, V)形式的数据集,其中每个值是将每个 key 传递到函数中进行聚合
-
randomSplit(Array):将一个 RDD 随即切分为若干个RDD,返回一个由被切割的 RDD 组成的一个 RDD 的 Array;提供一个可选参数 seed,随机数种子
动作操作
行动操作才是真正触发计算的操作。Spark 程序只有执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终完成行动操作得到结果
常见的动作操作(Action API): - reduce(func):指定一个函数将 RDD 中的任何值“规约”为一个值
- count():返回数据集中的元素个数
- countApprox(timeoutMilliseconds, confidence):count() 方法的近似方法,用于返回大概的计数结果,但是必须在指定的时间(timeoutMilliseconds)内完成,如果超过指定时间,就会返回一个不准确的近似结果,confidence(置信度) 表示近似结果的误差区间包括真实值的概率
- countByValue():对给定 RDD 中的值得个数进行技术,需要将结果集记载到驱动器的内存中来实现计数,只有在总行数较少或不同 Key 数量较少时,才适合使用这种方法
- max():返回最大值
- min():返回最小值
- take(n):从 RDD 中读取n个值
- collect()以数组的形式返回数据集中的所有元素
- first()返回数据集中的第一个元素
-
保存文件
指将 RDD 写入纯文本文件。
- saveAsTextFile(path):要将 RDD 保存到文本文件中,只需要指定文件路径和压缩编码器(可选)即可
- savAsObjectFile(path):将 RDD 写入 sequenceFile 格式文件中,sequenceFile 是一种由二进制键值对(k-v)组成的文件,也是 Hadoop MapReduce 作业中常用的输入/输出格式
缓存
可以通过缓存或持久化 RDD 来避免重复计算的开销
>>> words.cache()
也可以通过方法手动把持久化的 RDD 从缓存中删除>>> words.unpersist()
检查点
检查点(checkpoint)是将 RDD 保存在磁盘上的操作,以便将来对此 RDD 的引用能直接访问磁盘上的那些中间结果,而不需要从其源头重新计算 RDD。它与缓存累死,但是它不存储在内存中,只存储再磁盘上
>>> sc.setCheckpointDir("/some/path/for/checkpointing")
>>> words.checkpoint()
惰性评估
惰性评估(lazy evaluation)的意思是等到绝对需要时才进行计算。
在 Spark 中,当用户表达一些对数据的操作时,不会立即修改数据,而是建立一个作用到原始数据的转换计划。Spark 首先会将这个计划编译为可以在集群中高效运行的流水线式物理执行计划,然后等待到最后的时刻才开始执行代码。这样可以优化整个从输入端到输出端的数据流