首页 > 学院 > 开发设计 > 正文

跟天齐老师学Spark(7)--关于Spark的RDD

2019-11-11 05:19:37
字体:
来源:转载
供稿:网友
关于spark的RDD:关于RDD,可以查看官方文档,可以看作者的论文,也可以看spark源码中关于RDD的注释。按Ctrl+N快捷键,搜索RDD,进入源码,如果没有关联源码,在IDEA中右上角会有一个提示:"Attach Sources".在IDEA中关联spark的源码,首先解压下载好的spark源码包(spark-1.6.2.tgz),然后在IDEA中选择右上角的Attach Sources,在弹出的窗口中选择自己解压后的spark的源码目录即可。RDD:一个弹性、可复原的、分布式的数据集。它是spark的一个最基本的抽象。不可变的(一旦创建好了,在计算的时候是不可变,对它进行各种操作都只能生成新的RDD),被分区的(一个分区只能属于一台机器,但是一台机器上可能有很多很多的分区),的集合,它可以被并行的计算。只有key-value格式的数据才可以使用groupByKey或者join。RDD的5个特点:数据是存放在多个分区里面的。(1)RDD中有很多的分区,分区List是有序的(意味着如果你的数据很少,而分区很多,那么就可能有的分区中有数据,有的可能没有数据);(2)一个函数会作用到每一台机器上的每一个分区上面(split);(3)RDD和RDD之间是存在依赖关系的(为了容错),最终的那一个RDD会触发Action提交任务,它会向前依次推断出前面的RDD,然后一点点计算;可复原;这些RDD之间是有序的;(4)如果你的RDD里面是key-value类型的数据,它一定会有一个Partitioner(分区器,它决定了这条数据属于哪一个分区,它默认使用的分区器是hash分区器);如果不是key-value类型,他就没有分区器;(在spark中没有reduce概念,但它有一个partition概念,二者是相似的)(5)RDD里面会保存着一个最优位置。也就是数据在哪,以后它的任务就启动在哪;把计算调度到数据所在的机器上,位置感知,实现数据本地化;宁愿移动计算,而不会移动数据;因为移动数据的代价很大,数据要消耗大量的网络带宽和磁盘io;(说明:如果我们是从hdfs这种分布式系统里面读数据,它会有一个最佳位置。它会在有数据的那台机器上创建分区,它在启动Executor的时候,它还不知道这个数据在哪,它在创建分区的时候才会向我们的namenode进行交互,知道这个分区在哪台机器上,然后在那台机器上创建分区。后面看源码的时候再介绍)--------------------------------------------------------------RDD的特点说明通过从hdfs中读取数据来验证RDD的一些特点首先textFile("")生成的RDD会有几个分区呢?hdfs中的每一个block(每一个输入切片)就会对应spark中的RDD的一个分区;通过rdd.partition.length来查看一个RDD的分区数。注意:它在分区的时候,会让每一个分区中的数据量尽量被均匀分配。在从HDFS中读取数据的时候,假如我们的hdfs中有两个小文件,它会用一个RDD来读,第一个分区是partition0,第二个分区是partition1,这样hdfs中的文件和RDD中的分区是一一对应的,分区位于Worker中的Executor进程中。这两个分区可能在一个Executor上,也可能在不同机器的Executor上,但是一个分区里面的数据不可能在两台机器上。注意textFile方法并不会触发Action,所以现在还不会真的去读数据,所以此时的分区中还没有数据。但是它会记录住每一个分区将要从哪个目录下的哪个block中读取数据。val rdd2 = rdd1.map((_, 1))如果没有改变RDD分区的数量,那么新生成的子RDD中分区的数量会和父RDD的分区的数量一样。RDD之间会记录住它们之间的血缘关系。虽然现在还没有出发Action,但是这些RDD会记住你调了什么方法,传入了什么函数。rdd2.saveAsTextFile("")调动saveAsTextFile方法之后。触发Action,开始提交任务,它会从最后一个RDD往前推知道推到最前面的rdd1,才开始读数据,它好比从一个迭代器中读数据,好比一个流水线,读一条处理一条。我们调rdd上的map方法(这个rdd上的map方法是针对多台机器的一个抽象方法),其实它最终会调每一个分区上的那个map方法(MapPartitionsRDD),然后这个分区上的map方法会调scala的map方法。在hdfs中就会产生两个结果文件,因为它有两个分区。打印RDD之间的依赖关系的方法:rdd.toDebugString验证分区器:有一个getPartition方法,用key的哈希code值对分区的数量求模。val rdd3 = rdd2.repartition(2)这样就给rdd2重新分区。rdd3.saveAsTextFile("")发现结果不一样了。因为重新分区会有一个shuffer的过程;import org.apache.spark.HashPartitionerrdd.partitonBy(new HashPartitioner(2))rdd3.saveAsTextFile("")再看生成的结果文件中的数据,发现数据被分类了,key相同的数据在一个文件中。说明我们设置的分区器发挥作用了。调用partitonBy方法会生成一个新的RDD,叫做ShuffleRDD。它会用key的hashcode对分区数求模。这样key相同的数据就会进入同一个分区。对每一个分区的数据进行局部求和,最后再汇总。val rdd3 = rdd2.reduceByKey(_+_)它里面也有一个默认的分区器。rdd3.saveAsTextFile("")结果文件还是两个。说明它把key相同的数据Shuffle到同一个分区中然后再分别聚合。//在使用reduceByKey的时候,指定分区数量val rdd3 = rdd2.reduceByKey(_+_, 1)rdd3.saveAsTextFile("")此时hdfs中只有一个结果文件。因为指定了只有一个分区,所以说有的数据都被Shuffle到同一个分区中。

上面的一些实验测试,是为了验证RDD的一些特征。

RDD上的一些复杂的方法:将老师发的文件spark_rdd_api.txt文件中的练习一下即可。查看分区中的数据是什么:val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)rdd1.mapPartitionsWithIndex(func).collect常用函数:math.max(_,_)和math.min(_, _)RDD并没有定义reduceByKey方法,但是它能调这个方法,其实它是在PairRDDFunction中定义的。它将普通的RDD转换成了PairRDDFunction,在RDD单例对象中就定义了一个rddToPairRDDFunction方法。在SparkContext(注意是object)中定义了很多implicit,其中就包含rddToPairRDDFunction方法,它已经被废弃,它调用的就是RDD单例对象中定义的那个rddToPairRDDFunction方法。foreachPartition:这个方法在以后开发中用的非常非常多!!!它可以将每一个分区中的数据拿出来进行处理,在Spark中计算好的数据不需要sqoop工具,定义一个函数就可以直接往关系型数据库中写(后面会专门有例子讲)。注意coalesce(分区数, Boolean)方法和repartition方法的关系,其实repartition方法底层调的就是coalesce(分区数, shuffle=true),只不过给它传了一个shuffle=true,表示分区中的数据一定要shuffle,也就是说数据一定要在网络中传递,以数据为单位重新分配到新的RDD中。如果我们直接调用coalesce(分区数,false)方法给它传一个false的话,就不会有shuffle。它只会以分区为单位分配给新的RDD中的分区。而分区中的数据是不会重新洗牌的。


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表