
Big Data IMF Legendary Action Top-Secret Course, Lesson 75

 看風景D人 2019-02-24

/**
 * Data generator: SparkSQLDataManually.java
 */
package com.tom.spark.SparkApps.sql;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Random;

/**
 * Generates forum log data automatically, one tab-separated record per line:
 * date: date in yyyy-MM-dd format
 * timestamp: epoch timestamp in milliseconds
 * userID: user ID (NULL for unregistered visitors)
 * pageID: page ID
 * channelID: channel (forum board) ID
 * action: "view" (click) or "register"
 */
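/*
 * A sample generated record (fields are tab-separated; the values here are
 * illustrative only, not from the original post):
 * 2019-02-23   1550908800000   1024   2048   Spark   view
 */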
public class SparkSQLDataManually {

    static String yesterday = yesterday();
    static String[] channelNames = new String[] {
            "Spark", "Scala", "Kafka", "Flink", "Hadoop", "Storm",
            "Hive", "Impala", "HBase", "ML"
    };
    static String[] actionNames = new String[] {
        "register","view"
    };

    public static void main(String[] args) {
        /**
         * Generate a data set of the size specified by the command-line arguments.
         */
        long numberItems = 5000;
        String path = ".";
        if (args.length > 0) {
            numberItems = Long.valueOf(args[0]);
        }
        if (args.length > 1) {
            path = args[1];
            System.out.println(path);
        }

        System.out.println("User log number is : " + numberItems);

        // The channel names and yesterday's date are defined as class-level fields above.
        userlogs(numberItems, path);
    }
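
    // Example invocation (hypothetical values, for illustration):
    //   java com.tom.spark.SparkApps.sql.SparkSQLDataManually 100000 /tmp/userlogs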

    private static void userlogs(long numberItems, String path) {
        Random random = new Random();
        StringBuilder userLogBuffer = new StringBuilder();
        // 1 in 8 generated entries simulates an unregistered visitor (userID left null)
        int[] unregisteredUsers = new int[]{1,2,3,4,5,6,7,8};
        for(int i = 0; i < numberItems; i++) {
            long timestamp = new Date().getTime();
            Long userID = 0L;
            long pageID = 0;

            // Randomly generated user ID (null simulates an unregistered visitor)
            if(unregisteredUsers[random.nextInt(8)] == 1) {
                userID = null;
            }
            else {
                userID = (long) random.nextInt((int) numberItems);
            }
            // Randomly generated page ID
            pageID = random.nextInt((int) numberItems);
            // Randomly generated channel
            String channel = channelNames[random.nextInt(10)];

            // Randomly generated action
            String action = actionNames[random.nextInt(2)];

            userLogBuffer.append(yesterday)
                .append("\t")
                .append(timestamp)
                .append("\t")
                .append(userID)
                .append("\t")
                .append(pageID)
                .append("\t")
                .append(channel)
                .append("\t")
                .append(action)
                .append("\n");

        }
//      System.out.print(userLogBuffer);
        PrintWriter pw = null;

        try {
            pw = new PrintWriter(new OutputStreamWriter(new FileOutputStream(path + File.separator + "userlog.log")));
            System.out.println(path + File.separator + "userlog.log");
            pw.write(userLogBuffer.toString());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (pw != null) {
                pw.close();
            }
        }
    }

    private static String yesterday() {
        SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.add(Calendar.DATE, -1);

        Date yesterday = cal.getTime();

        return date.format(yesterday);
    }

}
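
Between generating the log file and running the analysis below, the data has to land in the Hive table described in the DDL comment of SparkSQLUserlogsOps. The original post does not show this step; the following is a minimal sketch of it, in which the database name hive and the local input path /tmp/userlogs/userlog.log are assumptions.

package com.tom.spark.SparkApps.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

/**
 * Hypothetical loader (not from the original post): creates the userlogs
 * table if necessary and loads the generated log file into it.
 */
public class LoadUserlogsIntoHive {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("LoadUserlogsIntoHive").setMaster("spark://Master:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(sc);

        hiveContext.sql("use hive");
        hiveContext.sql("create table if not exists userlogs("
                + "date string, timestamp bigint, userID bigint, pageID bigint, "
                + "channel string, action string) "
                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'");
        // Assumed local path where SparkSQLDataManually wrote its output
        hiveContext.sql("LOAD DATA LOCAL INPATH '/tmp/userlogs/userlog.log' INTO TABLE userlogs");

        sc.close();
    }
}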
/**
 * Computes PV, UV, hot channels, bounce rate, and the new-user registration ratio.
 */
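/*
 * Informal definitions of the metrics as computed below (a reader's summary
 * drawn from the queries, not from the original text):
 *   PV (page views):       count of 'view' events per page
 *   UV (unique visitors):  count of distinct userIDs per page
 *   hot channel:           count of 'view' events per channel
 *   bounce rate:           users with exactly one view, divided by total views
 *   registration ratio:    'register' events divided by views with NULL userID
 */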
package com.tom.spark.SparkApps.sql;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

/**
 * DDL for creating the userlogs table in the Hive database:
 * sqlContext.sql("create table userlogs(date string, timestamp bigint, userID bigint, pageID bigint, channel string, action string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'")
 *
 */
public class SparkSQLUserlogsOps {

    /**
     * @param args
     */
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("SparkSQLUserlogsOps").setMaster("spark://Master:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        HiveContext hiveContext = new HiveContext(sc);

        String yesterday = getYesterday();

        pvStat(hiveContext, yesterday); // page views (PV)

        uvStat(hiveContext, yesterday); // unique visitors (UV)

        hotChannel(hiveContext, yesterday); // hot channels

        jumpOutStat(hiveContext, yesterday); // bounce rate

        newUserRegisterPercentStat(hiveContext, yesterday); // new-user registration ratio
    }

    private static void newUserRegisterPercentStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");

        // Views by unregistered visitors (the generator writes lowercase 'view'/'register',
        // so the action filters must be lowercase as well)
        String newUserSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'view' and date='" + yesterday + "' and userID is NULL "
//              + "limit 10"
                ;

        String registerUserSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'register' and date='" + yesterday + "' "
//              + "limit 10"
                ;

        Object newUser = hiveContext.sql(newUserSQL).collect()[0].get(0);
        Object registerUser = hiveContext.sql(registerUserSQL).collect()[0].get(0);

        double total = Double.valueOf(newUser.toString());
        double register = Double.valueOf(registerUser.toString());

        System.out.println("Simulated new-user registration ratio: " + register / total);
    }

    private static void jumpOutStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");

        String totalPvSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
//              + "limit 10"
                ;

        // Users who viewed exactly one page
        String pv2OneSQL = "select count(*) "
                + "from "
                + "(select count(*) totalNumber from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by userID "
                + "having totalNumber = 1) subquery "
//              + "limit 10"
                ;

        Object totalPv = hiveContext.sql(totalPvSQL).collect()[0].get(0);
        Object pv2One = hiveContext.sql(pv2OneSQL).collect()[0].get(0);

        double total = Double.valueOf(totalPv.toString());
        double pv21 = Double.valueOf(pv2One.toString());

        System.out.println("Bounce rate: " + pv21 / total);
    }



    private static void uvStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");

        String sqlText = "select date, pageID, uv "
                + "from "
                + "(select date, pageID, count(distinct(userID)) uv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, pageID) subquery "
                + "order by uv desc "
//              + "limit 10"
                ;

        hiveContext.sql(sqlText).show();
    }

    private static void hotChannel(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");

        // Hot channels: rank channels by their 'view' counts
        String sqlText = "select date, channel, channelpv "
                + "from "
                + "(select date, channel, count(*) channelpv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, channel) subquery "
                + "order by channelpv desc "
//              + "limit 10"
                ;

        hiveContext.sql(sqlText).show();
    }

    private static void pvStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");

        // Page views: rank pages by their 'view' counts
        String sqlText = "select date, pageID, pv "
                + "from "
                + "(select date, pageID, count(1) pv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, pageID) subquery "
                + "order by pv desc "
//              + "limit 10"
                ;

        hiveContext.sql(sqlText).show();
        // Persist the results to a database or a Hive table (see the sketch after this method)

        // select date, pageID, pv from (select date, pageID, count(1) pv from userlogs where action = 'view' and
        // date='2017-03-10' group by date, pageID) subquery order by pv desc limit 10
    }
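
    /**
     * A minimal sketch of persisting a query result back into Hive, as the
     * comment in pvStat suggests. This helper is not part of the original
     * post; the target table name "userlog_pv_result" is hypothetical.
     */
    private static void persistResult(HiveContext hiveContext, String sqlText) {
        org.apache.spark.sql.DataFrame result = hiveContext.sql(sqlText);
        result.write()
              .mode(org.apache.spark.sql.SaveMode.Overwrite)
              .saveAsTable("userlog_pv_result");
    }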



    private static String getYesterday() {
        SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        // -1 day, matching the date that SparkSQLDataManually stamps on its records
        cal.add(Calendar.DATE, -1);

        Date yesterday = cal.getTime();

        return date.format(yesterday);
    }

}
