大数据中日志分析——3、spark编程入门
Easul Lv6

大数据日志分析相关文章

相关学习技能

  • spark RDD编程(基础)
  • spark DataFrame/SQL编程(中级)
  • spark streaming编程(将数据实时处理)

RDD基本概念

RDD(Resilient Distributed Datasets,弹性分布式数据集),是spark中的基本抽象

  • 弹性是数据集可以放到内存,也可以放到磁盘
  • 分布式就是一个数据集可以放到多个机器上
  • 数据集可以看成是Scala中的Array,Map等的数据集合

RDD一旦赋值就是不变的,成为了只读数据集;且是可以并行操作的分区集合,即可以同时由多台服务器计算的一个任务

Spark中原生RDD的三种创建方式

  • 从scala集合中创建
  • 从文件系统创建(如从本机文件或hdfs创建)
  • 现有RDD的transform操作创建(从原有RDD生成一个新的RDD)

RDD的特点

  • 分区集合:RDD是一个分区的集合,一个RDD有一个或多个分区,分区的数量决定了并行度
  • 计算函数以分区为单位:调用一个计算任务,操作的是RDD的一个分区,计算函数为compute函数
  • RDD依赖其他RDD:每个RDD都有依赖关系(源RDD的依赖关系为空),这些依赖关系构成了lineage(血缘),可通过toDebugString方法获取lineage。依赖关系在transform操作时生成
  • key-value类型的RDD有一个特殊部分Partitioner:非key-value类型的RDD,Partitioner为None,Key-Value类型的RDD,Partitioner默认为HashPartitioner。在进行shuffle操作时(如reduceByKey,sortByKey),Partitioner决定了父RDD shuffle输出时对应的分区中的数据是如何map的
  • 分区支持数据本地性:Spark在任务调度时,会尝试将任务分配在数据所在机器,从而避免机器间的数据传输,RDD获取优先位置的方法为getPreferredLocations

RDD Transform & Action

Transform和Action是RDD的两种操作
Transform将RDD操作后返回一个新的RDD,如:mapfilterflatMapunionreduceByKey等。Transform操作只记录转换关系,不计算
Action将RDD操作后返回一个数值,一组数值或Unit(空),如reducecountsaveAsTextFileforeach等。新的RDD调用了这些Action,Transform调用时的计算才会进行

RDD lazy(惰性)执行

RDD在transform时,不计算,只记录RDD之间的转换关系,进行action操作时,触发真正的计算任务才开始计算

RDD持久化(cache/persist)

因为RDD在Action操作时Transform的调用链才会计算。而多次的Action操作可能调用的Transform调用链是相同的,所以会浪费计算资源。因此可以将Transform的计算结果进行存储,以供下次Action操作。
可以通过cache(缓存)方法或persist(持久化)方法进行缓存
cache只能缓存到内存,persist可以指定缓存级别,可以缓存到内存,硬盘或者两个都有

RDD操作实例

SCALA
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 计算单词出现的次数
import org.apache.spark._
import SparkContext._
object WordCount {
if (args.length != 3) {
println("usage is org.test.WordCount <input> <output>")
return
}

// 创建Spark任务,并指定任务名称
val sparkConf = new SparkConf().setAppName("wordCount")
// 创建SparkContext,用于调度任务的处理
val sparkContext = new SparkContext(sparkConf)
// 通过第一个参数提供的路径从文件系统创建一个RDD
val rawRdd = sparkContext.textFile(args(1))
// 读入RDD中每行数据,然后通过空格切分为多个单词,接着读入每个单词,转为tuple
// 这个时候就有了key-value型的RDD了,然后通过reduceByKey统计每个单词出现的次数
// 加的时候前边赋值上一次统计的次数,后边赋值本次统计的次数,算出所有单词出现的次数
val resultRdd = rawRdd.flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _)
// 结果存到第二个参数提供的目录下
resultRdd.saveAsTextFile(args(2))
}

可以通过spark-shell进行操作,需要将master, slave1, slave2三个结点都打开

SCALA
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 命令行输入spark-shell
// 然后启动命令行
// Spark context available as 'sc'
// 通过上边这条日志可以得到已经创建了SparkContext对象sc,用于操作RDD,下边使用即可
// 需要先在hdfs创建一个文件,数据可以如下
// hello hadoop
// hello bigdata
// hello spark

// 这个相当于一个Transform操作,然后生成一个RDD
val rawRdd = sc.textFile("/tmp/easul.txt")
// 查看RDD第一行内容,这个相当于一个Action,会自动执行上边的Transform
rawRdd.first
// 查看RDD所有内容,也相当于一个Action。数据量较大不要用这个,不然内存加载太多数据会让内存爆满
rawRdd.collect
// 查看RDD的血缘关系
rawRdd.toDebugString
// 将RDD拆分成单词
val words = rawRdd.flatMap(line => line.split("\\s+"))
// 将单词map成tuple
val words_tuple = words.map(word => (word, 1))
// 查看生成的元组
words_tuple.collect
// 将单词出现个数进行聚合,有相同键,其次数就+1
// val result = words_tuple.reduceByKey((x, y) => x + y)
// 这个相当于变量只用了一次,所以第一个下划线接受第一个参数,第二个下划线接受第二个参数,然后计算
val result = words_tuple.reduceByKey(_ + _)
// 查看结果
result.collect

注意:这里在master操作的时候,从机也需要进行启动,且主机需要2G内存,从机每个需要1G内存,否则可能会无法进行上边的RDD操作

DataFrame/SQL概念

Spark DataFrame是一个带有列名称的分布式数据集(也是一个数据集)。与RDD不同的是,DataFrame有列名称,相当于一张关系表。DataFrame可以通过结构化的数据文件,Hive中的表,外部数据库和已存在的RDD得到。
结构化的数据文件就是规定好每列是什么的文件或用分隔符分割好的文件。
Spark SQL是使用SQL或HiveQL编写的SQL语句,用于执行计算任务(比用程序执行任务更方便)

DataFrame & SQL引入动机

  • Spark DataFrame & SQL效率比RDD高
  • Spark DataFrame & SQL代码更简洁
  • Spark DataFrame & SQL有自动优化程序引擎(就算SQL写的效率并不高,其优化引擎也会自动优化)

DataFrame读取数据

SCALA
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 先用spark-shell进入命令行

// 这里将文件转为DataFrame
// 定义好路径
// 文件内容可以如下,每一行都是json line
// {"name": "Michael"}
// {"name": "Andy", "age": 30}
// {"name": "Justin", "age": 19}
// {"name": "Justin", "age": 25}
val path = "/easul.json"
// 在操作DataFrame或Spark SQL的时候使用spark命令(新版本用该命令取代了sqlContext)
// parquet用于读取带有列信息的数据
// jdbc用于读取数据库
// table用于读取表
// text用于读取文件
// json用于读取json文件
// 这里读取json文件,最后生成了DataFrame
val easul = spark.read.json(path)
// 查看DataFrame的结构
easul.printSchema
// 查看前10条数据
easul.show(10)

// 得到年龄在13-19人的姓名
// 以下用sql和spark DataFrame的API来进行数据的获取。简单流程两种都可以,如果复杂流程,SQL写起来麻烦,可以选择使用API来操作
// Spark DataFrame的API实现,注意在show的时候才进行计算
val teenOne = easul.where("age >=13 and age <= 19").select("name")// where操作获取几条数据,select操作获取几条数据中的某些数据
teenone.show(3)
// Spark SQL实现
// 先注册一个临时表,再用SQL进行查询,注意在show的时候才进行计算
easul.registerTempTable("easul_tmp")
val teenTwo = spark.sql("SELECT name FROM easul_tmp WHERE age >= 13 AND age <= 19")
teenTwo.show(3)

// 找到某个同名人中最大年龄最大的,先按名字分组,然后使用agg()调用聚合函数
val peopleOfMaxAgeInSameName = easul.groupBy("name").agg(max("age"))
easul.registerTempTable("easul_tmp")
val theOtherWay = spark.sql("SELECT name, max(age) as max_age FROM easul_tmp GROUP BY name")

将RDD与DataFrame互相转换

RDD转换为DataFrame

SCALA
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 进入到spark-shell进行操作

// 需要的文件内容
// Michael, 29
// Andy, 30
// Justin, 19

// 创建一个RDD,需要从HDFS创建一个RDD
var rawRdd = sc.textFile("/easul.txt")

// 第一种RDD转DataFrame的方法
// 导入所有隐式的方法
import spark.implicits._
// 定义一个case类
case class Person(name: String, age: Int)
// 先将每个行进行分割,切分之后相当于二维数组,然后将每个二维数组变成一个对象,最后用toDF将对象数组变成DataFrame
rawRdd.map(line => line.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF

// 第二种RDD转DataFrame的方法
// 需要的列名
val schemaString = "name age"
// 导入所需的包
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// 将列名字符串转为可用的列名
val schema = StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true
)))
// 将RDD转为行数据,先拆分成二维数组,然后将每个二维数据变成一个行数据
val rowRdd = rawRdd.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// 然后通过RDD转换为的行数据和列名组成DataFrame
val dataFrame = spark.createDataFrame(rowRdd, schema)

将DataFrame转换为RDD

SCALA
1
2
3
4
5
6
val path = "/easul.json"
// 创建了一个DataFrame
val dataFrame = spark.read.json(path)
// 将DataFrame转为RDD,RDD的类型是RAW
val rdd = dataFrame.rdd
rdd.collect

spark streaming概念

spark streaming可以近实时的从Kafka,Flume,TCP socket接收数据,并进行实时处理。主要是用于实时计算。
可以将流式计算转为一批很小的批处理作业。
下边为一些常用名词:

  • 离散流(DStream):Streaming中描述实时数据流的抽象,对DStream的操作最后都转为了对RDD的操作
  • 批数据(batch data):将实时流数据以时间片为单位进行分批,转换为时间片为单位的批处理
  • 批处理时间间隔(batch interval):用时间片作为拆分流数据的依据,一个时间片的数据对应一个RDD实例
  • 窗口长度(window length):一个窗口覆盖的流数据的时间长度,是批处理时间间隔的倍数
  • 滑动间隔(slide interval):前一个窗口到后一个窗口的时间长度,是批处理时间间隔的倍数,默认间隔为一个批处理时间间隔

可以这样理解,整个就是一个离散流,然后通过批处理时间间隔将离散流划分为多个批处理,每个批处理底层都是一个RDD。通过一定的窗口长度,获取到一定时间范围内数据,如点击量,然后可以计算这段时间间隔内的点击量有多少。接着通过滑动间隔移动窗口,就可以获取下一个时间范围内的数据,计算下个时间范围内的点击量。

StreamingContext

StreamingContextspark Streaming整个程序的入口,相当于SparkContext的作用。

SCALA
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.apache.spark._
import org.apache.spark.streaming._

object StreamingTest {
def main(args: Array[String]): Unit = {
// 可通过SparkConf先来创建一些配置,设置程序名称和master模式(即运行方式,这里为本地模式)
// 这里操作时,master指定本地线程模式,线程数需要大于1。
// 因为线程数为1,生产者写了数据,这一个streaming线程只会接收数据,需要另一个线程处理数据(处理print)
// 因为无法处理,虽然收到数据,但结果无法打印出来
// 指定为local就会默认启动一个线程,写成local[*]会根据内核自动创建合适的线程个数,写成local[2]也可以
val conf = new SparkConf().setAppName("Streaming_Test").setMaster("local[2]")
// 然后传入conf和批处理时间间隔
val ssc = new StreamingContext(conf, Seconds(5))
// 创建从socket接收数据的DStream,也可以从kafka,flume等来传入数据
// 这里监听的是9999,可以先使用nc来创建一个服务。否则运行可能会报错
val lines = ssc.socketTextStream("localhost", 9999)
// 获取到数据就打印出来
lines.print()

// 启动StreamingContext
ssc.start()
// 出现了其他情况,如手动停止,则停止
ssc.awaitTermination()
}
}

因为代码无法直接运行,所以使用spark-shell运行

BASH
1
2
3
4
5
6
# -l: 监听模式
# -p: 设置端口号
# 使用nc可以创建一个数据生产者
nc -l -p 9999
# 然后进入spark控制台
spark-shell
SCALA
1
2
3
4
5
6
7
8
9
10
11
import org.apache.spark._
import org.apache.spark.streaming._
// 在控制台只能通过SparkContext来创建StreamingContext,不能直接用SparkConf
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()
ssc.start()
ssc.awaitTermination()
// 开启后nc窗口输入数据,spark-shell每个5秒输出一次
// 调试完如果停不下来就可以用CTRL + C退出所有活动任务,多按几次
// 或者CTRL + Z,然后找到4040的程序kill -9

spark streaming编程流程

  • 通过输入源创建一个输入DStream
  • 对DStream进行transformation(相当于RDD的transform)和output操作(相当于RDD的action,输出后才会执行)
  • 通过StreamingContext.start()启动Streaming,接收并处理数据
  • 使用StreamingContext.awaitTermination()等待程序处理结束(只能手动关闭或者内部出错自动停止)
  • 也可以调用StreamingContext.stop()结束程序运行

对于StreamingContext需要注意

  • StreamingContext启动后,新增加的操作将不起作用,需要在启动前定义好所有的计算逻辑(transformation和output)
  • StreamingContext停止后,不能重新启动再计算,要重新计算就要重新运行整个程序
  • 单个JVM,一段时间内不能出现两个active的StreamingContext(所以上边在spark-shell不能再创建新的StreamingContext,只能通过SparkContext来创建)
  • 调用StreamingContext的stop方法,SparkContext也会被stop。如果只想关掉StreamingContext,可以在stop方法中传入stopSparkContext=false

spark streaming的WorksCount

SCALA
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds,StreamingContext}

object StreamingTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Streaming_Test").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
// 将每行数据分割后,将多个数组合并为一个数组
val words - lines.flatMap(line => line.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()
}
}

code-server部署scala

点击跳转

 评论