Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 119 additions & 28 deletions aeon/forecasting/stats/_arima.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ARIMA(BaseForecaster, IterativeForecastingMixin):

_tags = {
"capability:horizon": False, # cannot fit to a horizon other than 1
"capability:exogenous": True, # can handle exogenous variables
}

def __init__(
Expand All @@ -84,6 +85,9 @@ def __init__(
self.aic_ = 0
self._model = []
self._parameters = []
self.exog_ = None
self.beta_ = None
self.exog_n_features_ = None
super().__init__(horizon=1, axis=1)

def _fit(self, y, exog=None):
Expand All @@ -94,19 +98,39 @@ def _fit(self, y, exog=None):
y : np.ndarray
A time series on which to learn a forecaster to predict horizon ahead
exog : np.ndarray, default =None
Not allowed for this forecaster
Optional exogenous time series data aligned with y. If provided, an
OLS regression (with intercept) is fit and the ARIMA model is fit on
the residual series (y - X beta).

Returns
-------
self
Fitted ARIMA.
"""
self._series = np.array(y.squeeze(), dtype=np.float64)
series_for_arima = self._series.copy()

if exog is not None:
exog = np.asarray(exog)
if exog.ndim == 1:
exog = exog.reshape(-1, 1)
if len(exog) != len(self._series):
raise ValueError("exog must have the same number of rows as y")
self.exog_ = exog
self.exog_n_features_ = exog.shape[1]
X = np.column_stack([np.ones(len(self._series)), exog])
self.beta_ = np.linalg.lstsq(X, self._series, rcond=None)[0]
series_for_arima = self._series - X @ self.beta_
else:
self.beta_ = None
self.exog_ = None
self.exog_n_features_ = None

# Model is an array of the (c,p,q)
self._model = np.array(
(1 if self.use_constant else 0, self.p, self.q), dtype=np.int32
)
self._differenced_series = np.diff(self._series, n=self.d)
self._differenced_series = np.diff(series_for_arima, n=self.d)
s = 0.1 / (np.sum(self._model) + 1) # Randomise
# Nelder Mead returns the parameters in a single array
(self._parameters, self.aic_) = nelder_mead(
Expand Down Expand Up @@ -152,7 +176,14 @@ def _predict(self, y, exog=None):
A time series to predict the value of. y can be independent of the series
seen in fit.
exog : np.ndarray, default =None
Optional exogenous time series data assumed to be aligned with y
Optional exogenous time series data. If the model was fit with exogenous
variables, a regression contribution will be added to the ARIMA forecast.
For one-step forecasting, `exog` may be:
- a 1D array representing the single future exog row (n_features,)
- a 2D array with shape (n_obs, n_features), in which case the last row
will be used as the future exog row.
- a 2D array aligned with `y` (same number of rows);
last row will be used.

Returns
-------
Expand Down Expand Up @@ -194,6 +225,36 @@ def _predict(self, y, exog=None):

forecast_diff = c + ar_forecast + ma_forecast

reg_component = 0.0
if self.beta_ is not None:
if exog is None:
raise ValueError(
"exog must be provided for prediction"
"when model was fit with exogenous variables"
)
exog_arr = np.asarray(exog)
if exog_arr.ndim == 1:
if exog_arr.shape[0] == len(y):
exog_row = np.atleast_1d(exog_arr[-1])
elif exog_arr.shape[0] == 1:
exog_row = np.atleast_1d(exog_arr.reshape(1, -1)[0])
else:
exog_row = np.atleast_1d(exog_arr.reshape(-1, 1)[-1])
else:
exog_row = exog_arr[-1]
exog_row = np.asarray(exog_row).reshape(-1)
if (
self.exog_n_features_ is not None
and exog_row.shape[0] != self.exog_n_features_
):
raise ValueError(
f"exog must have {self.exog_n_features_} features,"
f" got {exog_row.shape[0]}"
)
Xf = np.concatenate(([1.0], exog_row), axis=0)
reg_component = float(Xf @ self.beta_)
forecast_diff = forecast_diff + reg_component

# Undifference the forecast
if self.d == 0:
return forecast_diff
Expand All @@ -207,42 +268,63 @@ def _forecast(self, y, exog=None):
self._fit(y, exog)
return float(self.forecast_)

def iterative_forecast(self, y, prediction_horizon):
def iterative_forecast(self, y, prediction_horizon, exog=None):
self.fit(y)
n = len(self._differenced_series)
p, q = self.p, self.q
phi, theta = self.phi_, self.theta_
h = prediction_horizon
c = 0.0
if self.use_constant:
c = self.c_
p, q, d = self.p, self.q, self.d
phi, theta = self.phi_, self.theta_
c = self.c_ if self.use_constant else 0.0

if self.beta_ is not None:
if exog is None:
raise ValueError(
"Future exogenous values must be provided"
" for multi-step forecasting."
)
exog = np.asarray(exog)
if exog.ndim == 1:
exog = exog.reshape(-1, self.exog_n_features_)
if exog.shape[0] != h:
raise ValueError("Future exog must have prediction_horizon rows.")
if exog.shape[1] != self.exog_n_features_:
raise ValueError(
f"Future exog must have {self.exog_n_features_} columns."
)
future_exog = exog
else:
future_exog = None

# Start with a copy of the original series and residuals
residuals = np.zeros(len(self.residuals_) + h)
residuals[: len(self.residuals_)] = self.residuals_
n = len(self._differenced_series)
forecast_series = np.zeros(n + h)
forecast_series[:n] = self._differenced_series

residuals = np.zeros(len(self.residuals_) + h)
residuals[: len(self.residuals_)] = self.residuals_

for i in range(h):
# Get most recent p values (lags)

t = n + i
ar_term = 0.0
if p > 0:
ar_term = np.dot(phi, forecast_series[t - np.arange(1, p + 1)])
# Get most recent q residuals (lags)
ma_term = 0.0
if q > 0:
ma_term = np.dot(theta, residuals[t - np.arange(1, q + 1)])
ar_term = (
np.dot(phi, forecast_series[t - np.arange(1, p + 1)]) if p > 0 else 0.0
)
ma_term = (
np.dot(theta, residuals[t - np.arange(1, q + 1)]) if q > 0 else 0.0
)
next_value = c + ar_term + ma_term
# Append prediction and a zero residual (placeholder)
forecast_series[n + i] = next_value
# Can't compute real residual during prediction, leave as zero

# Correct differencing using forecast values
if future_exog is not None:
Xf = np.concatenate(([1.0], future_exog[i]))
next_value += float(Xf @ self.beta_)

forecast_series[t] = next_value

y_forecast_diff = forecast_series[n : n + h]
if self.d == 0:

if d == 0:
return y_forecast_diff
else:
return _undifference(y_forecast_diff, self._series[-self.d :])[self.d :]
restored = _undifference(y_forecast_diff, self._series[-d:])
return restored[d:]


class AutoARIMA(BaseForecaster, IterativeForecastingMixin):
Expand Down Expand Up @@ -280,6 +362,11 @@ class AutoARIMA(BaseForecaster, IterativeForecastingMixin):
481.87157356139943
"""

_tags = {
"capability:exogenous": True,
"capability:horizon": False,
}

def __init__(self, max_p=3, max_d=3, max_q=2):
self.max_p = max_p
self.max_d = max_d
Expand Down Expand Up @@ -336,7 +423,11 @@ def _fit(self, y, exog=None):
) = model
self.constant_term_ = constant_term_int == 1
self.final_model_ = ARIMA(self.p_, self.d_, self.q_, self.constant_term_)
self.final_model_.fit(y)

if exog is not None:
self.final_model_.fit(y, exog=exog)
else:
self.final_model_.fit(y)
self.forecast_ = self.final_model_.forecast_
return self

Expand Down
57 changes: 57 additions & 0 deletions aeon/forecasting/stats/tests/test_arima.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,60 @@ def test_autoarima_forecast_is_consistent_with_wrapped():
forecaster = AutoARIMA()
val = forecaster._forecast(y)
assert np.isclose(val, forecaster.final_model_.forecast_)


def test_arima_with_exog_basic_fit_predict():
"""Test ARIMA fit and predict with exogenous variables."""
y_local = np.arange(50, dtype=float)
exog = np.random.RandomState(42).randn(50, 2)

model = ARIMA(p=1, d=0, q=1)
model.fit(y_local, exog=exog)
pred = model.predict(y_local, exog=exog[-1:].copy())

assert isinstance(pred, float)
assert np.isfinite(pred)


def test_arima_exog_shape_mismatch_raises():
"""Test that exogenous shape mismatches raise ValueError."""
y_local = np.arange(20, dtype=float)
exog = np.random.RandomState(0).randn(20, 3)

model = ARIMA(p=1, d=0, q=1)

with pytest.raises(ValueError):
model.fit(y_local, exog=np.random.randn(10, 3))

model.fit(y_local, exog=exog)

with pytest.raises(ValueError):
model.predict(y_local, exog=np.random.randn(1, 5))


def test_arima_iterative_forecast_with_exog():
"""Test multi-step forecast with future exogenous variables."""
y_local = np.arange(40, dtype=float)
exog = np.random.RandomState(1).randn(40, 2)

model = ARIMA(p=1, d=1, q=1)
model.fit(y_local, exog=exog)

h = 5
future_exog = np.random.RandomState(2).randn(h, 2)
preds = model.iterative_forecast(y_local, prediction_horizon=h, exog=future_exog)

assert preds.shape == (h,)
assert np.all(np.isfinite(preds))


def test_arima_no_exog_backward_compatibility():
"""Test ARIMA works normally when no exogenous variables are provided."""
y_local = np.arange(30, dtype=float)

model = ARIMA(p=1, d=1, q=1)
model.fit(y_local)
pred = model.predict(y_local)

assert isinstance(pred, float)
assert np.isfinite(pred)