日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

大數(shù)據(jù)IMF傳奇行動絕密課程第100

 看風景D人 2019-02-24

使用Spark Streaming+Spark SQL+Kafka+FileSystem綜合案例

1、項目分析流程圖
2、項目代碼實戰(zhàn)

圖100-1 SparkStreaming案例分析架構(gòu)圖

Flume sink到Kafka需要一個jar包支持
https://github.com/beyondj2ee/flumeng-kafka-plugin/tree/master/flumeng-kafka-plugin

編輯flume-conf.properties

#配置sink
agent1.sinks.sink1.type=org.apache.flume.plugins.KafkaSink
agent1.sinks.sink1.metadata.broker.list=Master:9092,Worker1:9092,Worker2.9092
agent1.sinks.sink1.partition.key=0
agent1.sinks.sink1.partitioner.class=org.apache.flume.plugins.SinglePartition
agent1.sinks.sink1.serializer.class=kafka.serializer.StringEncoder
agent1.sinks.sink1.request.requiredacks=0
agent1.sinks.sink1.max.message.size=1000000
agent1.sinks.sink1.producer.type=sync
agent1.sinks.sink1.custom.encoding=UTF-8
agent1.sinks.sink1.custom.topic.name=HelloKafka
agent1.sinks.sink1.channel= channel1

Kafka也可以監(jiān)控文件夾,但為什么要用Flume?Kafka只能接收json格式的文件
數(shù)據(jù)來源?
互聯(lián)網(wǎng):電商、社交網(wǎng)絡(luò)等的網(wǎng)站和App程序
傳統(tǒng)行業(yè):金融、電信、醫(yī)療、農(nóng)業(yè)、生產(chǎn)制造行業(yè);
例如說:在京東上進行廣告的推送,當我們點擊廣告的時候,此時肯定有日志記錄Log發(fā)送回到Server中,或者說我們使用Android,iOS等中的App,都會設(shè)置有數(shù)據(jù)記錄的關(guān)鍵點(埋點)
如果是網(wǎng)站,經(jīng)典的方式是通過JS透過Ajax把日志穿回到服務(wù)器上,如果是移動App等一般是通過Socket,其他的傳感器或者工業(yè)設(shè)備可以通過自己的通信協(xié)議把數(shù)據(jù)傳回到服務(wù)器端

為了應(yīng)對高并發(fā)訪問,一般采用Nginx等作為Server前段,Server的分布式集群來做負載均衡

Tomcat、Apache、WebLogic作為Server后端

Server中接收到請求路由后一般都會對每個請求在文件中寫一條Log

Logs Cluster可以專門設(shè)置日志服務(wù)器集群,所有的Server和J2EE類型的業(yè)務(wù)邏輯在執(zhí)行過程中產(chǎn)生的日志信息都可以在后臺同步到日志服務(wù)器集群中

Server中接收到請求路由后一般都會對每個請求在文件中寫一條Log,可以自動配置Server寫日志

企業(yè)中一般都會有Crontab等定時工具來通過日志整理工具來把當天的日志采集、合并和初步的處理形成一份日志文件,然后發(fā)送到Flume監(jiān)控目錄中

當Flume發(fā)現(xiàn)有新的日志文件進來的時候會按照配置把數(shù)據(jù)通過Channel來Sink到目的地,這里是Sink到Kafka集群中

HDFS:
1、使用MapReduce作業(yè)對數(shù)據(jù)進行出不清洗,并寫入新的HDFS文件中。
2、清洗后的數(shù)據(jù)一般導入到Hive數(shù)據(jù)倉庫中,可以采用分區(qū)表
3、通過Hive中的SQL,在數(shù)據(jù)倉庫的基礎(chǔ)上,進行ETL,此時的ETL會把原始的數(shù)據(jù)生成很多張目標的table

企業(yè)生產(chǎn)環(huán)境下,Spark數(shù)據(jù)都是來自Hive

一個小例子

package com.tom.spark.sparkstreaming

import org.apache.commons.codec.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}

/**
  * 使用Scala開發(fā)集群運行的Spark來實現(xiàn)在線熱搜詞
  */
case class MessageItem(name: String, age: Int)

object SparkStreamingFromKafkaFlume2Hive {
  def main(args: Array[String]): Unit = {

    if(args.length < 2) {
      System.err.println("Please input your kafka broker list and topics to consume")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkStreamingFromKafkaFlume2Hive").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Durations.seconds(5))

    val Array(brokers, topics) = args
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsParams = topics.split(",").toSet

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsParams)
      .map(_._2.split(",")).foreachRDD(rdd => {
      val hiveContext = new HiveContext(rdd.sparkContext)

      import hiveContext.implicits._
      rdd.map(record => MessageItem(record(0).trim,record(1).trim.toInt)).toDF().registerTempTable("temp")
      hiveContext.sql("SELECT count(*) FROM temp").show()
    })

    // Flume會作為Kafka的Producer把數(shù)據(jù)寫入到Kafka供本程序消費
    ssc.start()
    ssc.awaitTermination()
  }
}

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多