用户行为分析大数据系统(实时统计每个分类被点击的次数,实时计算商品销售额,统计网站PV、UV)

⽤户⾏为分析数据系统(实时统计每个分类被点击的次数,实时计算商品销售额,统计⽹站PV、UV)
实时统计每天pv,uv的sparkStreaming结合redis结果存⼊mysql供前端展⽰
Flume+Kafka+Storm+Redis构建⼤数据实时处理系统:实时统计⽹站PV、UV+展⽰
flume+kafka+slipstream实现⿊名单⽤户访问实时监测
实战SparkStream+Kafka+Redis实时计算商品销售额
spark streaming从kafka获取数据,计算处理后存储到redis
⼤数据采集、清洗、处理:使⽤MapReduce进⾏离线数据分析完整案例
⼀、⼤数据处理的常⽤⽅法、项⽬的流程:
在互联⽹应⽤中,不管是哪⼀种处理⽅式,其基本的数据来源都是⽇志数据,例如对于web应⽤来说,则可能是⽤户的访问⽇志、⽤户的点击⽇志等。
⼤数据处理⽬前⽐较流⾏的是两种⽅法,⼀种是离线处理,⼀种是在线处理,基本处理架构如下:
当然,如果只是希望得到数据的分析结果,对处理的时间要求不严格,就可以采⽤离线处理的⽅式,⽐如我们可以先将⽇志数据采集到HDFS中,之后再进⼀步使⽤MapReduce、Hive等来对数据进⾏分析,这也是可⾏的。
Flume将⽹站⽇志数据采集到HDFS分布式存储系统中
Spark SQL清洗存储在HDFS的⽹站⽇志数据,清洗完后将其数据继续存储在HDFS中
Hive建⽴数据仓库,建⽴外部表,将清洗完的⽇志数据从HDFS中导⼊到Hive的外部表中,作为基础数据的存储
在Hive中新建新的外部表⽤于存储PV、UV的结果数据
⽤Hive的HQL统计分析⽇志数据,统计出PV、UV并将结果数据存到新的外部表中
将统计完的PV、UV数据使⽤Sqoop从Hive同步到外部的MySQL中供给WEB前端使⽤
如果对于数据的分析结果在时间上有⽐较严格的要求,则可以采⽤在线处理的⽅式来对数据进⾏分析,如使⽤Spark、flink等进⾏处理。⽐较贴切的⼀个例⼦是天猫双⼗⼀的成交额,在其展板上,我们看到交易额是实时动态进⾏更新的,对于这种情况,则需要采⽤在线处理。
如何⼀步步构建我们的实时处理系统(Flume+Kafka+Storm+Redis)
1.Flume将⽹站⽇志数据采集到kafka、
2.sparkstreaming实时处理kafka数据⽹站的⽤户访问⽇志,并统计出该⽹站的PV、UV
3.将实时分析出的PV、UV等指标,实时处理后发送kafka+node.js展⽰
经络油动态地展⽰在我们的前⾯页⾯上
本⽂主要分享对某个电商⽹站产⽣的⽤户访问⽇志(access.log)进⾏离线处理与分析的过程,基于MapReduce的处理⽅式,最后会统计出某⼀天不同省份访问该⽹站的uv与pv。
1 、数据源
在我们的场景中,Web应⽤的部署是如下的架构:
即⽐较典型的Nginx负载均衡+KeepAlive⾼可⽤集架构,在每台Web服务器上,都会产⽣⽤户的访问⽇志,业务需求⽅给出的⽇志格式如
下:
1001    211.167.248.22  eecf0780-2578-4d77-a8d6-e2225e8b9169    40604  1      GET /top HTTP/1.0      408    null      null    1523188122767
1003    222.68.207.11  eecf0780-2578-4d77-a8d6-e2225e8b9169    20202  1      GET /tologin HTTP/1.1  504    null      Mozilla/5.0 (Windows; U; Windows NT 1001    61.53.137.50    c3966af9-8a43-4bda-b58c-c11525ca367b    0      1      GET /update/pass HTTP/1.0      302      null    null    1523188123768
1000    221.195.40.145  1aa3b538-2f55-4cd7-9f46-6364fdd1e487    0      0      GET /user/add HTTP/1.1  200    null      Mozilla/4.0 (compatible; MSIE 7.0; Windo 1000    121.11.87.171  8b0ea90a-77a5-4034-99ed-403c800263dd    20202  1      GET /top HTTP/1.0      408    null      Mozilla/5.0 (Windows; U; Windows NT 5
appid  ip  mid  userid  login_type  request  status  http_referer  user_agent  time
其中:
appid包括: web:1000,android:1001,ios:1002,ipad:1003
mid:        唯⼀的id此id第⼀次会种在浏览器的cookie⾥。如果存在则不再种。作为浏览器唯⼀标⽰。
移动端或者pad直接取机器码。
login_type:登录状态,0未登录、1:登录⽤户
request:类似于此种 "GET /userList HTTP/1.1"
status:请求的状态主要有:200 ok、404 not found、408 Request Timeout、500 Internal Server Error、504 Gateway Timeout等
http_referer:请求该url的上⼀个url地址。
user_agent:浏览器的信息,例如:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36" time:时间的long格式:1451451433818。
如果备份⽇志或者⽇志切割:
vim /opt/cut_nginx.sh
#!/bin/bash
#切割⽇志
datetime=$(date -d "-1 day" "+%Y%m%d")
log_path="/usr/local/nginx/logs"
pid_path="/usr/local/nginx/logs/nginx.pid"
[ -d $log_path/backup ] || mkdir -p $log_path/backup
if [ -f $pid_path ]
then
mv $log_path/access.log $log_path/backup/access.log-$datetime
kill -USR1 $(cat $pid_path)
find $log_path/backup -mtime +30 | xargs rm -f
#mtime :⽂件被修改时间 atime:访问时间(⽂件中的数据库最后被访问的时间) ctime:改变时间(⽂件的元数据发⽣变化。⽐如权限,所有者等)
else
echo "Error,Nginx is not working!" | tee -a /var/log/messages
fi
chmod +x /opt/cut_nginx.sh
crontab -e 设置定时任务
0  0  *  *  *    /opt/cut_nginx.sh
1、模拟⽣成实时数据
public class SimulateData {
public static void main(String[] args) {
BufferedWriter bw = null;
try {
bw = new BufferedWriter(new FileWriter("G:\\Scala\\实时统计每⽇的品类的点击次数\\"));
int i = 0;
while (i < 20000){
long time = System.currentTimeMillis();
int categoryid = new Random().nextInt(23);
bw.write("ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA3                bw.newLine();
i++;
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
bw.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/*
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04
无菌检测系统
D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB  */
模拟数据实时的写⼊data.log:需要⼀直启动着:
#!/bin/bash
cat demo.csv | while read line
do
echo "$line" >> data.log
sleep 1
done
或者⽣成数据直接发送kafka
/**
* 这⾥产⽣数据,就会发送给kafka,kafka那边启动消费者,就会接收到数据,这⼀步是⽤来测试⽣成数据和消费数据没有问题的,确定没问题后要关闭消费者,
* 启动OnlineBBSUserLogss.java的类作为消费者,就会按pv,uv等⽅式处理这些数据。
* 因为⼀个topic只能有⼀个消费者,所以启动程序前必须关闭kafka⽅式启动的消费者(我这⾥没有关闭关闭kafka⽅式启动的消费者也没正常啊)
*/
public class SparkStreamingDataManuallyProducerForKafkas extends Thread{
//具体的论坛频道
static String[] channelNames = new  String[]{
"Spark","Scala","Kafka","Flink","Hadoop","Storm",
"Hive","Impala","HBase","ML"
};
//⽤户的两种⾏为模式
static String[] actionNames = new String[]{"View", "Register"};
private static Producer<String, String> producerForKafka;
private static String dateToday;
private static Random random;
//2、作为线程⽽⾔,要复写run⽅法,先写业务逻辑,再写控制
@Override
@Override
public void run() {
int counter = 0;//搞500条
刮板钢while(true){//模拟实际情况,不断循环,异步过程,不可能是同步过程
counter++;
String userLog = userlogs();
System.out.println("product:"+userLog);
//"test"为topic
producerForKafka.send(new KeyedMessage<String, String>("test", userLog));
if(0 == counter%500){
counter = 0;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
网络球}
}
}
}
private static String userlogs() {
StringBuffer userLogBuffer = new StringBuffer("");
int[] unregisteredUsers = new int[]{1, 2, 3, 4, 5, 6, 7, 8};
long timestamp = new Date().getTime();
Long userID = 0L;
细胞核染
long pageID = 0L;
/
/随机⽣成的⽤户ID
if(Int(8)] == 1) {
userID = null;
} else {
userID = (long) Int((int) 2000);
}
//随机⽣成的页⾯ID
pageID =  Int((int) 2000);
//随机⽣成Channel
渣油储罐清洗处理String channel = Int(10)];
//随机⽣成action⾏为
String action = Int(2)];
userLogBuffer.append(dateToday)
.append("\t")
.append(timestamp)
.append("\t")
.append(userID)
.append("\t")
.append(pageID)
.append("\t")
.append(channel)
.append("\t")
.
append(action);  //这⾥不要加\n换⾏符,因为kafka⾃⼰会换⾏,再append⼀个换⾏符,消费者那边就会处理不出数据  String();
}
public static void main(String[] args) throws Exception {
dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
random = new Random();
Properties props = new Properties();
props.put("zk.connect", "h71:2181,h72:2181,h73:2181");
props.put("metadata.broker.list","h71:9092,h72:9092,h73:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
producerForKafka = new Producer<String, String>(config);
new SparkStreamingDataManuallyProducerForKafkas().start();
}
}

本文发布于:2024-09-21 22:19:08,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/1/202928.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:数据   处理   访问   时间   分析   结果
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议