博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
七、rdd究竟是什么
阅读量:6879 次
发布时间:2019-06-27

本文共 2855 字,大约阅读时间需要 9 分钟。

RDD是个抽象类,定义了诸如map()、reduce()等方法,但实际上继承RDD的派生类一般只要实现两个方法:

 

  • def getPartitions: Array[Partition]
  • def compute(thePart: Partition, context: TaskContext): NextIterator[T]

 

getPartitions()用来告知怎么将input分片;

 
compute()用来输出每个Partition的所有行(行是我给出的一种不准确的说法,应该是被函数处理的一个单元);
 

◆ RDD的特点:

  1. 它是在集群节点上的不可变的、已分区的集合对象。
  2. 通过并行转换的方式来创建如(map, filter, join, etc)。
  3. 失败自动重建。
  4. 可以控制存储级别(内存、磁盘等)来进行重用。
  5. 必须是可序列化的。
  6. 是静态类型的。
 
a、分区
b、依赖(lineage)
c、函数
d、最佳位置(数据本地化)
e、分区策略

◆ RDD的好处

  1. RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
  2. RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。
  3. RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。
  4. RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。

◆ RDD的存储与分区

  1. 用户可以选择不同的存储级别存储RDD以便重用。
  2. 当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
  3. RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。

◆ RDD的内部表示

在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:

  1. 分区列表(数据块列表)
  2. 计算每个分片的函数(根据父RDD计算出此RDD)
  3. 对父RDD的依赖列表
  4. 对key-value RDD的Partitioner【可选】
  5. 每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】

◆ RDD的存储级别

RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:

 
  1. val NONE = new StorageLevel(false, false, false)   
  2.     val DISK_ONLY = new StorageLevel(true, false, false)   
  3.     val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)   
  4.     val MEMORY_ONLY = new StorageLevel(false, true, true)   
  5.     val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)   
  6.     val MEMORY_ONLY_SER = new StorageLevel(false, true, false)   
  7.     val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)   
  8.     val MEMORY_AND_DISK = new StorageLevel(true, true, true)   
  9.     val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)   
  10.     val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)   
  11.     val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)  

◆ RDD定义了各种操作,不同类型的数据由不同的RDD类抽象表示,不同的操作也由RDD进行抽实现。

RDD的生成

◆ RDD有两种创建方式:

1、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建。

2、从父RDD转换得到新RDD。

◆ 下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:

 
  1. // SparkContext根据文件/目录及可选的分片数创建RDD, 这里我们可以看到Spark与Hadoop MapReduce很像   
  2.    // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。   
  3.    def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {   
  4.        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],   
  5.        classOf[Text], minSplits) .map(pair => pair._2.toString) }  
  6.    
  7.    // 根据Hadoop配置,及InputFormat等创建HadoopRDD    
  8.    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) 

◆ 对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎一样的:

RDD的转换与操作

◆ 对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。

◆ 转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

 
◆ 操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。 

转载于:https://www.cnblogs.com/ylcoder/p/5730987.html

你可能感兴趣的文章
pwnable.kr bof之write up
查看>>
图片居中,自适应大小
查看>>
Sql语句查询某列A相同值的另一列B最大值的数据
查看>>
汇编中的inc和dec
查看>>
技术串讲 CAS 有用
查看>>
Hadoop排序工具用法小结
查看>>
qdoj.xyz 6.17
查看>>
HDU 1176 免费馅饼(简单DP,数塔变形)
查看>>
最短路径
查看>>
怒学三算法 POJ 2387 Til the Cows Come Home (Bellman_Ford || Dijkstra || SPFA)
查看>>
js --- 执行机制
查看>>
创业板跌跌不休
查看>>
类,对象,方法,变量
查看>>
导航和渲染首页文章列表
查看>>
Unity脚本用VS打开出现 "以下文件中的行尾不一致,要将行尾标准化吗?"
查看>>
开始Flask项目
查看>>
linux命令---常用组合
查看>>
深入理解OOP(第一天):多态和继承(初期绑定和编译时多态)
查看>>
JS获取select 当前选种值
查看>>
requestAnimFrame 动画的使用方法
查看>>