Skip to content

Commit 28ac449

Browse files
Avro writer prefix support (#8371)
# Which issue does this PR close? - Part of #4886 - Extends work in #8242 # Rationale for this change This introduces writer-side fingerprint prefix support, removing the existing hard-coded Rabin approach with a configurable pattern extending off of the work done on the reader side. In addition to supporting the SHA256 and MD5 (feature flagged), we also cover compatibility with Confluent's wire format IDs. # What changes are included in this PR? - Replaced fixed Rabin fingerprinting with support for configurable `FingerprintAlgorithm` in schema and writer. - Removed deprecated methods and unnecessary variable assignments for single-object encoding. - Simplified prefix generation logic and encoding workflows. - Updated benchmarks and added unit tests to validate updated fingerprinting strategies. # Are these changes tested? Yes, existing tests are all passing, and tests have been added to validate the prefix outputs. Benchmark results show no appreciable changes. # Are there any user-facing changes? - Crate is not yet public - Confluent users are expected to provide the schema store ID when registering a WriterBuilder --------- Co-authored-by: Connor Sanders <[email protected]>
1 parent 13fb041 commit 28ac449

File tree

6 files changed

+365
-33
lines changed

6 files changed

+365
-33
lines changed

arrow-avro/benches/decoder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,9 @@ macro_rules! dataset {
418418
let schema =
419419
ApacheSchema::parse_str($schema_json).expect("invalid schema for generator");
420420
let arrow_schema = AvroSchema::new($schema_json.parse().unwrap());
421-
let fingerprint = arrow_schema.fingerprint().expect("fingerprint failed");
421+
let fingerprint = arrow_schema
422+
.fingerprint(FingerprintAlgorithm::Rabin)
423+
.expect("fingerprint failed");
422424
let prefix = make_prefix(fingerprint);
423425
SIZES
424426
.iter()

arrow-avro/src/schema.rs

Lines changed: 193 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];
3434
/// The Confluent "magic" byte (`0x00`)
3535
pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];
3636

37+
/// The maximum possible length of a prefix.
38+
/// SHA256 (32) + single-object magic (2)
39+
pub const MAX_PREFIX_LEN: usize = 34;
40+
3741
/// The metadata key used for storing the JSON encoded [`Schema`]
3842
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
3943

@@ -349,9 +353,9 @@ impl AvroSchema {
349353
.map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
350354
}
351355

352-
/// Returns the Rabin fingerprint of the schema.
353-
pub fn fingerprint(&self) -> Result<Fingerprint, ArrowError> {
354-
Self::generate_fingerprint_rabin(&self.schema()?)
356+
/// Returns the fingerprint of the schema.
357+
pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
358+
Self::generate_fingerprint(&self.schema()?, hash_type)
355359
}
356360

357361
/// Generates a fingerprint for the given `Schema` using the specified [`FingerprintAlgorithm`].
@@ -476,6 +480,68 @@ impl AvroSchema {
476480
}
477481
}
478482

483+
/// A stack-allocated, fixed-size buffer for the prefix.
484+
#[derive(Debug, Copy, Clone)]
485+
pub struct Prefix {
486+
buf: [u8; MAX_PREFIX_LEN],
487+
len: u8,
488+
}
489+
490+
impl Prefix {
491+
#[inline]
492+
pub(crate) fn as_slice(&self) -> &[u8] {
493+
&self.buf[..self.len as usize]
494+
}
495+
}
496+
497+
/// Defines the strategy for generating the per-record prefix for an Avro binary stream.
498+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
499+
pub enum FingerprintStrategy {
500+
/// Use the 64-bit Rabin fingerprint (default for single-object encoding).
501+
#[default]
502+
Rabin,
503+
/// Use a Confluent Schema Registry 32-bit ID.
504+
Id(u32),
505+
#[cfg(feature = "md5")]
506+
/// Use the 128-bit MD5 fingerprint.
507+
MD5,
508+
#[cfg(feature = "sha256")]
509+
/// Use the 256-bit SHA-256 fingerprint.
510+
SHA256,
511+
}
512+
513+
impl From<Fingerprint> for FingerprintStrategy {
514+
fn from(f: Fingerprint) -> Self {
515+
Self::from(&f)
516+
}
517+
}
518+
519+
impl From<FingerprintAlgorithm> for FingerprintStrategy {
520+
fn from(f: FingerprintAlgorithm) -> Self {
521+
match f {
522+
FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
523+
FingerprintAlgorithm::None => FingerprintStrategy::Id(0),
524+
#[cfg(feature = "md5")]
525+
FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
526+
#[cfg(feature = "sha256")]
527+
FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
528+
}
529+
}
530+
}
531+
532+
impl From<&Fingerprint> for FingerprintStrategy {
533+
fn from(f: &Fingerprint) -> Self {
534+
match f {
535+
Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
536+
Fingerprint::Id(id) => FingerprintStrategy::Id(*id),
537+
#[cfg(feature = "md5")]
538+
Fingerprint::MD5(_) => FingerprintStrategy::MD5,
539+
#[cfg(feature = "sha256")]
540+
Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
541+
}
542+
}
543+
}
544+
479545
/// Supported fingerprint algorithms for Avro schema identification.
480546
/// For use with Confluent Schema Registry IDs, set to None.
481547
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
@@ -507,6 +573,25 @@ impl From<&Fingerprint> for FingerprintAlgorithm {
507573
}
508574
}
509575

576+
impl From<FingerprintStrategy> for FingerprintAlgorithm {
577+
fn from(s: FingerprintStrategy) -> Self {
578+
Self::from(&s)
579+
}
580+
}
581+
582+
impl From<&FingerprintStrategy> for FingerprintAlgorithm {
583+
fn from(s: &FingerprintStrategy) -> Self {
584+
match s {
585+
FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
586+
FingerprintStrategy::Id(_) => FingerprintAlgorithm::None,
587+
#[cfg(feature = "md5")]
588+
FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
589+
#[cfg(feature = "sha256")]
590+
FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
591+
}
592+
}
593+
}
594+
510595
/// A schema fingerprint in one of the supported formats.
511596
///
512597
/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore`
@@ -529,6 +614,38 @@ pub enum Fingerprint {
529614
SHA256([u8; 32]),
530615
}
531616

617+
impl From<FingerprintStrategy> for Fingerprint {
618+
fn from(s: FingerprintStrategy) -> Self {
619+
Self::from(&s)
620+
}
621+
}
622+
623+
impl From<&FingerprintStrategy> for Fingerprint {
624+
fn from(s: &FingerprintStrategy) -> Self {
625+
match s {
626+
FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
627+
FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
628+
#[cfg(feature = "md5")]
629+
FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
630+
#[cfg(feature = "sha256")]
631+
FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
632+
}
633+
}
634+
}
635+
636+
impl From<FingerprintAlgorithm> for Fingerprint {
637+
fn from(s: FingerprintAlgorithm) -> Self {
638+
match s {
639+
FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
640+
FingerprintAlgorithm::None => Fingerprint::Id(0),
641+
#[cfg(feature = "md5")]
642+
FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
643+
#[cfg(feature = "sha256")]
644+
FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
645+
}
646+
}
647+
}
648+
532649
impl Fingerprint {
533650
/// Loads the 32-bit Schema Registry fingerprint (Confluent Schema Registry ID).
534651
///
@@ -540,6 +657,53 @@ impl Fingerprint {
540657
pub fn load_fingerprint_id(id: u32) -> Self {
541658
Fingerprint::Id(u32::from_be(id))
542659
}
660+
661+
/// Constructs a serialized prefix represented as a `Vec<u8>` based on the variant of the enum.
662+
///
663+
/// This method serializes data in different formats depending on the variant of `self`:
664+
/// - **`Id(id)`**: Uses the Confluent wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`)
665+
/// followed by the big-endian byte representation of the `id`.
666+
/// - **`Rabin(val)`**: Uses the Avro single-object specification format. This includes a different magic header
667+
/// (`SINGLE_OBJECT_MAGIC`) followed by the little-endian byte representation of the `val`.
668+
/// - **`MD5(bytes)`** (optional, `md5` feature enabled): A non-standard extension that adds the
669+
/// `SINGLE_OBJECT_MAGIC` header followed by the provided `bytes`.
670+
/// - **`SHA256(bytes)`** (optional, `sha256` feature enabled): Similar to the `MD5` variant, this is
671+
/// a non-standard extension that attaches the `SINGLE_OBJECT_MAGIC` header followed by the given `bytes`.
672+
///
673+
/// # Returns
674+
///
675+
/// A `Prefix` containing the serialized prefix data.
676+
///
677+
/// # Features
678+
///
679+
/// - You can optionally enable the `md5` feature to include the `MD5` variant.
680+
/// - You can optionally enable the `sha256` feature to include the `SHA256` variant.
681+
///
682+
pub fn make_prefix(&self) -> Prefix {
683+
let mut buf = [0u8; MAX_PREFIX_LEN];
684+
let len = match self {
685+
Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
686+
Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
687+
#[cfg(feature = "md5")]
688+
Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
689+
#[cfg(feature = "sha256")]
690+
Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
691+
};
692+
Prefix { buf, len }
693+
}
694+
}
695+
696+
fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
697+
buf: &mut [u8; MAX_PREFIX_LEN],
698+
magic: &[u8; MAGIC_LEN],
699+
payload: &[u8; PAYLOAD_LEN],
700+
) -> u8 {
701+
debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
702+
let total = MAGIC_LEN + PAYLOAD_LEN;
703+
let prefix_slice = &mut buf[..total];
704+
prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
705+
prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
706+
total as u8
543707
}
544708

545709
/// An in-memory cache of Avro schemas, indexed by their fingerprint.
@@ -1744,17 +1908,25 @@ mod tests {
17441908
let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
17451909
let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
17461910
schemas.insert(
1747-
int_avro_schema.fingerprint().unwrap(),
1911+
int_avro_schema
1912+
.fingerprint(FingerprintAlgorithm::Rabin)
1913+
.unwrap(),
17481914
int_avro_schema.clone(),
17491915
);
17501916
schemas.insert(
1751-
record_avro_schema.fingerprint().unwrap(),
1917+
record_avro_schema
1918+
.fingerprint(FingerprintAlgorithm::Rabin)
1919+
.unwrap(),
17521920
record_avro_schema.clone(),
17531921
);
17541922
let store = SchemaStore::try_from(schemas).unwrap();
1755-
let int_fp = int_avro_schema.fingerprint().unwrap();
1923+
let int_fp = int_avro_schema
1924+
.fingerprint(FingerprintAlgorithm::Rabin)
1925+
.unwrap();
17561926
assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
1757-
let rec_fp = record_avro_schema.fingerprint().unwrap();
1927+
let rec_fp = record_avro_schema
1928+
.fingerprint(FingerprintAlgorithm::Rabin)
1929+
.unwrap();
17581930
assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
17591931
}
17601932

@@ -1764,21 +1936,29 @@ mod tests {
17641936
let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
17651937
let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
17661938
schemas.insert(
1767-
int_avro_schema.fingerprint().unwrap(),
1939+
int_avro_schema
1940+
.fingerprint(FingerprintAlgorithm::Rabin)
1941+
.unwrap(),
17681942
int_avro_schema.clone(),
17691943
);
17701944
schemas.insert(
1771-
record_avro_schema.fingerprint().unwrap(),
1945+
record_avro_schema
1946+
.fingerprint(FingerprintAlgorithm::Rabin)
1947+
.unwrap(),
17721948
record_avro_schema.clone(),
17731949
);
17741950
// Insert duplicate of int schema
17751951
schemas.insert(
1776-
int_avro_schema.fingerprint().unwrap(),
1952+
int_avro_schema
1953+
.fingerprint(FingerprintAlgorithm::Rabin)
1954+
.unwrap(),
17771955
int_avro_schema.clone(),
17781956
);
17791957
let store = SchemaStore::try_from(schemas).unwrap();
17801958
assert_eq!(store.schemas.len(), 2);
1781-
let int_fp = int_avro_schema.fingerprint().unwrap();
1959+
let int_fp = int_avro_schema
1960+
.fingerprint(FingerprintAlgorithm::Rabin)
1961+
.unwrap();
17821962
assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
17831963
}
17841964

@@ -1838,7 +2018,7 @@ mod tests {
18382018
fn test_set_and_lookup_with_provided_fingerprint() {
18392019
let mut store = SchemaStore::new();
18402020
let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
1841-
let fp = schema.fingerprint().unwrap();
2021+
let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
18422022
let out_fp = store.set(fp, schema.clone()).unwrap();
18432023
assert_eq!(out_fp, fp);
18442024
assert_eq!(store.lookup(&fp).cloned(), Some(schema));
@@ -1848,7 +2028,7 @@ mod tests {
18482028
fn test_set_duplicate_same_schema_ok() {
18492029
let mut store = SchemaStore::new();
18502030
let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
1851-
let fp = schema.fingerprint().unwrap();
2031+
let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
18522032
let _ = store.set(fp, schema.clone()).unwrap();
18532033
let _ = store.set(fp, schema.clone()).unwrap();
18542034
assert_eq!(store.schemas.len(), 1);

arrow-avro/src/writer/encoder.rs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! Avro Encoder for Arrow types.
1919
2020
use crate::codec::{AvroDataType, AvroField, Codec};
21-
use crate::schema::Nullability;
21+
use crate::schema::{Fingerprint, Nullability, Prefix};
2222
use arrow_array::cast::AsArray;
2323
use arrow_array::types::{
2424
ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type, IntervalDayTimeType,
@@ -33,6 +33,7 @@ use arrow_array::{
3333
use arrow_array::{Decimal32Array, Decimal64Array};
3434
use arrow_buffer::NullBuffer;
3535
use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit};
36+
use serde::Serialize;
3637
use std::io::Write;
3738
use std::sync::Arc;
3839
use uuid::Uuid;
@@ -522,6 +523,7 @@ struct FieldBinding {
522523
pub struct RecordEncoderBuilder<'a> {
523524
avro_root: &'a AvroField,
524525
arrow_schema: &'a ArrowSchema,
526+
fingerprint: Option<Fingerprint>,
525527
}
526528

527529
impl<'a> RecordEncoderBuilder<'a> {
@@ -530,9 +532,15 @@ impl<'a> RecordEncoderBuilder<'a> {
530532
Self {
531533
avro_root,
532534
arrow_schema,
535+
fingerprint: None,
533536
}
534537
}
535538

539+
pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
540+
self.fingerprint = fingerprint;
541+
self
542+
}
543+
536544
/// Build the `RecordEncoder` by walking the Avro **record** root in Avro order,
537545
/// resolving each field to an Arrow index by name.
538546
pub fn build(self) -> Result<RecordEncoder, ArrowError> {
@@ -557,7 +565,10 @@ impl<'a> RecordEncoderBuilder<'a> {
557565
)?,
558566
});
559567
}
560-
Ok(RecordEncoder { columns })
568+
Ok(RecordEncoder {
569+
columns,
570+
prefix: self.fingerprint.map(|fp| fp.make_prefix()),
571+
})
561572
}
562573
}
563574

@@ -569,6 +580,8 @@ impl<'a> RecordEncoderBuilder<'a> {
569580
#[derive(Debug, Clone)]
570581
pub struct RecordEncoder {
571582
columns: Vec<FieldBinding>,
583+
/// Optional pre-built, variable-length prefix written before each record.
584+
prefix: Option<Prefix>,
572585
}
573586

574587
impl RecordEncoder {
@@ -602,9 +615,23 @@ impl RecordEncoder {
602615
/// Tip: Wrap `out` in a `std::io::BufWriter` to reduce the overhead of many small writes.
603616
pub fn encode<W: Write>(&self, out: &mut W, batch: &RecordBatch) -> Result<(), ArrowError> {
604617
let mut column_encoders = self.prepare_for_batch(batch)?;
605-
for row in 0..batch.num_rows() {
606-
for encoder in column_encoders.iter_mut() {
607-
encoder.encode(out, row)?;
618+
let n = batch.num_rows();
619+
match self.prefix {
620+
Some(prefix) => {
621+
for row in 0..n {
622+
out.write_all(prefix.as_slice())
623+
.map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?;
624+
for enc in column_encoders.iter_mut() {
625+
enc.encode(out, row)?;
626+
}
627+
}
628+
}
629+
None => {
630+
for row in 0..n {
631+
for enc in column_encoders.iter_mut() {
632+
enc.encode(out, row)?;
633+
}
634+
}
608635
}
609636
}
610637
Ok(())

0 commit comments

Comments
 (0)