Common Scala Operations
Easul

Building a hello-world project by hand

BASH
# Directory structure
├── project
│   └── build.properties
├── src
│   └── main
│       └── scala
│           └── Main.scala
├── build.sbt
  • build.properties
    PROPERTIES
    sbt.version=1.6.2
  • Main.scala
    SCALA
    // This is the default entry point for a quick test
    object Main extends App {
      println("Hello, World!")
    }
    SCALA
    // With this test code, compilation can take a very long time if the JVM heap is small;
    // you may then see: Consider increasing the JVM heap using `-Xmx` or try a different collector
    import org.apache.spark.sql.SparkSession

    object Main {
      def main(args: Array[String]): Unit = {
        println("hello scala!")
        val ss = SparkSession.builder
          .appName("example")
          .master("local")
          .getOrCreate()
        import ss.implicits._
        ss.createDataset(1 to 10).show()
        ss.close()
      }
    }
  • build.sbt
    SCALA
    // The simplest possible sbt build file is just one line:

    scalaVersion := "2.13.3"
    // That is, to create a valid sbt build, all you've got to do is define the
    // version of Scala you'd like your project to use.

    // ============================================================================

    // Lines like the above defining `scalaVersion` are called "settings". Settings
    // are key/value pairs. In the case of `scalaVersion`, the key is "scalaVersion"
    // and the value is "2.13.3"

    // It's possible to define many kinds of settings, such as:

    name := "hello-world"
    organization := "ch.epfl.scala"
    version := "1.0"

    // Note, it's not required for you to define these three settings. These are
    // mostly only necessary if you intend to publish your library's binaries on a
    // place like Sonatype.


    // Want to use a published library in your project?
    // You can define other libraries as dependencies in your build like this:

    libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.2"

    // Here, `libraryDependencies` is a set of dependencies, and by using `+=`,
    // we're adding the scala-parser-combinators dependency to the set of dependencies
    // that sbt will go and fetch when it starts up.
    // Now, in any Scala file, you can import classes, objects, etc., from
    // scala-parser-combinators with a regular import.

    // TIP: To find the "dependency" that you need to add to the
    // `libraryDependencies` set, which in the above example looks like this:

    // "org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.2"

    // You can use Scaladex, an index of all known published Scala libraries. There,
    // after you find the library you want, you can just copy/paste the dependency
    // information that you need into your build file. For example, on the
    // scala/scala-parser-combinators Scaladex page,
    // https://index.scala-lang.org/scala/scala-parser-combinators, you can copy/paste
    // the sbt dependency from the sbt box on the right-hand side of the screen.

    // IMPORTANT NOTE: while build files look _kind of_ like regular Scala, it's
    // important to note that syntax in *.sbt files doesn't always behave like
    // regular Scala. For example, notice in this build file that it's not required
    // to put our settings into an enclosing object or class. Always remember that
    // sbt is a bit different, semantically, than vanilla Scala.

    // ============================================================================

    // Most moderately interesting Scala projects don't make use of the very simple
    // build file style (called "bare style") used in this build.sbt file. Most
    // intermediate Scala projects make use of so-called "multi-project" builds. A
    // multi-project build makes it possible to have different folders which sbt can
    // be configured differently for. That is, you may wish to have different
    // dependencies or different testing frameworks defined for different parts of
    // your codebase. Multi-project builds make this possible.

    // Here's a quick glimpse of what a multi-project build looks like for this
    // build, with only one "subproject" defined, called `root`:

    // lazy val root = (project in file("."))
    //   .settings(
    //     inThisBuild(List(
    //       organization := "ch.epfl.scala",
    //       scalaVersion := "2.13.3"
    //     )),
    //     name := "hello-world"
    //   )

    // To learn more about multi-project builds, head over to the official sbt
    // documentation at http://www.scala-sbt.org/documentation.html

Common Gotchas

  • String values can be compared directly with ==

    SCALA
    val result = if ("test" == "test") true else false
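    Unlike Java, `==` in Scala delegates to `equals`, so it compares string content; reference equality is `eq`. A minimal sketch (not from the original text):

    ```scala
    // Two distinct String objects with the same content
    val a = new String("test")
    val b = new String("test")

    println(a == b)   // true: == compares content via equals
    println(a eq b)   // false: eq compares object references
    ```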
  • Class parameters must be declared var to be mutable

    SCALA
    // name can be reassigned later because it is declared with var
    // age and height cannot be modified; both behave like val
    class Person(var name: String, val age: Int, height: Int) {

    }
  • class vs. object
    class defines an ordinary class; object defines a singleton, which you can think of as a static class.
    When both share a source file, the class is the companion class and the object is the companion object.
    The companion class can access all members of the companion object, including private ones; outside code cannot access the companion object's private members.
    Because an object is a singleton, it can hold state shared by every instance.
    An object can define an apply method (written by hand) so callers can create instances without new.

    SCALA
    class Person(var name: String, val age: Int, height: Int) {
      private var personId = Person.personId
      println(s"object Person's personId: $personId")

      def getPersonId() = {
        Person.getPersonId() + 1
      }
    }

    object Person {
      private var personId = 0

      // apply lets callers write Person("Tom", 20) instead of new Person(...)
      def apply(name: String, age: Int, height: Int = 170) = new Person(name, age, height)
      def getPersonId() = {
        personId
      }
    }
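    A self-contained sketch of the singleton-state plus apply pattern (the Counter name is made up for illustration):

    ```scala
    // Private constructor: instances can only be made through the companion
    class Counter private (val id: Int)

    object Counter {
      // Shared across all instances, since the object is a singleton
      private var created = 0

      // apply lets callers write Counter() with no `new`
      def apply(): Counter = {
        created += 1
        new Counter(created)
      }
    }

    val c1 = Counter()
    val c2 = Counter()
    println(s"ids: ${c1.id}, ${c2.id}")  // ids: 1, 2
    ```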
  • Pattern matching
    Pattern matching inspects a value's type (or shape) and branches on the result.
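    A minimal sketch of matching on a value's runtime type (the describe helper is hypothetical, not from the original):

    ```scala
    // Branch on the runtime type of the argument
    def describe(x: Any): String = x match {
      case i: Int    => s"Int: $i"
      case s: String => s"String: $s"
      case _: Double => "a Double"
      case _         => "something else"
    }

    println(describe(42))     // Int: 42
    println(describe("asd"))  // String: asd
    ```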

  • Types beyond the basic ones

    YAML
    Any: the top type; a value of any type can be passed as Any
    Unit: the return type of a method that returns nothing useful
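    A small sketch showing both types together (the logAll name is made up for illustration):

    ```scala
    // Unit marks a side-effecting method; Any accepts values of any type
    def logAll(items: List[Any]): Unit =
      items.foreach(item => println(s"value: $item"))

    // A heterogeneous list is possible because Any is the top type
    val mixed: List[Any] = List(1, "two", 3.0)
    logAll(mixed)
    ```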
  • Variadic parameters

    SCALA
    def test(params: Int*) = {
      params.foreach(println(_))
    }
    test(1 to 5: _*)

    `1 to 5: _*` tells the compiler to expand the sequence into individual arguments.

Reference

  • Path handling
    SCALA
    // In Spark code, a bare path like /temp/data/data.txt reads from HDFS
    val hdfsFileRdd = sparkContext.textFile("/temp/data/data.txt")
    // Local files are accessed with the file: prefix
    val localFileRdd = sparkContext.textFile("file:/temp/data/local.txt")

Working with regular expressions

SCALA
import scala.util.matching.Regex

// Create a regex with .r on a string
val regex_1 = "(\\d+)\\s+(\\w+)".r
// Or construct one explicitly
val regex_2 = new Regex("(\\d+)\\s+(\\w+)")

// Extract the captured groups into variables via a pattern-match assignment
val regex_1(number, word) = "123 asd"
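A safer alternative when the input may not match (an extractor assignment throws a MatchError on failure) is `findFirstMatchIn`; a sketch with a hypothetical `extract` helper:

```scala
import scala.util.matching.Regex

val pattern: Regex = "(\\d+)\\s+(\\w+)".r

// findFirstMatchIn returns Option[Match] instead of throwing
def extract(s: String): Option[(String, String)] =
  pattern.findFirstMatchIn(s).map(m => (m.group(1), m.group(2)))

println(extract("123 asd"))  // Some((123,asd))
println(extract("no-nums"))  // None
```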

Working with datasets

Creating an RDD

SCALA
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

class Test {
  def createRdd() = {
    // Input data:
    // hello hadoop
    // hello bigdata
    // hello spark

    // Configure the Spark job
    val sparkConf = new SparkConf()
      .setAppName("wordCount")
      .setMaster("local[2]")
    // Create the job's entry-point object
    val sparkContext = new SparkContext(sparkConf)
    // Create an RDD from a local file; for HDFS the path would just be /data/data.txt
    val rawRdd = sparkContext.textFile("file:/home/easul/work/scala-project/src/test/scala/origin.txt")
    // Transform the RDD
    val rawTransform = rawRdd.flatMap(_.split("\\s+")).map(_.trim() -> 1).reduceByKey(_ + _)
    // Run an action on the RDD
    val rawAction = rawTransform.collect()
    // Convert the Array to a List for printing
    println(rawAction.toList)
  }
}

Creating a DataFrame

SCALA
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

class Test {
  def createDataFrame() = {
    // Input data:
    // {"name": "Michael"}
    // {"name": "Andy", "age": 30}
    // {"name": "Justin", "age": 19}
    // {"name": "Justin", "age": 25}

    val sparkConf = new SparkConf()
      .setAppName("wordCount")
      .setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    // Build an SQLContext from the SparkContext
    val sqlContext = new SQLContext(sparkContext)
    // An SQLContext produces a DataFrame by reading structured data:
    // parquet reads columnar data that carries schema information
    // jdbc reads from a database
    // table reads a table
    // text reads a plain text file
    // json reads a JSON file
    // Here we read a JSON file and get a DataFrame
    val dataFrame = sqlContext.read.json("file:/home/easul/work/scala-project/src/test/scala/origin.txt")
    // Inspect the schema (printSchema prints directly, so no surrounding println is needed)
    dataFrame.printSchema()
    // Show the first 10 rows
    dataFrame.show(10)

    // Find people aged 13-19
    // Via the DataFrame API
    val teenOne = dataFrame.where("age >= 13 and age <= 19").select("name", "age")
    // Show the first three rows
    teenOne.show(3)
    // =================================
    // Via SQL
    // Register the DataFrame as a temporary table
    dataFrame.registerTempTable("people_name")
    // Run SQL through the SQLContext
    val teenTwo = sqlContext.sql("select name from people_name where age >= 13 and age <= 19")
    teenTwo.show(3)

    // Find the oldest person for each name
    val peopleOfMaxAgeInSameName = dataFrame.groupBy("name").agg("age" -> "max")
    peopleOfMaxAgeInSameName.show(10)
    // The same query in SQL (the table is already registered above)
    val anotherWay = sqlContext.sql("select name, max(age) from people_name group by name")
    anotherWay.show(10)
  }
}

Converting an RDD to a DataFrame

SCALA
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row

// The case class must be defined outside the enclosing class
case class Persons(name: String, age: Int)

class Test {
  // Input data:
  // Michael, 29
  // Andy, 30
  // Justin, 19

  def rdd2DataFrame() = {
    val sparkSession = SparkSession
      .builder()
      .appName("RDDToDF")
      .master("local[2]")
      .getOrCreate()
    // The implicit conversions (e.g. toDF) come from the SparkSession
    import sparkSession.implicits._
    val sparkContext = sparkSession.sparkContext
    val rawRdd = sparkContext.textFile("file:/home/easul/work/scala-project/src/test/scala/origin.txt")

    // Option 1: assemble column names and row data separately
    // Column names as a string
    val schemaStr = "name age"
    // Build the schema from the column names
    val schema = StructType(
      schemaStr.split(" ").map(fieldName => StructField(fieldName, StringType, true))
    )
    // Turn the RDD into rows of column data
    val rowRdd = rawRdd.map(_.split(",")).map(person => Row(person(0), person(1).trim()))
    // Create the DataFrame through the SparkSession
    val dataFrame = sparkSession.createDataFrame(rowRdd, schema)
    dataFrame.show(10)

    // Option 2: map to case-class instances, then call toDF
    val dataFrame1 = rawRdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).toDF()
    dataFrame1.show(10)
  }
}

If you hit the error `value toDF is not a member of org.apache.spark.rdd.RDD`, make sure `import sparkSession.implicits._` is in scope and the case class is defined outside the method.

Converting a DataFrame to an RDD

SCALA
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

class Test {
  def dataFrame2Rdd() = {
    // Input data:
    // {"name": "Michael"}
    // {"name": "Andy", "age": 30}
    // {"name": "Justin", "age": 19}
    // {"name": "Justin", "age": 25}

    val sparkConf = new SparkConf()
      .setAppName("wordCount")
      .setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sparkContext)
    val dataFrame = sqlContext.read.json("file:/home/easul/work/scala-project/src/test/scala/origin.txt")
    // .rdd gives back an RDD[Row]
    val rowRdd = dataFrame.rdd
    println(rowRdd.first())
  }
}

Creating a Spark Streaming context

SCALA
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

class Test {
  def sparkStreamingOperation() = {
    // Creating the streaming context
    // 1. Option 1: from a SparkConf
    // master is local mode with the thread count set to 2 (the default is 1)
    // With a single thread, the one receiver thread consumes all incoming data
    // and no thread is left to run the print operation;
    // with two threads, one receives the data and the other processes it
    val conf = new SparkConf()
      .setAppName("SparkStreaming")
      .setMaster("local[2]")
    // Seconds(...) is the batch interval;
    // processing applies per batch, so one batch's input does not affect the next
    val sparkStreamingContext = new StreamingContext(conf, Seconds(5))
    // 2. Option 2: from an existing SparkContext
    // (only one active SparkContext is allowed per JVM, so use one option or the other)
    // val sc = new SparkContext(conf)
    // val ssc = new StreamingContext(sc, Seconds(5))

    // Input sources
    // 1. Use a socket as the input source
    // Start a socket server with: nc -l -p 9999 (-l listens, -p sets the port)
    val lines = sparkStreamingContext.socketTextStream("localhost", 9999)

    // Transformations
    val words = lines.flatMap(_.split(" "))
    val wordCount = words.map(word => (word.trim(), 1)).reduceByKey(_ + _)
    // Output operation
    wordCount.print()

    // Operations added after start() will not run
    sparkStreamingContext.start()
    // Block until the job is stopped from the console
    sparkStreamingContext.awaitTermination()
  }
}
