如何计算Python 3中的移动平均值?

问题描述:

比方说,我有一个列表:如何计算Python 3中的移动平均值?

y = ['1', '2', '3', '4','5','6','7','8','9','10'] 

我希望创建一个计算移动正天平均值的函数。 所以如果n是5,我想我的代码计算出第一个1-5,添加它并找到平均值,这将是3.0,然后继续到2-6,计算平均值,这将是4.0,然后3-7,4-8,5-9,6-10。

我不想计算第n-1天,所以从第n天开始计算前几天。

def moving_average(x:'list of prices', n): 
    for num in range(len(x)+1): 
     print(x[num-n:num]) 

这似乎是打印出来什么,我想:

[] 
[] 
[] 
[] 
[] 

['1', '2', '3', '4', '5'] 

['2', '3', '4', '5', '6'] 

['3', '4', '5', '6', '7'] 

['4', '5', '6', '7', '8'] 

['5', '6', '7', '8', '9'] 

['6', '7', '8', '9', '10'] 

不过,我不知道如何计算这些名单内的号码。有任何想法吗?

+5

为什么你在列表中有字符串而不是数字? – 2013-02-14 21:08:50

有一个老版本的Python文档的一个伟大的滑动窗口发生器itertools examples

from itertools import islice 

def window(seq, n=2): 
    "Returns a sliding window (of width n) over data from the iterable" 
    " s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...     " 
    it = iter(seq) 
    result = tuple(islice(it, n)) 
    if len(result) == n: 
     yield result  
    for elem in it: 
     result = result[1:] + (elem,) 
     yield result 

使用您的移动平均线是简单的:

from __future__ import division # For Python 2 

def moving_averages(values, size): 
    for selection in window(values, size): 
     yield sum(selection)/size 

运行此对您的输入(将字符串映射为整数)给出:

>>> y= ['1', '2', '3', '4','5','6','7','8','9','10'] 
>>> for avg in moving_averages(map(int, y), 5): 
...  print(avg) 
... 
3.0 
4.0 
5.0 
6.0 
7.0 
8.0 

要重新转None第一n - 1迭代 '不完整' 集,只是扩大了moving_averages功能一点:

def moving_averages(values, size): 
    for _ in range(size - 1): 
     yield None 
    for selection in window(values, size): 
     yield sum(selection)/size 
+1

+1我从来没有见过这个功能。 – placeybordeaux 2013-02-14 21:09:24

+1

我希望结果是[none,none,none,none,3.0,4.0,5.0,6.0,7.0,8.0]尽管 – Kara 2013-02-14 21:13:47

+0

虽然我非常感谢您的优雅解决方案,但我却将其与追踪运行总和而不是重新计算总和倍数。请参阅[我的答案](http://*.com/a/14942753/923794)。如果将函数简化为仅回答原始问题并且不允许其他参数,则这可能会更快。 – cfi 2013-02-19 08:19:48

使用summap函数。

print(sum(map(int, x[num-n:num]))) 

map功能在Python 3基本上是一个懒惰这个:

[int(i) for i in x[num-n:num]] 

我敢肯定,你可以猜到sum功能做什么。

是避免重复计算中间和值的方法..

list=range(0,12) 
def runs(v): 
global runningsum 
runningsum+=v 
return(runningsum) 
runningsum=0 
runsumlist=[ runs(v) for v in list ] 
result = [ (runsumlist[k] - runsumlist[k-5])/5 for k in range(0,len(list)+1)] 

打印结果

[2,3,4,5,6,7,8,9] 

make that runs(int(v)).. then .. rep r(runsumlist [k] - runsumlist [k-5])/ 5) 如果你蚂蚁携带数字串。


Alt键没有全球:

list = [float[x] for x in range(0,12)] 
nave = 5 
movingave = sum(list[:nave]/nave) 
for i in range(len(list)-nave):movingave.append(movingave[-1]+(list[i+nave]-list[i])/nave) 
print movingave 

一定要做到,即使你输入值是整数

[2.0,3.0,4.0,5.0,6.0,7.0,8.0,9,0] 
+0

事实上,运行总和算法更快。我发布了一个可以证明你的观点。这里只需要一个'global'变量。 – cfi 2013-02-18 18:16:35

+0

正确的你,我试图太难以明确的循环.. – agentp 2013-02-19 18:37:44

浮动数学虽然我喜欢这个Martijn's answer,像乔治,我想知道,这不会是通过使用运行总和,而不是在几乎相同的数字一遍遍应用sum()更快。

同样在斜坡阶段为None值作为默认的想法很有趣。实际上,可能会有很多不同的场景可以设想移动平均线。让我们平均的计算过程分为三个阶段:

  1. 斜升:开始反复在当前迭代计数<窗口大小
  2. 稳步推进:我们有完全相同的窗口大小数量的元素可用来计算正常average := sum(x[iteration_counter-window_size:iteration_counter])/window_size
  3. 斜坡下降:在输入数据的最后,我们可以返回另一个window_size - 1“平均”的数字。

下面是接受

  • 任意iterables(发电机是精细)作为用于数据输入
  • 任意窗口尺寸> = 1个
  • 参数期间接通/断开生产值的函数对于这些阶段斜坡向上/向下
  • 回调函数的相位控制值是如何产生的。这可以用来不断地提供一个默认的(如None),或提供部分平均值

下面的代码:

from collections import deque 

def moving_averages(data, size, rampUp=True, rampDown=True): 
    """Slide a window of <size> elements over <data> to calc an average 

    First and last <size-1> iterations when window is not yet completely 
    filled with data, or the window empties due to exhausted <data>, the 
    average is computed with just the available data (but still divided 
    by <size>). 
    Set rampUp/rampDown to False in order to not provide any values during 
    those start and end <size-1> iterations. 
    Set rampUp/rampDown to functions to provide arbitrary partial average 
    numbers during those phases. The callback will get the currently 
    available input data in a deque. Do not modify that data. 
    """ 
    d = deque() 
    running_sum = 0.0 

    data = iter(data) 
    # rampUp 
    for count in range(1, size): 
     try: 
      val = next(data) 
     except StopIteration: 
      break 
     running_sum += val 
     d.append(val) 
     #print("up: running sum:" + str(running_sum) + " count: " + str(count) + " deque: " + str(d)) 
     if rampUp: 
      if callable(rampUp): 
       yield rampUp(d) 
      else: 
       yield running_sum/size 

    # steady 
    exhausted_early = True 
    for val in data: 
     exhausted_early = False 
     running_sum += val 
     #print("st: running sum:" + str(running_sum) + " deque: " + str(d)) 
     yield running_sum/size 
     d.append(val) 
     running_sum -= d.popleft() 

    # rampDown 
    if rampDown: 
     if exhausted_early: 
      running_sum -= d.popleft() 
     for (count) in range(min(len(d), size-1), 0, -1): 
      #print("dn: running sum:" + str(running_sum) + " deque: " + str(d)) 
      if callable(rampDown): 
       yield rampDown(d) 
      else: 
       yield running_sum/size 
      running_sum -= d.popleft() 

这似乎是有点比马亭的版本快 - 这是更为虽然优雅。下面是测试代码:

print("") 
print("Timeit") 
print("-" * 80) 

from itertools import islice 
def window(seq, n=2): 
    "Returns a sliding window (of width n) over data from the iterable" 
    " s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...     " 
    it = iter(seq) 
    result = tuple(islice(it, n)) 
    if len(result) == n: 
     yield result  
    for elem in it: 
     result = result[1:] + (elem,) 
     yield result 

# Martijn's version: 
def moving_averages_SO(values, size): 
    for selection in window(values, size): 
     yield sum(selection)/size 


import timeit 
problems = [int(i) for i in (10, 100, 1000, 10000, 1e5, 1e6, 1e7)] 
for problem_size in problems: 
    print("{:12s}".format(str(problem_size)), end="") 

    so = timeit.repeat("list(moving_averages_SO(range("+str(problem_size)+"), 5))", number=1*max(problems)//problem_size, 
         setup="from __main__ import moving_averages_SO") 
    print("{:12.3f} ".format(min(so)), end="") 

    my = timeit.repeat("list(moving_averages(range("+str(problem_size)+"), 5, False, False))", number=1*max(problems)//problem_size, 
         setup="from __main__ import moving_averages") 
    print("{:12.3f} ".format(min(my)), end="") 

    print("") 

和输出:

Timeit 
-------------------------------------------------------------------------------- 
10     7.242  7.656 
100    5.816  5.500 
1000    5.787  5.244 
10000    5.782  5.180 
100000    5.746  5.137 
1000000   5.745  5.198 
10000000   5.764  5.186 

原来的问题现在可以调用这个函数来解决:

print(list(moving_averages(range(1,11), 5, 
          rampUp=lambda _: None, 
          rampDown=False))) 

输出:

[None, None, None, None, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0] 

还有另一种解决方案extendin g itertools配方pairwise()。您可以扩展这nwise(),它给你的滑动窗口(如果可迭代是发电机的工作原理):

def nwise(iterable, n): 
    ts = it.tee(iterable, n) 
    for c, t in enumerate(ts): 
     next(it.islice(t, c, c), None) 
    return zip(*ts) 

def moving_averages_nw(iterable, n): 
    yield from (sum(x)/n for x in nwise(iterable, n)) 

>>> list(moving_averages_nw(range(1, 11), 5)) 
[3.0, 4.0, 5.0, 6.0, 7.0, 8.0] 

虽然短iterable个比较高的设置成本这一成本的影响降低了较长的数据组。这使用sum(),但代码相当优雅:

Timeit    MP   cfi   ***** 
-------------------------------------------------------------------------------- 
10     4.658  4.959  7.351 
100    5.144  4.070  4.234 
1000    5.312  4.020  3.977 
10000    5.317  4.031  3.966 
100000    5.508  4.115  4.087 
1000000   5.526  4.263  4.202 
10000000   5.632  4.326  4.242