
Commit 7b69274

yidong72 authored and avolkov1 committed

[FEA] Feature adding fractional differencing computation (#56)

* fractional difference implementation
* added unit tests for frac diff
* added port fractional diff computation
* added the new notebook

1 parent a0cd5ef commit 7b69274

File tree

6 files changed: +1217 -2 lines changed

gquant/cuindicator/__init__.py (+5 -2)

@@ -1,9 +1,12 @@
 from .ewm import Ewm
-from .indicator import *  # noqa: F403
+from .indicator import *  # noqa: F403, F401
 from .pewm import PEwm
 from .rolling import Rolling
 from .util import (shift, diff, substract, summation,
                    multiply, division, scale, cumsum)
+from .frac_diff import (fractional_diff, get_weights_floored,
+                        port_fractional_diff)

 __all__ = ["Ewm", "PEwm", "Rolling", "shift", "diff", "substract",
-           "summation", "multiply", "division", "scale", "cumsum"]
+           "summation", "multiply", "division", "scale", "cumsum",
+           "fractional_diff", "port_fractional_diff", "get_weights_floored"]

gquant/cuindicator/frac_diff.py (new file, +274 lines)

@@ -0,0 +1,274 @@
import numba
import cmath
import numpy as np
from numba import cuda
from gquant.cuindicator.util import port_mask_nan

__all__ = ["fractional_diff", "get_weights_floored", "port_fractional_diff"]


def get_weights_floored(d, num_k, floor=1e-3):
    r"""Calculate the weight ($w$) for each lag ($k$) through the recursion
    $w_k = -w_{k-1} \frac{d - k + 1}{k}$, keeping only weights whose magnitude
    stays above a minimum value (floor) to avoid computing weights for the
    entire time series.

    Args:
        d (float): differencing value.
        num_k (int): number of lags (typically the length of the time series)
            for which to calculate w.
        floor (float): minimum magnitude of the weights, for computational
            efficiency.
    """
    w_k = np.array([1])
    k = 1

    while k < num_k:
        w_k_latest = -w_k[-1] * (d - k + 1) / k
        if abs(w_k_latest) <= floor:
            break

        w_k = np.append(w_k, w_k_latest)

        k += 1

    w_k = w_k.reshape(-1, 1)

    return w_k


@cuda.jit(device=True)
def conv_window(shared, history_len, out_arr, window_size,
                arr_len, offset, offset2, min_size):
    """
    Compute the convolution for one thread.

    Arguments:
    ------
    shared: numba.cuda.DeviceNDArray
        3 chunks of data are stored in shared memory.
        The first [0, window_size) elements are the chunk of data needed to
        compute the first convolution element;
        the next [window_size, window_size + thread_tile * blockDim) elements
        are the inputs allocated for this block of threads;
        the last [window_size + thread_tile * blockDim,
        window_size + thread_tile * blockDim + window_size) elements store
        the kernel values.
    history_len: int
        total number of historical elements available for this chunk of data
    out_arr: numba.cuda.DeviceNDArray
        output gpu_array of size `thread_tile`
    window_size: int
        the number of elements in the kernel
    arr_len: int
        the chunk array length, same as `thread_tile`
    offset: int
        the starting index of the chunk array in shared memory for
        this thread
    offset2: int
        the starting position of the weights/kernel array in shared memory
    min_size: int
        the minimum number of non-NA elements
    """
    for i in range(arr_len):
        if i + history_len < window_size - 1:
            out_arr[i] = np.nan
        else:
            s = 0.0
            average_size = 0
            for j in range(0, window_size):
                if not (cmath.isnan(
                        shared[offset + i - j])):
                    s += (shared[offset + i - j] *
                          shared[offset2 + window_size - 1 - j])
                    average_size += 1
            if average_size >= min_size:
                out_arr[i] = s
            else:
                out_arr[i] = np.nan


@cuda.jit
def kernel(in_arr, weight_arr, out_arr, window,
           arr_len, thread_tile, min_size):
    """
    This kernel does a 1D convolution of the `in_arr` array with `weight_arr`
    as the kernel. The result is saved in `out_arr`.

    Arguments:
    ------
    in_arr: numba.cuda.DeviceNDArray
        input gpu array
    weight_arr: numba.cuda.DeviceNDArray
        convolution kernel gpu array
    out_arr: numba.cuda.DeviceNDArray
        output gpu_array
    window: int
        the number of elements in weight_arr
    arr_len: int
        the input/output array length
    thread_tile: int
        each thread is responsible for `thread_tile` number of elements
    min_size: int
        the minimum number of non-NA elements
    """
    shared = cuda.shared.array(shape=0,
                               dtype=numba.float64)
    block_size = cuda.blockDim.x  # total number of threads
    tx = cuda.threadIdx.x
    # Block id in a 1D grid
    bid = cuda.blockIdx.x
    starting_id = bid * block_size * thread_tile

    # copy thread_tile * number_of_threads_per_block elements into shared memory
    for j in range(thread_tile):
        offset = tx + j * block_size
        if (starting_id + offset) < arr_len:
            shared[offset + window - 1] = in_arr[
                starting_id + offset]
    cuda.syncthreads()

    # copy the window - 1 historical elements into shared memory
    for j in range(0, window - 1, block_size):
        if (((tx + j) <
             window - 1) and (
                 starting_id - window + 1 + tx + j >= 0)):
            shared[tx + j] = \
                in_arr[starting_id - window + 1 + tx + j]
    cuda.syncthreads()
    # copy the weights into shared memory
    for j in range(0, window, block_size):
        element_id = tx + j
        if (((tx + j) < window) and (element_id < window)):
            shared[thread_tile * block_size + window - 1 + tx +
                   j] = weight_arr[tx + j]
    cuda.syncthreads()
    # slice the shared memory for each thread
    start_shared = tx * thread_tile
    his_len = min(window - 1,
                  starting_id + tx * thread_tile)
    # slice the global memory for each thread
    start = starting_id + tx * thread_tile
    end = min(starting_id + (tx + 1) * thread_tile, arr_len)
    sub_outarr = out_arr[start:end]
    sub_len = end - start
    conv_window(shared, his_len, sub_outarr,
                window, sub_len,
                window - 1 + start_shared,
                thread_tile * block_size + window - 1,
                min_size)


def fractional_diff(input_arr, d=0.5, floor=1e-3, min_periods=None,
                    thread_tile=2, number_of_threads=512):
    """
    The fractional difference computation method.

    Arguments:
    -------
    input_arr: numba.cuda.DeviceNDArray or cudf.Series
        the input array on which to compute the fractional difference
    d: float
        the differencing value, in the range 0 to 1
    floor: float
        minimum magnitude of the weights, for computational efficiency
    min_periods: int
        defaults to the length of the weights; at least min_periods non-NA
        elements are needed to produce a fractional difference value
    thread_tile: int
        each thread will be responsible for `thread_tile` number of
        elements in the window computation
    number_of_threads: int
        number of threads in a block for the CUDA computation

    Returns
    -------
    (numba.cuda.DeviceNDArray, np.array)
        tuple of the computed fractional difference array and the weight array

    """
    if isinstance(input_arr, numba.cuda.cudadrv.devicearray.DeviceNDArray):
        gpu_in = input_arr
    else:
        gpu_in = input_arr.data.to_gpu_array()

    # compute the weights for the fractional difference
    weights = get_weights_floored(d=d,
                                  num_k=len(input_arr),
                                  floor=floor)[::-1, 0]
    weights_out = np.ascontiguousarray(weights)
    weights = numba.cuda.to_device(weights_out)

    window = len(weights)

    if min_periods is None:
        min_periods = window
    else:
        min_periods = min_periods

    number_of_threads = number_of_threads
    array_len = len(gpu_in)

    # allocate the output array
    gpu_out = numba.cuda.device_array_like(gpu_in)

    number_of_blocks = \
        (array_len + (number_of_threads * thread_tile - 1)) // \
        (number_of_threads * thread_tile)

    shared_buffer_size = (number_of_threads * thread_tile +
                          window - 1 + window)

    # launch the convolution kernel
    # (dynamic shared memory size is given in bytes: 8 bytes per float64)
    kernel[(number_of_blocks,),
           (number_of_threads,),
           0,
           shared_buffer_size * 8](gpu_in,
                                   weights,
                                   gpu_out,
                                   window,
                                   array_len,
                                   thread_tile,
                                   min_periods)
    return gpu_out, weights_out


def port_fractional_diff(asset_indicator, input_arr, d=0.5, floor=1e-3,
                         min_periods=None, thread_tile=2,
                         number_of_threads=512):
    """
    Calculate the fractional differencing signal for all the financial
    assets indicated by asset_indicator.

    Arguments:
    -------
    asset_indicator: cudf.Series
        the integer indicator array marking the start of each asset
    input_arr: numba.cuda.DeviceNDArray or cudf.Series
        the input array on which to compute the fractional difference
    d: float
        the differencing value, in the range 0 to 1
    floor: float
        minimum magnitude of the weights, for computational efficiency
    min_periods: int
        defaults to the length of the weights; at least min_periods non-NA
        elements are needed to produce a fractional difference value
    thread_tile: int
        each thread will be responsible for `thread_tile` number of
        elements in the window computation
    number_of_threads: int
        number of threads in a block for the CUDA computation

    Returns
    -------
    (numba.cuda.DeviceNDArray, np.array)
        tuple of the computed fractional difference array and the weight array
    """
    out, weights = fractional_diff(input_arr, d=d, floor=floor,
                                   min_periods=min_periods,
                                   thread_tile=thread_tile,
                                   number_of_threads=number_of_threads)
    port_mask_nan(asset_indicator.data.to_gpu_array(), out, 0,
                  len(weights) - 1)
    return out, weights
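
A hedged usage sketch of the new GPU API, assuming a CUDA-capable machine with numba and cudf installed; the input series is synthetic and only illustrative (fractional_diff accepts a cudf.Series directly, since it calls .data.to_gpu_array() internally):

    import cudf
    import numpy as np
    from gquant.cuindicator import fractional_diff

    # Synthetic, illustrative price-like series.
    np.random.seed(0)
    prices = cudf.Series(np.cumsum(np.random.normal(0.0, 1.0, 10000)) + 100.0)

    # For d = 0.5 the weight recursion w_k = -w_{k-1} * (d - k + 1) / k gives
    # 1, -0.5, -0.125, -0.0625, ... and stops once |w_k| <= floor.
    gpu_out, weights = fractional_diff(prices, d=0.5, floor=1e-3)

    frac = cudf.Series(gpu_out)  # wrap the device array back into a cudf Series
    print(len(weights))          # length of the convolution window actually used
    print(frac.tail())           # last few fractionally differenced values

The first window - 1 positions of the output are NaN, since not enough history is available there to apply the full weight window.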

gquant/dataframe_flow/taskGraph.py (+10)

@@ -41,6 +41,16 @@ def __init__(self, task_spec_list=None):
             for task_spec in task_spec_list:
                 self.__task_list.append(Task(task_spec))

+    def extend(self, task_spec_list=None):
+        '''
+        Add more task-spec dicts to the graph
+
+        :param task_spec_list: List of task-spec dicts per TaskSpecSchema.
+        '''
+        if task_spec_list is not None:
+            for task_spec in task_spec_list:
+                self.__task_list.append(Task(task_spec))
+
     def __len__(self):
         return len(self.__task_list)
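
A brief usage sketch of the new extend method (the empty lists are placeholders; real entries would be task-spec dicts built per TaskSpecSchema):

    from gquant.dataframe_flow.taskGraph import TaskGraph

    initial_specs = []   # placeholder: task-spec dicts per TaskSpecSchema
    extra_specs = []     # placeholder: more task-spec dicts added later

    graph = TaskGraph(task_spec_list=initial_specs)
    graph.extend(task_spec_list=extra_specs)  # append the extra tasks to the same graph
    print(len(graph))                         # __len__ reports the total number of tasks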

gquant/plugin_nodes/transform/indicatorNode.py (+2)

@@ -61,6 +61,8 @@ def process(self, inputs):
             if 'args' in indicator:
                 ar = indicator['args']
             v = fun(*(parallel+data+ar))
+            if isinstance(v, tuple):
+                v = v[0]
             if isinstance(v, tuple) and 'outputs' in indicator:
                 for out in indicator['outputs']:
                     out_col = self._compose_name(indicator, [out])
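
This change appears to target indicator functions, like the new fractional_diff, that return a (result, weights) tuple: when the result is a tuple, only the first element is kept as the indicator value. An illustrative, self-contained sketch of the effect:

    # Illustrative only: what the added lines do to a tuple-returning indicator.
    gpu_out, weights_out = object(), object()  # stand-ins for the real device/host arrays
    v = (gpu_out, weights_out)                 # e.g. the return value of fractional_diff
    if isinstance(v, tuple):
        v = v[0]                               # keep the computed array, drop the weights
    assert v is gpu_out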
