Commit c74136d

Add Ballista examples (apache#775)
1 parent 6f5878d commit c74136d

File tree

6 files changed

+242
-16
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ members = [
     "ballista/rust/core",
     "ballista/rust/executor",
     "ballista/rust/scheduler",
+    "ballista-examples",
 ]
 
 exclude = ["python"]

ballista-examples/Cargo.toml

+38
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "ballista-examples"
+description = "Ballista usage examples"
+version = "0.5.0-SNAPSHOT"
+homepage = "https://github.com/apache/arrow-datafusion"
+repository = "https://github.com/apache/arrow-datafusion"
+authors = ["Apache Arrow <[email protected]>"]
+license = "Apache-2.0"
+keywords = [ "arrow", "distributed", "query", "sql" ]
+edition = "2018"
+publish = false
+
+[dependencies]
+arrow-flight = { version = "5.0" }
+datafusion = { path = "../datafusion" }
+ballista = { path = "../ballista/rust/client" }
+prost = "0.7"
+tonic = "0.4"
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+futures = "0.3"
+num_cpus = "1.13.0"

ballista-examples/README.md

+58
@@ -0,0 +1,58 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Ballista Examples
+
+This directory contains examples for executing distributed queries with Ballista.
+
+For background information on the Ballista architecture, refer to
+the [Ballista README](../ballista/README.md).
+
+## Start a standalone cluster
+
+From the root of the arrow-datafusion project, build release binaries.
+
+```bash
+cargo build --release
+```
+
+Start a Ballista scheduler process in a new terminal session.
+
+```bash
+RUST_LOG=info ./target/release/ballista-scheduler
+```
+
+Start one or more Ballista executor processes in new terminal sessions. When starting more than one
+executor, a unique port number must be specified for each executor.
+
+```bash
+RUST_LOG=info ./target/release/ballista-executor -c 4
+```
+
+## Running the examples
+
+Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `ARROW_TEST_DATA` and
+`PARQUET_TEST_DATA` environment variables so that the examples can find the test data files.
+
+The examples can be run using the `cargo run --bin` syntax.
+
+```bash
+cargo run --release --bin ballista-dataframe
+```
+
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista::prelude::*;
+use datafusion::arrow::util::pretty;
+use datafusion::prelude::{col, lit};
+
+/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
+/// fetching results, using the DataFrame trait
+#[tokio::main]
+async fn main() -> Result<()> {
+    let config = BallistaConfig::builder()
+        .set("ballista.shuffle.partitions", "4")
+        .build()?;
+    let ctx = BallistaContext::remote("localhost", 50050, &config);
+
+    let testdata = datafusion::arrow::util::test_util::parquet_test_data();
+
+    let filename = &format!("{}/alltypes_plain.parquet", testdata);
+
+    // define the query using the DataFrame trait
+    let df = ctx
+        .read_parquet(filename)?
+        .select_columns(&["id", "bool_col", "timestamp_col"])?
+        .filter(col("id").gt(lit(1)))?;
+
+    // execute the query - note that calling collect on the DataFrame
+    // trait will execute the query with DataFusion so we have to call
+    // collect on the BallistaContext instead and pass it the DataFusion
+    // logical plan
+    let mut stream = ctx.collect(&df.to_logical_plan()).await?;
+
+    // print the results
+    let mut results = vec![];
+    while let Some(batch) = stream.next().await {
+        let batch = batch?;
+        results.push(batch);
+    }
+    pretty::print_batches(&results)?;
+
+    Ok(())
+}
+63
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista::prelude::*;
+use datafusion::arrow::util::pretty;
+use datafusion::prelude::CsvReadOptions;
+
+/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
+/// fetching results, using SQL
+#[tokio::main]
+async fn main() -> Result<()> {
+    let config = BallistaConfig::builder()
+        .set("ballista.shuffle.partitions", "4")
+        .build()?;
+    let ctx = BallistaContext::remote("localhost", 50050, &config);
+
+    let testdata = datafusion::arrow::util::test_util::arrow_test_data();
+
+    // register csv file with the execution context
+    ctx.register_csv(
+        "aggregate_test_100",
+        &format!("{}/csv/aggregate_test_100.csv", testdata),
+        CsvReadOptions::new(),
+    )?;
+
+    // define the query using SQL
+    let df = ctx.sql(
+        "SELECT c1, MIN(c12), MAX(c12) \
+        FROM aggregate_test_100 \
+        WHERE c11 > 0.1 AND c11 < 0.9 \
+        GROUP BY c1",
+    )?;
+
+    // execute the query - note that calling collect on the DataFrame
+    // trait will execute the query with DataFusion so we have to call
+    // collect on the BallistaContext instead and pass it the DataFusion
+    // logical plan
+    let mut stream = ctx.collect(&df.to_logical_plan()).await?;
+
+    // print the results
+    let mut results = vec![];
+    while let Some(batch) = stream.next().await {
+        let batch = batch?;
+        results.push(batch);
+    }
+    pretty::print_batches(&results)?;
+
+    Ok(())
+}

ballista/README.md

+26-16
@@ -17,11 +17,11 @@
   under the License.
 -->
 
-# Ballista: Distributed Compute with Apache Arrow
+# Ballista: Distributed Compute with Apache Arrow and DataFusion
 
-Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow. It is built
-on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as
-first-class citizens without paying a penalty for serialization costs.
+Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and
+DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and
+Java) to be supported as first-class citizens without paying a penalty for serialization costs.
 
 The foundational technologies in Ballista are:
 
@@ -35,9 +35,30 @@ Ballista can be deployed as a standalone cluster and also supports [Kubernetes](
 case, the scheduler can be configured to use [etcd](https://etcd.io/) as a backing store to (eventually) provide
 redundancy in the case of a scheduler failing.
 
+# Getting Started
+
+Fully working examples are available. Refer to the [Ballista Examples README](../ballista-examples/README.md) for
+more information.
+
+## Distributed Scheduler Overview
+
+Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a
+distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes.
+
+Specifically, any `RepartitionExec` operator is replaced with an `UnresolvedShuffleExec` and the child operator
+of the repartition operator is wrapped in a `ShuffleWriterExec` operator and scheduled for execution.
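
The stage-splitting rule described above can be sketched on a toy plan tree. This is a minimal illustration, not Ballista's planner: the `Plan` enum, its variants, the `stage_id` numbering, and the `split_stages` function are all hypothetical stand-ins for the real `ExecutionPlan` operators.

```rust
// Toy model of a physical plan. These types are illustrative assumptions,
// not Ballista's or DataFusion's actual operator types.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Filter(Box<Plan>),
    Aggregate(Box<Plan>),
    Repartition(Box<Plan>),
    ShuffleWriter { stage_id: usize, input: Box<Plan> },
    UnresolvedShuffle { stage_id: usize },
}

/// Walks the plan bottom-up. Every `Repartition` is replaced with an
/// `UnresolvedShuffle` placeholder, and the repartition's child is wrapped
/// in a `ShuffleWriter` and pushed onto `stages` to be scheduled separately.
fn split_stages(plan: Plan, stages: &mut Vec<Plan>) -> Plan {
    match plan {
        Plan::Repartition(child) => {
            // Split nested repartitions first so inner stages get lower ids.
            let child = split_stages(*child, stages);
            let stage_id = stages.len() + 1;
            stages.push(Plan::ShuffleWriter {
                stage_id,
                input: Box::new(child),
            });
            Plan::UnresolvedShuffle { stage_id }
        }
        Plan::Filter(child) => Plan::Filter(Box::new(split_stages(*child, stages))),
        Plan::Aggregate(child) => Plan::Aggregate(Box::new(split_stages(*child, stages))),
        // Leaves and already-split operators are returned unchanged.
        other => other,
    }
}

fn main() {
    // Aggregate <- Repartition <- Aggregate <- Filter <- Scan,
    // roughly the shape of a GROUP BY query.
    let scan = Plan::Scan("aggregate_test_100".to_string());
    let partial = Plan::Aggregate(Box::new(Plan::Filter(Box::new(scan))));
    let plan = Plan::Aggregate(Box::new(Plan::Repartition(Box::new(partial))));

    let mut stages = Vec::new();
    let final_stage = split_stages(plan, &mut stages);
    println!("shuffle stages: {:#?}", stages);
    println!("final stage: {:#?}", final_stage);
}
```

In this sketch each entry in `stages` corresponds to a unit of work that could be handed to executors, while the returned plan still contains placeholders that cannot run until those stages finish.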
+
+Each executor polls the scheduler for the next task to run. Tasks are currently always `ShuffleWriterExec` operators
+and each task represents one *input* partition that will be executed. The resulting batches are repartitioned
+according to the shuffle partitioning scheme and each *output* partition is streamed to disk in Arrow IPC format.
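
The relationship between one *input* partition and several *output* partitions can be sketched as follows. This is a simplified model under stated assumptions: rows are plain tuples rather than Arrow record batches, the standard library's `DefaultHasher` stands in for whatever hash function the real partitioner uses, and the outputs are kept in memory instead of being written as Arrow IPC files. The names `Row`, `output_partition`, and `run_shuffle_task` are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// A row is a (group key, value) pair in this sketch; real tasks operate on
// Arrow record batches.
type Row = (String, f64);

/// Picks an output partition for a key by hashing it. `DefaultHasher` is an
/// assumption made for illustration only.
fn output_partition<K: Hash>(key: &K, num_output_partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_output_partitions as u64) as usize
}

/// Models a single task: it consumes exactly one input partition and
/// distributes its rows across `num_output_partitions` buckets.
fn run_shuffle_task(input_partition: Vec<Row>, num_output_partitions: usize) -> Vec<Vec<Row>> {
    let mut outputs = vec![Vec::new(); num_output_partitions];
    for row in input_partition {
        let idx = output_partition(&row.0, num_output_partitions);
        outputs[idx].push(row);
    }
    outputs
}

fn main() {
    let input_partition = vec![
        ("a".to_string(), 0.25),
        ("b".to_string(), 0.50),
        ("a".to_string(), 0.75),
    ];
    // 4 mirrors the "ballista.shuffle.partitions" setting used in the examples.
    let outputs = run_shuffle_task(input_partition, 4);
    for (i, rows) in outputs.iter().enumerate() {
        println!("output partition {}: {:?}", i, rows);
    }
}
```

Because the bucket depends only on the key, rows with the same key from different input partitions end up in the same output partition, which is what lets a downstream stage aggregate each group in one place.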
+
+The scheduler will replace `UnresolvedShuffleExec` operators with `ShuffleReaderExec` operators once all shuffle
+tasks have completed. The `ShuffleReaderExec` operator connects to other executors as required using the Flight
+interface, and streams the shuffle IPC files.
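
The resolution step can be sketched in the same toy style. The assumptions here are loud ones: the `Plan` enum, the `completed` map from stage id to executor locations, and the `resolve_shuffles` function are hypothetical, and the location strings are placeholders rather than real Flight endpoints.

```rust
use std::collections::HashMap;

// Toy plan model; illustrative only, not Ballista's operator types.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Aggregate(Box<Plan>),
    UnresolvedShuffle { stage_id: usize },
    ShuffleReader { locations: Vec<String> },
}

/// Replaces each `UnresolvedShuffle` with a `ShuffleReader` that knows where
/// the shuffle output lives. `completed` holds an entry for a stage only once
/// all of that stage's tasks have finished; if any referenced stage is
/// missing, the plan is not yet runnable and `None` is returned.
fn resolve_shuffles(plan: Plan, completed: &HashMap<usize, Vec<String>>) -> Option<Plan> {
    match plan {
        Plan::UnresolvedShuffle { stage_id } => completed
            .get(&stage_id)
            .map(|locations| Plan::ShuffleReader { locations: locations.clone() }),
        Plan::Aggregate(child) => {
            resolve_shuffles(*child, completed).map(|c| Plan::Aggregate(Box::new(c)))
        }
        // Already-resolved readers pass through unchanged.
        reader => Some(reader),
    }
}

fn main() {
    let plan = Plan::Aggregate(Box::new(Plan::UnresolvedShuffle { stage_id: 1 }));
    let mut completed = HashMap::new();

    // Stage 1 has not finished, so the plan cannot be resolved yet.
    println!("{:?}", resolve_shuffles(plan.clone(), &completed));

    // Once stage 1 completes, record which (hypothetical) executors hold its output.
    completed.insert(1, vec!["executor-a".to_string(), "executor-b".to_string()]);
    println!("{:?}", resolve_shuffles(plan, &completed));
}
```

Returning `Option` makes the "once all shuffle tasks have completed" condition explicit: a stage that still depends on unfinished shuffles simply is not handed out as a task.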
+
 # How does this compare to Apache Spark?
 
-Although Ballista is largely inspired by Apache Spark, there are some key differences.
+Ballista implements a similar design to Apache Spark, but there are some key differences.
 
 - The choice of Rust as the main execution language means that memory usage is deterministic and avoids the overhead of
   GC pauses.
@@ -49,14 +70,3 @@ Although Ballista is largely inspired by Apache Spark, there are some key differ
   distributed compute.
 - The use of Apache Arrow as the memory model and network protocol means that data can be exchanged between executors
   in any programming language with minimal serialization overhead.
-
-## Status
-
-Ballista was [donated](https://arrow.apache.org/blog/2021/04/12/ballista-donation/) to the Apache Arrow project in
-April 2021 and should be considered experimental.
-
-## Getting Started
-
-The [Ballista Developer Documentation](docs/README.md) and the
-[DataFusion User Guide](https://github.com/apache/arrow-datafusion/tree/master/docs/user-guide) are currently the
-best sources of information for getting started with Ballista.
