How to calculate a moving average in Python 3?
Let's say I have a list:
y = ['1', '2', '3', '4','5','6','7','8','9','10']
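I want to create a function that calculates an n-day moving average. So if n is 5, I want my code to take the first window, 1-5, add the values up and find the average, which would be 3.0, then move on to 2-6 and compute the average, which would be 4.0, then 3-7, 4-8, 5-9, 6-10.
I don't want to calculate anything for the first n-1 days, so the calculation should start at the nth day and look back over the previous days.
def moving_average(x: 'list of prices', n):
    for num in range(len(x) + 1):
        print(x[num-n:num])
This seems to print out what I want: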
[]
[]
[]
[]
[]
['1', '2', '3', '4', '5']
['2', '3', '4', '5', '6']
['3', '4', '5', '6', '7']
['4', '5', '6', '7', '8']
['5', '6', '7', '8', '9']
['6', '7', '8', '9', '10']
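However, I don't know how to compute the average of the numbers inside those lists. Any ideas?
There is a great sliding window generator in an older version of the Python documentation, in the itertools examples: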
from itertools import islice
def window(seq, n=2):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result
    for elem in it:
        result = result[1:] + (elem,)
        yield result
Using that, your moving averages are trivial:
from __future__ import division # For Python 2
def moving_averages(values, size):
    for selection in window(values, size):
        yield sum(selection)/size
Running this against your input (mapping the strings to integers) gives:
>>> y= ['1', '2', '3', '4','5','6','7','8','9','10']
>>> for avg in moving_averages(map(int, y), 5):
...     print(avg)
...
3.0
4.0
5.0
6.0
7.0
8.0
To return None for the first n - 1 iterations, the 'incomplete' sets, just expand the moving_averages function a little:
def moving_averages(values, size):
    for _ in range(size - 1):
        yield None
    for selection in window(values, size):
        yield sum(selection)/size
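As a cross-check on the padded output, here is a self-contained sketch that gets the same effect in a single pass. It swaps the window() recipe for a collections.deque with maxlen, which is a substitution made for illustration and not part of the answer above; the name padded_moving_averages is hypothetical.

```python
from collections import deque

def padded_moving_averages(values, size):
    """Yield None until `size` items have been seen, then the window average."""
    recent = deque(maxlen=size)  # the oldest item is dropped automatically
    for value in values:
        recent.append(value)
        if len(recent) == size:
            yield sum(recent) / size
        else:
            yield None

y = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
padded = list(padded_moving_averages(map(int, y), 5))
print(padded)  # [None, None, None, None, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
```

One difference worth noting: for inputs shorter than size - 1 this sketch yields one None per input item, whereas the version above always yields size - 1 None values first.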
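+1, I had never seen this function before. – placeybordeaux 2013-02-14 21:09:24
I want the result to be [none,none,none,none,3.0,4.0,5.0,6.0,7.0,8.0] though – Kara 2013-02-14 21:13:47
While I very much appreciate your elegant solution, I compared it against one that tracks a running sum instead of recalculating the sum multiple times. See [my answer](http://*.com/a/14942753/923794). That approach could turn out even faster if the function were simplified to answer only the original question and not allow additional arguments. – cfi 2013-02-19 08:19:48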
Use the sum and map functions.
print(sum(map(int, x[num-n:num])))
The map function in Python 3 is basically a lazy version of this:
[int(i) for i in x[num-n:num]]
I'm sure you can guess what the sum function does.
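Combined with the loop from the question, a minimal sketch might look like the following. Starting the range at n (to skip the empty slices) and returning a list instead of printing are both assumptions made here for illustration.

```python
def moving_average(x, n):
    """Return the n-item moving averages of a list of numeric strings."""
    averages = []
    # Start at n so that every slice x[num-n:num] holds exactly n items.
    for num in range(n, len(x) + 1):
        window = x[num - n:num]
        averages.append(sum(map(int, window)) / n)
    return averages

y = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
print(moving_average(y, 5))  # [3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
```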
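Here is a way to avoid recomputing the intermediate sums:
values = range(0, 12)
runningsum = 0
def runs(v):
    global runningsum
    runningsum += v
    return runningsum
# The leading 0 makes runsumlist[k] the sum of the first k values
runsumlist = [0] + [runs(v) for v in values]
result = [(runsumlist[k] - runsumlist[k-5]) / 5 for k in range(5, len(values) + 1)]
Printing result gives:
[2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Make that runs(int(v)), and then repr((runsumlist[k] - runsumlist[k-5]) / 5), if you want to carry the numbers around as strings.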
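The same prefix-sum idea can be written without module-level state, for example with itertools.accumulate from the standard library. This is only a sketch, not part of the answer above, and the function name moving_averages_prefix is made up for illustration.

```python
from itertools import accumulate

def moving_averages_prefix(values, n):
    """Moving averages via prefix sums: each window sum is the
    difference of two running totals."""
    # prefix[k] is the sum of the first k values, so prefix[0] is 0.
    prefix = [0] + list(accumulate(values))
    return [(prefix[k] - prefix[k - n]) / n for k in range(n, len(prefix))]

result = moving_averages_prefix(range(0, 12), 5)
print(result)  # [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
```

Each average costs one subtraction and one division regardless of the window size, at the price of holding the whole list of prefix sums in memory.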
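An alternative without the global:
values = [float(x) for x in range(0, 12)]
nave = 5
# Seed with the average of the first window, then update it incrementally
movingave = [sum(values[:nave]) / nave]
for i in range(len(values) - nave):
    movingave.append(movingave[-1] + (values[i+nave] - values[i]) / nave)
print(movingave)
Be sure to do floating-point math even if your input values are integers.
[2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
While I like Martijn's answer on this, like george, I wondered whether it wouldn't be faster to use a running sum instead of applying sum() over and over again to mostly the same numbers.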
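The idea of using None values as a default during the ramp-up phase is also interesting. In fact, there are plenty of different scenarios one could imagine for moving averages. Let's split the calculation of averages into three phases:
- Ramp up: the starting iterations, where the current iteration count < window size
- Steady progress: exactly window-size elements are available to compute a normal average := sum(x[iteration_counter-window_size:iteration_counter])/window_size
- Ramp down: at the end of the input data, we could return another window_size - 1 "average" numbers.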
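To make the three phases concrete, here is a small sketch that labels each window as it slides over the data. It is purely illustrative: phased_windows is a made-up helper, it copies the input into a list (so it does not handle unbounded generators), and it assumes the input holds at least `size` values.

```python
def phased_windows(data, size):
    """Yield (phase, window) pairs for ramp-up, steady and ramp-down."""
    data = list(data)
    n = len(data)
    # `end` runs past n so that the window can drain at the end of the data.
    for end in range(1, n + size):
        start = max(0, end - size)
        window = data[start:min(end, n)]
        if end < size:
            phase = "ramp-up"      # window not yet full
        elif end <= n:
            phase = "steady"       # exactly `size` elements available
        else:
            phase = "ramp-down"    # input exhausted, window empties
        yield phase, window

for phase, window in phased_windows([1, 2, 3, 4, 5], 3):
    print(phase, window)
```

With five values and a window of three, this gives two ramp-up windows, three steady windows and two ramp-down windows, matching the window_size - 1 count mentioned for the first and last phases.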
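Here is a function that accepts
- arbitrary iterables (generators are fine) as input for the data
- arbitrary window sizes >= 1
- parameters to switch the production of values on/off during the ramp up/down phases
- callback functions for those phases to control how values are produced. This can be used to constantly provide a default (e.g. None) or to provide partial averages
Here's the code: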
from collections import deque
def moving_averages(data, size, rampUp=True, rampDown=True):
    """Slide a window of <size> elements over <data> to calc an average
    First and last <size-1> iterations when window is not yet completely
    filled with data, or the window empties due to exhausted <data>, the
    average is computed with just the available data (but still divided
    by <size>).
    Set rampUp/rampDown to False in order to not provide any values during
    those start and end <size-1> iterations.
    Set rampUp/rampDown to functions to provide arbitrary partial average
    numbers during those phases. The callback will get the currently
    available input data in a deque. Do not modify that data.
    """
    d = deque()
    running_sum = 0.0
    data = iter(data)
    # rampUp
    for count in range(1, size):
        try:
            val = next(data)
        except StopIteration:
            break
        running_sum += val
        d.append(val)
        #print("up: running sum:" + str(running_sum) + "  count: " + str(count) + "  deque: " + str(d))
        if rampUp:
            if callable(rampUp):
                yield rampUp(d)
            else:
                yield running_sum/size
    # steady
    exhausted_early = True
    for val in data:
        exhausted_early = False
        running_sum += val
        #print("st: running sum:" + str(running_sum) + "  deque: " + str(d))
        yield running_sum/size
        d.append(val)
        running_sum -= d.popleft()
    # rampDown
    if rampDown:
        # d is empty if <data> was empty; popleft() would raise IndexError then
        if exhausted_early and d:
            running_sum -= d.popleft()
        for (count) in range(min(len(d), size-1), 0, -1):
            #print("dn: running sum:" + str(running_sum) + "  deque: " + str(d))
            if callable(rampDown):
                yield rampDown(d)
            else:
                yield running_sum/size
            running_sum -= d.popleft()
It seems to be a bit faster than Martijn's version, which is far more elegant, though. Here is the test code:
print("")
print("Timeit")
print("-" * 80)
from itertools import islice
def window(seq, n=2):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result
    for elem in it:
        result = result[1:] + (elem,)
        yield result
# Martijn's version:
def moving_averages_SO(values, size):
    for selection in window(values, size):
        yield sum(selection)/size
import timeit
problems = [int(i) for i in (10, 100, 1000, 10000, 1e5, 1e6, 1e7)]
for problem_size in problems:
    print("{:12s}".format(str(problem_size)), end="")
    so = timeit.repeat("list(moving_averages_SO(range("+str(problem_size)+"), 5))", number=1*max(problems)//problem_size,
                       setup="from __main__ import moving_averages_SO")
    print("{:12.3f} ".format(min(so)), end="")
    my = timeit.repeat("list(moving_averages(range("+str(problem_size)+"), 5, False, False))", number=1*max(problems)//problem_size,
                       setup="from __main__ import moving_averages")
    print("{:12.3f} ".format(min(my)), end="")
    print("")
And the output:
Timeit
--------------------------------------------------------------------------------
10                 7.242        7.656
100                5.816        5.500
1000               5.787        5.244
10000              5.782        5.180
100000             5.746        5.137
1000000            5.745        5.198
10000000           5.764        5.186
The original question can now be solved with this function call:
print(list(moving_averages(range(1,11), 5,
                           rampUp=lambda _: None,
                           rampDown=False)))
Output:
[None, None, None, None, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
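There is yet another solution, extending the itertools recipe pairwise(). You can extend this to nwise(), which gives you the sliding window (and also works if the iterable is a generator):
import itertools as it
def nwise(iterable, n):
    ts = it.tee(iterable, n)
    # Advance the c-th copy by c items so the copies are staggered
    for c, t in enumerate(ts):
        next(it.islice(t, c, c), None)
    return zip(*ts)
def moving_averages_nw(iterable, n):
    yield from (sum(x)/n for x in nwise(iterable, n))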
>>> list(moving_averages_nw(range(1, 11), 5))
[3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
While the setup cost is relatively high for short iterables, the impact of this cost diminishes for longer data sets. This uses sum(), but the code is reasonably elegant:
Timeit                MP          cfi        *****
--------------------------------------------------------------------------------
10                 4.658        4.959        7.351
100                5.144        4.070        4.234
1000               5.312        4.020        3.977
10000              5.317        4.031        3.966
100000             5.508        4.115        4.087
1000000            5.526        4.263        4.202
10000000           5.632        4.326        4.242
Why do you have strings in your list instead of numbers? – 2013-02-14 21:08:50