Speed up cost calculation tools in gdp.py and regional_differentiation.py #334

Open · wants to merge 4 commits into base: main
75 changes: 75 additions & 0 deletions message_ix_models/tests/tools/costs/test_gdp_parity.py
@@ -0,0 +1,75 @@
import time

import pandas as pd
import pandas.testing as pdt
import pytest

from message_ix_models.tools.costs import Config
from message_ix_models.tools.costs.gdp import (
adjust_cost_ratios_with_gdp,
adjust_cost_ratios_with_gdp_legacy,
)
from message_ix_models.tools.costs.regional_differentiation import (
apply_regional_differentiation,
)


def assert_equal_result(legacy, refactored):
if isinstance(legacy, dict) and isinstance(refactored, dict):
# Ensure the dictionaries have the same keys
assert set(legacy.keys()) == set(refactored.keys()), (
"Dictionary keys do not match"
)
# Recursively compare each value in the dictionary
for key in legacy:
assert_equal_result(legacy[key], refactored[key])
elif isinstance(legacy, pd.DataFrame) and isinstance(refactored, pd.DataFrame):
legacy = legacy.sort_index(axis=1)
refactored = refactored.sort_index(axis=1)
pdt.assert_frame_equal(legacy, refactored)
elif isinstance(legacy, pd.Series) and isinstance(refactored, pd.Series):
legacy = legacy.sort_index()
refactored = refactored.sort_index()
pdt.assert_series_equal(legacy, refactored)
else:
raise ValueError(
f"Type mismatch: legacy type {type(legacy)} vs "
f"refactored type {type(refactored)}"
)

# @pytest.mark.skip(reason="Skipping test_adjust_cost_ratios_with_gdp")
@pytest.mark.parametrize("module", ("energy", "materials", "cooling"))
def test_adjust_cost_ratios_with_gdp(test_context, module) -> None:
# Set parameters
test_context.model.regions = "R12"

# Mostly defaults
config = Config(module=module, node="R12", scenario="SSP2")

# Get regional differentiation
region_diff = apply_regional_differentiation(config)
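    # Time each implementation over several iterations and average the elapsed
    # time, to smooth out run-to-run variation.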
n_iter = 5
# Get adjusted cost ratios based on GDP per capita
start_time = time.time()
for _ in range(n_iter):
result_legacy = adjust_cost_ratios_with_gdp_legacy(region_diff, config)
end_time = time.time()
with open("time_taken_gdp.txt", "a") as f:
f.write(
f"Time taken for adjust_cost_ratios_with_gdp: "
f"{(end_time - start_time) / n_iter} seconds\n"
)

# Get adjusted cost ratios based on GDP per capita using vectorized approach
start_time = time.time()
for _ in range(n_iter):
result_vectorized = adjust_cost_ratios_with_gdp(region_diff, config)
end_time = time.time()
with open("time_taken_gdp.txt", "a") as f:
f.write(
f"Time taken for adjust_cost_ratios_with_gdp_vectorized:"
f"{(end_time - start_time) / n_iter} seconds\n"
)

# Assert that the results are equal
assert_equal_result(result_legacy, result_vectorized)
@@ -0,0 +1,57 @@
import time

import numpy as np
import pandas as pd
import pandas.testing as pdt
import pytest

from message_ix_models.tools.costs import Config
from message_ix_models.tools.costs.regional_differentiation import (
apply_regional_differentiation,
get_weo_data,
get_weo_data_fast,
)


def assert_equal_result(legacy, refactored):
if isinstance(legacy, dict) and isinstance(refactored, dict):
# Ensure the dictionaries have the same keys
assert set(legacy.keys()) == set(refactored.keys()), (
"Dictionary keys do not match"
)
# Recursively compare each value in the dictionary
for key in legacy:
assert_equal_result(legacy[key], refactored[key])
elif isinstance(legacy, pd.DataFrame) and isinstance(refactored, pd.DataFrame):
legacy = legacy.sort_index(axis=1)
refactored = refactored.sort_index(axis=1)
pdt.assert_frame_equal(legacy, refactored)
elif isinstance(legacy, pd.Series) and isinstance(refactored, pd.Series):
legacy = legacy.sort_index()
refactored = refactored.sort_index()
pdt.assert_series_equal(legacy, refactored)
else:
raise ValueError(
f"Type mismatch: legacy type {type(legacy)} vs refactored type {type(refactored)}"
)

# @pytest.mark.skip(reason="Skipping test_get_weo_data")
def test_get_weo_data() -> None:
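    # Time the legacy and fast implementations over several iterations, average
    # the elapsed time, and check that both return identical data.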
n_iter = 5
start_time = time.time()
for _ in range(n_iter):
result_legacy = get_weo_data()
end_time = time.time()
with open("weo_data_time.txt", "a") as f:
f.write(f"Time taken for legacy get_weo_data:"
f"{(end_time - start_time) / n_iter} seconds\n")
start_time = time.time()
for _ in range(n_iter):
result_fast = get_weo_data_fast()
end_time = time.time()
with open("weo_data_time.txt", "a") as f:
f.write(f"Time taken for fast get_weo_data:"
f"{(end_time - start_time) / n_iter} seconds\n")

assert_equal_result(result_legacy, result_fast)

149 changes: 148 additions & 1 deletion message_ix_models/tools/costs/gdp.py
@@ -3,6 +3,7 @@
import numpy as np
import pandas as pd
from genno import KeySeq
from scipy.stats import linregress

from message_ix_models import Context

@@ -123,7 +124,7 @@ def merge(*dfs: pd.DataFrame) -> pd.DataFrame:
return result


def adjust_cost_ratios_with_gdp(region_diff_df, config: Config):
def adjust_cost_ratios_with_gdp_legacy(region_diff_df, config: Config):
"""Calculate adjusted region-differentiated cost ratios.

This function takes in a data frame with region-differentiated cost ratios and
@@ -247,3 +248,149 @@ def _constrain_cost_ratio(df: pd.DataFrame, base_year):
]
]
)


def adjust_cost_ratios_with_gdp(region_diff_df, config: Config):
"""Calculate adjusted region-differentiated cost ratios.

This function takes in a data frame with region-differentiated cost ratios and
calculates adjusted region-differentiated cost ratios using GDP per capita data.

Parameters
----------
region_diff_df : pandas.DataFrame
Output of :func:`apply_regional_differentiation`.
config : .Config
The function responds to, or passes on to other functions, the fields:
:attr:`~.Config.base_year`,
:attr:`~.Config.node`,
:attr:`~.Config.ref_region`,
:attr:`~.Config.scenario`, and
:attr:`~.Config.scenario_version`.

Returns
-------
pandas.DataFrame
DataFrame with columns:
- scenario_version: scenario version
- scenario: SSP scenario
- message_technology: message technology
- region: R11, R12, or R20 region
- year
- gdp_ratio_reg_to_reference: ratio of GDP per capita in respective region to
GDP per capita in reference region.
- reg_cost_ratio_adj: adjusted region-differentiated cost ratio

    Differences from the legacy function:
    - Uses vectorized DataFrame operations to compute slope and intercept from
      base-year data, reducing reliance on iterative group processing.
    - Merges base-year GDP values directly to compute and constrain the adjusted
      cost ratios.
    - Eliminates the need for an explicit group-wise constraint function by
      applying clipping conditions directly on the merged data.
"""
    # Import helper functions shared with the legacy implementation
from .projections import _maybe_query_scenario, _maybe_query_scenario_version

# Set region context for GDP extraction
context = Context.get_instance(-1)
context.model.regions = config.node

    # Retrieve and prepare GDP data: filter to years >= y0, drop totals, relabel
    # scenario_version values, convert dtypes, and apply any scenario filters
df_gdp = (
process_raw_ssp_data(context, config)
.query("year >= @config.y0")
.drop(columns=["total_gdp", "total_population"])
.assign(
scenario_version=lambda x: np.where(
x.scenario_version.str.contains("2013"),
"Previous (2013)",
"Review (2023)",
)
)
.astype({"year": int})
.pipe(_maybe_query_scenario, config)
.pipe(_maybe_query_scenario_version, config)
)

# Ensure base_year exists in GDP data; otherwise choose the earliest year and warn.
base_year = config.base_year
if base_year not in df_gdp.year.unique():
new_base_year = df_gdp.year.min()
log.warning(f"Use year={new_base_year} GDP data as proxy for {base_year}")
base_year = new_base_year

# --- Step 1: Calculate slope and intercept using the base-year data ---
# 1a. Subset GDP data to the base year and drop the year column.
df_base = df_gdp.query("year == @base_year").drop("year", axis=1)

# 1b. Merge the base-year GDP data with region_diff_df to get the base "reg_cost_ratio".
df_intermediate = df_base.merge(region_diff_df, on=["region"])

# 1c. Calculate the slope and intercept from the base-year values.
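    # The adjustment is a straight line through (1, 1), i.e. the reference region,
    # where both ratios equal 1, and through the base-year point
    # (gdp_ratio_reg_to_reference, reg_cost_ratio); hence intercept = 1 - slope.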
df_intermediate["slope"] = (df_intermediate["reg_cost_ratio"] - 1) / (
df_intermediate["gdp_ratio_reg_to_reference"] - 1
)
df_intermediate["intercept"] = 1 - df_intermediate["slope"]
# Drop the GDP ratio from the base data as it will be re-added from the full data.
df_intermediate = df_intermediate.drop(columns=["gdp_ratio_reg_to_reference"])

# --- Step 2: Merge full GDP data and compute adjusted cost ratios ---
# Merge the intermediate (base-year derived) data with the full set of GDP data;
# this adds yearly "gdp_ratio_reg_to_reference" values to each record.
df_merged = df_intermediate.merge(
df_gdp, on=["scenario_version", "scenario", "region"], how="right"
)
# Compute the adjusted cost ratio for all rows.
df_merged["reg_cost_ratio_adj"] = (
df_merged["slope"] * df_merged["gdp_ratio_reg_to_reference"]
+ df_merged["intercept"]
)
# Fill any NaNs (e.g. for the reference region) with 1.0.
df_merged["reg_cost_ratio_adj"] = df_merged["reg_cost_ratio_adj"].fillna(1.0)

# --- Step 3: Vectorize the constrain logic that was in _constrain_cost_ratio ---
# Instead of iterating per group, we extract the base-year values for each group.
base_values = (
df_merged.query("year == @base_year")
.loc[
:,
[
"scenario_version",
"scenario",
"region",
"message_technology",
"gdp_ratio_reg_to_reference",
"reg_cost_ratio_adj",
],
]
.rename(
columns={
"gdp_ratio_reg_to_reference": "base_gdp_ratio",
"reg_cost_ratio_adj": "base_reg_cost",
}
)
)
# Merge these base-year values back onto the main data.
df_merged = df_merged.merge(
base_values,
on=["scenario_version", "scenario", "region", "message_technology"],
how="left",
)
    # For groups where the base-year GDP ratio is less than 1 and the base-year cost
    # ratio is greater than 1, clip reg_cost_ratio_adj to the base-year cost ratio.
condition = (df_merged["base_gdp_ratio"] < 1) & (df_merged["base_reg_cost"] > 1)
df_merged.loc[condition, "reg_cost_ratio_adj"] = df_merged.loc[
condition, "reg_cost_ratio_adj"
].clip(upper=df_merged.loc[condition, "base_reg_cost"])

# --- Step 4: Select and return the final desired columns ---
return df_merged[
[
"scenario_version",
"scenario",
"message_technology",
"region",
"year",
"gdp_ratio_reg_to_reference",
"reg_cost_ratio_adj",
]
]
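
For reference, a minimal usage sketch of the refactored function, mirroring the parity test above; the Config arguments are illustrative, not the only supported values:

from message_ix_models.tools.costs import Config
from message_ix_models.tools.costs.gdp import adjust_cost_ratios_with_gdp
from message_ix_models.tools.costs.regional_differentiation import (
    apply_regional_differentiation,
)

# Build a configuration and derive region-differentiated cost ratios from it.
config = Config(module="energy", node="R12", scenario="SSP2")
region_diff = apply_regional_differentiation(config)

# Adjust the ratios with GDP-per-capita data (vectorized implementation).
adjusted = adjust_cost_ratios_with_gdp(region_diff, config)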