forked from chrisconlan/fast-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmoving_averages.py
192 lines (143 loc) · 4.97 KB
/
moving_averages.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
"""
Given a list of numbers, compute their moving average
Output the result to a new sequence of equal length
"""
import pandas as pd
from numba import jit
import numpy as np
from typing import List
from utils.profiler import time_this, timed_report
from utils.profiler import ExponentialRange
import gc
def random_numeric_list(n: int) -> List[float]:
    """
    Build a list of n random samples drawn with np.random.random
    """
    samples = np.random.random(n)
    return list(samples)
@time_this(lambda *args, **kwargs: len(args[0]))
def slow_moving_avg(values: List[float],
                    m: int=20) -> List[float]:
    """
    Naive moving average over a trailing window of m values.

    Every output re-sums its whole window, so the cost is O(nm) for
    a list of length n. Positions without a complete window (the
    first m-1) hold None; if m exceeds len(values), every position
    holds None.
    """
    n = len(values)

    # No complete window fits in the data
    if n < m:
        return [None] * n

    # One average per window start, each summed from scratch
    window_avgs = [
        sum(values[start:start + m]) / m
        for start in range(n - m + 1)
    ]

    # Left-pad with Nones so the output lines up with the input
    return [None] * (m - 1) + window_avgs
@time_this(lambda *args, **kwargs: len(args[0]))
def fast_moving_avg(values: List[float],
                    m: int=20) -> List[float]:
    """
    Moving average over a trailing window of m values in O(n).

    A single running sum is carried along the list: at each step the
    value leaving the window is subtracted and the value entering it
    is added. Positions without a complete window (the first m-1)
    hold None; if m exceeds len(values), every position holds None.
    """
    n = len(values)

    # No complete window fits in the data
    if n < m:
        return [None] * n

    # Sum of the first complete window
    running_sum = sum(values[:m])

    # m-1 Nones of padding, then the first real average
    moving_avg = [None] * (m - 1)
    moving_avg.append(running_sum / m)

    # Slide the window one element at a time. The subtraction is done
    # before the addition, keeping the floating-point results stable.
    for i in range(m, n):
        running_sum = running_sum - values[i - m] + values[i]
        moving_avg.append(running_sum / m)

    return moving_avg
@time_this(lambda *args, **kwargs: len(args[0]))
def np_fast_moving_avg(values: np.ndarray,
                       m: int=20) -> np.ndarray:
    """
    Vectorized O(n) moving average built on numpy's cumsum.

    The sum of any window of m values is the cumulative sum at the
    window's end minus the cumulative sum m positions earlier. The
    first m-1 outputs are NaN; if m exceeds the input length, every
    output is NaN.
    """
    running_total = np.cumsum(values)

    # Output buffer; everything before the first full window is NaN
    result = np.empty((len(values),))
    result[:m - 1] = np.nan

    n = values.shape[0]
    if n >= m:
        # The first full window has nothing to subtract
        result[m - 1] = running_total[m - 1] / m

        if n > m:
            # Remaining windows: lagged difference of the cumsum
            result[m:] = (running_total[m:] - running_total[:-m]) / m

    return result
@time_this(lambda *args, **kwargs: len(args[0]))
def pd_fast_moving_avg(values: pd.Series,
                       m: int=20) -> pd.Series:
    """
    Moving average of window size m computed through the pandas
    .rolling interface
    """
    window = values.rolling(m)
    return window.mean()
@time_this(lambda *args, **kwargs: len(args[0]))
def pd_faster_moving_avg(values: pd.Series,
                         m: int=20) -> pd.Series:
    """
    Moving average of window size m, computed from the lagged
    difference of the cumulative sum instead of the .rolling
    interface.

    The first m-1 entries of the result are NaN and each later entry
    is the mean of the trailing m values, which is the same layout
    values.rolling(m).mean() produces for input without missing
    values.
    """
    cumsum = values.cumsum()
    moving_avg = (cumsum - cumsum.shift(m)) / m

    # shift(m) leaves NaN in the first m slots, which would also blank
    # out position m-1 even though its window is complete. That window
    # is exactly the first m values, so its sum is cumsum[m-1].
    if 0 < m <= len(values):
        moving_avg.iloc[m - 1] = cumsum.iloc[m - 1] / m

    return moving_avg
@jit(nopython=True)
def _numba_fast_moving_avg(values: np.ndarray,
                           m: int=20) -> np.ndarray:
    """
    Running-sum moving average of window size m, just-in-time
    compiled by numba in nopython mode.

    The first m-1 outputs are NaN; if m exceeds the input length,
    every output is NaN.
    """
    n = values.shape[0]

    # Output buffer; everything before the first full window is NaN
    result = np.empty(values.shape)
    result[:m-1] = np.nan

    # No complete window fits in the data
    if n < m:
        return result

    # Sum and average of the first complete window
    running_sum = np.sum(values[:m])
    result[m-1] = running_sum / m

    # Slide the window: drop the value that left, add the one that
    # entered, then store the average
    for i in range(m, n):
        running_sum = running_sum - values[i-m] + values[i]
        result[i] = running_sum / m

    return result


# Call the function once at import time so numba performs its
# just-in-time compilation before any timed call is made
_numba_fast_moving_avg(np.random.random(100000))
# Plain-Python wrapper that registers the compiled function for timing
@time_this(lambda *args, **kwargs: len(args[0]))
def numba_fast_moving_avg(values: np.ndarray,
                          m: int=20) -> np.ndarray:
    """
    Timed entry point that forwards to _numba_fast_moving_avg
    """
    result = _numba_fast_moving_avg(values, m=m)
    return result
if __name__ == '__main__':

    # One shared data set, held in the container each variant expects
    exp_range = ExponentialRange(2, 7, 1/4)
    values = random_numeric_list(exp_range.max)
    series_values = pd.Series(values)
    np_values = np.array(values)

    # (function, input data, arguments for exp_range.iterator), listed
    # in the order they are run.
    # NOTE(review): the argument passed to iterator() for the two
    # pure-Python variants presumably caps their largest input size;
    # confirm against utils.profiler.
    benchmarks = [
        (slow_moving_avg, values, (5,)),
        (fast_moving_avg, values, (7,)),
        (np_fast_moving_avg, np_values, ()),
        (pd_fast_moving_avg, series_values, ()),
        (pd_faster_moving_avg, series_values, ()),
        (numba_fast_moving_avg, np_values, ()),
    ]

    with timed_report():
        for moving_avg_func, data, iterator_args in benchmarks:
            for i in exp_range.iterator(*iterator_args):
                moving_avg_func(data[:i], m=100)

            # NOTE(review): assumed to run once after each sweep rather
            # than after every call; confirm against the upstream file.
            gc.collect()