From 199beddbfeb365ed10ae0799276cb0be7964c9cd Mon Sep 17 00:00:00 2001 From: William Date: Fri, 1 Aug 2025 19:10:23 -0700 Subject: [PATCH 1/3] add workload to data submitted to db --- cmd/run/main.go | 54 +++++++++++++++++++++++++++++++++++- stage/influx_run_recorder.go | 1 + stage/mysql_run_recorder.go | 6 ++-- stage/pbench_runs_ddl.sql | 1 + stage/states.go | 1 + 5 files changed, 59 insertions(+), 4 deletions(-) diff --git a/cmd/run/main.go b/cmd/run/main.go index 14bbacd6..17af5a8a 100644 --- a/cmd/run/main.go +++ b/cmd/run/main.go @@ -53,9 +53,15 @@ func Run(_ *cobra.Command, args []string) { } else { mainStage.States.RunName = strings.ReplaceAll(mainStage.States.RunName, `%t`, mainStage.States.RunStartTime.Format(utils.DirectoryNameTimeFormat)) } + var workloads []string for _, path := range args { if st, err := processStagePath(path); err == nil { mainStage.MergeWith(st) + // Extract workload from this path + workload := extractWorkloadFromStagePath(path) + if workload != "" { + workloads = append(workloads, workload) + } if defaultRunNameBuilder != nil { if defaultRunNameBuilder.Len() > 0 { defaultRunNameBuilder.WriteByte('_') @@ -66,12 +72,27 @@ func Run(_ *cobra.Command, args []string) { os.Exit(-1) } } + + // Set the workload field (join multiple workloads with comma if there are multiple) + if len(workloads) > 0 { + // Remove duplicates and join + workloadMap := make(map[string]bool) + uniqueWorkloads := []string{} + for _, w := range workloads { + if !workloadMap[w] { + workloadMap[w] = true + uniqueWorkloads = append(uniqueWorkloads, w) + } + } + mainStage.States.Workload = strings.Join(uniqueWorkloads, ",") + } + if defaultRunNameBuilder != nil { defaultRunNameBuilder.WriteByte('_') defaultRunNameBuilder.WriteString(mainStage.States.RunStartTime.Format(utils.DirectoryNameTimeFormat)) mainStage.States.RunName = defaultRunNameBuilder.String() } - log.Info().Str("run_name", mainStage.States.RunName).Send() + log.Info().Str("run_name", mainStage.States.RunName).Str("workload", mainStage.States.Workload).Send() if _, _, err := stage.ParseStageGraph(mainStage); err != nil { log.Fatal().Err(err).Msg("failed to parse benchmark stage graph") @@ -121,3 +142,34 @@ func processStagePath(path string) (st *stage.Stage, returnErr error) { return stage.ReadStageFromFile(path) } } + +// extractWorkloadFromStagePath extracts the workload name from stage file paths +// Examples: benchmarks/clickbench/clickbench.json -> clickbench +// benchmarks/tpc-ds/tpc-ds.json -> tpc-ds +// imdb.json -> imdb +func extractWorkloadFromStagePath(path string) string { + // Get the directory name containing the stage file + dir := filepath.Dir(path) + dirName := filepath.Base(dir) + + // If the directory is "benchmarks" or current directory, use the filename + if dirName == "benchmarks" || dirName == "." { + filename := filepath.Base(path) + // Remove the .json extension + if strings.HasSuffix(filename, ".json") { + return strings.TrimSuffix(filename, ".json") + } + return filename + } + + // Check if this is a known workload directory + knownWorkloads := []string{"biday3", "bolt", "catalina", "clickbench", "imdb", "nielsen", "tpc-ds", "tpch"} + for _, workload := range knownWorkloads { + if dirName == workload { + return workload + } + } + + // If not a known workload, use the directory name + return dirName +} diff --git a/stage/influx_run_recorder.go b/stage/influx_run_recorder.go index 60a50452..f8d72a80 100644 --- a/stage/influx_run_recorder.go +++ b/stage/influx_run_recorder.go @@ -57,6 +57,7 @@ func (i *InfluxRunRecorder) RecordQuery(ctx context.Context, s *Stage, result *Q "run_name": s.States.RunName, "stage_id": result.StageId, "query_id": result.QueryId, + "workload": s.States.Workload, } fields := map[string]interface{}{ "query_index": result.Query.Index, diff --git a/stage/mysql_run_recorder.go b/stage/mysql_run_recorder.go index 7c56aca5..383b46d6 100644 --- a/stage/mysql_run_recorder.go +++ b/stage/mysql_run_recorder.go @@ -48,9 +48,9 @@ func NewMySQLRunRecorderWithDb(db *sql.DB) *MySQLRunRecorder { } func (m *MySQLRunRecorder) Start(_ context.Context, s *Stage) error { - recordNewRun := `INSERT INTO pbench_runs (run_name, cluster_fqdn, start_time, queries_ran, failed, mismatch, comment) -VALUES (?, ?, ?, 0, 0, 0, ?)` - res, err := m.db.Exec(recordNewRun, s.States.RunName, s.States.ServerFQDN, s.States.RunStartTime, s.States.Comment) + recordNewRun := `INSERT INTO pbench_runs (run_name, cluster_fqdn, start_time, queries_ran, failed, mismatch, comment, workload) +VALUES (?, ?, ?, 0, 0, 0, ?, ?)` + res, err := m.db.Exec(recordNewRun, s.States.RunName, s.States.ServerFQDN, s.States.RunStartTime, s.States.Comment, s.States.Workload) if err != nil { log.Error().Err(err).Str("run_name", s.States.RunName).Time("start_time", s.States.RunStartTime). Msg("failed to add a new run to the MySQL database") diff --git a/stage/pbench_runs_ddl.sql b/stage/pbench_runs_ddl.sql index d9987a71..1b82fddc 100644 --- a/stage/pbench_runs_ddl.sql +++ b/stage/pbench_runs_ddl.sql @@ -12,6 +12,7 @@ create table if not exists pbench_runs hidden tinyint(1) default 0 not null, comment varchar(255) null, rand_seed bigint null, + workload varchar(255) null, constraint pbench_runs_run_id unique (run_id) ); diff --git a/stage/states.go b/stage/states.go index ee3fa65a..94a7f2a0 100644 --- a/stage/states.go +++ b/stage/states.go @@ -18,6 +18,7 @@ type SharedStageStates struct { RandSeedUsed bool RunStartTime time.Time RunFinishTime time.Time + Workload string // OutputPath is where we store the logs, query results, query json files, query column metadata files, etc. // It should be set by the --output/-o command-line argument. Once set there, its value gets propagated to all the stages. OutputPath string From 8cd3ecf0be26144c38e56898ade949798ae4c8c3 Mon Sep 17 00:00:00 2001 From: William Date: Sun, 3 Aug 2025 00:45:03 -0700 Subject: [PATCH 2/3] Add migration script for workload column - Adds ALTER TABLE statement for existing databases - Includes performance index for workload column - Safe for existing installations --- stage/add_workload_column_migration.sql | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 stage/add_workload_column_migration.sql diff --git a/stage/add_workload_column_migration.sql b/stage/add_workload_column_migration.sql new file mode 100644 index 00000000..7beb1d08 --- /dev/null +++ b/stage/add_workload_column_migration.sql @@ -0,0 +1,9 @@ +-- Migration script to add workload column to existing pbench_runs table +-- Run this script on existing databases that already have the pbench_runs table + +-- Add workload column if it doesn't exist +ALTER TABLE pbench_runs +ADD COLUMN IF NOT EXISTS workload varchar(255) null; + +-- Create index on workload column for better query performance +CREATE INDEX IF NOT EXISTS pbench_runs_workload_index ON pbench_runs (workload); From 02a182eb3a9dd57fc61f22879ef2288e9694c5a2 Mon Sep 17 00:00:00 2001 From: eziur <159961542+eziur@users.noreply.github.com> Date: Sun, 3 Aug 2025 00:50:47 -0700 Subject: [PATCH 3/3] Delete stage/add_workload_column_migration.sql --- stage/add_workload_column_migration.sql | 9 --------- 1 file changed, 9 deletions(-) delete mode 100644 stage/add_workload_column_migration.sql diff --git a/stage/add_workload_column_migration.sql b/stage/add_workload_column_migration.sql deleted file mode 100644 index 7beb1d08..00000000 --- a/stage/add_workload_column_migration.sql +++ /dev/null @@ -1,9 +0,0 @@ --- Migration script to add workload column to existing pbench_runs table --- Run this script on existing databases that already have the pbench_runs table - --- Add workload column if it doesn't exist -ALTER TABLE pbench_runs -ADD COLUMN IF NOT EXISTS workload varchar(255) null; - --- Create index on workload column for better query performance -CREATE INDEX IF NOT EXISTS pbench_runs_workload_index ON pbench_runs (workload);