spark

spark

spark

spark

spark

spark

spark

spark

spark

spark

spark

spark

spark

 1)map操作  
  /**
      * map(func):返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
      * 将原始集合中的每一个元素*7
      * map的操作是一个one to one的操作
      */
        private def mapMethod(sc: SparkContext) = {
        val list = 1 to 10
        val listRDD = sc.parallelize(list)
        val bs = 7
        println("listRDD's partition: " + listRDD.getNumPartitions)
        val retRDD: RDD[Int] = listRDD.map(num => num * bs)
        retRDD.foreach(num => println(num))
}

      2)flatmap操作
         /**
      * flatMap(func):类似于map,但是每一个输入元素,
      * 会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
      *
      * flatMap是one to many
      * @param sc
      */
    private def flatMapMethod(sc: SparkContext) = {
        val list = List(
            "hello you",
            "hello  me he",
            "you you"
        )
        val listRDD:RDD[String] = sc.parallelize(list)
        val wordsRDD:RDD[String] = listRDD.flatMap{case line => {
            line.split(" ")
        }}
        wordsRDD.foreach(println)
} 
  
3)filter操作
    /**
      * filter(func): 返回一个新的数据集,由经过func函数后返回值为true的原元素组
成
      * 过滤出集合中的偶数even,留下奇数odd
      */
    private def filterMethod(sc: SparkContext) = {
        val range = 1 until 10
        val listRDD = sc.parallelize(range)
        val filteredRDD = listRDD.filter{case num => {num % 2 != 0}}

        filteredRDD.foreach(println)
    }
			
4)sample操作
    /**
      * 采集抽样:
      *     就是用样本空间来代替总体,查看总体数据的分布。
      *     这个Spark的sample抽样是否非准确抽样,如果要抽取固定的数据量,只能自
己手动处理。
      *    会在后面学习Spark如何解决数据倾斜的时候使用
      * sample(withReplacement, frac, seed):
      *     根据给定的随机种子seed,随机抽样出数量为frac的数据
      */
    private def sampleMethod(sc: SparkContext) = {
        val range = 1 to 100000
        val listRDD = sc.parallelize(range)
        val sampleRDD = listRDD.sample(true, 0.002)
        sampleRDD.foreach(println)
        println(s"sampleRDD的元素个数:${sampleRDD.count()}")
    }

5)union操作
   /**
     * union(otherDataset): 返回一个新的数据集,由原数据集和参数联合而成
     * 该操作类似于sql中的union all的操作(而sql中的union会做去重操作)
     */
    private def unionMethod(sc: SparkContext) = {
        val list1 = List(1, 2, 3, 4, 5)
        val list2 = List(3, 5, 6, 8, 10)

        val listRDD1 = sc.parallelize(list1)
        val listRDD2 = sc.parallelize(list2)

        val unionRDD = listRDD1.union(listRDD2)
        unionRDD.foreach(println)
        println("-----------------------------------")
        val interRDD = listRDD1.intersection(listRDD2)//求rdd的交集
        interRDD.foreach(println)
    }
4,SparkRDD的action操作

       package com.aura.bigdata.spark.core.p2

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.compress.{DefaultCodec, SnappyCodec}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 学习SparkRDD的action操作
  * action操作是spark作业job执行的动因
  */
object _04SparkActionOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        val conf = new SparkConf().setAppName("_03SparkTransformationOps")
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val list = List(
            "hello you",
            "hello  me he",
            "hello  shit  he",
            "you you"
        )
        val listRDD: RDD[String] = sc.parallelize(list)
        val wordsRDD: RDD[String] = listRDD.flatMap { case line => {
            line.split("\\s+")
        }
        }

        val pairsRDD = wordsRDD.map((_, 1))

        /*
            reduce(func): 通过函数func聚集数据集中的所有元素。Func函数接受2个
参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行
            该reduce操作和scala中的reduce操作一模一样
            val list = 1 to 100
            val listRDD = sc.parallelize(list)
            val ret = listRDD.reduce((v1, v2) => v1 + v2)
         */

        /*
            collect():
            在Driver的程序中,以数组的形式,返回数据集的所有元素。
            这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,
            直接将整个RDD集Collect返回,很可能会让Driver程序OOM

            将RDD中各个分区中的数据,全都拉取到driver中。
         */
           pairsRDD.collect().foreach(println)

        /**
          * count(): 返回数据集rdd的元素个数
          */
            println("===========pairsRDD总共有" + pairsRDD.count() + "条记录。")
        /*
            take(n): 返回一个数组,由数据集的前n个元素组成。
            注意,这个操作目前并非在多个节点之上并行执行,而是Driver程序所在机器,
            单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)
         */
        println("=====================take(3)=====")
        pairsRDD.take(3).foreach(println)
        println("=====================takeOrdered(2)=====")
        val retRDD = pairsRDD.reduceByKey(_+_)
        retRDD.takeOrdered(2)(new Ordering[(String, Int)](){
            override def compare(x: (String, Int), y: (String, Int)) = {
                var ret = y._2.compareTo(x._2)
                if(ret == 0) {
                   ret = x._1.compareTo(y._1)
                }
                ret
            }
        }).foreach(println)
//        first(): 返回数据集的第一个元素(类似于take(1))
    /*
        saveAsTextFile(path):将数据集的元素,以textfile形式,保存到本地文件系统,
        hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString
方法,
        并将它转换为文件中的一行文本
     */
        retRDD.saveAsTextFile("file:///E:/data/spark/wc" + 
System.currentTimeMillis())

        /**
          * saveAsSequenceFile(path):
          *     将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系
统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-
value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为
          *    Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)
          */
//        retRDD.saveAsSequenceFile("file:///E:/data/spark/seq" + 
System.currentTimeMillis())

          retRDD.saveAsSequenceFile(
            "file:///E:/data/spark/seq" + System.currentTimeMillis(),
            Option(classOf[DefaultCodec])
        )
//        foreach(func): 在数据集的每一个元素上,运行函数func。这通常用于更新一
个累加器变量,或者和外部存储系统做交互

        retRDD.saveAsNewAPIHadoopFile(
            "file:///E:/data/spark/nahf" + System.currentTimeMillis(),
            classOf[Text],
            classOf[IntWritable],
            classOf[TextOutputFormat[Text, IntWritable]]
        )
        sc.stop()
    }
}

spark

spark

spark

spark

spark

spark