Spark 实战

第一部分 使用 Scala 语言开发 Spark 应用程序

一、关于 Spark

  • Spark 由加州大学伯克利分校 AMP 实验室开发,可用来构建大型低延迟的大数据处理的应用程序,并且提供了用于机器学习(MLlib)、流计算(Streaming)、图计算(GraphX)等子模块。
  • Spark 对数据的存储、转换、计算均基于 RDD(分布式内存,Resilient Distributed Dataset),通过对 RDD 的转化(Transformation)和动作(Action)算子进行。
    • 转化算子可以把一个 RDD 转成另一个 RDD,如 filter 算子可以通过添加过滤条件生成一个只包含符合条件的数据的新的 RDD。
    • 动作算子负责完成最终的计算,如 count 算子可以计算出整个 RDD 表示的数据集中元素的个数。

二、关于 Scala

详见 Scala 课堂 篇。

三、案例分析与编程实现

1、案例一:词频统计

(1)思想

步骤 目标 做法
Step1 将文本文件中的每一行转化成一个个的单词 使用 flatMap 算子把一行文本 split 成多个单词
Step2 对每一个出现的单词进行记一次数 使用 map 算子把单个的单词转化成一个有计数的 Key-Value 对,即 word -> (word,1)
Step3 把所有相同单词的计数相加得到最终的结果 使用 reduceByKey 算子把相同单词的计数相加得到最终结果

(2)编程实现

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
 
object SparkWordCount {
  def FILE_NAME: String = "word_count_results_";
  
  def main(args:Array[String]) {
    if (args.length < 1) {
      println("Usage:SparkWordCount FileName");
      System.exit(1);
    }
    val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program");
    val sc = new SparkContext(conf);
    val textFile = sc.textFile(args(0));
    // 核心步骤:Step1 + Step2 + Step3
    val wordCounts = textFile.flatMap(line => line.split(" ")).map(
      word => (word, 1)).reduceByKey((a, b) => a + b) 
    
    //print the results,for debug use.
    //println("Word Count program running results:");
    //wordCounts.collect().foreach(e => {
    //val (k,v) = e
    //println(k+"="+v)
    //});
    wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());
    println("Word Count program running results are successfully saved.");
  }
}

2、案例二:计算平均年龄

目标统计一个群体所有人的平均年龄。这些年龄信息都存储在一个文件里,并且该文件的格式如下,第一列是 ID,第二列是年龄。

(1)思想

步骤 目标 做法
Step1 将源文件对应的 RDD转化成一个只包含年龄信息的 RDD 使用 map 算子把源文件对应的 RDD 映射成一个新的只包含年龄数据的 RDD,在 map 算子的传入函数中使用 split 方法,得到数组后只取第二个元素即为年龄信息
Step2 计算元素个数,即总人数 对于第一步映射的结果 RDD 使用 count 算子
Step3 把所有年龄数加起来 使用 reduce 算子对只包含年龄信息的 RDD 的所有元素用加法求和
Step4 平均年龄=总年龄/人数 使用除法计算平均年龄

(2)编程实现

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object AvgAgeCalculator {
  def main(args:Array[String]) {
    if (args.length < 1){
      println("Usage:AvgAgeCalculator datafile")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
    val sc = new SparkContext(conf)
    val dataFile = sc.textFile(args(0), 5);
    val count = dataFile.count()        // Step2
    val ageData = dataFile.map(line => line.split(" ")(1))    // Step1
    val totalAge = ageData.map(age => Integer.parseInt(
                                String.valueOf(age))).collect().reduce((a,b) => a+b)   // Step3
    println("Total Age:" + totalAge + ";Number of People:" + count )
    val avgAge : Double = totalAge.toDouble / count.toDouble      // Step4
    println("Average Age is " + avgAge)
 }
}

3、案例三:人口统计

假设我们需要对某个省的人口 (1 亿) 性别还有身高进行统计,需要计算出男女人数,男性中的最高和最低身高,以及女性中的最高和最低身高。本案例中用到的源文件有以下格式, 三列分别是 ID,性别,身高 (cm)。

(1)思想

步骤 目标 做法
Step1 从源文件的对应的 RDD 中分离两个分别包含男女信息的 RDD 使用 filter 算子,过滤条件就是包含”M” 的行是男性,包含”F”的行是女性
Step2 分别对男女信息对应的 RDD 的数据转化成只包含身高数据的两个 RDD 使用 map 算子把男女各自的身高数据从 RDD 中分离出来
Step3 对两个 RDD 进行排序,得到最高和最低的男性或女性身高 使用 sortBy 算子对男女身高数据进行排序

(2)编程实现

在实现上,有一个需要注意的点是在 RDD 转化的过程中需要把身高数据转换成整数,否则 sortBy 算子会把它视为字符串,那么排序结果就会受到影响,例如 身高数据如果是:123,110,84,72,100,那么升序排序结果将会是 100,110,123,72,84,显然这是不对的。

object PeopleInfoCalculator {
  def main(args:Array[String]) {
    if (args.length < 1){
      println("Usage:PeopleInfoCalculator datafile")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator")
    val sc = new SparkContext(conf)
    val dataFile = sc.textFile(args(0), 5);
    val maleData = dataFile.filter(line => line.contains("M")).map(
          line => (line.split(" ")(1) + " " + line.split(" ")(2)))
    val femaleData = dataFile.filter(line => line.contains("F")).map(
          line => (line.split(" ")(1) + " " + line.split(" ")(2)))            // Step1
    //for debug use
    //maleData.collect().foreach { x => println(x)}
    //femaleData.collect().foreach { x => println(x)}
    val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)
    val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)   // Step2
    //for debug use
    //maleHeightData.collect().foreach { x => println(x)}
    //femaleHeightData.collect().foreach { x => println(x)}
    val lowestMale = maleHeightData.sortBy(x => x,true).first()
    val lowestFemale = femaleHeightData.sortBy(x => x,true).first()
    //for debug use
    //maleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
    //femaleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
    val highestMale = maleHeightData.sortBy(x => x, false).first()
    val highestFemale = femaleHeightData.sortBy(x => x, false).first()      // Step3
    println("Number of Male Peole:" + maleData.count())
    println("Number of Female Peole:" + femaleData.count())
    println("Lowest Male:" + lowestMale)
    println("Lowest Female:" + lowestFemale)
    println("Highest Male:" + highestMale)
    println("Highest Female:" + highestFemale)
  }
}

4、案例四:关键词频率

(1)思想

步骤 目标 做法
Step1 对每一个出现的单词进行记一次数,过程中需要识别不同大小写的相同单词或者词组 使用 map 算子对源数据对应的 RDD 数据进行全小写转化并且给词组记一次数
Step2 对每个关键词出现的次数进行计算 调用 reduceByKey 算子计算相同词组的出现次数
Step3 对关键词或者词组按照出现的次数进行降序排序,排序前需要把 RDD 数据元素从 (k,v) 转化成 (v,k) 对 RDD 的数据元素用 sortByKey 算子进行降序排序
Step4 取排在最前面的 K 个单词或者词组 对排好序的 RDD 数据使用 take 算子获取前 K 个数据元素

(2)编程实现

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
 
object TopKSearchKeyWords {
  def main(args:Array[String]){
    if (args.length < 2) {
      println("Usage:TopKSearchKeyWords KeyWordsFile K");
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words")
    val sc = new SparkContext(conf)
    val srcData = sc.textFile(args(0))
    val countedData = srcData.map(line => (line.toLowerCase(),1)).reduceByKey((a,b) => a+b)     // Step1 Step2
    //for debug use
    //countedData.foreach(x => println(x))
    val sortedData = countedData.map{ case (k,v) => (v,k) }.sortByKey(false)      // Step3
    val topKData = sortedData.take(args(1).toInt).map{ case (v,k) => (k,v) }      // Step4
    topKData.foreach(println)
  }
}

第二部分 使用 Spark SQL 对结构化数据进行统计分析

一、关于 Spark SQL / DataFrame

  • Spark SQL 是 Spark 生态系统里用于处理结构化大数据的模块,该模块里最重要的概念就是 DataFrame。
  • Spark DataFrame 以 RDD 为基础,但是带有 Schema 信息,它类似于传统数据库中的二维表格。
  • Spark SQL 模块目前支持将多种外部数据源的数据转化为 DataFrame,并像操作 RDD 或者将其注册为临时表的方式处理和分析这些数据。

二、案例分析与编程实现

1、案例一:人口统计

(1)案例描述与分析

一份包含 5 亿条人口信息的结构化数据,共包含三列,第一列是 ID,第二列是性别信息 (F -> 女,M -> 男),第三列是人口的身高信息,单位是 cm。

我们的统计任务如下:

  1. 用 SQL 语句的方式统计男性中身高超过 180cm 的人数。
  2. 用 SQL 语句的方式统计女性中身高超过 170cm 的人数。
  3. 对人群按照性别分组并统计男女人数。
  4. 用类 RDD 转换的方式对 DataFrame 操作来统计并打印身高大于 210cm 的前 50 名男性。
  5. 对所有人按身高进行排序并打印前 50 名的信息。
  6. 统计男性的平均身高。
  7. 统计女性身高的最大值。

(2)编码实现

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD

object PeopleDataStatistics2 {
  private val schemaString = "id,gender,height"
  
  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage:PeopleDataStatistics2 filePath")
      System.exit(1)
    }
    
    val conf = new SparkConf().setAppName("Spark Exercise:People Data Statistics 2")
    val sc = new SparkContext(conf)
    val peopleDataRDD = sc.textFile(args(0))
    val sqlCtx = new SQLContext(sc)
    
    // this is used to implicitly convert an RDD to a DataFrame.
    import sqlCtx.implicits._
    
    val schemaArray = schemaString.split(",")     // 结构化信息:列名
    val schema = StructType(schemaArray.map(fieldName => StructField(fieldName, StringType, true)))
    val rowRDD: RDD[Row] = peopleDataRDD.map(_.split(" ")).map(eachRow => Row(eachRow(0), eachRow(1), eachRow(2)))
    val peopleDF = sqlCtx.createDataFrame(rowRDD, schema)
    peopleDF.registerTempTable("people")      // 将 DataFrame peopleDF 注册为临时表
    
    //1. get the male people whose height is more than 180
    val higherMale180 = sqlCtx.sql("select id,gender,height from people where height > 180 and gender='M'")
    println("Men whose height are more than 180: " + higherMale180.count())
    println("<Display #1>")
    
    //2. get the female people whose height is more than 170
    val higherFemale170 = sqlCtx.sql("select id,gender,height from people where height > 170 and gender='F'")
    println("Women whose height are more than 170: " + higherFemale170.count())
    println("<Display #2>")
    
    //3. Grouped the people by gender and count the number
    peopleDF.groupBy(peopleDF("gender")).count().show()
    println("People Count Grouped By Gender")
    println("<Display #3>")
 
    //4. 用类 RDD 转换的方式对 DataFrame 操作来统计并打印身高大于 210cm 的前 50 名男性
    peopleDF.filter(peopleDF("gender").equalTo("M")).filter(peopleDF("height") > 210).show(50)
    println("Men whose height is more than 210")
    println("<Display #4>")
 
    //5. 对所有人按身高进行排序并打印前 50 名的信息
    peopleDF.sort($"height".desc).take(50).foreach { row => println(row(0) + "," + row(1) + "," + row(2)) }
    println("Sorted the people by height in descend order,Show top 50 people")
    println("<Display #5>")
 
    //6. 统计男性的平均身高
    peopleDF.filter(peopleDF("gender").equalTo("M")).agg(Map("height" -> "avg")).show()
    println("The Average height for Men")
    println("<Display #6>")
 
    //7. 统计女性身高的最大值
    peopleDF.filter(peopleDF("gender").equalTo("F")).agg("height" -> "max").show()
    println("The Max height for Women:")
    println("<Display #7>")

    println("All the statistics actions are finished on structured People data.")
  }
}

2、案例二:1 千万用户和 1 亿条交易数据

(1)案例描述与分析

我们将统计分析 1 千万用户和 1 亿条交易数据。对于用户数据,它是一个包含 6 个列 (ID, 性别, 年龄, 注册日期, 角色 (从事行业), 所在区域) 的文本文件。对于交易数据,它是一个包含 5 个列 (交易单号, 交易日期, 产品种类, 价格, 用户 ID) 的文本文件。

我们的统计任务如下:

  1. 统计在2015年有交易的用户人数。
  2. 统计2014年的订单总数。
  3. 展示ID为1的用户的订单信息。
  4. 统计ID为10的用户交易的最大、最小和平均价格。

(2)编码实现

import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkContext, SparkConf}

//define case class for user(用户)
case class User(userID: String, gender: String, age: Int, registerDate: String,role: String, region: String)
//define case class for consuming data(交易)
case class Order(orderID: String, orderDate: String, productID: Int, price: Int, userID: String)

object UserConsumingDataStatistics {
  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage:UserConsumingDataStatistics userDataFilePath consumingDataFilePath")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:User Consuming Data Statistics")
    //Kryo serializer is more quickly by default java serializer
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ctx = new SparkContext(conf)
    val sqlCtx = new SQLContext(ctx)
    import sqlCtx.implicits._

    //Convert user data RDD to a DataFrame and register it as a temp table
    val userDF = ctx.textFile(args(0)).map(_.split(" ")).map(u => User(u(0), u(1), u(2).toInt,u(3),u(4),u(5))).toDF()
    userDF.registerTempTable("user")
 
    //Convert consuming data RDD to a DataFrame and register it as a temp table
    val orderDF = ctx.textFile(args(1)).map(_.split(" ")).map(o => Order(o(0), o(1), o(2).toInt,o(3).toInt,o(4))).toDF()
    orderDF.registerTempTable("orders")
 
    //cache the DF in memory with serializer should make the program run much faster
    userDF.persist(StorageLevel.MEMORY_ONLY_SER)
    orderDF.persist(StorageLevel.MEMORY_ONLY_SER)

    //1. The number of people who have orders in the year 2015(统计在2015年有交易的用户人数)
    val count = orderDF.filter(orderDF("orderDate").contains("2015")).join(userDF, orderDF("userID").equalTo(userDF("userID"))).count()
    println("The number of people who have orders in the year 2015:" + count)
    
    //2. total orders produced in the year 2014(在2014年的总订单量)
    val countOfOrders2014 = sqlCtx.sql("SELECT * FROM orders where orderDate like '2014%'").count()
    println("total orders produced in the year 2014:" + countOfOrders2014)
 
    //3. Orders that are produced by user with ID 1 information overview(用户ID为1的用户的订单信息)
    val countOfOrdersForUser1 = sqlCtx.sql("SELECT o.orderID,o.productID,o.price,u.userID FROM orders o,user u 
                              where u.userID =1 and u.userID = o.userID").show()
    println("Orders produced by user with ID 1 showed.")
 
    //4. Calculate the max,min,avg prices for the orders that are producted by user with ID 10(用户ID为10的用户交易的最大、最小和平均价格)
    val orderStatsForUser10 = sqlCtx.sql("SELECT max(o.price) as maxPrice,min(o.price) as minPrice,avg(o.price) as avgPrice,u.userID 
                              FROM orders o,user u where u.userID = 10 and u.userID = o.userID group by u.userID")
    println("Order statistic result for user with ID 10:")
 
    orderStatsForUser10.collect().map(order => "Minimum Price=" + order.getAs("minPrice")
                              + ";Maximum Price=" + order.getAs("maxPrice")
                              + ";Average Price=" + order.getAs("avgPrice")
                              ).foreach(result => println(result))
 }
}

第三部分 使用 Spark MLlib 进行机器学习

一、关于 Spark 机器学习库

  • Spark 机器学习库提供了常用机器学习算法的实现,包括聚类,分类,回归,协同过滤,维度缩减等。
  • Spark MLlib 提供的算法实现都是基于原始的 RDD,想要基于这个包提供的工具构建完整并且复杂的机器学习流水线是比较困难的。
  • Spark ML Pipeline 弥补了原始 MLlib 库的不足,使用户很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。

二、Spark 机器学习实战

1、回归(Regression)

  • 将标签数据转化为整数索引:StringIndexer
  • 合并多列作为模型输入特征:VectorAssembler
  • 将数据划分为训练集和验证集:data.randomSplit
  • 线性回归模型:LinearRegression
  • 模型训练、评估、预测:model.fit(training_data)、model.evaluate(test_data)、model.transform(target_data)

2、分类(Classification)

  • 逻辑回归模型:LogisticRegression
  • 决策树模型:DecisionTreeClassifier
  • 随机森林模型:RandomForestClassifier
  • 梯度提升树模型:GBTClassifier

3、聚类(Clustering)

  • 特征标准化:StandardScaler
  • K-Means 聚类模型:KMeans

4、基于 TF-IDF 的文本分类

  • 分词:Tokenizer

  • 去除停用词: StopWordsRemover

  • 计算词频:CountVectorizer

  • 计算逆文本频率:IDF

  • 朴素贝叶斯模型:NaiveBayes

  • 形成数据处理 pipeline:Pipeline

第四部分 Pyspark 常用命令汇总

一、常用函数

函数 用法 作用
show df.show( ) 打印数据
printSchema df.printSchema( ) 打印概要
count df.count( ) 查询总行数
head df.head( ) 获取头几行
fliter df.fliter( isnull(“income”)) 筛选符合条件的行
describe df.describe( ).show( ) 查询概况
distinct df.select(“user_id”).distinct( ).show( ) 去重 set 操作
orderBy df.orderby(“income”) 按指定字段排序,默认为升序
sample df.sample(False, 0.2, 42) 抽样函数,withReplacement = True or False 代表是否有放回;42表示seed
withColumn df.withColumn(“income1”, df.income+10).show(5) 通过现有列的变换增加列
join df = df1.join(df2, df1.key == df2.key, “inner”) 按字段合并两个 dataframe
drop df.drop(“income”) 删除字段
union union df1.union(df2).show( ) 按行拼接两个 dataframe
subtract df1.substract(df2) 两个 dataframe 的差集(df1有但df2没有的)
intersect df1.intersect(df2) 两个 dataframe 的交集
explode df.explode(“time”,”day”){time:String=>time.splite(“ “)}.show(false) 根据 time 字段的空格将内容进行分割,分割的内容存储在新的字段”day”中
groupBy( ).agg( ) df.groupBy(“Age”).agg({“Purchase”: “mean”}).show( )df.groupBy(“A”).agg(F.avg(“B”), F.min(“B”)).show() 分组汇总
F.when( ).otherwise( )   条件判断取值,类似 case….when…
F.datediff   计算时间差