diff --git a/message_ix_models/tests/tools/costs/test_gdp_parity.py b/message_ix_models/tests/tools/costs/test_gdp_parity.py new file mode 100644 index 0000000000..3a40533f5a --- /dev/null +++ b/message_ix_models/tests/tools/costs/test_gdp_parity.py @@ -0,0 +1,75 @@ +import time + +import pandas as pd +import pandas.testing as pdt +import pytest + +from message_ix_models.tools.costs import Config +from message_ix_models.tools.costs.gdp import ( + adjust_cost_ratios_with_gdp, + adjust_cost_ratios_with_gdp_legacy, +) +from message_ix_models.tools.costs.regional_differentiation import ( + apply_regional_differentiation, +) + + +def assert_equal_result(legacy, refactored): + if isinstance(legacy, dict) and isinstance(refactored, dict): + # Ensure the dictionaries have the same keys + assert set(legacy.keys()) == set(refactored.keys()), ( + "Dictionary keys do not match" + ) + # Recursively compare each value in the dictionary + for key in legacy: + assert_equal_result(legacy[key], refactored[key]) + elif isinstance(legacy, pd.DataFrame) and isinstance(refactored, pd.DataFrame): + legacy = legacy.sort_index(axis=1) + refactored = refactored.sort_index(axis=1) + pdt.assert_frame_equal(legacy, refactored) + elif isinstance(legacy, pd.Series) and isinstance(refactored, pd.Series): + legacy = legacy.sort_index() + refactored = refactored.sort_index() + pdt.assert_series_equal(legacy, refactored) + else: + raise ValueError( + f"Type mismatch: legacy type {type(legacy)} vs " + f"refactored type {type(refactored)}" + ) + +#@pytest.mark.skip(reason="Skipping test_adjust_cost_ratios_with_gdp") +@pytest.mark.parametrize("module", ("energy", "materials", "cooling")) +def test_adjust_cost_ratios_with_gdp(test_context, module) -> None: + # Set parameters + test_context.model.regions = "R12" + + # Mostly defaults + config = Config(module=module, node="R12", scenario="SSP2") + + # Get regional differentiation + region_diff = apply_regional_differentiation(config) + n_iter = 5 + # Get adjusted cost ratios based on GDP per capita + start_time = time.time() + for _ in range(n_iter): + result_legacy = adjust_cost_ratios_with_gdp_legacy(region_diff, config) + end_time = time.time() + with open("time_taken_gdp.txt", "a") as f: + f.write( + f"Time taken for adjust_cost_ratios_with_gdp: " + f"{(end_time - start_time) / n_iter} seconds\n" + ) + + # Get adjusted cost ratios based on GDP per capita using vectorized approach + start_time = time.time() + for _ in range(n_iter): + result_vectorized = adjust_cost_ratios_with_gdp(region_diff, config) + end_time = time.time() + with open("time_taken_gdp.txt", "a") as f: + f.write( + f"Time taken for adjust_cost_ratios_with_gdp_vectorized:" + f"{(end_time - start_time) / n_iter} seconds\n" + ) + + # Assert that the results are equal + assert_equal_result(result_legacy, result_vectorized) diff --git a/message_ix_models/tests/tools/costs/test_regional_differentiation_parity.py b/message_ix_models/tests/tools/costs/test_regional_differentiation_parity.py new file mode 100644 index 0000000000..07e6ba64e8 --- /dev/null +++ b/message_ix_models/tests/tools/costs/test_regional_differentiation_parity.py @@ -0,0 +1,57 @@ +import time + +import numpy as np +import pandas as pd +import pandas.testing as pdt +import pytest + +from message_ix_models.tools.costs import Config +from message_ix_models.tools.costs.regional_differentiation import ( + apply_regional_differentiation, + get_weo_data, + get_weo_data_fast, +) + + +def assert_equal_result(legacy, refactored): + if isinstance(legacy, dict) and isinstance(refactored, dict): + # Ensure the dictionaries have the same keys + assert set(legacy.keys()) == set(refactored.keys()), ( + "Dictionary keys do not match" + ) + # Recursively compare each value in the dictionary + for key in legacy: + assert_equal_result(legacy[key], refactored[key]) + elif isinstance(legacy, pd.DataFrame) and isinstance(refactored, pd.DataFrame): + legacy = legacy.sort_index(axis=1) + refactored = refactored.sort_index(axis=1) + pdt.assert_frame_equal(legacy, refactored) + elif isinstance(legacy, pd.Series) and isinstance(refactored, pd.Series): + legacy = legacy.sort_index() + refactored = refactored.sort_index() + pdt.assert_series_equal(legacy, refactored) + else: + raise ValueError( + f"Type mismatch: legacy type {type(legacy)} vs refactored type {type(refactored)}" + ) + +#@pytest.mark.skip(reason="Skipping test_get_weo_data") +def test_get_weo_data() -> None: + n_iter = 5 + start_time = time.time() + for _ in range(n_iter): + result_legacy = get_weo_data() + end_time = time.time() + with open("weo_data_time.txt", "a") as f: + f.write(f"Time taken for legacy get_weo_data:" + f"{(end_time - start_time) / n_iter} seconds\n") + start_time = time.time() + for _ in range(n_iter): + result_fast = get_weo_data_fast() + end_time = time.time() + with open("weo_data_time.txt", "a") as f: + f.write(f"Time taken for fast get_weo_data:" + f"{(end_time - start_time) / n_iter} seconds\n") + + assert_equal_result(result_legacy, result_fast) + diff --git a/message_ix_models/tools/costs/gdp.py b/message_ix_models/tools/costs/gdp.py index 21c8c5dc2f..ec7d7df9e2 100644 --- a/message_ix_models/tools/costs/gdp.py +++ b/message_ix_models/tools/costs/gdp.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd from genno import KeySeq +from scipy.stats import linregress from message_ix_models import Context @@ -123,7 +124,7 @@ def merge(*dfs: pd.DataFrame) -> pd.DataFrame: return result -def adjust_cost_ratios_with_gdp(region_diff_df, config: Config): +def adjust_cost_ratios_with_gdp_legacy(region_diff_df, config: Config): """Calculate adjusted region-differentiated cost ratios. This function takes in a data frame with region-differentiated cost ratios and @@ -247,3 +248,149 @@ def _constrain_cost_ratio(df: pd.DataFrame, base_year): ] ] ) + + +def adjust_cost_ratios_with_gdp(region_diff_df, config: Config): + """Calculate adjusted region-differentiated cost ratios. + + This function takes in a data frame with region-differentiated cost ratios and + calculates adjusted region-differentiated cost ratios using GDP per capita data. + + Parameters + ---------- + region_diff_df : pandas.DataFrame + Output of :func:`apply_regional_differentiation`. + config : .Config + The function responds to, or passes on to other functions, the fields: + :attr:`~.Config.base_year`, + :attr:`~.Config.node`, + :attr:`~.Config.ref_region`, + :attr:`~.Config.scenario`, and + :attr:`~.Config.scenario_version`. + + Returns + ------- + pandas.DataFrame + DataFrame with columns: + - scenario_version: scenario version + - scenario: SSP scenario + - message_technology: message technology + - region: R11, R12, or R20 region + - year + - gdp_ratio_reg_to_reference: ratio of GDP per capita in respective region to + GDP per capita in reference region. + - reg_cost_ratio_adj: adjusted region-differentiated cost ratio + + Differences from the legacy function: + - Uses vectorized DataFrame operations to compute slope and intercept from base-year data, reducing reliance on iterative group processing. + - Merges base-year GDP values directly to compute and constrain the adjusted cost ratios. + - Eliminates the need for an explicit group-wise constraint function by applying clipping conditions directly on the merged data. + """ + # Import helper functions (they remain in use) + from .projections import _maybe_query_scenario, _maybe_query_scenario_version + + # Set region context for GDP extraction + context = Context.get_instance(-1) + context.model.regions = config.node + + # Retrieve and prepare GDP data (dropping totals, converting dtypes, filtering by y0, + # reassigning scenario_version values, and applying any scenario filters) + df_gdp = ( + process_raw_ssp_data(context, config) + .query("year >= @config.y0") + .drop(columns=["total_gdp", "total_population"]) + .assign( + scenario_version=lambda x: np.where( + x.scenario_version.str.contains("2013"), + "Previous (2013)", + "Review (2023)", + ) + ) + .astype({"year": int}) + .pipe(_maybe_query_scenario, config) + .pipe(_maybe_query_scenario_version, config) + ) + + # Ensure base_year exists in GDP data; otherwise choose the earliest year and warn. + base_year = config.base_year + if base_year not in df_gdp.year.unique(): + new_base_year = df_gdp.year.min() + log.warning(f"Use year={new_base_year} GDP data as proxy for {base_year}") + base_year = new_base_year + + # --- Step 1: Calculate slope and intercept using the base-year data --- + # 1a. Subset GDP data to the base year and drop the year column. + df_base = df_gdp.query("year == @base_year").drop("year", axis=1) + + # 1b. Merge the base-year GDP data with region_diff_df to get the base "reg_cost_ratio". + df_intermediate = df_base.merge(region_diff_df, on=["region"]) + + # 1c. Calculate the slope and intercept from the base-year values. + df_intermediate["slope"] = (df_intermediate["reg_cost_ratio"] - 1) / ( + df_intermediate["gdp_ratio_reg_to_reference"] - 1 + ) + df_intermediate["intercept"] = 1 - df_intermediate["slope"] + # Drop the GDP ratio from the base data as it will be re-added from the full data. + df_intermediate = df_intermediate.drop(columns=["gdp_ratio_reg_to_reference"]) + + # --- Step 2: Merge full GDP data and compute adjusted cost ratios --- + # Merge the intermediate (base-year derived) data with the full set of GDP data; + # this adds yearly "gdp_ratio_reg_to_reference" values to each record. + df_merged = df_intermediate.merge( + df_gdp, on=["scenario_version", "scenario", "region"], how="right" + ) + # Compute the adjusted cost ratio for all rows. + df_merged["reg_cost_ratio_adj"] = ( + df_merged["slope"] * df_merged["gdp_ratio_reg_to_reference"] + + df_merged["intercept"] + ) + # Fill any NaNs (e.g. for the reference region) with 1.0. + df_merged["reg_cost_ratio_adj"] = df_merged["reg_cost_ratio_adj"].fillna(1.0) + + # --- Step 3: Vectorize the constrain logic that was in _constrain_cost_ratio --- + # Instead of iterating per group, we extract the base-year values for each group. + base_values = ( + df_merged.query("year == @base_year") + .loc[ + :, + [ + "scenario_version", + "scenario", + "region", + "message_technology", + "gdp_ratio_reg_to_reference", + "reg_cost_ratio_adj", + ], + ] + .rename( + columns={ + "gdp_ratio_reg_to_reference": "base_gdp_ratio", + "reg_cost_ratio_adj": "base_reg_cost", + } + ) + ) + # Merge these base-year values back onto the main data. + df_merged = df_merged.merge( + base_values, + on=["scenario_version", "scenario", "region", "message_technology"], + how="left", + ) + # For groups where the base-year GDP ratio is less than 1 and the base-year cost ratio is greater than 1, + # clip the reg_cost_ratio_adj to the base-year cost ratio. + condition = (df_merged["base_gdp_ratio"] < 1) & (df_merged["base_reg_cost"] > 1) + df_merged.loc[condition, "reg_cost_ratio_adj"] = df_merged.loc[ + condition, "reg_cost_ratio_adj" + ].clip(upper=df_merged.loc[condition, "base_reg_cost"]) + + # --- Step 4: Select and return the final desired columns --- + return df_merged[ + [ + "scenario_version", + "scenario", + "message_technology", + "region", + "year", + "gdp_ratio_reg_to_reference", + "reg_cost_ratio_adj", + ] + ] diff --git a/message_ix_models/tools/costs/regional_differentiation.py b/message_ix_models/tools/costs/regional_differentiation.py index 8957475d96..bf2d8db769 100644 --- a/message_ix_models/tools/costs/regional_differentiation.py +++ b/message_ix_models/tools/costs/regional_differentiation.py @@ -31,6 +31,117 @@ def get_weo_region_map(regions: str) -> Mapping[str, str]: return {n.id: str(n.get_annotation(id="iea-weo-region").text) for n in nodes} +def get_weo_data_fast() -> pd.DataFrame: + """Read in raw WEO investment/capital costs and O&M costs data. + + Returns + ------- + pandas.DataFrame + DataFrame with columns: + + - cost_type: investment or fixed O&M cost + - weo_technology: WEO technology name + - weo_region: WEO region + - year: year + - value: cost value + + Changes from get_weo_data: + This function opens the Excel file once using pd.ExcelFile, + reusing the file handle to read data from multiple sheets. + Reducing IO overhead from repeated file access. + """ + + # Dict of all technologies, their Excel sheet name, and the starting row + DICT_TECH_ROWS = { + "bioenergy_ccus": ["Renewables", 99], + "bioenergy_cofiring": ["Renewables", 79], + "bioenergy_large": ["Renewables", 69], + "bioenergy_medium_chp": ["Renewables", 89], + "ccgt": ["Gas", 9], + "ccgt_ccs": ["Fossil fuels equipped with CCUS", 29], + "ccgt_chp": ["Gas", 29], + "csp": ["Renewables", 109], + "fuel_cell": ["Gas", 39], + "gas_turbine": ["Gas", 19], + "geothermal": ["Renewables", 119], + "hydropower_large": ["Renewables", 49], + "hydropower_small": ["Renewables", 59], + "igcc": ["Coal", 39], + "igcc_ccs": ["Fossil fuels equipped with CCUS", 19], + "marine": ["Renewables", 129], + "nuclear": ["Nuclear", 9], + "pulverized_coal_ccs": ["Fossil fuels equipped with CCUS", 9], + "solarpv_buildings": ["Renewables", 19], + "solarpv_large": ["Renewables", 9], + "steam_coal_subcritical": ["Coal", 9], + "steam_coal_supercritical": ["Coal", 19], + "steam_coal_ultrasupercritical": ["Coal", 29], + "wind_offshore": ["Renewables", 39], + "wind_onshore": ["Renewables", 29], + } + + # Dict of cost types to read and columns specification + DICT_COST_COLS = {"inv_cost": "A,B:D", "fix_cost": "A,F:H"} + + # Set the file path for the raw IEA WEO cost data Excel file + file_path = package_data_path( + "iea", "WEO_2023_PG_Assumptions_STEPSandNZE_Scenario.xlsx" + ) + + # Retrieve conversion factor from 2022 USD to 2005 USD + conversion_factor = registry("1.0 USD_2022").to("USD_2005").magnitude + + dfs_cost = [] + # Open the Excel file once for all reads + with pd.ExcelFile(file_path) as xls: + # Loop over each technology and cost type combination + for tech_key, cost_key in product(DICT_TECH_ROWS, DICT_COST_COLS): + sheet_name, skiprow_val = DICT_TECH_ROWS[tech_key] + cols_range = DICT_COST_COLS[cost_key] + # Read data with na_values so "n.a." becomes NaN + df = ( + pd.read_excel( + xls, + sheet_name=sheet_name, + header=None, + skiprows=skiprow_val, + nrows=9, + usecols=cols_range, + na_values=["n.a."], + ) + .set_axis(["weo_region", "2022", "2030", "2050"], axis=1) + .melt(id_vars=["weo_region"], var_name="year", value_name="value") + .assign( + weo_technology=tech_key, + cost_type=cost_key, + units="usd_per_kw", + ) + .reindex( + [ + "cost_type", + "weo_technology", + "weo_region", + "year", + "units", + "value", + ], + axis=1, + ) + .assign(value=lambda x: x.value * conversion_factor) + ) + dfs_cost.append(df) + + # By setting ignore_index=True we create a clean, continuous RangeIndex + all_cost_df = pd.concat(dfs_cost, ignore_index=True) + + # Replace missing values with the median cost per technology and cost type. + all_cost_df["value"] = all_cost_df.groupby(["weo_technology", "cost_type"])[ + "value" + ].transform(lambda x: x.fillna(x.median())) + + return all_cost_df + + def get_weo_data() -> pd.DataFrame: """Read in raw WEO investment/capital costs and O&M costs data. @@ -445,7 +556,9 @@ def adjust_technology_mapping( return module_all -def get_weo_regional_differentiation(config: "Config") -> pd.DataFrame: +def get_weo_regional_differentiation( + config: "Config", flag_fast: bool = True, +) -> pd.DataFrame: """Apply WEO regional differentiation. 1. Retrieve WEO data using :func:`.get_weo_data`. @@ -460,6 +573,8 @@ def get_weo_regional_differentiation(config: "Config") -> pd.DataFrame: :attr:`~.Config.base_year`, :attr:`~.Config.node`, and :attr:`~.Config.ref_region`. + flag_fast : bool + If True calls :func:`.get_weo_data_fast`, otherwise calls :func:`.get_weo_data`. Returns ------- @@ -473,7 +588,10 @@ def get_weo_regional_differentiation(config: "Config") -> pd.DataFrame: """ # Grab WEO data and keep only investment costs - df_weo = get_weo_data() + if flag_fast: + df_weo = get_weo_data_fast() # Using the faster version. + else: + df_weo = get_weo_data() # Even if config.base_year is greater than 2022, use 2022 WEO values sel_year = str(2022) @@ -638,7 +756,10 @@ def get_intratec_regional_differentiation(node: str, ref_region: str) -> pd.Data return df_reg_ratios -def apply_regional_differentiation(config: "Config") -> pd.DataFrame: +def apply_regional_differentiation( + config: "Config", + flag_fast: bool = True, +) -> pd.DataFrame: """Apply regional differentiation depending on mapping source. 1. Retrieve an adjusted technology mapping from :func:`.adjust_technology_mapping`. @@ -657,7 +778,10 @@ def apply_regional_differentiation(config: "Config") -> pd.DataFrame: :attr:`~.Config.module`, :attr:`~.Config.node`, and :attr:`~.Config.ref_region`. - + flag_fast : bool + If True is passed to :func:`.get_weo_regional_differentiation`, + then :func:`.get_weo_data_fast` is called, + otherwise :func:`.get_weo_data` is called. Returns ------- pandas.DataFrame @@ -675,7 +799,8 @@ def apply_regional_differentiation(config: "Config") -> pd.DataFrame: """ df_map = adjust_technology_mapping(config.module) assert config.ref_region is not None - df_weo = get_weo_regional_differentiation(config) + + df_weo = get_weo_regional_differentiation(config, flag_fast) df_intratec = get_intratec_regional_differentiation(config.node, config.ref_region) # Get mapping of technologies