第一步:下載Eclipse IDE for JAVA Developer 第二步:解壓并啟動Eclipse 第三步:創(chuàng)建Maven工程 第四步:使用maven-archetype-quickstart,設定一些包名 第五步:通過BuildPath把默認的J2EE 1.5變成Java1.8 第六步:配置pom.xml,添加程序開發(fā)時的相關(guān)依賴,并配置具體build打包的信息 POM.xml 有各種依賴的支持 <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
http://maven./org.apache.spark package com.tom.spark.SparkApps.cores;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
/**
* 使用Java的方式開發(fā)本地測試Spark的WordCount程序
* @author
*
*/
public class WordCount {
public static void main(String[] args) {
// TODO Auto-generated method stub
/**
* 第一步:創(chuàng)建Spark的配置對象,SparkConf,設置Spark程序的運行時的配置信息
* 例如通過setMaster來設置程序要連接的spark集群的Master的URL,如果設置
* 為local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差的初學者
*
*/
SparkConf conf = new SparkConf().setAppName("Spark WordCount written by Java").setMaster("local");
/**
* 第二步:創(chuàng)建SparkContext對象
* SparkContext是Spark程序所有功能的唯一入口,無論采用Scala、Java、Python、R等都必須有一個SparkContext(不同的語言具體的類名稱不同,Java則為JavaSparkContext)
* SparkContext核心作用:初始化Spark應用程序運行所需要的核心組件,包括DAGScheduler、TaskScheduler、SchedulerBackEnd
* 同時還會負責Spark程序往Master注冊程序等
* SparkContext是整個Spark應用程序中最為至關(guān)重要的一個對象
*/
JavaSparkContext sc = new JavaSparkContext(conf);
/**
* 第三步:根據(jù)具體的數(shù)據(jù)來源(HDFS、HBase、Local FS、DB、S3等)通過SparkContext來創(chuàng)建JavaRDD
* JavaRDD的創(chuàng)建基本有三種方式:根據(jù)外部的數(shù)據(jù)來源(例如HDFS)、根據(jù)Scala集合、由其他JavaRDD操作
* 數(shù)據(jù)會被JavaRDD劃分成為一系列的Partitions,分配到每個Partition的數(shù)據(jù)屬于一個Task的處理范疇
*/
JavaRDD<String> lines = sc.textFile("F:/channel.txt",1);
/**
* 第四步:對初始的JavaRDD進行Transformation級別的處理,例如map、filter等高階函數(shù)等的編程,來進行具體的數(shù)據(jù)計算
* 第4.1步:將每一行的字符串拆分成單個的單詞
*/
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>(){ //如果shiScala,由于SAM轉(zhuǎn)換,所以可以寫成val words = lines.flatMap(_.split(" "))
@Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
/**
* 第4.2步:在單詞拆分的基礎(chǔ)上對每個單詞實例計數(shù)為1,也就是word => (word, 1)
*/
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<String, Integer> (word, 1);
}
});
/**
* 第4.3步:在單詞實例計數(shù)為1基礎(chǔ)上,統(tǒng)計每個單詞在文件中出現(xiàn)的總次數(shù)
*/
JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){
//對相同的key,進行Value的累加(包括Local和Reducer級別同時Reduce)
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
// TODO Auto-generated method stub
return v1 + v2;
}
});
wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){
@Override
public void call(Tuple2<String, Integer> pairs) throws Exception {
// TODO Auto-generated method stub
System.out.println(pairs._1 + " : " + pairs._2);
}
});
sc.close();
}
}
作業(yè):放在集群上跑
|