大数据中日志分析——4、日志分析操作
Easul Lv6

大数据日志分析相关文章

整体目标

  • 通过搭建Discuz获取日志
  • 项目需求分析
  • 项目架构设计
  • 使用Flume实时监控日志
  • 使用Kafka接收Flume日志
  • 使用Spark Streaming统计日志需求

XAMPP安装

折叠代码块BASH 复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 在slave1操作
cd ~/software
# 下载XAMPP,下载速度还行,比较快
wget https://phoenixnap.dl.sourceforge.net/project/xampp/XAMPP%20Linux/5.6.33/xampp-linux-x64-5.6.33-0-installer.run --no-check-certificate
# 授予XAMPP执行权限并运行安装,默认安装在/opt/lampp
# linux, apache,mysql或mariadb(mariadb相当于mysql的开源实现),php,perl
chmod u+x xampp-linux-x64-5.6.33-0-installer.run && sudo ./xampp-linux-x64-5.6.33-0-installer.run
rm -rf xampp-linux-x64-5.6.33-0-installer.run
# 下边的都在root下操作
su root
# 添加环境变量
echo "
XAMPP=/opt/lampp
PATH=\$XAMPP:\$XAMPP/bin:\$PATH
export PATH
" >> ~/.bash_profile
source ~/.bash_profile
# 查看xampp状态
xampp status
# 如果出现该错误:/bin/sh: error while loading shared libraries: libdl.so.2: cannot open shared object file: No such file or directory
# 将LD_ASSUME_KERNEL从2.2.5修改为2.8.0
vi /opt/lampp/lampp

export LD_ASSUME_KERNEL=2.8.0

# 停止xampp所有软件
xampp stop
# 启动xampp所有软件
xampp start
# 访问http://slave1的ip/,自动跳转xammp到dashboard界面
# 修改mariadb密码,默认没有密码
mysql -u root

lidbl.so.2无法找到

折叠代码块SQL 复制代码
1
2
3
4
5
6
7
8
9
10
-- 修改root用户密码
update mysql.user set password=PASSWORD('你的密码') where user = 'root';
-- 刷新权限
flush privileges;
-- 授予root用户远程登陆权限
grant all privileges on *.* to 'root'@'%' identified by '123654789' with grant option;
-- 刷新权限
flush privileges;
-- 这样就可以远程登录这个mysql了
-- mysql -h hostip -u root -p password

Discuz安装

折叠代码块BASH 复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 先退出刚刚的root
cd ~/software
wget http://download.comsenz.com/DiscuzX/3.2/Discuz_X3.2_SC_UTF8.zip
# 删掉xampp原本的网站内容
sudo rm -rf /opt/lampp/htdocs/*
# 安装unzip
sudo yum install -y unzip
# 解压到/opt/lampp/htdocs/
sudo unzip Discuz_X3.2_SC_UTF8.zip -d /opt/lampp/htdocs/
rm -rf Discuz_X3.2_SC_UTF8.zip
# 将upload中的文件移动到外部
sudo mv /opt/lampp/htdocs/upload/* /opt/lampp/htdocs/
sudo rm -rf /opt/lampp/htdocs/upload
# 赋予相关文件夹权限
sudo chmod 777 -R /opt/lampp/htdocs/config/
sudo chmod 777 -R /opt/lampp/htdocs/data/
sudo chmod 777 -R /opt/lampp/htdocs/uc_client/
sudo chmod 777 -R /opt/lampp/htdocs/uc_server/
# 然后访问http://slave1的ip/进行安装,如果有不满足的库就安装一下
# 后边就改一下数据库密码和管理员密码即可访问

Discuz相关操作

  • 添加板块

    折叠代码块BASH 复制代码
    1
    2
    3
    4
    5
    6
    7
    # 进入管理员后台
    # http://slave1ip/admin.php
    # 进入论坛,可以添加板块和子版块
    # 添加两个板块,大数据和人工智能
    # 大数据下添加Hadoop和Spark子版块
    # 人工智能添加深度学习和机器学习子版块
    # 然后可以随便发一个帖子
  • 查看访问日志

    折叠代码块BASH 复制代码
    1
    2
    3
    # 日志位置在/opt/lampp/logs/access_log
    # 查看实时访问日志
    tail -f /opt/lampp/logs/access_log
  • Discuz一些表的查看

    折叠代码块SQL 复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    -- 登陆数据库:mysql -u root -p password 
    -- 查看数据库都有哪些
    show databases;
    -- 选择数据库
    use ultrax;
    -- 查看都有什么表
    show tables;
    -- 设置字符集
    set names UTF8;
    -- 查看帖子id与标题对应关系表
    select tid, subject from pre_forum_post limit 10;
    -- 查看板块id与标题对应关系表
    select fid, name from pre_forum_forum limit 40;

项目需求分析

也就是分析项目要做什么东西,要实现什么

  • 统计指定时间段的热门文章
  • 统计指定时间段最受欢迎用户,也就是哪个用户在某时间段访问的最多,可用IP标识用户
  • 统计指定时间段不同模块访问量

整体架构设计

知道了要做什么之后,就需要思考如何实现,需要有整体架构
这里会介绍架构原理和每个组件的作用

Created with Raphaël 2.2.0Log文件middle内部流程为Flume➝Kafka➝Spark StreamingMysql
  • Flume:用于高效的收集,聚合和移动大量日志数据。是分布式的,可靠的,可用的服务。
  • Kafka:用于构建实时的数据管道和流式应用程序。是高吞吐量的分布式发布订阅的消息系统。
  • Spark Streaming:用于实时流式数据处理。是Spark核心API的一个扩展
  • MySQL:关系型数据库

Flume与Kafka的概念

Created with Raphaël 2.2.0Web ServerAgent内部流程为Source➝Channel➝SinkHDFS

Flume中三个组件的介绍
Kafka常用概念

Flume与Kafka实现实时日志传输

折叠代码块BASH 复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# 在每一个结点启动zookeeper
zkServer.sh start
# jps有了QuorumPeerMain即启动成功
jps
# 在每个结点以守护进程启动kafka
kafka-server-start.sh -daemon /home/easul/software/kafka_2.11-0.10.1.0/config/server.properties
# jps有了Kafka即启动成功
jps
# 在master操作topic
# 查看当前的topic
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --list
# 创建topic
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 3 --topic test
# 创建一个控制台的消费者
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic test --from-beginning
# 将master的flume复制到slave1,因为slave1安装了Discuz
scp -r ~/software/apache* easul@slave1:~/software
# 上边操作SBT时,环境变量里有了FLUME_HOME,所以无需再添加
# 下边在slave1操作
# 创建新的agent的配置文件
vi ~/software/apache-flume-1.7.0-bin/conf/log_collection.properties

# 将agent的名称设置为a1,写什么就是什么
# 然后定义组件名
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
# =============================================================================
# 设置source
# 设置组件类型,用于监听一个目录下所有的文件
# a1.sources.source1.type = spooldir
a1.sources.source1.type = TAILDIR
# 给文件定义一个组
a1.sources.source1.filegroups = f1
# 监控的日志路径
a1.sources.source1.filegroups.f1 = /opt/lampp/logs/access_log
# 关闭文件头
a1.sources.source1.fileHeader = flase
# 设置一批处理多少事件(即数据),一个事件由数据的主题和数据的报头组成
a1.sources.source1.batchSize=4000
# 设置拦截器
a1.sources.source1.interceptors = i1
a1.sources.source1.interceptors.i1.type = timestamp
# =============================================================================
# 设置sink
# 日志写到哪里
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
# 设置kafka的broker服务器,flume1.6用这个,1.7用brokerList
# a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092
# 设置kafka的topic,flume的日志会输出到这里
a1.sinks.sink1.topic= test
# 设置flume处理的一批事件数量
a1.sinks.sink1.kafka.flumeBatchSize = 20
# 设置生产者的一些参数
a1.sinks.sink1.kafka.producer.acks = 1
a1.sinks.sink1.kafka.producer.linger.ms = 1
a1.sinks.sink1.kafka.producer.compression.type = snappy
# =============================================================================
# 设置channel
# 设置是基于内存还是基于文件缓冲,基于文件安全性更高,内存断电就没有了。并且数据量过多,可以先缓存到文件
a1.channels.channel1.type = file
# 设置缓冲文件的检查点
a1.channels.channel1.checkpointDir=/home/easul/software/hadoop-data/flume_data/checkpoint
# 设置缓冲文件的数据目录
a1.channels.channel1.dataDirs= /home/easul/software/hadoop-data/flume_data/data
# =============================================================================
# 组件间的绑定
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

# 创建所需路径
mkdir -p ~/software/hadoop-data/flume_data/checkpoint
mkdir -p ~/software/hadoop-data/flume_data/data
mkdir -p ~/software/hadoop-data/flume_data/running_logs
# 启动flume
# 测试启动没有问题,可以用nohup以后台程序运行
flume-ng agent --conf-file /home/easul/software/apache-flume-1.7.0-bin/conf/log_collection.properties --name a1 -Dflume.root.logger=INFO, console
# 启动Discuz
su root
source ~/.bash_profile
xampp start

修改Apache访问日志格式

点击跳转

使用Spark Streaming进行统计

  • 流程

    • 分析日志格式,提取所需字段
    • 分析不同统计需求的关键特征,关于一段时间内用户,板块,文章的统计
    • 使用spark streaming窗口函数来统计
    • 将统计结果存到数据库
    • 结果可视化
  • 项目目录结构

    折叠代码块BASH 复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    tree -I ".bloop|.bsp|.metals|.vscode|target|project"

    ├── build.sbt
    ├── project
    │ └── build.properties
    └── src
    ├── main
    │ ├── resources
    │ │ ├── access_log.txt 访问日志
    │ │ └── log_scala.conf 项目配置文件
    │ └── scala
    │ ├── biz 业务层
    │ │ ├── AccessLogParser.scala 日志解析类
    │ │ └── LogAnalysis.scala 日志分析类
    │ ├── Run.scala 调度类
    │ └── util 工具类
    │ └── Utility.scala 配置文件解析类
    └── test
    └── scala
    └── Test.scala 调试类

相关配置

  • build.sbt添加如下配置

    折叠代码块 复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 解决冲突,不需要的类丢弃即可
    // https://stackoverflow.com/questions/14791955/assembly-merge-strategy-issues-using-sbt-assembly
    assemblyMergeStrategy in assembly := {
    case x if x.endsWith("UnusedStubClass.class") => MergeStrategy.discard
    case x if x.contains("org/apache/commons/collections") => MergeStrategy.discard
    case x if x.contains("javax/inject") => MergeStrategy.discard
    case x if x.contains("org/aopalliance") => MergeStrategy.discard
    case x if x.endsWith("package-info.class") => MergeStrategy.discard
    case x if x.endsWith("git.properties") => MergeStrategy.discard
    case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
    }
  • project/assembly.sbt加入可添加fatjar的配置

    折叠代码块 复制代码
    1
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")
  • ~/.sbt/1.0/plugins/plugins.sbt添加可以查看依赖树的插件

    折叠代码块 复制代码
    1
    addDependencyTreePlugin

编译运行加部署

折叠代码块BASH 复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 进入到项目根目录进行编译
sbt assembly
# 在master运行编译后的jar包
# 运行主类是Run,两线程,最后指定外部的配置文件
# 运行之前,每个结点的zookeeper要启动
# slave1的Discuz站点要启动,然后添加一个statistics数据库
# create database statistics;
# 最后kafka和slave1的flume要启动。这时就可以启动项目了
spark-submit --class Run --master local[2] scala-project-assembly-1.0.jar /log_scala.conf
# standalone模式(脱机模式),使用这个需要启动hadoop和spark
spark-submit --master spark://master:7077 scala-project-assembly-1.0.jar /log_scala.conf

# 添加一个数据库的报表监控界面
# 在slave2操作
# 安装pip
sudo yum -y install epel-release
sudo yum -y install python-pip
# 上传解压后台监控源码后安装依赖
# 如果安装失败可以点击下边的 pip安装依赖失败 进行尝试
sudo pip install -r requirements.txt
# 进行后台页面运行
cd web_monitor
# 显示输出的日志文件
python server.py --config=./config_online --port=9999 --log-to-stderr=True
# 查看监控页面
http://slave2:9999/web/log/main

pip安装依赖失败

 评论
Powered By Valine
v1.5.2