日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

SparkShell和IDEA中編寫Spark程序

 好程序員IT 2019-05-28

spark-shell是Spark自帶的交互式Shell程序,方便用戶進(jìn)行交互式編程,用戶可以在該命令行下用Scala編寫Spark程序。spark-shell程序一般用作Spark程序測(cè)試練習(xí)來用。spark-shell屬于Spark的特殊應(yīng)用程序,我們可以在這個(gè)特殊的應(yīng)用程序中提交應(yīng)用程序

spark-shell啟動(dòng)有兩種模式,local模式和cluster模式,分別為

local模式:

spark-shell

local模式僅在本機(jī)啟動(dòng)一個(gè)SparkSubmit進(jìn)程,沒有與集群建立聯(lián)系,雖然進(jìn)程中有SparkSubmit但是不會(huì)被提交到集群紅


Cluster模式(集群模式):

spark-shell \
--master spark://hadoop01:7077 \
--executor-memory 512m \
--total-executor-cores 1

后兩個(gè)命令不是必須的 --master這條命令是必須的(除非在jar包中已經(jīng)指可以不指定,不然就必須指定)

退出shell

千萬不要ctrl+c spark-shell 正確退出 :quit 千萬不要ctrl+c退出 這樣是錯(cuò)誤的 若使用了ctrl+c退出 使用命令查看監(jiān)聽端口 netstat - apn | grep 4040 在使用kill -9 端口號(hào) 殺死即可

3.25.11 spark2.2shellspark1.6shell對(duì)比


ps:啟動(dòng)spark-shell若是集群模式,在webUI會(huì)有一個(gè)一直執(zhí)行的任務(wù)

通過IDEA創(chuàng)建Spark工程

ps:工程創(chuàng)建之前步驟省略,在scala中已經(jīng)講解,直接默認(rèn)是創(chuàng)建好工程的

對(duì)工程中的pom.xml文件配置

 <!-- 聲明公有的屬性 -->
<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.7.1</hadoop.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>
<!-- 聲明并引入公有的依賴 -->
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies> 

Spark實(shí)現(xiàn)WordCount程序

Scala版本
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dri/wordcount").setMaster("local[*]")
    //創(chuàng)建sparkContext對(duì)象
    val sc =  new SparkContext(conf)
    //通過sparkcontext對(duì)象就可以處理數(shù)據(jù)
    //讀取文件 參數(shù)是一個(gè)String類型的字符串 傳入的是路徑
    val lines: RDD[String] = sc.textFile(dir/wordcount)
    //切分?jǐn)?shù)據(jù)
    val words: RDD[String] = lines.flatMap(_.split(" "))
    //將每一個(gè)單詞生成元組 (單詞,1)
    val tuples: RDD[(String, Int)] = words.map((_,1))
    //spark中提供一個(gè)算子 reduceByKey 相同key 為一組進(jìn)行求和 計(jì)算value
    val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_)
    //對(duì)當(dāng)前這個(gè)結(jié)果進(jìn)行排序 sortBy scalasotrBy是不一樣的 多了一個(gè)參數(shù)
    //默認(rèn)是升序  false就是降序
    val sorted: RDD[(String, Int)] = sumed.sortBy(_._2,false)
    //將數(shù)據(jù)提交到集群存儲(chǔ) 無法返回值
     sorted.foreach(println)
    //回收資源停止sc,結(jié)束任務(wù)
    sc.stop()
  }
}

Java版本

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List
public class JavaWordCount {
    public static void main(String[] args) {
//1.先創(chuàng)建conf對(duì)象進(jìn)行配置主要是設(shè)置名稱,為了設(shè)置運(yùn)行模式
        SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
//2.創(chuàng)建context對(duì)象
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile("dir/file");
//進(jìn)行切分?jǐn)?shù)據(jù) flatMapFunction是具體實(shí)現(xiàn)類
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                List<String> splited = Arrays.asList(s.split(" "));
                return splited.iterator();
            }

        });
//將數(shù)據(jù)生成元組
//第一個(gè)泛型是輸入的數(shù)據(jù)類型 后兩個(gè)參數(shù)是輸出參數(shù)元組的數(shù)據(jù)
        JavaPairRDD<String, Integer> tuples = words.mapToPair(new PairFunction<String, String,
                Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
//聚合
        JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer,
                Integer>() {
            @Override
//第一個(gè)Integer是相同key對(duì)應(yīng)的value
//第二個(gè)Integer是相同key 對(duì)應(yīng)的value
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
//因?yàn)?/span>Java api沒有提供sortBy算子,此時(shí)需要將元組中的數(shù)據(jù)進(jìn)行位置調(diào)換,然后在排序,排完序在換回
//第一次交換是為了排序
        JavaPairRDD<Integer, String> swaped = sumed.mapToPair(new PairFunction<Tuple2<String,
                Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception {
                return tup.swap();
            }
        });
//排序
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//第二次交換是為了最終結(jié)果 <單詞,數(shù)量>
        JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer,
                String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception
            {
                return tuple2.swap();
            }
        });
        System.out.println(res.collect());
        res.saveAsTextFile("out1");
        jsc.stop();
    }
}

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多