Skip to content

Commit

Permalink
✨ Enhance default dataframe schema (#9)
Browse files: browse the repository at this point in the history
* 🎨 Rename logger

* 🎨 Modify exception / warning message

* 🎨 Rename arg

* 🎨 Rename arg and require keywords

* 🎨 Deduplicate

* 🎨 Configure default dtypes

* ✨ Add datetime index

* 🎨

* 🔖 Release 0.2.0
  • Loading branch information
ff137 authored Feb 25, 2025
1 parent f91f22e commit 7b64e95
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 33 deletions.
4 changes: 4 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
"addopts",
"bitstamp",
"btcusd",
"dropna",
"dtypes",
"iloc",
"ipyparallel",
"levelname",
"loguru",
"nans",
"numpy",
"ohlc",
"patchers",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "ohlc-toolkit"
version = "0.1.1"
version = "0.2.0"
description = "A flexible toolkit for working with OHLC data and generating custom time frames from minute data."
authors = ["Mourits de Beer <[email protected]>"]
license = "MIT"
Expand Down
43 changes: 29 additions & 14 deletions src/ohlc_toolkit/csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,70 +13,79 @@
)
from ohlc_toolkit.utils import check_data_integrity, infer_time_step

logger = get_logger(__name__)
LOGGER = get_logger(__name__)


def read_ohlc_csv(
filepath: str,
timeframe: Optional[str] = None,
expected_columns: Optional[list[str]] = None,
*,
header_row: Optional[int] = None,
columns: Optional[list[str]] = None,
dtype: Optional[dict[str, str]] = None,
) -> pd.DataFrame:
"""Read OHLC data from a CSV file.
Arguments:
filepath (str): Path to the CSV file.
timeframe (Optional[str]): User-defined timeframe (e.g., '1m', '5m', '1h').
expected_columns (Optional[list[str]]): The expected columns in the CSV file.
header_row (Optional[int]): The row number to use as the header.
columns (Optional[list[str]]): The expected columns in the CSV file.
dtype (Optional[dict[str, str]]): The data type for the columns.
Returns:
pd.DataFrame: Processed OHLC dataset.
"""
bound_logger = logger.bind(body=filepath)
bound_logger = LOGGER.bind(body=filepath)
bound_logger.info("Reading OHLC data")

if expected_columns is None:
expected_columns = EXPECTED_COLUMNS
if columns is None:
columns = EXPECTED_COLUMNS
if dtype is None:
dtype = {
"timestamp": "int32",
"open": "float32",
"high": "float32",
"low": "float32",
"close": "float32",
"volume": "float32",
}

read_csv_params = {
"filepath_or_buffer": filepath,
"names": expected_columns,
"names": columns,
"dtype": dtype,
}

def _read_csv(header: Optional[int] = None) -> pd.DataFrame:
return pd.read_csv(**read_csv_params, header=header)

# If header_row is provided, use it directly
if header_row is not None:
df = pd.read_csv(**read_csv_params, header=header_row)
df = _read_csv(header=header_row)
else:
# User doesn't specify header - let's try reading without header first
try:
df = pd.read_csv(**read_csv_params, header=None)
df = _read_csv(header=None)
except FileNotFoundError as e:
raise FileNotFoundError(f"File not found: {filepath}") from e
except ValueError:
# If that fails, try with header
try:
df = pd.read_csv(**read_csv_params, header=0)
df = _read_csv(header=0)
except ValueError as e:
raise ValueError(
f"Data for file {filepath} does not match expected schema. "
f"Please validate the file data aligns with the expected "
f"columns ({expected_columns}) and data types ({dtype})"
f"columns ({columns}) and data types ({dtype})"
) from e

bound_logger.debug(
f"Read {df.shape[0]} rows and {df.shape[1]} columns: {df.columns.tolist()}"
)

# Infer time step from data
time_step = infer_time_step(df, logger=bound_logger)
time_step_seconds = infer_time_step(df, logger=bound_logger)

# Convert user-defined timeframe to seconds
timeframe_seconds = None
Expand All @@ -86,10 +95,16 @@ def read_ohlc_csv(

timeframe_seconds = parse_timeframe(timeframe)

validate_timeframe(time_step, timeframe_seconds, bound_logger)
validate_timeframe(time_step_seconds, timeframe_seconds, bound_logger)

df = df.sort_values("timestamp") # Ensure timestamp is sorted

# Perform integrity checks
check_data_integrity(df, logger=bound_logger, time_step=time_step)
check_data_integrity(df, logger=bound_logger, time_step_seconds=time_step_seconds)

# Convert the timestamp column to a datetime index
df.index = pd.to_datetime(df["timestamp"], unit="s")
df.index.name = "datetime"

bound_logger.info("OHLC data successfully loaded.")
return df
6 changes: 3 additions & 3 deletions src/ohlc_toolkit/timeframes.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ def validate_timeframe(time_step: int, user_timeframe: int, logger: Logger):
"""Ensure that the timeframe is valid given the time step."""
if user_timeframe < time_step:
raise ValueError(
f"Provided timeframe ({user_timeframe}s) cannot be smaller "
f"than inferred time step ({time_step}s)."
f"Provided timeframe ({user_timeframe}s) should not be smaller "
f"than time step ({time_step}s)."
)

if user_timeframe % time_step != 0:
logger.warning(
f"Provided timeframe ({user_timeframe}s) is not a multiple "
f"of the inferred time step ({time_step}s). Data may be incomplete."
f"of the time step ({time_step}s). Data may be incomplete."
)
12 changes: 8 additions & 4 deletions src/ohlc_toolkit/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ def infer_time_step(df: pd.DataFrame, logger: Logger) -> int:
raise ValueError("Cannot infer time step from a single-row dataset.")

time_step = int(pd.Series(time_diffs).mode()[0]) # Most frequent difference
logger.info(f"Inferred time step: {time_step} seconds")
logger.info("Inferred time step: {} seconds", time_step)
return time_step


def check_data_integrity(
df: pd.DataFrame, logger: Logger, time_step: Optional[int] = None
df: pd.DataFrame, logger: Logger, time_step_seconds: Optional[int] = None
):
"""Perform basic data integrity checks on the OHLC dataset."""
if df.isnull().values.any():
Expand All @@ -37,9 +37,13 @@ def check_data_integrity(
if df["timestamp"].duplicated().any():
logger.warning("Duplicate timestamps found in the dataset.")

if time_step:
if time_step_seconds:
expected_timestamps = set(
range(df["timestamp"].min(), df["timestamp"].max() + time_step, time_step)
range(
df["timestamp"].min(),
df["timestamp"].max() + time_step_seconds,
time_step_seconds,
)
)
actual_timestamps = set(df["timestamp"])
missing_timestamps = expected_timestamps - actual_timestamps
Expand Down
32 changes: 22 additions & 10 deletions tests/test_csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ class TestCsvReader(unittest.TestCase):

def setUp(self):
"""Set up test data."""
self.csv_data = """1736208060,102228,102228,102228,102228,0.00114705
1736208120,102214,102225,102214,102215,0.42548035
1736208180,102214,102220,102214,102220,0.24356262
1736208240,102214,102214,102163,102163,0.03850259
self.csv_data = """1736208060,102228.0,102228.0,102228.0,102228.0,0.00114705
1736208120,102214.0,102225.0,102214.0,102215.0,0.42548035
1736208180,102214.0,102220.0,102214.0,102220.0,0.24356262
1736208240,102214.0,102214.0,102163.0,102163.0,0.03850259
"""
self.csv_data_no_header_path = "tests/test_data/test_csv_no_header.csv"
self.csv_data_w_header_path = "tests/test_data/test_csv_w_header.csv"
Expand All @@ -26,13 +26,26 @@ def setUp(self):
"timestamp": pd.Series(
[1736208060, 1736208120, 1736208180, 1736208240], dtype="int32"
),
"open": [102228, 102214, 102214, 102214],
"high": [102228, 102225, 102220, 102214],
"low": [102228, 102214, 102214, 102163],
"close": [102228, 102215, 102220, 102163],
"open": [102228.0, 102214.0, 102214.0, 102214.0],
"high": [102228.0, 102225.0, 102220.0, 102214.0],
"low": [102228.0, 102214.0, 102214.0, 102163.0],
"close": [102228.0, 102215.0, 102220.0, 102163.0],
"volume": [0.00114705, 0.42548035, 0.24356262, 0.03850259],
}
)
# Cast to the expected, default dtypes:
self.df_expected = self.df_expected.astype(
{
"open": "float32",
"high": "float32",
"low": "float32",
"close": "float32",
"volume": "float32",
}
)
# Set expected index:
self.df_expected.index = pd.to_datetime(self.df_expected["timestamp"], unit="s")
self.df_expected.index.name = "datetime"

def test_read_ohlc_csv_valid_no_header(self):
"""Test reading a valid OHLC CSV file with no header."""
Expand Down Expand Up @@ -77,8 +90,7 @@ def test_read_ohlc_csv_bad_timeframe_value(self):
read_ohlc_csv(csv_file, timeframe="30s")
self.assertEqual(
str(context.exception),
"Provided timeframe (30s) cannot be smaller "
"than inferred time step (60s).",
"Provided timeframe (30s) should not be smaller than time step (60s).",
)

def test_read_ohlc_csv_invalid_timeframe_format(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_timeframes.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def test_validate_timeframe(self):

logger.warning.assert_called_with(
"Provided timeframe (25s) is not a multiple "
"of the inferred time step (10s). Data may be incomplete."
"of the time step (10s). Data may be incomplete."
)


Expand Down

0 comments on commit 7b64e95

Please sign in to comment.