Skip to content
Open
30 changes: 29 additions & 1 deletion pipelines/internal/commands/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ package run
import (
"bufio"
"context"
"crypto/rand"
"encoding/base64"
"encoding/binary"
"encoding/json"
"errors"
"flag"
Expand All @@ -125,6 +127,7 @@ import (
"strings"
"time"

"cloud.google.com/go/pubsub"
"github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/watch"
"github.com/googlegenomics/pipelines-tools/pipelines/internal/common"
"golang.org/x/oauth2/google"
Expand Down Expand Up @@ -213,6 +216,13 @@ func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.R
abort := make(chan os.Signal, 1)
signal.Notify(abort, os.Interrupt)

topic, err := newPubSubTopic(ctx, req.Pipeline.Resources.ProjectId)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to build topics in the tooling - gcloud can be used for that - I think we just want to support an optional flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is one feature that should be enabled by default. I changed the functionality to always use projects//topics/pipelines-tool if only it has the lable created-by:pipelines-tool. If the tool has any problem in using Pub/Sub it will silently switch to long pooling. Also the user can specify --pub-sub=false to opt for long pooling.

if err != nil {
return fmt.Errorf("creating Pub/Sub topic: %v", err)
}
defer topic.Delete(ctx)
req.PubSubTopic = topic.ID()

attempt := uint(1)
for {
req.Pipeline.Resources.VirtualMachine.Preemptible = (attempt <= *pvmAttempts)
Expand All @@ -236,7 +246,7 @@ func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.R
return nil
}

if err := watch.Invoke(ctx, service, req.Pipeline.Resources.ProjectId, []string{lro.Name}); err != nil {
if err := watch.Invoke(ctx, service, req.Pipeline.Resources.ProjectId, []string{lro.Name, "-topic", topic.ID()}); err != nil {
if err, ok := err.(common.PipelineExecutionError); ok && err.IsRetriable() {
if attempt < *pvmAttempts+*attempts {
attempt++
Expand All @@ -250,6 +260,24 @@ func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.R
}
}

func newPubSubTopic(ctx context.Context, projectID string) (*pubsub.Topic, error) {
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, fmt.Errorf("creating Pub/Sub client: %v", err)
}

var id uint64
if err := binary.Read(rand.Reader, binary.LittleEndian, &id); err != nil {
return nil, fmt.Errorf("generating topic name: %v", err)
}

topic, err := client.CreateTopic(ctx, fmt.Sprintf("t%d", id))
if err != nil {
return nil, fmt.Errorf("creating topic: %v", err)
}
return topic, nil
}

func parseJSON(filename string, v interface{}) error {
f, err := os.Open(filename)
if err != nil {
Expand Down
86 changes: 70 additions & 16 deletions pipelines/internal/commands/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ package watch

import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/json"
"errors"
"flag"
"fmt"
"sync"
"time"

"cloud.google.com/go/pubsub"
"github.com/googlegenomics/pipelines-tools/pipelines/internal/common"
genomics "google.golang.org/api/genomics/v2alpha1"
)
Expand All @@ -32,16 +36,20 @@ var (

actions = flags.Bool("actions", false, "show action details")
details = flags.Bool("details", false, "show event details")
topic = flags.String("topic", "", "the Pub/Sub topic to watch")
)

func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error {
names := common.ParseFlags(flags, arguments)
if len(names) < 1 {
return errors.New("missing operation name")
}
if *topic == "" {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will still want to support polling and I think this doesn't need to be passed: when the operation data is fetched initially if there is a topic we'll start listening instead of polling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return errors.New("missing Pub/Sub topic")
}

name := common.ExpandOperationName(project, names[0])
result, err := watch(ctx, service, name)
result, err := watch(ctx, service, project, name, *topic)
if err != nil {
return fmt.Errorf("watching pipeline: %v", err)
}
Expand All @@ -54,26 +62,52 @@ func Invoke(ctx context.Context, service *genomics.Service, project string, argu
return nil
}

func watch(ctx context.Context, service *genomics.Service, name string) (interface{}, error) {
func watch(ctx context.Context, service *genomics.Service, project, name, topic string) (interface{}, error) {
sub, err := newPubSubSubscription(ctx, project, topic)
if err != nil {
return nil, fmt.Errorf("creating Pub/Sub subscription: %v", err)
}
defer sub.Delete(ctx)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

var events []*genomics.Event
const initialDelay = 5 * time.Second
delay := initialDelay
for {
var response interface{}
var receiverErr error
var receiverLock sync.Mutex
err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
receiverLock.Lock()
defer receiverLock.Unlock()
m.Ack()

exit := func(r interface{}, err error) {
if ctx.Err() != nil {
return
}
response = r
receiverErr = err
cancel()
}

lro, err := service.Projects.Operations.Get(name).Context(ctx).Do()
if err != nil {
return nil, fmt.Errorf("getting operation status: %v", err)
exit(nil, fmt.Errorf("getting operation status: %v", err))
return
}

var metadata genomics.Metadata
if err := json.Unmarshal(lro.Metadata, &metadata); err != nil {
return nil, fmt.Errorf("parsing metadata: %v", err)
exit(nil, fmt.Errorf("parsing metadata: %v", err))
return
}

if *actions {
*actions = false
encoded, err := json.MarshalIndent(metadata.Pipeline.Actions, "", " ")
if err != nil {
return nil, fmt.Errorf("encoding actions: %v", err)
exit(nil, fmt.Errorf("encoding actions: %v", err))
return
}
fmt.Printf("%s\n", encoded)
}
Expand All @@ -88,20 +122,40 @@ func watch(ctx context.Context, service *genomics.Service, name string) (interfa
}
}
events = metadata.Events
delay = initialDelay
}

if lro.Done {
if lro.Error != nil {
return lro.Error, nil
exit(lro.Error, nil)
return
}
return lro.Response, nil
exit(lro.Response, nil)
}
})
if err != nil && err != context.Canceled {
return nil, fmt.Errorf("receiving message: %v", err)
}
return response, receiverErr
}

time.Sleep(delay)
delay = time.Duration(float64(delay) * 1.5)
if limit := time.Minute; delay > limit {
delay = limit
}
func newPubSubSubscription(ctx context.Context, projectID, topicName string) (*pubsub.Subscription, error) {
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, fmt.Errorf("creating a Pub/Sub client: %v", err)
}

var id uint64
if err := binary.Read(rand.Reader, binary.LittleEndian, &id); err != nil {
return nil, fmt.Errorf("generating subscription name: %v", err)
}

sub, err := client.CreateSubscription(ctx, fmt.Sprintf("s%d", id), pubsub.SubscriptionConfig{
Topic: client.Topic(topicName),
AckDeadline: 10 * time.Second,
ExpirationPolicy: 25 * time.Hour,
})
if err != nil {
return nil, fmt.Errorf("creating subscription: %v", err)
}
return sub, nil
}