Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions stage/stage_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"pbench/log"
"pbench/presto"
"pbench/utils"
"strconv"
"strings"
"time"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -275,6 +277,38 @@ func (s *Stage) propagateStates() {
}
}

func uploadToS3(localFilePath, queryId string) error {
clusterName := os.Getenv("PRESTO_CLUSTER_NAME")
if clusterName == "" {
return fmt.Errorf("environment variable PRESTO_CLUSTER_NAME is not set")
}
s3BucketId := os.Getenv("PRESTO_S3_BUCKET_ID")
if s3BucketId == "" {
return fmt.Errorf("environment variable PRESTO_S3_BUCKET_ID is not set")
}

cleanedFilePath := strings.Replace(localFilePath, ".error", "", -1)

// Path is s3://{s3BucketId}/clusters/{clusterName}/queries/{queryId}.json
s3Path := fmt.Sprintf("s3://%s/clusters/%s/queries/%s.json", s3BucketId, clusterName, queryId)

fmt.Printf("Uploading file: %s to S3 path: %s\n", cleanedFilePath, s3Path)

cmd := exec.Command("aws", "s3", "cp", cleanedFilePath, s3Path)

cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

err := cmd.Run()
if err != nil {
fmt.Printf("Failed to upload file: %s to S3. Error: %v\n", cleanedFilePath, err)
return fmt.Errorf("failed to upload file to S3: %w", err)
}

fmt.Printf("File successfully uploaded to S3: %s\n", s3Path)
return nil
}

func (s *Stage) saveQueryJsonFile(result *QueryResult) {
// We do not save json file when saveJson is false. But when there is an error, we always save the json file.
if !*s.SaveJson && result.QueryError == nil {
Expand All @@ -301,9 +335,8 @@ func (s *Stage) saveQueryJsonFile(result *QueryResult) {
}
}
if result.QueryError != nil {
queryErrorFile, err := os.OpenFile(
filepath.Join(s.States.OutputPath, querySourceStr)+".error.json",
utils.OpenNewFileFlags, 0644)
queryErrorFilePath := filepath.Join(s.States.OutputPath, querySourceStr) + ".error.json"
queryErrorFile, err := os.OpenFile(queryErrorFilePath, utils.OpenNewFileFlags, 0644)
checkErr(err)
if err == nil {
bytes, e := json.MarshalIndent(result.QueryError, "", " ")
Expand All @@ -315,6 +348,14 @@ func (s *Stage) saveQueryJsonFile(result *QueryResult) {
}
checkErr(e)
checkErr(queryErrorFile.Close())

// Upload the query error JSON file to S3
err := uploadToS3(queryErrorFilePath, result.QueryId)
if err == nil {
log.Info().Msgf("Successfully uploaded failed query JSON to S3: %s", queryErrorFilePath)
} else {
log.Error().Err(err).Msgf("Failed to upload failed query JSON to S3: %s", queryErrorFilePath)
}
}
}
s.States.wgExitMainStage.Done()
Expand Down