diff --git a/CHANGELOG.md b/CHANGELOG.md
index bd661777..02ca03d5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,22 @@
 # Changelog
 
+## 0.48.0 - 2026-01-27
+
+### Enhancements
+- Added initial support for splitting DBN files:
+  - Added new `SplitEncoder` that supports synchronous and asynchronous encoding
+  - Added new `Splitter` trait that allows for extensible file splitting while
+    reusing the `SplitEncoder` boilerplate
+  - Added `SchemaSplitter`, `SymbolSplitter`, and `TimeSplitter`, which allow for
+    different methods of splitting DBN files
+  - Added split support to the CLI. For example:
+    `dbn mbo.dbn --split-by week --output-pattern '{date}.json' --json`
+- Added new publisher for Blue Ocean ATS (`OCEA_MEMOIR_OCEA`)
+
+### Bug fixes
+- Fixed issue where `AsyncDynReader` instances created from `with_buffer()` would only
+  decode the first frame of multi-frame Zstandard files
+
 ## 0.47.0 - 2026-01-20
 
 ### Enhancements
diff --git a/Cargo.lock b/Cargo.lock
index 63faaccd..7e0a8b65 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -143,9 +143,9 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.2.53"
+version = "1.2.54"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "755d2fce177175ffca841e9a06afdb2c4ab0f593d53b4dee48147dfaade85932"
+checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583"
 dependencies = [
  "find-msvc-tools",
  "jobserver",
@@ -246,7 +246,7 @@ dependencies = [
 
 [[package]]
 name = "databento-dbn"
-version = "0.47.0"
+version = "0.48.0"
 dependencies = [
  "dbn",
  "pyo3",
@@ -258,7 +258,7 @@ dependencies = [
 
 [[package]]
 name = "dbn"
-version = "0.47.0"
+version = "0.48.0"
 dependencies = [
  "async-compression",
  "csv",
@@ -281,7 +281,7 @@ dependencies = [
 
 [[package]]
 name = "dbn-c"
-version = "0.47.0"
+version = "0.48.0"
 dependencies = [
  "anyhow",
  "cbindgen",
@@ -291,7 +291,7 @@ dependencies = [
 
 [[package]]
 name = "dbn-cli"
-version = "0.47.0"
+version = "0.48.0"
 dependencies = [
  "anyhow",
  "assert_cmd",
@@ -301,12 +301,13 @@ dependencies = [
  "rstest",
  "serde",
  "tempfile",
+ "time",
  "zstd",
 ]
 
 [[package]]
 name = "dbn-macros"
-version = "0.47.0"
+version = "0.48.0"
 dependencies = [
  "csv",
  "dbn",
@@ -550,9 +551,9 @@ checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
 
 [[package]]
 name = "num-conv"
-version = "0.1.0"
+version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
+checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"
 
 [[package]]
 name = "num-traits"
@@ -674,9 +675,9 @@ dependencies = [
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.105"
+version = "1.0.106"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7"
+checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934"
 dependencies = [
  "unicode-ident",
 ]
@@ -744,9 +745,9 @@ dependencies = [
 
 [[package]]
 name = "quote"
-version = "1.0.43"
+version = "1.0.44"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a"
+checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4"
 dependencies = [
  "proc-macro2",
 ]
@@ -1046,9 +1047,9 @@ dependencies = [
 
 [[package]]
 name = "time"
-version = "0.3.45"
+version = "0.3.46"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f9e442fc33d7fdb45aa9bfeb312c095964abdf596f7567261062b2a7107aaabd"
"f9e442fc33d7fdb45aa9bfeb312c095964abdf596f7567261062b2a7107aaabd" +checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5" dependencies = [ "deranged", "itoa", @@ -1061,15 +1062,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b36ee98fd31ec7426d599183e8fe26932a8dc1fb76ddb6214d05493377d34ca" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.25" +version = "0.2.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e552d1249bf61ac2a52db88179fd0673def1e1ad8243a00d9ec9ed71fee3dd" +checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4" dependencies = [ "num-conv", "time-core", @@ -1335,9 +1336,9 @@ checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" [[package]] name = "zmij" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfcd145825aace48cff44a8844de64bf75feec3080e0aa5cdbde72961ae51a65" +checksum = "02aae0f83f69aafc94776e879363e9771d7ecbffe2c7fbb6c14c5e00dfe88439" [[package]] name = "zstd" diff --git a/Cargo.toml b/Cargo.toml index ecaa222a..eddb67f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ resolver = "2" [workspace.package] authors = ["Databento "] edition = "2021" -version = "0.47.0" +version = "0.48.0" documentation = "https://databento.com/docs" repository = "https://github.com/databento/dbn" license = "Apache-2.0" diff --git a/python/pyproject.toml b/python/pyproject.toml index 17164c91..7af47965 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "databento-dbn" -version = "0.47.0" +version = "0.48.0" description = "Python bindings for encoding and decoding Databento Binary Encoding (DBN)" readme = "README.md" requires-python = ">=3.10" diff --git a/rust/dbn-cli/Cargo.toml b/rust/dbn-cli/Cargo.toml index 52ff23c4..3c1d9391 100644 --- a/rust/dbn-cli/Cargo.toml +++ b/rust/dbn-cli/Cargo.toml @@ -16,15 +16,16 @@ name = "dbn" path = "src/main.rs" [dependencies] -dbn = { path = "../dbn", version = "=0.47.0", default-features = false } +dbn = { path = "../dbn", version = "=0.48.0", default-features = false } -anyhow = { workspace = true } +anyhow.workspace = true clap = { version = "4.5", features = ["derive", "wrap_help"] } serde = { workspace = true, features = ["derive"] } -zstd = { workspace = true } +time.workspace = true +zstd.workspace = true [dev-dependencies] assert_cmd = "2.1" predicates = "3.1" -rstest = { workspace = true } tempfile = "3.24" +rstest.workspace = true diff --git a/rust/dbn-cli/README.md b/rust/dbn-cli/README.md index fb9e87c9..1bac304d 100644 --- a/rust/dbn-cli/README.md +++ b/rust/dbn-cli/README.md @@ -62,6 +62,18 @@ dbn equs-mini-20250331.dbn equs-mini-20250401.dbn equs-mini-20250402.dbn -o equs ``` The only limitation is they must be from the same dataset. +### Splitting DBN files +You can also split one DBN file into several by passing `--split-by`/`-S` with a split method and +`--output-pattern`/`-O` with a pattern for the output file names. 
+```sh
+# By schema, such as from a live data capture
+dbn opra-pillar-20260114.dbn.zst --split-by schema --output-pattern 'opra-pillar-20260114.{schema}.dbn.zst'
+# By time
+dbn opra-pillar-202512.ohlcv-1s.dbn.zst --split-by day --output-pattern 'opra-pillar-{date}.ohlcv-1s.dbn.zst'
+# By symbol, with the short argument forms
+dbn equs-mini-20260114.dbn.zst -S symbol -O 'equs-mini-20260114-{symbol}.dbn.zst'
+```
+
 ### Compressing the output
 
 In addition to reading Zstandard-compressed files, `dbn` can also write compressed JSON and CSV.
diff --git a/rust/dbn-cli/src/encode.rs b/rust/dbn-cli/src/encode.rs
index d936c146..c72b0193 100644
--- a/rust/dbn-cli/src/encode.rs
+++ b/rust/dbn-cli/src/encode.rs
@@ -1,15 +1,17 @@
-use std::io;
+use std::{io, path::Path};
 
 use dbn::{
     decode::{DbnMetadata, DecodeRecordRef},
     encode::{
-        json, DbnEncodable, DbnRecordEncoder, DynEncoder, DynWriter, EncodeDbn, EncodeRecordRef,
-        EncodeRecordTextExt,
+        json, DbnEncodable, DbnRecordEncoder, DynEncoder, DynWriter, EncodeDbn, EncodeRecord,
+        EncodeRecordRef, EncodeRecordTextExt, NoSchemaBehavior, SchemaSplitter, SplitEncoder,
+        Splitter, SymbolSplitter, TimeSplitter,
     },
-    rtype_dispatch, Compression, Encoding, MetadataBuilder, SType, SymbolIndex,
+    rtype_dispatch, Compression, Encoding, Metadata, MetadataBuilder, SType, Schema, SymbolIndex,
+    TsSymbolMap,
 };
 
-use crate::{infer_encoding, output_from_args, Args};
+use crate::{infer_encoding, output_from_args, Args, InferredEncoding, SplitBy};
 
 pub fn silence_broken_pipe(err: anyhow::Error) -> anyhow::Result<()> {
     // Handle broken pipe as a non-error.
@@ -27,7 +29,12 @@ where
     D: DecodeRecordRef + DbnMetadata,
 {
     let writer = output_from_args(args)?;
-    let (encoding, compression, delimiter) = infer_encoding(args)?;
+    let InferredEncoding {
+        encoding,
+        is_fragment,
+        delimiter,
+        compression,
+    } = infer_encoding(args)?;
     if args.should_output_metadata {
         if encoding != Encoding::Json {
             return Err(anyhow::format_err!(
@@ -41,7 +48,7 @@ where
             args.should_pretty_print,
         )
         .encode_metadata(decoder.metadata())?;
-    } else if args.fragment {
+    } else if is_fragment {
         encode_fragment(decoder, writer, compression)?;
     } else {
         let mut encoder = DynEncoder::builder(writer, encoding, compression, decoder.metadata())
@@ -67,13 +74,217 @@ where
     Ok(())
 }
 
+pub fn split_encode_from_dbn<D>(
+    args: &Args,
+    split_by: SplitBy,
+    output_pattern: &str,
+    decoder: D,
+) -> anyhow::Result<()>
+where
+    D: DecodeRecordRef + DbnMetadata,
+{
+    let InferredEncoding {
+        encoding,
+        compression,
+        delimiter,
+        is_fragment: is_output_fragment,
+    } = infer_encoding(args)?;
+    let open_output = |path: &str| {
+        crate::output(Some(Path::new(path)), args.force)
+            .map_err(|e| dbn::Error::io(io::Error::other(e), format!("opening output file {path}")))
+    };
+    if is_output_fragment {
+        let build_encoder = |path: &str, _metadata: Option<Metadata>| -> dbn::Result<_> {
+            Ok(DbnRecordEncoder::new(DynWriter::new(
+                open_output(path)?,
+                compression,
+            )?))
+        };
+        split_by_encode_fragment(decoder, split_by, output_pattern, build_encoder)
+    } else {
+        let build_encoder = |path: &str, metadata: Option<Metadata>| -> dbn::Result<_> {
+            DynEncoder::builder(
+                open_output(path)?,
+                encoding,
+                compression,
+                &metadata.unwrap(),
+            )
+            .delimiter(delimiter)
+            .write_header(args.write_header)
+            .all_pretty(args.should_pretty_print)
+            .with_symbol(args.map_symbols)
+            .build()
+        };
+        split_by_encode(
+            decoder,
+            split_by,
+            output_pattern,
+            build_encoder,
+            args.map_symbols,
+        )
+    }
+}
+
+fn split_by_encode<D, E, F>(
+    decoder: D,
+    split_by: SplitBy,
+    output_pattern: &str,
+    build_encoder: F,
+    map_symbols: bool,
+) -> anyhow::Result<()>
+where
+    D: DecodeRecordRef + DbnMetadata,
+    E: EncodeRecordTextExt,
+    F: Fn(&str, Option<Metadata>) -> dbn::Result<E>,
+{
+    let symbol_map = decoder.metadata().symbol_map()?;
+    match split_by {
+        SplitBy::Symbol => {
+            // TODO: detect live data and split on live symbol mapping msgs
+            let splitter = SymbolSplitter::new(
+                |symbol: &str, metadata| {
+                    build_encoder(&output_pattern.replace("{symbol}", symbol), metadata)
+                },
+                symbol_map.clone(),
+            );
+            split_encode_impl(decoder, map_symbols, splitter, Some(symbol_map))
+        }
+        SplitBy::Schema => {
+            let splitter = SchemaSplitter::new(
+                |schema: Schema, metadata| {
+                    build_encoder(
+                        &output_pattern.replace("{schema}", schema.as_str()),
+                        metadata,
+                    )
+                },
+                // TODO: support other behaviors
+                NoSchemaBehavior::default(),
+            );
+            split_encode_impl(decoder, map_symbols, splitter, Some(symbol_map))
+        }
+        SplitBy::Day | SplitBy::Week | SplitBy::Month => {
+            let splitter = TimeSplitter::new(
+                |date: time::Date, metadata| {
+                    build_encoder(
+                        &output_pattern.replace("{date}", &date.to_string()),
+                        metadata,
+                    )
+                },
+                split_by.duration().unwrap(),
+            );
+            split_encode_impl(decoder, map_symbols, splitter, Some(symbol_map))
+        }
+    }
+}
+
+fn split_by_encode_fragment<D, E, F>(
+    decoder: D,
+    split_by: SplitBy,
+    output_pattern: &str,
+    build_encoder: F,
+) -> anyhow::Result<()>
+where
+    D: DecodeRecordRef + DbnMetadata,
+    E: EncodeRecord + EncodeRecordRef,
+    F: Fn(&str, Option<Metadata>) -> dbn::Result<E>,
+{
+    match split_by {
+        SplitBy::Symbol => {
+            let symbol_map = decoder.metadata().symbol_map()?;
+            let splitter = SymbolSplitter::new(
+                |symbol: &str, metadata| {
+                    build_encoder(&output_pattern.replace("{symbol}", symbol), metadata)
+                },
+                symbol_map,
+            );
+            split_encode_fragment_impl(decoder, splitter)
+        }
+        SplitBy::Schema => {
+            let splitter = SchemaSplitter::new(
+                |schema: Schema, metadata| {
+                    build_encoder(
+                        &output_pattern.replace("{schema}", schema.as_str()),
+                        metadata,
+                    )
+                },
+                // TODO: support other behaviors
+                NoSchemaBehavior::default(),
+            );
+            split_encode_fragment_impl(decoder, splitter)
+        }
+        SplitBy::Day | SplitBy::Week | SplitBy::Month => {
+            let splitter = TimeSplitter::new(
+                |date: time::Date, metadata| {
+                    build_encoder(
+                        &output_pattern.replace("{date}", &date.to_string()),
+                        metadata,
+                    )
+                },
+                split_by.duration().unwrap(),
+            );
+            split_encode_fragment_impl(decoder, splitter)
+        }
+    }
+}
+
+fn split_encode_impl<D, S, E>(
+    mut decoder: D,
+    map_symbols: bool,
+    splitter: S,
+    symbol_map: Option<TsSymbolMap>,
+) -> anyhow::Result<()>
+where
+    D: DecodeRecordRef + DbnMetadata,
+    S: Splitter<E>,
+    E: EncodeRecordTextExt,
+{
+    let mut encoder = SplitEncoder::with_metadata(splitter, decoder.metadata().clone());
+    if map_symbols {
+        let symbol_map = if let Some(symbol_map) = symbol_map {
+            symbol_map
+        } else {
+            decoder.metadata().symbol_map()?
+        };
+        let ts_out = decoder.metadata().ts_out;
+        while let Some(rec) = decoder.decode_record_ref()? {
+            let sym = symbol_map.get_for_rec(&rec).map(String::as_str);
+            // SAFETY: `ts_out` is accurate because it's sourced from the metadata
+            unsafe {
+                encoder.encode_ref_ts_out_with_sym(rec, ts_out, sym)?;
+            }
+        }
+    } else {
+        encoder.encode_decoded(decoder)?;
+    }
+    Ok(())
+}
+
+fn split_encode_fragment_impl<D, S, E>(mut decoder: D, splitter: S) -> anyhow::Result<()>
+where
+    D: DecodeRecordRef,
+    S: Splitter<E>,
+    E: EncodeRecord + EncodeRecordRef,
+{
+    let mut encoder = SplitEncoder::records_only(splitter);
+    while let Some(rec) = decoder.decode_record_ref()? {
+        encoder.encode_record_ref(rec)?;
+    }
+    encoder.flush()?;
+    Ok(())
+}
+
 pub fn encode_from_frag<D>(args: &Args, mut decoder: D) -> anyhow::Result<()>
 where
     D: DecodeRecordRef,
 {
     let writer = output_from_args(args)?;
-    let (encoding, compression, delimiter) = infer_encoding(args)?;
-    if args.fragment {
+    let InferredEncoding {
+        encoding,
+        compression,
+        delimiter,
+        is_fragment,
+    } = infer_encoding(args)?;
+    if is_fragment {
         encode_fragment(decoder, writer, compression)?;
         return Ok(());
     }
@@ -84,13 +295,7 @@ where
         encoding,
         compression,
         // dummy metadata won't be encoded
-        &MetadataBuilder::new()
-            .dataset(String::new())
-            .schema(None)
-            .start(0)
-            .stype_in(None)
-            .stype_out(SType::InstrumentId)
-            .build(),
+        &dummy_metadata(),
     )
     .delimiter(delimiter)
     // Can't write header until we know the record type
@@ -114,6 +319,16 @@ where
     Ok(())
 }
 
+fn dummy_metadata() -> Metadata {
+    MetadataBuilder::new()
+        .dataset(String::new())
+        .schema(None)
+        .start(0)
+        .stype_in(None)
+        .stype_out(SType::InstrumentId)
+        .build()
+}
+
 fn encode_fragment<D>(
     mut decoder: D,
     writer: Box<dyn io::Write>,
@@ -125,3 +340,92 @@ fn encode_fragment<D>(
     }
     Ok(())
 }
+
+/// Splits and encodes records from a fragment input (no metadata).
+///
+/// Only supports time-based and schema-based splitting. Symbol splitting requires
+/// a symbol map, which is not available in fragment inputs.
+pub fn split_encode_from_frag<D>(
+    args: &Args,
+    split_by: SplitBy,
+    output_pattern: &str,
+    decoder: D,
+) -> anyhow::Result<()>
+where
+    D: DecodeRecordRef,
+{
+    if matches!(split_by, SplitBy::Symbol) {
+        return Err(anyhow::anyhow!(
+            "Cannot split by symbol when input is a fragment: no symbol map available"
+        ));
+    }
+    let InferredEncoding {
+        encoding,
+        compression,
+        delimiter,
+        is_fragment,
+    } = infer_encoding(args)?;
+    let open_output = |path: &str| {
+        crate::output(Some(Path::new(path)), args.force)
+            .map_err(|e| dbn::Error::io(io::Error::other(e), format!("opening output file {path}")))
+    };
+    if is_fragment {
+        let build_encoder = |path: &str| -> dbn::Result<_> {
+            Ok(DbnRecordEncoder::new(DynWriter::new(
+                open_output(path)?,
+                compression,
+            )?))
+        };
+        match split_by {
+            SplitBy::Symbol => unreachable!("handled above"),
+            SplitBy::Schema => {
+                let splitter = SchemaSplitter::new(
+                    |schema: Schema, _metadata| {
+                        build_encoder(&output_pattern.replace("{schema}", schema.as_str()))
+                    },
+                    NoSchemaBehavior::default(),
+                );
+                split_encode_fragment_impl(decoder, splitter)
+            }
+            SplitBy::Day | SplitBy::Week | SplitBy::Month => {
+                let splitter = TimeSplitter::new(
+                    |date: time::Date, _metadata| {
+                        build_encoder(&output_pattern.replace("{date}", &date.to_string()))
+                    },
+                    split_by.duration().unwrap(),
+                );
+                split_encode_fragment_impl(decoder, splitter)
+            }
+        }
+    } else {
+        let metadata = dummy_metadata();
+        let build_encoder = |path: &str| -> dbn::Result<_> {
+            DynEncoder::builder(open_output(path)?, encoding, compression, &metadata)
+                .delimiter(delimiter)
+                .write_header(args.write_header)
+                .all_pretty(args.should_pretty_print)
+                .build()
+        };
+        match split_by {
+            SplitBy::Symbol => unreachable!("handled above"),
+            SplitBy::Schema => {
+                let splitter = SchemaSplitter::new(
+                    |schema: Schema, _metadata| {
+                        build_encoder(&output_pattern.replace("{schema}", schema.as_str()))
+                    },
+                    NoSchemaBehavior::default(),
+                );
+                split_encode_fragment_impl(decoder, splitter)
+            }
+            SplitBy::Day | SplitBy::Week | SplitBy::Month => {
+                let splitter = TimeSplitter::new(
+                    |date: time::Date, _metadata| {
+                        build_encoder(&output_pattern.replace("{date}", &date.to_string()))
+                    },
+                    split_by.duration().unwrap(),
+                );
+                split_encode_fragment_impl(decoder, splitter)
+            }
+        }
+    }
+}
diff --git a/rust/dbn-cli/src/lib.rs b/rust/dbn-cli/src/lib.rs
index c35b530f..35ee4525 100644
--- a/rust/dbn-cli/src/lib.rs
+++ b/rust/dbn-cli/src/lib.rs
@@ -2,13 +2,14 @@ use std::{
     fs::File,
     io::{self, BufWriter},
     num::NonZeroU64,
-    path::PathBuf,
+    path::{Path, PathBuf},
 };
 
 use anyhow::{anyhow, Context};
 use clap::{ArgAction, Parser, ValueEnum};
 use dbn::{
+    encode::SplitDuration,
     enums::{Compression, Encoding},
     Schema, VersionUpgradePolicy,
 };
@@ -28,6 +29,35 @@ pub enum OutputEncoding {
     DbnFragment,
 }
 
+/// How to split a DBN file.
+#[derive(Clone, Copy, Debug, ValueEnum)]
+pub enum SplitBy {
+    Symbol,
+    Schema,
+    Day,
+    Week,
+    Month,
+}
+
+impl SplitBy {
+    pub fn duration(self) -> Option<SplitDuration> {
+        match self {
+            SplitBy::Day => Some(SplitDuration::Day),
+            SplitBy::Week => Some(SplitDuration::Week),
+            SplitBy::Month => Some(SplitDuration::Month),
+            SplitBy::Symbol | SplitBy::Schema => None,
+        }
+    }
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub struct InferredEncoding {
+    pub encoding: Encoding,
+    pub compression: Compression,
+    pub delimiter: u8,
+    pub is_fragment: bool,
+}
+
 #[derive(Debug, Parser)]
 #[clap(name = "dbn", version, about)]
 #[cfg_attr(test, derive(Default))]
@@ -47,6 +77,23 @@ pub struct Args {
         value_name = "FILE"
     )]
     pub output: Option<PathBuf>,
+    #[clap(
+        short = 'O',
+        long,
+        help = "Saves the result of file splitting to paths according to PATTERN",
+        requires = "split_by",
+        conflicts_with = "output",
+        value_name = "PATTERN"
+    )]
+    pub output_pattern: Option<String>,
+    #[clap(
+        short = 'S',
+        long,
+        help = "How to optionally split the output across files",
+        requires = "output_pattern",
+        value_name = "SPLIT_BY"
+    )]
+    pub split_by: Option<SplitBy>,
     #[clap(
         short = 'J',
         long,
@@ -218,37 +265,120 @@ impl Args {
     }
 }
 
-/// Infer the [`Encoding`], [`Compression`], and delimiter (CSV/TSV) from `args` if they
-/// aren't already explicitly set.
-pub fn infer_encoding(args: &Args) -> anyhow::Result<(Encoding, Compression, u8)> {
+pub fn infer_encoding(args: &Args) -> anyhow::Result<InferredEncoding> {
     let compression = if args.zstd {
         Compression::Zstd
     } else {
         Compression::None
     };
     match args.output_encoding() {
-        OutputEncoding::DbnFragment | OutputEncoding::Dbn => Ok((Encoding::Dbn, compression, 0)),
-        OutputEncoding::Csv => Ok((Encoding::Csv, compression, b',')),
-        OutputEncoding::Tsv => Ok((Encoding::Csv, compression, b'\t')),
-        OutputEncoding::Json => Ok((Encoding::Json, compression, 0)),
+        OutputEncoding::DbnFragment => Ok(InferredEncoding {
+            encoding: Encoding::Dbn,
+            compression,
+            delimiter: 0,
+            is_fragment: true,
+        }),
+        OutputEncoding::Dbn => Ok(InferredEncoding {
+            encoding: Encoding::Dbn,
+            compression,
+            delimiter: 0,
+            is_fragment: false,
+        }),
+        OutputEncoding::Csv => Ok(InferredEncoding {
+            encoding: Encoding::Csv,
+            compression,
+            delimiter: b',',
+            is_fragment: false,
+        }),
+        OutputEncoding::Tsv => Ok(InferredEncoding {
+            encoding: Encoding::Csv,
+            compression,
+            delimiter: b'\t',
+            is_fragment: false,
+        }),
+        OutputEncoding::Json => Ok(InferredEncoding {
+            encoding: Encoding::Json,
+            compression,
+            delimiter: 0,
+            is_fragment: false,
+        }),
         OutputEncoding::Infer => {
-            if let Some(output) = args.output.as_ref().map(|o| o.to_string_lossy()) {
-                if output.ends_with(".dbn.zst") {
-                    Ok((Encoding::Dbn, Compression::Zstd, 0))
+            let output = args
+                .output
+                .as_ref()
+                .map(|p| p.to_string_lossy().into_owned())
+                .or_else(|| args.output_pattern.clone());
+            if let Some(output) = output {
+                if output.ends_with(".dbn.frag.zst") {
+                    Ok(InferredEncoding {
+                        encoding: Encoding::Dbn,
+                        compression: Compression::Zstd,
+                        delimiter: 0,
+                        is_fragment: true,
+                    })
+                } else if output.ends_with(".dbn.frag") {
+                    Ok(InferredEncoding {
+                        encoding: Encoding::Dbn,
+                        compression: Compression::None,
+                        delimiter: 0,
+                        is_fragment: true,
+                    })
+                } else if output.ends_with(".dbn.zst") {
+                    Ok(InferredEncoding {
+                        encoding: Encoding::Dbn,
+                        compression: Compression::Zstd,
+                        delimiter: 0,
+                        is_fragment: false,
+                    })
                 } else if output.ends_with(".dbn") {
-                    Ok((Encoding::Dbn, Compression::None, 0))
+                    Ok(InferredEncoding {
+                        encoding: Encoding::Dbn,
+                        compression: Compression::None,
+                        delimiter: 0,
+                        is_fragment: false,
+                    })
                 } else if output.ends_with(".csv.zst") {
-                    Ok((Encoding::Csv, Compression::Zstd, b','))
+                    Ok(InferredEncoding {
+                        encoding: Encoding::Csv,
+                        compression: Compression::Zstd,
+                        delimiter: b',',
+                        is_fragment: false,
+                    })
                 } else if output.ends_with(".csv") {
-                    Ok((Encoding::Csv, Compression::None, b','))
+                    Ok(InferredEncoding {
+                        encoding: Encoding::Csv,
+                        compression: Compression::None,
+                        delimiter: b',',
+                        is_fragment: false,
+                    })
                 } else if output.ends_with(".tsv.zst") || output.ends_with(".xls.zst") {
-                    Ok((Encoding::Csv, Compression::Zstd, b'\t'))
+                    Ok(InferredEncoding {
+                        encoding: Encoding::Csv,
+                        compression: Compression::Zstd,
+                        delimiter: b'\t',
+                        is_fragment: false,
+                    })
                 } else if output.ends_with(".tsv") || output.ends_with(".xls") {
-                    Ok((Encoding::Csv, Compression::None, b'\t'))
+                    Ok(InferredEncoding {
+                        encoding: Encoding::Csv,
+                        compression: Compression::None,
+                        delimiter: b'\t',
+                        is_fragment: false,
+                    })
                 } else if output.ends_with(".json.zst") {
-                    Ok((Encoding::Json, Compression::Zstd, 0))
+                    Ok(InferredEncoding {
+                        encoding: Encoding::Json,
+                        compression: Compression::Zstd,
+                        delimiter: 0,
+                        is_fragment: false,
+                    })
                 } else if output.ends_with(".json") {
-                    Ok((Encoding::Json, Compression::None, 0))
+                    Ok(InferredEncoding {
+                        encoding: Encoding::Json,
+                        compression: Compression::None,
+                        delimiter: 0,
+                        is_fragment: false,
+                    })
                 } else {
                     Err(anyhow!(
                         "Unable to infer output encoding from output path '{output}'",
@@ -265,15 +395,19 @@ pub fn infer_encoding(args: &Args) -> anyhow::Result<(Encoding, Compression, u8)
 
 /// Returns a writeable object where the `dbn` output will be directed.
 pub fn output_from_args(args: &Args) -> anyhow::Result<Box<dyn io::Write>> {
-    if let Some(output) = &args.output {
-        let output_file = open_output_file(output, args.force)?;
+    output(args.output.as_deref(), args.force)
+}
+
+pub fn output(output: Option<&Path>, force: bool) -> anyhow::Result<Box<dyn io::Write>> {
+    if let Some(output) = output {
+        let output_file = open_output_file(output, force)?;
         Ok(Box::new(BufWriter::new(output_file)))
     } else {
         Ok(Box::new(io::stdout().lock()))
     }
 }
 
-fn open_output_file(path: &PathBuf, force: bool) -> anyhow::Result<File> {
+fn open_output_file(path: &Path, force: bool) -> anyhow::Result<File> {
     let mut options = File::options();
     options.write(true).truncate(true);
     if force {
@@ -361,7 +495,15 @@ mod tests {
             zstd,
             ..Default::default()
         };
-        assert_eq!(infer_encoding(&args).unwrap(), (exp_enc, exp_comp, exp_sep));
+        assert_eq!(
+            infer_encoding(&args).unwrap(),
+            InferredEncoding {
+                encoding: exp_enc,
+                compression: exp_comp,
+                delimiter: exp_sep,
+                is_fragment: false,
+            }
+        );
     }
 
     #[rstest]
@@ -385,7 +527,15 @@ mod tests {
            output: Some(PathBuf::from(output)),
            ..Default::default()
        };
-        assert_eq!(infer_encoding(&args).unwrap(), (exp_enc, exp_comp, exp_sep));
+        assert_eq!(
+            infer_encoding(&args).unwrap(),
+            InferredEncoding {
+                encoding: exp_enc,
+                compression: exp_comp,
+                delimiter: exp_sep,
+                is_fragment: false,
+            }
+        );
     }
 
     #[test]
diff --git a/rust/dbn-cli/src/main.rs b/rust/dbn-cli/src/main.rs
index 0cdbeaa2..7028cce0 100644
--- a/rust/dbn-cli/src/main.rs
+++ b/rust/dbn-cli/src/main.rs
@@ -4,13 +4,16 @@ use std::{
     path::Path,
 };
 
-use anyhow::Context;
+use anyhow::{anyhow, Context};
 use clap::Parser;
 use dbn::decode::{
     DbnMetadata, DbnRecordDecoder, DecodeRecordRef, DynDecoder, MergeDecoder, MergeRecordDecoder,
 };
 use dbn_cli::{
-    encode::{encode_from_dbn, encode_from_frag, silence_broken_pipe},
+    encode::{
+        encode_from_dbn, encode_from_frag, silence_broken_pipe, split_encode_from_dbn,
+        split_encode_from_frag,
+    },
     filter::{LimitFilter, SchemaFilter},
     Args,
 };
@@ -90,7 +93,26 @@ fn with_inputs(args: Args) -> anyhow::Result<()> {
 }
 
 fn with_input(args: Args, reader: impl BufRead) -> anyhow::Result<()> {
-    if args.is_input_fragment {
+    if let Some(split_by) = args.split_by {
+        let Some(output_pattern) = &args.output_pattern else {
+            return Err(anyhow!(
+                "Must specify an output pattern when splitting files"
+            ));
+        };
+        if args.is_input_fragment {
+            split_encode_from_frag(&args, split_by, output_pattern, decode_frag(&args, reader)?)
+        } else if args.is_input_zstd_fragment {
+            split_encode_from_frag(
+                &args,
+                split_by,
+                output_pattern,
+                decode_frag(&args, zstd::stream::Decoder::with_buffer(reader)?)?,
+            )
+        } else {
+            let decoder = DynDecoder::inferred_with_buffer(reader, args.upgrade_policy())?;
+            split_encode_from_dbn(&args, split_by, output_pattern, wrap(&args, decoder))
+        }
+    } else if args.is_input_fragment {
         encode_from_frag(&args, decode_frag(&args, reader)?)
     } else if args.is_input_zstd_fragment {
         encode_from_frag(
@@ -111,6 +133,9 @@ fn with_input(args: Args, reader: impl BufRead) -> anyhow::Result<()> {
 
 fn main() -> anyhow::Result<()> {
     let args = Args::parse();
     if args.input.len() > 1 {
+        if args.split_by.is_some() {
+            return Err(anyhow!("Can't split while merging multiple input files"));
+        }
         with_inputs(args)
     } else if args.input[0].as_os_str() == STDIN_SENTINEL {
         with_input(args, io::stdin().lock())
diff --git a/rust/dbn-cli/tests/integration_tests.rs b/rust/dbn-cli/tests/integration_tests.rs
index e5e21937..62f94f14 100644
--- a/rust/dbn-cli/tests/integration_tests.rs
+++ b/rust/dbn-cli/tests/integration_tests.rs
@@ -792,6 +792,233 @@ fn test_merge_mbo_and_mbp10_metadata() {
         .stderr(is_empty());
 }
 
+#[rstest]
+fn split_by_day(output_dir: TempDir) {
+    let output_pattern = format!("{}/{{date}}.json", output_dir.path().to_str().unwrap());
+    cmd()
+        .args([
+            &format!("{TEST_DATA_PATH}/test_data.mbo.v3.dbn.zst"),
+            "--split-by",
+            "day",
+            "--output-pattern",
+            &output_pattern,
+            "--json",
+        ])
+        .assert()
+        .success()
+        .stdout(is_empty())
+        .stderr(is_empty());
+    let output_path = format!("{}/2020-12-28.json", output_dir.path().to_str().unwrap());
+    let contents = fs::read_to_string(&output_path).unwrap();
+    assert!(contents.contains("\"rtype\":160"));
+    assert_eq!(contents.lines().count(), 2);
+}
+
+#[rstest]
+fn split_by_week(output_dir: TempDir) {
+    let output_pattern = format!("{}/{{date}}.json", output_dir.path().to_str().unwrap());
+    cmd()
+        .args([
+            &format!("{TEST_DATA_PATH}/test_data.mbo.v3.dbn.zst"),
+            "--split-by",
+            "week",
+            "--output-pattern",
+            &output_pattern,
+            "--json",
+        ])
+        .assert()
+        .success()
+        .stdout(is_empty())
+        .stderr(is_empty());
+    // 2020-12-28 is a Monday, week starts on Sunday 2020-12-27
+    let output_path = format!("{}/2020-12-27.json", output_dir.path().to_str().unwrap());
+    let contents = fs::read_to_string(&output_path).unwrap();
+    assert!(contents.contains("\"rtype\":160"));
+}
+
+#[rstest]
+fn split_by_month(output_dir: TempDir) {
+    let output_pattern = format!("{}/{{date}}.json", output_dir.path().to_str().unwrap());
+    cmd()
+        .args([
+            &format!("{TEST_DATA_PATH}/test_data.mbo.v3.dbn.zst"),
+            "--split-by",
+            "month",
+            "--output-pattern",
+            &output_pattern,
+            "--json",
+        ])
+        .assert()
+        .success()
+        .stdout(is_empty())
+        .stderr(is_empty());
+    let output_path = format!("{}/2020-12-01.json", output_dir.path().to_str().unwrap());
+    let contents = fs::read_to_string(&output_path).unwrap();
+    assert!(contents.contains("\"rtype\":160"));
+}
+
+#[rstest]
+fn split_by_symbol(output_dir: TempDir) {
+    let output_pattern = format!("{}/{{symbol}}.json", output_dir.path().to_str().unwrap());
+    cmd()
+        .args([
+            &format!("{TEST_DATA_PATH}/test_data.mbo.v3.dbn.zst"),
+            "--split-by",
+            "symbol",
+            "--output-pattern",
+            &output_pattern,
+            "--json",
+        ])
+        .assert()
+        .success()
+        .stdout(is_empty())
+        .stderr(is_empty());
+    let output_path = format!("{}/ESH1.json", output_dir.path().to_str().unwrap());
+    let contents = fs::read_to_string(&output_path).unwrap();
+    assert!(contents.contains("\"rtype\":160"));
+}
+
+#[test]
+fn output_pattern_requires_split_by() {
+    cmd()
+        .args([
+            &format!("{TEST_DATA_PATH}/test_data.mbo.v3.dbn.zst"),
+            "--output-pattern",
+            "{date}.json",
+            "--json",
+        ])
+        .assert()
+        .failure()
+        .stderr(contains("split-by"));
+}
+
+#[test]
+fn split_by_requires_output_pattern() {
+    cmd()
+        .args([
+            &format!("{TEST_DATA_PATH}/test_data.mbo.v3.dbn.zst"),
+            "--split-by",
+            "day",
+            "--json",
+        ])
+        .assert()
+        .failure()
+        .stderr(contains("output-pattern"));
+}
+
+#[test]
+fn output_pattern_conflicts_with_output() {
+    cmd()
+        .args([
+            &format!("{TEST_DATA_PATH}/test_data.mbo.v3.dbn.zst"),
+            "--split-by",
+            "day",
+            "--output",
+            "out.json",
+            "--output-pattern",
+            "{date}.json",
+            "--json",
+        ])
+        .assert()
+        .failure()
+        .stderr(contains("cannot be used with"));
+}
+
+#[rstest]
+fn split_by_day_fragment(output_dir: TempDir) {
+    let output_pattern = format!("{}/{{date}}.dbn", output_dir.path().to_str().unwrap());
+    cmd()
+        .args([
+            &format!("{TEST_DATA_PATH}/test_data.mbo.v3.dbn.zst"),
+            "--split-by",
+            "day",
+            "--output-pattern",
+            &output_pattern,
+            "--fragment",
+        ])
+        .assert()
+        .success()
+        .stdout(is_empty())
+        .stderr(is_empty());
+    // Verify the fragment file exists and can be decoded
+    let output_path = format!("{}/2020-12-28.dbn", output_dir.path().to_str().unwrap());
+    let contents = std::fs::read(&output_path).unwrap();
+    assert_ne!(
+        &contents[..3],
+        b"DBN",
+        "Fragment should not have metadata header"
+    );
+    assert!(!contents.is_empty());
+}
+
+#[rstest]
+fn split_input_fragment_by_day_to_json(output_dir: TempDir) {
+    let output_pattern = format!("{}/{{date}}.json", output_dir.path().to_str().unwrap());
+    cmd()
+        .args([
+            &format!("{TEST_DATA_PATH}/test_data.definition.v3.dbn.frag"),
+            "--input-fragment",
+            "--split-by",
+            "day",
+            "--output-pattern",
+            &output_pattern,
+        ])
+        .assert()
+        .success()
+        .stdout(is_empty())
+        .stderr(is_empty());
+    let output_path = format!("{}/2020-12-27.json", output_dir.path().to_str().unwrap());
+    let contents = std::fs::read_to_string(&output_path).unwrap();
+    assert!(contents.contains("\"rtype\""), "Output should be JSON");
+    assert!(!contents.is_empty());
+}
+
+#[rstest]
+fn split_input_fragment_by_day_to_fragment(output_dir: TempDir) {
+    let output_pattern = format!("{}/{{date}}.dbn.frag", output_dir.path().to_str().unwrap());
+    cmd()
+        .args([
+            &format!("{TEST_DATA_PATH}/test_data.definition.v3.dbn.frag"),
+            "--input-fragment",
+            "--split-by",
+            "day",
+            "--output-pattern",
+            &output_pattern,
+        ])
+        .assert()
+        .success()
+        .stdout(is_empty())
+        .stderr(is_empty());
+    let output_path = format!(
+        "{}/2020-12-27.dbn.frag",
+        output_dir.path().to_str().unwrap()
+    );
+    // Verify file exists and is a fragment: no DBN header
+    let contents = std::fs::read(&output_path).unwrap();
+    assert_ne!(
+        &contents[..3],
+        b"DBN",
+        "Fragment should not have metadata header"
+    );
+    assert!(!contents.is_empty());
+}
+
+#[test]
+fn split_input_fragment_by_symbol_fails() {
+    cmd()
+        .args([
+            &format!("{TEST_DATA_PATH}/test_data.definition.v3.dbn.frag"),
+            "--input-fragment",
+            "--split-by",
+            "symbol",
+            "--output-pattern",
+            "{symbol}.dbn.frag",
+        ])
+        .assert()
+        .failure()
+        .stderr(contains("Cannot split by symbol when input is a fragment"));
+}
+
 #[test]
 fn help() {
     cmd()
diff --git a/rust/dbn/Cargo.toml b/rust/dbn/Cargo.toml
index 3b6823a8..4d4ec3f6 100644
--- a/rust/dbn/Cargo.toml
+++ b/rust/dbn/Cargo.toml
@@ -25,7 +25,7 @@ serde = ["dep:serde", "time/parsing", "time/serde"]
 trivial_copy = []
 
 [dependencies]
-dbn-macros = { version = "=0.47.0", path = "../dbn-macros" }
+dbn-macros = { version = "=0.48.0", path = "../dbn-macros" }
 
 async-compression = { version = "0.4.37", features = ["tokio", "zstd"], optional = true }
 csv = { workspace = true }
diff --git a/rust/dbn/src/decode/dyn_reader.rs b/rust/dbn/src/decode/dyn_reader.rs
index 10b6724b..1b85ee81 100644
--- a/rust/dbn/src/decode/dyn_reader.rs
+++ b/rust/dbn/src/decode/dyn_reader.rs
@@ -265,7 +265,7 @@ mod r#async {
         pub fn with_buffer(reader: R, compression: Compression) -> Self {
             match compression {
                 Compression::None => Self(DynReaderImpl::Uncompressed(reader)),
-                Compression::Zstd => Self(DynReaderImpl::Zstd(ZstdDecoder::new(reader))),
+                Compression::Zstd => Self(DynReaderImpl::Zstd(zstd_decoder(reader))),
             }
         }
 
diff --git a/rust/dbn/src/encode.rs b/rust/dbn/src/encode.rs
index f50048e9..6d696359 100644
--- a/rust/dbn/src/encode.rs
+++ b/rust/dbn/src/encode.rs
@@ -5,6 +5,7 @@ pub mod dbn;
 mod dyn_encoder;
 mod dyn_writer;
 pub mod json;
+mod split;
 
 use std::{fmt, io, num::NonZeroU64};
 
@@ -18,6 +19,10 @@ pub use self::{
         RecordEncoder as DbnRecordEncoder,
     },
     json::Encoder as JsonEncoder,
+    split::{
+        NoSchemaBehavior, SchemaSplitter, SplitDuration, SplitEncoder, Splitter, SymbolSplitter,
+        TimeSplitter,
+    },
 };
 #[cfg(feature = "async")]
 pub use self::{
diff --git a/rust/dbn/src/encode/split.rs b/rust/dbn/src/encode/split.rs
new file mode 100644
index 00000000..6efa0b04
--- /dev/null
+++ b/rust/dbn/src/encode/split.rs
@@ -0,0 +1,1064 @@
+//! Splitters for routing DBN records to different encoders.
+//!
+//! This module provides [`SplitEncoder`] which wraps a [`Splitter`] implementation
+//! to route records to different sub-encoders based on various criteria such as time,
+//! symbol, or schema.
+
+use std::{
+    collections::{HashMap, HashSet},
+    marker::PhantomData,
+    num::NonZeroU64,
+};
+
+use time::{Time, Weekday};
+
+use crate::{
+    encode::{DbnEncodable, EncodeDbn, EncodeRecord, EncodeRecordRef, EncodeRecordTextExt},
+    Metadata, RType, Record, RecordRef, Schema, SymbolIndex,
+};
+
+/// A strategy for routing records to different sub-encoders.
+pub trait Splitter<E> {
+    /// Returns the encoder for the given record, or `None` if the record should be ignored.
+    ///
+    /// # Errors
+    /// This function returns an error if it fails to create the sub-encoder.
+    fn sub_encoder<R>(
+        &mut self,
+        metadata: Option<&Metadata>,
+        record: &R,
+    ) -> crate::Result<Option<&mut E>>
+    where
+        R: Record;
+
+    /// Returns an iterator over all active sub-encoders.
+    fn sub_encoders<'a>(&'a mut self) -> impl Iterator<Item = &'a mut E>
+    where
+        E: 'a;
+}
+
+/// An encoder that routes records to sub-encoders based on a [`Splitter`] strategy.
+///
+/// Wraps a `Splitter` implementation and delegates encoding to the appropriate
+/// sub-encoder returned by the splitter for each record.
+#[derive(Debug)]
+pub struct SplitEncoder<S, E> {
+    splitter: S,
+    metadata: Option<Metadata>,
+    _encoder: PhantomData<E>,
+}
+
+impl<S, E> SplitEncoder<S, E> {
+    /// Creates a new `SplitEncoder` without metadata.
+    ///
+    /// Use this when encoding records without associated metadata, such
+    /// as DBN fragments.
+    pub fn records_only(splitter: S) -> Self {
+        Self {
+            splitter,
+            metadata: None,
+            _encoder: PhantomData,
+        }
+    }
+
+    /// Creates a new `SplitEncoder` with metadata.
+    ///
+    /// The metadata will be passed to the splitter and used to create split-specific
+    /// metadata for each sub-encoder.
+    pub fn with_metadata(splitter: S, metadata: Metadata) -> Self {
+        Self {
+            splitter,
+            metadata: Some(metadata),
+            _encoder: PhantomData,
+        }
+    }
+}
+
+impl<S, E> EncodeRecord for SplitEncoder<S, E>
+where
+    S: Splitter<E>,
+    E: EncodeRecord,
+{
+    fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> crate::Result<()> {
+        if let Some(encoder) = self.splitter.sub_encoder(self.metadata.as_ref(), record)? {
+            encoder.encode_record(record)?;
+        }
+        Ok(())
+    }
+
+    fn flush(&mut self) -> crate::Result<()> {
+        self.splitter.sub_encoders().try_for_each(E::flush)
+    }
+}
+
+impl<S, E> EncodeRecordRef for SplitEncoder<S, E>
+where
+    S: Splitter<E>,
+    E: EncodeRecordRef,
+{
+    fn encode_record_ref(&mut self, record: RecordRef) -> crate::Result<()> {
+        if let Some(encoder) = self.splitter.sub_encoder(self.metadata.as_ref(), &record)? {
+            encoder.encode_record_ref(record)?;
+        }
+        Ok(())
+    }
+
+    unsafe fn encode_record_ref_ts_out(
+        &mut self,
+        record: RecordRef,
+        ts_out: bool,
+    ) -> crate::Result<()> {
+        if let Some(encoder) = self.splitter.sub_encoder(self.metadata.as_ref(), &record)? {
+            encoder.encode_record_ref_ts_out(record, ts_out)?;
+        }
+        Ok(())
+    }
+}
+
+impl<S, E> EncodeRecordTextExt for SplitEncoder<S, E>
+where
+    S: Splitter<E>,
+    E: EncodeRecordTextExt,
+{
+    fn encode_record_with_sym<R: DbnEncodable>(
+        &mut self,
+        record: &R,
+        symbol: Option<&str>,
+    ) -> crate::Result<()> {
+        if let Some(encoder) = self.splitter.sub_encoder(self.metadata.as_ref(), record)? {
+            encoder.encode_record_with_sym(record, symbol)?;
+        }
+        Ok(())
+    }
+}
+
+impl<S, E> EncodeDbn for SplitEncoder<S, E>
+where
+    S: Splitter<E>,
+    E: EncodeRecordTextExt,
+{
+}
+
+#[cfg(feature = "async")]
+impl<S, E> super::AsyncEncodeRecord for SplitEncoder<S, E>
+where
+    S: Splitter<E>,
+    E: super::AsyncEncodeRecord,
+{
+    async fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> crate::Result<()> {
+        if let Some(encoder) = self.splitter.sub_encoder(self.metadata.as_ref(), record)? {
+            encoder.encode_record(record).await?;
+        }
+        Ok(())
+    }
+
+    async fn flush(&mut self) -> crate::Result<()> {
+        for encoder in self.splitter.sub_encoders() {
+            encoder.flush().await?;
+        }
+        Ok(())
+    }
+
+    async fn shutdown(&mut self) -> crate::Result<()> {
+        for encoder in self.splitter.sub_encoders() {
+            encoder.shutdown().await?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(feature = "async")]
+impl<S, E> super::AsyncEncodeRecordRef for SplitEncoder<S, E>
+where
+    S: Splitter<E>,
+    E: super::AsyncEncodeRecordRef,
+{
+    async fn encode_record_ref(&mut self, record_ref: RecordRef<'_>) -> crate::Result<()> {
+        if let Some(encoder) = self
+            .splitter
+            .sub_encoder(self.metadata.as_ref(), &record_ref)?
+        {
+            encoder.encode_record_ref(record_ref).await?;
+        }
+        Ok(())
+    }
+
+    async unsafe fn encode_record_ref_ts_out(
+        &mut self,
+        record_ref: RecordRef<'_>,
+        ts_out: bool,
+    ) -> crate::Result<()> {
+        if let Some(encoder) = self
+            .splitter
+            .sub_encoder(self.metadata.as_ref(), &record_ref)?
+        {
+            encoder.encode_record_ref_ts_out(record_ref, ts_out).await?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(feature = "async")]
+impl<S, E> super::AsyncEncodeRecordTextExt for SplitEncoder<S, E>
+where
+    S: Splitter<E>,
+    E: super::AsyncEncodeRecordTextExt,
+{
+    async fn encode_record_with_sym<R: DbnEncodable>(
+        &mut self,
+        record: &R,
+        symbol: Option<&str>,
+    ) -> crate::Result<()> {
+        if let Some(encoder) = self.splitter.sub_encoder(self.metadata.as_ref(), record)? {
+            encoder.encode_record_with_sym(record, symbol).await?;
+        }
+        Ok(())
+    }
+}
+
+/// How to group records according to their index timestamp.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum SplitDuration {
+    /// Split by UTC day.
+    Day,
+    /// Split by Sunday-based weeks.
+    Week,
+    /// Split by month.
+    Month,
+}
+
+/// Splits a stream by time.
+#[derive(Debug)]
+pub struct TimeSplitter<E, F> {
+    build_encoder: F,
+    split_duration: SplitDuration,
+    encoders: HashMap<time::Date, E>,
+}
+
+/// Splits a stream by symbol.
+///
+/// It's generic over [`SymbolIndex`], allowing it to work with both
+/// [`TsSymbolMap`](crate::TsSymbolMap) and [`PitSymbolMap`](crate::PitSymbolMap).
+#[derive(Debug)]
+pub struct SymbolSplitter<E, F, M> {
+    build_encoder: F,
+    encoders: HashMap<String, E>,
+    symbol_map: M,
+}
+
+/// How to handle records with an rtype that doesn't map to a schema, such as an
+/// [`ErrorMsg`](crate::ErrorMsg).
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub enum NoSchemaBehavior {
+    /// Skip records with rtypes that have no schema.
+    #[default]
+    Skip,
+    /// Return an error when encountering an rtype without a schema.
+    Error,
+    /// Route records with rtypes that have no schema to all existing encoders.
+    Broadcast,
+}
+
+/// Splits a stream by schema.
+#[derive(Debug)]
+pub struct SchemaSplitter<E, F> {
+    build_encoder: F,
+    encoders: HashMap<Schema, E>,
+    no_schema_behavior: NoSchemaBehavior,
+}
+
+impl<E, F> TimeSplitter<E, F>
+where
+    F: Fn(time::Date, Option<Metadata>) -> crate::Result<E>,
+{
+    /// Creates a new splitter that will split the input stream according to
+    /// `split_duration`, creating a separate sub-encoder for each split using
+    /// `build_encoder`.
+    pub fn new(build_encoder: F, split_duration: SplitDuration) -> Self {
+        Self {
+            split_duration,
+            build_encoder,
+            encoders: HashMap::new(),
+        }
+    }
+
+    fn split_metadata(
+        split_duration: SplitDuration,
+        mut metadata: Metadata,
+        encoder_date: time::Date,
+    ) -> Metadata {
+        metadata.start = metadata
+            .start()
+            .max(encoder_date.with_time(Time::MIDNIGHT).assume_utc())
+            .unix_timestamp_nanos() as u64;
+        let end = match split_duration {
+            SplitDuration::Day => encoder_date.next_day().unwrap(),
+            SplitDuration::Week => encoder_date + time::Duration::days(7),
+            SplitDuration::Month => {
+                let end_year = if encoder_date.month() == time::Month::December {
+                    encoder_date.year() + 1
+                } else {
+                    encoder_date.year()
+                };
+                encoder_date
+                    .replace_month(encoder_date.month().next())
+                    .unwrap()
+                    .replace_year(end_year)
+                    .unwrap()
+            }
+        }
+        .with_time(Time::MIDNIGHT)
+        .assume_utc();
+        metadata.end = NonZeroU64::new(
+            metadata
+                .end()
+                .map(|old_end| old_end.min(end))
+                .unwrap_or(end)
+                .unix_timestamp_nanos() as u64,
+        );
+        let start_date = metadata.start().date();
+        let end = metadata.end().unwrap();
+        let end_date = if end.time() == time::Time::MIDNIGHT {
+            end.date()
+        } else {
+            end.date().next_day().unwrap()
+        };
+        metadata.mappings.retain_mut(|mapping| {
+            mapping.intervals.retain_mut(|interval| {
+                interval.start_date = interval.start_date.max(start_date);
+                interval.end_date = interval.end_date.min(end_date);
+                interval.start_date < end_date && interval.end_date > start_date
+            });
+            !mapping.intervals.is_empty()
+        });
+        let symbols = metadata
+            .mappings
+            .iter()
+            .map(|m| &m.raw_symbol)
+            .collect::<HashSet<_>>();
+        metadata.symbols.retain(|s| symbols.contains(s));
+        metadata.partial.retain(|s| symbols.contains(s));
+
+        metadata
+    }
+}
+
+impl<E, F> Splitter<E> for TimeSplitter<E, F>
+where
+    F: Fn(time::Date, Option<Metadata>) -> crate::Result<E>,
+{
+    fn sub_encoder<R>(
+        &mut self,
+        metadata: Option<&Metadata>,
+        record: &R,
+    ) -> crate::Result<Option<&mut E>>
+    where
+        R: Record,
+    {
+        use std::collections::hash_map::Entry;
+
+        let index_date = record
+            .index_date()
+            .ok_or_else(|| crate::Error::encode("record has undefined timestamp"))?;
+        let encoder_date = match self.split_duration {
+            SplitDuration::Day => index_date,
+            SplitDuration::Week if index_date.weekday() == Weekday::Sunday => index_date,
+            SplitDuration::Week => index_date.prev_occurrence(Weekday::Sunday),
+            SplitDuration::Month => index_date.replace_day(1).unwrap(),
+        };
+        let encoder = match self.encoders.entry(encoder_date) {
+            Entry::Occupied(entry) => entry.into_mut(),
+            Entry::Vacant(entry) => {
+                let split_metadata = metadata
+                    .cloned()
+                    .map(|m| Self::split_metadata(self.split_duration, m, encoder_date));
+                entry.insert((self.build_encoder)(encoder_date, split_metadata)?)
+            }
+        };
+        Ok(Some(encoder))
+    }
+
+    fn sub_encoders<'a>(&'a mut self) -> impl Iterator<Item = &'a mut E>
+    where
+        E: 'a,
+    {
+        self.encoders.values_mut()
+    }
+}
+
+impl<E, F, M> SymbolSplitter<E, F, M>
+where
+    F: Fn(&str, Option<Metadata>) -> crate::Result<E>,
+    M: SymbolIndex,
+{
+    /// Creates a new splitter that will split the input stream by symbol,
+    /// creating a separate sub-encoder for each symbol using `build_encoder`.
+    ///
+    /// The `symbol_map` is used to look up the symbol for each record based on
+    /// the instrument ID (and optionally the timestamp for `TsSymbolMap`).
+    pub fn new(build_encoder: F, symbol_map: M) -> Self {
+        Self {
+            build_encoder,
+            encoders: HashMap::new(),
+            symbol_map,
+        }
+    }
+}
+
+impl<E, F, M> Splitter<E> for SymbolSplitter<E, F, M>
+where
+    F: Fn(&str, Option<Metadata>) -> crate::Result<E>,
+    M: SymbolIndex,
+{
+    fn sub_encoder<R>(
+        &mut self,
+        metadata: Option<&Metadata>,
+        record: &R,
+    ) -> crate::Result<Option<&mut E>>
+    where
+        R: Record,
+    {
+        use std::collections::hash_map::Entry;
+
+        let index_ts = record.index_ts();
+        let symbol = self
+            .symbol_map
+            .get_for_rec(record)
+            .ok_or_else(|| {
+                crate::Error::encode(format!(
+                    "no symbol mapping for instrument_id {} at {index_ts:?}",
+                    record.header().instrument_id
+                ))
+            })?
+            .clone();
+        let encoder = match self.encoders.entry(symbol.clone()) {
+            Entry::Occupied(entry) => entry.into_mut(),
+            Entry::Vacant(entry) => {
+                let split_metadata = metadata.cloned().map(|mut m| {
+                    m.symbols.retain(|s| *s == symbol);
+                    m.partial.retain(|s| *s == symbol);
+                    m.mappings
+                        .retain(|sym_mapping| sym_mapping.raw_symbol == symbol);
+                    m
+                });
+                entry.insert((self.build_encoder)(&symbol, split_metadata)?)
+            }
+        };
+        Ok(Some(encoder))
+    }
+
+    fn sub_encoders<'a>(&'a mut self) -> impl Iterator<Item = &'a mut E>
+    where
+        E: 'a,
+    {
+        self.encoders.values_mut()
+    }
+}
+
+impl<E, F> SchemaSplitter<E, F>
+where
+    F: Fn(Schema, Option<Metadata>) -> crate::Result<E>,
+{
+    /// Creates a new splitter that will split the input stream by schema,
+    /// creating a separate sub-encoder for each schema using `build_encoder`.
+    ///
+    /// The `no_schema_behavior` determines how records with rtypes that don't map
+    /// to a schema (such as [`ErrorMsg`](crate::ErrorMsg)) are handled.
+    pub fn new(build_encoder: F, no_schema_behavior: NoSchemaBehavior) -> Self {
+        Self {
+            build_encoder,
+            encoders: HashMap::new(),
+            no_schema_behavior,
+        }
+    }
+}
+
+impl<E, F> Splitter<E> for SchemaSplitter<E, F>
+where
+    F: Fn(Schema, Option<Metadata>) -> crate::Result<E>,
+    E: EncodeRecordRef,
+{
+    fn sub_encoder<R>(
+        &mut self,
+        metadata: Option<&Metadata>,
+        record: &R,
+    ) -> crate::Result<Option<&mut E>>
+    where
+        R: Record,
+    {
+        use std::collections::hash_map::Entry;
+
+        let Some(schema) = RType::try_into_schema(record.header().rtype) else {
+            return match self.no_schema_behavior {
+                NoSchemaBehavior::Skip => Ok(None),
+                NoSchemaBehavior::Error => Err(crate::Error::encode(format!(
+                    "rtype {} has no corresponding schema",
+                    record.header().rtype
+                ))),
+                NoSchemaBehavior::Broadcast => {
+                    let rec_ref =
+                        // SAFETY: `record` is a valid DBN record: it satisfies `R: Record`.
+                        unsafe { RecordRef::unchecked_from_header(record.header() as *const _) };
+                    for encoder in self.encoders.values_mut() {
+                        // Have to use `encode_record_ref` here because `SplitEncoder` supports
+                        // both `EncodeRecord` and `EncodeRecordRef`
+                        encoder.encode_record_ref(rec_ref)?;
+                    }
+                    Ok(None)
+                }
+            };
+        };
+        let encoder = match self.encoders.entry(schema) {
+            Entry::Occupied(entry) => entry.into_mut(),
+            Entry::Vacant(entry) => {
+                let split_metadata = metadata.cloned().map(|mut m| {
+                    // Only set schema if not broadcasting (broadcast outputs contain mixed rtypes)
+                    if self.no_schema_behavior != NoSchemaBehavior::Broadcast {
+                        m.schema = Some(schema);
+                    } else {
+                        m.schema = None;
+                    }
+                    m
+                });
+                entry.insert((self.build_encoder)(schema, split_metadata)?)
+            }
+        };
+        Ok(Some(encoder))
+    }
+
+    fn sub_encoders<'a>(&'a mut self) -> impl Iterator<Item = &'a mut E>
+    where
+        E: 'a,
+    {
+        self.encoders.values_mut()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use time::macros::{date, datetime};
+
+    use super::*;
+    use crate::{rtype, MboMsg, Mbp1Msg, RecordHeader, TradeMsg, TsSymbolMap, UNDEF_TIMESTAMP};
+
+    /// Helper to create an MboMsg with a specific timestamp and instrument_id
+    fn mbo_msg(ts: u64, instrument_id: u32) -> MboMsg {
+        MboMsg {
+            hd: RecordHeader::new::<MboMsg>(rtype::MBO, 1, instrument_id, ts),
+            ts_recv: ts,
+            ..Default::default()
+        }
+    }
+
+    /// Helper to create a TradeMsg with a specific timestamp and instrument_id
+    fn trade_msg(ts: u64, instrument_id: u32) -> TradeMsg {
+        TradeMsg {
+            hd: RecordHeader::new::<TradeMsg>(rtype::MBP_0, 1, instrument_id, ts),
+            ts_recv: ts,
+            ..Default::default()
+        }
+    }
+
+    /// Helper to create an Mbp1Msg with a specific timestamp and instrument_id
+    fn mbp1_msg(ts: u64, instrument_id: u32) -> Mbp1Msg {
+        Mbp1Msg {
+            hd: RecordHeader::new::<Mbp1Msg>(rtype::MBP_1, 1, instrument_id, ts),
+            ts_recv: ts,
+            ..Default::default()
+        }
+    }
+
+    /// Simple test encoder that just stores the records it receives
+    #[derive(Debug, Default)]
+    struct TestEncoder {
+        records: Vec<(u64, u32)>, // (ts_event, instrument_id)
+    }
+
+    impl EncodeRecord for TestEncoder {
+        fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> crate::Result<()> {
+            self.records
+                .push((record.header().ts_event, record.header().instrument_id));
+            Ok(())
+        }
+
+        fn flush(&mut self) -> crate::Result<()> {
+            Ok(())
+        }
+    }
+
+    impl EncodeRecordRef for TestEncoder {
+        fn encode_record_ref(&mut self, record: RecordRef) -> crate::Result<()> {
+            self.records
+                .push((record.header().ts_event, record.header().instrument_id));
+            Ok(())
+        }
+
+        unsafe fn encode_record_ref_ts_out(
+            &mut self,
+            record: RecordRef,
+            _ts_out: bool,
+        ) -> crate::Result<()> {
+            self.encode_record_ref(record)
+        }
+    }
+
+    type TestTimeSplitter =
+        TimeSplitter<TestEncoder, fn(time::Date, Option<Metadata>) -> crate::Result<TestEncoder>>;
+
+    #[test]
+    fn test_time_splitter_by_day_single_day() {
+        let build_encoder =
+            |_date: time::Date, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let mut splitter = TimeSplitter::new(build_encoder, SplitDuration::Day);
+
+        let ts1 = datetime!(2023-07-15 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts2 = datetime!(2023-07-15 14:00 UTC).unix_timestamp_nanos() as u64;
+        let ts3 = datetime!(2023-07-15 18:00 UTC).unix_timestamp_nanos() as u64;
+
+        let rec1 = mbo_msg(ts1, 100);
+        let rec2 = mbo_msg(ts2, 101);
+        let rec3 = mbo_msg(ts3, 102);
+
+        splitter.sub_encoder(None, &rec1).unwrap();
+        splitter.sub_encoder(None, &rec2).unwrap();
+        splitter.sub_encoder(None, &rec3).unwrap();
+
+        assert_eq!(splitter.encoders.len(), 1);
+        assert!(splitter.encoders.contains_key(&date!(2023 - 07 - 15)));
+    }
+
+    #[test]
+    fn test_time_splitter_by_day_multiple_days() {
+        let build_encoder =
+            |_date: time::Date, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let mut splitter = TimeSplitter::new(build_encoder, SplitDuration::Day);
+
+        let ts_day1 = datetime!(2023-07-15 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts_day2 = datetime!(2023-07-16 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts_day3 = datetime!(2023-07-17 10:00 UTC).unix_timestamp_nanos() as u64;
+
+        let rec1 = mbo_msg(ts_day1, 100);
+        let rec2 = mbo_msg(ts_day2, 100);
+        let rec3 = mbo_msg(ts_day3, 100);
+
+        splitter.sub_encoder(None, &rec1).unwrap();
+        splitter.sub_encoder(None, &rec2).unwrap();
+        splitter.sub_encoder(None, &rec3).unwrap();
+
+        assert_eq!(splitter.encoders.len(), 3);
+        assert!(splitter.encoders.contains_key(&date!(2023 - 07 - 15)));
+        assert!(splitter.encoders.contains_key(&date!(2023 - 07 - 16)));
+        assert!(splitter.encoders.contains_key(&date!(2023 - 07 - 17)));
+    }
+
+    #[test]
+    fn test_time_splitter_by_week() {
+        let build_encoder =
+            |_date: time::Date, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let mut splitter = TimeSplitter::new(build_encoder, SplitDuration::Week);
+
+        // 2023-07-16 is a Sunday, 2023-07-17 is a Monday
+        let ts_sun = datetime!(2023-07-16 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts_mon = datetime!(2023-07-17 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts_tue = datetime!(2023-07-18 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts_next_sun = datetime!(2023-07-23 10:00 UTC).unix_timestamp_nanos() as u64;
+
+        let rec_sun = mbo_msg(ts_sun, 100);
+        let rec_mon = mbo_msg(ts_mon, 100);
+        let rec_tue = mbo_msg(ts_tue, 100);
+        let rec_next_sun = mbo_msg(ts_next_sun, 100);
+
+        splitter.sub_encoder(None, &rec_sun).unwrap();
+        splitter.sub_encoder(None, &rec_mon).unwrap();
+        splitter.sub_encoder(None, &rec_tue).unwrap();
+        splitter.sub_encoder(None, &rec_next_sun).unwrap();
+
+        // Sunday is its own week start, Mon-Sat belong to that Sunday
+        // Next Sunday is a new week
+        assert_eq!(splitter.encoders.len(), 2);
+        assert!(splitter.encoders.contains_key(&date!(2023 - 07 - 16))); // First Sunday
+        assert!(splitter.encoders.contains_key(&date!(2023 - 07 - 23))); // Next Sunday
+    }
+
+    #[test]
+    fn test_time_splitter_by_month() {
+        let build_encoder =
+            |_date: time::Date, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let mut splitter = TimeSplitter::new(build_encoder, SplitDuration::Month);
+
+        let ts_jul = datetime!(2023-07-15 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts_aug = datetime!(2023-08-10 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts_sep = datetime!(2023-09-05 10:00 UTC).unix_timestamp_nanos() as u64;
+
+        let rec_jul = mbo_msg(ts_jul, 100);
+        let rec_aug = mbo_msg(ts_aug, 100);
+        let rec_sep = mbo_msg(ts_sep, 100);
+
+        splitter.sub_encoder(None, &rec_jul).unwrap();
+        splitter.sub_encoder(None, &rec_aug).unwrap();
+        splitter.sub_encoder(None, &rec_sep).unwrap();
+
+        assert_eq!(splitter.encoders.len(), 3);
+        assert!(splitter.encoders.contains_key(&date!(2023 - 07 - 01)));
+        assert!(splitter.encoders.contains_key(&date!(2023 - 08 - 01)));
+        assert!(splitter.encoders.contains_key(&date!(2023 - 09 - 01)));
+    }
+
+    #[test]
+    fn test_time_splitter_by_month_year_boundary() {
+        let build_encoder =
+            |_date: time::Date, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let mut splitter = TimeSplitter::new(build_encoder, SplitDuration::Month);
+
+        let ts_dec = datetime!(2023-12-15 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts_jan = datetime!(2024-01-10 10:00 UTC).unix_timestamp_nanos() as u64;
+
+        let rec_dec = mbo_msg(ts_dec, 100);
+        let rec_jan = mbo_msg(ts_jan, 100);
+
+        splitter.sub_encoder(None, &rec_dec).unwrap();
+        splitter.sub_encoder(None, &rec_jan).unwrap();
+
+        assert_eq!(splitter.encoders.len(), 2);
+        assert!(splitter.encoders.contains_key(&date!(2023 - 12 - 01)));
+        assert!(splitter.encoders.contains_key(&date!(2024 - 01 - 01)));
+    }
+
+    #[test]
+    fn test_symbol_splitter_multiple_symbols() {
+        let mut symbol_map = TsSymbolMap::new();
+        symbol_map
+            .insert(
+                100,
+                date!(2023 - 07 - 01),
+                date!(2023 - 08 - 01),
+                Arc::new("AAPL".to_owned()),
+            )
+            .unwrap();
+        symbol_map
+            .insert(
+                101,
+                date!(2023 - 07 - 01),
+                date!(2023 - 08 - 01),
+                Arc::new("TSLA".to_owned()),
+            )
+            .unwrap();
+        symbol_map
+            .insert(
+                102,
+                date!(2023 - 07 - 01),
+                date!(2023 - 08 - 01),
+                Arc::new("MSFT".to_owned()),
+            )
+            .unwrap();
+
+        let build_encoder = |_symbol: &str, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let mut splitter = SymbolSplitter::new(build_encoder, symbol_map);
+
+        let ts = datetime!(2023-07-15 10:00 UTC).unix_timestamp_nanos() as u64;
+
+        let rec_aapl = mbo_msg(ts, 100);
+        let rec_tsla = mbo_msg(ts, 101);
+        let rec_msft = mbo_msg(ts, 102);
+
+        splitter.sub_encoder(None, &rec_aapl).unwrap();
+        splitter.sub_encoder(None, &rec_tsla).unwrap();
+        splitter.sub_encoder(None, &rec_msft).unwrap();
+
+        assert_eq!(splitter.encoders.len(), 3);
+        assert!(splitter.encoders.keys().any(|k| k == "AAPL"));
+        assert!(splitter.encoders.keys().any(|k| k == "TSLA"));
+        assert!(splitter.encoders.keys().any(|k| k == "MSFT"));
+    }
+
+    #[test]
+    fn test_symbol_splitter_same_symbol_multiple_records() {
+        let mut symbol_map = TsSymbolMap::new();
+        symbol_map
+            .insert(
+                100,
+                date!(2023 - 07 - 01),
+                date!(2023 - 08 - 01),
+                Arc::new("AAPL".to_owned()),
+            )
+            .unwrap();
+
+        let build_encoder = |_symbol: &str, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let mut splitter = SymbolSplitter::new(build_encoder, symbol_map);
+
+        let ts1 = datetime!(2023-07-15 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts2 = datetime!(2023-07-15 11:00 UTC).unix_timestamp_nanos() as u64;
+        let ts3 = datetime!(2023-07-15 12:00 UTC).unix_timestamp_nanos() as u64;
+
+        let rec1 = mbo_msg(ts1, 100);
+        let rec2 = mbo_msg(ts2, 100);
+        let rec3 = mbo_msg(ts3, 100);
+
+        splitter.sub_encoder(None, &rec1).unwrap();
+        splitter.sub_encoder(None, &rec2).unwrap();
+        splitter.sub_encoder(None, &rec3).unwrap();
+
+        assert_eq!(splitter.encoders.len(), 1);
+    }
+
+    #[test]
+    fn test_symbol_splitter_unknown_instrument() {
+        let mut symbol_map = TsSymbolMap::new();
+        symbol_map
+            .insert(
+                100,
+                date!(2023 - 07 - 01),
+                date!(2023 - 08 - 01),
+                Arc::new("AAPL".to_owned()),
+            )
+            .unwrap();
+
+        let build_encoder = |_symbol: &str, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let mut splitter = SymbolSplitter::new(build_encoder, symbol_map);
+
+        let ts = datetime!(2023-07-15 10:00 UTC).unix_timestamp_nanos() as u64;
+
+        // instrument ID not in symbol map
+        let rec_unknown = mbo_msg(ts, 999);
+
+        let result = splitter.sub_encoder(None, &rec_unknown);
+        assert!(result.is_err());
+        assert_eq!(splitter.encoders.len(), 0);
+    }
+
+    #[test]
+    fn test_schema_splitter_multiple_schemas() {
+        let build_encoder =
+            |_schema: Schema, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let mut splitter = SchemaSplitter::new(build_encoder, NoSchemaBehavior::Skip);
+
+        let ts = datetime!(2023-07-15 10:00 UTC).unix_timestamp_nanos() as u64;
+
+        let rec_mbo = mbo_msg(ts, 100);
+        let rec_trades = trade_msg(ts, 100);
+        let rec_mbp1 = mbp1_msg(ts, 100);
+
+        splitter.sub_encoder(None, &rec_mbo).unwrap();
+        splitter.sub_encoder(None, &rec_trades).unwrap();
+        splitter.sub_encoder(None, &rec_mbp1).unwrap();
+
+        assert_eq!(splitter.encoders.len(), 3);
+        assert!(splitter.encoders.contains_key(&Schema::Mbo));
+        assert!(splitter.encoders.contains_key(&Schema::Trades));
+        assert!(splitter.encoders.contains_key(&Schema::Mbp1));
+    }
+
+    #[test]
+    fn test_schema_splitter_same_schema() {
+        let build_encoder =
+            |_schema: Schema, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let mut splitter = SchemaSplitter::new(build_encoder, NoSchemaBehavior::Skip);
+
+        let ts1 = datetime!(2023-07-15 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts2 = datetime!(2023-07-15 11:00 UTC).unix_timestamp_nanos() as u64;
+
+        let rec1 = mbo_msg(ts1, 100);
+        let rec2 = mbo_msg(ts2, 101);
+
+        splitter.sub_encoder(None, &rec1).unwrap();
+        splitter.sub_encoder(None, &rec2).unwrap();
+
+        // Both are MBO, so should have only one encoder
+        assert_eq!(splitter.encoders.len(), 1);
+        assert!(splitter.encoders.contains_key(&Schema::Mbo));
+    }
+
+    #[test]
+    fn test_split_encoder_with_time_splitter() {
+        let build_encoder =
+            |_date: time::Date, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let splitter = TimeSplitter::new(build_encoder, SplitDuration::Day);
+        let mut encoder: SplitEncoder<_, TestEncoder> = SplitEncoder::records_only(splitter);
+
+        let ts_day1 = datetime!(2023-07-15 10:00 UTC).unix_timestamp_nanos() as u64;
+        let ts_day2 = datetime!(2023-07-16 10:00 UTC).unix_timestamp_nanos() as u64;
+
+        let rec1 = mbo_msg(ts_day1, 100);
+        let rec2 = mbo_msg(ts_day1, 101);
+        let rec3 = mbo_msg(ts_day2, 100);
+
+        encoder.encode_record(&rec1).unwrap();
+        encoder.encode_record(&rec2).unwrap();
+        encoder.encode_record(&rec3).unwrap();
+
+        let day1_encoder = encoder
+            .splitter
+            .encoders
+            .get(&date!(2023 - 07 - 15))
+            .unwrap();
+        let day2_encoder = encoder
+            .splitter
+            .encoders
+            .get(&date!(2023 - 07 - 16))
+            .unwrap();
+
+        assert_eq!(day1_encoder.records.len(), 2);
+        assert_eq!(day2_encoder.records.len(), 1);
+    }
+
+    #[test]
+    fn test_split_metadata_by_day() {
+        use crate::{MappingInterval, MetadataBuilder, SType, Schema, SymbolMapping};
+        use std::num::NonZeroU64;
+
+        let metadata = MetadataBuilder::new()
+            .dataset("TEST".to_owned())
+            .schema(Some(Schema::Mbo))
+            .stype_in(Some(SType::RawSymbol))
+            .stype_out(SType::InstrumentId)
+            .start(datetime!(2023-07-01 00:00 UTC).unix_timestamp_nanos() as u64)
+            .end(NonZeroU64::new(
+                datetime!(2023-07-10 00:00 UTC).unix_timestamp_nanos() as u64,
+            ))
+            .symbols(vec!["AAPL".to_owned()])
+            .mappings(vec![SymbolMapping {
+                raw_symbol: "AAPL".to_owned(),
+                intervals: vec![MappingInterval {
+                    start_date: date!(2023 - 07 - 01),
+                    end_date: date!(2023 - 07 - 10),
+                    symbol: "100".to_owned(),
+                }],
+            }])
+            .build();
+
+        let split_meta = TestTimeSplitter::split_metadata(
+            SplitDuration::Day,
+            metadata.clone(),
+            date!(2023 - 07 - 05),
+        );
+
+        assert_eq!(
+            split_meta.start,
+            datetime!(2023-07-05 00:00 UTC).unix_timestamp_nanos() as u64
+        );
+        assert_eq!(
+            split_meta.end.unwrap().get(),
+            datetime!(2023-07-06 00:00 UTC).unix_timestamp_nanos() as u64
+        );
+    }
+
+    #[test]
+    fn test_split_metadata_by_month() {
+        use crate::{MappingInterval, MetadataBuilder, SType, Schema, SymbolMapping};
+        use std::num::NonZeroU64;
+
+        let metadata = MetadataBuilder::new()
+            .dataset("TEST".to_owned())
+            .schema(Some(Schema::Mbo))
+            .stype_in(Some(SType::RawSymbol))
+            .stype_out(SType::InstrumentId)
+            .start(datetime!(2023-06-15 00:00 UTC).unix_timestamp_nanos() as u64)
+            .end(NonZeroU64::new(
+                datetime!(2023-08-15 00:00 UTC).unix_timestamp_nanos() as u64,
+            ))
+            .symbols(vec!["AAPL".to_owned()])
+            .mappings(vec![SymbolMapping {
+                raw_symbol: "AAPL".to_owned(),
+                intervals: vec![MappingInterval {
+                    start_date: date!(2023 - 06 - 15),
+                    end_date: date!(2023 - 08 - 15),
+                    symbol: "100".to_owned(),
+                }],
+            }])
+            .build();
+
+        // Test metadata splitting for July
+        let split_meta = TestTimeSplitter::split_metadata(
+            SplitDuration::Month,
+            metadata.clone(),
+            date!(2023 - 07 - 01),
+        );
+
+        // Check that start/end are correctly bounded to July
+        assert_eq!(
+            split_meta.start,
+            datetime!(2023-07-01 00:00 UTC).unix_timestamp_nanos() as u64
+        );
+        assert_eq!(
+            split_meta.end.unwrap().get(),
+            datetime!(2023-08-01 00:00 UTC).unix_timestamp_nanos() as u64
+        );
+    }
+
+    #[test]
+    fn test_split_metadata_retains_relevant_mappings() {
+        use crate::{MappingInterval, MetadataBuilder, SType, Schema, SymbolMapping};
+        use std::num::NonZeroU64;
+
+        let metadata = MetadataBuilder::new()
+            .dataset("TEST".to_owned())
+            .schema(Some(Schema::Mbo))
+            .stype_in(Some(SType::RawSymbol))
+            .stype_out(SType::InstrumentId)
+            .start(datetime!(2023-07-01 00:00 UTC).unix_timestamp_nanos() as u64)
+            .end(NonZeroU64::new(
+                datetime!(2023-07-31 00:00 UTC).unix_timestamp_nanos() as u64,
+            ))
+            .symbols(vec!["AAPL".to_owned(), "TSLA".to_owned()])
+            .mappings(vec![
+                SymbolMapping {
+                    raw_symbol: "AAPL".to_owned(),
+                    intervals: vec![MappingInterval {
+                        start_date: date!(2023 - 07 - 01),
+                        end_date: date!(2023 - 07 - 15),
+                        symbol: "100".to_owned(),
+                    }],
+                },
+                SymbolMapping {
+                    raw_symbol: "TSLA".to_owned(),
+                    intervals: vec![MappingInterval {
+                        start_date: date!(2023 - 07 - 10),
+                        end_date: date!(2023 - 07 - 25),
+                        symbol: "101".to_owned(),
+                    }],
+                },
+            ])
+            .build();
+
+        // both AAPL and TSLA should be present
+        let split_meta = TestTimeSplitter::split_metadata(
+            SplitDuration::Day,
+            metadata.clone(),
+            date!(2023 - 07 - 12),
+        );
+        assert_eq!(split_meta.mappings.len(), 2);
+        assert_eq!(split_meta.symbols.len(), 2);
+
+        // only AAPL should be present
+        let split_meta = TestTimeSplitter::split_metadata(
+            SplitDuration::Day,
+            metadata.clone(),
+            date!(2023 - 07 - 05),
+        );
+        assert_eq!(split_meta.mappings.len(), 1);
+        assert_eq!(split_meta.mappings[0].raw_symbol, "AAPL");
+        assert_eq!(split_meta.symbols.len(), 1);
+
+        // only TSLA should be present
+        let split_meta = TestTimeSplitter::split_metadata(
+            SplitDuration::Day,
+            metadata.clone(),
+            date!(2023 - 07 - 20),
+        );
+        assert_eq!(split_meta.mappings.len(), 1);
+        assert_eq!(split_meta.mappings[0].raw_symbol, "TSLA");
+        assert_eq!(split_meta.symbols.len(), 1);
+    }
+
+    #[test]
+    fn test_record_with_undef_timestamp_returns_error() {
+        let build_encoder =
+            |_date: time::Date, _metadata: Option<Metadata>| Ok(TestEncoder::default());
+        let mut splitter = TimeSplitter::new(build_encoder, SplitDuration::Day);
+
+        let rec = mbo_msg(UNDEF_TIMESTAMP, 100);
+        splitter.sub_encoder(None, &rec).unwrap_err();
+    }
+}
diff --git a/rust/dbn/src/metadata.rs b/rust/dbn/src/metadata.rs
index 476d2707..593c94e6 100644
--- a/rust/dbn/src/metadata.rs
+++ b/rust/dbn/src/metadata.rs
@@ -169,6 +169,25 @@ impl Metadata {
         }
         Ok(merger.finalize())
     }
+
+    /// Returns `true` if the metadata is for inverse mappings, where `stype_in` is
+    /// [`SType::InstrumentId`].
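+    ///
+    /// # Example
+    ///
+    /// A minimal sketch, assuming a [`Metadata`] built with inverse symbology
+    /// (instrument ID in, raw symbol out); the builder values are placeholders:
+    ///
+    /// ```
+    /// use dbn::{MetadataBuilder, SType, Schema};
+    ///
+    /// let metadata = MetadataBuilder::new()
+    ///     .dataset("TEST".to_owned())
+    ///     .schema(Some(Schema::Mbo))
+    ///     .stype_in(Some(SType::InstrumentId))
+    ///     .stype_out(SType::RawSymbol)
+    ///     .start(0)
+    ///     .build();
+    /// assert!(metadata.is_inverse().unwrap());
+    /// ```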
+    ///
+    /// # Errors
+    /// This function returns an error if neither `stype_in` nor `stype_out` is
+    /// [`SType::InstrumentId`].
+    pub fn is_inverse(&self) -> crate::Result<bool> {
+        match (self.stype_in, self.stype_out) {
+            (_, SType::InstrumentId) => Ok(false),
+            (Some(SType::InstrumentId), _) => Ok(true),
+            _ => {
+                Err(crate::Error::BadArgument {
+                    param_name: "self".to_owned(),
+                    desc: "Can only create symbol maps from metadata where either stype_out or stype_in is instrument ID".to_owned(),
+                })
+            }
+        }
+    }
 }
 
 /// Helper for constructing [`Metadata`] structs with defaults.
diff --git a/rust/dbn/src/publishers.rs b/rust/dbn/src/publishers.rs
index 580bbe29..bad1b23c 100644
--- a/rust/dbn/src/publishers.rs
+++ b/rust/dbn/src/publishers.rs
@@ -117,10 +117,12 @@ pub enum Venue {
     Xeee = 51,
     /// Cboe Futures Exchange
     Xcbf = 52,
+    /// Blue Ocean ATS
+    Ocea = 53,
 }
 
 /// The number of [`Venue`] variants.
-pub const VENUE_COUNT: usize = 52;
+pub const VENUE_COUNT: usize = 53;
 
 impl Venue {
     /// Converts the venue to its `str` representation.
@@ -178,6 +180,7 @@ impl Venue {
             Self::Xeur => "XEUR",
             Self::Xeee => "XEEE",
             Self::Xcbf => "XCBF",
+            Self::Ocea => "OCEA",
         }
     }
 }
@@ -251,6 +254,7 @@ impl std::str::FromStr for Venue {
             "XEUR" => Ok(Self::Xeur),
             "XEEE" => Ok(Self::Xeee),
             "XCBF" => Ok(Self::Xcbf),
+            "OCEA" => Ok(Self::Ocea),
             _ => Err(Error::conversion::<Self>(s)),
         }
     }
@@ -345,10 +349,12 @@ pub enum Dataset {
     XeeeEobi = 39,
     /// Cboe Futures Exchange PITCH
     XcbfPitch = 40,
+    /// Blue Ocean ATS MEMOIR Depth
+    OceaMemoir = 41,
 }
 
 /// The number of [`Dataset`] variants.
-pub const DATASET_COUNT: usize = 40;
+pub const DATASET_COUNT: usize = 41;
 
 impl Dataset {
     /// Converts the dataset to its `str` representation.
@@ -396,6 +402,7 @@ impl Dataset {
             Self::XeurEobi => "XEUR.EOBI",
             Self::XeeeEobi => "XEEE.EOBI",
             Self::XcbfPitch => "XCBF.PITCH",
+            Self::OceaMemoir => "OCEA.MEMOIR",
         }
     }
@@ -519,6 +526,7 @@ impl Dataset {
             Self::XeurEobi => &[Publisher::XeurEobiXeur, Publisher::XeurEobiXoff],
             Self::XeeeEobi => &[Publisher::XeeeEobiXeee, Publisher::XeeeEobiXoff],
             Self::XcbfPitch => &[Publisher::XcbfPitchXcbf, Publisher::XcbfPitchXoff],
+            Self::OceaMemoir => &[Publisher::OceaMemoirOcea],
         }
     }
 }
@@ -582,6 +590,7 @@ impl std::str::FromStr for Dataset {
             "XEUR.EOBI" => Ok(Self::XeurEobi),
             "XEEE.EOBI" => Ok(Self::XeeeEobi),
             "XCBF.PITCH" => Ok(Self::XcbfPitch),
+            "OCEA.MEMOIR" => Ok(Self::OceaMemoir),
             _ => Err(Error::conversion::<Self>(s)),
         }
     }
@@ -806,10 +815,12 @@ pub enum Publisher {
     XcbfPitchXcbf = 105,
     /// Cboe Futures Exchange - Off-Market Trades
     XcbfPitchXoff = 106,
+    /// Blue Ocean ATS MEMOIR
+    OceaMemoirOcea = 107,
 }
 
 /// The number of [`Publisher`] variants.
-pub const PUBLISHER_COUNT: usize = 106;
+pub const PUBLISHER_COUNT: usize = 107;
 
 impl Publisher {
     /// Converts the publisher to its `str` representation.
@@ -921,6 +932,7 @@ impl Publisher {
             Self::XeeeEobiXoff => "XEEE.EOBI.XOFF",
             Self::XcbfPitchXcbf => "XCBF.PITCH.XCBF",
             Self::XcbfPitchXoff => "XCBF.PITCH.XOFF",
+            Self::OceaMemoirOcea => "OCEA.MEMOIR.OCEA",
         }
     }
@@ -1033,6 +1045,7 @@ impl Publisher {
             Self::XeeeEobiXoff => Venue::Xoff,
             Self::XcbfPitchXcbf => Venue::Xcbf,
             Self::XcbfPitchXoff => Venue::Xoff,
+            Self::OceaMemoirOcea => Venue::Ocea,
         }
     }
@@ -1145,6 +1158,7 @@ impl Publisher {
             Self::XeeeEobiXoff => Dataset::XeeeEobi,
             Self::XcbfPitchXcbf => Dataset::XcbfPitch,
             Self::XcbfPitchXoff => Dataset::XcbfPitch,
+            Self::OceaMemoirOcea => Dataset::OceaMemoir,
         }
     }
@@ -1260,6 +1274,7 @@ impl Publisher {
             (Dataset::XeeeEobi, Venue::Xoff) => Ok(Self::XeeeEobiXoff),
             (Dataset::XcbfPitch, Venue::Xcbf) => Ok(Self::XcbfPitchXcbf),
             (Dataset::XcbfPitch, Venue::Xoff) => Ok(Self::XcbfPitchXoff),
+            (Dataset::OceaMemoir, Venue::Ocea) => Ok(Self::OceaMemoirOcea),
             _ => Err(Error::conversion::<Self>(format!("({dataset}, {venue})"))),
         }
     }
@@ -1388,6 +1403,7 @@ impl std::str::FromStr for Publisher {
             "XEEE.EOBI.XOFF" => Ok(Self::XeeeEobiXoff),
             "XCBF.PITCH.XCBF" => Ok(Self::XcbfPitchXcbf),
             "XCBF.PITCH.XOFF" => Ok(Self::XcbfPitchXoff),
+            "OCEA.MEMOIR.OCEA" => Ok(Self::OceaMemoirOcea),
             _ => Err(Error::conversion::<Self>(s)),
         }
     }
diff --git a/rust/dbn/src/symbol_map.rs b/rust/dbn/src/symbol_map.rs
index 6cad7958..1028e5f3 100644
--- a/rust/dbn/src/symbol_map.rs
+++ b/rust/dbn/src/symbol_map.rs
@@ -4,9 +4,7 @@ use std::{cmp::Ordering, collections::HashMap, ops::Deref, sync::Arc};
 
 use time::{macros::time, PrimitiveDateTime};
 
-use crate::{
-    compat, v1, Error, HasRType, Metadata, RType, Record, RecordRef, SType, SymbolMappingMsg,
-};
+use crate::{compat, v1, Error, HasRType, Metadata, Record, RecordRef, SymbolMappingMsg};
 
 /// A timeseries symbol map. Generally useful for working with historical data
 /// and is commonly built from a [`Metadata`] object via [`Self::from_metadata()`].
@@ -46,8 +44,8 @@ impl TsSymbolMap {
     ///
     /// # Errors
     /// This function returns an error if neither stype_in or stype_out are
-    /// [`SType::InstrumentId`]. It will also return an error if it can't
-    /// parse a symbol into `u32` instrument ID.
+    /// [`SType::InstrumentId`](crate::SType::InstrumentId). It will also return an
+    /// error if it can't parse a symbol into `u32` instrument ID.
     pub fn from_metadata(metadata: &Metadata) -> crate::Result<Self> {
         Self::try_from(metadata)
     }
@@ -118,7 +116,7 @@ impl TryFrom<&Metadata> for TsSymbolMap {
     fn try_from(metadata: &Metadata) -> Result<Self, Self::Error> {
         let mut res = Self::new();
-        if is_inverse(metadata)? {
+        if metadata.is_inverse()? {
             for mapping in metadata.mappings.iter() {
                 let iid = mapping
                     .raw_symbol
@@ -174,11 +172,11 @@ impl PitSymbolMap {
     ///
     /// # Errors
     /// This function returns an error if neither stype_in or stype_out are
-    /// [`SType::InstrumentId`]. It will also return an error if it can't
-    /// parse a symbol into `u32` instrument ID or if `date` is outside the query range
-    /// of the metadata.
+    /// [`SType::InstrumentId`](crate::SType::InstrumentId). It will also return an
+    /// error if it can't parse a symbol into `u32` instrument ID or if `date` is
+    /// outside the query range of the metadata.
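+    ///
+    /// # Example
+    ///
+    /// A minimal sketch, assuming an `mbo.dbn.zst` file whose metadata covers the
+    /// requested date (the file name is a placeholder):
+    ///
+    /// ```no_run
+    /// use dbn::{decode::{dbn::Decoder, DbnMetadata}, PitSymbolMap};
+    /// use time::macros::date;
+    ///
+    /// let decoder = Decoder::from_zstd_file("mbo.dbn.zst")?;
+    /// let symbol_map = PitSymbolMap::from_metadata(decoder.metadata(), date!(2023 - 07 - 15))?;
+    /// # Ok::<(), dbn::Error>(())
+    /// ```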
     pub fn from_metadata(metadata: &Metadata, date: time::Date) -> crate::Result<Self> {
-        let is_inverse = is_inverse(metadata)?;
+        let is_inverse = metadata.is_inverse()?;
         let datetime = PrimitiveDateTime::new(date, time!(0:00)).assume_utc();
         // need to compare with `end` as a datetime to handle midnight case
         if date < metadata.start().date() || metadata.end().is_some_and(|end| datetime >= end) {
@@ -221,19 +219,12 @@ impl PitSymbolMap {
     /// This function returns an error when `record` contains a symbol mapping
     /// with invalid UTF-8.
     pub fn on_record(&mut self, record: RecordRef) -> crate::Result<()> {
-        match record.rtype() {
-            // >= to allow WithTsOut
-            Ok(RType::SymbolMapping)
-                if record.record_size() >= std::mem::size_of::<SymbolMappingMsg>() =>
-            {
-                // Safety: checked rtype and length
-                self.on_symbol_mapping(unsafe { record.get_unchecked::<SymbolMappingMsg>() })
-            }
-            Ok(RType::SymbolMapping) => {
-                // Use `get` here to still perform length checks
-                self.on_symbol_mapping(record.get::<v1::SymbolMappingMsg>().unwrap())
-            }
-            _ => Ok(()),
+        if let Ok(symbol_mapping) = record.try_get::<SymbolMappingMsg>() {
+            self.on_symbol_mapping(symbol_mapping)
+        } else if let Ok(symbol_mapping) = record.try_get::<v1::SymbolMappingMsg>() {
+            self.on_symbol_mapping(symbol_mapping)
+        } else {
+            Ok(())
         }
     }
@@ -323,19 +314,6 @@ impl std::ops::Index<u32> for PitSymbolMap {
     }
 }
 
-fn is_inverse(metadata: &Metadata) -> crate::Result<bool> {
-    match (metadata.stype_in, metadata.stype_out) {
-        (_, SType::InstrumentId) => Ok(false),
-        (Some(SType::InstrumentId), _) => Ok(true),
-        _ => {
-            Err(Error::BadArgument {
-                param_name: "metadata".to_owned(),
-                desc: "Can only create symbol maps from metadata where either stype_out or stype_in is instrument ID".to_owned(),
-            })
-        }
-    }
-}
-
 #[cfg(test)]
 pub(crate) mod tests {
     use std::num::NonZeroU64;
@@ -346,7 +324,7 @@ pub(crate) mod tests {
     use crate::{
         compat::{SymbolMappingMsgV1, SymbolMappingRec},
         publishers::Dataset,
-        MappingInterval, Metadata, Schema, SymbolMapping, UNDEF_TIMESTAMP,
+        MappingInterval, Metadata, SType, Schema, SymbolMapping, UNDEF_TIMESTAMP,
     };
 
     use super::*;