使用Spark MLLib群集(K-Means)进行网络异常检测
问题描述:
我有一大组网络数据,我一直在使用Spark和MLLib进行群集练习。我已经将数据归一化为一组表示时间,方向(进/出网络),发送字节,接收字节和每个连接持续时间的向量集。总共有七个维度。使用Spark MLLib群集(K-Means)进行网络异常检测
使用KMeans,使用此数据很容易构建模型。使用这个模型,每个输入向量被“分类”,并且距离被计算到最近的质心。最后,RDD(现在用距离标记)按距离排序,并提取最极端的值。
我的数据中的一个输入列是一个连接uuid(唯一的字母数字标识符)。我希望通过模型传递这些数据(每个输入向量都有唯一标记),但是当列不能转换为浮点数时,会触发异常。
这里的问题是:“我如何最有效地将异常值与原始输入数据绑定?”输入数据严重标准化,并且与原始输入不相似。此外,源IP地址和目标IP地址已丢失。我没有看到KMeans中的任何接口告诉它在构建模型时要考虑哪些列(或者相反,忽略)。
我的代码看起来是这样的:
def get_distance(clusters):
def _distance_map(record):
cluster = clusters.predict(record)
centroid = clusters.clusterCenters[cluster]
dist = np.linalg.norm(np.array(record) - np.array(centroid))
return (dist, record)
return _distance_map
def parseMap(row):
# parses rows of data out of the input strings
def conMap(row):
# normalizes the values to be used in building the model
rdd = sc.textFile('/data2/network/201610').filter(lambda r: r[0] != '#')
tcp = rdd.map(parseMap).filter(lambda r: r['proto'] == 'tcp')
cons = tcp.map(conMap) # this normalizes connection data
model = KMeans.train(cons, (24 * 7), maxIterations=25,
runs=1, initializationMode = "random")
data_distance = cons.map(get_distance(model)).sortByKey(ascending=False)
print(data_distance.take(10))
答
由于K-手段是异常敏感,这意味着异常数据经常在群独自结束。
您需要通过密钥(集群)对您的数据点进行计数,以找出捕获异常值并过滤出该点的群集。