19 changes: 10 additions & 9 deletions datafusion-examples/README.md
@@ -49,10 +49,11 @@ cargo run --example dataframe
- [`examples/udf/advanced_udaf.rs`](examples/udf/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
- [`examples/udf/advanced_udf.rs`](examples/udf/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
- [`examples/udf/advanced_udwf.rs`](examples/udf/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
- [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files
- [`examples/data_io/parquet_advanced_index.rs`](examples/data_io/parquet_advanced_index.rs): Creates a detailed secondary index that covers the contents of several parquet files
- [`examples/udf/async_udf.rs`](examples/udf/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF)
- [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control)
- [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
- [`examples/data_io/catalog.rs`](examples/data_io/catalog.rs): Register the table into a custom catalog
- [`examples/data_io/json_shredding.rs`](examples/data_io/json_shredding.rs): Shows how to implement custom filter rewriting for JSON shredding
- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
- [`examples/custom_data_source/csv_sql_streaming.rs`](examples/custom_data_source/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
- [`examples/custom_data_source/csv_json_opener.rs`](examples/custom_data_source/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es
@@ -71,19 +72,19 @@ cargo run --example dataframe
- [`memory_pool_tracking.rs`](examples/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages
- [`memory_pool_execution_plan.rs`](examples/memory_pool_execution_plan.rs): Shows how to implement memory-aware ExecutionPlan with memory reservation and spilling
- [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
- [`parquet_embedded_index.rs`](examples/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries
- [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion
- [`parquet_encrypted_with_kms.rs`](examples/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory
- [`parquet_index.rs`](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`examples/data_io/parquet_embedded_index.rs`](examples/data_io/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries
- [`examples/data_io/parquet_encrypted.rs`](examples/data_io/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion
- [`examples/data_io/parquet_encrypted_with_kms.rs`](examples/data_io/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory
- [`examples/data_io/parquet_index.rs`](examples/data_io/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
- [`examples/data_io/parquet_exec_visitor.rs`](examples/data_io/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
- [`planner_api.rs`](examples/planner_api.rs): APIs to manipulate logical and physical plans
- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP
- [`examples/data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs): Configure `object_store` and run a query against files via HTTP
- [`examples/builtin_functions/regexp.rs`](examples/builtin_functions/regexp.rs): Examples of using regular expression functions
- [`remote_catalog.rs`](examples/regexp.rs): Examples of interfacing with a remote catalog (e.g. over a network)
- [`examples/data_io/remote_catalog.rs`](examples/data_io/remote_catalog.rs): Examples of interfacing with a remote catalog (e.g. over a network)
- [`examples/udf/simple_udaf.rs`](examples/udf/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
- [`examples/udf/simple_udf.rs`](examples/udf/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
- [`examples/udf/simple_udtf.rs`](examples/udf/simple_udtf.rs): Define and invoke a User Defined Table Function (UDTF)
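The examples listed above that moved under `examples/data_io/` are grouped as modules of the new `data_io` example rather than standalone example binaries. A quick sketch of the invocation, based on the usage string in the new `examples/data_io/main.rs` shown later in this diff:

```bash
# Run a relocated data I/O example by passing its subcommand name to the
# grouped runner (subcommand names are defined in examples/data_io/main.rs)
cargo run --example data_io -- catalog
cargo run --example data_io -- query_http_csv
```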
@@ -34,8 +34,8 @@ use std::{any::Any, collections::HashMap, path::Path, sync::Arc};
use std::{fs::File, io::Write};
use tempfile::TempDir;

#[tokio::main]
async fn main() -> Result<()> {
/// Register the table into a custom catalog
pub async fn catalog() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.init();
@@ -134,12 +134,14 @@ struct DirSchemaOpts<'a> {
dir: &'a Path,
format: Arc<dyn FileFormat>,
}

/// Schema where every file with extension `ext` in a given `dir` is a table.
#[derive(Debug)]
struct DirSchema {
ext: String,
tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
}

impl DirSchema {
async fn create(state: &SessionState, opts: DirSchemaOpts<'_>) -> Result<Arc<Self>> {
let DirSchemaOpts { ext, dir, format } = opts;
@@ -172,6 +174,7 @@ impl DirSchema {
ext: ext.to_string(),
}))
}

#[allow(unused)]
fn name(&self) -> &str {
&self.ext
@@ -198,6 +201,7 @@ impl SchemaProvider for DirSchema {
let tables = self.tables.read().unwrap();
tables.contains_key(name)
}

fn register_table(
&self,
name: String,
@@ -223,17 +227,20 @@ impl SchemaProvider for DirSchema {
struct DirCatalog {
schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
}

impl DirCatalog {
fn new() -> Self {
Self {
schemas: RwLock::new(HashMap::new()),
}
}
}

impl CatalogProvider for DirCatalog {
fn as_any(&self) -> &dyn Any {
self
}

fn register_schema(
&self,
name: &str,
@@ -260,22 +267,26 @@ impl CatalogProvider for DirCatalog {
}
}
}

/// A catalog list holds multiple catalog providers. Each context has a single catalog list.
#[derive(Debug)]
struct CustomCatalogProviderList {
catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
}

impl CustomCatalogProviderList {
fn new() -> Self {
Self {
catalogs: RwLock::new(HashMap::new()),
}
}
}

impl CatalogProviderList for CustomCatalogProviderList {
fn as_any(&self) -> &dyn Any {
self
}

fn register_catalog(
&self,
name: String,
@@ -63,8 +63,7 @@ use object_store::{ObjectStore, PutPayload};
// 1. Push down predicates for better filtering
// 2. Avoid expensive JSON parsing at query time
// 3. Leverage columnar storage benefits for the materialized fields
#[tokio::main]
async fn main() -> Result<()> {
pub async fn json_shredding() -> Result<()> {
println!("=== Creating example data with flat columns and underscore prefixes ===");

// Create sample data with flat columns using underscore prefixes
160 changes: 160 additions & 0 deletions datafusion-examples/examples/data_io/main.rs
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! # Data format and I/O examples
//!
//! These examples demonstrate data format handling and I/O with DataFusion
//! (custom catalogs, Parquet indexing and encryption, and querying files over HTTP).
//!
//! ## Usage
//! ```bash
//! cargo run --example data_io -- [catalog|json_shredding|parquet_adv_idx|parquet_emb_idx|parquet_enc_with_kms|parquet_enc|parquet_exec_visitor|parquet_idx|query_http_csv|remote_catalog]
//! ```
//!
//! Each subcommand runs a corresponding example:
//! - `catalog` — register the table into a custom catalog
//! - `json_shredding` — shows how to implement custom filter rewriting for JSON shredding
//! - `parquet_adv_idx` — create a detailed secondary index that covers the contents of several parquet files
//! - `parquet_emb_idx` — store a custom index inside a Parquet file and use it to speed up queries
//! - `parquet_enc_with_kms` — read and write encrypted Parquet files using an encryption factory
//! - `parquet_enc` — read and write encrypted Parquet files using DataFusion
//! - `parquet_exec_visitor` — extract statistics by visiting an ExecutionPlan after execution
//! - `parquet_idx` — create a secondary index over several parquet files and use it to speed up queries
//! - `query_http_csv` — configure `object_store` and run a query against files via HTTP
//! - `remote_catalog` — interfacing with a remote catalog (e.g. over a network)

mod catalog;
mod json_shredding;
mod parquet_advanced_index;
mod parquet_embedded_index;
mod parquet_encrypted;
mod parquet_encrypted_with_kms;
mod parquet_exec_visitor;
mod parquet_index;
mod query_http_csv;
mod remote_catalog;

use std::str::FromStr;

use datafusion::error::{DataFusionError, Result};

enum ExampleKind {
Catalog,
JsonShredding,
ParquetAdvIdx,
ParquetEmbIdx,
ParquetEnc,
ParquetEncWithKms,
ParquetExecVisitor,
ParquetIdx,
QueryHttpCsv,
RemoteCatalog,
}

impl AsRef<str> for ExampleKind {
fn as_ref(&self) -> &str {
match self {
Self::Catalog => "catalog",
Self::JsonShredding => "json_shredding",
Self::ParquetAdvIdx => "parquet_adv_idx",
Self::ParquetEmbIdx => "parquet_emb_idx",
Self::ParquetEnc => "parquet_enc",
Self::ParquetEncWithKms => "parquet_enc_with_kms",
Self::ParquetExecVisitor => "parquet_exec_visitor",
Self::ParquetIdx => "parquet_idx",
Self::QueryHttpCsv => "query_http_csv",
Self::RemoteCatalog => "remote_catalog",
}
}
}

impl FromStr for ExampleKind {
type Err = DataFusionError;

fn from_str(s: &str) -> Result<Self> {
match s {
"catalog" => Ok(Self::Catalog),
"json_shredding" => Ok(Self::JsonShredding),
"parquet_adv_idx" => Ok(Self::ParquetAdvIdx),
"parquet_emb_idx" => Ok(Self::ParquetEmbIdx),
"parquet_enc" => Ok(Self::ParquetEnc),
"parquet_enc_with_kms" => Ok(Self::ParquetEncWithKms),
"parquet_exec_visitor" => Ok(Self::ParquetExecVisitor),
"parquet_idx" => Ok(Self::ParquetIdx),
"query_http_csv" => Ok(Self::QueryHttpCsv),
"remote_catalog" => Ok(Self::RemoteCatalog),
_ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
}
}
}

impl ExampleKind {
const ALL: [Self; 10] = [
Self::Catalog,
Self::JsonShredding,
Self::ParquetAdvIdx,
Self::ParquetEmbIdx,
Self::ParquetEnc,
Self::ParquetEncWithKms,
Self::ParquetExecVisitor,
Self::ParquetIdx,
Self::QueryHttpCsv,
Self::RemoteCatalog,
];

const EXAMPLE_NAME: &str = "data_io";

fn variants() -> Vec<&'static str> {
Self::ALL.iter().map(|x| x.as_ref()).collect()
}
}

#[tokio::main]
async fn main() -> Result<()> {
let usage = format!(
"Usage: cargo run --example {} -- [{}]",
ExampleKind::EXAMPLE_NAME,
ExampleKind::variants().join("|")
);

let arg = std::env::args().nth(1).ok_or_else(|| {
eprintln!("{usage}");
DataFusionError::Execution("Missing argument".to_string())
})?;

match arg.parse::<ExampleKind>()? {
ExampleKind::Catalog => catalog::catalog().await?,
ExampleKind::JsonShredding => json_shredding::json_shredding().await?,
ExampleKind::ParquetAdvIdx => {
parquet_advanced_index::parquet_advanced_index().await?
}
ExampleKind::ParquetEmbIdx => {
parquet_embedded_index::parquet_embedded_index().await?
}
ExampleKind::ParquetEncWithKms => {
parquet_encrypted_with_kms::parquet_encrypted_with_kms().await?
}
ExampleKind::ParquetEnc => parquet_encrypted::parquet_encrypted().await?,
ExampleKind::ParquetExecVisitor => {
parquet_exec_visitor::parquet_exec_visitor().await?
}
ExampleKind::ParquetIdx => parquet_index::parquet_index().await?,
ExampleKind::QueryHttpCsv => query_http_csv::query_http_csv().await?,
ExampleKind::RemoteCatalog => remote_catalog::remote_catalog().await?,
}

Ok(())
}
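For reference, a sketch of how the dispatcher above behaves from the shell. The commands mirror `ExampleKind::EXAMPLE_NAME` and the error paths in `main()`; the exact error text printed by the runtime may differ:

```bash
# A valid subcommand parses into ExampleKind and awaits the matching example
cargo run --example data_io -- parquet_emb_idx

# With no subcommand, main() prints the usage line to stderr and returns
# DataFusionError::Execution("Missing argument")
cargo run --example data_io

# An unrecognized subcommand fails in FromStr with "Unknown example: <name>"
cargo run --example data_io -- no_such_example
```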
@@ -155,8 +155,7 @@ use url::Url;
///
/// [`ListingTable`]: datafusion::datasource::listing::ListingTable
/// [Page Index](https://github.com/apache/parquet-format/blob/master/PageIndex.md)
#[tokio::main]
async fn main() -> Result<()> {
pub async fn parquet_advanced_index() -> Result<()> {
// the object store is used to read the parquet files (in this case, it is
// a local file system, but in a real system it could be S3, GCS, etc)
let object_store: Arc<dyn ObjectStore> =
@@ -239,6 +238,7 @@ pub struct IndexTableProvider {
/// if true, use row selections in addition to row group selections
use_row_selections: AtomicBool,
}

impl IndexTableProvider {
/// Create a new IndexTableProvider
/// * `object_store` - the object store implementation to use for reading files
@@ -539,6 +539,7 @@ impl CachedParquetFileReaderFactory {
metadata: HashMap::new(),
}
}

/// Add the pre-parsed information about the file to the factory
fn with_file(mut self, indexed_file: &IndexedFile) -> Self {
self.metadata.insert(
@@ -136,6 +136,31 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;

/// Store a custom index inside a Parquet file and use it to speed up queries
pub async fn parquet_embedded_index() -> Result<()> {
// 1. Create temp dir and write 3 Parquet files with different category sets
let tmp = TempDir::new()?;
let dir = tmp.path();
write_file_with_index(&dir.join("a.parquet"), &["foo", "bar", "foo"])?;
write_file_with_index(&dir.join("b.parquet"), &["baz", "qux"])?;
write_file_with_index(&dir.join("c.parquet"), &["foo", "quux", "quux"])?;

// 2. Register our custom TableProvider
let field = Field::new("category", DataType::Utf8, false);
let schema_ref = Arc::new(Schema::new(vec![field]));
let provider = Arc::new(DistinctIndexTable::try_new(dir, schema_ref.clone())?);

let ctx = SessionContext::new();
ctx.register_table("t", provider)?;

// 3. Run a query: only files containing 'foo' get scanned; the rest are pruned
//    based on the distinct index.
let df = ctx.sql("SELECT * FROM t WHERE category = 'foo'").await?;
df.show().await?;

Ok(())
}

/// An index of distinct values for a single column
///
/// In this example the index is a simple set of strings, but in a real
@@ -452,28 +477,3 @@ impl TableProvider for DistinctIndexTable {
Ok(vec![TableProviderFilterPushDown::Inexact; fs.len()])
}
}

#[tokio::main]
async fn main() -> Result<()> {
// 1. Create temp dir and write 3 Parquet files with different category sets
let tmp = TempDir::new()?;
let dir = tmp.path();
write_file_with_index(&dir.join("a.parquet"), &["foo", "bar", "foo"])?;
write_file_with_index(&dir.join("b.parquet"), &["baz", "qux"])?;
write_file_with_index(&dir.join("c.parquet"), &["foo", "quux", "quux"])?;

// 2. Register our custom TableProvider
let field = Field::new("category", DataType::Utf8, false);
let schema_ref = Arc::new(Schema::new(vec![field]));
let provider = Arc::new(DistinctIndexTable::try_new(dir, schema_ref.clone())?);

let ctx = SessionContext::new();
ctx.register_table("t", provider)?;

// 3. Run a query: only files containing 'foo' get scanned. The rest are pruned.
// based on the distinct index.
let df = ctx.sql("SELECT * FROM t WHERE category = 'foo'").await?;
df.show().await?;

Ok(())
}
@@ -25,8 +25,8 @@ use datafusion::prelude::{ParquetReadOptions, SessionContext};
use std::sync::Arc;
use tempfile::TempDir;

#[tokio::main]
async fn main() -> datafusion::common::Result<()> {
/// Read and write encrypted Parquet files using DataFusion
pub async fn parquet_encrypted() -> datafusion::common::Result<()> {
// The SessionContext is the main high level API for interacting with DataFusion
let ctx = SessionContext::new();
