diff --git a/CHANGELOG.md b/CHANGELOG.md index 02ca03d5..3b4be6de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,12 +1,25 @@ # Changelog +## 0.49.0 - 2026-02-17 + +### Enhancements +- Added `encode_records` to `EncodeRecord` and `AsyncEncodeRecord` traits for more + efficient batch encoding +- Added `encode_record_refs` to `EncodeRecordRef` and `AsyncEncodeRecordRef` traits with + more efficient vectored I/O implementation for DBN +- Added support for decompressing Zstd in the Python `DBNDecoder` and new optional + `compression` parameter + +### Breaking changes +- Moved `encode_records` method from the `EncodeDbn` trait to `EncodeRecord` + ## 0.48.0 - 2026-01-27 ### Enhancements - Added initial support for splitting DBN files: - Added new `SplitEncoder` that supports synchronous and asynchronous encoding - Added new `Splitter` trait that allows for extensible splitting of files while - reusing the `SplitEncoder` boilerplate. + reusing the `SplitEncoder` boilerplate - Added `SchemaSplitter`, `SymbolSplitter`, and `TimeSplitter` which allow for different methods of splitting DBN files - Added split support to the CLI. For example: diff --git a/Cargo.lock b/Cargo.lock index 7e0a8b65..8046ab3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,9 +63,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.100" +version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" [[package]] name = "assert_cmd" @@ -84,9 +84,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.37" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d10e4f991a553474232bc0a31799f6d24b034a84c0971d80d2e2f78b2e576e40" +checksum = "68650b7df54f0293fd061972a0fb05aaf4fc0879d3b3d21a638a182c5c543b9f" dependencies = [ "compression-codecs", "compression-core", @@ -102,9 +102,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "bitflags" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" [[package]] name = "bstr" @@ -119,9 +119,9 @@ dependencies = [ [[package]] name = "bytes" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "cbindgen" @@ -136,16 +136,16 @@ dependencies = [ "quote", "serde", "serde_json", - "syn 2.0.114", + "syn 2.0.116", "tempfile", - "toml", + "toml 0.9.12+spec-1.1.0", ] [[package]] name = "cc" -version = "1.2.54" +version = "1.2.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583" +checksum = "aebf35691d1bfb0ac386a69bac2fde4dd276fb618cf8bf4f5318fe285e821bb2" dependencies = [ "find-msvc-tools", "jobserver", @@ -161,9 +161,9 @@ checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" [[package]] name = "clap" -version = "4.5.54" +version = "4.5.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6e6ff9dcd79cff5cd969a17a545d79e84ab086e444102a591e288a8aa3ce394" +checksum = "c5caf74d17c3aec5495110c34cc3f78644bfa89af6c8993ed4de2790e49b6499" dependencies = [ "clap_builder", "clap_derive", @@ -171,9 +171,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.54" +version = "4.5.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa42cf4d2b7a41bc8f663a7cab4031ebafa1bf3875705bfaf8466dc60ab52c00" +checksum = "370daa45065b80218950227371916a1633217ae42b2715b2287b606dcd618e24" dependencies = [ "anstream", "anstyle", @@ -184,21 +184,21 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.49" +version = "4.5.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671" +checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] name = "clap_lex" -version = "0.7.7" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3e64b0cc0439b12df2fa678eae89a1c56a529fd067a9115f7827f1fffd22b32" +checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" [[package]] name = "colorchoice" @@ -246,7 +246,7 @@ dependencies = [ [[package]] name = "databento-dbn" -version = "0.48.0" +version = "0.49.0" dependencies = [ "dbn", "pyo3", @@ -258,7 +258,7 @@ dependencies = [ [[package]] name = "dbn" -version = "0.48.0" +version = "0.49.0" dependencies = [ "async-compression", "csv", @@ -281,7 +281,7 @@ dependencies = [ [[package]] name = "dbn-c" -version = "0.48.0" +version = "0.49.0" dependencies = [ "anyhow", "cbindgen", @@ -291,7 +291,7 @@ dependencies = [ [[package]] name = "dbn-cli" -version = "0.48.0" +version = "0.49.0" dependencies = [ "anyhow", "assert_cmd", @@ -307,22 +307,22 @@ dependencies = [ [[package]] name = "dbn-macros" -version = "0.48.0" +version = "0.49.0" dependencies = [ "csv", "dbn", "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", "trybuild", ] [[package]] name = "deranged" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" +checksum = "cc3dc5ad92c2e2d1c193bbbbdf2ea477cb81331de4f3103f267ca18368b988c4" dependencies = [ "powerfmt", "serde_core", @@ -364,9 +364,9 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "find-msvc-tools" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" [[package]] name = "float-cmp" @@ -377,28 +377,34 @@ dependencies = [ "num-traits", ] +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "futures-core" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" [[package]] name = "futures-macro" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] name = "futures-task" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" [[package]] name = "futures-timer" @@ -408,15 +414,14 @@ checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-core", "futures-macro", "futures-task", "pin-project-lite", - "pin-utils", "slab", ] @@ -432,12 +437,34 @@ dependencies = [ "wasip2", ] +[[package]] +name = "getrandom" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", + "wasip3", +] + [[package]] name = "glob" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + [[package]] name = "hashbrown" version = "0.16.1" @@ -450,6 +477,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + [[package]] name = "indexmap" version = "2.13.0" @@ -457,7 +490,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.16.1", + "serde", + "serde_core", ] [[package]] @@ -487,7 +522,7 @@ version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" dependencies = [ - "getrandom", + "getrandom 0.3.4", "libc", ] @@ -501,11 +536,17 @@ dependencies = [ "ryu", ] +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" -version = "0.2.180" +version = "0.2.182" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" +checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" [[package]] name = "linux-raw-sys" @@ -521,9 +562,9 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "memchr" -version = "2.7.6" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "memoffset" @@ -583,7 +624,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -610,12 +651,6 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - [[package]] name = "pkg-config" version = "0.3.32" @@ -624,9 +659,9 @@ checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" [[package]] name = "portable-atomic" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f89776e4d69bb58bc6993e99ffa1d11f228b839984854c7daeb5d37f87cbe950" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" [[package]] name = "powerfmt" @@ -636,9 +671,9 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "predicates" -version = "3.1.3" +version = "3.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573" +checksum = "ada8f2932f28a27ee7b70dd6c1c39ea0675c55a36879ab92f3a715eaa1e63cfe" dependencies = [ "anstyle", "difflib", @@ -650,20 +685,30 @@ dependencies = [ [[package]] name = "predicates-core" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa" +checksum = "cad38746f3166b4031b1a0d39ad9f954dd291e7854fcc0eed52ee41a0b50d144" [[package]] name = "predicates-tree" -version = "1.0.12" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c" +checksum = "d0de1b847b39c8131db0467e9df1ff60e6d0562ab8e9a16e568ad0fdb372e2f2" dependencies = [ "predicates-core", "termtree", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn 2.0.116", +] + [[package]] name = "proc-macro-crate" version = "3.4.0" @@ -727,7 +772,7 @@ dependencies = [ "proc-macro2", "pyo3-macros-backend", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -740,7 +785,7 @@ dependencies = [ "proc-macro2", "pyo3-build-config", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -760,9 +805,9 @@ checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" [[package]] name = "regex" -version = "1.12.2" +version = "1.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" dependencies = [ "aho-corasick", "memchr", @@ -772,9 +817,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" dependencies = [ "aho-corasick", "memchr", @@ -783,9 +828,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" [[package]] name = "relative-path" @@ -818,7 +863,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.114", + "syn 2.0.116", "unicode-ident", ] @@ -852,9 +897,9 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "ryu" -version = "1.0.22" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" [[package]] name = "semver" @@ -889,7 +934,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -922,9 +967,9 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "slab" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" [[package]] name = "strsim" @@ -950,7 +995,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -966,9 +1011,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.114" +version = "2.0.116" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4d107df263a3013ef9b1879b0df87d706ff80f65a86ea879bd9c31f9b307c2a" +checksum = "3df424c70518695237746f84cede799c9c58fcb37450d7b23716568cc8bc69cb" dependencies = [ "proc-macro2", "quote", @@ -977,9 +1022,9 @@ dependencies = [ [[package]] name = "target-lexicon" -version = "0.13.4" +version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1dd07eb858a2067e2f3c7155d54e929265c264e6f37efe3ee7a8d1b5a1dd0ba" +checksum = "adb6935a6f5c20170eeceb1a3835a49e12e19d792f6dd344ccc76a985ca5a6ca" [[package]] name = "target-triple" @@ -989,12 +1034,12 @@ checksum = "591ef38edfb78ca4771ee32cf494cb8771944bee237a9b91fc9c1424ac4b777b" [[package]] name = "tempfile" -version = "3.24.0" +version = "3.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" +checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" dependencies = [ "fastrand", - "getrandom", + "getrandom 0.4.1", "once_cell", "rustix", "windows-sys 0.61.2", @@ -1042,14 +1087,14 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] name = "time" -version = "0.3.46" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", "itoa", @@ -1068,9 +1113,9 @@ checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.26" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" dependencies = [ "num-conv", "time-core", @@ -1095,19 +1140,34 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", +] + +[[package]] +name = "toml" +version = "0.9.12+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf92845e79fc2e2def6a5d828f0801e29a2f8acc037becc5ab08595c7d5e9863" +dependencies = [ + "indexmap", + "serde_core", + "serde_spanned", + "toml_datetime 0.7.5+spec-1.1.0", + "toml_parser", + "toml_writer", + "winnow", ] [[package]] name = "toml" -version = "0.9.11+spec-1.1.0" +version = "1.0.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3afc9a848309fe1aaffaed6e1546a7a14de1f935dc9d89d32afd9a44bab7c46" +checksum = "d1dfefef6a142e93f346b64c160934eb13b5594b84ab378133ac6815cb2bd57f" dependencies = [ "indexmap", "serde_core", "serde_spanned", - "toml_datetime", + "toml_datetime 1.0.0+spec-1.1.0", "toml_parser", "toml_writer", "winnow", @@ -1122,6 +1182,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "toml_datetime" +version = "1.0.0+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32c2555c699578a4f59f0cc68e5116c8d7cabbd45e1409b989d4be085b53f13e" +dependencies = [ + "serde_core", +] + [[package]] name = "toml_edit" version = "0.23.10+spec-1.0.0" @@ -1129,16 +1198,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269" dependencies = [ "indexmap", - "toml_datetime", + "toml_datetime 0.7.5+spec-1.1.0", "toml_parser", "winnow", ] [[package]] name = "toml_parser" -version = "1.0.6+spec-1.1.0" +version = "1.0.9+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44" +checksum = "702d4415e08923e7e1ef96cd5727c0dfed80b4d2fa25db9647fe5eb6f7c5a4c4" dependencies = [ "winnow", ] @@ -1151,9 +1220,9 @@ checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607" [[package]] name = "trybuild" -version = "1.0.114" +version = "1.0.116" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e17e807bff86d2a06b52bca4276746584a78375055b6e45843925ce2802b335" +checksum = "47c635f0191bd3a2941013e5062667100969f8c4e9cd787c14f977265d73616e" dependencies = [ "glob", "serde", @@ -1161,7 +1230,7 @@ dependencies = [ "serde_json", "target-triple", "termcolor", - "toml", + "toml 1.0.2+spec-1.1.0", ] [[package]] @@ -1187,9 +1256,15 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.22" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" [[package]] name = "unindent" @@ -1221,6 +1296,49 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + [[package]] name = "winapi-util" version = "0.1.11" @@ -1333,12 +1451,94 @@ name = "wit-bindgen" version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn 2.0.116", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.116", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] [[package]] name = "zmij" -version = "1.0.17" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02aae0f83f69aafc94776e879363e9771d7ecbffe2c7fbb6c14c5e00dfe88439" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" [[package]] name = "zstd" diff --git a/Cargo.toml b/Cargo.toml index eddb67f7..8e34e3e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ resolver = "2" [workspace.package] authors = ["Databento "] edition = "2021" -version = "0.48.0" +version = "0.49.0" documentation = "https://databento.com/docs" repository = "https://github.com/databento/dbn" license = "Apache-2.0" diff --git a/python/Cargo.toml b/python/Cargo.toml index aa525405..0a4c70f4 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -16,10 +16,10 @@ name = "databento_dbn" # Python modules can't contain dashes dbn = { path = "../rust/dbn", features = ["python"] } pyo3.workspace = true time.workspace = true +zstd.workspace = true [build-dependencies] pyo3-build-config.workspace = true [dev-dependencies] rstest.workspace = true -zstd.workspace = true diff --git a/python/pyproject.toml b/python/pyproject.toml index 7af47965..5d11e94b 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "databento-dbn" -version = "0.48.0" +version = "0.49.0" description = "Python bindings for encoding and decoding Databento Binary Encoding (DBN)" readme = "README.md" requires-python = ">=3.10" diff --git a/python/python/databento_dbn/_lib.pyi b/python/python/databento_dbn/_lib.pyi index aa409503..70c19d34 100644 --- a/python/python/databento_dbn/_lib.pyi +++ b/python/python/databento_dbn/_lib.pyi @@ -10373,6 +10373,8 @@ class DBNDecoder: metadata. upgrade_policy : VersionUpgradePolicy, default UPGRADE How to decode data from prior DBN versions. Defaults to upgrade decoding. + compression : Compression, default NONE + The compression format of the input data. Use ZSTD for zstd-compressed data. """ def __init__( @@ -10381,6 +10383,7 @@ class DBNDecoder: ts_out: bool = False, input_version: int | None = None, upgrade_policy: VersionUpgradePolicy | None = None, + compression: Compression = Compression.NONE, ): ... def buffer(self) -> bytes: """ diff --git a/python/src/dbn_decoder.rs b/python/src/dbn_decoder.rs index 04ed9f56..4e66ed8a 100644 --- a/python/src/dbn_decoder.rs +++ b/python/src/dbn_decoder.rs @@ -1,15 +1,17 @@ +use std::io::Write; + use pyo3::{exceptions::PyRuntimeError, prelude::*, IntoPyObjectExt}; use dbn::{ decode::dbn::fsm::{DbnFsm, ProcessResult}, python::to_py_err, - rtype_dispatch, HasRType, VersionUpgradePolicy, + rtype_dispatch, Compression, HasRType, VersionUpgradePolicy, }; #[pyclass(module = "databento_dbn", name = "DBNDecoder")] -#[derive(Debug)] pub struct DbnDecoder { fsm: DbnFsm, + zstd_decoder: Option>>, } #[pymethods] @@ -19,13 +21,15 @@ impl DbnDecoder { has_metadata = true, ts_out = false, input_version = None, - upgrade_policy = VersionUpgradePolicy::default() + upgrade_policy = VersionUpgradePolicy::default(), + compression = Compression::None, ))] fn new( has_metadata: bool, ts_out: bool, input_version: Option, upgrade_policy: VersionUpgradePolicy, + compression: Compression, ) -> PyResult { let fsm = DbnFsm::builder() .ts_out(ts_out) @@ -40,15 +44,25 @@ impl DbnDecoder { }) .build() .map_err(to_py_err)?; - Ok(Self { fsm }) - } - - fn __repr__(&self) -> String { - format!("{self:?}") + let zstd_decoder = match compression { + Compression::None => None, + Compression::Zstd => Some( + zstd::stream::write::Decoder::new(Vec::new()) + .map_err(|e| PyErr::new::(e.to_string()))?, + ), + }; + Ok(Self { fsm, zstd_decoder }) } fn write(&mut self, bytes: &[u8]) -> PyResult<()> { - self.fsm.write_all(bytes); + if let Some(zstd_decoder) = &mut self.zstd_decoder { + zstd_decoder + .write_all(bytes) + .map_err(|e| PyErr::new::(e.to_string()))?; + } else { + self.fsm.write_all(bytes) + } + Ok(()) } @@ -57,6 +71,18 @@ impl DbnDecoder { } fn decode(&mut self, py: Python<'_>) -> PyResult>> { + // Flush all decompressed data to FSM + if let Some(zstd_decoder) = &mut self.zstd_decoder { + zstd_decoder + .flush() + .map_err(|e| PyErr::new::(e.to_string()))?; + let decompressed = zstd_decoder.get_mut(); + if !decompressed.is_empty() { + self.fsm.write_all(decompressed); + decompressed.clear(); + } + } + let mut ts_out = self.fsm.ts_out(); let mut py_recs = Vec::new(); loop { @@ -96,7 +122,7 @@ mod tests { use dbn::{ encode::{dbn::Encoder, EncodeRecord}, enums::{rtype, SType, Schema}, - record::{ErrorMsg, OhlcvMsg, RecordHeader}, + record::{ErrorMsg, OhlcvMsg, RecordHeader, SystemMsg}, Dataset, MetadataBuilder, }; use pyo3::{ffi::c_str, types::PyDict, types::PyString}; @@ -108,8 +134,14 @@ mod tests { #[rstest] fn test_partial_metadata_and_records(_python: ()) { Python::attach(|py| { - let mut target = - DbnDecoder::new(true, false, None, VersionUpgradePolicy::default()).unwrap(); + let mut target = DbnDecoder::new( + true, + false, + None, + VersionUpgradePolicy::default(), + Compression::None, + ) + .unwrap(); let buffer = Vec::new(); let mut encoder = Encoder::new( buffer, @@ -150,8 +182,14 @@ mod tests { #[rstest] fn test_full_with_partial_record(_python: ()) { - let mut decoder = - DbnDecoder::new(true, false, None, VersionUpgradePolicy::default()).unwrap(); + let mut decoder = DbnDecoder::new( + true, + false, + None, + VersionUpgradePolicy::default(), + Compression::None, + ) + .unwrap(); let buffer = Vec::new(); let mut encoder = Encoder::new( buffer, @@ -366,4 +404,77 @@ for r in records[1:]: }) .unwrap(); } + + #[rstest] + fn test_dbn_decoder_with_zstd_compression(_python: ()) { + Python::attach(|py| { + let path = PyString::new( + py, + concat!( + env!("CARGO_MANIFEST_DIR"), + "/../tests/data/test_data.mbo.v3.dbn.zst" + ), + ); + let globals = PyDict::new(py); + globals.set_item("path", path).unwrap(); + Python::run( + py, + c_str!( + r#"from _lib import DBNDecoder, Compression + +decoder = DBNDecoder(compression=Compression.ZSTD) +with open(path, 'rb') as f: + decoder.write(f.read()) +records = decoder.decode() +assert len(records) == 3 # metadata + 2 records +"# + ), + Some(&globals), + None, + ) + .unwrap(); + }); + } + + #[rstest] + fn test_dbn_decoder_with_two_zstd_frames(_python: ()) { + Python::attach(|py| { + let buffer = Vec::new(); + let mut encoder = Encoder::new( + buffer, + &MetadataBuilder::new() + .dataset(Dataset::XnasItch.to_string()) + .schema(Some(Schema::Trades)) + .stype_in(Some(SType::RawSymbol)) + .stype_out(SType::InstrumentId) + .start(0) + .build(), + ) + .unwrap(); + let rec1 = SystemMsg::new(1680708278000000000, None, "first").unwrap(); + let rec2 = SystemMsg::new(1680708279000000000, None, "second").unwrap(); + encoder.encode_record(&rec1).unwrap(); + let split = encoder.get_ref().len(); + encoder.encode_record(&rec2).unwrap(); + let dbn_bytes = encoder.get_ref(); + + // Compress as two separate zstd frames + let frame1 = zstd::encode_all(std::io::Cursor::new(&dbn_bytes[..split]), 0).unwrap(); + let frame2 = zstd::encode_all(std::io::Cursor::new(&dbn_bytes[split..]), 0).unwrap(); + + let mut target = DbnDecoder::new( + true, + false, + None, + VersionUpgradePolicy::default(), + Compression::Zstd, + ) + .unwrap(); + target.write(&frame1).unwrap(); + target.write(&frame2).unwrap(); + let records = target.decode(py).unwrap(); + // metadata + 2 records + assert_eq!(records.len(), 3); + }); + } } diff --git a/rust/dbn-cli/Cargo.toml b/rust/dbn-cli/Cargo.toml index 3c1d9391..f8f51d28 100644 --- a/rust/dbn-cli/Cargo.toml +++ b/rust/dbn-cli/Cargo.toml @@ -16,7 +16,7 @@ name = "dbn" path = "src/main.rs" [dependencies] -dbn = { path = "../dbn", version = "=0.48.0", default-features = false } +dbn = { path = "../dbn", version = "=0.49.0", default-features = false } anyhow.workspace = true clap = { version = "4.5", features = ["derive", "wrap_help"] } diff --git a/rust/dbn-cli/src/filter.rs b/rust/dbn-cli/src/filter.rs index 95e71f3c..edcb5484 100644 --- a/rust/dbn-cli/src/filter.rs +++ b/rust/dbn-cli/src/filter.rs @@ -47,8 +47,7 @@ impl DecodeRecordRef for SchemaFilter { while let Some(record) = self.decoder.decode_record_ref()? { if self .rtype - .map(|rtype| rtype as u8 == record.header().rtype) - .unwrap_or(true) + .is_none_or(|rtype| rtype as u8 == record.header().rtype) { // Safe: casting reference to pointer so the pointer will always be valid. // Getting around borrow checker limitation. @@ -109,8 +108,7 @@ impl DecodeRecordRef for LimitFilter { fn decode_record_ref(&mut self) -> dbn::Result>> { if self .limit - .map(|limit| self.record_count >= limit.get()) - .unwrap_or(false) + .is_some_and(|limit| self.record_count >= limit.get()) { return Ok(None); } diff --git a/rust/dbn/Cargo.toml b/rust/dbn/Cargo.toml index 4d4ec3f6..f9d1b65b 100644 --- a/rust/dbn/Cargo.toml +++ b/rust/dbn/Cargo.toml @@ -25,7 +25,7 @@ serde = ["dep:serde", "time/parsing", "time/serde"] trivial_copy = [] [dependencies] -dbn-macros = { version = "=0.48.0", path = "../dbn-macros" } +dbn-macros = { version = "=0.49.0", path = "../dbn-macros" } async-compression = { version = "0.4.37", features = ["tokio", "zstd"], optional = true } csv = { workspace = true } diff --git a/rust/dbn/src/decode/dbn/sync.rs b/rust/dbn/src/decode/dbn/sync.rs index 1d355eef..f366508c 100644 --- a/rust/dbn/src/decode/dbn/sync.rs +++ b/rust/dbn/src/decode/dbn/sync.rs @@ -503,9 +503,7 @@ mod tests { use super::*; use crate::{ decode::{tests::TEST_DATA_PATH, DynReader}, - encode::{ - dbn::Encoder, DbnEncodable, DbnRecordEncoder, DynWriter, EncodeDbn, EncodeRecord, - }, + encode::{dbn::Encoder, DbnEncodable, DbnRecordEncoder, DynWriter, EncodeRecord}, rtype, v1, v2, v3, Compression, Dataset, Error, ErrorMsg, MboMsg, MetadataBuilder, OhlcvMsg, Record, RecordHeader, Result, SType, Schema, WithTsOut, }; diff --git a/rust/dbn/src/encode.rs b/rust/dbn/src/encode.rs index 6d696359..7f5c9579 100644 --- a/rust/dbn/src/encode.rs +++ b/rust/dbn/src/encode.rs @@ -4,6 +4,7 @@ pub mod csv; pub mod dbn; mod dyn_encoder; mod dyn_writer; +mod io_utils; pub mod json; mod split; @@ -62,6 +63,18 @@ pub trait EncodeRecord { /// or there's a serialization error. fn encode_record(&mut self, record: &R) -> Result<()>; + /// Encodes a slice of DBN records. + /// + /// # Errors + /// This function returns an error if it's unable to write to the underlying writer + /// or there's a serialization error. + fn encode_records(&mut self, records: &[R]) -> Result<()> { + for record in records { + self.encode_record(record)?; + } + Ok(()) + } + /// Flushes any buffered content to the true output. /// /// # Errors @@ -78,6 +91,18 @@ pub trait EncodeRecordRef { /// or there's a serialization error. fn encode_record_ref(&mut self, record: RecordRef) -> Result<()>; + /// Encodes a slice of [`RecordRef`]s. + /// + /// # Errors + /// This function returns an error if it's unable to write to the underlying writer + /// or there's a serialization error. + fn encode_record_refs(&mut self, records: &[RecordRef]) -> Result<()> { + for record in records { + self.encode_record_ref(*record)?; + } + Ok(()) + } + /// Encodes a single DBN [`RecordRef`] with an optional `ts_out` (see /// [`record::WithTsOut`](crate::record::WithTsOut)). /// @@ -92,19 +117,6 @@ pub trait EncodeRecordRef { /// Trait for types that encode DBN records with a specific record type. pub trait EncodeDbn: EncodeRecord + EncodeRecordRef { - /// Encodes a slice of DBN records. - /// - /// # Errors - /// This function returns an error if it's unable to write to the underlying writer - /// or there's a serialization error. - fn encode_records(&mut self, records: &[R]) -> Result<()> { - for record in records { - self.encode_record(record)?; - } - self.flush()?; - Ok(()) - } - /// Encodes a stream of DBN records. /// /// # Errors @@ -261,6 +273,24 @@ pub trait AsyncEncodeRecord { /// encoded record from the beginning. async fn encode_record(&mut self, record: &R) -> Result<()>; + /// Encodes a slice of DBN records. + /// + /// # Errors + /// This function returns an error if it's unable to write to the underlying writer + /// or there's a serialization error. + /// + /// # Cancel safety + /// This method is not cancellation safe. If this method is used in a + /// `tokio::select!` statement and another branch completes first, then the + /// record may have been partially written, but future calls will begin writing the + /// encoded record from the beginning. + async fn encode_records(&mut self, records: &[R]) -> Result<()> { + for record in records { + self.encode_record(record).await?; + } + Ok(()) + } + /// Flushes any buffered content to the true output. /// /// # Errors @@ -291,6 +321,24 @@ pub trait AsyncEncodeRecordRef { /// encoded record from the beginning. async fn encode_record_ref(&mut self, record_ref: RecordRef) -> Result<()>; + /// Encodes a slice of [`RecordRef`]s. + /// + /// # Errors + /// This function returns an error if it's unable to write to the underlying writer + /// or there's a serialization error. + /// + /// # Cancel safety + /// This method is not cancellation safe. If this method is used in a + /// `tokio::select!` statement and another branch completes first, then the + /// record may have been partially written, but future calls will begin writing the + /// encoded record from the beginning. + async fn encode_record_refs(&mut self, record_refs: &[RecordRef<'_>]) -> Result<()> { + for record_ref in record_refs { + self.encode_record_ref(*record_ref).await?; + } + Ok(()) + } + /// Encodes a single DBN [`RecordRef`] with an optional `ts_out` (see /// [`record::WithTsOut`](crate::record::WithTsOut)). /// diff --git a/rust/dbn/src/encode/csv/sync.rs b/rust/dbn/src/encode/csv/sync.rs index d0ae256d..73f5b6d8 100644 --- a/rust/dbn/src/encode/csv/sync.rs +++ b/rust/dbn/src/encode/csv/sync.rs @@ -273,6 +273,14 @@ where } } + fn encode_records(&mut self, records: &[R]) -> Result<()> { + for record in records { + self.encode_record(record)?; + } + self.flush()?; + Ok(()) + } + fn flush(&mut self) -> Result<()> { self.writer .flush() @@ -301,14 +309,6 @@ impl EncodeDbn for Encoder where W: io::Write, { - fn encode_records(&mut self, records: &[R]) -> Result<()> { - for record in records { - self.encode_record(record)?; - } - self.flush()?; - Ok(()) - } - /// Encodes a stream of DBN records. /// /// # Errors diff --git a/rust/dbn/src/encode/dbn/async.rs b/rust/dbn/src/encode/dbn/async.rs index 9d77c28d..af12d3b6 100644 --- a/rust/dbn/src/encode/dbn/async.rs +++ b/rust/dbn/src/encode/dbn/async.rs @@ -1,10 +1,13 @@ -use std::num::NonZeroU64; +use std::{io::IoSlice, mem::MaybeUninit, num::NonZeroU64}; use async_compression::tokio::write::ZstdEncoder; use tokio::io; use crate::{ - encode::{async_zstd_encoder, AsyncEncodeRecord, AsyncEncodeRecordRef, DbnEncodable}, + encode::{ + async_zstd_encoder, io_utils::write_all_vectored_async, AsyncEncodeRecord, + AsyncEncodeRecordRef, DbnEncodable, + }, record_ref::RecordRef, Error, Metadata, Result, SymbolMapping, DBN_VERSION, NULL_LIMIT, NULL_RECORD_COUNT, NULL_SCHEMA, NULL_STYPE, UNDEF_TIMESTAMP, @@ -79,6 +82,10 @@ where self.record_encoder.encode_record(record).await } + async fn encode_records(&mut self, records: &[R]) -> Result<()> { + self.record_encoder.encode_records(records).await + } + async fn flush(&mut self) -> Result<()> { self.record_encoder.flush().await } @@ -96,6 +103,10 @@ where self.record_encoder.encode_record_ref(record_ref).await } + async fn encode_record_refs(&mut self, record_refs: &[RecordRef<'_>]) -> Result<()> { + self.record_encoder.encode_record_refs(record_refs).await + } + /// Encodes a single DBN record. /// /// # Safety @@ -191,6 +202,17 @@ where self.encode(record).await } + async fn encode_records(&mut self, records: &[R]) -> Result<()> { + // SAFETY: DBN records have no implicit padding and are POD structs + let slice = unsafe { + std::slice::from_raw_parts::(records.as_ptr() as *const u8, size_of_val(records)) + }; + self.writer + .write_all(slice) + .await + .map_err(|e| Error::io(e, format!("serializing {} records", records.len()))) + } + async fn flush(&mut self) -> Result<()> { self.writer .flush() @@ -214,6 +236,28 @@ where self.encode_ref(record_ref).await } + async fn encode_record_refs(&mut self, record_refs: &[RecordRef<'_>]) -> Result<()> { + const BATCH_SIZE: usize = 128; + let mut slices = [const { MaybeUninit::uninit() }; BATCH_SIZE]; + for record_chunk in record_refs.chunks(BATCH_SIZE) { + for (elem, rec) in slices.iter_mut().zip(record_chunk.iter()) { + elem.write(IoSlice::from(*rec)); + } + let slices = + // SAFETY: Every element up to `record_chunk.len()` has been initialized + unsafe { std::mem::transmute::<&mut [MaybeUninit>], &mut [IoSlice<'_>]>(&mut slices[..record_chunk.len()]) }; + write_all_vectored_async(&mut self.writer, slices) + .await + .map_err(|e| { + Error::io( + e, + format!("failed to encode {} RecordRefs", record_refs.len()), + ) + })?; + } + Ok(()) + } + /// Encodes a single DBN record. /// /// # Safety @@ -520,11 +564,17 @@ where mod tests { use std::mem; + use rstest::rstest; + use super::*; use crate::{ compat::version_symbol_cstr_len, - decode::{dbn::AsyncMetadataDecoder as MetadataDecoder, FromLittleEndianSlice}, - Dataset, MappingInterval, MetadataBuilder, SType, Schema, + decode::{ + dbn::AsyncMetadataDecoder as MetadataDecoder, DecodeRecordRef, FromLittleEndianSlice, + }, + enums::rtype, + record::{MboMsg, RecordHeader, TradeMsg}, + Dataset, FlagSet, MappingInterval, MetadataBuilder, Record, SType, Schema, }; #[tokio::test] @@ -688,4 +738,139 @@ mod tests { matches!(target.encode(&metadata).await, Err(Error::Encode(msg)) if msg.contains("can't encode Metadata with version")) ); } + + fn make_mbo_msg(instrument_id: u32, ts_event: u64) -> MboMsg { + MboMsg { + hd: RecordHeader::new::(rtype::MBO, 1, instrument_id, ts_event), + order_id: 123456, + price: 100_000_000_000, + size: 10, + flags: FlagSet::default(), + channel_id: 0, + action: b'A' as i8, + side: b'B' as i8, + ts_recv: ts_event + 1000, + ts_in_delta: 500, + sequence: 1, + } + } + + fn make_trade_msg(instrument_id: u32, ts_event: u64) -> TradeMsg { + TradeMsg { + hd: RecordHeader::new::(rtype::MBP_0, 1, instrument_id, ts_event), + price: 100_000_000_000, + size: 5, + action: b'T' as i8, + side: b'A' as i8, + flags: FlagSet::default(), + depth: 0, + ts_recv: ts_event + 1000, + ts_in_delta: 500, + sequence: 1, + } + } + + #[tokio::test] + async fn test_encode_record_refs_roundtrip() { + // Create mixed record types + let mbo1 = make_mbo_msg(100, 1658441851000000000); + let trade1 = make_trade_msg(101, 1658441851001000000); + let mbo2 = make_mbo_msg(102, 1658441851002000000); + let trade2 = make_trade_msg(103, 1658441851003000000); + + let refs: Vec = vec![ + RecordRef::from(&mbo1), + RecordRef::from(&trade1), + RecordRef::from(&mbo2), + RecordRef::from(&trade2), + ]; + + // Encode using batch method + let mut buffer = Vec::new(); + let mut encoder = RecordEncoder::new(&mut buffer); + encoder.encode_record_refs(&refs).await.unwrap(); + + // Verify sizes + let expected_size = std::mem::size_of::() * 2 + std::mem::size_of::() * 2; + assert_eq!(buffer.len(), expected_size); + + // Decode and verify roundtrip + let mut decoder = crate::decode::dbn::RecordDecoder::new(&buffer[..]); + + let decoded = decoder.decode_record_ref().unwrap().unwrap(); + assert_eq!(decoded.header(), mbo1.header()); + + let decoded = decoder.decode_record_ref().unwrap().unwrap(); + assert_eq!(decoded.header(), trade1.header()); + + let decoded = decoder.decode_record_ref().unwrap().unwrap(); + assert_eq!(decoded.header(), mbo2.header()); + + let decoded = decoder.decode_record_ref().unwrap().unwrap(); + assert_eq!(decoded.header(), trade2.header()); + + assert!(decoder.decode_record_ref().unwrap().is_none()); + } + + #[tokio::test] + async fn test_encode_record_refs_single_is_equivalent() { + let mbo = make_mbo_msg(100, 1658441851000000000); + let trade = make_trade_msg(101, 1658441851001000000); + + let refs: Vec = vec![RecordRef::from(&mbo), RecordRef::from(&trade)]; + + // Encode one at a time + let mut buffer_single = Vec::new(); + let mut encoder_single = RecordEncoder::new(&mut buffer_single); + for record_ref in &refs { + encoder_single.encode_record_ref(*record_ref).await.unwrap(); + } + + // Encode as batch + let mut buffer_batch = Vec::new(); + let mut encoder_batch = RecordEncoder::new(&mut buffer_batch); + encoder_batch.encode_record_refs(&refs).await.unwrap(); + + assert_eq!(buffer_single, buffer_batch); + } + + #[tokio::test] + async fn test_encode_record_refs_empty_slice() { + let refs: Vec = vec![]; + + let mut buffer = Vec::new(); + let mut encoder = RecordEncoder::new(&mut buffer); + encoder.encode_record_refs(&refs).await.unwrap(); + + assert!(buffer.is_empty()); + } + + #[rstest] + #[case::partial_batch(127)] + #[case::exact_batch(128)] + #[case::batch_plus_one(129)] + #[case::multiple_batches(200)] + #[tokio::test] + async fn test_encode_record_refs_batch_sizes(#[case] count: usize) { + let records: Vec = (0..count) + .map(|i| make_mbo_msg(100 + i as u32, 1658441851000000000 + i as u64 * 1000)) + .collect(); + + let refs: Vec = records.iter().map(RecordRef::from).collect(); + + let mut buffer = Vec::new(); + let mut encoder = RecordEncoder::new(&mut buffer); + encoder.encode_record_refs(&refs).await.unwrap(); + + // Verify buffer size + assert_eq!(buffer.len(), records.len() * std::mem::size_of::()); + + // Decode and count records + let mut decoder = crate::decode::dbn::RecordDecoder::new(&buffer[..]); + let mut decoded_count = 0; + while decoder.decode_record_ref().unwrap().is_some() { + decoded_count += 1; + } + assert_eq!(decoded_count, count); + } } diff --git a/rust/dbn/src/encode/dbn/sync.rs b/rust/dbn/src/encode/dbn/sync.rs index bfebf1aa..9f752aa7 100644 --- a/rust/dbn/src/encode/dbn/sync.rs +++ b/rust/dbn/src/encode/dbn/sync.rs @@ -1,11 +1,14 @@ use std::{ - io::{self, SeekFrom}, - mem, + io::{self, IoSlice, SeekFrom}, + mem::{self, transmute, MaybeUninit}, num::NonZeroU64, }; use crate::{ - encode::{zstd_encoder, DbnEncodable, EncodeDbn, EncodeRecord, EncodeRecordRef}, + encode::{ + io_utils::write_all_vectored, zstd_encoder, DbnEncodable, EncodeDbn, EncodeRecord, + EncodeRecordRef, + }, Error, Metadata, RecordRef, Result, Schema, SymbolMapping, DBN_VERSION, NULL_LIMIT, NULL_RECORD_COUNT, NULL_SCHEMA, NULL_STYPE, UNDEF_TIMESTAMP, }; @@ -67,6 +70,10 @@ where self.record_encoder.encode_record(record) } + fn encode_records(&mut self, records: &[R]) -> Result<()> { + self.record_encoder.encode_records(records) + } + fn flush(&mut self) -> Result<()> { self.record_encoder.flush() } @@ -80,6 +87,10 @@ where self.record_encoder.encode_record_ref(record) } + fn encode_record_refs(&mut self, records: &[RecordRef]) -> Result<()> { + self.record_encoder.encode_record_refs(records) + } + /// Encodes a single DBN record. /// /// # Safety @@ -405,10 +416,19 @@ where W: io::Write, { fn encode_record(&mut self, record: &R) -> Result<()> { - match self.writer.write_all(record.as_ref()) { - Ok(()) => Ok(()), - Err(e) => Err(Error::io(e, format!("serializing {record:?}"))), - } + self.writer + .write_all(record.as_ref()) + .map_err(|e| Error::io(e, format!("serializing {record:?}"))) + } + + fn encode_records(&mut self, records: &[R]) -> Result<()> { + // SAFETY: DBN records have no implicit padding and are POD structs + let slice = unsafe { + std::slice::from_raw_parts::(records.as_ptr() as *const u8, size_of_val(records)) + }; + self.writer + .write_all(slice) + .map_err(|e| Error::io(e, format!("serializing {} records", records.len()))) } fn flush(&mut self) -> Result<()> { @@ -423,10 +443,26 @@ where W: io::Write, { fn encode_record_ref(&mut self, record: RecordRef) -> Result<()> { - match self.writer.write_all(record.as_ref()) { - Ok(()) => Ok(()), - Err(e) => Err(Error::io(e, format!("serializing {record:?}"))), + self.writer + .write_all(record.as_ref()) + .map_err(|e| Error::io(e, format!("serializing {record:?}"))) + } + + fn encode_record_refs(&mut self, records: &[RecordRef]) -> Result<()> { + const BATCH_SIZE: usize = 128; + let mut slices = [const { MaybeUninit::uninit() }; BATCH_SIZE]; + for record_chunk in records.chunks(BATCH_SIZE) { + for (elem, rec) in slices.iter_mut().zip(record_chunk.iter()) { + elem.write(IoSlice::from(*rec)); + } + let slices = + // SAFETY: Every element up to `record_chunk.len()` has been initialized + unsafe { transmute::<&mut [MaybeUninit>], &mut [IoSlice<'_>]>(&mut slices[..record_chunk.len()]) }; + write_all_vectored(&mut self.writer, slices).map_err(|e| { + Error::io(e, format!("failed to encode {} RecordRefs", records.len())) + })?; } + Ok(()) } /// Encodes a single DBN record. @@ -707,4 +743,209 @@ mod tests { matches!(target.encode(&metadata), Err(Error::Encode(msg)) if msg.contains("can't encode Metadata with version")) ); } + + mod batch { + use rstest::rstest; + + use crate::{ + decode::{DecodeRecord, DecodeRecordRef}, + encode::{dbn::RecordEncoder, EncodeRecord, EncodeRecordRef}, + record::{MboMsg, RecordHeader, TradeMsg}, + rtype, FlagSet, Record, RecordRef, + }; + + fn make_mbo_msg(instrument_id: u32, ts_event: u64) -> MboMsg { + MboMsg { + hd: RecordHeader::new::(rtype::MBO, 1, instrument_id, ts_event), + order_id: 123456, + price: 100_000_000_000, + size: 10, + flags: FlagSet::default(), + channel_id: 0, + action: b'A' as i8, + side: b'B' as i8, + ts_recv: ts_event + 1000, + ts_in_delta: 500, + sequence: 1, + } + } + + fn make_trade_msg(instrument_id: u32, ts_event: u64) -> TradeMsg { + TradeMsg { + hd: RecordHeader::new::(rtype::MBP_0, 1, instrument_id, ts_event), + price: 100_000_000_000, + size: 5, + action: b'T' as i8, + side: b'A' as i8, + flags: FlagSet::default(), + depth: 0, + ts_recv: ts_event + 1000, + ts_in_delta: 500, + sequence: 1, + } + } + + #[test] + fn test_encode_records_typed_roundtrip() { + // Create test records + let records: Vec = (0..10) + .map(|i| make_mbo_msg(100 + i, 1658441851000000000 + i as u64 * 1000)) + .collect(); + + // Encode using batch method + let mut buffer = Vec::new(); + let mut encoder = RecordEncoder::new(&mut buffer); + encoder.encode_records(&records).unwrap(); + + // Verify buffer size matches expected + assert_eq!(buffer.len(), records.len() * std::mem::size_of::()); + + // Decode and verify roundtrip + let mut decoder = crate::decode::dbn::RecordDecoder::new(&buffer[..]); + for (i, original) in records.iter().enumerate() { + let decoded: Option<&MboMsg> = decoder.decode_record().unwrap(); + assert!(decoded.is_some(), "Failed to decode record {i}"); + assert_eq!(decoded.unwrap(), original, "Record {i} mismatch"); + } + // Verify no more records + let extra: Option<&MboMsg> = decoder.decode_record().unwrap(); + assert!(extra.is_none()); + } + + #[test] + fn test_encode_records_single_is_equivalent() { + let records: Vec = (0..5) + .map(|i| make_mbo_msg(100 + i, 1658441851000000000 + i as u64 * 1000)) + .collect(); + + // Encode one at a time + let mut buffer_single = Vec::new(); + let mut encoder_single = RecordEncoder::new(&mut buffer_single); + for record in &records { + encoder_single.encode_record(record).unwrap(); + } + + // Encode as batch + let mut buffer_batch = Vec::new(); + let mut encoder_batch = RecordEncoder::new(&mut buffer_batch); + encoder_batch.encode_records(&records).unwrap(); + + assert_eq!(buffer_single, buffer_batch); + } + + #[test] + fn test_encode_records_empty_slice() { + let records: Vec = vec![]; + + let mut buffer = Vec::new(); + let mut encoder = RecordEncoder::new(&mut buffer); + encoder.encode_records(&records).unwrap(); + + assert!(buffer.is_empty()); + } + + #[test] + fn test_encode_record_refs_roundtrip() { + // Create mixed record types + let mbo1 = make_mbo_msg(100, 1658441851000000000); + let trade1 = make_trade_msg(101, 1658441851001000000); + let mbo2 = make_mbo_msg(102, 1658441851002000000); + let trade2 = make_trade_msg(103, 1658441851003000000); + + let refs: Vec = vec![ + RecordRef::from(&mbo1), + RecordRef::from(&trade1), + RecordRef::from(&mbo2), + RecordRef::from(&trade2), + ]; + + // Encode using batch method + let mut buffer = Vec::new(); + let mut encoder = RecordEncoder::new(&mut buffer); + encoder.encode_record_refs(&refs).unwrap(); + + // Verify sizes + let expected_size = + std::mem::size_of::() * 2 + std::mem::size_of::() * 2; + assert_eq!(buffer.len(), expected_size); + + // Decode and verify roundtrip + let mut decoder = crate::decode::dbn::RecordDecoder::new(&buffer[..]); + + let decoded = decoder.decode_record_ref().unwrap().unwrap(); + assert_eq!(decoded.header(), mbo1.header()); + + let decoded = decoder.decode_record_ref().unwrap().unwrap(); + assert_eq!(decoded.header(), trade1.header()); + + let decoded = decoder.decode_record_ref().unwrap().unwrap(); + assert_eq!(decoded.header(), mbo2.header()); + + let decoded = decoder.decode_record_ref().unwrap().unwrap(); + assert_eq!(decoded.header(), trade2.header()); + + assert!(decoder.decode_record_ref().unwrap().is_none()); + } + + #[test] + fn test_encode_record_refs_single_is_equivalent() { + let mbo = make_mbo_msg(100, 1658441851000000000); + let trade = make_trade_msg(101, 1658441851001000000); + + let refs: Vec = vec![RecordRef::from(&mbo), RecordRef::from(&trade)]; + + // Encode one at a time + let mut buffer_single = Vec::new(); + let mut encoder_single = RecordEncoder::new(&mut buffer_single); + for record_ref in &refs { + encoder_single.encode_record_ref(*record_ref).unwrap(); + } + + // Encode as batch + let mut buffer_batch = Vec::new(); + let mut encoder_batch = RecordEncoder::new(&mut buffer_batch); + encoder_batch.encode_record_refs(&refs).unwrap(); + + assert_eq!(buffer_single, buffer_batch); + } + + #[test] + fn test_encode_record_refs_empty_slice() { + let refs: Vec = vec![]; + + let mut buffer = Vec::new(); + let mut encoder = RecordEncoder::new(&mut buffer); + encoder.encode_record_refs(&refs).unwrap(); + + assert!(buffer.is_empty()); + } + + #[rstest] + #[case::partial_batch(127)] + #[case::exact_batch(128)] + #[case::batch_plus_one(129)] + #[case::multiple_batches(200)] + fn test_encode_record_refs_batch_sizes(#[case] count: usize) { + let records: Vec = (0..count) + .map(|i| make_mbo_msg(100 + i as u32, 1658441851000000000 + i as u64 * 1000)) + .collect(); + + let refs: Vec = records.iter().map(RecordRef::from).collect(); + + let mut buffer = Vec::new(); + let mut encoder = RecordEncoder::new(&mut buffer); + encoder.encode_record_refs(&refs).unwrap(); + + // Verify buffer size + assert_eq!(buffer.len(), records.len() * std::mem::size_of::()); + + // Decode and count records + let mut decoder = crate::decode::dbn::RecordDecoder::new(&buffer[..]); + let mut decoded_count = 0; + while decoder.decode_record_ref().unwrap().is_some() { + decoded_count += 1; + } + assert_eq!(decoded_count, count); + } + } } diff --git a/rust/dbn/src/encode/dyn_encoder.rs b/rust/dbn/src/encode/dyn_encoder.rs index 5c385f67..b3877bd8 100644 --- a/rust/dbn/src/encode/dyn_encoder.rs +++ b/rust/dbn/src/encode/dyn_encoder.rs @@ -240,6 +240,10 @@ where self.0.encode_record(record) } + fn encode_records(&mut self, records: &[R]) -> Result<()> { + self.0.encode_records(records) + } + fn flush(&mut self) -> Result<()> { self.0.flush() } @@ -262,10 +266,6 @@ impl EncodeDbn for DynEncoder<'_, W> where W: io::Write, { - fn encode_records(&mut self, records: &[R]) -> Result<()> { - self.0.encode_records(records) - } - fn encode_stream( &mut self, stream: impl FallibleStreamingIterator, @@ -303,6 +303,14 @@ where } } + fn encode_records(&mut self, records: &[R]) -> Result<()> { + match self { + DynEncoderImpl::Dbn(encoder) => encoder.encode_records(records), + DynEncoderImpl::Csv(encoder) => encoder.encode_records(records), + DynEncoderImpl::Json(encoder) => encoder.encode_records(records), + } + } + fn flush(&mut self) -> Result<()> { match self { DynEncoderImpl::Dbn(enc) => enc.flush(), @@ -337,14 +345,6 @@ impl EncodeDbn for DynEncoderImpl<'_, W> where W: io::Write, { - fn encode_records(&mut self, records: &[R]) -> Result<()> { - match self { - DynEncoderImpl::Dbn(encoder) => encoder.encode_records(records), - DynEncoderImpl::Csv(encoder) => encoder.encode_records(records), - DynEncoderImpl::Json(encoder) => encoder.encode_records(records), - } - } - fn encode_stream( &mut self, stream: impl FallibleStreamingIterator, diff --git a/rust/dbn/src/encode/dyn_writer.rs b/rust/dbn/src/encode/dyn_writer.rs index 95a02533..2a7e93dc 100644 --- a/rust/dbn/src/encode/dyn_writer.rs +++ b/rust/dbn/src/encode/dyn_writer.rs @@ -173,6 +173,28 @@ mod r#async { DynBufWriterImpl::Zstd(enc) => io::AsyncWrite::poll_shutdown(Pin::new(enc), cx), } } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + match &mut self.0 { + DynBufWriterImpl::Uncompressed(w) => { + io::AsyncWrite::poll_write_vectored(Pin::new(w), cx, bufs) + } + DynBufWriterImpl::Zstd(enc) => { + io::AsyncWrite::poll_write_vectored(Pin::new(enc), cx, bufs) + } + } + } + + fn is_write_vectored(&self) -> bool { + match &self.0 { + DynBufWriterImpl::Uncompressed(w) => w.is_write_vectored(), + DynBufWriterImpl::Zstd(enc) => enc.is_write_vectored(), + } + } } /// An object that allows for abstracting over compressed and uncompressed output. @@ -248,5 +270,27 @@ mod r#async { DynWriterImpl::Zstd(enc) => io::AsyncWrite::poll_shutdown(Pin::new(enc), cx), } } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + match &mut self.0 { + DynWriterImpl::Uncompressed(w) => { + io::AsyncWrite::poll_write_vectored(Pin::new(w), cx, bufs) + } + DynWriterImpl::Zstd(enc) => { + io::AsyncWrite::poll_write_vectored(Pin::new(enc), cx, bufs) + } + } + } + + fn is_write_vectored(&self) -> bool { + match &self.0 { + DynWriterImpl::Uncompressed(w) => w.is_write_vectored(), + DynWriterImpl::Zstd(enc) => enc.is_write_vectored(), + } + } } } diff --git a/rust/dbn/src/encode/io_utils.rs b/rust/dbn/src/encode/io_utils.rs new file mode 100644 index 00000000..ffb9cec1 --- /dev/null +++ b/rust/dbn/src/encode/io_utils.rs @@ -0,0 +1,59 @@ +//! Shared I/O utility functions for batch encoding. + +use std::io::{self, IoSlice}; + +/// Writes all vectored slices to the writer, handling partial writes. +/// +/// This is taken from the unstable standard library implementation of +/// `Write::write_all_vectored`. +pub fn write_all_vectored( + writer: &mut W, + mut slices: &mut [IoSlice<'_>], +) -> io::Result<()> { + // Guarantee that bufs is empty if it contains no data, + // to avoid calling write_vectored if there is no data to be written. + IoSlice::advance_slices(&mut slices, 0); + + while !slices.is_empty() { + match writer.write_vectored(slices) { + Ok(0) => { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to write whole buffer", + )); + } + Ok(n) => IoSlice::advance_slices(&mut slices, n), + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + + Ok(()) +} + +/// Async version of [`write_all_vectored`]. +#[cfg(feature = "async")] +pub async fn write_all_vectored_async( + writer: &mut W, + mut slices: &mut [IoSlice<'_>], +) -> io::Result<()> { + // Guarantee that bufs is empty if it contains no data, + // to avoid calling write_vectored if there is no data to be written. + IoSlice::advance_slices(&mut slices, 0); + + while !slices.is_empty() { + match writer.write_vectored(slices).await { + Ok(0) => { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to write whole buffer", + )); + } + Ok(n) => IoSlice::advance_slices(&mut slices, n), + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + + Ok(()) +} diff --git a/rust/dbn/src/encode/json/async.rs b/rust/dbn/src/encode/json/async.rs index 117697eb..0dba2bc7 100644 --- a/rust/dbn/src/encode/json/async.rs +++ b/rust/dbn/src/encode/json/async.rs @@ -1,20 +1,20 @@ use tokio::io::{self, AsyncWriteExt}; -use super::serialize::{to_json_in_buf, to_json_with_sym_in_buf}; use crate::{ encode::{AsyncEncodeRecord, AsyncEncodeRecordRef, AsyncEncodeRecordTextExt, DbnEncodable}, - record_ref::RecordRef, - rtype_dispatch, Error, Metadata, Result, + rtype_dispatch, Error, Metadata, RecordRef, Result, }; +use super::serialize::{to_json_in_buf, to_json_with_sym_in_buf}; + /// Type for encoding files and streams of DBN records in JSON lines. pub struct Encoder where W: io::AsyncWriteExt + Unpin, { writer: W, - should_pretty_print: bool, buf: String, + should_pretty_print: bool, use_pretty_px: bool, use_pretty_ts: bool, } @@ -34,10 +34,10 @@ where ) -> Self { Self { writer, + buf: String::new(), should_pretty_print, use_pretty_px, use_pretty_ts, - buf: String::new(), } } @@ -110,6 +110,23 @@ where self.write_buf(|e| Error::io(e, "writing record")).await } + /// Encodes a slice of DBN records. + /// + /// # Errors + /// This function returns an error if it's unable to write to the underlying writer. + /// + /// # Cancel safety + /// This method is not cancellation safe. If this method is used in a + /// `tokio::select!` statement and another branch completes first, then the + /// record may have been partially written, but future calls will begin writing the + /// encoded record from the beginning. + async fn encode_records(&mut self, records: &[R]) -> Result<()> { + for record in records { + self.encode_to_buf(record); + } + self.write_buf(|e| Error::io(e, format!("writing {} records", records.len()))) + .await + } async fn flush(&mut self) -> Result<()> { self.writer .flush() @@ -133,6 +150,14 @@ where rtype_dispatch!(record_ref, self.encode_record().await)? } + async fn encode_record_refs(&mut self, record_refs: &[RecordRef<'_>]) -> Result<()> { + for record_ref in record_refs { + rtype_dispatch!(record_ref, self.encode_to_buf())?; + } + self.write_buf(|e| Error::io(e, format!("writing {} records", record_refs.len()))) + .await + } + async unsafe fn encode_record_ref_ts_out( &mut self, record_ref: RecordRef<'_>, diff --git a/rust/dbn/src/encode/json/sync.rs b/rust/dbn/src/encode/json/sync.rs index f89d3545..7a67d228 100644 --- a/rust/dbn/src/encode/json/sync.rs +++ b/rust/dbn/src/encode/json/sync.rs @@ -3,7 +3,7 @@ use std::io; use super::serialize::{to_json_in_buf, to_json_with_sym_in_buf}; use crate::{ encode::{DbnEncodable, EncodeDbn, EncodeRecord, EncodeRecordRef, EncodeRecordTextExt}, - rtype_dispatch, Error, Metadata, Result, + rtype_dispatch, Error, Metadata, RecordRef, Result, }; /// Type for encoding files and streams of DBN records in JSON lines. @@ -94,10 +94,10 @@ where ) -> Self { Self { writer, + buf: String::new(), should_pretty_print, use_pretty_px, use_pretty_ts, - buf: String::new(), } } @@ -145,6 +145,11 @@ where ); } + /// Writes to `self.buf`, but not the writer. + fn encode_ref_to_buf(&mut self, record: RecordRef<'_>) -> crate::Result<()> { + rtype_dispatch!(record, self.encode_to_buf()) + } + fn write_buf(&mut self, handle_err: F) -> crate::Result<()> where F: FnOnce(io::Error) -> Error, @@ -168,6 +173,13 @@ where self.write_buf(|e| Error::io(e, format!("writing record {record:?}"))) } + fn encode_records(&mut self, records: &[R]) -> Result<()> { + for record in records { + self.encode_to_buf(record); + } + self.write_buf(|e| Error::io(e, format!("writing {} records", records.len()))) + } + fn flush(&mut self) -> Result<()> { self.writer .flush() @@ -179,15 +191,18 @@ impl EncodeRecordRef for Encoder where W: io::Write, { - fn encode_record_ref(&mut self, record: crate::RecordRef) -> Result<()> { + fn encode_record_ref(&mut self, record: RecordRef) -> Result<()> { rtype_dispatch!(record, self.encode_record())? } - unsafe fn encode_record_ref_ts_out( - &mut self, - record: crate::RecordRef, - ts_out: bool, - ) -> Result<()> { + fn encode_record_refs(&mut self, records: &[RecordRef]) -> Result<()> { + for record in records { + self.encode_ref_to_buf(*record)?; + } + self.write_buf(|e| Error::io(e, format!("writing {} records", records.len()))) + } + + unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, ts_out: bool) -> Result<()> { rtype_dispatch!(record, ts_out: ts_out, self.encode_record())? } } diff --git a/rust/dbn/src/symbol_map.rs b/rust/dbn/src/symbol_map.rs index 1028e5f3..4da5d402 100644 --- a/rust/dbn/src/symbol_map.rs +++ b/rust/dbn/src/symbol_map.rs @@ -6,14 +6,17 @@ use time::{macros::time, PrimitiveDateTime}; use crate::{compat, v1, Error, HasRType, Metadata, Record, RecordRef, SymbolMappingMsg}; -/// A timeseries symbol map. Generally useful for working with historical data -/// and is commonly built from a [`Metadata`] object via [`Self::from_metadata()`]. +/// A timeseries symbol map. Useful for working with historical requests over multiple days. +/// +/// Commonly built with [`Metadata::symbol_map()`]. #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct TsSymbolMap(HashMap<(time::Date, u32), Arc>); /// A point-in-time symbol map. Useful for working with live symbology or a /// historical request over a single day or other situations where the symbol /// mappings are known not to change. +/// +/// Commonly built with [`Metadata::symbol_map_for_date()`]. #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct PitSymbolMap(HashMap);