本篇内容主要讲解“Driver容错安全性是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Driver容错安全性是什么”吧!
从数据层面,ReceivedBlockTracker为整个Spark Streaming应用程序记录元数据信息。
从调度层面,DStreamGraph和JobGenerator是Spark Streaming调度的核心,记录当前调度到哪一进度,和业务有关。
ReceivedBlockTracker在接收到元数据信息后调用addBlock方法,先写入磁盘中,然后在写入内存中。
data:image/s3,"s3://crabby-images/461e2/461e2fbf53f912208d130fdb5c7b051db0fc2800" alt="Driver容错安全性是什么 driver 第1张 Driver容错安全性是什么 driver 第1张"
data:image/s3,"s3://crabby-images/21fd4/21fd40e00c162f3f26f1d377473739e825d49702" alt="Driver容错安全性是什么 driver 第2张 Driver容错安全性是什么 driver 第2张"
根据batchTime分配属于当前BatchDuration要处理的数据到timToAllocatedBlocks数据结构中。
data:image/s3,"s3://crabby-images/a670d/a670d6d3ac48ac324556c9849bbc1947d1a36364" alt="Driver容错安全性是什么 driver 第3张 Driver容错安全性是什么 driver 第3张"
Time类的是一个case class,记录时间,重载了操作符,隐式转换,值得借鉴。
case class Time(private val millis: Long) { def milliseconds: Long = millis def < (that: Time): Boolean = (this.millis < that.millis) def <= (that: Time): Boolean = (this.millis <= that.millis) def > (that: Time): Boolean = (this.millis > that.millis) def >= (that: Time): Boolean = (this.millis >= that.millis) def + (that: Duration): Time = new Time(millis + that.milliseconds) def - (that: Time): Duration = new Duration(millis - that.millis) def - (that: Duration): Time = new Time(millis - that.milliseconds) // Java-friendlier versions of the above. def less(that: Time): Boolean = this < that def lessEq(that: Time): Boolean = this <= that def greater(that: Time): Boolean = this > that def greaterEq(that: Time): Boolean = this >= that def plus(that: Duration): Time = this + that def minus(that: Time): Duration = this - that def minus(that: Duration): Time = this - that def floor(that: Duration): Time = { val t = that.milliseconds new Time((this.millis / t) * t) } def floor(that: Duration, zeroTime: Time): Time = { val t = that.milliseconds new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds) } def isMultipleOf(that: Duration): Boolean = (this.millis % that.milliseconds == 0) def min(that: Time): Time = if (this < that) this else that def max(that: Time): Time = if (this > that) this else that def until(that: Time, interval: Duration): Seq[Time] = { (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_)) } def to(that: Time, interval: Duration): Seq[Time] = { (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_)) } override def toString: String = (millis.toString + " ms") } object Time { implicit val ordering = Ordering.by((time: Time) => time.millis) } |
跟踪Time对象,ReceiverTracker的allocateBlocksToBatch方法中的入参batchTime是被JobGenerator的generateJobs方法调用的。
data:image/s3,"s3://crabby-images/31b4d/31b4d88557fae14937386508e50c31bf556fdc0d" alt="Driver容错安全性是什么 driver 第4张 Driver容错安全性是什么 driver 第4张"
JobGenerator的generateJobs方法是被定时器发送GenerateJobs消息调用的。
data:image/s3,"s3://crabby-images/6387f/6387f24f01ad5fe6a2692664931d78a86042fb69" alt="Driver容错安全性是什么 driver 第5张 Driver容错安全性是什么 driver 第5张"
data:image/s3,"s3://crabby-images/5e5a4/5e5a4a9d0d6d39c5561dce2c71f4617e18ddb993" alt="Driver容错安全性是什么 driver 第6张 Driver容错安全性是什么 driver 第6张"
data:image/s3,"s3://crabby-images/6c24f/6c24fcc318a903a5604717bce50a64bbf005ea4e" alt="Driver容错安全性是什么 driver 第7张 Driver容错安全性是什么 driver 第7张"
GenerateJobs中的时间参数就是nextTime,而nextTime+=period,这个period就是ssc.graph.batchDuration.milliseconds。
data:image/s3,"s3://crabby-images/04219/04219ab8dc106d939477bf413510ac3280f84b9a" alt="Driver容错安全性是什么 driver 第8张 Driver容错安全性是什么 driver 第8张"
nextTime的初始值是在start方法中传入的startTime赋值的,即RecurringTimer的getStartTime方法的返回值,是当前时间period的(整数倍+1)。
data:image/s3,"s3://crabby-images/e1862/e18628cf63ec73e7cbc1d37d8c1543a6848c0b11" alt="Driver容错安全性是什么 driver 第9张 Driver容错安全性是什么 driver 第9张"
data:image/s3,"s3://crabby-images/09af8/09af88dd870b71c44ffa71e50496742283e91016" alt="Driver容错安全性是什么 driver 第10张 Driver容错安全性是什么 driver 第10张"
Period这个值是我们调用new StreamingContext来构造StreamingContext时传入的Duration值。
data:image/s3,"s3://crabby-images/0350f/0350f38ecc1e6c5bda314d8e4d76daa5a8aa450c" alt="Driver容错安全性是什么 driver 第11张 Driver容错安全性是什么 driver 第11张"
data:image/s3,"s3://crabby-images/554b4/554b4645c29a35ebd3969c9f8e6d5655c613e226" alt="Driver容错安全性是什么 driver 第12张 Driver容错安全性是什么 driver 第12张"
ReceivedBlockTracker会清除过期的元数据信息,从HashMap中移除,也是先写入磁盘,然后在写入内存。
data:image/s3,"s3://crabby-images/a8600/a8600f504544c00ff0594a7f2c3f4baac5d9e723" alt="Driver容错安全性是什么 driver 第13张 Driver容错安全性是什么 driver 第13张"
元数据的生成,消费和销毁都有WAL,所以失败时就可以从日志中恢复。从源码分析中得出只有设置了checkpoint目录,才进行WAL机制。
data:image/s3,"s3://crabby-images/5c3e6/5c3e672f19dc7f5dd942ffc3dec7f72b0fef0a1a" alt="Driver容错安全性是什么 driver 第14张 Driver容错安全性是什么 driver 第14张"
对传入的checkpoint目录来创建日志目录进行WAL。
data:image/s3,"s3://crabby-images/5effb/5effb829337dbaea8b4dc0e732f7574935904553" alt="Driver容错安全性是什么 driver 第15张 Driver容错安全性是什么 driver 第15张"
这里是在checkpoint目录下创建文件夹名为receivedBlockMetadata的文件夹来保存WAL记录的数据。
data:image/s3,"s3://crabby-images/ff914/ff914492f95c53e52c65308ce90ca008d0b0b6eb" alt="Driver容错安全性是什么 driver 第16张 Driver容错安全性是什么 driver 第16张"
data:image/s3,"s3://crabby-images/e60b4/e60b4c8bc6f67d043d02ca9220ce595a1b01ea2f" alt="Driver容错安全性是什么 driver 第17张 Driver容错安全性是什么 driver 第17张"
把当前的DStream和JobGenerator的状态进行checkpoint,该方法是在generateJobs方法最后通过发送DoCheckpoint消息,来调用的。
data:image/s3,"s3://crabby-images/109ae/109aebc6e2266e6481266566a7797269eb72ca30" alt="Driver容错安全性是什么 driver 第18张 Driver容错安全性是什么 driver 第18张"
data:image/s3,"s3://crabby-images/8e15e/8e15e4693e00f17c19b070d95bdb0d61d2eb1a3d" alt="Driver容错安全性是什么 driver 第19张 Driver容错安全性是什么 driver 第19张"
data:image/s3,"s3://crabby-images/1aef7/1aef7141fe02b456f93d2e8bd706ce3d0cd713ca" alt="Driver容错安全性是什么 driver 第20张 Driver容错安全性是什么 driver 第20张"
到此,相信大家对“Driver容错安全性是什么”有了更深的了解,不妨来实际操作一番吧!这里是蜗牛博客网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:niceseo99@gmail.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
评论