Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use datafusion::datasource::listing::{
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::memory_pool::{human_readable_size, units};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
Expand All @@ -44,6 +43,7 @@ use datafusion_benchmarks::util::{BenchmarkRun, CommonOpt, QueryResult};
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{exec_err, DEFAULT_PARQUET_EXTENSION};
use datafusion_common::{human_readable_size, units};

#[derive(Debug, StructOpt)]
#[structopt(
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/util/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
pub fn print_memory_stats() {
#[cfg(all(feature = "mimalloc", feature = "mimalloc_extended"))]
{
use datafusion::execution::memory_pool::human_readable_size;
use datafusion_common::human_readable_size;
let mut peak_rss = 0;
let mut peak_commit = 0;
let mut page_faults = 0;
Expand Down
139 changes: 139 additions & 0 deletions datafusion/common/src/display/human_readable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Helpers for rendering sizes, counts, and durations in human readable form.

/// Common data size units
pub mod units {
pub const TB: u64 = 1 << 40;
pub const GB: u64 = 1 << 30;
pub const MB: u64 = 1 << 20;
pub const KB: u64 = 1 << 10;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those should have been named *iB, e.g. TiB, GiB, etc. since they are base-2, not base-10. But maybe it is too late now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe it is not too late. This PR breaks the API anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is re-exported in the original place to keep the API stable, so unfortunately it's too late 🤦🏼

}

/// Present size in human-readable form
pub fn human_readable_size(size: usize) -> String {
use units::*;

let size = size as u64;
let (value, unit) = {
if size >= 2 * TB {
(size as f64 / TB as f64, "TB")
} else if size >= 2 * GB {
(size as f64 / GB as f64, "GB")
} else if size >= 2 * MB {
(size as f64 / MB as f64, "MB")
} else if size >= 2 * KB {
(size as f64 / KB as f64, "KB")
} else {
(size as f64, "B")
}
};
format!("{value:.1} {unit}")
}

/// Present count in human-readable form with K, M, B, T suffixes
pub fn human_readable_count(count: usize) -> String {
let count = count as u64;
let (value, unit) = {
if count >= 1_000_000_000_000 {
(count as f64 / 1_000_000_000_000.0, " T")
} else if count >= 1_000_000_000 {
(count as f64 / 1_000_000_000.0, " B")
} else if count >= 1_000_000 {
(count as f64 / 1_000_000.0, " M")
} else if count >= 1_000 {
(count as f64 / 1_000.0, " K")
} else {
return count.to_string();
}
};

// Format with appropriate precision
// For values >= 100, show 1 decimal place (e.g., 123.4 K)
// For values < 100, show 2 decimal places (e.g., 10.12 K)
if value >= 100.0 {
format!("{value:.1}{unit}")
} else {
format!("{value:.2}{unit}")
}
}

/// Present duration in human-readable form with 2 decimal places
pub fn human_readable_duration(nanos: u64) -> String {
const NANOS_PER_SEC: f64 = 1_000_000_000.0;
const NANOS_PER_MILLI: f64 = 1_000_000.0;
const NANOS_PER_MICRO: f64 = 1_000.0;

let nanos_f64 = nanos as f64;

if nanos >= 1_000_000_000 {
// >= 1 second: show in seconds
format!("{:.2}s", nanos_f64 / NANOS_PER_SEC)
} else if nanos >= 1_000_000 {
// >= 1 millisecond: show in milliseconds
format!("{:.2}ms", nanos_f64 / NANOS_PER_MILLI)
} else if nanos >= 1_000 {
// >= 1 microsecond: show in microseconds
format!("{:.2}µs", nanos_f64 / NANOS_PER_MICRO)
} else {
// < 1 microsecond: show in nanoseconds
format!("{nanos}ns")
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_human_readable_count() {
assert_eq!(human_readable_count(0), "0");
assert_eq!(human_readable_count(1), "1");
assert_eq!(human_readable_count(999), "999");
assert_eq!(human_readable_count(1_000), "1.00 K");
assert_eq!(human_readable_count(10_100), "10.10 K");
assert_eq!(human_readable_count(1_532), "1.53 K");
assert_eq!(human_readable_count(99_999), "100.00 K");
assert_eq!(human_readable_count(1_000_000), "1.00 M");
assert_eq!(human_readable_count(1_532_000), "1.53 M");
assert_eq!(human_readable_count(99_000_000), "99.00 M");
assert_eq!(human_readable_count(123_456_789), "123.5 M");
assert_eq!(human_readable_count(1_000_000_000), "1.00 B");
assert_eq!(human_readable_count(1_532_000_000), "1.53 B");
assert_eq!(human_readable_count(999_999_999_999), "1000.0 B");
assert_eq!(human_readable_count(1_000_000_000_000), "1.00 T");
assert_eq!(human_readable_count(42_000_000_000_000), "42.00 T");
}

#[test]
fn test_human_readable_duration() {
assert_eq!(human_readable_duration(0), "0ns");
assert_eq!(human_readable_duration(1), "1ns");
assert_eq!(human_readable_duration(999), "999ns");
assert_eq!(human_readable_duration(1_000), "1.00µs");
assert_eq!(human_readable_duration(1_234), "1.23µs");
assert_eq!(human_readable_duration(999_999), "1000.00µs");
assert_eq!(human_readable_duration(1_000_000), "1.00ms");
assert_eq!(human_readable_duration(11_295_377), "11.30ms");
assert_eq!(human_readable_duration(1_234_567), "1.23ms");
assert_eq!(human_readable_duration(999_999_999), "1000.00ms");
assert_eq!(human_readable_duration(1_000_000_000), "1.00s");
assert_eq!(human_readable_duration(1_234_567_890), "1.23s");
assert_eq!(human_readable_duration(42_000_000_000), "42.00s");
}
}
1 change: 1 addition & 0 deletions datafusion/common/src/display/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Types for plan display

mod graphviz;
pub mod human_readable;
pub use graphviz::*;

use std::{
Expand Down
3 changes: 3 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ pub use dfschema::{
DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema, qualified_name,
};
pub use diagnostic::Diagnostic;
pub use display::human_readable::{
human_readable_count, human_readable_duration, human_readable_size, units,
};
pub use error::{
DataFusionError, Result, SchemaError, SharedResult, field_not_found,
unqualified_field_not_found,
Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ use arrow::array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion::datasource::MemTable;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{instant::Instant, Result};
use datafusion_common::{human_readable_size, instant::Instant, Result};
use datafusion_execution::disk_manager::DiskManagerBuilder;
use datafusion_execution::memory_pool::{
human_readable_size, MemoryPool, UnboundedMemoryPool,
};
use datafusion_execution::memory_pool::{MemoryPool, UnboundedMemoryPool};
use datafusion_expr::display_schema;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use std::time::Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionConfig;
use datafusion_execution::memory_pool::units::{KB, MB};
use datafusion_common::units::{KB, MB};
use datafusion_execution::memory_pool::{
FairSpillPool, MemoryConsumer, MemoryReservation,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, System};
use tokio::time::{interval, Duration};

use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_execution::{
memory_pool::{human_readable_size, FairSpillPool},
runtime_env::RuntimeEnvBuilder,
};
use datafusion_common::human_readable_size;
use datafusion_execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder};

/// Measures the maximum RSS (in bytes) during the execution of an async task. RSS
/// will be sampled every 7ms.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tempfile::{Builder, NamedTempFile, TempDir};

use crate::memory_pool::human_readable_size;
use datafusion_common::human_readable_size;

pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB

Expand Down
134 changes: 3 additions & 131 deletions datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub mod proxy {
pub use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
}

pub use datafusion_common::{
human_readable_count, human_readable_duration, human_readable_size, units,
};
pub use pool::*;

/// Tracks and potentially limits memory use across operators during execution.
Expand Down Expand Up @@ -473,84 +476,6 @@ impl Drop for MemoryReservation {
}
}

pub mod units {
pub const TB: u64 = 1 << 40;
pub const GB: u64 = 1 << 30;
pub const MB: u64 = 1 << 20;
pub const KB: u64 = 1 << 10;
}

/// Present size in human-readable form
pub fn human_readable_size(size: usize) -> String {
use units::*;

let size = size as u64;
let (value, unit) = {
if size >= 2 * TB {
(size as f64 / TB as f64, "TB")
} else if size >= 2 * GB {
(size as f64 / GB as f64, "GB")
} else if size >= 2 * MB {
(size as f64 / MB as f64, "MB")
} else if size >= 2 * KB {
(size as f64 / KB as f64, "KB")
} else {
(size as f64, "B")
}
};
format!("{value:.1} {unit}")
}

/// Present count in human-readable form with K, M, B, T suffixes
pub fn human_readable_count(count: usize) -> String {
let count = count as u64;
let (value, unit) = {
if count >= 1_000_000_000_000 {
(count as f64 / 1_000_000_000_000.0, " T")
} else if count >= 1_000_000_000 {
(count as f64 / 1_000_000_000.0, " B")
} else if count >= 1_000_000 {
(count as f64 / 1_000_000.0, " M")
} else if count >= 1_000 {
(count as f64 / 1_000.0, " K")
} else {
return count.to_string();
}
};

// Format with appropriate precision
// For values >= 100, show 1 decimal place (e.g., 123.4 K)
// For values < 100, show 2 decimal places (e.g., 10.12 K)
if value >= 100.0 {
format!("{value:.1}{unit}")
} else {
format!("{value:.2}{unit}")
}
}

/// Present duration in human-readable form with 2 decimal places
pub fn human_readable_duration(nanos: u64) -> String {
const NANOS_PER_SEC: f64 = 1_000_000_000.0;
const NANOS_PER_MILLI: f64 = 1_000_000.0;
const NANOS_PER_MICRO: f64 = 1_000.0;

let nanos_f64 = nanos as f64;

if nanos >= 1_000_000_000 {
// >= 1 second: show in seconds
format!("{:.2}s", nanos_f64 / NANOS_PER_SEC)
} else if nanos >= 1_000_000 {
// >= 1 millisecond: show in milliseconds
format!("{:.2}ms", nanos_f64 / NANOS_PER_MILLI)
} else if nanos >= 1_000 {
// >= 1 microsecond: show in microseconds
format!("{:.2}µs", nanos_f64 / NANOS_PER_MICRO)
} else {
// < 1 microsecond: show in nanoseconds
format!("{nanos}ns")
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -647,57 +572,4 @@ mod tests {
assert_eq!(r2.size(), 25);
assert_eq!(pool.reserved(), 28);
}

#[test]
fn test_human_readable_count() {
// Test small numbers (< 1000) - should display as-is
assert_eq!(human_readable_count(0), "0");
assert_eq!(human_readable_count(1), "1");
assert_eq!(human_readable_count(999), "999");

// Test thousands (K)
assert_eq!(human_readable_count(1_000), "1.00 K");
assert_eq!(human_readable_count(10_100), "10.10 K");
assert_eq!(human_readable_count(1_532), "1.53 K");
assert_eq!(human_readable_count(99_999), "100.00 K");

// Test millions (M)
assert_eq!(human_readable_count(1_000_000), "1.00 M");
assert_eq!(human_readable_count(1_532_000), "1.53 M");
assert_eq!(human_readable_count(99_000_000), "99.00 M");
assert_eq!(human_readable_count(123_456_789), "123.5 M");

// Test billions (B)
assert_eq!(human_readable_count(1_000_000_000), "1.00 B");
assert_eq!(human_readable_count(1_532_000_000), "1.53 B");
assert_eq!(human_readable_count(999_999_999_999), "1000.0 B");

// Test trillions (T)
assert_eq!(human_readable_count(1_000_000_000_000), "1.00 T");
assert_eq!(human_readable_count(42_000_000_000_000), "42.00 T");
}

#[test]
fn test_human_readable_duration() {
// Test nanoseconds (< 1µs)
assert_eq!(human_readable_duration(0), "0ns");
assert_eq!(human_readable_duration(1), "1ns");
assert_eq!(human_readable_duration(999), "999ns");

// Test microseconds (1µs to < 1ms)
assert_eq!(human_readable_duration(1_000), "1.00µs");
assert_eq!(human_readable_duration(1_234), "1.23µs");
assert_eq!(human_readable_duration(999_999), "1000.00µs");

// Test milliseconds (1ms to < 1s)
assert_eq!(human_readable_duration(1_000_000), "1.00ms");
assert_eq!(human_readable_duration(11_295_377), "11.30ms");
assert_eq!(human_readable_duration(1_234_567), "1.23ms");
assert_eq!(human_readable_duration(999_999_999), "1000.00ms");

// Test seconds (>= 1s)
assert_eq!(human_readable_duration(1_000_000_000), "1.00s");
assert_eq!(human_readable_duration(1_234_567_890), "1.23s");
assert_eq!(human_readable_duration(42_000_000_000), "42.00s");
}
}
2 changes: 1 addition & 1 deletion datafusion/physical-plan/benches/spill_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use criterion::{
criterion_group, criterion_main, BatchSize, BenchmarkGroup, BenchmarkId, Criterion,
};
use datafusion_common::config::SpillCompression;
use datafusion_common::human_readable_size;
use datafusion_common::instant::Instant;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::common::collect;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
Expand Down
Loading
Loading