Skip to content

Commit a32c298

Browse files
Timothy Jennisontjennison-work
authored andcommitted
Add continuation support to export command
1 parent 215ec10 commit a32c298

File tree

1 file changed

+44
-4
lines changed

1 file changed

+44
-4
lines changed

pipelines/internal/commands/export/export.go

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ var (
3434
filter = flags.String("filter", "", "the export filter")
3535
datasetName = flags.String("dataset", "", "the dataset to export to which must already exist")
3636
tableName = flags.String("table", "", "the table to export to")
37+
update = flags.Bool("update", true, "only export operations newer than those already exported")
3738
)
3839

3940
type row struct {
@@ -82,10 +83,6 @@ func Invoke(ctx context.Context, service *genomics.Service, project string, argu
8283
call := service.Projects.Operations.List(path).Context(ctx)
8384
call.PageSize(256)
8485

85-
if *filter != "" {
86-
call = call.Filter(*filter)
87-
}
88-
8986
bq, err := bigquery.NewClient(ctx, project)
9087
if err != nil {
9188
return fmt.Errorf("creating BigQuery client: %v", err)
@@ -101,12 +98,27 @@ func Invoke(ctx context.Context, service *genomics.Service, project string, argu
10198
return fmt.Errorf("inferring schema: %v", err)
10299
}
103100

101+
f := *filter
104102
table := dataset.Table(*tableName)
105103
if _, err := table.Metadata(ctx); err != nil {
106104
if err := table.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
107105
return fmt.Errorf("creating table: %v", err)
108106
}
107+
} else if *update {
108+
timestamp, err := latestTimestamp(ctx, bq, project)
109+
if err != nil {
110+
return fmt.Errorf("retrieving latest timestamp: %v", err)
111+
}
112+
if timestamp != "" {
113+
expr := fmt.Sprintf(`metadata.createTime > %q`, timestamp)
114+
if f != "" {
115+
f = fmt.Sprintf("(%s) AND %s", f, expr)
116+
} else {
117+
f = expr
118+
}
119+
}
109120
}
121+
call.Filter(f)
110122

111123
uploader := table.Uploader()
112124

@@ -199,3 +211,31 @@ func parseTimestamp(ts string) bigquery.NullDateTime {
199211
Valid: true,
200212
}
201213
}
214+
215+
func latestTimestamp(ctx context.Context, bq *bigquery.Client, project string) (string, error) {
216+
q := bq.Query(fmt.Sprintf("SELECT MAX(CreateTime) FROM `%s.%s.%s`", project, *datasetName, *tableName))
217+
job, err := q.Run(ctx)
218+
if err != nil {
219+
return "", fmt.Errorf("running query: %v", err)
220+
}
221+
status, err := job.Wait(ctx)
222+
if err != nil {
223+
return "", fmt.Errorf("waiting for query: %v", err)
224+
}
225+
if err := status.Err(); err != nil {
226+
return "", fmt.Errorf("query status: %v", err)
227+
}
228+
it, err := job.Read(ctx)
229+
if err != nil {
230+
return "", fmt.Errorf("reading query: %v", err)
231+
}
232+
233+
var v []bigquery.Value
234+
if err := it.Next(&v); err != nil {
235+
return "", fmt.Errorf("getting query data: %v", err)
236+
}
237+
if v[0] == nil {
238+
return "", nil
239+
}
240+
return fmt.Sprintf("%s", v[0]), nil
241+
}

0 commit comments

Comments
 (0)