NumPy版本的“指数加权移动平均线”,等效于pandas.ewm()。mean()

浏览:41日期:2024-03-05
如何解决NumPy版本的“指数加权移动平均线”,等效于pandas.ewm()。mean()?

此功能等效于pandas’ewm(adjust=False).mean(),但速度更快。ewm(adjust=True).mean()(熊猫的默认设置)可以在结果开始时产生不同的值。我正在努力adjust为该解决方案添加功能。

当输入太大时,@Divakar的答案会导致浮点精度问题。这是因为,(1-alpha)**(n+1) -> 0当n ->inf和时alpha -> 1,会导致被零除,并且NaN在计算中会弹出值。

我想我终于破解了!

这是numpy_ewma功能的向量化版本,据称它可以从@RaduS’s post-产生正确的结果

def numpy_ewma_vectorized(data, window): alpha = 2 /(window + 1.0) alpha_rev = 1-alpha scale = 1/alpha_rev n = data.shape[0] r = np.arange(n) scale_arr = scale**r offset = data[0]*alpha_rev**(r+1) pw0 = alpha*alpha_rev**(n-1) mult = data*pw0*scale_arr cumsums = mult.cumsum() out = offset + cumsums*scale_arr[::-1] return out

进一步提升

我们可以通过重复使用一些代码来进一步增强它,例如-

def numpy_ewma_vectorized_v2(data, window): alpha = 2 /(window + 1.0) alpha_rev = 1-alpha n = data.shape[0] pows = alpha_rev**(np.arange(n+1)) scale_arr = 1/pows[:-1] offset = data[0]*pows[1:] pw0 = alpha*alpha_rev**(n-1) mult = data*pw0*scale_arr cumsums = mult.cumsum() out = offset + cumsums*scale_arr[::-1] return out

运行时测试

让我们将这两个时间与大型数据集的相同循环函数进行比较。

In [97]: data = np.random.randint(2,9,(5000)) ...: window = 20 ...:In [98]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized(data, window))Out[98]: TrueIn [99]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized_v2(data, window))Out[99]: TrueIn [100]: %timeit numpy_ewma(data, window)100 loops, best of 3: 6.03 ms per loopIn [101]: %timeit numpy_ewma_vectorized(data, window)1000 loops, best of 3: 665 µs per loopIn [102]: %timeit numpy_ewma_vectorized_v2(data, window)1000 loops, best of 3: 357 µs per loopIn [103]: 6030/357.0Out[103]: 16.89075630252101

加速约17倍!

这是我最快的解决方案,几乎没有向量问题,几乎没有向量化。它有点复杂,但是性能却很好,尤其是对于非常庞大的输入。不使用就地计算(可以使用out参数来节省内存分配时间):在相当老的PC上,对于100M元素输入向量为3.62秒,对于100K元素输入向量为3.2ms,对于5000个元素输入向量为293µs(结果会因alpha/row_size值而异)。

# tested with python3 & numpy 1.15.2import numpy as npdef ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order=’C’, out=None): ''' Reshapes data before calculating EWMA, then iterates once over the rows to calculate the offset without precision issues :param data: Input data, will be flattened. :param alpha: scalar float in range (0,1)The alpha parameter for the moving average. :param row_size: int, optionalThe row size to use in the computation. High row sizes need higher precision,low values will impact performance. The optimal value depends on theplatform and the alpha being used. Higher alpha values require lowerrow size. Default depends on dtype. :param dtype: optionalData type used for calculations. Defaults to float64 unlessdata.dtype is float32, then it will use float32. :param order: {’C’, ’F’, ’A’}, optionalOrder to use when flattening the data. Defaults to ’C’. :param out: ndarray, or None, optionalA location into which the result is stored. If provided, it must havethe same shape as the desired output. If not provided or `None`,a freshly-allocated array is returned. :return: The flattened result. ''' data = np.array(data, copy=False) if dtype is None:if data.dtype == np.float32: dtype = np.float32else: dtype = np.float else:dtype = np.dtype(dtype) row_size = int(row_size) if row_size is not Noneelse get_max_row_size(alpha, dtype) if data.size <= row_size:# The normal function can handle this input, use thatreturn ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out) if data.ndim > 1:# flatten inputdata = np.reshape(data, -1, order=order) if out is None:out = np.empty_like(data, dtype=dtype) else:assert out.shape == data.shapeassert out.dtype == dtype row_n = int(data.size // row_size) # the number of rows to use trailing_n = int(data.size % row_size) # the amount of data leftover first_offset = data[0] if trailing_n > 0:# set temporary results to slice view of out parameterout_main_view = np.reshape(out[:-trailing_n], (row_n, row_size))data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size)) else:out_main_view = outdata_main_view = data # get all the scaled cumulative sums with 0 offset ewma_vectorized_2d(data_main_view, alpha, axis=1, offset=0, dtype=dtype, order=’C’, out=out_main_view) scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1) last_scaling_factor = scaling_factors[-1] # create offset array offsets = np.empty(out_main_view.shape[0], dtype=dtype) offsets[0] = first_offset # iteratively calculate offset for each row for i in range(1, out_main_view.shape[0]):offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1] # add the offsets to the result out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :] if trailing_n > 0:# process trailing data in the 2nd slice of the out parameterewma_vectorized(data[-trailing_n:], alpha, offset=out_main_view[-1, -1],dtype=dtype, order=’C’, out=out[-trailing_n:]) return outdef get_max_row_size(alpha, dtype=float): assert 0. <= alpha < 1. # This will return the maximum row size possible on # your platform for the given dtype. I can find no impact on accuracy # at this value on my machine. # Might not be the optimal value for speed, which is hard to predict # due to numpy’s optimizations # Use np.finfo(dtype).eps if you are worried about accuracy # and want to be extra safe. epsilon = np.finfo(dtype).tiny # If this produces an OverflowError, make epsilon larger return int(np.log(epsilon)/np.log(1-alpha)) + 1

一维ewma函数:

def ewma_vectorized(data, alpha, offset=None, dtype=None, order=’C’, out=None): ''' Calculates the exponential moving average over a vector. Will fail for large inputs. :param data: Input data :param alpha: scalar float in range (0,1)The alpha parameter for the moving average. :param offset: optionalThe offset for the moving average, scalar. Defaults to data[0]. :param dtype: optionalData type used for calculations. Defaults to float64 unlessdata.dtype is float32, then it will use float32. :param order: {’C’, ’F’, ’A’}, optionalOrder to use when flattening the data. Defaults to ’C’. :param out: ndarray, or None, optionalA location into which the result is stored. If provided, it must havethe same shape as the input. If not provided or `None`,a freshly-allocated array is returned. ''' data = np.array(data, copy=False) if dtype is None:if data.dtype == np.float32: dtype = np.float32else: dtype = np.float64 else:dtype = np.dtype(dtype) if data.ndim > 1:# flatten inputdata = data.reshape(-1, order) if out is None:out = np.empty_like(data, dtype=dtype) else:assert out.shape == data.shapeassert out.dtype == dtype if data.size < 1:# empty input, return empty arrayreturn out if offset is None:offset = data[0] alpha = np.array(alpha, copy=False).astype(dtype, copy=False) # scaling_factors -> 0 as len(data) gets large # this leads to divide-by-zeros below scaling_factors = np.power(1. - alpha, np.arange(data.size + 1, dtype=dtype), dtype=dtype) # create cumulative sum array np.multiply(data, (alpha * scaling_factors[-2]) / scaling_factors[:-1],dtype=dtype, out=out) np.cumsum(out, dtype=dtype, out=out) # cumsums / scaling out /= scaling_factors[-2::-1] if offset != 0:offset = np.array(offset, copy=False).astype(dtype, copy=False)# add offsetsout += offset * scaling_factors[1:] return out

2D ewma函数:

def ewma_vectorized_2d(data, alpha, axis=None, offset=None, dtype=None, order=’C’, out=None): ''' Calculates the exponential moving average over a given axis. :param data: Input data, must be 1D or 2D array. :param alpha: scalar float in range (0,1)The alpha parameter for the moving average. :param axis: The axis to apply the moving average on.If axis==None, the data is flattened. :param offset: optionalThe offset for the moving average. Must be scalar or avector with one element for each row of data. If set to None,defaults to the first value of each row. :param dtype: optionalData type used for calculations. Defaults to float64 unlessdata.dtype is float32, then it will use float32. :param order: {’C’, ’F’, ’A’}, optionalOrder to use when flattening the data. Ignored if axis is not None. :param out: ndarray, or None, optionalA location into which the result is stored. If provided, it must havethe same shape as the desired output. If not provided or `None`,a freshly-allocated array is returned. ''' data = np.array(data, copy=False) assert data.ndim <= 2 if dtype is None:if data.dtype == np.float32: dtype = np.float32else: dtype = np.float64 else:dtype = np.dtype(dtype) if out is None:out = np.empty_like(data, dtype=dtype) else:assert out.shape == data.shapeassert out.dtype == dtype if data.size < 1:# empty input, return empty arrayreturn out if axis is None or data.ndim < 2:# use 1D versionif isinstance(offset, np.ndarray): offset = offset[0]return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order, out=out) assert -data.ndim <= axis < data.ndim # create reshaped data views out_view = out if axis < 0:axis = data.ndim - int(axis) if axis == 0:# transpose data views so columns are treated as rowsdata = data.Tout_view = out_view.T if offset is None:# use the first element of each row as the offsetoffset = np.copy(data[:, 0]) elif np.size(offset) == 1:offset = np.reshape(offset, (1,)) alpha = np.array(alpha, copy=False).astype(dtype, copy=False) # calculate the moving average row_size = data.shape[1] row_n = data.shape[0] scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype), dtype=dtype) # create a scaled cumulative sum array np.multiply(data,np.multiply(alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype), dtype=dtype)/ scaling_factors[np.newaxis, :-1],dtype=dtype, out=out_view ) np.cumsum(out_view, axis=1, dtype=dtype, out=out_view) out_view /= scaling_factors[np.newaxis, -2::-1] if not (np.size(offset) == 1 and offset == 0):offset = offset.astype(dtype, copy=False)# add the offsets to the scaled cumulative sumsout_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:] return out

用法:

data_n = 100000000data = ((0.5*np.random.randn(data_n)+0.5) % 1) * 100span = 5000 # span >= 1alpha = 2/(span+1) # for pandas` span parameter# com = 1000 # com >= 0# alpha = 1/(1+com) # for pandas` center-of-mass parameter# halflife = 100 # halflife > 0# alpha = 1 - np.exp(np.log(0.5)/halflife) # for pandas` half-life parameterresult = ewma_vectorized_safe(data, alpha)

根据给定alpha窗口中数据对平均值的贡献,很容易为给定的计算“窗口大小”(技术指数平均值具有无限的“窗口”)。例如,这对于选择由于边界效应而将结果的起始部分视为不可靠的情况很有用。

def window_size(alpha, sum_proportion): # Increases with increased sum_proportion and decreased alpha # solve (1-alpha)**window_size = (1-sum_proportion) for window_size return int(np.log(1-sum_proportion) / np.log(1-alpha))alpha = 0.02sum_proportion = .99 # window covers 99% of contribution to the moving averagewindow = window_size(alpha, sum_proportion) # = 227sum_proportion = .75 # window covers 75% of contribution to the moving averagewindow = window_size(alpha, sum_proportion) # = 68

alpha = 2 / (window_size +1.0)此线程中使用的关系(pandas的’span’选项)与上述函数(带有sum_proportion~=0.87)的逆函数非常近似。alpha = 1 -np.exp(np.log(1-sum_proportion)/window_size)更准确(pandas的“半衰期”选项等于sum_proportion=0.5)。

在下面的示例中,data代表一个连续的噪声信号。cutoff_idx是第一个位置,result其中至少99%的值取决于其中的单独值data(即小于1%的值取决于data[0])。直到cutoff_idx最终的数据都被排除在数据之外,因为它太依赖于中的第一个值data,因此可能会扭曲平均值。

result = ewma_vectorized_safe(data, alpha, chunk_size)sum_proportion = .99cutoff_idx = window_size(alpha, sum_proportion)result = result[cutoff_idx:]

为了说明上面解决的问题,您可以运行几次,请注意,红线经常出现的错误的开始,在以下位置被跳过cutoff_idx:

data_n = 100000data = np.random.rand(data_n) * 100window = 1000sum_proportion = .99alpha = 1 - np.exp(np.log(1-sum_proportion)/window)result = ewma_vectorized_safe(data, alpha)cutoff_idx = window_size(alpha, sum_proportion)x = np.arange(start=0, stop=result.size)import matplotlib.pyplot as pltplt.plot(x[:cutoff_idx+1], result[:cutoff_idx+1], ’-r’, x[cutoff_idx:], result[cutoff_idx:], ’-b’)plt.show()

请注意,cutoff_idx==window因为alpha是用window_size()函数的反函数设置的,所以具有sum_proportion。这类似于大熊猫的用法ewm(span=window,min_periods=window)。

解决方法

像下面的熊猫一样,如何在NumPy中获得指数加权移动平均值?

import pandas as pdimport pandas_datareader as pdrfrom datetime import datetime# Declare variablesibm = pdr.get_data_yahoo(symbols=’IBM’,start=datetime(2000,1,1),end=datetime(2012,1)).reset_index(drop=True)[’Adj Close’]windowSize = 20# Get PANDAS exponential weighted moving averageewm_pd = pd.DataFrame(ibm).ewm(span=windowSize,min_periods=windowSize).mean().as_matrix()print(ewm_pd)

我用NumPy尝试了以下

import numpy as npimport pandas_datareader as pdrfrom datetime import datetime# From this post: http://stackoverflow.com/a/40085052/3293881 by @Divakardef strided_app(a,L,S): # Window len = L,Stride len/stepsize = S nrows = ((a.size - L) // S) + 1 n = a.strides[0] return np.lib.stride_tricks.as_strided(a,shape=(nrows,L),strides=(S * n,n))def numpyEWMA(price,windowSize): weights = np.exp(np.linspace(-1.,0.,windowSize)) weights /= weights.sum() a2D = strided_app(price,windowSize,1) returnArray = np.empty((price.shape[0])) returnArray.fill(np.nan) for index in (range(a2D.shape[0])):returnArray[index + windowSize-1] = np.convolve(weights,a2D[index])[windowSize - 1:-windowSize + 1] return np.reshape(returnArray,(-1,1))# Declare variablesibm = pdr.get_data_yahoo(symbols=’IBM’,1)).reset_index(drop=True)[’Adj Close’]windowSize = 20# Get NumPy exponential weighted moving averageewma_np = numpyEWMA(ibm,windowSize)print(ewma_np)

但是结果却与大熊猫不同。

是否有更好的方法直接在NumPy中计算指数加权移动平均值并获得与完全相同的结果pandas.ewm().mean()?

在对熊猫解决方案提出60,000个请求时,我得到了大约230秒。我敢肯定,使用纯NumPy可以大大减少这种情况。

相关文章: