博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
24、Checkpoint原理剖析
阅读量:5220 次
发布时间:2019-06-14

本文共 3986 字,大约阅读时间需要 13 分钟。

一、原理

1、Checkpoint是什么

Checkpoint,是Spark提供的一个比较高级的功能。有的时候,比如说,我们的Spark应用程序,特别的复杂,然后呢,从初始的RDD开始,到最后整个应用程序完成,有非常多的步骤,比如超过20个transformation操作。而且呢,整个应用运行的时间也特别长,比如通常要运行1~5个小时。在上述情况下,就比较适合使用checkpoint功能。因为,对于特别复杂的Spark应用,有很高的风险,会出现某个要反复使用的RDD,因为节点的故障,虽然之前持久化过,但是还是导致数据丢失了。那么也就是说,出现失败的时候,没有容错机制,所以当后面的transformation操作,又要使用到该RDD时,就会发现数据丢失了(CacheManager),此时如果没有进行容错处理的话,那么可能就又要重新计算一次数据。简而言之,针对上述情况,整个Spark应用程序的容错性很差;

2、Checkpoint的功能

所以,针对上述的复杂Spark应用的问题(没有容错机制的问题)。就可以使用checkponit功能。checkpoint功能是什么意思?checkpoint就是说,对于一个复杂的RDD chain,我们如果担心中间某些关键的,在后面会反复几次使用的RDD,可能会因为节点的故障,导致持久化数据的丢失,那么就可以针对该RDD格外启动checkpoint机制,实现容错和高可用。checkpoint,就是说,首先呢,要调用SparkContext的setCheckpointDir()方法,设置一个容错的文件系统的目录,比如说HDFS;然后,对RDD调用调用checkpoint()方法。之后,在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint过的RDD的数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。那么此时,即使在后面使用RDD时,它的持久化的数据,不小心丢失了,但是还是可以从它的checkpoint文件中直接读取其数据,而不需要重新计算。(CacheManager)

3、图解

Checkpoint和持久化的最主要的区别,就在于,持久化,只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系、依赖关系)是不变的;但是CheckPoint执行完之后,rdd已经没有了之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,也就是说,checkpoint后,rdd的lineage就改变了;其次,持久化的数据丢失的可能性更大,磁盘,或者是内存,可能丢失,但是checkpoint的数据,通常是保存在容错、高可用的文件系统中的,比如说HDFS、依赖于这种高容错的文件系统,所以checkpoint的数据丢失可能性非常低;默认情况下,如果某个rdd没有持久化,还设置了checkpoint,就是说,本来这个job都执行结束了,但是由于中间的rdd没有持久化,那么checkpoint job想要将rdd的数据写入外部文件系统的话,还得从之前所有的rdd,全部重新计算一次,然后计算出rdd的数据,再将其checkpoint到外部文件系统;所以,通常建议,对要checkpoint()的rdd,使用persist(StorageLevel.DISK_ONLY),该RDD计算之后,就直接将其持久化到磁盘上去,然后后面进行checkpoint操作时,直接从磁盘上读取rdd的数据,并checkpoint到外部文件系统即可,不需要重新计算一次rdd,这种checkpoint的效率就高很多了;我们实现了checkpoint之后,后续,在某个task又调用该rdd的iterator()方法时,就实现了高容错机制,即使rdd的持久化数据丢失,或者就没有持久化,但是还是可以通过readCheckpointOrComputer()方法,优先从rdd的父rdd的chechpointrdd中读取,hdfs(外部文件系统的数据);

二、源码

###org.apache.spark.rdd/RDD.scala/**    * 先presist()再checkpoint()原理如下    * 那么首先执行到该rdd的iterator()之后,会先发现storageLevel != StorageLevel.NONE,那么就会通过CacheManager读获取数据,此时会发现通过    * BlockManager获取不到数据(第一次执行)    * 第一次还是会计算一次该RDD的数据,然后通过CacheManager的putInBlockManager()将其通过BlockManager进行持久化    * rdd所在的job运行结束了,然后启动单独job进行checkpoint操作,此时就又执行到该rdd的iterator()方法,那么就会发现storageLevel != StorageLevel.NONE    * 默认从BlockManager直接读取持久化数据(正常情况下,是可以的),如果非正常情况下,持久化数据丢失了,那么此时会走else,调用computeOrReadCheckpoint()    * 方法,判断如果rdd的isCheckPoint为true,那么就会用它的父rdd的iterator()方法,其实就是从checkpoint外部文件系统中读取数据    */  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {    if (storageLevel != StorageLevel.NONE) {      // cacheManager相关东西      // 如果storageLevel不为NONE,就是说,我们之前持久化过RDD,那么就不要直接去父RDD执行算子,计算新的RDD的partition了      // 优先尝试使用CacheManager,去获取持久化的数据      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)    } else {      // 进行rdd partition的计算      computeOrReadCheckpoint(split, context)    }  }###org.apache.spark.rdd/CheckpointRDD.scalaoverride def compute(split: Partition, context: TaskContext): Iterator[T] = {    // 使用hadoop的api Path 创建了一个,针对hdfs文件的路径,然后用checkpointRDD的readFromFile()方法,来读取hdfs中的数据    val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))    CheckpointRDD.readFromFile(file, broadcastedConf, context)  }###org.apache.spark.rdd/CheckpointRDD.scaladef readFromFile[T](      path: Path,      broadcastedConf: Broadcast[SerializableWritable[Configuration]],      context: TaskContext    ): Iterator[T] = {    val env = SparkEnv.get    // 调用hdfs的api,FileSystem    //    val fs = path.getFileSystem(broadcastedConf.value.value)    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)    // FileSystem的open()方法,打开针对hdfs文件的输入流    val fileInputStream = fs.open(path, bufferSize)    // 对输入流进行了反序列化流的一个包装    val serializer = env.serializer.newInstance()    // 使用deserializeStream的asIterator()方法读取数据    val deserializeStream = serializer.deserializeStream(fileInputStream)     // Register an on-task-completion callback to close the input stream.    context.addTaskCompletionListener(context => deserializeStream.close())     deserializeStream.asIterator.asInstanceOf[Iterator[T]]  }

转载于:https://www.cnblogs.com/weiyiming007/p/11250911.html

你可能感兴趣的文章
程序员之路--关于代码风格[转载]
查看>>
POJ-1830 开关问题 高斯消元 | 搜索
查看>>
WEB_web2
查看>>
Spring进阶—如何用Java代码实现邮件发送(二)
查看>>
[LeetCode] 513. Find Bottom Left Tree Value
查看>>
LCIS
查看>>
算法:六种比较排序算法
查看>>
ztree总结
查看>>
Java&Selenium自动化测试之Page Object Model
查看>>
TynSerial流的序列(还原)
查看>>
我的Go语言学习之旅二:入门初体验 Hello World
查看>>
输入password登录到主界面,录入学生编号,排序后输出
查看>>
Java 实现适配器(Adapter)模式
查看>>
C#编程(二十五)----------接口
查看>>
Django笔记 3
查看>>
为openstack 制作CentOS镜像
查看>>
(六)电子邮件
查看>>
pip 或 pip3 升级操作
查看>>
[经验]创建支持多地区的分站功能
查看>>
鸡啄米vc++2010系列45(Ribbon界面开发:为Ribbon Bar添加控件)
查看>>