使用Spark MLLib群集(K-Means)进行网络异常检测

问题描述:

我有一大组网络数据,我一直在使用Spark和MLLib进行群集练习。我已经将数据归一化为一组表示时间,方向(进/出网络),发送字节,接收字节和每个连接持续时间的向量集。总共有七个维度。使用Spark MLLib群集(K-Means)进行网络异常检测

使用KMeans,使用此数据很容易构建模型。使用这个模型,每个输入向量被“分类”,并且距离被计算到最近的质心。最后,RDD(现在用距离标记)按距离排序,并提取最极端的值。

我的数据中的一个输入列是一个连接uuid(唯一的字母数字标识符)。我希望通过模型传递这些数据(每个输入向量都有唯一标记),但是当列不能转换为浮点数时,会触发异常。

这里的问题是:“我如何最有效地将异常值与原始输入数据绑定?”输入数据严重标准化,并且与原始输入不相似。此外,源IP地址和目标IP地址已丢失。我没有看到KMeans中的任何接口告诉它在构建模型时要考虑哪些列(或者相反,忽略)。

我的代码看起来是这样的:

def get_distance(clusters): 
    def _distance_map(record): 
     cluster = clusters.predict(record) 
     centroid = clusters.clusterCenters[cluster] 
     dist = np.linalg.norm(np.array(record) - np.array(centroid)) 
     return (dist, record) 
    return _distance_map 

def parseMap(row): 
    # parses rows of data out of the input strings 

def conMap(row): 
    # normalizes the values to be used in building the model 

rdd = sc.textFile('/data2/network/201610').filter(lambda r: r[0] != '#') 
tcp = rdd.map(parseMap).filter(lambda r: r['proto'] == 'tcp') 
cons = tcp.map(conMap) # this normalizes connection data 

model = KMeans.train(cons, (24 * 7), maxIterations=25, 
        runs=1, initializationMode = "random") 

data_distance = cons.map(get_distance(model)).sortByKey(ascending=False) 
print(data_distance.take(10)) 

由于K-手段是异常敏感,这意味着异常数据经常在群独自结束。

您需要通过密钥(集群)对您的数据点进行计数,以找出捕获异常值并过滤出该点的群集。