Merged
35 changes: 31 additions & 4 deletions dev/benchmarks/tpcbench.py
@@ -21,6 +21,22 @@
from pyspark.sql import SparkSession
import time

# Rename duplicate column names by appending a numeric suffix
# a, a, b, b -> a, a_1, b, b_1
#
# Important when writing data, where column name uniqueness is required
def dedup_columns(df):
    counts = {}
    new_cols = []
    for c in df.columns:
        if c not in counts:
            counts[c] = 0
            new_cols.append(c)
        else:
            counts[c] += 1
            new_cols.append(f"{c}_{counts[c]}")
    return df.toDF(*new_cols)

def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, query_num: int = None, write_path: str = None):

# Initialize a SparkSession
@@ -91,9 +107,19 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
df.explain()

if write_path is not None:
output_path = f"{write_path}/q{query}"
df.coalesce(1).write.mode("overwrite").parquet(output_path)
print(f"Query {query} results written to {output_path}")
# skip results with an empty schema,
# which can happen when running DDL statements
if len(df.columns) > 0:
Contributor Author comment: Spark complains when saving a DataFrame with an empty schema; this can happen for DDL statements, which came up in the TPC query sets.

output_path = f"{write_path}/q{query}"
# rename duplicate column names for the output
# a, a, b, b -> a, a_1, b, b_1
# the output format does not allow non-unique column names
deduped = dedup_columns(df)
# sort by all columns to produce a predictable output dataset for comparison
deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path)
print(f"Query {query} results written to {output_path}")
else:
print(f"Skipping write for query {query}: DataFrame has no schema")
else:
rows = df.collect()
print(f"Query {query} returned {len(rows)} rows")
@@ -132,4 +158,5 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
parser.add_argument("--write", required=False, help="Path to save query results to, in Parquet format.")
args = parser.parse_args()

main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query, args.write)

16 changes: 16 additions & 0 deletions fuzz-testing/README.md
@@ -103,3 +103,19 @@ $SPARK_HOME/bin/spark-submit \
```

Note that the output filename is currently hard-coded as `results-${System.currentTimeMillis()}.md`

### Compare existing datasets

To compare a pair of existing datasets, use the comparison tool.
The example below compares TPC-H query results generated by plain Spark and by Comet.

```shell
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--class org.apache.comet.fuzz.ComparisonTool \
target/comet-fuzz-spark3.5_2.12-0.12.0-SNAPSHOT-jar-with-dependencies.jar \
compareParquet --input-spark-folder=/tmp/tpch/spark --input-comet-folder=/tmp/tpch/comet
```

The tool takes a pair of existing folders with the same layout and compares matching subfolders, treating each subfolder as a Parquet-based dataset.
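
As a rough sketch, the expected layout looks like this (paths are illustrative, assuming both folders were produced with the `--write` option of `tpcbench.py`), with one subfolder per query mirrored across the two inputs:

```
/tmp/tpch/spark/q1/part-*.parquet   # results written with Comet disabled
/tmp/tpch/spark/q2/part-*.parquet
...
/tmp/tpch/comet/q1/part-*.parquet   # same queries written with Comet enabled
/tmp/tpch/comet/q2/part-*.parquet
...
```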
143 changes: 143 additions & 0 deletions fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.fuzz

import java.io.File

import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand}

import org.apache.spark.sql.{functions, SparkSession}

class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments) {
object compareParquet extends Subcommand("compareParquet") {
val inputSparkFolder: ScallopOption[String] =
opt[String](required = true, descr = "Folder with Spark produced results in Parquet format")
val inputCometFolder: ScallopOption[String] =
opt[String](required = true, descr = "Folder with Comet produced results in Parquet format")
}
addSubcommand(compareParquet)
verify()
}

object ComparisonTool {

lazy val spark: SparkSession = SparkSession
.builder()
.getOrCreate()

def main(args: Array[String]): Unit = {
val conf = new ComparisonToolConf(args.toIndexedSeq)
conf.subcommand match {
case Some(conf.compareParquet) =>
compareParquetFolders(
spark,
conf.compareParquet.inputSparkFolder(),
conf.compareParquet.inputCometFolder())

case _ =>
// scalastyle:off println
println("Invalid subcommand")
// scalastyle:on println
sys.exit(-1)
}
}

private def compareParquetFolders(
spark: SparkSession,
sparkFolderPath: String,
cometFolderPath: String): Unit = {

val output = QueryRunner.createOutputMdFile()

try {
val sparkFolder = new File(sparkFolderPath)
val cometFolder = new File(cometFolderPath)

if (!sparkFolder.exists() || !sparkFolder.isDirectory) {
throw new IllegalArgumentException(
s"Spark folder does not exist or is not a directory: $sparkFolderPath")
}

if (!cometFolder.exists() || !cometFolder.isDirectory) {
throw new IllegalArgumentException(
s"Comet folder does not exist or is not a directory: $cometFolderPath")
}

// Get all subdirectories from the Spark folder
val sparkSubfolders = sparkFolder
.listFiles()
.filter(_.isDirectory)
.map(_.getName)
.sorted

output.write("# Comparing Parquet Folders\n\n")
output.write(s"Spark folder: $sparkFolderPath\n")
output.write(s"Comet folder: $cometFolderPath\n")
output.write(s"Found ${sparkSubfolders.length} subfolders to compare\n\n")

// Compare each subfolder
sparkSubfolders.foreach { subfolderName =>
val sparkSubfolderPath = new File(sparkFolder, subfolderName)
val cometSubfolderPath = new File(cometFolder, subfolderName)

if (!cometSubfolderPath.exists() || !cometSubfolderPath.isDirectory) {
output.write(s"## Subfolder: $subfolderName\n")
output.write(
s"[WARNING] Comet subfolder not found: ${cometSubfolderPath.getAbsolutePath}\n\n")
} else {
output.write(s"## Comparing subfolder: $subfolderName\n\n")

try {
// Read Spark parquet files
spark.conf.set("spark.comet.enabled", "false")
val sparkDf = spark.read.parquet(sparkSubfolderPath.getAbsolutePath)
val sparkRows = sparkDf.orderBy(sparkDf.columns.map(functions.col): _*).collect()

// Read Comet parquet files
val cometDf = spark.read.parquet(cometSubfolderPath.getAbsolutePath)
val cometRows = cometDf.orderBy(cometDf.columns.map(functions.col): _*).collect()

// Compare the results
if (QueryComparison.assertSameRows(sparkRows, cometRows, output)) {
output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n")
}
} catch {
case e: Exception =>
output.write(
s"[ERROR] Failed to compare subfolder $subfolderName: ${e.getMessage}\n")
val sw = new java.io.StringWriter()
val p = new java.io.PrintWriter(sw)
e.printStackTrace(p)
p.close()
output.write(s"```\n${sw.toString}\n```\n\n")
}
}

output.flush()
}

output.write("\n# Comparison Complete\n")
output.write(s"Compared ${sparkSubfolders.length} subfolders\n")

} finally {
output.close()
}
}
}