有興趣想學習國內(nèi)整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq 471186150。共享視頻,性價比超高!
package com.dt.streaming;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.datanucleus.store.rdbms.request.InsertRequest;
import com.google.common.base.Optional;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
/**
*
* 在線處理廣告點擊流
* 廣告點擊的基本數(shù)據(jù)格式:timestamp、ip、userID、adID、province、city
*
* @author hp
*
*/
public class AdClickedStreamingStats {
public static void main(String[] args) {
/*
* 第一步:配置SparkConf:
* 1,至少2條線程:因為Spark Streaming應(yīng)用程序在運行的時候,至少有一條
* 線程用于不斷的循環(huán)接收數(shù)據(jù),并且至少有一條線程用于處理接受的數(shù)據(jù)(否則的話無法
* 有線程用于處理數(shù)據(jù),隨著時間的推移,內(nèi)存和磁盤都會不堪重負);
* 2,對于集群而言,每個Executor一般肯定不止一個Thread,那對于處理Spark Streaming的
* 應(yīng)用程序而言,每個Executor一般分配多少Core比較合適?根據(jù)我們過去的經(jīng)驗,5個左右的
* Core是最佳的(一個段子分配為奇數(shù)個Core表現(xiàn)最佳,例如3個、5個、7個Core等);
*/
SparkConf conf = new SparkConf().setMaster("local[5]").
setAppName("AdClickedStreamingStats");
/*SparkConf conf = new SparkConf().setMaster("spark://Master:7077").
setAppName("SparkStreamingOnKafkaReceiver");*/
/*
* 第二步:創(chuàng)建SparkStreamingContext:
* 1,這個是SparkStreaming應(yīng)用程序所有功能的起始點和程序調(diào)度的核心
* SparkStreamingContext的構(gòu)建可以基于SparkConf參數(shù),也可基于持久化的SparkStreamingContext的內(nèi)容
* 來恢復(fù)過來(典型的場景是Driver崩潰后重新啟動,由于Spark Streaming具有連續(xù)7*24小時不間斷運行的特征,
* 所有需要在Driver重新啟動后繼續(xù)上衣系的狀態(tài),此時的狀態(tài)恢復(fù)需要基于曾經(jīng)的Checkpoint);
* 2,在一個Spark Streaming應(yīng)用程序中可以創(chuàng)建若干個SparkStreamingContext對象,使用下一個SparkStreamingContext
* 之前需要把前面正在運行的SparkStreamingContext對象關(guān)閉掉,由此,我們獲得一個重大的啟發(fā)SparkStreaming框架也只是
* Spark Core上的一個應(yīng)用程序而已,只不過Spark Streaming框架箱運行的話需要Spark工程師寫業(yè)務(wù)邏輯處理代碼;
*/
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));
/*
* 第三步:創(chuàng)建Spark Streaming輸入數(shù)據(jù)來源input Stream:
* 1,數(shù)據(jù)輸入來源可以基于File、HDFS、Flume、Kafka、Socket等
* 2, 在這里我們指定數(shù)據(jù)來源于網(wǎng)絡(luò)Socket端口,Spark Streaming連接上該端口并在運行的時候一直監(jiān)聽該端口
* 的數(shù)據(jù)(當然該端口服務(wù)首先必須存在),并且在后續(xù)會根據(jù)業(yè)務(wù)需要不斷的有數(shù)據(jù)產(chǎn)生(當然對于Spark Streaming
* 應(yīng)用程序的運行而言,有無數(shù)據(jù)其處理流程都是一樣的);
* 3,如果經(jīng)常在每間隔5秒鐘沒有數(shù)據(jù)的話不斷的啟動空的Job其實是會造成調(diào)度資源的浪費,因為并沒有數(shù)據(jù)需要發(fā)生計算,所以
* 實例的企業(yè)級生成環(huán)境的代碼在具體提交Job前會判斷是否有數(shù)據(jù),如果沒有的話就不再提交Job;
* 4,在本案例中具體參數(shù)含義:
* 第一個參數(shù)是StreamingContext實例;
* 第二個參數(shù)是ZooKeeper集群信息(接受Kafka數(shù)據(jù)的時候會從ZooKeeper中獲得Offset等元數(shù)據(jù)信息)
* 第三個參數(shù)是Consumer Group
* 第四個參數(shù)是消費的Topic以及并發(fā)讀取Topic中Partition的線程數(shù)
*/
/**
* 創(chuàng)建Kafka元數(shù)據(jù),來讓Spark Streaming這個Kafka Consumer利用
*/
Map<String, String> kafkaParameters = new HashMap<String, String>();
kafkaParameters.put("metadata.broker.list",
"Master:9092,Worker1:9092,Worker2:9092");
Set<String> topics = new HashSet<String>();
topics.add("AdClicked");
JavaPairInputDStream<String, String> adClickedStreaming = KafkaUtils.createDirectStream(jsc,
String.class, String.class,
StringDecoder.class, StringDecoder.class,
kafkaParameters,
topics);
/**
* 因為要對黑名單進行在線過濾,而數(shù)據(jù)是在RDD中的,所以必然使用transform這個函數(shù);
* 但是在這里我們必須使用transformToPair,原因是讀取進來的Kafka的數(shù)據(jù)是Pair<String,String>類型的,另外
* 一個原因是過濾后的數(shù)據(jù)要進行進一步處理,所以必須是讀進來的Kafka數(shù)據(jù)的原始類型DStream<String, String>
*
* 在此:再次說明每個Batch Duration中實際上講輸入的數(shù)據(jù)就是被一個且僅僅被一個RDD封裝的,你可以有多個
* InputDstream,但是其實在產(chǎn)生Job的時候,這些不同的InputDstream在Batch Duration中就相當于Spark基于
* HDFS數(shù)據(jù)操作的不同文件來源而已罷了。
*/
JavaPairDStream<String, String> filteredadClickedStreaming = adClickedStreaming.transformToPair(new Function<JavaPairRDD<String,String>, JavaPairRDD<String,String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
/**
* 在線黑名單過濾思路步驟:
* 1,從數(shù)據(jù)庫中獲取黑名單轉(zhuǎn)換成RDD,即新的RDD實例封裝黑名單數(shù)據(jù);
* 2,然后把代表黑名單的RDD的實例和Batch Duration產(chǎn)生的rdd進行join操作,準確的說是進行
* leftOuterJoin操作,也就是說使用Batch Duration產(chǎn)生的rdd和代表黑名單的RDD的實例進行
* leftOuterJoin操作,如果兩者都有內(nèi)容的話,就會是true,否則的話就是false;
*
* 我們要留下的是leftOuterJoin操作結(jié)果為false;
*
*/
List<String> blackListNames = new ArrayList<>();
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
jdbcWrapper.doQuery("SELECT * FROM blacklisttable", null, new ExecuteCallBack(){
@Override
public void resultCallBack(ResultSet result) throws Exception {
while(result.next()){
blackListNames.add(result.getString(1));
}
}
});
List<Tuple2<String, Boolean>> blackListTuple = new ArrayList<Tuple2<String, Boolean>>();
for (String name : blackListNames){
blackListTuple.add(new Tuple2<String,Boolean>(name, true));
}
List<Tuple2<String, Boolean>> blackListFromDB = blackListTuple; //數(shù)據(jù)來自于查詢的黑名單表并且映射成為<String, Boolean>
JavaSparkContext jsc = new JavaSparkContext(rdd.context());
/**
* 黑名單的表中只有userID,但是如果要進行join操作的話,就必須是Key-Value,所以
* 在這里我們需要基于數(shù)據(jù)表中的數(shù)據(jù)產(chǎn)生Key-Value類型的數(shù)據(jù)集合;
*/
JavaPairRDD<String, Boolean> blackListRDD = jsc.parallelizePairs(blackListFromDB);
/**
* 進行操作的時候肯定是基于userID進行join的,所以必須把傳入的rdd進行mapToPair操作轉(zhuǎn)化成為符合
* 格式的rdd
*
* 廣告點擊的基本數(shù)據(jù)格式:timestamp、ip、userID、adID、province、city
*/
JavaPairRDD<String, Tuple2<String, String>> rdd2Pair = rdd.mapToPair(new PairFunction<Tuple2<String,String>, String, Tuple2<String,String>>() {
@Override
public Tuple2<String, Tuple2<String, String>> call(Tuple2<String, String> t) throws Exception {
String userID = t._2.split("\t")[2];
return new Tuple2<String, Tuple2<String, String>>(userID, t);
}
});
JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> joined = rdd2Pair.leftOuterJoin(blackListRDD);
JavaPairRDD<String, String> result = joined.filter(new Function<Tuple2<String,
Tuple2<Tuple2<String,String>,Optional<Boolean>>>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> v1)
throws Exception {
Optional<Boolean> optional = v1._2._2;
if (optional.isPresent() && optional.get()){
return false;
} else {
return true;
}
}
}).mapToPair(new PairFunction<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, String, String>() {
@Override
public Tuple2<String, String> call(
Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> t) throws Exception {
// TODO Auto-generated method stub
return t._2._1;
}
});
return result;
}
});
/*
* 第四步:接下來就像對于RDD編程一樣基于DStream進行編程?。。≡蚴荄Stream是RDD產(chǎn)生的模板(或者說類),在Spark Streaming具體
* 發(fā)生計算前,其實質(zhì)是把每個Batch的DStream的操作翻譯成為對RDD的操作!?。? *對初始的DStream進行Transformation級別的處理,例如map、filter等高階函數(shù)等的編程,來進行具體的數(shù)據(jù)計算
* 廣告點擊的基本數(shù)據(jù)格式:timestamp、ip、userID、adID、province、city
*/
JavaPairDStream<String, Long> pairs = adClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {
@Override
public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
String[] splited = t._2.split("\t");
String timestamp = splited[0]; //yyyy-MM-dd
String ip = splited[1];
String userID = splited[2];
String adID = splited[3];
String province = splited[4];
String city = splited[5];
String clickedRecord = timestamp + "_" + ip + "_" + userID + "_" + adID + "_"
+ province + "_" + city;
return new Tuple2<String, Long>(clickedRecord, 1L);
}
});
/*
* 第四步:對初始的DStream進行Transformation級別的處理,例如map、filter等高階函數(shù)等的編程,來進行具體的數(shù)據(jù)計算
* 計算每個Batch Duration中每個User的廣告點擊量
*/
JavaPairDStream<String, Long> adClickedUsers = pairs.reduceByKey(new Function2<Long, Long, Long>(){
@Override
public Long call(Long v1, Long v2) throws Exception {
// TODO Auto-generated method stub
return v1 + v2;
}
});
/**
*
* 計算出什么叫有效的點擊?
* 1,復(fù)雜化的一般都是采用機器學習訓練好模型直接在線進行過濾;
* 2,簡單的?可以通過一個Batch Duration中的點擊次數(shù)來判斷是不是非法廣告點擊,但是實際上講非法廣告
* 點擊程序會盡可能模擬真實的廣告點擊行為,所以通過一個Batch來判斷是 不完整的,我們需要對例如一天(也可以是每一個小時)
* 的數(shù)據(jù)進行判斷!
* 3,比在線機器學習退而求次的做法如下:
* 例如:一段時間內(nèi),同一個IP(MAC地址)有多個用戶的賬號訪問;
* 例如:可以統(tǒng)一一天內(nèi)一個用戶點擊廣告的次數(shù),如果一天點擊同樣的廣告操作50次的話,就列入黑名單;
*
* 黑名單有一個重點的特征:動態(tài)生成!??!所以每一個Batch Duration都要考慮是否有新的黑名單加入,此時黑名單需要存儲起來
* 具體存儲在什么地方呢,存儲在DB/Redis中即可;
*
* 例如郵件系統(tǒng)中的“黑名單”,可以采用Spark Streaming不斷的監(jiān)控每個用戶的操作,如果用戶發(fā)送郵件的頻率超過了設(shè)定的值,可以
* 暫時把用戶列入“黑名單”,從而阻止用戶過度頻繁的發(fā)送郵件。
*/
JavaPairDStream<String, Long> filteredClickInBatch = adClickedUsers.filter(new Function<Tuple2<String,Long>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Long> v1) throws Exception {
if ( 1 < v1._2){
//更新一下黑名單的數(shù)據(jù)表
return false;
} else {
return true;
}
}
});
// Todo。。。。
/*
* 此處的print并不會直接出發(fā)Job的執(zhí)行,因為現(xiàn)在的一切都是在Spark Streaming框架的控制之下的,對于Spark Streaming
* 而言具體是否觸發(fā)真正的Job運行是基于設(shè)置的Duration時間間隔的
*
* 諸位一定要注意的是Spark Streaming應(yīng)用程序要想執(zhí)行具體的Job,對Dtream就必須有output Stream操作,
* output Stream有很多類型的函數(shù)觸發(fā),類print、saveAsTextFile、saveAsHadoopFiles等,最為重要的一個
* 方法是foraeachRDD,因為Spark Streaming處理的結(jié)果一般都會放在Redis、DB、DashBoard等上面,foreachRDD
* 主要就是用用來完成這些功能的,而且可以隨意的自定義具體數(shù)據(jù)到底放在哪里?。?!
*
*/
// filteredClickInBatch.print();
filteredClickInBatch.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {
@Override
public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {
@Override
public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
/**
* 在這里我們使用數(shù)據(jù)庫連接池的高效讀寫數(shù)據(jù)庫的方式把數(shù)據(jù)寫入數(shù)據(jù)庫MySQL;
* 由于傳入的參數(shù)是一個Iterator類型的集合,所以為了更加高效的操作我們需要批量處理
* 例如說一次性插入1000條Record,使用insertBatch或者updateBatch類型的操作;
* 插入的用戶信息可以只包含:timestamp、ip、userID、adID、province、city
* 這里面有一個問題:可能出現(xiàn)兩條記錄的Key是一樣的,此時就需要更新累加操作
*/
List<UserAdClicked> userAdClickedList = new ArrayList<UserAdClicked>();
while (partition.hasNext()){
Tuple2<String, Long> record = partition.next();
String[] splited = record._1.split("\t");
UserAdClicked userClicked = new UserAdClicked();
userClicked.setTimestamp(splited[0]);
userClicked.setIp(splited[1]);
userClicked.setUserID(splited[2]);
userClicked.setAdID(splited[3]);
userClicked.setProvince(splited[4]);
userClicked.setCity(splited[5]);
userAdClickedList.add(userClicked);
}
List<UserAdClicked> inserting = new ArrayList<UserAdClicked>();
List<UserAdClicked> updating = new ArrayList<UserAdClicked>();
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
//adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
for (UserAdClicked clicked : userAdClickedList){
jdbcWrapper.doQuery("SELECT count(1) FROM adclicked WHERE "
+ " timestamp = ? AND userID = ? AND adID = ?",
new Object[]{clicked.getTimestamp(), clicked.getUserID(), clicked.getAdID()},
new ExecuteCallBack() {
@Override
public void resultCallBack(ResultSet result) throws Exception {
if(result.next()){
long count = result.getLong(1);
clicked.setClickedCount(count);
updating.add(clicked);
} else {
inserting.add(clicked);
}
}
});
}
//adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
ArrayList<Object[]> insertParametersList = new ArrayList<Object[]>();
for(UserAdClicked inserRecord : inserting){
insertParametersList.add(new Object[]{
inserRecord.getTimestamp(),
inserRecord.getIp(),
inserRecord.getUserID(),
inserRecord.getAdID(),
inserRecord.getProvince(),
inserRecord.getCity(),
inserRecord.getClickedCount()
});
}
jdbcWrapper.doBatch("INSERT INTO adclicked VALUES(?,?,?,?,?,?,?)", insertParametersList);
//adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
ArrayList<Object[]> updateParametersList = new ArrayList<Object[]>();
for(UserAdClicked updateRecord : updating){
updateParametersList.add(new Object[]{
updateRecord.getTimestamp(),
updateRecord.getIp(),
updateRecord.getUserID(),
updateRecord.getAdID(),
updateRecord.getProvince(),
updateRecord.getCity(),
updateRecord.getClickedCount()
});
}
jdbcWrapper.doBatch("UPDATE adclicked set clickedCount = clickedCount + 1 WHERE "
+ " timestamp = ? AND userID = ? AND adID = ?", updateParametersList);
}
});
return null;
}
});
JavaPairDStream<String, Long> blackListBasedOnHistory = filteredClickInBatch.filter(new Function<Tuple2<String,Long>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Long> v1) throws Exception {
//廣告點擊的基本數(shù)據(jù)格式:timestamp、ip、userID、adID、province、city
String[] splited = v1._1.split("\t");
String date = splited[0];
String userID = splited[2];
String adID = splited[3];
/**
* 接下來根據(jù)date、userID、adID等條件去查詢用戶點擊廣告的數(shù)據(jù)表,獲得總的點擊次數(shù)
* 這個時候基于點擊次數(shù)判斷是否屬于黑名單點擊 *
*/
int clickedCountTotalToday = 81;
if (clickedCountTotalToday > 50)
{
return true;
} else {
return false;
}
}
});
/**
* 必須對黑名單的整個RDD進行去重操作?。。? */
JavaDStream<String> blackListuserIDtBasedOnHistory = blackListBasedOnHistory.map(new Function<Tuple2<String,Long>, String>() {
@Override
public String call(Tuple2<String, Long> v1) throws Exception {
// TODO Auto-generated method stub
return v1._1.split("\t")[2];
}
});
JavaDStream<String> blackListUniqueuserIDtBasedOnHistory = blackListuserIDtBasedOnHistory.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
@Override
public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
// TODO Auto-generated method stub
return rdd.distinct();
}
});
//下一步寫入黑名單數(shù)據(jù)表中
blackListUniqueuserIDtBasedOnHistory.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
@Override
public void call(Iterator<String> t) throws Exception {
/**
* 在這里我們使用數(shù)據(jù)庫連接池的高效讀寫數(shù)據(jù)庫的方式把數(shù)據(jù)寫入數(shù)據(jù)庫MySQL;
* 由于傳入的參數(shù)是一個Iterator類型的集合,所以為了更加高效的操作我們需要批量處理
* 例如說一次性插入1000條Record,使用insertBatch或者updateBatch類型的操作;
* 插入的用戶信息可以只包含:useID
* 此時直接插入黑名單數(shù)據(jù)表即可。
*/
List<Object[]> blackList = new ArrayList<Object[]>();
while(t.hasNext()){
blackList.add(new Object[]{(Object)t.next()});
}
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
jdbcWrapper.doBatch("INSERT INTO blacklisttable VALUES (?) ", blackList);
}
});
return null;
}
});
/*
* Spark Streaming執(zhí)行引擎也就是Driver開始運行,Driver啟動的時候是位于一條新的線程中的,當然其內(nèi)部有消息循環(huán)體,用于
* 接受應(yīng)用程序本身或者Executor中的消息;
*/
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
class JDBCWrapper {
private static JDBCWrapper jdbcInstance = null;
private static LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<Connection> ();
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static JDBCWrapper getJDBCInstance(){
if (jdbcInstance == null){
synchronized(JDBCWrapper.class){
if (jdbcInstance == null){
jdbcInstance = new JDBCWrapper();
}
}
}
return jdbcInstance;
}
private JDBCWrapper(){
for (int i = 0; i < 10; i++){
try {
Connection conn = DriverManager.getConnection("jdbc:mysql://Master:3306/sparkstreaming","root","root");
dbConnectionPool.put(conn);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public synchronized Connection getConnection(){
while (0 == dbConnectionPool.size()){
try {
Thread.sleep(20);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return dbConnectionPool.poll();
}
public int[] doBatch(String sqlText, List<Object[]> paramsList) {
Connection conn = getConnection();
PreparedStatement preparedStatement = null;
int[] result = null;
try {
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement(sqlText);
for (Object[] parameters : paramsList){
for(int i = 0; i < parameters.length; i++){
preparedStatement.setObject(i+1, parameters[i]);
}
preparedStatement.addBatch();
}
result = preparedStatement.executeBatch();
conn.commit();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (preparedStatement != null){
try {
preparedStatement.close();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (conn != null){
try {
dbConnectionPool.put(conn);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
return result;
}
public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callBack) {
Connection conn = getConnection();
PreparedStatement preparedStatement = null;
ResultSet result = null;
try {
preparedStatement = conn.prepareStatement(sqlText);
for(int i = 0; i < paramsList.length; i++){
preparedStatement.setObject(i+1, paramsList[i]);
}
result = preparedStatement.executeQuery();
callBack.resultCallBack(result);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (preparedStatement != null){
try {
preparedStatement.close();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (conn != null){
try {
dbConnectionPool.put(conn);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
interface ExecuteCallBack {
void resultCallBack(ResultSet result) throws Exception;
}
class UserAdClicked {
private String timestamp;
private String ip;
private String userID;
private String adID;
private String province;
private String city;
private Long clickedCount;
public Long getClickedCount() {
return clickedCount;
}
public void setClickedCount(Long clickedCount) {
this.clickedCount = clickedCount;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getUserID() {
return userID;
}
public void setUserID(String userID) {
this.userID = userID;
}
public String getAdID() {
return adID;
}
public void setAdID(String adID) {
this.adID = adID;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
}
有興趣想學習國內(nèi)整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq 471186150。共享視頻,性價比超高!
|