大数据日志分析相关文章
相关学习技能
- spark RDD编程(基础)
- spark DataFrame/SQL编程(中级)
- spark streaming编程(将数据实时处理)
RDD基本概念
RDD(Resilient Distributed Datasets,弹性分布式数据集),是spark中的基本抽象
- 弹性是数据集可以放到内存,也可以放到磁盘
- 分布式就是一个数据集可以放到多个机器上
- 数据集可以看成是Scala中的Array,Map等的数据集合
RDD一旦赋值就是不变的,成为了只读数据集;且是可以并行操作的分区集合,即可以同时由多台服务器计算的一个任务
Spark中原生RDD的三种创建方式
- 从scala集合中创建
- 从文件系统创建(如从本机文件或hdfs创建)
- 现有RDD的transform操作创建(从原有RDD生成一个新的RDD)
RDD的特点
- 分区集合:RDD是一个分区的集合,一个RDD有一个或多个分区,
分区的数量决定了并行度 - 计算函数以分区为单位:调用一个计算任务,操作的是RDD的一个分区,计算函数为compute函数
- RDD依赖其他RDD:每个RDD都有依赖关系(源RDD的依赖关系为空),这些依赖关系构成了lineage(血缘),可通过toDebugString方法获取lineage。依赖关系在transform操作时生成
- key-value类型的RDD有一个特殊部分Partitioner:非key-value类型的RDD,Partitioner为None,Key-Value类型的RDD,Partitioner默认为HashPartitioner。在进行shuffle操作时(如reduceByKey,sortByKey),Partitioner决定了父RDD shuffle输出时对应的分区中的数据是如何map的
- 分区支持数据本地性:Spark在任务调度时,会尝试将任务分配在数据所在机器,从而避免机器间的数据传输,RDD获取优先位置的方法为getPreferredLocations
RDD Transform & Action
Transform和Action是RDD的两种操作
Transform将RDD操作后返回一个新的RDD,如:map,filter,flatMap,union,reduceByKey等。Transform操作只记录转换关系,不计算
Action将RDD操作后返回一个数值,一组数值或Unit(空),如reduce,count,saveAsTextFile,foreach等。新的RDD调用了这些Action,Transform调用时的计算才会进行
RDD lazy(惰性)执行
RDD在transform时,不计算,只记录RDD之间的转换关系,进行action操作时,触发真正的计算任务才开始计算
RDD持久化(cache/persist)
因为RDD在Action操作时Transform的调用链才会计算。而多次的Action操作可能调用的Transform调用链是相同的,所以会浪费计算资源。因此可以将Transform的计算结果进行存储,以供下次Action操作。
可以通过cache(缓存)方法或persist(持久化)方法进行缓存
cache只能缓存到内存,persist可以指定缓存级别,可以缓存到内存,硬盘或者两个都有
RDD操作实例
1 | // 计算单词出现的次数 |
可以通过spark-shell进行操作,需要将master, slave1, slave2三个结点都打开
1 | // 命令行输入spark-shell |
注意:这里在
master操作的时候,从机也需要进行启动,且主机需要2G内存,从机每个需要1G内存,否则可能会无法进行上边的RDD操作
DataFrame/SQL概念
Spark DataFrame是一个带有列名称的分布式数据集(也是一个数据集)。与RDD不同的是,DataFrame有列名称,相当于一张关系表。DataFrame可以通过结构化的数据文件,Hive中的表,外部数据库和已存在的RDD得到。
结构化的数据文件就是规定好每列是什么的文件或用分隔符分割好的文件。
Spark SQL是使用SQL或HiveQL编写的SQL语句,用于执行计算任务(比用程序执行任务更方便)
DataFrame & SQL引入动机
- Spark DataFrame & SQL效率比RDD高
- Spark DataFrame & SQL代码更简洁
- Spark DataFrame & SQL有自动优化程序引擎(就算SQL写的效率并不高,其优化引擎也会自动优化)
DataFrame读取数据
1 | // 先用spark-shell进入命令行 |
将RDD与DataFrame互相转换
RDD转换为DataFrame
1 | // 进入到spark-shell进行操作 |
将DataFrame转换为RDD
1 | val path = "/easul.json" |
spark streaming概念
spark streaming可以近实时的从Kafka,Flume,TCP socket接收数据,并进行实时处理。主要是用于实时计算。
可以将流式计算转为一批很小的批处理作业。
下边为一些常用名词:
- 离散流(DStream):Streaming中描述实时数据流的抽象,对DStream的操作最后都转为了对RDD的操作
- 批数据(batch data):将实时流数据以时间片为单位进行分批,转换为时间片为单位的批处理
- 批处理时间间隔(batch interval):用时间片作为拆分流数据的依据,一个时间片的数据对应一个RDD实例
- 窗口长度(window length):一个窗口覆盖的流数据的时间长度,是批处理时间间隔的倍数
- 滑动间隔(slide interval):前一个窗口到后一个窗口的时间长度,是批处理时间间隔的倍数,默认间隔为一个批处理时间间隔
可以这样理解,整个就是一个离散流,然后通过批处理时间间隔将离散流划分为多个批处理,每个批处理底层都是一个RDD。通过一定的窗口长度,获取到一定时间范围内数据,如点击量,然后可以计算这段时间间隔内的点击量有多少。接着通过滑动间隔移动窗口,就可以获取下一个时间范围内的数据,计算下个时间范围内的点击量。
StreamingContext
StreamingContext是spark Streaming整个程序的入口,相当于SparkContext的作用。
1 | import org.apache.spark._ |
因为代码无法直接运行,所以使用spark-shell运行
1 | # -l: 监听模式 |
1 | import org.apache.spark._ |
spark streaming编程流程
- 通过输入源创建一个输入DStream
- 对DStream进行transformation(相当于RDD的transform)和output操作(相当于RDD的action,输出后才会执行)
- 通过StreamingContext.start()启动Streaming,接收并处理数据
- 使用StreamingContext.awaitTermination()等待程序处理结束(只能手动关闭或者内部出错自动停止)
- 也可以调用StreamingContext.stop()结束程序运行
对于StreamingContext需要注意
- StreamingContext启动后,新增加的操作将不起作用,需要在启动前定义好所有的计算逻辑(transformation和output)
- StreamingContext停止后,不能重新启动再计算,要重新计算就要重新运行整个程序
- 单个JVM,一段时间内不能出现两个active的StreamingContext(所以上边在spark-shell不能再创建新的StreamingContext,只能通过SparkContext来创建)
- 调用StreamingContext的stop方法,SparkContext也会被stop。如果只想关掉StreamingContext,可以在stop方法中传入
stopSparkContext=false
spark streaming的WorksCount
1 | import org.apache.spark.SparkConf |
code-server部署scala
- 本文标题:大数据中日志分析——3、spark编程入门
- 创建时间:2022-03-02 13:24:26
- 本文链接:https://blog.212490197.xyz/article/program/bigdata/spark-programming/
- 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!