✨ Transform 1-minute data into any OHLC timeframe (#10)
* ✨ Transform OHLC data

* 📖 Add more examples

* 🎨 Modify warning log

* 🎨 Set default log level to info

* 🎨 Improve error handling

* 🎨 Validate timeframe correctly

* 📦 Add larger test dataset

* ✅ 100% test coverage for transform module

* 🎨

* 🔖 0.3.0

* 🎨 Modify logs

* 🔧 Ignore ruff line-too-long rule

* 🎨

* 🎨 Modify error / warning note

* ✨ Use chunk-based method when it's faster than rolling aggregation

* 🧪 Experimental script for rolling vs chunk runtime comparison
ff137 authored Feb 25, 2025
1 parent 7b64e95 commit 0473507
Showing 11 changed files with 1,971 additions and 11 deletions.
25 changes: 23 additions & 2 deletions examples/basic_usage.py
@@ -2,6 +2,27 @@
 
 import ohlc_toolkit
 
-df = ohlc_toolkit.read_ohlc_csv("data/btcusd_bitstamp_1min_latest.csv", timeframe="1m")
+df_1min = ohlc_toolkit.read_ohlc_csv(
+    "data/btcusd_bitstamp_1min_latest.csv", timeframe="1m"
+)
 
-print(df)
+print("Input 1-minute OHLC data:\n")
+print(df_1min)
+
+
+df_5min = ohlc_toolkit.transform_ohlc(df_1min, timeframe=5)  # int timeframe supported
+
+print("Transformed 5-minute OHLC data (updated every minute):\n")
+print(df_5min)
+
+
+df_1h = ohlc_toolkit.transform_ohlc(df_1min, timeframe="1h", step_size_minutes=10)
+
+print("Transformed 1-hour OHLC data (updated every 10 minutes):\n")
+print(df_1h)
+
+
+df_arb = ohlc_toolkit.transform_ohlc(df_1min, timeframe="1d2h3m", step_size_minutes=137)
+
+print("Arbitrary timeframe (1 day 2 hours 3 minutes) with step size 137 minutes:\n")
+print(df_arb)
73 changes: 73 additions & 0 deletions examples/experiment/chunk_vs_rolling_aggregation.py
@@ -0,0 +1,73 @@
"""This script compares the time taken to compute OHLC data using the rolling window vs chunk-aggregation methods.
Results show that chunk-based aggregation may be faster when the number of chunks is less than 18000.
"""

import random
import timeit

import pandas as pd

from ohlc_toolkit import read_ohlc_csv
from ohlc_toolkit.config.log_config import get_logger
from ohlc_toolkit.transform import rolling_ohlc

logger = get_logger(__name__)
# Load the sample dataset
df_1min = read_ohlc_csv("data/btcusd_bitstamp_1min_latest.csv", timeframe="1m")

# Define the parameters for the test
timeframe = "1w"
timeframe_minutes = 10080
step_sizes = [3, 4, 5, 6, 10, 15]


def chunk_based_aggregation(df, timeframe_minutes, step_size_minutes):
    """Perform chunk-based aggregation."""
    logger.debug(
        "Chunk-based aggregation: {}, {}. {} chunks".format(
            timeframe_minutes, step_size_minutes, len(df) // step_size_minutes
        )
    )
    aggregated_data = []
    for start in range(0, len(df), step_size_minutes):
        end = start + timeframe_minutes
        if end > len(df):
            break

        window_df = df.iloc[start:end]
        aggregated_row = {
            "timestamp": window_df.index[-1],
            "open": window_df["open"].iloc[0],
            "high": window_df["high"].max(),
            "low": window_df["low"].min(),
            "close": window_df["close"].iloc[-1],
            "volume": window_df["volume"].sum(),
        }
        aggregated_data.append(aggregated_row)
    return pd.DataFrame(aggregated_data)


def test_rolling_aggregation(step_size):
    """Test the rolling aggregation method."""
    df_agg = rolling_ohlc(df_1min, timeframe_minutes)
    df_agg = df_agg.iloc[::step_size]


def test_chunk_aggregation(step_size):
    """Test the chunk-based aggregation method."""
    chunk_based_aggregation(df_1min, timeframe_minutes, step_size)


# Run the performance tests
# The rolling method is timed once: its cost is dominated by the window pass,
# and the step size only changes the cheap final slicing.
rolling_time = (
    timeit.timeit(lambda: test_rolling_aggregation(random.choice(step_sizes)), number=5)
    / 5
)
for step_size in step_sizes:
    chunk_time = timeit.timeit(lambda: test_chunk_aggregation(step_size), number=3) / 3

    print(f"Step size: {step_size} minutes")
    print(f"Rolling aggregation time: {rolling_time:.4f} seconds")
    print(f"Chunk-based aggregation time: {chunk_time:.4f} seconds")
    print("-" * 40)
5 changes: 3 additions & 2 deletions pyproject.toml
@@ -1,7 +1,7 @@
 [tool.poetry]
 name = "ohlc-toolkit"
-version = "0.2.0"
-description = "A flexible toolkit for working with OHLC data and generating custom time frames from minute data."
+version = "0.3.0"
+description = "A flexible toolkit for working with OHLC data and generating custom timeframes from minute data."
 authors = ["Mourits de Beer <[email protected]>"]
 license = "MIT"
 readme = "README.md"
@@ -51,6 +51,7 @@ lint.ignore = [
     "D408",
     "D409",
     "D413",
+    "E501",
 ]
 include = ["src/*.py"]
 line-length = 88
2 changes: 2 additions & 0 deletions src/ohlc_toolkit/__init__.py
@@ -1,7 +1,9 @@
 """OHLC Toolkit."""
 
 from ohlc_toolkit.csv_reader import read_ohlc_csv
+from ohlc_toolkit.transform import transform_ohlc
 
 __all__ = [
     "read_ohlc_csv",
+    "transform_ohlc",
 ]
2 changes: 1 addition & 1 deletion src/ohlc_toolkit/config/log_config.py
@@ -8,7 +8,7 @@
 from loguru._logger import Core as _Core
 from loguru._logger import Logger as _Logger
 
-STDOUT_LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
+STDOUT_LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
 FILE_LOG_LEVEL = os.getenv("FILE_LOG_LEVEL", "DEBUG").upper()
 ENABLE_FILE_LOGGING = os.getenv("ENABLE_FILE_LOGGING", "").upper() == "TRUE"
 DISABLE_COLORIZE_LOGS = os.getenv("DISABLE_COLORIZE_LOGS", "").upper() == "TRUE"
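With the stdout default now INFO, debug output is opt-in via the LOG_LEVEL variable read above. A minimal sketch of re-enabling it (the variable must be set before the package is first imported, since STDOUT_LOG_LEVEL is read at import time):

import os

os.environ["LOG_LEVEL"] = "DEBUG"  # must be set before the first ohlc_toolkit import

from ohlc_toolkit.config.log_config import get_logger

logger = get_logger(__name__)
logger.debug("Debug logging re-enabled for this run")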
6 changes: 3 additions & 3 deletions src/ohlc_toolkit/timeframes.py
@@ -102,12 +102,12 @@ def validate_timeframe(time_step: int, user_timeframe: int, logger: Logger):
     """Ensure that the timeframe is valid given the time step."""
     if user_timeframe < time_step:
         raise ValueError(
-            f"Provided timeframe ({user_timeframe}s) should not be smaller "
+            f"Requested timeframe ({user_timeframe}s) should not be smaller "
             f"than time step ({time_step}s)."
         )
 
     if user_timeframe % time_step != 0:
         logger.warning(
-            f"Provided timeframe ({user_timeframe}s) is not a multiple "
-            f"of the time step ({time_step}s). Data may be incomplete."
+            f"Note: Requested timeframe ({user_timeframe}s) is not a multiple "
+            f"of the time step ({time_step}s); values may not be suitable."
         )
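For illustration, a small sketch of the two branches above with a 60-second time step: the first call raises, the second only warns.

from ohlc_toolkit.config.log_config import get_logger
from ohlc_toolkit.timeframes import validate_timeframe

logger = get_logger(__name__)

try:
    # 30s is smaller than the 60s time step, so this raises ValueError
    validate_timeframe(time_step=60, user_timeframe=30, logger=logger)
except ValueError as exc:
    print(exc)

# 90s is not a multiple of 60s, so this only logs the warning and returns
validate_timeframe(time_step=60, user_timeframe=90, logger=logger)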
230 changes: 230 additions & 0 deletions src/ohlc_toolkit/transform.py
@@ -0,0 +1,230 @@
"""Transform OHLC data."""

from logging import Logger
from typing import Union

import pandas as pd

from ohlc_toolkit.config.log_config import get_logger
from ohlc_toolkit.timeframes import parse_timeframe, validate_timeframe
from ohlc_toolkit.utils import check_data_integrity

LOGGER = get_logger(__name__)


def _first(row: pd.Series) -> float:
    """Get the first value of a row, for rolling_ohlc aggregation."""
    return row.iloc[0]


def _last(row: pd.Series) -> float:
    """Get the last value of a row, for rolling_ohlc aggregation."""
    return row.iloc[-1]


def rolling_ohlc(df_input: pd.DataFrame, timeframe_minutes: int) -> pd.DataFrame:
    """Rolling OHLC aggregation.

    Args:
        df_input (pd.DataFrame): The input DataFrame with OHLC data.
        timeframe_minutes (int): The timeframe in minutes for the rolling window.

    Returns:
        pd.DataFrame: The aggregated OHLC data, with the same schema as the input
            DataFrame.
    """
    LOGGER.info(
        "Computing OHLC for a rolling window of {} minutes over {} rows. "
        "The ratio of rows to timeframe is {:.2f}.",
        timeframe_minutes,
        len(df_input),
        len(df_input) / timeframe_minutes,
    )
    return df_input.rolling(timeframe_minutes).agg(
        {
            "timestamp": _last,
            "open": _first,
            "high": "max",
            "low": "min",
            "close": _last,
            "volume": "sum",
        }
    )


def _cast_to_original_dtypes(
    original_df: pd.DataFrame, transformed_df: pd.DataFrame
) -> pd.DataFrame:
    """Cast the transformed DataFrame to the original DataFrame's data types.

    Args:
        original_df (pd.DataFrame): The original DataFrame with the desired data types.
        transformed_df (pd.DataFrame): The transformed DataFrame to be cast.

    Returns:
        pd.DataFrame: The transformed DataFrame with data types matching the original.
    """
    LOGGER.debug("Casting transformed DataFrame to original dtypes")
    for column in transformed_df.columns:
        if column in original_df.columns:
            transformed_df[column] = transformed_df[column].astype(
                original_df[column].dtype
            )
    return transformed_df


def _drop_expected_nans(df: pd.DataFrame, logger: Logger) -> pd.DataFrame:
    """Drop the expected NaNs from the DataFrame.

    We expect the first `timeframe_minutes - 1` rows to be NaNs from the aggregation.
    However, we don't want to drop all NaNs in case there are unexpected ones.
    Therefore, we drop the expected NaNs and proceed with data integrity checks.

    Args:
        df (pd.DataFrame): The DataFrame to drop NaNs from.
        logger (Logger): The logger to use.

    Returns:
        pd.DataFrame: The DataFrame with expected NaNs dropped.
    """
    logger.debug("Dropping expected NaN values from the aggregated DataFrame")
    n = df.first_valid_index()  # Get the index of the first valid row
    if n is None:
        logger.error("No valid rows after aggregation.")
        raise ValueError("No valid rows after aggregation.")

    n_pos = df.index.get_loc(n)

    result = pd.concat([df.iloc[:n_pos].dropna(), df.iloc[n_pos:]])
    return result


def transform_ohlc(
    df_input: pd.DataFrame, timeframe: Union[int, str], step_size_minutes: int = 1
) -> pd.DataFrame:
    """Transform OHLC data to a different timeframe resolution.

    Args:
        df_input (pd.DataFrame): Input DataFrame with OHLC data.
        timeframe (Union[int, str]): Desired timeframe resolution, which can be
            an integer (in minutes) or a string (e.g., '1h', '4h30m').
        step_size_minutes (int): Step size in minutes for the rolling window.

    Returns:
        pd.DataFrame: Transformed OHLC data.
    """
    df = df_input.copy()
    bound_logger = LOGGER.bind(
        body={"timeframe": timeframe, "step_size": step_size_minutes}
    )
    bound_logger.debug("Starting transformation of OHLC data")

    # Convert string timeframe to minutes if necessary
    if isinstance(timeframe, str):
        timeframe_seconds = parse_timeframe(timeframe)
        bound_logger.debug("Parsed timeframe string to seconds: {}", timeframe_seconds)
        if timeframe_seconds % 60 != 0:
            bound_logger.error("Second-level timeframes are not yet supported.")
            raise NotImplementedError("Second-level timeframes are not yet supported.")
        timeframe_minutes = timeframe_seconds // 60
    elif isinstance(timeframe, int):
        timeframe_minutes = timeframe
    else:
        bound_logger.error("Invalid timeframe provided: {}", timeframe)
        raise ValueError(f"Invalid timeframe: {timeframe}")

    time_step_seconds = step_size_minutes * 60
    validate_timeframe(
        time_step=time_step_seconds,
        user_timeframe=timeframe_minutes * 60,
        logger=bound_logger,
    )

    bound_logger.debug(
        "Using timeframe of {} minutes for rolling aggregation", timeframe_minutes
    )

    # The following cut-off was determined to be where chunk-based aggregation is
    # faster than rolling aggregation.
    # See examples/experiment/chunk_vs_rolling_aggregation.py
    chunk_cut_off = 18000
    num_rows = len(df)
    num_chunks = num_rows // step_size_minutes
    if step_size_minutes == 1 or num_chunks > chunk_cut_off:
        # Use rolling aggregation for small step sizes or large datasets
        bound_logger.debug(
            "Using rolling aggregation for step size: {}. "
            "The number of rows would yield {} chunks",
            step_size_minutes,
            num_chunks,
        )
        df_agg = rolling_ohlc(df, timeframe_minutes)
        df_agg = df_agg.iloc[::step_size_minutes]
    else:
        # Use chunk-based aggregation when the step is large relative to the row count
        bound_logger.info(
            "Using chunk-based aggregation for step size: {}. "
            "The {} rows yield {} chunks",
            step_size_minutes,
            num_rows,
            num_chunks,
        )
        aggregated_data = []
        for start in range(0, num_rows, step_size_minutes):
            end = start + timeframe_minutes
            if end > num_rows:
                if not aggregated_data:
                    bound_logger.error(
                        "Selected timeframe is too large. {} rows are not enough for "
                        "this timeframe: {} ({} minutes).",  # TODO: Assuming 1-minute.
                        num_rows,
                        timeframe,
                        timeframe_minutes,
                    )
                    raise ValueError(
                        "Timeframe too large. Please ensure your dataset is big enough "
                        f"for this timeframe: {timeframe} ({timeframe_minutes} minutes)."
                    )
                break

            window_df = df.iloc[start:end]
            aggregated_row = {
                "timestamp": window_df["timestamp"].iloc[-1],
                "open": window_df["open"].iloc[0],
                "high": window_df["high"].max(),
                "low": window_df["low"].min(),
                "close": window_df["close"].iloc[-1],
                "volume": window_df["volume"].sum(),
            }
            aggregated_data.append(aggregated_row)

        df_agg = pd.DataFrame(aggregated_data)
        df_agg = df_agg.sort_values("timestamp")

    # Drop the expected NaNs
    try:
        df_agg = _drop_expected_nans(df_agg, bound_logger)
    except ValueError as e:
        raise ValueError(
            f"{str(e)} Please ensure your dataset is big enough "
            f"for this timeframe: {timeframe} ({timeframe_minutes} minutes)."
        ) from e

    # Cast the transformed DataFrame to the original DataFrame's data types
    df_agg = _cast_to_original_dtypes(df_input, df_agg)

    # Ensure the returned DataFrame has a datetime index
    if not pd.api.types.is_datetime64_any_dtype(df_agg.index):
        bound_logger.debug(
            "DataFrame index is not a datetime index, sorting by timestamp"
        )
        df_agg = df_agg.sort_values("timestamp")  # Ensure timestamp is sorted

        # Convert the timestamp column to a datetime index
        df_agg.index = pd.to_datetime(df_agg["timestamp"], unit="s")
        df_agg.index.name = "datetime"
        bound_logger.debug("Converted timestamp column to datetime index")

    check_data_integrity(
        df_agg, logger=bound_logger, time_step_seconds=time_step_seconds
    )

    return df_agg
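To see the new module end to end, a minimal self-contained sketch on synthetic data (the frame shape, a Unix-seconds timestamp column plus OHLCV columns with a datetime index, is an assumption based on how the chunk path indexes the input, not something this commit pins down):

import numpy as np
import pandas as pd

from ohlc_toolkit import transform_ohlc

# Synthetic 1-minute bars: a 600-row random walk.
rng = np.random.default_rng(0)
ts = 1_700_000_000 + np.arange(600) * 60
close = 100 + rng.normal(0, 0.1, 600).cumsum()
df_1min = pd.DataFrame(
    {
        "timestamp": ts,
        "open": close,
        "high": close + 0.05,
        "low": close - 0.05,
        "close": close,
        "volume": np.ones(600),
    },
    index=pd.to_datetime(ts, unit="s"),
)

# 15-minute bars recomputed every 5 minutes; 600 rows / step 5 = 120 chunks,
# well under the 18000 cut-off, so the chunk-based path is used.
df_15m = transform_ohlc(df_1min, timeframe="15m", step_size_minutes=5)
print(df_15m.head())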
2 changes: 1 addition & 1 deletion tests/test_csv_reader.py
@@ -90,7 +90,7 @@ def test_read_ohlc_csv_bad_timeframe_value(self):
             read_ohlc_csv(csv_file, timeframe="30s")
         self.assertEqual(
             str(context.exception),
-            "Provided timeframe (30s) should not be smaller than time step (60s).",
+            "Requested timeframe (30s) should not be smaller than time step (60s).",
         )
 
     def test_read_ohlc_csv_invalid_timeframe_format(self):