From e5df5fc62b5908b34b9269a99598c35ac4acde9a Mon Sep 17 00:00:00 2001 From: skjoracle Date: Tue, 16 Dec 2025 12:38:24 +0530 Subject: [PATCH 01/10] Theta Forecaster added --- ads/model/model_metadata.py | 1 + ads/opctl/operator/lowcode/forecast/const.py | 1 + .../lowcode/forecast/model/factory.py | 2 + .../forecast/model/forecast_datasets.py | 19 +- .../operator/lowcode/forecast/model/theta.py | 435 ++++++++++++++++++ .../operator/lowcode/forecast/schema.yaml | 1 + .../forecast_operator/yaml_schema.rst | 4 +- tests/operators/forecast/test_datasets.py | 1 + tests/operators/forecast/test_errors.py | 1 + tests/operators/forecast/test_explainers.py | 1 + 10 files changed, 454 insertions(+), 12 deletions(-) create mode 100644 ads/opctl/operator/lowcode/forecast/model/theta.py diff --git a/ads/model/model_metadata.py b/ads/model/model_metadata.py index f0428ec9c..995aba0bd 100644 --- a/ads/model/model_metadata.py +++ b/ads/model/model_metadata.py @@ -165,6 +165,7 @@ class Framework(ExtendedEnum): PYOD = "pyod" SPACY = "spacy" PROPHET = "prophet" + THETA = "theta" SKTIME = "sktime" STATSMODELS = "statsmodels" CUML = "cuml" diff --git a/ads/opctl/operator/lowcode/forecast/const.py b/ads/opctl/operator/lowcode/forecast/const.py index f2265418a..4ad107366 100644 --- a/ads/opctl/operator/lowcode/forecast/const.py +++ b/ads/opctl/operator/lowcode/forecast/const.py @@ -15,6 +15,7 @@ class SupportedModels(ExtendedEnum): NeuralProphet = "neuralprophet" LGBForecast = "lgbforecast" AutoMLX = "automlx" + Theta = "theta" AutoTS = "autots" # Auto = "auto" diff --git a/ads/opctl/operator/lowcode/forecast/model/factory.py b/ads/opctl/operator/lowcode/forecast/model/factory.py index 262fe5bbc..f3f0bb506 100644 --- a/ads/opctl/operator/lowcode/forecast/model/factory.py +++ b/ads/opctl/operator/lowcode/forecast/model/factory.py @@ -23,6 +23,7 @@ from .ml_forecast import MLForecastOperatorModel from .neuralprophet import NeuralProphetOperatorModel from .prophet import ProphetOperatorModel +from .theta import ThetaOperatorModel class UnSupportedModelError(Exception): @@ -46,6 +47,7 @@ class ForecastOperatorModelFactory: SupportedModels.LGBForecast: MLForecastOperatorModel, SupportedModels.AutoMLX: AutoMLXOperatorModel, SupportedModels.AutoTS: AutoTSOperatorModel, + SupportedModels.Theta: ThetaOperatorModel, } @classmethod diff --git a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py index 3019b6839..0c234b906 100644 --- a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py +++ b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py @@ -345,19 +345,18 @@ def populate_series_output( f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) from e + start_idx = output_i.shape[0] - self.horizon - len(fit_val) if (output_i.shape[0] - self.horizon) == len(fit_val): - output_i["fitted_value"].iloc[: -self.horizon] = ( - fit_val # Note: may need to do len(output_i) - (len(fit_val) + horizon) : -horizon - ) + output_i.loc[output_i.index[ + : -self.horizon], "fitted_value"] = fit_val # Note: may need to do len(output_i) - (len(fit_val) + horizon) : -horizon elif (output_i.shape[0] - self.horizon) > len(fit_val): logger.debug( f"Fitted Values were only generated on a subset ({len(fit_val)}/{(output_i.shape[0] - self.horizon)}) of the data for Series: {series_id}." 
) - start_idx = output_i.shape[0] - self.horizon - len(fit_val) - output_i["fitted_value"].iloc[start_idx : -self.horizon] = fit_val + output_i.loc[output_i.index[start_idx: -self.horizon], "fitted_value"] = fit_val else: - output_i["fitted_value"].iloc[start_idx : -self.horizon] = fit_val[ - -(output_i.shape[0] - self.horizon) : + output_i.loc[output_i.index[start_idx: -self.horizon], "fitted_value"] = fit_val[ + -(output_i.shape[0] - self.horizon): ] if len(forecast_val) != self.horizon: @@ -365,21 +364,21 @@ def populate_series_output( f"Attempting to set forecast along horizon ({self.horizon}) for series: {series_id}, however forecast is only length {len(forecast_val)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) - output_i["forecast_value"].iloc[-self.horizon :] = forecast_val + output_i.loc[output_i.index[-self.horizon:], "forecast_value"] = forecast_val if len(upper_bound) != self.horizon: raise ValueError( f"Attempting to set upper_bound along horizon ({self.horizon}) for series: {series_id}, however upper_bound is only length {len(upper_bound)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) - output_i[self.upper_bound_name].iloc[-self.horizon :] = upper_bound + output_i.loc[output_i.index[-self.horizon:], self.upper_bound_name] = upper_bound if len(lower_bound) != self.horizon: raise ValueError( f"Attempting to set lower_bound along horizon ({self.horizon}) for series: {series_id}, however lower_bound is only length {len(lower_bound)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) - output_i[self.lower_bound_name].iloc[-self.horizon :] = lower_bound + output_i.loc[output_i.index[-self.horizon:], self.lower_bound_name] = lower_bound self.series_id_map[series_id] = output_i self.verify_series_output(series_id) diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py new file mode 100644 index 000000000..9cdaaa4c7 --- /dev/null +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -0,0 +1,435 @@ +#!/usr/bin/env python + +import logging +import traceback +from typing import Dict, Any + +import numpy as np +import optuna +import pandas as pd +from joblib import Parallel, delayed +from sktime.forecasting.base import ForecastingHorizon +from sktime.forecasting.theta import ThetaForecaster +from sktime.performance_metrics.forecasting import mean_squared_error, \ + mean_absolute_percentage_error +from sktime.split import ExpandingWindowSplitter + +from ads.opctl import logger +from ads.opctl.operator.lowcode.common.utils import seconds_to_datetime +from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig +from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe, smape) +from .base_model import ForecastOperatorBaseModel +from .forecast_datasets import ForecastDatasets, ForecastOutput +from ..const import ( + SupportedModels, ForecastOutputColumns, DEFAULT_TRIALS, +) + +logging.getLogger("report_creator").setLevel(logging.WARNING) + + +def freq_to_sp(freq: str) -> int | None: + """ + Convert pandas freq string to seasonal period (sp). + """ + if not freq: + return None + + freq = freq.upper() + + # Direct mappings + mapping = { + "M": 12, + "Q": 4, + "A": 1, + "Y": 1, + "W": 52, + "D": 7, + "H": 24, + "T": 1440, + "MIN": 1440, + } + if freq in mapping: + return mapping[freq] + + # Weekly variants (W-MON, W-SUN, etc.) 
+ if freq.startswith("W"): + return 52 + + # Minute frequencies like "5T" or "15MIN" + if freq.endswith("T"): + try: + return 1440 // int(freq[:-1]) + except ValueError: + pass + + if freq.endswith("MIN"): # e.g., "15MIN" + try: + return 1440 // int(freq[:-3]) + except ValueError: + pass + + logger.warning("Unable to infer data frequency and sp") + return None + + +class ThetaOperatorModel(ForecastOperatorBaseModel): + """Theta operator model""" + + def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): + super().__init__(config=config, datasets=datasets) + self.global_explanation = {} + self.local_explanation = {} + + def set_kwargs(self): + """Prepare kwargs for Theta model from spec. + The operator's 'model_kwargs' is respected. + """ + model_kwargs = self.spec.model_kwargs + model_kwargs["alpha"] = self.spec.model_kwargs.get("alpha", None) + model_kwargs["initial_level"] = self.spec.model_kwargs.get("initial_level", None) + model_kwargs["deseasonalize"] = self.spec.model_kwargs.get("deseasonalize", True) + model_kwargs["deseasonalize_model"] = self.spec.model_kwargs.get("deseasonalize_model", "additive") + model_kwargs["sp"] = self.spec.model_kwargs.get("sp", None) + + if self.spec.confidence_interval_width is None: + self.spec.confidence_interval_width = 1 - 0.90 if model_kwargs["alpha"] is None else model_kwargs["alpha"] + + model_kwargs["interval_width"] = self.spec.confidence_interval_width + return model_kwargs + + def preprocess(self, data, series_id): + self.le[series_id], df_encoded = _label_encode_dataframe( + data, + no_encode={self.spec.datetime_column.name, self.original_target_column}, + ) + return df_encoded.set_index(self.spec.datetime_column.name) + + def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, Any]): + try: + self.forecast_output.init_series_output(series_id=series_id, data_at_series=df) + data = self.preprocess(df, series_id) + + data_i = self.drop_horizon(data) + target = self.spec.target_column + + freq = pd.infer_freq(data_i.index) + if freq.startswith("W-"): + freq = "W" + data_i = data_i.asfreq(freq) + y = data_i[target] + if model_kwargs["sp"] is None: + inferred_sp = freq_to_sp(freq) + model_kwargs["sp"] = 1 if inferred_sp is None else inferred_sp + + # If model already loaded, extract parameters (best-effort) + if self.loaded_models is not None and series_id in self.loaded_models: + previous_res = self.loaded_models[series_id].get("model") + fitted_params = previous_res.get_fitted_params() + model_kwargs["deseasonalize_model"] = previous_res.deseasonalize_model + model_kwargs["sp"] = previous_res.sp + model_kwargs["deseasonalize"] = previous_res.deseasonalize + model_kwargs["initial_level"] = fitted_params.get("initial_level", None) + else: + if self.perform_tuning: + model_kwargs = self.run_tuning(y, model_kwargs) + if len(y) < 2 * model_kwargs["sp"]: + model_kwargs["deseasonalize"] = False + + # Fit ThetaModel using params + model = ThetaForecaster(initial_level=model_kwargs["initial_level"], + deseasonalize=model_kwargs["deseasonalize"], + deseasonalize_model=model_kwargs["deseasonalize_model"], sp=model_kwargs["sp"]) + model.fit(y) + + fh = ForecastingHorizon(range(1, self.spec.horizon + 1), is_relative=True) + fh_in_sample = ForecastingHorizon(range(-len(data_i) + 1, 1)) + fitted_vals = model.predict(fh_in_sample) + forecast_values = model.predict(fh) + forecast_range = model.predict_interval(fh=fh) + + lower = forecast_range[(self.original_target_column, 0.9, "lower")].rename("yhat_lower") + 
upper = forecast_range[(self.original_target_column, 0.9, "upper")].rename("yhat_upper") + point = forecast_values.rename("yhat") + forecast = pd.DataFrame( + pd.concat([point, lower, upper], axis=1) + ) + logger.debug(f"-----------------Model {i}----------------------") + logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail()) + + self.forecast_output.populate_series_output( + series_id=series_id, + fit_val=fitted_vals.values, + forecast_val=forecast["yhat"].values, + upper_bound=forecast["yhat_upper"].values, + lower_bound=forecast["yhat_lower"].values, + ) + self.outputs[series_id] = forecast + self.models[series_id] = {} + self.models[series_id]["model"] = model + self.models[series_id]["le"] = self.le[series_id] + + params = vars(model).copy() + self.model_parameters[series_id] = { + "framework": SupportedModels.Theta, + **params, + } + + logger.debug("===========Done===========") + + except Exception as e: + self.errors_dict[series_id] = { + "model_name": self.spec.model, + "error": str(e), + "error_trace": traceback.format_exc(), + } + logger.error(f"Encountered Error: {e}. Skipping.") + logger.error(traceback.format_exc()) + + def _build_model(self) -> pd.DataFrame: + """Build models for all series in parallel and return forecast long format.""" + full_data_dict = self.datasets.get_data_by_series() + self.models = {} + self.outputs = {} + self.explanations_info = {} + model_kwargs = self.set_kwargs() + self.forecast_output = ForecastOutput( + confidence_interval_width=self.spec.confidence_interval_width, + horizon=self.spec.horizon, + target_column=self.original_target_column, + dt_column=self.spec.datetime_column.name, + ) + + Parallel(n_jobs=-1, require="sharedmem")( + delayed(ThetaOperatorModel._train_model)( + self, i, series_id, df, model_kwargs.copy() + ) + for self, (i, (series_id, df)) in zip( + [self] * len(full_data_dict), enumerate(full_data_dict.items()) + ) + ) + + return self.forecast_output.get_forecast_long() + + def run_tuning(self, y: pd.DataFrame, model_kwargs_i: Dict[str, Any]): + + scoring = { + "mape": lambda y_true, y_pred: mean_absolute_percentage_error(y_true, y_pred), + "rmse": lambda y_true, y_pred: np.sqrt(mean_squared_error(y_true, y_pred)), + "mse": lambda y_true, y_pred: mean_squared_error(y_true, y_pred), + "smape": lambda y_true, y_pred: smape(y_true, y_pred) + } + score_fn = scoring.get(self.spec.metric.lower(), scoring["mape"]) + + def objective(trial): + initial_level = model_kwargs_i["initial_level"] + sp = model_kwargs_i["sp"] + deseasonalize = trial.suggest_categorical("deseasonalize", [True, False]) + deseasonalize_model = trial.suggest_categorical("deseasonalize_model", ["additive", "multiplicative"]) + if deseasonalize_model == "multiplicative" and (y <= 0).any(): + raise optuna.exceptions.TrialPruned() + + model = ThetaForecaster( + initial_level=initial_level, + sp=sp, + deseasonalize_model=deseasonalize_model, + deseasonalize=deseasonalize, + ) + + cv = ExpandingWindowSplitter( + initial_window=50, + step_length=100 + ) + + scores = [] + + for train, test in cv.split(y): + t_data = y.iloc[train] + if t_data.isna().any(): + continue + if len(t_data) < 2 * sp: + continue + + model.fit(t_data) + fh = ForecastingHorizon(y.index[test], is_relative=False) + y_pred = model.predict(fh) + y_test = y.iloc[test] + if y_test.isna().any(): + continue + scores.append(score_fn(y_test, y_pred)) + return np.mean(scores) + + study = optuna.create_study(direction="minimize") + trials = DEFAULT_TRIALS if self.spec.tuning.n_trials is None else 
self.spec.tuning.n_trials + study.optimize(objective, n_trials=trials) + model_kwargs_i["deseasonalize_model"] = study.best_params["deseasonalize_model"] + model_kwargs_i["deseasonalize"] = study.best_params["deseasonalize"] + return model_kwargs_i + + def _generate_report(self): + import report_creator as rc + """The method that needs to be implemented on the particular model level.""" + all_sections = [] + theta_blocks = [] + + for series_id, sm in self.models.items(): + model = sm["model"] + + # ---- Extract details from ThetaModel ---- + fitted_params = model.get_fitted_params() + initial_level = fitted_params.get("initial_level", None) + smoothing_level = fitted_params.get("smoothing_level", None) + sp = model.sp + deseasonalize_model = model.deseasonalize_model + desasonalized = model.deseasonalize + n_obs = len(model._y) if hasattr(model, "_y") else "N/A" + + # Date range + if hasattr(model, "_y"): + start_date = model._y.index[0] + end_date = model._y.index[-1] + else: + start_date = "" + end_date = "" + + # ---- Build the DF ---- + meta_df = pd.DataFrame({ + "Metric": [ + "Initial Level", + "Smoothing Level / Alpha", + "No. Observations", + "Deseasonalized", + "Deseasonalization Method", + "Period (sp)", + "Sample Start", + "Sample End", + ], + "Value": [ + initial_level, + smoothing_level, + n_obs, + str(desasonalized is not None), + deseasonalize_model, + sp, + start_date, + end_date, + ], + }) + + # ---- Create a block (NOT a section directly) ---- + theta_block = rc.Block( + rc.Heading(f"Theta Model Summary", level=3), + rc.DataTable(meta_df), + label=series_id + ) + + # Add with optional label support + theta_blocks.append( + theta_block + ) + + # ---- Combine into final section like ARIMA example ---- + theta_title = rc.Heading("Theta Model Parameters", level=2) + + if len(theta_blocks) > 1: + theta_section = rc.Select(blocks=theta_blocks) + else: + theta_section = theta_blocks[0] + + all_sections.extend([theta_title, theta_section]) + + if self.spec.generate_explanations: + try: + # If the key is present, call the "explain_model" method + self.explain_model() + + # Convert the global explanation data to a DataFrame + global_explanation_df = pd.DataFrame(self.global_explanation) + + self.formatted_global_explanation = ( + global_explanation_df / global_explanation_df.sum(axis=0) * 100 + ) + self.formatted_global_explanation = ( + self.formatted_global_explanation.rename( + {self.spec.datetime_column.name: ForecastOutputColumns.DATE}, + axis=1, + ) + ) + aggregate_local_explanations = pd.DataFrame() + for s_id, local_ex_df in self.local_explanation.items(): + local_ex_df_copy = local_ex_df.copy() + local_ex_df_copy["Series"] = s_id + aggregate_local_explanations = pd.concat( + [aggregate_local_explanations, local_ex_df_copy], axis=0 + ) + self.formatted_local_explanation = aggregate_local_explanations + + if not self.target_cat_col: + self.formatted_global_explanation = ( + self.formatted_global_explanation.rename( + {"Series 1": self.original_target_column}, + axis=1, + ) + ) + self.formatted_local_explanation.drop( + "Series", axis=1, inplace=True + ) + + # Create a markdown section for the global explainability + global_explanation_section = rc.Block( + rc.Heading("Global Explanation of Models", level=2), + rc.Text( + "The following tables provide the feature attribution for the global explainability." 
+ ), + rc.DataTable(self.formatted_global_explanation, index=True), + ) + + blocks = [ + rc.DataTable( + local_ex_df.div(local_ex_df.abs().sum(axis=1), axis=0) * 100, + label=s_id if self.target_cat_col else None, + index=True, + ) + for s_id, local_ex_df in self.local_explanation.items() + ] + local_explanation_section = rc.Block( + rc.Heading("Local Explanation of Models", level=2), + rc.Select(blocks=blocks) if len(blocks) > 1 else blocks[0], + ) + + # Append the global explanation text and section to the "all_sections" list + all_sections = all_sections + [ + global_explanation_section, + local_explanation_section, + ] + except Exception as e: + logger.warning(f"Failed to generate Explanations with error: {e}.") + logger.debug(f"Full Traceback: {traceback.format_exc()}") + + model_description = rc.Text( + "A Theta forecaster is a popular and surprisingly effective time series forecasting" + "method that works by decomposing data into long-term trend and short-term components, forecasting them separately," + "and then combining the results, often outperforming complex models by adjusting the original series' local" + "curvature using a parameter called theta (θ). It's known for its simplicity, speed, and strong performance, " + "especially in forecasting competitions like the M3, where it served as a strong benchmark, often by using" + "Simple Exponential Smoothing (SES) with drift on a modified series" + ) + other_sections = all_sections + + return ( + model_description, + other_sections, + ) + + def get_explain_predict_fn(self, series_id): + def _custom_predict( + data, + model=self.models[series_id]["model"], + ): + """ + data: ForecastDatasets.get_data_at_series(s_id) + """ + h = len(data) + fh = ForecastingHorizon(np.arange(1, h + 1), is_relative=True) + return model.predict(fh) + + return _custom_predict diff --git a/ads/opctl/operator/lowcode/forecast/schema.yaml b/ads/opctl/operator/lowcode/forecast/schema.yaml index 45690aa57..8e13787d4 100644 --- a/ads/opctl/operator/lowcode/forecast/schema.yaml +++ b/ads/opctl/operator/lowcode/forecast/schema.yaml @@ -460,6 +460,7 @@ spec: - autots - auto-select - auto-select-series + - theta model_kwargs: type: dict diff --git a/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst b/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst index dc0ee92de..8f73a9e8b 100644 --- a/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst +++ b/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst @@ -137,7 +137,7 @@ Below is an example of a ``forecast.yaml`` file with every parameter specified: - string - No - prophet - - Model to use. Options: prophet, arima, neuralprophet, automlx, autots, auto-select. + - Model to use. Options: prophet, arima, neuralprophet, theta, automlx, autots, auto-select. * - model_kwargs - dict @@ -266,7 +266,7 @@ Further Description * **format**: (Optional) Specify the format for output data (e.g., ``csv``, ``json``, ``excel``). * **options**: (Optional) Include any additional arguments, such as connection parameters for storage. - * **model**: (Optional) The name of the model framework to use. Defaults to ``auto-select``. Available options include ``arima``, ``prophet``, ``neuralprophet``, ``autots``, and ``auto-select``. + * **model**: (Optional) The name of the model framework to use. Defaults to ``auto-select``. Available options include ``arima``, ``prophet``, ``neuralprophet``, ``theta``, ``autots``, and ``auto-select``. 
* **model_kwargs**: (Optional) A dictionary of arguments to pass directly to the model framework, allowing for detailed control over modeling. diff --git a/tests/operators/forecast/test_datasets.py b/tests/operators/forecast/test_datasets.py index aeb5daa66..7d39bba11 100644 --- a/tests/operators/forecast/test_datasets.py +++ b/tests/operators/forecast/test_datasets.py @@ -33,6 +33,7 @@ "neuralprophet", "autots", "lgbforecast", + "theta", "auto-select", "auto-select-series", ] diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index 2d69dce9e..a835217ca 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -144,6 +144,7 @@ "neuralprophet", "autots", "lgbforecast", + "theta", ] TEMPLATE_YAML = { diff --git a/tests/operators/forecast/test_explainers.py b/tests/operators/forecast/test_explainers.py index 753e324f4..e5e8b20d6 100644 --- a/tests/operators/forecast/test_explainers.py +++ b/tests/operators/forecast/test_explainers.py @@ -20,6 +20,7 @@ # "automlx", # FIXME: automlx is failing, no errors "prophet", "neuralprophet", + "theta", "auto-select-series", ] From 128348889749c7d55accc22b9a254bb18c942ac4 Mon Sep 17 00:00:00 2001 From: skjoracle Date: Tue, 16 Dec 2025 13:15:10 +0530 Subject: [PATCH 02/10] fix test case --- ads/opctl/operator/lowcode/forecast/model/theta.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py index 9cdaaa4c7..9acb6282a 100644 --- a/ads/opctl/operator/lowcode/forecast/model/theta.py +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -329,10 +329,10 @@ def _generate_report(self): # ---- Combine into final section like ARIMA example ---- theta_title = rc.Heading("Theta Model Parameters", level=2) - + theta_section = [] if len(theta_blocks) > 1: theta_section = rc.Select(blocks=theta_blocks) - else: + elif len(theta_blocks) == 1: theta_section = theta_blocks[0] all_sections.extend([theta_title, theta_section]) From 5d8b2b44d0199c341f79d6084c5ad0f917a01464 Mon Sep 17 00:00:00 2001 From: skjoracle Date: Tue, 16 Dec 2025 14:45:55 +0530 Subject: [PATCH 03/10] fix tests in pipeline --- .github/workflows/run-forecast-explainer-tests.yml | 2 +- .github/workflows/run-forecast-unit-tests.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/run-forecast-explainer-tests.yml b/.github/workflows/run-forecast-explainer-tests.yml index 7b514c217..ee92cf39f 100644 --- a/.github/workflows/run-forecast-explainer-tests.yml +++ b/.github/workflows/run-forecast-explainer-tests.yml @@ -59,7 +59,7 @@ jobs: set -x # print commands that are executed $CONDA/bin/conda init source /home/runner/.bashrc - pip install -r test-requirements-operators.txt pip install "oracle-automlx[forecasting]>=25.3.0" + pip install -r test-requirements-operators.txt pip install pandas>=2.2.0 python -m pytest -v -p no:warnings --durations=5 tests/operators/forecast/test_explainers.py diff --git a/.github/workflows/run-forecast-unit-tests.yml b/.github/workflows/run-forecast-unit-tests.yml index 4a12deb51..2342b94be 100644 --- a/.github/workflows/run-forecast-unit-tests.yml +++ b/.github/workflows/run-forecast-unit-tests.yml @@ -55,7 +55,7 @@ jobs: set -x # print commands that are executed $CONDA/bin/conda init source /home/runner/.bashrc - pip install -r test-requirements-operators.txt pip install "oracle-automlx[forecasting]>=25.3.0" + pip install -r 
test-requirements-operators.txt pip install pandas>=2.2.0 python -m pytest -v -p no:warnings --durations=5 tests/operators/forecast --ignore=tests/operators/forecast/test_explainers.py From 6b3a6533f1d40e9d55607a567b0be8861be9ad1c Mon Sep 17 00:00:00 2001 From: skjoracle Date: Tue, 16 Dec 2025 14:58:10 +0530 Subject: [PATCH 04/10] fix tests in pipeline --- .github/workflows/run-forecast-explainer-tests.yml | 3 ++- .github/workflows/run-forecast-unit-tests.yml | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/run-forecast-explainer-tests.yml b/.github/workflows/run-forecast-explainer-tests.yml index ee92cf39f..acbe3e274 100644 --- a/.github/workflows/run-forecast-explainer-tests.yml +++ b/.github/workflows/run-forecast-explainer-tests.yml @@ -59,7 +59,8 @@ jobs: set -x # print commands that are executed $CONDA/bin/conda init source /home/runner/.bashrc - pip install "oracle-automlx[forecasting]>=25.3.0" pip install -r test-requirements-operators.txt + pip install "oracle-automlx[forecasting]>=25.3.0" + pip install sktime>=0.41.0 pip install pandas>=2.2.0 python -m pytest -v -p no:warnings --durations=5 tests/operators/forecast/test_explainers.py diff --git a/.github/workflows/run-forecast-unit-tests.yml b/.github/workflows/run-forecast-unit-tests.yml index 2342b94be..16dd3f821 100644 --- a/.github/workflows/run-forecast-unit-tests.yml +++ b/.github/workflows/run-forecast-unit-tests.yml @@ -55,7 +55,8 @@ jobs: set -x # print commands that are executed $CONDA/bin/conda init source /home/runner/.bashrc - pip install "oracle-automlx[forecasting]>=25.3.0" pip install -r test-requirements-operators.txt + pip install "oracle-automlx[forecasting]>=25.3.0" + pip install sktime>=0.41.0 pip install pandas>=2.2.0 python -m pytest -v -p no:warnings --durations=5 tests/operators/forecast --ignore=tests/operators/forecast/test_explainers.py From c04c77d9b0a9df7fa72e3e90bac11a8f2e9bf153 Mon Sep 17 00:00:00 2001 From: skjoracle Date: Tue, 16 Dec 2025 15:28:54 +0530 Subject: [PATCH 05/10] fix tests in pipeline --- .github/workflows/run-forecast-explainer-tests.yml | 2 +- .github/workflows/run-forecast-unit-tests.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/run-forecast-explainer-tests.yml b/.github/workflows/run-forecast-explainer-tests.yml index acbe3e274..a0092d9ac 100644 --- a/.github/workflows/run-forecast-explainer-tests.yml +++ b/.github/workflows/run-forecast-explainer-tests.yml @@ -61,6 +61,6 @@ jobs: source /home/runner/.bashrc pip install -r test-requirements-operators.txt pip install "oracle-automlx[forecasting]>=25.3.0" - pip install sktime>=0.41.0 + pip install "sktime>=0.41.0" --upgrade pip install pandas>=2.2.0 python -m pytest -v -p no:warnings --durations=5 tests/operators/forecast/test_explainers.py diff --git a/.github/workflows/run-forecast-unit-tests.yml b/.github/workflows/run-forecast-unit-tests.yml index 16dd3f821..a31ae4b0e 100644 --- a/.github/workflows/run-forecast-unit-tests.yml +++ b/.github/workflows/run-forecast-unit-tests.yml @@ -57,6 +57,6 @@ jobs: source /home/runner/.bashrc pip install -r test-requirements-operators.txt pip install "oracle-automlx[forecasting]>=25.3.0" - pip install sktime>=0.41.0 + pip install "sktime>=0.41.0" --upgrade pip install pandas>=2.2.0 python -m pytest -v -p no:warnings --durations=5 tests/operators/forecast --ignore=tests/operators/forecast/test_explainers.py From 45e98544a428cc0a6f86b805889259936b1b35c5 Mon Sep 17 00:00:00 2001 From: skjoracle Date: Tue, 16 Dec 
2025 23:37:42 +0530 Subject: [PATCH 06/10] fix test pipeline for sktime version mismatch --- .github/workflows/run-forecast-explainer-tests.yml | 2 +- .github/workflows/run-forecast-unit-tests.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/run-forecast-explainer-tests.yml b/.github/workflows/run-forecast-explainer-tests.yml index a0092d9ac..5bb7e00dc 100644 --- a/.github/workflows/run-forecast-explainer-tests.yml +++ b/.github/workflows/run-forecast-explainer-tests.yml @@ -61,6 +61,6 @@ jobs: source /home/runner/.bashrc pip install -r test-requirements-operators.txt pip install "oracle-automlx[forecasting]>=25.3.0" - pip install "sktime>=0.41.0" --upgrade + pip install -U sktime pip install pandas>=2.2.0 python -m pytest -v -p no:warnings --durations=5 tests/operators/forecast/test_explainers.py diff --git a/.github/workflows/run-forecast-unit-tests.yml b/.github/workflows/run-forecast-unit-tests.yml index a31ae4b0e..5dd295538 100644 --- a/.github/workflows/run-forecast-unit-tests.yml +++ b/.github/workflows/run-forecast-unit-tests.yml @@ -57,6 +57,6 @@ jobs: source /home/runner/.bashrc pip install -r test-requirements-operators.txt pip install "oracle-automlx[forecasting]>=25.3.0" - pip install "sktime>=0.41.0" --upgrade + pip install -U sktime pip install pandas>=2.2.0 python -m pytest -v -p no:warnings --durations=5 tests/operators/forecast --ignore=tests/operators/forecast/test_explainers.py From 38be2baf56831727cfbe7ce00bce764a7dc1b047 Mon Sep 17 00:00:00 2001 From: skjoracle Date: Wed, 17 Dec 2025 14:11:46 +0530 Subject: [PATCH 07/10] fix seasonality bug in theta forecaster --- ads/opctl/operator/lowcode/common/utils.py | 69 +++++++++++ .../operator/lowcode/forecast/model/theta.py | 111 ++++++++---------- 2 files changed, 121 insertions(+), 59 deletions(-) diff --git a/ads/opctl/operator/lowcode/common/utils.py b/ads/opctl/operator/lowcode/common/utils.py index 0c024cb05..3be6e6028 100644 --- a/ads/opctl/operator/lowcode/common/utils.py +++ b/ads/opctl/operator/lowcode/common/utils.py @@ -24,6 +24,14 @@ ) from ads.secrets import ADBSecretKeeper +from pandas.tseries.frequencies import to_offset +from pandas.tseries.offsets import ( + MonthBegin, MonthEnd, + QuarterBegin, QuarterEnd, + YearBegin, YearEnd, + Week +) + def call_pandas_fsspec(pd_fn, filename, storage_options, **kwargs): if fsspec.utils.get_protocol(filename) == "file" or fsspec.utils.get_protocol( @@ -385,3 +393,64 @@ def enable_print(): except Exception: pass sys.stdout = sys.__stdout__ + + +def normalize_freq(index: pd.DatetimeIndex): + """ + Returns: + freq_str: canonical pandas freq string or None + sp: seasonal period (int) + """ + freq = pd.infer_freq(index) + + if freq is None: + return None, 1 + + try: + offset = to_offset(freq) + except Exception: + return None, 1 + + if isinstance(offset, (MonthBegin, MonthEnd)): + return "M", 12 + + if isinstance(offset, (QuarterBegin, QuarterEnd)): + return "Q", 4 + + if isinstance(offset, (YearBegin, YearEnd)): + return "Y", 1 + + if isinstance(offset, Week): + return "W", 52 + + freq = str(freq).upper() + + mapping = { + "D": ("D", 7), + "H": ("H", 24), + "T": ("T", 1440), + "MIN": ("T", 1440), + "M": ("M", 12), + "Q": ("Q", 4), + "A": ("Y", 1), + "Y": ("Y", 1), + } + + for key, val in mapping.items(): + if freq.startswith(key): + return val + + return None, 1 + + +def ensure_period_index(y: pd.Series, freq: str | None): + """ + Converts DatetimeIndex → PeriodIndex if possible. 
+ """ + if not isinstance(y.index, pd.PeriodIndex): + if freq is not None: + try: + y.index = y.index.to_period(freq) + except Exception: + return y, False + return y, True diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py index 9acb6282a..780bbf1b2 100644 --- a/ads/opctl/operator/lowcode/forecast/model/theta.py +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -15,7 +15,7 @@ from sktime.split import ExpandingWindowSplitter from ads.opctl import logger -from ads.opctl.operator.lowcode.common.utils import seconds_to_datetime +from ads.opctl.operator.lowcode.common.utils import normalize_freq, ensure_period_index from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe, smape) from .base_model import ForecastOperatorBaseModel @@ -27,51 +27,6 @@ logging.getLogger("report_creator").setLevel(logging.WARNING) -def freq_to_sp(freq: str) -> int | None: - """ - Convert pandas freq string to seasonal period (sp). - """ - if not freq: - return None - - freq = freq.upper() - - # Direct mappings - mapping = { - "M": 12, - "Q": 4, - "A": 1, - "Y": 1, - "W": 52, - "D": 7, - "H": 24, - "T": 1440, - "MIN": 1440, - } - if freq in mapping: - return mapping[freq] - - # Weekly variants (W-MON, W-SUN, etc.) - if freq.startswith("W"): - return 52 - - # Minute frequencies like "5T" or "15MIN" - if freq.endswith("T"): - try: - return 1440 // int(freq[:-1]) - except ValueError: - pass - - if freq.endswith("MIN"): # e.g., "15MIN" - try: - return 1440 // int(freq[:-3]) - except ValueError: - pass - - logger.warning("Unable to infer data frequency and sp") - return None - - class ThetaOperatorModel(ForecastOperatorBaseModel): """Theta operator model""" @@ -104,6 +59,38 @@ def preprocess(self, data, series_id): ) return df_encoded.set_index(self.spec.datetime_column.name) + def enforce_theta_index_requirements(self, y: pd.Series, model_kwargs: dict): + """ + Ensures Theta won't crash due to missing freq. + Mutates model_kwargs safely. 
+ """ + index = y.index + + # Case 1: PeriodIndex → OK + if isinstance(index, pd.PeriodIndex): + return y + + # Case 2: DatetimeIndex with freq → OK + if isinstance(index, pd.DatetimeIndex) and index.freq is not None: + return y + + # Case 3: DatetimeIndex but freq missing → try to set it + if isinstance(index, pd.DatetimeIndex): + inferred = pd.infer_freq(index) + if inferred is not None: + try: + y = y.asfreq(inferred) + return y + except Exception: + pass + + # 🚨 Last resort: disable deseasonalization + model_kwargs["deseasonalize"] = False + model_kwargs["deseasonalize_model"] = None + model_kwargs["sp"] = 1 + + return y + def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, Any]): try: self.forecast_output.init_series_output(series_id=series_id, data_at_series=df) @@ -112,14 +99,20 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A data_i = self.drop_horizon(data) target = self.spec.target_column - freq = pd.infer_freq(data_i.index) - if freq.startswith("W-"): - freq = "W" - data_i = data_i.asfreq(freq) + freq_str, sp = normalize_freq(data_i.index) + if freq_str is not None: + data_i = data_i.asfreq(freq_str) + y = data_i[target] - if model_kwargs["sp"] is None: - inferred_sp = freq_to_sp(freq) - model_kwargs["sp"] = 1 if inferred_sp is None else inferred_sp + + model_kwargs["sp"] = model_kwargs.get("sp") or sp + + y, period_ok = ensure_period_index(y, freq_str) + + if not period_ok or len(y) < 2 * model_kwargs["sp"]: + model_kwargs["deseasonalize"] = False + + y = self.enforce_theta_index_requirements(y, model_kwargs) # If model already loaded, extract parameters (best-effort) if self.loaded_models is not None and series_id in self.loaded_models: @@ -129,16 +122,14 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A model_kwargs["sp"] = previous_res.sp model_kwargs["deseasonalize"] = previous_res.deseasonalize model_kwargs["initial_level"] = fitted_params.get("initial_level", None) - else: - if self.perform_tuning: - model_kwargs = self.run_tuning(y, model_kwargs) - if len(y) < 2 * model_kwargs["sp"]: - model_kwargs["deseasonalize"] = False + elif self.perform_tuning: + model_kwargs = self.run_tuning(y, model_kwargs) # Fit ThetaModel using params model = ThetaForecaster(initial_level=model_kwargs["initial_level"], deseasonalize=model_kwargs["deseasonalize"], - deseasonalize_model=model_kwargs["deseasonalize_model"], sp=model_kwargs["sp"]) + deseasonalize_model=model_kwargs["deseasonalize_model"], + sp=model_kwargs.get("sp", 1), ) model.fit(y) fh = ForecastingHorizon(range(1, self.spec.horizon + 1), is_relative=True) @@ -334,6 +325,8 @@ def _generate_report(self): theta_section = rc.Select(blocks=theta_blocks) elif len(theta_blocks) == 1: theta_section = theta_blocks[0] + else: + theta_section = rc.Text("No Theta models were successfully trained.") all_sections.extend([theta_title, theta_section]) From 511a0e8a74bd43338bbac3614d397dcdf4eb9ee1 Mon Sep 17 00:00:00 2001 From: skjoracle Date: Wed, 17 Dec 2025 16:17:42 +0530 Subject: [PATCH 08/10] fix seasonality bug in theta forecaster --- .../operator/lowcode/forecast/model/theta.py | 16 ++++++++++++++-- tests/operators/forecast/test_errors.py | 2 +- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py index 780bbf1b2..a19ecfe2a 100644 --- a/ads/opctl/operator/lowcode/forecast/model/theta.py +++ 
b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -8,6 +8,7 @@ import optuna import pandas as pd from joblib import Parallel, delayed +from optuna.trial import TrialState from sktime.forecasting.base import ForecastingHorizon from sktime.forecasting.theta import ThetaForecaster from sktime.performance_metrics.forecasting import mean_squared_error, \ @@ -100,8 +101,6 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A target = self.spec.target_column freq_str, sp = normalize_freq(data_i.index) - if freq_str is not None: - data_i = data_i.asfreq(freq_str) y = data_i[target] @@ -252,6 +251,19 @@ def objective(trial): study = optuna.create_study(direction="minimize") trials = DEFAULT_TRIALS if self.spec.tuning.n_trials is None else self.spec.tuning.n_trials study.optimize(objective, n_trials=trials) + + completed_trials = [ + t for t in study.trials + if t.state == TrialState.COMPLETE + ] + + if not completed_trials: + print( + "Theta tuning produced no completed trials. " + "Falling back to default parameters." + ) + return model_kwargs_i + model_kwargs_i["deseasonalize_model"] = study.best_params["deseasonalize_model"] model_kwargs_i["deseasonalize"] = study.best_params["deseasonalize"] return model_kwargs_i diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index a835217ca..85e20ac18 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -817,7 +817,7 @@ def test_date_format(operator_setup, model): @pytest.mark.parametrize("model", MODELS) def test_what_if_analysis(operator_setup, model): os.environ["TEST_MODE"] = "True" - if model in ["auto-select", "lgbforecast"]: + if model in ["auto-select", "lgbforecast", "theta"]: pytest.skip("Skipping what-if scenario for auto-select") tmpdirname = operator_setup historical_data_path, additional_data_path = setup_small_rossman() From 3e0a2f73eed81030d5869b3ddcb14001aede99cc Mon Sep 17 00:00:00 2001 From: skjoracle Date: Wed, 17 Dec 2025 21:33:39 +0530 Subject: [PATCH 09/10] fix seasonality bug in theta forecaster --- ads/opctl/operator/lowcode/common/utils.py | 77 +++---------------- .../operator/lowcode/forecast/model/theta.py | 57 +++----------- 2 files changed, 23 insertions(+), 111 deletions(-) diff --git a/ads/opctl/operator/lowcode/common/utils.py b/ads/opctl/operator/lowcode/common/utils.py index 3be6e6028..a3163a509 100644 --- a/ads/opctl/operator/lowcode/common/utils.py +++ b/ads/opctl/operator/lowcode/common/utils.py @@ -23,14 +23,7 @@ InvalidParameterError, ) from ads.secrets import ADBSecretKeeper - -from pandas.tseries.frequencies import to_offset -from pandas.tseries.offsets import ( - MonthBegin, MonthEnd, - QuarterBegin, QuarterEnd, - YearBegin, YearEnd, - Week -) +from sktime.param_est.seasonality import SeasonalityACF def call_pandas_fsspec(pd_fn, filename, storage_options, **kwargs): @@ -395,62 +388,14 @@ def enable_print(): sys.stdout = sys.__stdout__ -def normalize_freq(index: pd.DatetimeIndex): - """ - Returns: - freq_str: canonical pandas freq string or None - sp: seasonal period (int) - """ - freq = pd.infer_freq(index) - - if freq is None: - return None, 1 - +def find_seasonal_period_from_dataset(data: pd.DataFrame) -> tuple[int, list]: try: - offset = to_offset(freq) - except Exception: - return None, 1 - - if isinstance(offset, (MonthBegin, MonthEnd)): - return "M", 12 - - if isinstance(offset, (QuarterBegin, QuarterEnd)): - return "Q", 4 - - if isinstance(offset, (YearBegin, YearEnd)): - return 
"Y", 1 - - if isinstance(offset, Week): - return "W", 52 - - freq = str(freq).upper() - - mapping = { - "D": ("D", 7), - "H": ("H", 24), - "T": ("T", 1440), - "MIN": ("T", 1440), - "M": ("M", 12), - "Q": ("Q", 4), - "A": ("Y", 1), - "Y": ("Y", 1), - } - - for key, val in mapping.items(): - if freq.startswith(key): - return val - - return None, 1 - - -def ensure_period_index(y: pd.Series, freq: str | None): - """ - Converts DatetimeIndex → PeriodIndex if possible. - """ - if not isinstance(y.index, pd.PeriodIndex): - if freq is not None: - try: - y.index = y.index.to_period(freq) - except Exception: - return y, False - return y, True + print(f"find_seasonal_period_from_dataset({data.index} {data.index.freq})") + sp_est = SeasonalityACF() + sp_est.fit(data) + sp = sp_est.get_fitted_params()["sp"] + probable_sps = sp_est.get_fitted_params()["sp_significant"] + return sp, probable_sps + except Exception as e: + print(f"Unable to find seasonal period: {e}") + return None, None diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py index a19ecfe2a..703940b42 100644 --- a/ads/opctl/operator/lowcode/forecast/model/theta.py +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -16,7 +16,6 @@ from sktime.split import ExpandingWindowSplitter from ads.opctl import logger -from ads.opctl.operator.lowcode.common.utils import normalize_freq, ensure_period_index from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe, smape) from .base_model import ForecastOperatorBaseModel @@ -24,6 +23,7 @@ from ..const import ( SupportedModels, ForecastOutputColumns, DEFAULT_TRIALS, ) +from ads.opctl.operator.lowcode.common.utils import find_seasonal_period_from_dataset logging.getLogger("report_creator").setLevel(logging.WARNING) @@ -60,38 +60,6 @@ def preprocess(self, data, series_id): ) return df_encoded.set_index(self.spec.datetime_column.name) - def enforce_theta_index_requirements(self, y: pd.Series, model_kwargs: dict): - """ - Ensures Theta won't crash due to missing freq. - Mutates model_kwargs safely. 
- """ - index = y.index - - # Case 1: PeriodIndex → OK - if isinstance(index, pd.PeriodIndex): - return y - - # Case 2: DatetimeIndex with freq → OK - if isinstance(index, pd.DatetimeIndex) and index.freq is not None: - return y - - # Case 3: DatetimeIndex but freq missing → try to set it - if isinstance(index, pd.DatetimeIndex): - inferred = pd.infer_freq(index) - if inferred is not None: - try: - y = y.asfreq(inferred) - return y - except Exception: - pass - - # 🚨 Last resort: disable deseasonalization - model_kwargs["deseasonalize"] = False - model_kwargs["deseasonalize_model"] = None - model_kwargs["sp"] = 1 - - return y - def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, Any]): try: self.forecast_output.init_series_output(series_id=series_id, data_at_series=df) @@ -100,19 +68,20 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A data_i = self.drop_horizon(data) target = self.spec.target_column - freq_str, sp = normalize_freq(data_i.index) + freq = self.datasets.get_datetime_frequency() + if freq is not None: + if freq.startswith("W-"): + freq = "W" + data_i.index = data_i.index.to_period(freq) y = data_i[target] + sp, probable_sps = find_seasonal_period_from_dataset(y) model_kwargs["sp"] = model_kwargs.get("sp") or sp - y, period_ok = ensure_period_index(y, freq_str) - - if not period_ok or len(y) < 2 * model_kwargs["sp"]: + if not sp or len(y) < 2 * model_kwargs["sp"]: model_kwargs["deseasonalize"] = False - y = self.enforce_theta_index_requirements(y, model_kwargs) - # If model already loaded, extract parameters (best-effort) if self.loaded_models is not None and series_id in self.loaded_models: previous_res = self.loaded_models[series_id].get("model") @@ -122,9 +91,8 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A model_kwargs["deseasonalize"] = previous_res.deseasonalize model_kwargs["initial_level"] = fitted_params.get("initial_level", None) elif self.perform_tuning: - model_kwargs = self.run_tuning(y, model_kwargs) + model_kwargs = self.run_tuning(y, model_kwargs, probable_sps) - # Fit ThetaModel using params model = ThetaForecaster(initial_level=model_kwargs["initial_level"], deseasonalize=model_kwargs["deseasonalize"], deseasonalize_model=model_kwargs["deseasonalize_model"], @@ -200,7 +168,7 @@ def _build_model(self) -> pd.DataFrame: return self.forecast_output.get_forecast_long() - def run_tuning(self, y: pd.DataFrame, model_kwargs_i: Dict[str, Any]): + def run_tuning(self, y: pd.DataFrame, model_kwargs_i: Dict[str, Any], probable_sps: list[int]): scoring = { "mape": lambda y_true, y_pred: mean_absolute_percentage_error(y_true, y_pred), @@ -212,7 +180,7 @@ def run_tuning(self, y: pd.DataFrame, model_kwargs_i: Dict[str, Any]): def objective(trial): initial_level = model_kwargs_i["initial_level"] - sp = model_kwargs_i["sp"] + sp = trial.suggest_categorical("sp", probable_sps) deseasonalize = trial.suggest_categorical("deseasonalize", [True, False]) deseasonalize_model = trial.suggest_categorical("deseasonalize_model", ["additive", "multiplicative"]) if deseasonalize_model == "multiplicative" and (y <= 0).any(): @@ -251,14 +219,13 @@ def objective(trial): study = optuna.create_study(direction="minimize") trials = DEFAULT_TRIALS if self.spec.tuning.n_trials is None else self.spec.tuning.n_trials study.optimize(objective, n_trials=trials) - completed_trials = [ t for t in study.trials if t.state == TrialState.COMPLETE ] if not completed_trials: - print( + logger.warning( "Theta tuning 
produced no completed trials. " "Falling back to default parameters." ) From 24c813b2cc8860894edfd8e6cf92c5e22dc58313 Mon Sep 17 00:00:00 2001 From: skjoracle Date: Thu, 18 Dec 2025 01:10:47 +0530 Subject: [PATCH 10/10] fix seasonality bug in theta forecaster and refactored explainablity code --- ads/opctl/operator/lowcode/common/utils.py | 48 +++++++++ .../operator/lowcode/forecast/model/arima.py | 61 +---------- .../lowcode/forecast/model/base_model.py | 71 +++++++++++++ .../operator/lowcode/forecast/model/theta.py | 100 ++++-------------- 4 files changed, 140 insertions(+), 140 deletions(-) diff --git a/ads/opctl/operator/lowcode/common/utils.py b/ads/opctl/operator/lowcode/common/utils.py index a3163a509..e28ed8483 100644 --- a/ads/opctl/operator/lowcode/common/utils.py +++ b/ads/opctl/operator/lowcode/common/utils.py @@ -399,3 +399,51 @@ def find_seasonal_period_from_dataset(data: pd.DataFrame) -> tuple[int, list]: except Exception as e: print(f"Unable to find seasonal period: {e}") return None, None + + +def normalize_frequency(freq: str) -> str: + """ + Normalize pandas frequency strings to sktime/period-compatible formats. + + Args: + freq: Pandas frequency string + + Returns: + Normalized frequency string compatible with PeriodIndex + """ + if freq is None: + return None + + freq = freq.upper() + + # Handle weekly frequencies with day anchors (W-SUN, W-MON, etc.) + if freq.startswith("W-"): + return "W" + + # Handle month start/end frequencies + freq_mapping = { + "MS": "M", # Month Start -> Month End + "ME": "M", # Month End -> Month + "BMS": "M", # Business Month Start -> Month + "BME": "M", # Business Month End -> Month + "QS": "Q", # Quarter Start -> Quarter + "QE": "Q", # Quarter End -> Quarter + "BQS": "Q", # Business Quarter Start -> Quarter + "BQE": "Q", # Business Quarter End -> Quarter + "YS": "Y", # Year Start -> Year (Alias: A) + "AS": "Y", # Year Start -> Year (Alias: A) + "YE": "Y", # Year End -> Year + "AE": "Y", # Year End -> Year + "BYS": "Y", # Business Year Start -> Year + "BAS": "Y", # Business Year Start -> Year + "BYE": "Y", # Business Year End -> Year + "BAE": "Y", # Business Year End -> Year + } + + # Handle frequencies with prefixes (e.g., "2W", "3M") + for old_freq, new_freq in freq_mapping.items(): + if freq.endswith(old_freq): + prefix = freq[:-len(old_freq)] + return f"{prefix}{new_freq}" if prefix else new_freq + + return freq \ No newline at end of file diff --git a/ads/opctl/operator/lowcode/forecast/model/arima.py b/ads/opctl/operator/lowcode/forecast/model/arima.py index 3158e704a..5c64c1564 100644 --- a/ads/opctl/operator/lowcode/forecast/model/arima.py +++ b/ads/opctl/operator/lowcode/forecast/model/arima.py @@ -183,66 +183,7 @@ def _generate_report(self): try: # If the key is present, call the "explain_model" method self.explain_model() - - # Convert the global explanation data to a DataFrame - global_explanation_df = pd.DataFrame(self.global_explanation) - - self.formatted_global_explanation = ( - global_explanation_df / global_explanation_df.sum(axis=0) * 100 - ) - self.formatted_global_explanation = ( - self.formatted_global_explanation.rename( - {self.spec.datetime_column.name: ForecastOutputColumns.DATE}, - axis=1, - ) - ) - aggregate_local_explanations = pd.DataFrame() - for s_id, local_ex_df in self.local_explanation.items(): - local_ex_df_copy = local_ex_df.copy() - local_ex_df_copy["Series"] = s_id - aggregate_local_explanations = pd.concat( - [aggregate_local_explanations, local_ex_df_copy], axis=0 - ) - 
self.formatted_local_explanation = aggregate_local_explanations - - if not self.target_cat_col: - self.formatted_global_explanation = ( - self.formatted_global_explanation.rename( - {"Series 1": self.original_target_column}, - axis=1, - ) - ) - self.formatted_local_explanation.drop( - "Series", axis=1, inplace=True - ) - - # Create a markdown section for the global explainability - global_explanation_section = rc.Block( - rc.Heading("Global Explanation of Models", level=2), - rc.Text( - "The following tables provide the feature attribution for the global explainability." - ), - rc.DataTable(self.formatted_global_explanation, index=True), - ) - - blocks = [ - rc.DataTable( - local_ex_df.div(local_ex_df.abs().sum(axis=1), axis=0) * 100, - label=s_id if self.target_cat_col else None, - index=True, - ) - for s_id, local_ex_df in self.local_explanation.items() - ] - local_explanation_section = rc.Block( - rc.Heading("Local Explanation of Models", level=2), - rc.Select(blocks=blocks) if len(blocks) > 1 else blocks[0], - ) - - # Append the global explanation text and section to the "all_sections" list - all_sections = all_sections + [ - global_explanation_section, - local_explanation_section, - ] + all_sections = all_sections + self.generate_explanation_report() except Exception as e: logger.warning(f"Failed to generate Explanations with error: {e}.") logger.debug(f"Full Traceback: {traceback.format_exc()}") diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index db2c73507..473fcc6f8 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -848,6 +848,77 @@ def explain_model(self): "Local explanations generation completed in %s seconds", local_ex_time ) + def generate_explanation_report(self): + try: + # Convert the global explanation data to a DataFrame + global_explanation_df = pd.DataFrame(self.global_explanation) + + self.formatted_global_explanation = ( + global_explanation_df / global_explanation_df.sum(axis=0) * 100 + ) + self.formatted_global_explanation = ( + self.formatted_global_explanation.rename( + {self.spec.datetime_column.name: ForecastOutputColumns.DATE}, + axis=1, + ) + ) + aggregate_local_explanations = pd.DataFrame() + for s_id, local_ex_df in self.local_explanation.items(): + local_ex_df_copy = local_ex_df.copy() + local_ex_df_copy["Series"] = s_id + aggregate_local_explanations = pd.concat( + [aggregate_local_explanations, local_ex_df_copy], axis=0 + ) + self.formatted_local_explanation = aggregate_local_explanations + + if not self.target_cat_col: + self.formatted_global_explanation = ( + self.formatted_global_explanation.rename( + {"Series 1": self.original_target_column}, + axis=1, + ) + ) + self.formatted_local_explanation.drop( + "Series", axis=1, inplace=True + ) + + # Create a markdown section for the global explainability + global_explanation_section = rc.Block( + rc.Heading("Global Explanation of Models", level=2), + rc.Text( + "The following tables provide the feature attribution for the global explainability." 
+ ), + rc.DataTable(self.formatted_global_explanation, index=True), + ) + + blocks = [ + rc.DataTable( + local_ex_df + .select_dtypes(include="number") + .div( + local_ex_df.select_dtypes(include="number").abs().sum(axis=1), + axis=0, + ) * 100, + label=s_id if self.target_cat_col else None, + index=True, + ) + for s_id, local_ex_df in self.local_explanation.items() + ] + + local_explanation_section = rc.Block( + rc.Heading("Local Explanation of Models", level=2), + rc.Select(blocks=blocks) if len(blocks) > 1 else blocks[0], + ) + + return [ + global_explanation_section, + local_explanation_section, + ] + except Exception as e: + logger.warning(f"Failed to generate explanations Report with error: {e}.") + logger.debug(f"Full Traceback: {traceback.format_exc()}") + return [] + def local_explainer(self, kernel_explainer, series_id, datetime_col_name) -> None: """ Generate local explanations using a kernel explainer. diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py index 703940b42..d108abd5a 100644 --- a/ads/opctl/operator/lowcode/forecast/model/theta.py +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -11,19 +11,17 @@ from optuna.trial import TrialState from sktime.forecasting.base import ForecastingHorizon from sktime.forecasting.theta import ThetaForecaster -from sktime.performance_metrics.forecasting import mean_squared_error, \ - mean_absolute_percentage_error from sktime.split import ExpandingWindowSplitter from ads.opctl import logger +from ads.opctl.operator.lowcode.common.utils import find_seasonal_period_from_dataset, normalize_frequency from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig -from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe, smape) +from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe, _build_metrics_df) from .base_model import ForecastOperatorBaseModel from .forecast_datasets import ForecastDatasets, ForecastOutput from ..const import ( - SupportedModels, ForecastOutputColumns, DEFAULT_TRIALS, + SupportedModels, DEFAULT_TRIALS, ) -from ads.opctl.operator.lowcode.common.utils import find_seasonal_period_from_dataset logging.getLogger("report_creator").setLevel(logging.WARNING) @@ -70,13 +68,11 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A freq = self.datasets.get_datetime_frequency() if freq is not None: - if freq.startswith("W-"): - freq = "W" - data_i.index = data_i.index.to_period(freq) - + normalized_freq = normalize_frequency(freq) + data_i.index = data_i.index.to_period(normalized_freq) y = data_i[target] - sp, probable_sps = find_seasonal_period_from_dataset(y) + sp, probable_sps = find_seasonal_period_from_dataset(y) model_kwargs["sp"] = model_kwargs.get("sp") or sp if not sp or len(y) < 2 * model_kwargs["sp"]: @@ -93,6 +89,7 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A elif self.perform_tuning: model_kwargs = self.run_tuning(y, model_kwargs, probable_sps) + # Fit ThetaModel using params model = ThetaForecaster(initial_level=model_kwargs["initial_level"], deseasonalize=model_kwargs["deseasonalize"], deseasonalize_model=model_kwargs["deseasonalize_model"], @@ -170,14 +167,6 @@ def _build_model(self) -> pd.DataFrame: def run_tuning(self, y: pd.DataFrame, model_kwargs_i: Dict[str, Any], probable_sps: list[int]): - scoring = { - "mape": lambda y_true, y_pred: mean_absolute_percentage_error(y_true, y_pred), - "rmse": lambda 
y_true, y_pred: np.sqrt(mean_squared_error(y_true, y_pred)), - "mse": lambda y_true, y_pred: mean_squared_error(y_true, y_pred), - "smape": lambda y_true, y_pred: smape(y_true, y_pred) - } - score_fn = scoring.get(self.spec.metric.lower(), scoring["mape"]) - def objective(trial): initial_level = model_kwargs_i["initial_level"] sp = trial.suggest_categorical("sp", probable_sps) @@ -213,7 +202,16 @@ def objective(trial): y_test = y.iloc[test] if y_test.isna().any(): continue - scores.append(score_fn(y_test, y_pred)) + metrics_df = _build_metrics_df(y_test, y_pred, 0) + metrics_dict = { + k.lower(): v + for k, v in metrics_df[0].to_dict().items() + } + if self.spec.metric.lower() not in metrics_dict: + scores.append(metrics_dict["mape"]) + else: + scores.append(metrics_dict[self.spec.metric.lower()]) + return np.mean(scores) study = optuna.create_study(direction="minimize") @@ -225,7 +223,7 @@ def objective(trial): ] if not completed_trials: - logger.warning( + logger.debug( "Theta tuning produced no completed trials. " "Falling back to default parameters." ) @@ -233,6 +231,7 @@ def objective(trial): model_kwargs_i["deseasonalize_model"] = study.best_params["deseasonalize_model"] model_kwargs_i["deseasonalize"] = study.best_params["deseasonalize"] + model_kwargs_i["sp"] = study.best_params["sp"] return model_kwargs_i def _generate_report(self): @@ -313,66 +312,7 @@ def _generate_report(self): try: # If the key is present, call the "explain_model" method self.explain_model() - - # Convert the global explanation data to a DataFrame - global_explanation_df = pd.DataFrame(self.global_explanation) - - self.formatted_global_explanation = ( - global_explanation_df / global_explanation_df.sum(axis=0) * 100 - ) - self.formatted_global_explanation = ( - self.formatted_global_explanation.rename( - {self.spec.datetime_column.name: ForecastOutputColumns.DATE}, - axis=1, - ) - ) - aggregate_local_explanations = pd.DataFrame() - for s_id, local_ex_df in self.local_explanation.items(): - local_ex_df_copy = local_ex_df.copy() - local_ex_df_copy["Series"] = s_id - aggregate_local_explanations = pd.concat( - [aggregate_local_explanations, local_ex_df_copy], axis=0 - ) - self.formatted_local_explanation = aggregate_local_explanations - - if not self.target_cat_col: - self.formatted_global_explanation = ( - self.formatted_global_explanation.rename( - {"Series 1": self.original_target_column}, - axis=1, - ) - ) - self.formatted_local_explanation.drop( - "Series", axis=1, inplace=True - ) - - # Create a markdown section for the global explainability - global_explanation_section = rc.Block( - rc.Heading("Global Explanation of Models", level=2), - rc.Text( - "The following tables provide the feature attribution for the global explainability." 
- ), - rc.DataTable(self.formatted_global_explanation, index=True), - ) - - blocks = [ - rc.DataTable( - local_ex_df.div(local_ex_df.abs().sum(axis=1), axis=0) * 100, - label=s_id if self.target_cat_col else None, - index=True, - ) - for s_id, local_ex_df in self.local_explanation.items() - ] - local_explanation_section = rc.Block( - rc.Heading("Local Explanation of Models", level=2), - rc.Select(blocks=blocks) if len(blocks) > 1 else blocks[0], - ) - - # Append the global explanation text and section to the "all_sections" list - all_sections = all_sections + [ - global_explanation_section, - local_explanation_section, - ] + all_sections = all_sections + self.generate_explanation_report() except Exception as e: logger.warning(f"Failed to generate Explanations with error: {e}.") logger.debug(f"Full Traceback: {traceback.format_exc()}")
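
For reference, the core sktime calls that ThetaOperatorModel._train_model wraps (fit, predict, predict_interval) can be exercised standalone with the short sketch below. It is a minimal illustration, not part of the patch: the airline dataset, sp=12, the 12-step horizon, and the 0.90 coverage are assumed values chosen for the example, and a recent sktime release is required (the workflow changes above install the latest sktime).

    import numpy as np
    from sktime.datasets import load_airline
    from sktime.forecasting.base import ForecastingHorizon
    from sktime.forecasting.theta import ThetaForecaster

    # Monthly series with a PeriodIndex; sp=12 matches its yearly seasonality.
    y = load_airline()

    # The operator additionally passes deseasonalize_model and initial_level from
    # model_kwargs; only the long-standing ThetaForecaster parameters are used here.
    forecaster = ThetaForecaster(sp=12, deseasonalize=True)
    forecaster.fit(y)

    # Relative horizon of 12 future steps, mirroring how _train_model builds fh.
    fh = ForecastingHorizon(np.arange(1, 13), is_relative=True)

    point = forecaster.predict(fh)                                 # point forecasts ("yhat")
    intervals = forecaster.predict_interval(fh=fh, coverage=0.90)  # columns: (variable, 0.9, "lower"/"upper")

    print(point.tail())
    print(intervals.tail())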