60 commits
29249f5
first bit of work
peterdudfield Nov 14, 2025
cc50553
add to readme
peterdudfield Nov 16, 2025
9803a66
add to todo list
peterdudfield Nov 16, 2025
4779ec0
add todo
peterdudfield Nov 16, 2025
b159482
stream forecast more efficiently
peterdudfield Nov 17, 2025
cbb10ea
tidy up observations
peterdudfield Nov 17, 2025
46c7792
tidy up time window
peterdudfield Nov 17, 2025
fb83cc4
dp 0.13.1
peterdudfield Nov 17, 2025
5ee30e2
Use Watts not %
peterdudfield Nov 17, 2025
ce8497b
load 30 days of data
peterdudfield Nov 17, 2025
834581c
add metrics table
peterdudfield Nov 17, 2025
65bf1e2
add data caching
peterdudfield Nov 17, 2025
963833d
move back to 7 days
peterdudfield Nov 17, 2025
72ca134
add caching
peterdudfield Nov 18, 2025
bda0da3
move data to new file
peterdudfield Nov 18, 2025
75ab761
add colours to main plot
peterdudfield Nov 18, 2025
dc1b91f
update import
peterdudfield Nov 18, 2025
a111be7
scale by units and add colours
peterdudfield Nov 18, 2025
5a04423
add probabilistic
peterdudfield Nov 18, 2025
d538064
add forecast type options, add daily MAE options
peterdudfield Nov 18, 2025
7791489
add daily ME
peterdudfield Nov 18, 2025
095d495
add two todos
peterdudfield Nov 19, 2025
d10135f
solve for different forecast versions
peterdudfield Nov 19, 2025
2a57b66
remove from todo
peterdudfield Nov 19, 2025
85dd00a
add legendgroup
peterdudfield Nov 20, 2025
6d0b280
filter on pvlive_day_after
peterdudfield Nov 20, 2025
fa6af55
add todo bug not releasing cache
peterdudfield Nov 20, 2025
d3bdaf6
refactor into multiple files
peterdudfield Nov 20, 2025
2d6ad59
increase forecast window to 30 days
peterdudfield Nov 21, 2025
ccf2c87
add init files
peterdudfield Nov 21, 2025
144ccf4
fix import
peterdudfield Nov 21, 2025
10bbf6e
add more todos
peterdudfield Nov 24, 2025
a0faf6b
add TODOs
peterdudfield Nov 24, 2025
1886cb5
use MW by default on UK-National
peterdudfield Nov 24, 2025
5cf060c
add gsp id to name
peterdudfield Nov 24, 2025
c427a77
reduce to 7 days
peterdudfield Nov 24, 2025
b0dd9ae
fix for MAE plot
peterdudfield Nov 24, 2025
e5b137a
have option to show sem
peterdudfield Nov 24, 2025
c83446e
forecast vs actual
peterdudfield Nov 24, 2025
1c0a0b3
remove duplicate in daily MAE plot
peterdudfield Nov 24, 2025
c47f8b1
minus 1 sec, so we don't get observations on the next day
peterdudfield Nov 24, 2025
6ceaad3
tidy
peterdudfield Nov 24, 2025
91f60aa
option for aligning t0s
peterdudfield Nov 24, 2025
4b30bdb
MAE plot link to 0
peterdudfield Nov 24, 2025
9603b9c
try to sort cache issue out
peterdudfield Nov 25, 2025
b861b7d
add select t0s from forecast
peterdudfield Nov 25, 2025
839cd21
tidy
peterdudfield Nov 25, 2025
86c6f8c
add todo
peterdudfield Nov 26, 2025
44b08ba
cache more functions
peterdudfield Nov 26, 2025
ab92c25
ruff
peterdudfield Nov 26, 2025
0f4058e
Feedback, add details
peterdudfield Nov 26, 2025
3af7531
robustness against no forecast data
peterdudfield Nov 27, 2025
4a0ff33
release cache data every 5 mins
peterdudfield Nov 28, 2025
6978935
PR comments
peterdudfield Dec 1, 2025
2b2da5b
add option for strict forecast filtering
peterdudfield Dec 1, 2025
b1bee0b
tidy
peterdudfield Dec 1, 2025
50e0ae2
PR comments, use agg better
peterdudfield Dec 1, 2025
5232456
use p10_fraction, rather than p10
peterdudfield Dec 1, 2025
612ebbf
add _fraction to column from other_statistics_fractions column
peterdudfield Dec 1, 2025
0726b10
lint
peterdudfield Dec 1, 2025
2 changes: 2 additions & 0 deletions README.md
@@ -83,6 +83,8 @@ To run the app locally, you'll need to connect it to the `forecast development database`

OCF team members can connect to the `forecast development database` using [these Notion instructions](https://www.notion.so/openclimatefix/Connecting-to-AWS-RDS-bf35b3fbd61f40df9c974c240e042354). Add `DB_URL= (db_url from notion documents)` to a `secrets.toml` file. Follow the instructions in the Notion document to connect to the database.

+To connect to the Data Platform, set `DATA_PLATFORM_HOST` and `DATA_PLATFORM_PORT`.

Run app:

```shell
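A minimal `secrets.toml` sketch combining the settings above; every value is a placeholder, and whether the `DATA_PLATFORM_*` settings are read from `secrets.toml` or the environment depends on how you run the app:

```toml
# .streamlit/secrets.toml (placeholder values, for illustration only)
DB_URL = "postgresql://user:password@localhost:5432/forecast_dev"
DATA_PLATFORM_HOST = "localhost"
DATA_PLATFORM_PORT = "50051"
```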
7 changes: 6 additions & 1 deletion pyproject.toml
@@ -18,7 +18,7 @@ dependencies = [
"plotly==5.24.1",
"psycopg2-binary==2.9.10",
"SQLAlchemy==2.0.36",
"streamlit==1.46.1",
"streamlit==1.51.0",
"testcontainers==4.9.0",
"uvicorn==0.34.0",
"geopandas==1.0.1",
@@ -35,6 +35,8 @@ dependencies = [
"torch @ https://download.pytorch.org/whl/cpu/torch-2.3.1%2Bcpu-cp312-cp312-linux_x86_64.whl ; platform_system == 'Linux' and platform_machine == 'x86_64'",
"torch @ https://download.pytorch.org/whl/cpu/torch-2.3.1-cp312-none-macosx_11_0_arm64.whl ; platform_system == 'Darwin' and platform_machine == 'arm64'",
"matplotlib>=3.8,<4.0",
"dp-sdk",
"aiocache",
]

[project.optional-dependencies]
@@ -66,6 +68,9 @@ dev-dependencies = [
index-url = "https://download.pytorch.org/whl/cpu"
extra-index-url = ["https://pypi.org/simple"]

+[tool.uv.sources]
+dp-sdk = { url = "https://github.com/openclimatefix/data-platform/releases/download/v0.13.2/dp_sdk-0.13.2-py3-none-any.whl" }

[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
Empty file added src/dataplatform/__init__.py
Empty file.
Empty file added src/dataplatform/forecast/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions src/dataplatform/forecast/cache.py
@@ -0,0 +1,27 @@
"""Cache utilities for the forecast module."""

from collections.abc import Callable
from datetime import UTC, datetime, timedelta

from dp_sdk.ocf import dp

from dataplatform.forecast.constant import cache_seconds


def key_builder_remove_client(func: Callable, *args: object, **kwargs: object) -> str:
"""Custom key builder that ignores the client argument for caching purposes."""
key = f"{func.__name__}:"
for arg in args:
if not isinstance(arg, dp.DataPlatformDataServiceStub):
key += f"{arg}-"

for k, v in kwargs.items():
key += f"{k}={v}-"

    # round the current time down to the nearest 5 minutes; this forces a new cache key every 5 minutes
current_time = datetime.now(UTC).replace(second=0, microsecond=0)
current_time = current_time - timedelta(
minutes=current_time.minute % (int(cache_seconds / 60)),
)
key += f"time={current_time}-"

return key
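For context, a usage sketch mirroring how `data.py` below applies this key builder; `get_something` and its arguments are hypothetical names:

```python
# Sketch: key_builder_remove_client skips the DataPlatformDataServiceStub
# argument, so cache hits do not depend on the client object, and the
# 5-minute time bucket baked into the key rolls entries over naturally.
from aiocache import Cache, cached
from dp_sdk.ocf import dp

from dataplatform.forecast.cache import key_builder_remove_client
from dataplatform.forecast.constant import cache_seconds


@cached(ttl=cache_seconds, cache=Cache.MEMORY, key_builder=key_builder_remove_client)
async def get_something(client: dp.DataPlatformDataServiceStub, location_uuid: str) -> dict:
    """Hypothetical cached fetch; calls within one window share a cache entry."""
    ...
```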
24 changes: 24 additions & 0 deletions src/dataplatform/forecast/constant.py
@@ -0,0 +1,24 @@
"""Constants for the forecast module."""

colours = [
"#FFD480",
"#FF8F73",
"#4675C1",
"#65B0C9",
"#58B0A9",
"#FAA056",
"#306BFF",
"#FF4901",
"#B701FF",
"#17E58F",
]

metrics = {
"MAE": "MAE is absolute mean error, average(abs(forecast-actual))",
"ME": "ME is mean (bias) error, average((forecast-actual))",
}

cache_seconds = 300 # 5 minutes

# These observer names cover the specific case of UK National and GSP locations
observer_names = ["pvlive_in_day", "pvlive_day_after"]
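In pandas terms, and matching the `error` and `absolute_error` columns that `data.py` computes below, these definitions reduce to (with `df` a hypothetical merged dataframe):

```python
# Sketch: MAE and ME over forecast p50_watts vs observed value_watts.
error = df["p50_watts"] - df["value_watts"]
mae = error.abs().mean()  # mean absolute error
me = error.mean()  # mean (bias) error
```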
243 changes: 243 additions & 0 deletions src/dataplatform/forecast/data.py
@@ -0,0 +1,243 @@
"""Functions to get forecast and observation data from Data Platform."""

import time
from datetime import datetime, timedelta

import betterproto
import pandas as pd
from aiocache import Cache, cached
from dp_sdk.ocf import dp

from dataplatform.forecast.cache import key_builder_remove_client
from dataplatform.forecast.constant import cache_seconds, observer_names


async def get_forecast_data(
client: dp.DataPlatformDataServiceStub,
location: dp.ListLocationsResponseLocationSummary,
start_date: datetime,
end_date: datetime,
selected_forecasters: list[dp.Forecaster],
) -> pd.DataFrame:
"""Get forecast data for the given location and time window."""
all_data_df = []

for forecaster in selected_forecasters:
forecaster_data_df = await get_forecast_data_one_forecaster(
client,
location,
start_date,
end_date,
forecaster,
)
if forecaster_data_df is not None:
all_data_df.append(forecaster_data_df)

all_data_df = pd.concat(all_data_df, ignore_index=True)

all_data_df["effective_capacity_watts"] = all_data_df["effective_capacity_watts"].astype(float)

    # convert the p50 fraction to watts using the effective capacity
all_data_df["p50_watts"] = all_data_df["p50_fraction"] * all_data_df["effective_capacity_watts"]

for col in ["p10", "p25", "p75", "p90"]:
Review comment (Member): Could you not put the p50 in here and avoid the duplicated lines above?

Review comment (Member): Actually, I see the column name is p50_fraction but the other quantiles don't have the fraction suffix. I'm confused why this should differ between the quantiles, since it looks like the others are also fractions: they are multiplied by the capacity here.

Reply (Contributor, author): Yes, this needs to be updated here and here to fix that. I'll make some GitHub issues for it.

(A sketch of the suggested refactor appears after this function.)
col_fraction = f"{col}_fraction"
if col_fraction in all_data_df.columns:
all_data_df[f"{col}_watts"] = (
all_data_df[col_fraction] * all_data_df["effective_capacity_watts"]
)

return all_data_df
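A minimal sketch of the refactor suggested in the review thread above, assuming every quantile column, p50 included, carried the `_fraction` suffix; this is an illustration, not code from this PR:

```python
# Hypothetical: one loop converts every quantile fraction to watts,
# removing the duplicated p50 lines, assuming p50_fraction exists.
for col in ["p10", "p25", "p50", "p75", "p90"]:
    col_fraction = f"{col}_fraction"
    if col_fraction in all_data_df.columns:
        all_data_df[f"{col}_watts"] = (
            all_data_df[col_fraction] * all_data_df["effective_capacity_watts"]
        )
```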


@cached(ttl=cache_seconds, cache=Cache.MEMORY, key_builder=key_builder_remove_client)
async def get_forecast_data_one_forecaster(
    client: dp.DataPlatformDataServiceStub,
location: dp.ListLocationsResponseLocationSummary,
start_date: datetime,
end_date: datetime,
selected_forecaster: dp.Forecaster,
) -> pd.DataFrame | None:
"""Get forecast data for one forecaster for the given location and time window."""
all_data_list_dict = []

    # Grab all the data in chunks of 30 days, to avoid overly large requests
temp_start_date = start_date
while temp_start_date <= end_date:
temp_end_date = min(temp_start_date + timedelta(days=30), end_date)

# fetch data
stream_forecast_data_request = dp.StreamForecastDataRequest(
location_uuid=location.location_uuid,
energy_source=dp.EnergySource.SOLAR,
time_window=dp.TimeWindow(
start_timestamp_utc=temp_start_date,
end_timestamp_utc=temp_end_date,
),
forecasters=[selected_forecaster],
)
forecasts = []
async for chunk in client.stream_forecast_data(stream_forecast_data_request):
forecasts.append(
chunk.to_dict(include_default_values=True, casing=betterproto.Casing.SNAKE),
)

if len(forecasts) > 0:
all_data_list_dict.extend(forecasts)

temp_start_date = temp_start_date + timedelta(days=30)

all_data_df = pd.DataFrame.from_dict(all_data_list_dict)
if len(all_data_df) == 0:
return None

    # expand the p-levels into columns and rename them with a '_fraction' suffix
columns_before_expand = set(all_data_df.columns)
all_data_df = all_data_df.pipe(
lambda df: df.join(pd.json_normalize(df["other_statistics_fractions"])),
).drop("other_statistics_fractions", axis=1)
new_columns = set(all_data_df.columns) - columns_before_expand
if len(new_columns) > 0:
all_data_df = all_data_df.rename(columns={col: f"{col}_fraction" for col in new_columns})

    # create forecaster_name: forecaster_fullname with the version suffix removed
all_data_df["forecaster_name"] = all_data_df["forecaster_fullname"].apply(
lambda x: x.rsplit(":", 1)[0], # split from right, max 1 split
)

return all_data_df
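To make the expand-and-rename step above concrete, a tiny self-contained illustration with made-up values:

```python
# Illustration (hypothetical values): dicts in other_statistics_fractions
# become p10_fraction / p90_fraction columns after the join and rename.
import pandas as pd

df = pd.DataFrame({"other_statistics_fractions": [{"p10": 0.12, "p90": 0.61}]})
df = df.join(pd.json_normalize(df["other_statistics_fractions"])).drop(
    "other_statistics_fractions", axis=1
)
df = df.rename(columns={c: f"{c}_fraction" for c in ("p10", "p90")})
# df.columns is now: ['p10_fraction', 'p90_fraction']
```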


@cached(ttl=cache_seconds, cache=Cache.MEMORY, key_builder=key_builder_remove_client)
async def get_all_observations(
client: dp.DataPlatformDataServiceStub,
location: dp.ListLocationsResponseLocationSummary,
start_date: datetime,
end_date: datetime,
) -> pd.DataFrame:
"""Get all observations for the given location and time window."""
all_observations_df = []

for observer_name in observer_names:
# Get all the observations for this observer_name, in chunks of 7 days
observation_one_df = []
temp_start_date = start_date
while temp_start_date <= end_date:
temp_end_date = min(temp_start_date + timedelta(days=7), end_date)

get_observations_request = dp.GetObservationsAsTimeseriesRequest(
observer_name=observer_name,
location_uuid=location.location_uuid,
energy_source=dp.EnergySource.SOLAR,
time_window=dp.TimeWindow(temp_start_date, temp_end_date),
)
get_observations_response = await client.get_observations_as_timeseries(
get_observations_request,
)

observations = []
for chunk in get_observations_response.values:
observations.append(
chunk.to_dict(include_default_values=True, casing=betterproto.Casing.SNAKE),
)

observation_one_df.append(pd.DataFrame.from_dict(observations))

temp_start_date = temp_start_date + timedelta(days=7)

observation_one_df = pd.concat(observation_one_df, ignore_index=True)
observation_one_df = observation_one_df.sort_values(by="timestamp_utc")
observation_one_df["observer_name"] = observer_name

all_observations_df.append(observation_one_df)

all_observations_df = pd.concat(all_observations_df, ignore_index=True)

all_observations_df["effective_capacity_watts"] = all_observations_df[
"effective_capacity_watts"
].astype(float)

all_observations_df["value_watts"] = (
all_observations_df["value_fraction"] * all_observations_df["effective_capacity_watts"]
)
all_observations_df["timestamp_utc"] = pd.to_datetime(all_observations_df["timestamp_utc"])

return all_observations_df


async def get_all_data(
client: dp.DataPlatformDataServiceStub,
selected_location: dp.ListLocationsResponseLocationSummary,
start_date: datetime,
end_date: datetime,
selected_forecasters: list[dp.Forecaster],
) -> dict:
"""Get all forecast and observation data, and merge them."""
    # get observation (generation) data
time_start = time.time()
all_observations_df = await get_all_observations(
client,
selected_location,
start_date,
end_date,
)
observation_seconds = time.time() - time_start

    # get all the forecast data
time_start = time.time()
all_forecast_data_df = await get_forecast_data(
client,
selected_location,
start_date,
end_date,
selected_forecasters,
)
forecast_seconds = time.time() - time_start

    # If the observation data includes both pvlive_day_after and pvlive_in_day,
    # keep only pvlive_day_after
one_observations_df = all_observations_df.copy()
if "pvlive_day_after" in all_observations_df["observer_name"].values:
one_observations_df = all_observations_df[
all_observations_df["observer_name"] == "pvlive_day_after"
]

# make target_timestamp_utc
all_forecast_data_df["init_timestamp"] = pd.to_datetime(all_forecast_data_df["init_timestamp"])
all_forecast_data_df["target_timestamp_utc"] = all_forecast_data_df[
"init_timestamp"
] + pd.to_timedelta(all_forecast_data_df["horizon_mins"], unit="m")

    # merge the forecast data with the observations on target timestamp,
    # so error metrics can be computed per horizon_mins and forecaster_fullname
merged_df = pd.merge(
all_forecast_data_df,
one_observations_df,
left_on=["target_timestamp_utc"],
right_on=["timestamp_utc"],
how="inner",
suffixes=("_forecast", "_observation"),
)

# error and absolute error
merged_df["error"] = merged_df["p50_watts"] - merged_df["value_watts"]
merged_df["absolute_error"] = merged_df["error"].abs()

return {
"merged_df": merged_df,
"all_forecast_data_df": all_forecast_data_df,
"all_observations_df": all_observations_df,
"forecast_seconds": forecast_seconds,
"observation_seconds": observation_seconds,
}


def align_t0(merged_df: pd.DataFrame) -> pd.DataFrame:
"""Align t0 forecasts for different forecasters."""
# number of unique forecasters
num_forecasters = merged_df["forecaster_name"].nunique()
# Count number of forecasters that have each t0 time
counts = merged_df.groupby("init_timestamp")["forecaster_name"].nunique()
# Filter to just those t0s that all forecasters have
common_t0s = counts[counts == num_forecasters].index
return merged_df[merged_df["init_timestamp"].isin(common_t0s)]
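As a worked example of how these pieces combine, the daily MAE per forecaster mentioned in the commits above can be computed from `merged_df`; a sketch, not the app's actual plotting code:

```python
# Sketch: daily MAE per forecaster, using only columns created in
# get_all_data (target_timestamp_utc, forecaster_name, absolute_error).
def daily_mae(merged_df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: mean absolute error per forecaster per day."""
    df = merged_df.copy()
    df["date"] = df["target_timestamp_utc"].dt.date
    return (
        df.groupby(["forecaster_name", "date"])["absolute_error"]
        .mean()
        .reset_index(name="daily_mae_watts")
    )
```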