Spark簡(jiǎn)介 spark 可以很容易和yarn結(jié)合,直接調(diào)用HDFS、Hbase上面的數(shù)據(jù),和hadoop結(jié)合。配置很容易。 spark發(fā)展迅猛,框架比hadoop更加靈活實(shí)用。減少了延時(shí)處理,提高性能效率實(shí)用靈活性。也可以與hadoop切實(shí)相互結(jié)合。 spark核心部分分為RDD。Spark SQL、Spark Streaming、MLlib、GraphX、Spark R等核心組件解決了很多的大數(shù)據(jù)問(wèn)題,其完美的框架日受歡迎。其相應(yīng)的生態(tài)環(huán)境包括zepplin等可視化方面,正日益壯大。大型公司爭(zhēng)相實(shí)用spark來(lái)代替原有hadoop上相應(yīng)的功能模塊。Spark讀寫(xiě)過(guò)程不像hadoop溢出寫(xiě)入磁盤(pán),都是基于內(nèi)存,因此速度很快。另外DAG作業(yè)調(diào)度系統(tǒng)的寬窄依賴讓Spark速度提高。
Spark核心組成 1、RDD 是彈性分布式數(shù)據(jù)集,完全彈性的,如果數(shù)據(jù)丟失一部分還可以重建。有自動(dòng)容錯(cuò)、位置感知調(diào)度和可伸縮性,通過(guò)數(shù)據(jù)檢查點(diǎn)和記錄數(shù)據(jù)更新金象容錯(cuò)性檢查。通過(guò)SparkContext.textFile()加載文件變成RDD,然后通過(guò)transformation構(gòu)建新的RDD,通過(guò)action將RDD存儲(chǔ)到外部系統(tǒng)。 RDD使用延遲加載,也就是懶加載,只有當(dāng)用到的時(shí)候才加載數(shù)據(jù)。如果加載存儲(chǔ)所有的中間過(guò)程會(huì)浪費(fèi)空間。因此要延遲加載。一旦spark看到整個(gè)變換鏈,他可以計(jì)算僅需的結(jié)果數(shù)據(jù),如果下面的函數(shù)不需要數(shù)據(jù)那么數(shù)據(jù)也不會(huì)再加載。轉(zhuǎn)換RDD是惰性的,只有在動(dòng)作中才可以使用它們。 Spark分為driver和executor,driver提交作業(yè),executor是application早worknode上的進(jìn)程,運(yùn)行task,driver對(duì)應(yīng)為sparkcontext。Spark的RDD操作有transformation、action。Transformation對(duì)RDD進(jìn)行依賴包裝,RDD所對(duì)應(yīng)的依賴都進(jìn)行DAG的構(gòu)建并保存,在worknode掛掉之后除了通過(guò)備份恢復(fù)還可以通過(guò)元數(shù)據(jù)對(duì)其保存的依賴再計(jì)算一次得到。當(dāng)作業(yè)提交也就是調(diào)用runJob時(shí),spark會(huì)根據(jù)RDD構(gòu)建DAG圖,提交給DAGScheduler,這個(gè)DAGScheduler是在SparkContext創(chuàng)建時(shí)一同初始化的,他會(huì)對(duì)作業(yè)進(jìn)行調(diào)度處理。當(dāng)依賴圖構(gòu)建好以后,從action開(kāi)始進(jìn)行解析,每一個(gè)操作作為一個(gè)task,每遇到shuffle就切割成為一個(gè)taskSet,并把數(shù)據(jù)輸出到磁盤(pán),如果不是shuffle數(shù)據(jù)還在內(nèi)存中存儲(chǔ)。就這樣再往前推進(jìn),直到?jīng)]有算子,然后運(yùn)行從前面開(kāi)始,如果沒(méi)有action的算子在這里不會(huì)執(zhí)行,直到遇到action為止才開(kāi)始運(yùn)行,這就形成了spark的懶加載,taskset提交給TaskSheduler生成TaskSetManager并且提交給Executor運(yùn)行,運(yùn)行結(jié)束后反饋給DAGScheduler完成一個(gè)taskSet,之后再提交下一個(gè),當(dāng)TaskSet運(yùn)行失敗時(shí)就返回DAGScheduler并重新再次創(chuàng)建。一個(gè)job里面可能有多個(gè)TaskSet,一個(gè)application可能包含多個(gè)job。
2、Spark Streaming 通過(guò)對(duì)kafka數(shù)據(jù)讀取,將Stream數(shù)據(jù)分成小的時(shí)間片段(幾秒),以類似batch批處理的方式來(lái)處理這一部分小數(shù)據(jù),每個(gè)時(shí)間片生成一個(gè)RDD,有高效的容錯(cuò)性,對(duì)小批量數(shù)據(jù)可以兼容批量實(shí)時(shí)數(shù)據(jù)處理的邏輯算法,用一些歷史數(shù)據(jù)和實(shí)時(shí)數(shù)據(jù)聯(lián)合進(jìn)行分析,比如分類算法等。也可以對(duì)小批量的stream進(jìn)行mapreduce、join等操作,而保證其實(shí)時(shí)性。針對(duì)數(shù)據(jù)流時(shí)間要求不到毫秒級(jí)的工程性問(wèn)題都可以。 Spark Streaming也有一個(gè)StreamingContext,其核心是DStream,是通過(guò)以組時(shí)間序列上的連續(xù)RDD來(lái)組成的,包含一個(gè)有Time作為key、RDD作為value的結(jié)構(gòu)體,每一個(gè)RDD都包含特定時(shí)間間隔的數(shù)據(jù)流,可以通過(guò)persist將其持久化。在接受不斷的數(shù)據(jù)流后,在blockGenerator中維護(hù)一個(gè)隊(duì)列,將流數(shù)據(jù)放到隊(duì)列中,等處理時(shí)間間隔到來(lái)后將其中的所有數(shù)據(jù)合并成為一個(gè)RDD(這一間隔中的數(shù)據(jù))。其作業(yè)提交和spark相似,只不過(guò)在提交時(shí)拿到DStream內(nèi)部的RDD并產(chǎn)生Job提交,RDD在action觸發(fā)之后,將job提交給jobManager中的JobQueue,又jobScheduler調(diào)度,JobScheduler將job提交到spark的job調(diào)度器,然后將job轉(zhuǎn)換成為大量的任務(wù)分發(fā)給spark集群執(zhí)行。Job從outputStream中生成的,然后觸發(fā)反向回溯執(zhí)行DStreamDAG。在流數(shù)據(jù)處理的過(guò)程中,一般節(jié)點(diǎn)失效的處理比離線數(shù)據(jù)要復(fù)雜。Spark streamin在1.3之后可以周期性的將DStream寫(xiě)入HDFS,同時(shí)將offset也進(jìn)行存儲(chǔ),避免寫(xiě)到zk。一旦主節(jié)點(diǎn)失效,會(huì)通過(guò)checkpoint的方式讀取之前的數(shù)據(jù)。當(dāng)worknode節(jié)點(diǎn)失效,如果HDFS或文件作為輸入源那Spark會(huì)根據(jù)依賴關(guān)系重新計(jì)算數(shù)據(jù),如果是基于Kafka、Flume等網(wǎng)絡(luò)數(shù)據(jù)源spark會(huì)將手機(jī)的數(shù)據(jù)源在集群中的不同節(jié)點(diǎn)進(jìn)行備份,一旦有一個(gè)工作節(jié)點(diǎn)失效,系統(tǒng)能夠根據(jù)另一份還存在的數(shù)據(jù)重新計(jì)算,但是如果接受節(jié)點(diǎn)失效會(huì)丟失一部分?jǐn)?shù)據(jù),同時(shí)接受線程會(huì)在其他的節(jié)點(diǎn)上重新啟動(dòng)并接受數(shù)據(jù)。
3、Graphx 主要用于圖的計(jì)算。核心算法有PageRank、SVD奇異矩陣、TriangleConut等。
4、Spark SQL 是Spark新推出的交互式大數(shù)據(jù)SQL技術(shù)。把sql語(yǔ)句翻譯成Spark上的RDD操作可以支持Hive、Json等類型的數(shù)據(jù)。
5、Spark R 通過(guò)R語(yǔ)言調(diào)用spark,目前不會(huì)擁有像Scala或者java那樣廣泛的API,Spark通過(guò)RDD類提供Spark API,并且允許用戶使用R交互式方式在集群中運(yùn)行任務(wù)。同時(shí)集成了MLlib機(jī)器學(xué)習(xí)類庫(kù)。
6、MLBase 從上到下包括了MLOptimizer(給使用者)、MLI(給算法使用者)、MLlib(給算法開(kāi)發(fā)者)、Spark。也可以直接使用MLlib。ML Optimizer,一個(gè)優(yōu)化機(jī)器學(xué)習(xí)選擇更合適的算法和相關(guān)參數(shù)的模塊,還有MLI進(jìn)行特征抽取和高級(jí)ML編程 抽象算法實(shí)現(xiàn)API平臺(tái),MLlib分布式機(jī)器學(xué)習(xí)庫(kù),可以不斷擴(kuò)充算法。MLRuntime基于spark計(jì)算框架,將Spark的分布式計(jì)算應(yīng)用到機(jī)器學(xué)習(xí)領(lǐng)域。MLBase提供了一個(gè)簡(jiǎn)單的聲明方法指定機(jī)器學(xué)習(xí)任務(wù),并且動(dòng)態(tài)地選擇最優(yōu)的學(xué)習(xí)算法。
7、Tachyon 高容錯(cuò)的分布式文件系統(tǒng)。宣稱其性能是HDFS的3000多倍。有類似java的接口,也實(shí)現(xiàn)了HDFS接口,所以Spark和MR程序不需要任何的修改就可以運(yùn)行。目前支持HDFS、S3等。
8、Spark算子 1、Map。對(duì)原數(shù)據(jù)進(jìn)行處理,類似于遍歷操作,轉(zhuǎn)換成MappedRDD,原分區(qū)不變。 2、flatMap。將原來(lái)的RDD中的每一個(gè)元素通過(guò)函數(shù)轉(zhuǎn)換成新的元素,將RDD的每個(gè)集合中的元素合并成一個(gè)集合。比如一個(gè)元素里面多個(gè)list,通過(guò)這個(gè)函數(shù)都合并成一個(gè)大的list,最經(jīng)典的就是wordcount中將每一行元素進(jìn)行分詞以后成為,通過(guò)flapMap變成一個(gè)個(gè)的單詞,line.flapMap(_.split(“ ”)).map((_,1))如果通過(guò)map就會(huì)將一行的單詞變成一個(gè)list。 3、mapPartitions。對(duì)每個(gè)分區(qū)進(jìn)行迭代,生成MapPartitionsRDD。 4、Union。是將兩個(gè)RDD合并成一個(gè)。使用這個(gè)函數(shù)要保證兩個(gè)RDD元素的數(shù)據(jù)類型相同,返回的RDD的數(shù)據(jù)類型和被合并的RDD數(shù)據(jù)類型相同。 5、Filter。其功能是對(duì)元素進(jìn)行過(guò)濾,對(duì)每個(gè)元素調(diào)用f函數(shù),返回值為true的元素就保留在RDD中。 6、Distinct。對(duì)RDD中元素進(jìn)行去重操作。 7、Subtract。對(duì)RDD1中取出RDD1與RDD2交集中的所有元素。 8、Sample。對(duì)RDD中的集合內(nèi)元素進(jìn)行采樣,第一個(gè)參數(shù)withReplacement是true表示有放回取樣,false表示無(wú)放回。第二個(gè)參數(shù)表示比例,第三個(gè)參數(shù)是隨機(jī)種子。如data.sample(true, 0.3,new Random().nextInt())。 9、takeSample。和sample用法相同,只不第二個(gè)參數(shù)換成了個(gè)數(shù)。返回也不是RDD,而是collect。 10、Cache。將RDD緩存到內(nèi)存中。相當(dāng)于persist(MEMORY_ONLY)??梢酝ㄟ^(guò)參數(shù)設(shè)置緩存和運(yùn)行內(nèi)存之間的比例,如果數(shù)據(jù)量大于cache內(nèi)存則會(huì)丟失。 11、Persist。里面參數(shù)可以選擇DISK_ONLY/MEMORY_ONLY/MEMORY_AND_DISK等,其中的MEMORY_AND_DISK當(dāng)緩存空間滿了后自動(dòng)溢出到磁盤(pán)。 12、MapValues。針對(duì)KV數(shù)據(jù),對(duì)數(shù)據(jù)中的value進(jìn)行map操作,而不對(duì)key進(jìn)行處理。 13、reduceByKey。針對(duì)KV數(shù)據(jù)將相同key的value聚合到一起。與groupByKey不同,會(huì)進(jìn)行一個(gè)類似mapreduce中的combine操作,減少相應(yīng)的數(shù)據(jù)IO操作,加快效率。如果想進(jìn)行一些非疊加操作,我們可以將value組合成字符串或其他格式將相同key的value組合在一起,再通過(guò)迭代,組合的數(shù)據(jù)拆開(kāi)操作。 14、partitionBy??梢詫DD進(jìn)行分區(qū),重新生成一個(gè)ShuffleRDD,進(jìn)行一個(gè)shuffle操作,對(duì)后面進(jìn)行頻繁的shuffle操作可以加快效率。 15、randomSplit。對(duì)RDD進(jìn)行隨機(jī)切分。如data.randomSplit(new double[]{0.7, 0.3})返回一個(gè)RDD的數(shù)組。 16、Cogroup。對(duì)兩個(gè)RDD中的KV元素,每個(gè)RDD中相同key中的元素分別聚合成一個(gè)集合。與reduceByKey不同的是針對(duì)兩個(gè)RDD中相同的key的元素進(jìn)行合并。 17、Join。相當(dāng)于inner join。對(duì)兩個(gè)需要連接的RDD進(jìn)行cogroup,然后對(duì)每個(gè)key下面的list進(jìn)行笛卡爾積的操作,輸出兩兩相交的兩個(gè)集合作為value。 相當(dāng)于sql中where a.key=b.key。 18、leftOutJoin,rightOutJoin。在數(shù)據(jù)庫(kù)中左連接以左表為坐標(biāo)將表中所有的數(shù)據(jù)列出來(lái),右面不存在的用null填充。在這里面對(duì)join的基礎(chǔ)上判斷左側(cè)的RDD元素是否是空,如果是空則填充。右連接則相反。 19、saveAsTestFile。將數(shù)據(jù)輸出到HDFS的指定目錄。 20、saveAsObjectFile。寫(xiě)入HDFS為SequenceFile格式。 21、Collect、collectAsMap。將RDD轉(zhuǎn)換成list或者M(jìn)ap。結(jié)果以List或者HashMap的方式輸出。 22、Count。對(duì)RDD的元素進(jìn)行統(tǒng)計(jì),返回個(gè)數(shù)。 23、Top(k)。返回最大的k個(gè)元素,返回List的形式。 24、Take返回?cái)?shù)據(jù)的前k個(gè)元素。 25、takeOrdered。返回?cái)?shù)據(jù)的最小的k個(gè)元素,并在返回中保持元素的順序。
9、Tips 1、RDD.repartition(n)可以在最初對(duì)RDD進(jìn)行分區(qū)操作,這個(gè)操作實(shí)際上是一個(gè)shuffle,可能比較耗時(shí),但是如果之后的action比較多的話,可以減少下面操作的時(shí)間。其中的n值看cpu的個(gè)數(shù),一般大于2倍cpu,小于1000。 2、Action不能夠太多,每一次的action都會(huì)將以上的taskset劃分一個(gè)job,這樣當(dāng)job增多,而其中task并不釋放,會(huì)占用更多的內(nèi)存,使得gc拉低效率。 3、在shuffle前面進(jìn)行一個(gè)過(guò)濾,減少shuffle數(shù)據(jù),并且過(guò)濾掉null值,以及空值。 4、groupBy盡量通過(guò)reduceBy替代。reduceBy會(huì)在work節(jié)點(diǎn)做一次reduce,在整體進(jìn)行reduce,相當(dāng)于做了一次hadoop中的combine操作,而combine操作和reduceBy邏輯一致,這個(gè)groupBy不能保證。 5、做join的時(shí)候,盡量用小RDD去join大RDD,用大RDD去join超大的RDD。 6、避免collect的使用。因?yàn)閏ollect如果數(shù)據(jù)集超大的時(shí)候,會(huì)通過(guò)各個(gè)work進(jìn)行收集,io增多,拉低性能,因此當(dāng)數(shù)據(jù)集很大時(shí)要save到HDFS。 7、RDD如果后面使用迭代,建議cache,但是一定要估計(jì)好數(shù)據(jù)的大小,避免比cache設(shè)定的內(nèi)存還要大,如果大過(guò)內(nèi)存就會(huì)刪除之前存儲(chǔ)的cache,可能導(dǎo)致計(jì)算錯(cuò)誤,如果想要完全的存儲(chǔ)可以使用persist(MEMORY_AND_DISK),因?yàn)閏ache就是persist(MEMORY_ONLY)。 8、設(shè)置spark.cleaner.ttl,定時(shí)清除task,因?yàn)閖ob的原因可能會(huì)緩存很多執(zhí)行過(guò)去的task,所以定時(shí)回收可能避免集中g(shù)c操作拉低性能。 9、適當(dāng)pre-partition,通過(guò)partitionBy()設(shè)定,每次partiti
轉(zhuǎn):http://www.cnblogs.com/hellochennan/p/5372946.html |
|
來(lái)自: 看風(fēng)景D人 > 《spark》