From a3d9ac460d3ad755c49dcdd8ba517bb0b64b61a4 Mon Sep 17 00:00:00 2001 From: Timothy Jennison Date: Thu, 18 Oct 2018 14:57:34 -0400 Subject: [PATCH] Add support for exporting v1 operations --- client/client.go | 82 +++++++ export/export/export.go | 212 +++++++++++++++++++ export/main.go | 195 +++++++++++++++++ pipelines/internal/commands/export/export.go | 190 +++-------------- pipelines/main.go | 70 +----- 5 files changed, 533 insertions(+), 216 deletions(-) create mode 100644 client/client.go create mode 100644 export/export/export.go create mode 100644 export/main.go diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..0b28318 --- /dev/null +++ b/client/client.go @@ -0,0 +1,82 @@ +// Copyright 2019 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package genomics provides methods for connecting to the Genomics service. +package client + +import ( + "context" + "crypto/tls" + "fmt" + "net/http" + "strings" + "time" + + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" +) + +func NewClient(ctx context.Context, scope, basePath string) (*http.Client, error) { + var transport robustTransport + + // When connecting to a local server (for Google developers only) disable SSL + // verification since the certificates are not easily verifiable. + if strings.HasPrefix(basePath, "https://localhost:") { + transport.Base.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + } + + ctx = context.WithValue(ctx, oauth2.HTTPClient, &http.Client{Transport: &transport}) + + client, err := google.DefaultClient(ctx, scope) + if err != nil { + return nil, fmt.Errorf("creating authenticated client: %v", err) + } + + return client, nil +} + +type robustTransport struct { + Base http.Transport +} + +func (rt *robustTransport) RoundTrip(req *http.Request) (*http.Response, error) { + delay := time.Second + + var errors []string + for { + resp, err := rt.roundTrip(req) + if err == nil { + return resp, nil + } + errors = append(errors, fmt.Sprintf("attempt %d: %v", len(errors)+1, err)) + if len(errors) == 3 { + return resp, fmt.Errorf("%d failed requests: %v", len(errors), strings.Join(errors, ", ")) + } + + delay *= 2 + time.Sleep(delay) + } +} + +func (rt *robustTransport) roundTrip(req *http.Request) (*http.Response, error) { + resp, err := rt.Base.RoundTrip(req) + if err != nil { + return nil, err + } + switch resp.StatusCode { + case http.StatusServiceUnavailable, http.StatusBadGateway, http.StatusGatewayTimeout: + return nil, fmt.Errorf("retryable HTTP error: %q", resp.Status) + } + return resp, err +} diff --git a/export/export/export.go b/export/export/export.go new file mode 100644 index 0000000..60e33b3 --- /dev/null +++ b/export/export/export.go @@ -0,0 +1,212 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package export provides helpers for exporting pipelines to BigQuery. +package export + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "time" + + "cloud.google.com/go/bigquery" + "google.golang.org/api/iterator" +) + +type Exporter struct { + Filter string + Count int + + bq *bigquery.Client + table *bigquery.Table + schema bigquery.Schema + + buffer bytes.Buffer + encoder *json.Encoder +} + +func NewExporter(ctx context.Context, project, filter, datasetName, tableName string, update bool, addTimestamp func(string, time.Time) string) (*Exporter, error) { + bq, err := bigquery.NewClient(ctx, project) + if err != nil { + return nil, fmt.Errorf("creating BigQuery client: %v", err) + } + + dataset := bq.Dataset(datasetName) + if _, err := dataset.Metadata(ctx); err != nil { + return nil, fmt.Errorf("looking up dataset: %v", err) + } + + schema, err := bigquery.InferSchema(Row{}) + if err != nil { + return nil, fmt.Errorf("inferring schema: %v", err) + } + + f := filter + table := dataset.Table(tableName) + if _, err := table.Metadata(ctx); err != nil { + if err := table.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + return nil, fmt.Errorf("creating table: %v", err) + } + } else if update { + timestamp, err := latestTimestamp(ctx, bq, project, datasetName, tableName) + if err != nil { + return nil, fmt.Errorf("retrieving latest timestamp: %v", err) + } + if timestamp != nil { + f = addTimestamp(f, *timestamp) + } + } + + fmt.Printf("Exporting operations") + + return &Exporter{Filter: f, bq: bq, table: table, schema: schema}, nil +} + +func (e *Exporter) StartPage() { + e.buffer.Reset() + e.encoder = json.NewEncoder(&e.buffer) + e.encoder.SetEscapeHTML(false) +} + +func (e *Exporter) Encode(r *Row) error { + e.Count++ + return e.encoder.Encode(r) +} + +func (e *Exporter) FinishPage(ctx context.Context) error { + fmt.Printf(".") + + if e.buffer.Len() == 0 { + return nil + } + + source := bigquery.NewReaderSource(&e.buffer) + source.Schema = e.schema + source.SourceFormat = bigquery.JSON + loader := e.table.LoaderFrom(source) + + job, err := loader.Run(ctx) + if err != nil { + return fmt.Errorf("running loader (after %d operations): %v", e.Count, err) + } + + status, err := job.Wait(ctx) + if err != nil { + return fmt.Errorf("waiting for job (after %d operations): %v", e.Count, err) + } + + if err := status.Err(); err != nil { + for _, e := range status.Errors { + fmt.Println(e) + } + return fmt.Errorf("job status (after %d operations): %v", e.Count, err) + } + + return nil +} + +func (e *Exporter) Finish() { + fmt.Printf("done\n%d operations exported\n", e.Count) +} + +type Row struct { + Name string + Done bool + Error *Status `bigquery:",nullable"` + + // The raw pipeline JSON. + Pipeline string + + Labels []Label + Events []Event + + CreateTime time.Time + StartTime, EndTime bigquery.NullTimestamp + + // Additional fields pulled out of the pipeline for convenience. + Regions []string + Zones []string + MachineType string + Preemptible bool +} + +type Status struct { + Message string + Code int64 +} + +type Event struct { + Timestamp time.Time + Description string +} + +type Label struct { + Key, Value string +} + +func ParseTimestamp(ts string) bigquery.NullTimestamp { + t, err := time.Parse(time.RFC3339Nano, ts) + if err != nil { + return bigquery.NullTimestamp{} + } + return bigquery.NullTimestamp{ + Timestamp: t, + Valid: true, + } +} + +func CombineTerms(req, opt, format string) string { + if opt == "" { + return req + } + return fmt.Sprintf(format, req, opt) +} + +func latestTimestamp(ctx context.Context, bq *bigquery.Client, project, dataset, table string) (*time.Time, error) { + // Query in micros otherwise BigQuery returns the timestamp as a double. + q := bq.Query(fmt.Sprintf("SELECT UNIX_MICROS(MAX(CreateTime)) FROM `%s.%s.%s`", project, dataset, table)) + job, err := q.Run(ctx) + if err != nil { + return nil, fmt.Errorf("running query: %v", err) + } + status, err := job.Wait(ctx) + if err != nil { + return nil, fmt.Errorf("waiting for query: %v", err) + } + if err := status.Err(); err != nil { + return nil, fmt.Errorf("query status: %v", err) + } + it, err := job.Read(ctx) + if err != nil { + return nil, fmt.Errorf("reading query: %v", err) + } + + var v []bigquery.Value + err = it.Next(&v) + if err == iterator.Done { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("getting query data: %v", err) + } + + micros, ok := v[0].(int64) + if !ok { + return nil, fmt.Errorf("unexpected timestamp type: %T", v[0]) + } + t := time.Unix(0, micros*int64(time.Microsecond)).UTC() + return &t, nil +} diff --git a/export/main.go b/export/main.go new file mode 100644 index 0000000..1abead8 --- /dev/null +++ b/export/main.go @@ -0,0 +1,195 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This tool exports pipelines operations to BigQuery. +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "os" + "strings" + "time" + + "github.com/googlegenomics/pipelines-tools/client" + "github.com/googlegenomics/pipelines-tools/export/export" + + genomics "google.golang.org/api/genomics/v1alpha2" +) + +var ( + project = flag.String("project", defaultProject(), "the cloud project name") + basePath = flag.String("api", "", "the API base to use") + + filter = flag.String("filter", "", "the export filter") + dataset = flag.String("dataset", "", "the dataset to export to which must already exist") + table = flag.String("table", "", "the table to export to") + update = flag.Bool("update", true, "only export operations newer than those already exported") +) + +func main() { + flag.Parse() + + if *project == "" { + exitf("You must specify a project with --project") + } + + if *dataset == "" || *table == "" { + exitf("You must specify a dataset and table with --dataset and --table") + } + + ctx := context.Background() + service, err := newService(ctx, *basePath) + if err != nil { + exitf("Failed to create service: %v", err) + } + + e, err := export.NewExporter(ctx, *project, *filter, *dataset, *table, *update, func(filter string, t time.Time) string { + return export.CombineTerms(fmt.Sprintf("createTime >= %d", t.Unix()+1), filter, "%s AND %s") + }) + if err != nil { + exitf("Failed to initialize exporter: %v", err) + } + + f := export.CombineTerms("projectId = "+*project, e.Filter, "%s AND %s") + + call := service.Operations.List("operations").Context(ctx).PageSize(256).Filter(f) + err = call.Pages(ctx, func(resp *genomics.ListOperationsResponse) error { + e.StartPage() + + for _, operation := range resp.Operations { + var metadata genomics.OperationMetadata + if err := json.Unmarshal(operation.Metadata, &metadata); err != nil { + return fmt.Errorf("unmarshalling operation (after %d operations): %v", e.Count, err) + } + + if metadata.Request == nil { + continue + } + + var t struct { + Type string `json:"@type"` + } + if err := json.Unmarshal(metadata.Request, &t); err != nil { + return fmt.Errorf("unmarshalling request type (after %d operations): %v", e.Count, err) + } + if !strings.HasSuffix(t.Type, ".RunPipelineRequest") { + continue + } + + var request genomics.RunPipelineRequest + if err := json.Unmarshal(metadata.Request, &request); err != nil { + return fmt.Errorf("unmarshalling request (after %d operations): %v", e.Count, err) + } + + var zones []string + var preemptible bool + if request.EphemeralPipeline != nil && request.EphemeralPipeline.Resources != nil { + zones = request.EphemeralPipeline.Resources.Zones + preemptible = request.EphemeralPipeline.Resources.Preemptible + } + if request.PipelineArgs != nil && request.PipelineArgs.Resources != nil { + zones = request.PipelineArgs.Resources.Zones + preemptible = request.PipelineArgs.Resources.Preemptible + } + + r := export.Row{ + Name: operation.Name, + Done: operation.Done, + CreateTime: export.ParseTimestamp(metadata.CreateTime).Timestamp, + StartTime: export.ParseTimestamp(metadata.StartTime), + EndTime: export.ParseTimestamp(metadata.EndTime), + + Pipeline: string(metadata.Request), + Zones: zones, + Preemptible: preemptible, + } + + if metadata.RuntimeMetadata != nil { + var runtime genomics.RuntimeMetadata + if err := json.Unmarshal(metadata.RuntimeMetadata, &runtime); err != nil { + return fmt.Errorf("unmarshalling request (after %d operations): %v", e.Count, err) + } + + if runtime.ComputeEngine != nil { + parts := strings.Split(runtime.ComputeEngine.MachineType, "/") + r.MachineType = parts[1] + } + } + + if operation.Error != nil { + r.Error = &export.Status{ + Message: operation.Error.Message, + Code: operation.Error.Code, + } + } + + if request.PipelineArgs != nil { + for k, v := range request.PipelineArgs.Labels { + r.Labels = append(r.Labels, export.Label{Key: k, Value: v}) + } + } + + for _, e := range metadata.Events { + r.Events = append(r.Events, export.Event{ + Timestamp: export.ParseTimestamp(e.StartTime).Timestamp, + Description: e.Description, + }) + } + + if err := e.Encode(&r); err != nil { + return fmt.Errorf("encoding row (after %d operations): %v", e.Count, err) + } + } + + if err := e.FinishPage(ctx); err != nil { + return fmt.Errorf("finishing page (after %d operations): %v", e.Count, err) + } + + return nil + }) + if err != nil { + exitf("Failed to export operations: %v", err) + } + + e.Finish() +} + +func newService(ctx context.Context, basePath string) (*genomics.Service, error) { + c, err := client.NewClient(ctx, genomics.GenomicsScope, basePath) + if err != nil { + return nil, fmt.Errorf("creating client: %v", err) + } + + service, err := genomics.New(c) + if err != nil { + return nil, fmt.Errorf("creating service object: %v", err) + } + if basePath != "" { + service.BasePath = basePath + } + return service, nil +} + +func exitf(format string, arguments ...interface{}) { + fmt.Fprintf(os.Stderr, format, arguments...) + fmt.Fprintln(os.Stderr) + os.Exit(1) +} + +func defaultProject() string { + return os.Getenv("GOOGLE_CLOUD_PROJECT") +} diff --git a/pipelines/internal/commands/export/export.go b/pipelines/internal/commands/export/export.go index 396b1ce..2d290f1 100644 --- a/pipelines/internal/commands/export/export.go +++ b/pipelines/internal/commands/export/export.go @@ -23,137 +23,56 @@ import ( "fmt" "time" - "cloud.google.com/go/bigquery" - "cloud.google.com/go/civil" + "github.com/googlegenomics/pipelines-tools/export/export" genomics "google.golang.org/api/genomics/v2alpha1" ) var ( flags = flag.NewFlagSet("", flag.ExitOnError) - filter = flags.String("filter", "", "the export filter") - datasetName = flags.String("dataset", "", "the dataset to export to which must already exist") - tableName = flags.String("table", "", "the table to export to") - update = flags.Bool("update", true, "only export operations newer than those already exported") + filter = flags.String("filter", "", "the export filter") + dataset = flags.String("dataset", "", "the dataset to export to which must already exist") + table = flags.String("table", "", "the table to export to") + update = flags.Bool("update", true, "only export operations newer than those already exported") ) -type row struct { - Name string - Done bool - Error *status `bigquery:",nullable"` - - // The raw pipeline JSON. - Pipeline string - - Labels []label - Events []event - - CreateTime civil.DateTime - StartTime, EndTime bigquery.NullDateTime - - // Additional fields pulled out of the pipeline for convenience. - Regions []string - Zones []string - MachineType string - Preemptible bool -} - -type status struct { - Message string - Code int64 -} - -type event struct { - Timestamp civil.DateTime - Description string -} - -type label struct { - Key, Value string -} - func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error { flags.Parse(arguments) - if *datasetName == "" || *tableName == "" { + if *dataset == "" || *table == "" { return errors.New("dataset and table are required") } - path := fmt.Sprintf("projects/%s/operations", project) - call := service.Projects.Operations.List(path).Context(ctx) - call.PageSize(256) - - bq, err := bigquery.NewClient(ctx, project) - if err != nil { - return fmt.Errorf("creating BigQuery client: %v", err) - } - - dataset := bq.Dataset(*datasetName) - if _, err := dataset.Metadata(ctx); err != nil { - return fmt.Errorf("looking up dataset: %v", err) - } - - schema, err := bigquery.InferSchema(row{}) + e, err := export.NewExporter(ctx, project, *filter, *dataset, *table, *update, func(filter string, t time.Time) string { + return export.CombineTerms(fmt.Sprintf("metadata.createTime > %q", t.Format(time.RFC3339Nano)), filter, "%s AND (%s)") + }) if err != nil { - return fmt.Errorf("inferring schema: %v", err) - } - - f := *filter - table := dataset.Table(*tableName) - if _, err := table.Metadata(ctx); err != nil { - if err := table.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { - return fmt.Errorf("creating table: %v", err) - } - } else if *update { - timestamp, err := latestTimestamp(ctx, bq, project) - if err != nil { - return fmt.Errorf("retrieving latest timestamp: %v", err) - } - if timestamp != "" { - expr := fmt.Sprintf(`metadata.createTime > %q`, timestamp) - if f != "" { - f = fmt.Sprintf("(%s) AND %s", f, expr) - } else { - f = expr - } - } + return fmt.Errorf("creating exporter: %v", err) } - call.Filter(f) - - uploader := table.Uploader() - - fmt.Printf("Exporting operations") - var count int - var pageToken string - for { - resp, err := call.PageToken(pageToken).Do() - if err != nil { - return fmt.Errorf("calling list (after %d operations): %v", count, err) - } - - fmt.Printf(".") + path := fmt.Sprintf("projects/%s/operations", project) + call := service.Projects.Operations.List(path).Context(ctx).PageSize(256).Filter(e.Filter) + err = call.Pages(ctx, func(resp *genomics.ListOperationsResponse) error { + e.StartPage() - var savers []*bigquery.StructSaver for _, operation := range resp.Operations { var metadata genomics.Metadata if err := json.Unmarshal(operation.Metadata, &metadata); err != nil { - return fmt.Errorf("unmarshalling operation (after %d operations): %v", count, err) - return err + return fmt.Errorf("unmarshalling operation (after %d operations): %v", e.Count, err) } pipeline, err := json.Marshal(metadata.Pipeline) if err != nil { - return fmt.Errorf("marshalling pipeline (after %d operations): %v", count, err) + return fmt.Errorf("marshalling pipeline (after %d operations): %v", e.Count, err) } resources := metadata.Pipeline.Resources - r := row{ + r := export.Row{ Name: operation.Name, Done: operation.Done, - CreateTime: parseTimestamp(metadata.CreateTime).DateTime, - StartTime: parseTimestamp(metadata.StartTime), - EndTime: parseTimestamp(metadata.EndTime), + CreateTime: export.ParseTimestamp(metadata.CreateTime).Timestamp, + StartTime: export.ParseTimestamp(metadata.StartTime), + EndTime: export.ParseTimestamp(metadata.EndTime), Pipeline: string(pipeline), Regions: resources.Regions, @@ -163,79 +82,38 @@ func Invoke(ctx context.Context, service *genomics.Service, project string, argu } if operation.Error != nil { - r.Error = &status{ + r.Error = &export.Status{ Message: operation.Error.Message, Code: operation.Error.Code, } } for k, v := range metadata.Labels { - r.Labels = append(r.Labels, label{Key: k, Value: v}) + r.Labels = append(r.Labels, export.Label{Key: k, Value: v}) } for _, e := range metadata.Events { - r.Events = append(r.Events, event{ - Timestamp: parseTimestamp(e.Timestamp).DateTime, + r.Events = append(r.Events, export.Event{ + Timestamp: export.ParseTimestamp(e.Timestamp).Timestamp, Description: e.Description, }) } - savers = append(savers, &bigquery.StructSaver{ - Struct: r, - InsertID: operation.Name, - Schema: schema, - }) - count++ - } - - if err := uploader.Put(ctx, savers); err != nil { - return fmt.Errorf("uploading rows (after %d operations): %v", count, err) + if err := e.Encode(&r); err != nil { + return fmt.Errorf("encoding row (after %d operations): %v", e.Count, err) + } } - if resp.NextPageToken == "" { - fmt.Printf("done\n%d operations exported\n", count) - return nil + if err := e.FinishPage(ctx); err != nil { + return fmt.Errorf("finishing page (after %d operations): %v", e.Count, err) } - pageToken = resp.NextPageToken - } -} - -func parseTimestamp(ts string) bigquery.NullDateTime { - t, err := time.Parse(time.RFC3339, ts) + return nil + }) if err != nil { - return bigquery.NullDateTime{} - } - return bigquery.NullDateTime{ - DateTime: civil.DateTimeOf(t), - Valid: true, - } -} - -func latestTimestamp(ctx context.Context, bq *bigquery.Client, project string) (string, error) { - q := bq.Query(fmt.Sprintf("SELECT MAX(CreateTime) FROM `%s.%s.%s`", project, *datasetName, *tableName)) - job, err := q.Run(ctx) - if err != nil { - return "", fmt.Errorf("running query: %v", err) - } - status, err := job.Wait(ctx) - if err != nil { - return "", fmt.Errorf("waiting for query: %v", err) - } - if err := status.Err(); err != nil { - return "", fmt.Errorf("query status: %v", err) - } - it, err := job.Read(ctx) - if err != nil { - return "", fmt.Errorf("reading query: %v", err) + return fmt.Errorf("exporting operations: %v", err) } - var v []bigquery.Value - if err := it.Next(&v); err != nil { - return "", fmt.Errorf("getting query data: %v", err) - } - if v[0] == nil { - return "", nil - } - return fmt.Sprintf("%s", v[0]), nil + e.Finish() + return nil } diff --git a/pipelines/main.go b/pipelines/main.go index 4685c57..161a963 100644 --- a/pipelines/main.go +++ b/pipelines/main.go @@ -17,22 +17,17 @@ package main import ( "context" - "crypto/tls" "flag" "fmt" - "net/http" "os" - "strings" - "time" + "github.com/googlegenomics/pipelines-tools/client" "github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/cancel" "github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/export" "github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/query" "github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/run" "github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/watch" - "golang.org/x/oauth2" - "golang.org/x/oauth2/google" genomics "google.golang.org/api/genomics/v2alpha1" ) @@ -81,29 +76,13 @@ func main() { } } -func exitf(format string, arguments ...interface{}) { - fmt.Fprintf(os.Stderr, format, arguments...) - fmt.Fprintln(os.Stderr) - os.Exit(1) -} - func newService(ctx context.Context, basePath string) (*genomics.Service, error) { - var transport robustTransport - - // When connecting to a local server (for Google developers only) disable SSL - // verification since the certificates are not easily verifiable. - if strings.HasPrefix(basePath, "https://localhost:") { - transport.Base.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - } - - ctx = context.WithValue(ctx, oauth2.HTTPClient, &http.Client{Transport: &transport}) - - client, err := google.DefaultClient(ctx, genomics.GenomicsScope) + c, err := client.NewClient(ctx, genomics.GenomicsScope, basePath) if err != nil { - return nil, fmt.Errorf("creating authenticated client: %v", err) + return nil, fmt.Errorf("creating client: %v", err) } - service, err := genomics.New(client) + service, err := genomics.New(c) if err != nil { return nil, fmt.Errorf("creating service object: %v", err) } @@ -113,41 +92,12 @@ func newService(ctx context.Context, basePath string) (*genomics.Service, error) return service, nil } -func defaultProject() string { - return os.Getenv("GOOGLE_CLOUD_PROJECT") -} - -type robustTransport struct { - Base http.Transport -} - -func (rt *robustTransport) RoundTrip(req *http.Request) (*http.Response, error) { - delay := time.Second - - var errors []string - for { - resp, err := rt.roundTrip(req) - if err == nil { - return resp, nil - } - errors = append(errors, fmt.Sprintf("attempt %d: %v", len(errors)+1, err)) - if len(errors) == 3 { - return resp, fmt.Errorf("%d failed requests: %v", len(errors), strings.Join(errors, ", ")) - } - - delay *= 2 - time.Sleep(delay) - } +func exitf(format string, arguments ...interface{}) { + fmt.Fprintf(os.Stderr, format, arguments...) + fmt.Fprintln(os.Stderr) + os.Exit(1) } -func (rt *robustTransport) roundTrip(req *http.Request) (*http.Response, error) { - resp, err := rt.Base.RoundTrip(req) - if err != nil { - return nil, err - } - switch resp.StatusCode { - case http.StatusServiceUnavailable, http.StatusBadGateway, http.StatusGatewayTimeout: - return nil, fmt.Errorf("retryable HTTP error: %q", resp.Status) - } - return resp, err +func defaultProject() string { + return os.Getenv("GOOGLE_CLOUD_PROJECT") }