史上最简单的spark教程第八章-spark的自定义累加器与广播变量Java案例实践
这一章节以恶意请求流量记录作为我们的数据,编写一个完整案例
史上最简单的spark教程
所有代码示例地址:https://github.com/Mydreamandreality/sparkResearch
(提前声明:文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章,写作不易,转载请注明)
(文章参考:Elasticsearch权威指南,Spark快速大数据分析文档,Elasticsearch官方文档,实际项目中的应用场景)
(帮到到您请点点关注,文章持续更新中!)
Git主页 https://github.com/Mydreamandreality
如果你在开始前没有接触过累加器的概念,
我强烈建议先简单的了解下累加器的基础概念,链接: https://baike.baidu.com/item/累加器/8590163?fr=aladdin
本章数据的格式如下,省略了一部分涉密数据:
{ "origin_id": 2,
"asset_id": 152,
"add_type": 0,
"asset_name": "test",
"level": 2,
"status": 3,
"event_desc": "漏扫任务触发告警通知",
"notice_date": "2019-01-22T20:42:31+0800",
"create_date": "2019-01-22T20:42:31+0800",
"dispose_result_id": 1,
"disposal_user": 1,
"disposal_date": "2019-01-23T16:12:54+0800",
"note": "578468",
"attachment": "/media/disposeresult_dbbc9812-e999-461a-83.docx"
..............省略一部分数据}
在这个案例中我们统计严重漏洞的情况.并且分发一张巨大的查询表
- 共享变量:累加器(accumulator)
- 共享变量是一种可以在spark任务中使用的特殊类型的变量
- whatis累加器?
- 累加器简单的来说就是对信息进行聚合
- whatis累加器?
- 广播变量(broadcase variable)
- whatis广播变量
- 高效的分发较大对象
- whatis广播变量
=== 我的学习方法是,先有基础的概念,然后再深入学习,
=== 下面具体的介绍下这些概念
我们为什么需要累加器和广播变量?
- 在正常的程序中,我们向spark传递函数,比如使用map()或者filter(),可以使用驱动器程序中定义的变量,把任务分发到各个集群的计算节点
- 这个时候集群的计算节点会把驱动器的变量另保存一份副本,形成新的变量,
- 这个时候我们更新节点中的数据,对驱动器中的变量是不会有任何影响的
- 如果需要进行计算节点数据共享,读写共享变量效率是比较低下的
- 而spark的共享变量.累加器和广播变量这两种常见的通信模式突破了这种限制
- 说到这里可能有点晕:
那么灵魂画手上线啦
- 手动画的实在是太好看了,不好展示[捂脸].
- 这样画的不知道大家能否看的懂,实在不清楚可以留言交流
共享变量-累加器
- 刚才介绍的时候说了,累加器就是对信息进行聚合,其实准确的说,累加器提供将工作节点的值聚合到驱动器中的简单语法
- 那么累加器在真实场景中有啥子用呢?
- 举个栗子
- 我这里有大量的恶意请求流量,里面包含了各种各样的信息,请求头,请求体,协议,mac,IP,攻击手法,攻击者定位地址,攻击时间等等,但是有些时候攻击者的伪装或者引擎的某些问题,可能会导致其中的某些数据是空,
- 那我现在就要统计这些数据文件中有哪些key的value为null或者0,并且输出,其他结果不进行输出
- [当然你也可以延伸其他的需求]
- 代码案例:
- 代码略多,建议从我的GitHub上pull一份源码,自己debug一遍
import lombok.Data;
/**
* Created by 張燿峰
* 定义攻击流量中的字段
*
* @author 孤
* @date 2019/3/25
* @Varsion 1.0
*/
public class JavaBean {
public static String origin_id = "origin_id";
public static String asset_id = "asset_id";
public static String add_type = "add_type";
public static String asset_name = "asset_name";
/*其他的我先省略了,太多了,先拿这么多进行测试*/
}
import org.apache.spark.util.AccumulatorV2;
import java.util.HashMap;
import java.util.Map;
/**
* Created by 張燿峰
* 累加器
* @author 孤
* @date 2019/3/25
* @Varsion 1.0
*/
public class AttackAccumulator extends AccumulatorV2<String, String> {
/*定义我需要计算的变量*/
private Integer emptyLine;
/*定义变量的初始值*/
private String initEmptyLine = JavaBean.origin_id + ":0;" + JavaBean.asset_name + ":0;";
/*初始化原始状态*/
private String resetInitEmptyLine = initEmptyLine;
/*判断是否等于初始状态*/
@Override
public boolean isZero() {
return initEmptyLine.equals(resetInitEmptyLine);
}
//复制新的累加器
@Override
public AccumulatorV2<String, String> copy() {
return new AttackAccumulator();
}
/*初始化原始状态*/
@Override
public void reset() {
initEmptyLine = resetInitEmptyLine;
}
/*针对传入的新值,与当前累加器已有的值进行累加*/
@Override
public void add(String s) {
initEmptyLine = mergeData(s,initEmptyLine,";");
}
/*将两个累加器的计算结果合并*/
@Override
public void merge(AccumulatorV2<String, String> accumulatorV2) {
initEmptyLine = mergeData(accumulatorV2.value(), initEmptyLine, ";");
}
/*返回累加器的值*/
@Override
public String value() {
return initEmptyLine;
}
/*筛选字段为null的*/
private Integer mergeEmptyLine(String key, String value, String delimit) {
return 0;
}
private static String mergeData(String data_1, String data_2, String delimit) {
StringBuffer stringBuffer = new StringBuffer();
//通过分割的方式获取value
String[] info_1 = data_1.split(delimit);
String[] info_2 = data_2.split(delimit);
//处理info_1数据
Map<String, Integer> mapNode = resultKV(":", info_1);
//处理info_2数据
Map<String, Integer> mapNodeTo = resultKV(":", info_2);
consoleResult(delimit, stringBuffer, mapNodeTo, mapNode);
consoleResult(delimit, stringBuffer, mapNode, mapNodeTo);
return stringBuffer.toString().substring(0, stringBuffer.toString().length() - 1);
}
private static void consoleResult(String delimit, StringBuffer stringBuffer, Map<String, Integer> mapNode, Map<String, Integer> mapNodeTo) {
for (Map.Entry<String, Integer> entry : mapNodeTo.entrySet()) {
String key = entry.getKey();
Integer value = entry.getValue();
if (value == null || 0 == (value)) {
value += 1;
if (mapNode.containsKey(key) && (mapNode.get(key) == null || 0 == (mapNode.get(key)))) {
value += 1;
mapNode.remove(key);
stringBuffer.append(key + ":" + value + delimit);
continue;
}
stringBuffer.append(key + ":" + value + delimit);
}
}
}
private static Map<String, Integer> resultKV(String delimit, String[] infos) {
Map<String, Integer> mapNode = new HashMap<>();
for (String info : infos) {
String[] kv = info.split(delimit);
if (kv.length == 2) {
String k = kv[0];
Integer v = Integer.valueOf(kv[1]);
mapNode.put(k, v);
continue;
}
}
return mapNode;
}
}
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.org.apache.xpath.internal.SourceTree;
import org.apache.avro.ipc.specific.Person;
import org.apache.spark.InternalAccumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.AccumulatorV2;
import org.codehaus.janino.Java;
import scala.Tuple2;
import java.io.IOException;
import java.util.*;
/**
* Created by 張燿峰
* 第八章案例
* 累加器运行
* @author 孤
* @date 2019/3/25
* @Varsion 1.0
*/
public class Accumulator {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.master("local[4]").appName("AttackFind").getOrCreate();
//初始化sparkContext
JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
//日志输出级别
javaSparkContext.setLogLevel("ERROR");
//创建RDD
JavaRDD<String> rdd = javaSparkContext.parallelize(Arrays.asList(JavaBean.origin_id, JavaBean.asset_name));
AttackAccumulator attackAccumulator = new AttackAccumulator();
//注册累加器
javaSparkContext.sc().register(attackAccumulator, "attack_count");
//生成一个随机数作为value
JavaPairRDD<String, String> javaPairRDD = rdd.mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
Integer random = new Random().nextInt(10);
return new Tuple2<>(s, s + ":" + random);
}
});
javaPairRDD.foreach((VoidFunction<Tuple2<String, String>>) tuple2 -> {
attackAccumulator.add(tuple2._2);
});
}
}
运行结果
-
如果键值对的数据中value是null或者0,则进行数据的记录
-
如果键值对的数据中value不是null或者不是0,则不进行数据的记录
-
JavaBean中定义了一些需要输出的字段
-
AttackAccumulator中重写了累加器的一些函数
-
Accumulator是启动脚本
-
启动脚本中我们使用了SaprkSession,这是2.0的新概念,下一章中解释一下吧
-
在我的git中还有一个文件是TestMerge,各位可以下载debug一遍,.就知道整个运行机制了
在这对累加器做一个小总结
首先创建驱动器,注册我们的累加器,此处的累加器是继承了AccumulatorV2的类
其次执行我们自定义累加器中的方法 +=(add)增加累加器的值
最终驱动器程序可以调用累加器的value属性, java中Value()访问累加器的值
在计算节点上的任务不能访问累加器的值,对于计算节点这只是一个写的变量,在这种模式下,累加器可以更加高效
spark容错性
-
在开发中节点异常是非常有可能发生的
-
举个栗子
-
如果我们对某个分区执行map()操作的节点失败了,spark会自动重新在另一个节点运行该任务,就算这个节点没有崩溃,只是处理速度比其他节点慢很多,spark也可以抢占式的在另外一个节点上启动一个投机型的任务副本
-
那么这就衍生出一个问题,累计器是如何应对容错的
-
在RDD的转化操作中使用累加器可能会发生不止一次的更新,故我们需要对RDD先做缓存
-
rdd.cache();
-
以保证我们的数据重新读取时无需从头开始
-
如果此时你想要一个无论失败还是重复计算都绝对可靠的累加器,那就需要把累加器放进foreach()这样的行动操作中
-
因为在行动操作中,spark只会把每个任务对各累加器的修改应用一次
广播变量
-
广播变量可以高效的向所有的工作节点发送一个较大的只读值,提供给一个或者多个spark操作使用
-
广播变量的使用场景就是你需要向所有节点发送只读的查询表,又或者机器学习中很大的特征向量
-
你可以这么理解:
-
当在Executor端用到了Driver变量,不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本,如果使用广播变量在每个Executor端中只有一份Driver端的变量副本
注意:
1.不能将RDD广播出去,可以将RDD的结果广播出去
2.广播变量在Driver定义,在Exector端不可改变,在Executor端不能定义 -
代码示例:
/**
* 广播变量测试
* @param args
*/
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.master("local[4]").appName("AttackFind").getOrCreate();
//初始化sparkContext
JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
//在这里假定一份广播变量
//因为我们之前说过,广播变量只可读
final List<String> broadcastList = Arrays.asList("190099HJLL","98392QUEYY","561788LLKK");
//设置广播变量,把broadcast广播出去
final Broadcast<List<String>> broadcast = javaSparkContext.broadcast(broadcastList);
//定义数据
JavaPairRDD<String,String> pairRDD = javaSparkContext.parallelizePairs(Arrays.asList(new Tuple2<>("000", "000")));
JavaPairRDD<String,String> resultPairRDD = pairRDD.filter((Function<Tuple2<String, String>, Boolean>) v1 -> broadcast.value().contains(v1._2));
resultPairRDD.foreach((VoidFunction<Tuple2<String, String>>) System.out::println);
}
广播优化
- 当广播一个比较大的值时,选择既快又好的序列化格式是很重要的
- 因为如果序列化对象的时间很长或者传送花费的时间太久,这段时间很容易就成为性能瓶颈
- 尤其是Spark 的Scala和Java API中默认使用的序列化库为Java序列化库,
- 因此它对于除基本类型的数组以外的任何对象都比较低效
- 你可以使用spark.serializer属性选择另一个序列化库来优化序列化过程