Spark版本定制版3-通過案例對SparkStreaming透徹理解三板斧之三

a. Spark Streaming Job 架構和運行機制

b. Spark Streaming Job 容錯架構和運行機制

注:本講內容基于Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。

上節回顧:

上節課談到Spark Streaming是基于DStream編程。DStream是邏輯級別的,而RDD是物理級別的。DStream是隨著時間的流動內部將集合封裝RDD。對DStream的操作,歸根結底還是對其RDD進行的操作。

如果將Spark Streaming放在坐標系中,并以Y軸表示對RDD的操作,RDD的依賴關系構成了整個job的邏輯應用,以X軸作為時間。隨著時間的流逝,以固定的時間間隔(Batch Interval)產生一個個job實例,進而在集群中運行。

同時也為大家詳細總結并揭秘 Spark Streaming五大核心特征:特征1:邏輯管理、特征2:時間管理、特征3:流式輸入和輸出、特征4:高容錯、特征5:事務處理。最后結合Spark Streaming源碼做了進一步解析。

**

開講

**

由上一講可以得知,以固定的時間間隔(Batch Interval)產生一個個job實例。那么在時間維度和空間維度組成的時空維度的Spark Streaming中,Job的架構和運行機制、及其容錯架構和運行機制是怎樣的呢?

那我們從愛因斯坦的相對時空講起吧:

a、時間和空間是緊密聯系的統一體,也稱為時空連續體。

b、時空是相對的,不同的觀察者看到的時間,長度,質量都可以不一樣。

c、對于兩個沒有聯系的事件,沒有絕對的先后順序。但是因果關系可以確定事件的先后,比如Job的實例產生并運行在集群中,那么Job實例的產生事件必然發生在Job運行集群中之前。

就是說Job的實例產生和單向流動的時間之間,沒有必然的聯系;在這里時間只是一種假象。

怎么更好的理解這句話呢?那我們就得從以下方面為大家逐步解答。

什么是Spark Streaming Job 架構和運行機制 ?

對于一般的Spark應用程序來說,是RDD的action操作觸發了Job的運行。那對于SparkStreaming來說,Job是怎么樣運行的呢?我們在編寫SparkStreaming程序的時候,設置了BatchDuration,Job每隔BatchDuration時間會自動觸發,這個功能是Spark Streaming框架提供了一個定時器,時間一到就將編寫的程序提交給Spark,并以Spark job的方式運行。

通過案例透視Job架構和運行機制

案例代碼如下:

將上述代碼打成JAR包,再上傳到集群中運行

集群中運行結果如下

運行過程總圖如下

案例詳情解析

a、 首先通過StreamingContext調用start方法,其內部再啟動JobScheduler的Start方法,進行消息循環;

(StreamingContext.scala,610行代碼)

(JobScheduler.scala,83行代碼)

b、 在JobScheduler的start內部會構造JobGenerator和ReceiverTacker;

(JobScheduler.scala,82、83行代碼)

c、 然后調用JobGenerator和ReceiverTacker的start方法執行以下操作:

(JobScheduler.scala,79、98行代碼)

(ReceiverTacker.scala,149、157行代碼)

JobGenerator啟動后會不斷的根據batchDuration生成一個個的Job ;

(JobScheduler.scala,208行代碼)

ReceiverTracker的作用主要是兩點:

1.對Receiver的運行進行管理,ReceiverTracker啟動時會調用lanuchReceivers()方法,進而會使用rpc通信啟動Receiver(實際代碼中,Receiver外面還有一層包裝ReceiverSupervisor實現高可用)

(ReceiverTracker.scala,423行代碼)

2.管理Receiver的元數據,供Job對數據進行索引,元數據的核心結構是receivedBlockTracker

(ReceiverTracker.scala,106~112行代碼)

d、 在Receiver收到數據后會通過ReceiverSupervisor存儲到Executor的BlockManager中 ;

e、 同時把數據的Metadata信息發送給Driver中的ReceiverTracker,在ReceiverTracker內部會通過ReceivedBlockTracker來管理接受到的元數據信息;

這里面涉及到兩個Job的概念:

每個BatchInterval會產生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已,從Java角度講,相當于Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個單獨的線程來提交Job到集群運行(其實是在線程中基于RDD的Action觸發真正的作業的運行)

為什么使用線程池呢?

a 、作業不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執行Task有異曲同工之妙;

b 、有可能設置了Job的FAIR公平調度的方式,這個時候也需要多線程的支持;

Spark Streaming Job 容錯架構和運行機制

Spark Streaming是基于DStream的容錯機制,DStream是隨著時間流逝不斷的產生RDD,也就是說DStream是在固定的時間上操作RDD,容錯會劃分到每一次所形成的RDD。

Spark Streaming的容錯包括 Executor 與 Driver兩方面的容錯機制 :

a、 Executor 容錯:

1. 數據接收:分布式方式、wal方式,先寫日志再保存數據到Executor

2. 任務執行安全性 Job基于RDD容錯 :

b、Driver容錯 : checkpoint 。

基于RDD的特性,它的容錯機制主要就是兩種:

1. 基于checkpoint;

在stage之間,是寬依賴,產生了shuffle操作,lineage鏈條過于復雜和冗長,這時候就需要做checkpoint。

2. 基于lineage(血統)的容錯:

一般而言,spark選擇血統容錯,因為對于大規模的數據集,做檢查點的成本很高。

考慮到RDD的依賴關系,每個stage內部都是窄依賴,此時一般基于lineage容錯,方便高效。

總結: stage內部做lineage,stage之間做checkpoint。

有興趣想學習國內頂級整套Spark+Spark Streaming+Machine learning課程的,歡迎加我qq  471186150。共享視頻,性價比超高!

免責聲明:本文僅代表文章作者的個人觀點,與本站無關。其原創性、真實性以及文中陳述文字和內容未經本站證實,對本文以及其中全部或者部分內容文字的真實性、完整性和原創性本站不作任何保證或承諾,請讀者僅作參考,并自行核實相關內容。

http://www.538703.live/style/images/nopic.gif
分享
評論
首頁
买鱼能赚钱吗 单机免费打麻将单机版 九游棋牌游戏? 黑龙江6加一开奖 股票市场分析 如何上网赚钱 贵州快3走势图 意甲联赛哪里可以看 如何网络赚钱 大唐棋牌下载安装 北京赛车玩法