大數(shù)據(jù)我們也許聽得很多了,一般都知道hadoop,但并不都是hadoop。那么我們?cè)撊绾螛?gòu)建自己的大數(shù)據(jù)項(xiàng)目呢?對(duì)于離線處理,hadoop還是比較適合的,但對(duì)于實(shí)時(shí)性比較強(qiáng)的,數(shù)據(jù)量比較大的,可以采用spark,那spark又跟什么技術(shù)搭配才能做一個(gè)適合自己的項(xiàng)目呢?各技術(shù)之間又是如何整合的呢?這篇文章將給大家介紹下大數(shù)據(jù)項(xiàng)目中用到的各技術(shù)框架知識(shí),并通過一個(gè)實(shí)際項(xiàng)目的分布式集群部署和實(shí)際業(yè)務(wù)應(yīng)用來詳細(xì)講述大數(shù)據(jù)架構(gòu)是如何構(gòu)建的,供大家參考。 1大數(shù)據(jù)架構(gòu)介紹 1.1 數(shù)據(jù)采集 負(fù)責(zé)從各節(jié)點(diǎn)上實(shí)時(shí)采集數(shù)據(jù),選用flume來實(shí)現(xiàn) Flume介紹 Flume是Cloudera(Hadoop數(shù)據(jù)管理軟件與服務(wù)提供商)提供的一個(gè)分布式、可靠、和高可用的海量日志采集、聚合和傳輸?shù)娜罩臼占到y(tǒng),支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時(shí),F(xiàn)lume提供對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理,并寫到各種數(shù)據(jù)接受方(可定制)的能力。 Flume的一些核心概念
Flume以agent為最小的獨(dú)立運(yùn)行單位。一個(gè)agent就是一個(gè)JVM(JavaVirtual Machine)。單agent由Source、Sink和Channel三大組件構(gòu)成,如下圖: Flume的數(shù)據(jù)流由事件(Event)貫穿始終。事件是Flume的基本數(shù)據(jù)單位,它攜帶日志數(shù)據(jù)(字節(jié)數(shù)組形式)并且攜帶有頭信息,這些Event由Agent外部的Client,比如圖中的WebServer生成。當(dāng)Source捕獲事件后會(huì)進(jìn)行特定的格式化,然后Source會(huì)把事件推入(單個(gè)或多個(gè))Channel中。你可以把Channel看作是一個(gè)緩沖區(qū),它將保存事件直到Sink處理完該事件。Sink負(fù)責(zé)持久化日志或者把事件推向另一個(gè)Source。 Flume Sources
Flume Sinks 我們系統(tǒng)中使用的是HDFS Sink和Kafka Sink
1.2、數(shù)據(jù)接入 由于采集數(shù)據(jù)的速度和數(shù)據(jù)處理的速度不一定同步,因此添加一個(gè)消息中間件來作為緩沖,選用apache的kafka,對(duì)于離線數(shù)據(jù),選用hdfs HDFS介紹 HDFS(Hadoop Distribute File System Hadoop分布式文件系統(tǒng))是一個(gè)運(yùn)行在通用硬件上的分布式文件系統(tǒng),是ApacheHadoop Core項(xiàng)目的一部分。HDFS有著高容錯(cuò)性(fault-tolerant)的特點(diǎn),并且設(shè)計(jì)用來部署在低廉的(low-cost)硬件上。而且它提供高吞吐量(high throughput)來訪問應(yīng)用程序的數(shù)據(jù),適合那些有著超大數(shù)據(jù)集(large data set)的應(yīng)用程序。HDFS放寬了(relax)POSIX的要求(requirements)這樣可以實(shí)現(xiàn)流的形式訪問(streaming access)文件系統(tǒng)中的數(shù)據(jù)。 HDFS的一些核心概念 HDFS的高可用性 在一個(gè)典型的HA(High Availability)集群中,每個(gè)NameNode是一臺(tái)獨(dú)立的服務(wù)器。在任一時(shí)刻,只有一個(gè)NameNode處于active狀態(tài),另一個(gè)處于standby狀態(tài)。其中,active狀態(tài)的NameNode負(fù)責(zé)所有的客戶端操作,standby狀態(tài)的NameNode處于從屬地位,維護(hù)著數(shù)據(jù)狀態(tài),隨時(shí)準(zhǔn)備切換。 兩個(gè)NameNode為了數(shù)據(jù)同步,會(huì)通過一組稱作JournalNodes的獨(dú)立進(jìn)程進(jìn)行相互通信。當(dāng)active狀態(tài)的NameNode的命名空間有任何修改時(shí),會(huì)告知大部分的JournalNodes進(jìn)程。standby狀態(tài)的NameNode有能力讀取JNs中的變更信息,并且一直監(jiān)控edit log的變化,把變化應(yīng)用于自己的命名空間。standby可以確保在集群出錯(cuò)時(shí),命名空間狀態(tài)已經(jīng)完全同步了,如圖所示 HDFS的體系結(jié)構(gòu) HDFS是一個(gè)主/從(Master/Slave)體系結(jié)構(gòu),從最終用戶的角度來看,它就像傳統(tǒng)的文件系統(tǒng)一樣,可以通過目錄路徑對(duì)文件執(zhí)行CRUD(Create、Read、Update和Delete)操作。但由于分布式存儲(chǔ)的性質(zhì),HDFS集群擁有一個(gè)NameNode和一些DataNode。NameNode管理文件系統(tǒng)的元數(shù)據(jù),DataNode存儲(chǔ)實(shí)際的數(shù)據(jù)。客戶端通過同NameNode和DataNodes的交互訪問文件系統(tǒng)??蛻舳寺?lián)系NameNode以獲取文件的元數(shù)據(jù),而真正的文件I/O操作是直接和DataNode進(jìn)行交互的。如圖所示。 Kafka介紹 Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者規(guī)模的網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。這種動(dòng)作(網(wǎng)頁瀏覽,搜索和其他用戶的行動(dòng))是在現(xiàn)代網(wǎng)絡(luò)上的許多社會(huì)功能的一個(gè)關(guān)鍵因素。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對(duì)于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng),但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案。Kafka的目的是通過Hadoop的并行加載機(jī)制來統(tǒng)一線上和離線的消息處理,也是為了通過集群機(jī)來提供實(shí)時(shí)的消費(fèi)。 Kafka的一些核心概念
一個(gè)典型的Kafka集群中包含若干Producer(可以是web前端產(chǎn)生的PageView,或者是服務(wù)器日志,系統(tǒng)CPU、Memory等),若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多,集群吞吐率越高),若干Consumer Group,以及一個(gè)Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發(fā)生變化時(shí)進(jìn)行rebalance。Producer使用push模式將消息發(fā)布到broker,Consumer使用pull模式從broker訂閱并消費(fèi)消息。如圖所示 Zookeeper介紹 ZooKeeper是一個(gè)為分布式應(yīng)用所設(shè)計(jì)的分布的、開源的協(xié)調(diào)服務(wù)。它提供了一些簡(jiǎn)單的操作,使得分布式應(yīng)用可以基于這些接口實(shí)現(xiàn)諸如配置維護(hù)、域名服務(wù)、分布式同步、組服務(wù)等。 Zookeeper很容易編程接入,它使用了一個(gè)和文件樹結(jié)構(gòu)相似的數(shù)據(jù)模型??梢允褂肑ava或者C來進(jìn)行編程接入。 Zookeeper的一些基本概念
(1)最終一致性 client不論連接到哪個(gè)Server,展示給它都是同一個(gè)視圖,這是zookeeper最重要的性能。 (2)可靠性 具有簡(jiǎn)單、健壯、良好的性能,如果消息m被一臺(tái)服務(wù)器接受,那么它將被所有的服務(wù)器接受。 (3)實(shí)時(shí)性 Zookeeper保證客戶端將在一個(gè)時(shí)間間隔范圍內(nèi)獲得服務(wù)器的更新信息,或者服務(wù)器失效的信息。但由于網(wǎng)絡(luò)延時(shí)等原因,Zookeeper不能保證兩個(gè)客戶端能同時(shí)得到剛更新的數(shù)據(jù),如果需要最新數(shù)據(jù),應(yīng)該在讀數(shù)據(jù)之前調(diào)用sync()接口。 (4)等待無關(guān)(wait-free) 慢的或者失效的client不得干預(yù)快速的client的請(qǐng)求,使得每個(gè)client都能有效的等待 (5)原子性 更新只能成功或者失敗,沒有中間狀態(tài)。 (6)順序性 包括全局有序和偏序兩種:全局有序是指如果在一臺(tái)服務(wù)器上消息a在消息b前發(fā)布,則在所有Server上消息a都將在消息b前被發(fā)布;偏序是指如果一個(gè)消息b在消息a后被同一個(gè)發(fā)送者發(fā)布,a必將排在b前面。
1.3、流式計(jì)算 對(duì)采集到的數(shù)據(jù)進(jìn)行實(shí)時(shí)分析,選用apache的Spark Spark介紹 Spark是UCBerkeley AMP lab (加州大學(xué)伯克利分校的AMP實(shí)驗(yàn)室)所開源的類Hadoop MapReduce的通用并行框架,而后又加入到Apache孵化器項(xiàng)目。Spark,擁有HadoopMapReduce所具有的優(yōu)點(diǎn);但不同于MapReduce的是Job中間輸出結(jié)果可以保存在內(nèi)存中,從而不再需要讀寫磁盤,因此Spark能更好地適用于數(shù)據(jù)挖掘與機(jī)器學(xué)習(xí)等需要迭代的MapReduce的算法。 Spark不僅實(shí)現(xiàn)了MapReduce的算子map 函數(shù)和reduce函數(shù)及計(jì)算模型,還提供更為豐富的算子,如filter、join、groupByKey等。是一個(gè)用來實(shí)現(xiàn)快速而通用的集群計(jì)算的平臺(tái)。 Spark支持多處理模式以及支持庫,Spark Streaming、Spark SQL、Spark MLlib、Spark GraphX等極大程度方便開發(fā)者在大數(shù)據(jù)處理方面的不同需求 Spark的生態(tài)體系 MapReduce屬于Hadoop生態(tài)體系之一,Spark則屬于BDAS(BerkeleyData Analytics Stack,中文:伯克利數(shù)據(jù)分析棧)生態(tài)體系之一 Hadoop包含了MapReduce、HDFS、HBase、Hive、Zookeeper、Pig、Sqoop等 BDAS包含了Spark、Shark(相當(dāng)于Hive)、BlinkDB、Spark Streaming(消息實(shí)時(shí)處理框架,類似Storm)等等 Spark的運(yùn)行模式 l Local (用于測(cè)試、開發(fā)) l Standlone (獨(dú)立集群模式) l Spark on Yarn (Spark在Yarn上) l Spark on Mesos (Spark在Mesos) 我們系統(tǒng)中使用的是Spark on Yarn 1.4、數(shù)據(jù)輸出 對(duì)分析后的結(jié)果持久化,可以使用HDFS、Hbase、MongoDB Hbase介紹 HBase是ApacheHadoop的數(shù)據(jù)庫,能夠?qū)Υ笮蛿?shù)據(jù)提供隨機(jī)、實(shí)時(shí)的讀寫訪問。HBase的目標(biāo)是存儲(chǔ)并處理大型的數(shù)據(jù)。HBase是一個(gè)開源的,分布式的,多版本的,面向列的存儲(chǔ)模型。它存儲(chǔ)的是松散型數(shù)據(jù)。 HBase是一個(gè)構(gòu)建在HDFS上的分布式列存儲(chǔ)系統(tǒng); HBase是基于GoogleBigTable模型開發(fā)的,典型的key/value系統(tǒng); HBase是ApacheHadoop生態(tài)系統(tǒng)中的重要一員,主要用于海量結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ); 從邏輯上講,HBase將數(shù)據(jù)按照表、行和列進(jìn)行存儲(chǔ)。 與hadoop一樣,Hbase目標(biāo)主要依靠橫向擴(kuò)展,通過不斷增加廉價(jià)的商用服務(wù)器,來增加計(jì)算和存儲(chǔ)能力。 Hbase的系統(tǒng)架構(gòu) 關(guān)于HBase與ZooKeeper,可以分三點(diǎn)來描述: A、Zookeeper集群的職責(zé) A.1、負(fù)責(zé)監(jiān)控整個(gè)hbase集群中節(jié)點(diǎn)的狀態(tài)和通信。 A.2、管理hbase 集群的-ROOT-表,即所有Region Server的地址和HTable信息。 A.3、避免HMsater的單點(diǎn)故障問題(重啟故障的HMaster;如果zkLeader掛掉,重新選舉出leader)。 B、HMaster Server的職責(zé) B.1、為HRegionserver分配HRegion,并持續(xù)均衡負(fù)載; B.2、當(dāng)有HRegionserver失效時(shí),由HMaster負(fù)責(zé)重新分配其上的HRegion。 C、HRegion Server的職責(zé) C.1、維護(hù)HMaster分配的HRegin,響應(yīng)客戶端的請(qǐng)求(增刪改查)。 C.2、管理.META.表數(shù)據(jù),該表中包含當(dāng)前HRegion Server上HRegion的相關(guān)信息。 C.3、負(fù)責(zé)region的切分,并將相關(guān)region切分信息更新到.META.表中。 MongoDB介紹 MongoDB是一個(gè)基于分布式文件存儲(chǔ)的數(shù)據(jù)庫。由C++語言編寫。旨在為WEB應(yīng)用提供可擴(kuò)展的高性能數(shù)據(jù)存儲(chǔ)解決方案。 MongoDB是一個(gè)介于關(guān)系數(shù)據(jù)庫和非關(guān)系數(shù)據(jù)庫之間的產(chǎn)品,是非關(guān)系數(shù)據(jù)庫當(dāng)中功能最豐富,最像關(guān)系數(shù)據(jù)庫的。他支持的數(shù)據(jù)結(jié)構(gòu)非常松散,是類似json的bson格式,因此可以存儲(chǔ)比較復(fù)雜的數(shù)據(jù)類型。MongoDB最大的特點(diǎn)是他支持的查詢語言非常強(qiáng)大,其語法有點(diǎn)類似于面向?qū)ο蟮牟樵冋Z言,幾乎可以實(shí)現(xiàn)類似關(guān)系數(shù)據(jù)庫單表查詢的絕大部分功能,而且還支持對(duì)數(shù)據(jù)建立索引。 MongoDB的一些基本概念 MongoDB的數(shù)據(jù)模型 一個(gè)MongoDB 實(shí)例可以包含一組數(shù)據(jù)庫,一個(gè)DataBase 可以包含一組Collection(集合),一個(gè)集合可以包含一組Document(文檔)。一個(gè)Document包含一組field(字段),每一個(gè)字段都是一個(gè)key/value pair。 key: 必須為字符串類型。 value:可以包含如下類型: 1、基本類型,例如,string,int,float,timestamp,binary 等類型。 2、一個(gè)document。 3、數(shù)組類型。 文檔結(jié)構(gòu)如下: 2 HA模式下的分布式集群環(huán)境搭建 2.1、機(jī)器規(guī)劃 2.2、Hadoop+Hbase+Zookeeper集群部署 (1)修改各機(jī)器上的/etc/hosts文件,刪除文件中本機(jī)ip與 機(jī)器名對(duì)應(yīng)的一行 作用:有時(shí)候/etc/hosts文件里對(duì)應(yīng)的主機(jī)名包含一些特殊字符或其他原因?qū)е虏荒苷_解析主機(jī)名,最好是去掉這一行 (2)ssh免密碼登錄 作用:Hadoop中主節(jié)點(diǎn)管理從節(jié)點(diǎn)是通過SSH協(xié)議登錄到從節(jié)點(diǎn)實(shí)現(xiàn)的,而一般的SSH登錄,都是需要輸入密碼驗(yàn)證的,為了Hadoop主節(jié)點(diǎn)方便管理成千上百的從節(jié)點(diǎn),這里將主節(jié)點(diǎn)公鑰拷貝到從節(jié)點(diǎn),實(shí)現(xiàn)SSH協(xié)議免秘鑰登錄,我這里做的是所有主從節(jié)點(diǎn)之間機(jī)器免秘鑰登錄 在三臺(tái)機(jī)器上分別執(zhí)行: ssh-keygen -t rsa chmod 755 用戶目錄 chmod 700 用戶目錄的~/.ssh 在機(jī)器上執(zhí)行了ssh-keygen -t rsa命令后會(huì)在用戶目錄的~/.ssh文件夾中生成這兩個(gè)文件:id_rsa和id_rsa.pub,其中id_rsa中存的算法rsa生成的私鑰,id_rsa.pub存放算法rsa生成的公鑰。 下一步,將所有機(jī)器上的id_rsa.pub中的內(nèi)容匯總到某一臺(tái)機(jī)器上,并寫入文件:用戶目錄的~/.ssh/authorized_keys中。 注意:每臺(tái)機(jī)器的id_rsa.pub中的內(nèi)容在文件authorized_keys中是一行 下一步:將用戶目錄的~/.ssh/authorized_keys文件復(fù)制到其他兩臺(tái)機(jī)器上 在每臺(tái)機(jī)器上執(zhí)行: chmod 600 用戶目錄的~/.ssh/authorized_keys (3)Zookeeper部署 A、下載zookeeper的安裝包, 解壓到合適目錄 B、修改配置文件:zoo.cfg 集群中部署了幾臺(tái)zookeeper機(jī)器,文件中就添加幾行,并且集群中的zookeeper機(jī)器上的zoo.cfg文件必須一致 (4)Hadoop部署 A、下載hadoop的安裝包, 解壓到合適目錄 B、配置hadoop 修改hadoopconf目錄下的文件,特別是core-site.xml,hadoop-env.sh,hdfs-site.xml,mapred-site.xml,yarn-site.xml,注意根據(jù)實(shí)際部署機(jī)器,修改ip和端口 復(fù)制hadoopconf目錄到集群中其他兩臺(tái)機(jī)器 (5)Hbase部署 A、下載hbase的安裝包, 解壓到合適目錄 B、配置Hbase 修改hbaseconf目錄下的文件,特別是backup-masters,hbase-env.sh,hbase-site.xml,regionservers,注意根據(jù)實(shí)際部署機(jī)器,修改ip和端口 復(fù)制hbaseconf目錄到集群中其他兩臺(tái)機(jī)器 (6)啟動(dòng)集群 A、首次啟動(dòng)集群
(7)關(guān)閉集群 注意:一定要按順序停止,如果先停 ZooKeeper 再停 HBase 的話,基本停不下來 (8)HDFS實(shí)用工具 HDFS狀態(tài)查詢:bin/hdfsdfsadmin -report 離開安全模式:bin/hdfs dfsadmin -safemodeleave 文件目錄查詢:bin/hadoop fs -ls 文件內(nèi)容查看:bin/hadoop fs -cat 文件上傳:bin/hadoop fs -put 文件下載:bin/hadoop fs -get 文件(夾)刪除:bin/hadoop fs -rm (-rmr) (9)Hbase實(shí)用工具 連接hbase客戶端:bin/hbaseshell shell命令: 表清單:list 表創(chuàng)建: create 'UserProfile', {NAME => 'realtime',VERSIONS => 1, TTL => 2147483647},{NAME => 'profile', VERSIONS =>1, TTL => 2147483647} 表查詢: scan 'UserProfile', LIMIT => 10 表數(shù)據(jù)構(gòu)造:(put '表名','openid','列族:列','值') put 'UserProfile','1222234','realtime:sex','1' put 'UserProfile','4555554','realtime:age','25' 表刪除,分兩步: disable 'UserProfile' drop 'UserProfile' 查看表結(jié)構(gòu): describe 'UserProfile' (10)Kafka部署 A、下載kafka的安裝包,解壓到合適目錄 B、配置Kafka 修改kafkaconf目錄下的文件,特別是server.properties、zookeeper.properties、producer.properties、consumer.properties ,注意根據(jù)實(shí)際部署機(jī)器,修改ip和端口 C、啟動(dòng)Kafka kafka/bin/kafka-server-start.sh../config/server.properties 啟動(dòng)成功后可通過jps命令查看進(jìn)程
kafka/bin/kafka-server-stop.sh E、Kafka實(shí)用命令 數(shù)據(jù)查詢:bin/kafka-console-consumer.sh 數(shù)據(jù)構(gòu)造:bin/kafka-console-producer.sh Topic查詢修改:bin/kafka-topics.sh (11)Flume部署 A、下載flume的安裝包,解壓到合適目錄 B、配置Flume 修改flume/conf/目錄下的文件,特別是core-site-lf.xml、hdfs-site-lf.xml、flume-wq.conf,注意根據(jù)實(shí)際部署機(jī)器,修改ip和端口 C、啟動(dòng)Flume flume/sbin/flume-ng agent --conf conf --conf-fileconf/flume-wq.conf --name a1 啟動(dòng)成功后可通過jps命令查看進(jìn)程
A、下載mongodb的安裝包,解壓到合適目錄 B、配置Mongodb 修改/export/shadows/mongodb/conf目錄下的文件:configsvr.conf,mongos.conf,shard1.conf C、啟動(dòng)Mongodb 啟動(dòng)配置服務(wù): mongodb/bin/mongod -f conf/configsvr.conf 啟動(dòng)路由服務(wù): mongos -f conf/mongos.conf 啟動(dòng)物理存儲(chǔ): mongod -f conf/shard1.conf 啟動(dòng)成功后查看進(jìn)程:ps -ef |grep mongo D、Mongodb實(shí)用工具 連接客戶端:mongo --port 30000 客戶端命令: 數(shù)據(jù)庫清單:show databases; 數(shù)據(jù)庫切換:use tablename1; 集合(表)清單:show collections; 查詢表所有記錄:db.tablename1.find(); //相當(dāng)于select * from tablename1; 條件查詢:db.tablename1.find({'groupId':'8037'}); //相當(dāng)于select * from tablename1where groupId = 8037
3、實(shí)際業(yè)務(wù)應(yīng)用 3.1、業(yè)務(wù)曝光 (1)配置中心修改如下手工服務(wù)為flume機(jī)器的ip和端口 mcoss_ds_mmart_show-------全量數(shù)據(jù)曝光 mcoss_ds_simpshow-----------簡(jiǎn)要數(shù)據(jù)曝光 mcoss_ds_keystat--------------關(guān)鍵數(shù)據(jù)曝光 (2)訪問曝光tws服務(wù) http://wq.jd.com/mcoss/mmart/show?actid=xxxx&pc=xxxx (3)數(shù)據(jù)驗(yàn)證 A hdfs數(shù)據(jù)驗(yàn)證 hadoop fs -ls -h /data/flume 類似于ls命令 hadoop fs -cat /data/flume/xxx/xxx/xxx/xxx 類似于cat查看文件內(nèi)容 下載文件到本地: hadoop fs-get /data/flume/xxx/xxx/xxx/xxx /tmp/xxx B Kafka數(shù)據(jù)驗(yàn)證 查看有哪些topic kafka/bin/kafka-topics.sh --list --zookeeper zookeeper機(jī)器ip:端口,zookeeper機(jī)器ip:端口,zookeeper機(jī)器ip:端口 數(shù)據(jù)查詢: ./kafka-console-consumer.sh --zookeeper zookeeper機(jī)器ip:端口,zookeeper機(jī)器ip:端口,zookeeper機(jī)器ip:端口 --from-beginning --topic mmart_show|more |
|