使用Spark Streaming+Spark SQL+Kafka+FileSystem綜合案例
1、項目分析流程圖
2、項目代碼實戰(zhàn)

Flume sink到Kafka需要一個jar包支持
https://github.com/beyondj2ee/flumeng-kafka-plugin/tree/master/flumeng-kafka-plugin
編輯flume-conf.properties
#配置sink
agent1.sinks.sink1.type=org.apache.flume.plugins.KafkaSink
agent1.sinks.sink1.metadata.broker.list=Master:9092,Worker1:9092,Worker2.9092
agent1.sinks.sink1.partition.key=0
agent1.sinks.sink1.partitioner.class=org.apache.flume.plugins.SinglePartition
agent1.sinks.sink1.serializer.class=kafka.serializer.StringEncoder
agent1.sinks.sink1.request.requiredacks=0
agent1.sinks.sink1.max.message.size=1000000
agent1.sinks.sink1.producer.type=sync
agent1.sinks.sink1.custom.encoding=UTF-8
agent1.sinks.sink1.custom.topic.name=HelloKafka
agent1.sinks.sink1.channel= channel1
Kafka也可以監(jiān)控文件夾,但為什么要用Flume?Kafka只能接收json格式的文件
數(shù)據(jù)來源?
互聯(lián)網(wǎng):電商、社交網(wǎng)絡(luò)等的網(wǎng)站和App程序
傳統(tǒng)行業(yè):金融、電信、醫(yī)療、農(nóng)業(yè)、生產(chǎn)制造行業(yè);
例如說:在京東上進行廣告的推送,當我們點擊廣告的時候,此時肯定有日志記錄Log發(fā)送回到Server中,或者說我們使用Android,iOS等中的App,都會設(shè)置有數(shù)據(jù)記錄的關(guān)鍵點(埋點)
如果是網(wǎng)站,經(jīng)典的方式是通過JS透過Ajax把日志穿回到服務(wù)器上,如果是移動App等一般是通過Socket,其他的傳感器或者工業(yè)設(shè)備可以通過自己的通信協(xié)議把數(shù)據(jù)傳回到服務(wù)器端
為了應(yīng)對高并發(fā)訪問,一般采用Nginx等作為Server前段,Server的分布式集群來做負載均衡
Tomcat、Apache、WebLogic作為Server后端
Server中接收到請求路由后一般都會對每個請求在文件中寫一條Log
Logs Cluster可以專門設(shè)置日志服務(wù)器集群,所有的Server和J2EE類型的業(yè)務(wù)邏輯在執(zhí)行過程中產(chǎn)生的日志信息都可以在后臺同步到日志服務(wù)器集群中
Server中接收到請求路由后一般都會對每個請求在文件中寫一條Log,可以自動配置Server寫日志
企業(yè)中一般都會有Crontab等定時工具來通過日志整理工具來把當天的日志采集、合并和初步的處理形成一份日志文件,然后發(fā)送到Flume監(jiān)控目錄中
當Flume發(fā)現(xiàn)有新的日志文件進來的時候會按照配置把數(shù)據(jù)通過Channel來Sink到目的地,這里是Sink到Kafka集群中
HDFS:
1、使用MapReduce作業(yè)對數(shù)據(jù)進行出不清洗,并寫入新的HDFS文件中。
2、清洗后的數(shù)據(jù)一般導入到Hive數(shù)據(jù)倉庫中,可以采用分區(qū)表
3、通過Hive中的SQL,在數(shù)據(jù)倉庫的基礎(chǔ)上,進行ETL,此時的ETL會把原始的數(shù)據(jù)生成很多張目標的table
企業(yè)生產(chǎn)環(huán)境下,Spark數(shù)據(jù)都是來自Hive
一個小例子
package com.tom.spark.sparkstreaming
import org.apache.commons.codec.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
/**
* 使用Scala開發(fā)集群運行的Spark來實現(xiàn)在線熱搜詞
*/
case class MessageItem(name: String, age: Int)
object SparkStreamingFromKafkaFlume2Hive {
def main(args: Array[String]): Unit = {
if(args.length < 2) {
System.err.println("Please input your kafka broker list and topics to consume")
System.exit(1)
}
val conf = new SparkConf().setAppName("SparkStreamingFromKafkaFlume2Hive").setMaster("local[2]")
val ssc = new StreamingContext(conf, Durations.seconds(5))
val Array(brokers, topics) = args
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val topicsParams = topics.split(",").toSet
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsParams)
.map(_._2.split(",")).foreachRDD(rdd => {
val hiveContext = new HiveContext(rdd.sparkContext)
import hiveContext.implicits._
rdd.map(record => MessageItem(record(0).trim,record(1).trim.toInt)).toDF().registerTempTable("temp")
hiveContext.sql("SELECT count(*) FROM temp").show()
})
// Flume會作為Kafka的Producer把數(shù)據(jù)寫入到Kafka供本程序消費
ssc.start()
ssc.awaitTermination()
}
}
|