Source code for functions.detrend

"""Detrend models."""
import numpy as np
import pandas as pd
from scipy import interpolate
from sklearn.linear_model import LinearRegression
from statsmodels.tsa.deterministic import DeterministicProcess

from src.functions.detrend_fancy_plot import _fancy_plot


[docs] class BaseDetrend: """Base class for all detrend models.""" def __init__(self, method_name: str) -> None: self.y_predict: np.ndarray = np.array([]) self.y_original: np.ndarray = np.array([]) self.fitted_values: np.ndarray = np.array([]) self.fitted_parameters: dict[str, [int | float]] = {} self.method_name: str = method_name
[docs] def predict(self, y: np.ndarray) -> np.ndarray: """_summary_ Args: y (np.ndarray): 1 dimensional array of same length as y_original Returns: np.ndarray: detrended values, 1 dimensional array of length len(y) """ self.y_predict = y - self.fitted_values return self.y_predict
[docs] def fancy_plot(self, xticklabels: pd.core.indexes.base.Index | None = None) -> None: """Plot two graphs: 1. the original data and its fitted trend curve; 2. the detrended data Args: xticklabels (pd.core.indexes.base.Index | None, optional): the date index of the imported financial data. Defaults to None. """ _fancy_plot( y_original=self.y_original, y_fitted=self.fitted_values, y_detrend=self.y_predict, fitted_parameters=self.fitted_parameters, xticklabels=xticklabels, method_name=self.method_name, )
class LinearRegressionDetrend(BaseDetrend):
    """Detrend model whose trend is a single straight line over the series."""

    def __init__(self) -> None:
        super().__init__("linear regression")

    def fit(self, y: np.ndarray | pd.DataFrame) -> None:
        """Fit a linear trend to ``y`` and store it on the instance.

        Sets ``self.y_original`` (the input converted to an array) and
        ``self.fitted_values`` (the flattened trend). Nothing is returned.

        Args:
            y (np.ndarray | pd.DataFrame): time series 1 dimensional array
        """
        # Design matrix over the integer positions 0..len(y)-1:
        # a constant column plus a first-order time trend.
        dp = DeterministicProcess(
            index=np.arange(len(y)),
            constant=True,  # dummy feature for the bias (y_intercept)
            order=1,  # order of the time dummy (trend)
            drop=False,  # keep every generated term
        )
        # `in_sample` creates features for the positions given in `index`
        X = np.array(dp.in_sample())
        y = np.array(y)

        model = LinearRegression()
        model.fit(X, y)

        self.y_original = y
        self.fitted_values = np.array(model.predict(X)).ravel()
class PolynomialRegressionDetrend(BaseDetrend):
    """Detrend model fitting an independent polynomial on each segment."""

    def __init__(self, order: int = 3, n_segments: int = 5) -> None:
        """Store the polynomial order and the number of segments.

        Args:
            order (int, optional): order of the time trend. Defaults to 3.
            n_segments (int, optional): number of consecutive segments the
                series is cut into. Defaults to 5.
        """
        super().__init__("polynomial regression")
        self.fitted_parameters = {
            "Polynomial order": order,
            "Number of segments": n_segments,
        }
        self.order = order
        self.n_segments = n_segments

    def fit(self, y: np.ndarray | pd.DataFrame) -> None:
        """Fit one polynomial trend per segment and store the joined result.

        The series is cut into exactly ``n_segments`` consecutive pieces whose
        lengths differ by at most one, so no short leftover segment is
        created when ``len(y)`` is not a multiple of ``n_segments``.
        Sets ``self.y_original`` and ``self.fitted_values``; nothing is
        returned.

        Args:
            y (np.ndarray | pd.DataFrame): time series 1 dimensional array

        Raises:
            ValueError: if ``n_segments`` is smaller than 1 or larger than
                the number of observations.
        """
        y = np.array(y)
        n_obs = len(y)
        if not 1 <= self.n_segments <= n_obs:
            raise ValueError(
                f"n_segments must be between 1 and len(y)={n_obs}, "
                f"got {self.n_segments}"
            )

        # Design matrix over the integer positions 0..n_obs-1:
        # a constant column plus time-trend terms up to `self.order`.
        dp = DeterministicProcess(
            index=np.arange(n_obs),
            constant=True,  # dummy feature for the bias (y_intercept)
            order=self.order,  # order of the time dummy (trend)
            drop=False,  # keep every generated term
        )
        X = np.array(dp.in_sample())

        # Both splits use the same length and section count, so the i-th
        # feature block lines up with the i-th target block.
        X_segments = np.array_split(X, self.n_segments)
        y_segments = np.array_split(y, self.n_segments)

        # Fit and predict for each segment independently
        fitted_segments = []
        for X_segment, y_segment in zip(X_segments, y_segments):
            model = LinearRegression()
            model.fit(X_segment, y_segment)
            fitted_segments.append(np.ravel(model.predict(X_segment)))

        self.y_original = y
        self.fitted_values = np.concatenate(fitted_segments)
class LinearMADetrend(BaseDetrend):
    """Detrend model whose trend is a centred rolling mean."""

    def __init__(self, window: int = 100) -> None:
        """Store the rolling window size.

        Args:
            window (int, optional): size of the rolling window.
                Defaults to 100.
        """
        super().__init__("linear mobile average")
        self.fitted_parameters = {"Time span": window}
        self.window = window

    def fit(self, y: np.ndarray | pd.DataFrame) -> None:
        """Compute the trend with the linear mobile average method.

        Sets ``self.y_original`` (the input converted to an array, as the
        regression models do) and ``self.fitted_values`` (the flattened
        rolling mean). Nothing is returned.

        Args:
            y (np.ndarray | pd.DataFrame): time series 1 dimensional array
        """
        # min_periods=1 lets windows near the edges produce a value from
        # whatever observations they contain.
        rolling_mean = (
            pd.DataFrame(y)
            .rolling(center=True, window=self.window, min_periods=1)
            .mean()
        )
        self.y_original = np.array(y)
        self.fitted_values = np.array(rolling_mean).ravel()
class ExponentialMADetrend(BaseDetrend):
    """Detrend model whose trend is an exponentially weighted mean."""

    def __init__(self, alpha: float = 0.05) -> None:
        """Store the smoothing factor.

        Args:
            alpha (float, optional): smoothing factor passed to pandas
                ``ewm``. Defaults to 0.05.
        """
        super().__init__("exponential mobile average")
        self.fitted_parameters = {"Alpha": alpha}
        self.alpha = alpha

    def fit(self, y: np.ndarray | pd.DataFrame) -> None:
        """Compute the trend with the exponential mobile average method.

        Sets ``self.y_original`` (the input converted to an array, as the
        regression models do) and ``self.fitted_values`` (the flattened
        exponentially weighted mean). Nothing is returned.

        Args:
            y (np.ndarray | pd.DataFrame): time series 1 dimensional array
        """
        # adjust=False selects the recursive form of the weighted mean.
        ewm_mean = pd.DataFrame(y).ewm(alpha=self.alpha, adjust=False).mean()
        self.y_original = np.array(y)
        self.fitted_values = np.array(ewm_mean).ravel()
class BSplinesDetrend(BaseDetrend):
    """Detrend model whose trend is a B-spline through sampled prices."""

    def __init__(self, interval_length: int = 10, degree: int = 3) -> None:
        """Store the node spacing and the spline degree.

        Args:
            interval_length (int, optional): distance, in observations,
                between two consecutive interpolation nodes. Defaults to 10.
            degree (int, optional): degree of the spline. Defaults to 3.
        """
        super().__init__("B-splines")
        self.fitted_parameters = {
            "Interval length": interval_length,
            "Degree": degree,
        }
        self.interval_length = interval_length
        self.degree = degree

    def fit(self, y: np.ndarray | pd.DataFrame) -> None:
        """Fit BSplines to price series

        The series is sampled every ``interval_length`` observations and at
        its last observation; a spline of degree ``degree`` through those
        samples is then evaluated at every position. Sets
        ``self.y_original`` and ``self.fitted_values``; nothing is returned.

        Args:
            y (np.ndarray | pd.DataFrame): Price series
        """
        # Flatten so a single-column frame is handled like a 1-D array.
        price_series = np.array(y).ravel()
        n_points = len(price_series)

        # Define time index starting from 0
        time_index = np.arange(n_points)

        # Define knots and corresponding price
        knots = np.arange(0, n_points, self.interval_length)
        # Anchor the spline on the final observation as well; otherwise the
        # positions after the last regular node are extrapolated instead of
        # interpolated.
        if n_points and knots[-1] != n_points - 1:
            knots = np.append(knots, n_points - 1)
        knots_price = price_series[knots]

        # Define t, c, k parameters of scipy interpolate function
        # NOTE(review): splrep needs more nodes than the spline degree;
        # shorter series make it raise - confirm desired handling.
        t, c, k = interpolate.splrep(x=knots, y=knots_price, k=self.degree)

        # Interpolate the prices
        spline = interpolate.BSpline(t, c, k)
        y_interpolate = spline(time_index)

        self.y_original = np.array(y)
        self.fitted_values = np.array(y_interpolate).ravel()