





from datetime import date 
days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)] 
IQ = [110, 105, 90] 



编辑: 似乎从scikits.timeseries.lib.moving_funcs子模块mov_average_expw()功能从SciKits(附加补充SciPy工具包)更适合您的问题的措辞。

要与平滑因子alpha计算数据的exponential smoothing(这是*的条款(1 - alpha)):

>>> alpha = 0.5 
>>> assert 0 < alpha <= 1.0 
>>> av = sum(alpha**n.days * iq 
...  for n, iq in map(lambda (day, iq), today=max(days): (today-day, iq), 
...   sorted(zip(days, IQ), key=lambda p: p[0], reverse=True))) 


from collections import namedtuple 
from operator import itemgetter 

def smooth(iq_data, alpha=1, today=None): 
    """Perform exponential smoothing with factor `alpha`. 

    Time period is a day. 
    Each time period the value of `iq` drops `alpha` times. 
    The most recent data is the most valuable one. 
    assert 0 < alpha <= 1 

    if alpha == 1: # no smoothing 
     return sum(map(itemgetter(1), iq_data)) 

    if today is None: 
     today = max(map(itemgetter(0), iq_data)) 

    return sum(alpha**((today - date).days) * iq for date, iq in iq_data) 

IQData = namedtuple("IQData", "date iq") 

if __name__ == "__main__": 
    from datetime import date 

    days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)] 
    IQ = [110, 105, 90] 
    iqdata = list(map(IQData, days, IQ)) 
    print("\n".join(map(str, iqdata))) 

    print(smooth(iqdata, alpha=0.5)) 


$ python26 smooth.py 
IQData(date=datetime.date(2008, 1, 1), iq=110) 
IQData(date=datetime.date(2008, 1, 2), iq=105) 
IQData(date=datetime.date(2008, 1, 7), iq=90) 


y_new = y_old + (input - y_old)*alpha 

其中alpha = DT/tau蛋白,DT =过滤器,tau的时间步长=滤波器的时间常数τ (这个可变时间步格式如下,只是夹DT /头至不超过1.0)

y_new = y_old + (input - y_old)*dt/tau 



def ema(s, n): 
    returns an n period exponential moving average for 
    the time series s 

    s is a list ordered from oldest (index 0) to most 
    recent (index -1) 
    n is an integer 

    returns a numeric array of the exponential 
    moving average 
    s = array(s) 
    ema = [] 
    j = 1 

    #get n sma first and calculate the next n period ema 
    sma = sum(s[:n])/n 
    multiplier = 2/float(1 + n) 

    #EMA(current) = ((Price(current) - EMA(prev)) x Multiplier) + EMA(prev) 
    ema.append(((s[n] - sma) * multiplier) + sma) 

    #now calculate the rest of the values 
    for i in s[n+1:]: 
     tmp = ((i - ema[j]) * multiplier) + ema[j] 
     j = j + 1 

    return ema 

def movingAverageExponential(values, alpha, epsilon = 0): 

    if not 0 < alpha < 1: 
     raise ValueError("out of range, alpha='%s'" % alpha) 

    if not 0 <= epsilon < alpha: 
     raise ValueError("out of range, epsilon='%s'" % epsilon) 

    result = [None] * len(values) 

    for i in range(len(result)): 
     currentWeight = 1.0 

     numerator  = 0 
     denominator = 0 
     for value in values[i::-1]: 
      numerator  += value * currentWeight 
      denominator += currentWeight 

      currentWeight *= alpha 
      if currentWeight < epsilon: 

     result[i] = numerator/denominator 

    return result 


在函数结束时,它在返回列表之前颠倒这些值(以便它们按照调用者的正确顺序)。 (注意:如果我使用python以外的语言,我会先创建一个全尺寸的空数组,然后将它向后顺序填充,这样我就不必在最后反转它。但是我不认为你可以在python中声明一个大的空数组,并且在python列表中,追加比prepending要便宜得多,这就是为什么我以相反的顺序创建列表的原因,如果我错了,请纠正我。 )


today:  1.0 
yesterday: 0.5 
2 days ago: 0.25 
3 days ago: 0.125 

当然,如果你已经有了一个巨大的价值的数组,值:例如,如果您使用的0.5α,那么今天的移动平均值将被由以下加权值的或十五天前对今天的加权平均数不会有太大的贡献。 'epsilon'的观点可以让你设定一个截止点,在这个点之下,你将不再关心旧的价值(因为它们对当今价值的贡献将是微不足道的)。


result = movingAverageExponential(values, 0.75, 0.0001) 

我发现上面的代码片段通过@earino非常有用的 - 但我需要的东西,可以连续平滑值流 - 所以我把它重构为这样:

def exponential_moving_average(period=1000): 
    """ Exponential moving average. Smooths the values in v over ther period. Send in values - at first it'll return a simple average, but as soon as it's gahtered 'period' values, it'll start to use the Exponential Moving Averge to smooth the values. 
    period: int - how many values to smooth over (default=100). """ 
    multiplier = 2/float(1 + period) 
    cum_temp = yield None # We are being primed 

    # Start by just returning the simple average until we have enough data. 
    for i in xrange(1, period + 1): 
     cum_temp += yield cum_temp/float(i) 

    # Grab the timple avergae 
    ema = cum_temp/period 

    # and start calculating the exponentially smoothed average 
    while True: 
     ema = (((yield ema) - ema) * multiplier) + ema 


def temp_monitor(pin): 
    """ Read from the temperature monitor - and smooth the value out. The sensor is noisy, so we use exponential smoothing. """ 
    ema = exponential_moving_average() 
    next(ema) # Prime the generator 

    while True: 
     yield ema.send(val_to_temp(pin.read())) 



def moving_average(x, n, type): 
    x = np.asarray(x) 
    if type=='simple': 
     weights = np.ones(n) 
     weights = np.exp(np.linspace(-1., 0., n)) 

    weights /= weights.sum() 

    a = np.convolve(x, weights, mode='full')[:len(x)] 
    a[:n] = a[n] 
    return a 



def emaWeight(numSamples): 
    return 2/float(numSamples + 1) 

def ema(close, prevEma, numSamples): 
    return ((close-prevEma) * emaWeight(numSamples)) + prevEma 

samples = [ 
22.27, 22.19, 22.08, 22.17, 22.18, 22.13, 22.23, 22.43, 22.24, 22.29, 
22.15, 22.39, 22.38, 22.61, 23.36, 24.05, 23.75, 23.83, 23.95, 23.63, 
23.82, 23.87, 23.65, 23.19, 23.10, 23.33, 22.68, 23.10, 22.40, 22.17, 
emaCap = 10 
for s in range(len(samples)): 
    numSamples = emaCap if s > emaCap else s 
    e = ema(samples[s], e, numSamples) 
    print e 



import pandas as pd 
import numpy as np 

def ema(values, period): 
    values = np.array(values) 
    return pd.ewma(values, span=period)[-1] 

values = [9, 5, 10, 16, 5] 
period = 5 

print ema(values, period) 




import numpy as np 
from scipy.signal import lfilter 

x = np.random.normal(size=1234) 
alpha = .1 # smoothing coefficient 
zi = [x[0]] # seed the filter state with first value 
# filter can process blocks of continuous data if <zi> is maintained 
y, zi = lfilter([1.-alpha], [1., -alpha], x, zi=zi) 


def ExpMovingAverage(values, window): 
    """ Numpy implementation of EMA 
    weights = np.exp(np.linspace(-1., 0., window)) 
    weights /= weights.sum() 
    a = np.convolve(values, weights, mode='full')[:len(values)] 
    a[:window] = a[window] 
    return a