spark-shell是Spark自帶的交互式Shell程序,方便用戶進(jìn)行交互式編程,用戶可以在該命令行下用Scala編寫Spark程序。spark-shell程序一般用作Spark程序測(cè)試練習(xí)來用。spark-shell屬于Spark的特殊應(yīng)用程序,我們可以在這個(gè)特殊的應(yīng)用程序中提交應(yīng)用程序 spark-shell啟動(dòng)有兩種模式,local模式和cluster模式,分別為 local模式:spark-shell local模式僅在本機(jī)啟動(dòng)一個(gè)SparkSubmit進(jìn)程,沒有與集群建立聯(lián)系,雖然進(jìn)程中有SparkSubmit但是不會(huì)被提交到集群紅
Cluster模式(集群模式):spark-shell \ --master spark://hadoop01:7077 \ --executor-memory 512m \ --total-executor-cores 1
后兩個(gè)命令不是必須的 --master這條命令是必須的(除非在jar包中已經(jīng)指可以不指定,不然就必須指定) 退出shell千萬不要ctrl+c spark-shell 正確退出 :quit 千萬不要ctrl+c退出 這樣是錯(cuò)誤的 若使用了ctrl+c退出 使用命令查看監(jiān)聽端口 netstat - apn | grep 4040 在使用kill -9 端口號(hào) 殺死即可 3.25.11 spark2.2shell和spark1.6shell對(duì)比
ps:啟動(dòng)spark-shell若是集群模式,在webUI會(huì)有一個(gè)一直執(zhí)行的任務(wù) 通過IDEA創(chuàng)建Spark工程ps:工程創(chuàng)建之前步驟省略,在scala中已經(jīng)講解,直接默認(rèn)是創(chuàng)建好工程的 對(duì)工程中的pom.xml文件配置 <!-- 聲明公有的屬性 --> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.11.8</scala.version> <spark.version>2.2.0</spark.version> <hadoop.version>2.7.1</hadoop.version> <scala.compat.version>2.11</scala.compat.version> </properties> <!-- 聲明并引入公有的依賴 --> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> Spark實(shí)現(xiàn)WordCount程序Scala版本 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}
object SparkWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("dri/wordcount").setMaster("local[*]") //創(chuàng)建sparkContext對(duì)象 val sc = new SparkContext(conf) //通過sparkcontext對(duì)象就可以處理數(shù)據(jù) //讀取文件 參數(shù)是一個(gè)String類型的字符串 傳入的是路徑 val lines: RDD[String] = sc.textFile(“dir/wordcount”) //切分?jǐn)?shù)據(jù) val words: RDD[String] = lines.flatMap(_.split(" ")) //將每一個(gè)單詞生成元組 (單詞,1) val tuples: RDD[(String, Int)] = words.map((_,1)) //spark中提供一個(gè)算子 reduceByKey 相同key 為一組進(jìn)行求和 計(jì)算value val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_) //對(duì)當(dāng)前這個(gè)結(jié)果進(jìn)行排序 sortBy 和scala中sotrBy是不一樣的 多了一個(gè)參數(shù) //默認(rèn)是升序 false就是降序 val sorted: RDD[(String, Int)] = sumed.sortBy(_._2,false) //將數(shù)據(jù)提交到集群存儲(chǔ) 無法返回值 sorted.foreach(println) //回收資源停止sc,結(jié)束任務(wù) sc.stop() } } Java版本import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; import java.util.List; public class JavaWordCount { public static void main(String[] args) { //1.先創(chuàng)建conf對(duì)象進(jìn)行配置主要是設(shè)置名稱,為了設(shè)置運(yùn)行模式 SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); //2.創(chuàng)建context對(duì)象 JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile("dir/file"); //進(jìn)行切分?jǐn)?shù)據(jù) flatMapFunction是具體實(shí)現(xiàn)類 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { List<String> splited = Arrays.asList(s.split(" ")); return splited.iterator(); }
}); //將數(shù)據(jù)生成元組 //第一個(gè)泛型是輸入的數(shù)據(jù)類型 后兩個(gè)參數(shù)是輸出參數(shù)元組的數(shù)據(jù) JavaPairRDD<String, Integer> tuples = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); //聚合 JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override //第一個(gè)Integer是相同key對(duì)應(yīng)的value //第二個(gè)Integer是相同key 對(duì)應(yīng)的value public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //因?yàn)?/span>Java api沒有提供sortBy算子,此時(shí)需要將元組中的數(shù)據(jù)進(jìn)行位置調(diào)換,然后在排序,排完序在換回 //第一次交換是為了排序 JavaPairRDD<Integer, String> swaped = sumed.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception { return tup.swap(); } }); //排序 JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false); //第二次交換是為了最終結(jié)果 <單詞,數(shù)量> JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception { return tuple2.swap(); } }); System.out.println(res.collect()); res.saveAsTextFile("out1"); jsc.stop(); } }
|