 * Test-data generator — SparkSQLDataManually.java
 */
package com.tom.spark.SparkApps.sql;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Random;
/**
 * Generates forum log records automatically. Each tab-separated record has:
 * date: the log date, formatted as yyyy-MM-dd
 * timestamp: epoch timestamp in milliseconds
 * userID: user ID (null for unregistered visitors)
 * pageID: page ID
 * channelID: forum channel (board) name
 * action: the user action, either "register" or "view"
 */
public class SparkSQLDataManually {

    /** Date string (yyyy-MM-dd) for yesterday; every generated record carries it. */
    static String yesterday = yesterday();

    /** Forum channels to pick from at random. */
    static String[] channelNames = new String[] {
        "Spark", "Scala", "Kafka", "Flink", "Hadoop", "Storm",
        "Hive", "Impala", "HBase", "ML"
    };

    /** Possible user actions. */
    static String[] actionNames = new String[] {
        "register", "view"
    };

    /**
     * Entry point. Optional arguments:
     * args[0] — number of log records to generate (default 5000);
     * args[1] — output directory (default ".").
     */
    public static void main(String[] args) {
        long numberItems = 5000;
        String path = ".";
        if (args.length > 0) {
            // Fix: parse into a long (the variable is long, Integer.valueOf was
            // needlessly narrow) and do not assume args[1] exists — the original
            // threw ArrayIndexOutOfBoundsException when only a count was given.
            numberItems = Long.parseLong(args[0]);
            if (args.length > 1) {
                path = args[1];
            }
            System.out.println(path);
        }
        System.out.println("User log number is : " + numberItems);
        userlogs(numberItems, path);
    }

    /**
     * Generates {@code numberItems} tab-separated log lines
     * (date, timestamp, userID, pageID, channel, action) and writes them
     * to {@code path}/userlog.log. Roughly 1 record in 8 gets a null
     * userID, simulating an unregistered visitor.
     */
    private static void userlogs(long numberItems, String path) {
        Random random = new Random();
        // StringBuilder: single-threaded use, no need for StringBuffer's locking.
        StringBuilder userLogBuffer = new StringBuilder();
        for (int i = 0; i < numberItems; i++) {
            long timestamp = new Date().getTime();
            // ~1/8 of the records simulate an unregistered (null) user,
            // matching the original's {1..8}-array lookup against 1.
            Long userID = (random.nextInt(8) == 0)
                    ? null
                    : (long) random.nextInt((int) numberItems);
            long pageID = random.nextInt((int) numberItems);
            String channel = channelNames[random.nextInt(channelNames.length)];
            String action = actionNames[random.nextInt(actionNames.length)];
            userLogBuffer.append(yesterday).append('\t')
                    .append(timestamp).append('\t')
                    .append(userID).append('\t')
                    .append(pageID).append('\t')
                    .append(channel).append('\t')
                    .append(action).append('\n');
        }
        // Fix: "/" is accepted on every platform (the original hard-coded "\\",
        // which produced a mangled file name on Unix), and the printed path now
        // matches the path actually written.
        String outFile = path + "/userlog.log";
        // Fix: try-with-resources — the original's finally { pw.close(); }
        // threw NullPointerException whenever the constructor itself failed.
        try (PrintWriter pw = new PrintWriter(
                new OutputStreamWriter(new FileOutputStream(outFile)))) {
            System.out.println(outFile);
            pw.write(userLogBuffer.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Returns yesterday's date formatted as yyyy-MM-dd. */
    private static String yesterday() {
        SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        cal.add(Calendar.DATE, -1);
        return date.format(cal.getTime());
    }
}
/**
 * Computes PV, UV, hot channels, jump-out (bounce) rate, and the
 * new-user registration ratio over the generated forum logs.
 */
package com.tom.spark.SparkApps.sql;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;
/**
* Table in hive database creation:
* sqlContext.sql("create table userlogs(date string, timestamp bigint, userID bigint, pageID bigint, channel string, action string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'")
*
*/
public class SparkSQLUserlogsOps {

    /**
     * Entry point: computes yesterday's PV, UV, hot channels, jump-out
     * (bounce) rate and new-user registration ratio from the Hive table
     * {@code hive.userlogs} populated by the companion data generator.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkSQLUserlogsOps").setMaster("spark://Master:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        HiveContext hiveContext = new HiveContext(sc);
        String yesterday = getYesterday();
        pvStat(hiveContext, yesterday);                     // page views per page
        uvStat(hiveContext, yesterday);                     // unique visitors per page
        hotChannel(hiveContext, yesterday);                 // page views per channel
        jumpOutStat(hiveContext, yesterday);                // bounce rate
        newUserRegisterPercentStat(hiveContext, yesterday); // registration ratio
        sc.stop(); // Fix: release cluster resources; the original never stopped the context.
    }

    /**
     * Ratio of registrations to page views by unregistered (null-userID)
     * visitors on the given day.
     */
    private static void newUserRegisterPercentStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        // Fix: the generator stores lower-case action values ('view'/'register');
        // the original filtered on 'View'/'Register' and always counted zero rows.
        String newUserSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'view' and date='" + yesterday + "' and userID is NULL ";
        // Fix: the original was missing the space before 'where'
        // ("...from userlogswhere action..."), which is not valid SQL.
        String registerUserSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'register' and date='" + yesterday + "' ";
        Object newUser = hiveContext.sql(newUserSQL).collect()[0].get(0);
        Object registerUser = hiveContext.sql(registerUserSQL).collect()[0].get(0);
        double anonymousViews = Double.valueOf(newUser.toString());
        double registrations = Double.valueOf(registerUser.toString());
        System.out.println("模擬新用戶注冊比例:" + registrations / anonymousViews);
    }

    /**
     * Jump-out (bounce) rate: the share of total page views contributed by
     * users whose entire visit was a single page view.
     */
    private static void jumpOutStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String totalPvSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'view' and date='" + yesterday + "' ";
        // Users with exactly one view that day.
        String pv2OneSQL = "select count(*) "
                + "from "
                + "(select count(*) totalNumber from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by userID "
                + "having totalNumber = 1) subquery ";
        Object totalPv = hiveContext.sql(totalPvSQL).collect()[0].get(0);
        Object pv2One = hiveContext.sql(pv2OneSQL).collect()[0].get(0);
        double total = Double.valueOf(totalPv.toString());
        double singleViewUsers = Double.valueOf(pv2One.toString());
        System.out.println("跳出率為" + singleViewUsers / total);
    }

    /** UV: distinct visitors per page for the given day, highest first. */
    private static void uvStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String sqlText = "select date, pageID, uv "
                + "from "
                + "(select date, pageID, count(distinct(userID)) uv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, pageID) subquery "
                + "order by uv desc ";
        hiveContext.sql(sqlText).show();
    }

    /**
     * Hot channels: page views aggregated per channel, highest first.
     * (Fix: the original pvStat/hotChannel bodies were swapped — this method
     * grouped by pageID while pvStat grouped by channel.)
     */
    private static void hotChannel(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String sqlText = "select date, channel, channelpv "
                + "from "
                + "(select date, channel, count(*) channelpv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, channel) subquery "
                + "order by channelpv desc ";
        hiveContext.sql(sqlText).show();
    }

    /** PV: page views per page for the given day, highest first. */
    private static void pvStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String sqlText = "select date, pageID, pv "
                + "from "
                + "(select date, pageID, count(1) pv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, pageID) subquery "
                + "order by pv desc ";
        hiveContext.sql(sqlText).show();
        // Results could also be persisted back to a database or Hive table.
    }

    /** Yesterday's date as yyyy-MM-dd — the partition the generator wrote. */
    private static String getYesterday() {
        SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        // Fix: was -2, which never matched the generator's (today - 1) date stamp,
        // so every query ran against an empty partition.
        cal.add(Calendar.DATE, -1);
        return date.format(cal.getTime());
    }
}