Using multiple windows on a single stream with Kapacitor

Problem description:

Goal: I would like to send a notification 5 minutes after the alert condition begins, and then every 30 minutes after that.

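I have played with .count() and the time functions but got nowhere. I don't want to have to calculate that mess by hand, and I couldn't figure out a way to make it user friendly and reliable.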

The solution I am working with now is to use two streams with separate windows.

var initialData = stream 
     |from() 
      .database(db) 
      .measurement(metricType) 
      .retentionPolicy(rPolicy) 
      .groupBy(group) 
      .where(lambda: "cpu" == 'cpu-total') 
      .where(lambda: "host" =~ hostFilter) 
     |mean(metric) 
      .as('initialStat') 
     |window() 
      .period(10m) 
      .every(5m) 
      .align() 

var continuousData = stream 
    |from() 
     .database(db) 
     .measurement(metricType) 
     .retentionPolicy(rPolicy) 
     .groupBy(group) 
     .where(lambda: "cpu" == 'cpu-total') 
     .where(lambda: "host" =~ hostFilter) 
    |mean(metric) 
     .as('continuousStat') 
    |window() 
     .period(10m) 
     .every(30m) 
     .align() 

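Aside from the fact that this just seems odd, I would then need to run the calculations on each stream, and I would also need separate |alert() nodes. The first node would only notify on state changes, but the second cannot have that setting, so that I get an alert reminder every N minutes. I also have the problem that the first |alert() node will send an OK notification, and the second will then send a duplicate OK N minutes later.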

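I feel like there must be a better way to do this. I would think I can use an if statement in the second |alert() node so that it does not send a notification on OK, since the first |window will handle that. At this point I have not figured out how to do this, but I believe it is possible. I also don't want to fight TICKscript; I know it is not designed to be a full-fledged language, per Issue 741.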
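A minimal sketch of one way to suppress the duplicate OK without an if statement, assuming the AlertNode `.noRecoveries()` property behaves as documented (it drops recovery events). It is untested against this setup. The literal database, measurement, field, and threshold values mirror the configuration used in this post, and the log path is only a placeholder handler.

```
// Reminder stream: evaluates every 30m and alerts while the condition holds.
stream
    |from()
        .database('telegraf')
        .retentionPolicy('default')
        .measurement('cpu')
        .groupBy('host')
        .where(lambda: "cpu" == 'cpu-total')
    |window()
        .period(10m)
        .every(30m)
        .align()
    |mean('time_steal')
        .as('continuousStat')
    |alert()
        .id('{{ index .Tags "host" }}')
        .message('{{ .ID }} is {{ .Level }}: CPU STEAL USAGE {{ index .Fields "continuousStat" }}% LONG')
        .warn(lambda: "continuousStat" > 85)
        .crit(lambda: "continuousStat" > 95)
        // Drop recovery (OK) events from this node only; the node that uses
        // stateChangesOnly() stays responsible for reporting OK.
        .noRecoveries()
        .log('/tmp/alerts/cpu_steal_usage_alerts')
```

With this in place the reminder node still fires on every 30m evaluation while the level is WARNING or CRITICAL, but it stays silent when the level returns to OK.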

The full TICKscript is below.

// CONFIGURATION PARAMETERS 

// Alerting 

var emailAddress = '$EMAIL' 
var pagerdutyKey = '$PD' 
var slackChannel = '$SLACK' 

// Static Thresholds in percent cpu steal used 
var warn = 85 
var crit = 95 

// Dynamic thresholds in number of std deviations 
var warnSig = 2.5 
var critSig = 3.5 

// Print INFO level (every result will be an alert) 
// AlertNode.StateChangesOnly will also need to be disabled 
// NOTE: 
// INFO level alerts will be disregarded by the pagerduty handler, this is not configurable. 
var debug = FALSE 

// Datastream 
// Define the data that will be acted upon 
var db   = 'telegraf' 
var group  = 'host' 
var metricType = 'cpu' 
var metric  = 'time_steal' 
var rPolicy  = 'default' 

// Regex used to filter on a subset of hosts 
var hostFilter = /.+/ 

// Window 
var dataPeriod   = 10m 
var initialFrequency  = 5m 
var continuousFrequency = 30m 

// DATAFRAME 
var initialData = stream 
    |from() 
     .database(db) 
     .measurement(metricType) 
     .retentionPolicy(rPolicy) 
     .groupBy(group) 
     .where(lambda: "cpu" == 'cpu-total') 
     .where(lambda: "host" =~ hostFilter) 
    |mean(metric) 
     .as('initialStat') 
    |window() 
     .period(dataPeriod) 
     .every(initialFrequency) 
     .align() 

var continuousData = stream 
    |from() 
     .database(db) 
     .measurement(metricType) 
     .retentionPolicy(rPolicy) 
     .groupBy(group) 
     .where(lambda: "cpu" == 'cpu-total') 
     .where(lambda: "host" =~ hostFilter) 
    |mean(metric) 
     .as('continuousStat') 
    |window() 
     .period(dataPeriod) 
     .every(continuousFrequency) 
     .align() 

// Calculations 
var initialCalculation = initialData 
    |eval(lambda: sigma("initialStat")) 
     .as('initialSigma') 
     .keep() 

var continuousCalculation = continuousData 
    |eval(lambda: sigma("continuousStat")) 
     .as('continuousSigma') 
     .keep() 

// ALERT CONDITIONS 
// The lambdas reference the field names produced above 
// (initialStat/initialSigma and continuousStat/continuousSigma). 
// Steal usage is bad when it is high, so alert when it exceeds a threshold. 
var initialCondition = initialCalculation 
    |alert() 
     .id('{{ index .Tags "host" }}') 
     .message('{{ .ID }} is {{ .Level }}: CPU STEAL USAGE {{ index .Fields "initialStat" }}% SHORT') 
     .details('this is an alert') 
     .stateChangesOnly() 
     .info(lambda: debug) 
     .warn(lambda: "initialStat" > warn OR 
      "initialSigma" > warnSig) 
     .crit(lambda: "initialStat" > crit OR 
      "initialSigma" > critSig) 

var continuousCondition = continuousCalculation 
    |alert() 
     .id('{{ index .Tags "host" }}') 
     .message('{{ .ID }} is {{ .Level }}: CPU STEAL USAGE {{ index .Fields "continuousStat" }}% LONG') 
     .details('this is an alert') 
     .info(lambda: debug) 
     .warn(lambda: "continuousStat" > warn OR 
      "continuousSigma" > warnSig) 
     .crit(lambda: "continuousStat" > crit OR 
      "continuousSigma" > critSig) 

// ACTIONS 
continuousCondition 
     // .log('/tmp/alerts/cpu_steal_usage_alerts') 
     // .slack() 
     // .channel(slackChannel) 
     .email(emailAddress) 
     .pagerDuty() 
       .serviceKey(pagerdutyKey) 

initialCondition 
     // .log('/tmp/alerts/cpu_steal_usage_alerts') 
     // .slack() 
     // .channel(slackChannel) 
     .email(emailAddress) 
     .pagerDuty() 
       .serviceKey(pagerdutyKey) 

So apparently I can use multiple windows in a single stream node.

stream 
    |from() 
     .database(db) 
     .measurement(metricType) 
     .retentionPolicy(rPolicy) 
     .groupBy(group) 
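     // Compare the "cpu" tag rather than the metricType variable; 
     // metricFilter is assumed to be defined elsewhere (e.g. 'cpu-total'). 
     .where(lambda: "cpu" == metricFilter) 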
     .where(lambda: "host" =~ hostFilter) 
    |window() 
     .period(dataPeriod) 
     .every(initialFrequency) 
     .align() 
    |mean(metric) 
     .as('initialStat') 
    |window() 
     .period(dataPeriod) 
     .every(continuousFrequency) 
     .align() 
    // The first mean() emits only 'initialStat', so average that field here. 
    |mean('initialStat') 
     .as('continuousStat') 

I am still working through the OK issue, though.
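A possible alternative that avoids the second window and the second |alert() node entirely, sketched under assumptions and not tested here: the AlertNode `.stateChangesOnly()` property accepts an optional maximum interval, so an alert whose level has not changed is sent again once that interval has passed since the last notification. The literal values below mirror the configuration in this post, the field name `stat` is chosen for illustration, and the log path is only a placeholder handler.

```
// Single stream, single window, single alert node.
stream
    |from()
        .database('telegraf')
        .retentionPolicy('default')
        .measurement('cpu')
        .groupBy('host')
        .where(lambda: "cpu" == 'cpu-total')
    |window()
        .period(10m)
        // Evaluate every 5m, so the first notification arrives at the first
        // 5m evaluation after the condition becomes true.
        .every(5m)
        .align()
    |mean('time_steal')
        .as('stat')
    |alert()
        .id('{{ index .Tags "host" }}')
        .message('{{ .ID }} is {{ .Level }}: CPU STEAL USAGE {{ index .Fields "stat" }}%')
        .warn(lambda: "stat" > 85)
        .crit(lambda: "stat" > 95)
        // Notify on state changes, and repeat the notification if the level
        // has stayed the same for 30m since the last one was sent.
        .stateChangesOnly(30m)
        .log('/tmp/alerts/cpu_steal_usage_alerts')
```

Because there is only one alert node, a recovery produces a single OK notification, which sidesteps the duplicate OK described above.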