Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions pkg/driver/external/client/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func (d *DriverClient) CreateDisk(ctx context.Context) error {
return nil
}

// Start initiates the driver instance and receives streaming responses. It blocks until
// receiving the initial success response, then spawns a goroutine to consume subsequent
// error messages from the stream. Any errors from the driver are sent to the channel.
func (d *DriverClient) Start(ctx context.Context) (chan error, error) {
d.logger.Debug("Starting driver instance")

Expand All @@ -67,19 +70,37 @@ func (d *DriverClient) Start(ctx context.Context) (chan error, error) {
return nil, err
}

// Blocking to receive an initial response to ensure Start() is initiated
// at the server-side.
initialResp, err := stream.Recv()
if err != nil {
d.logger.WithError(err).Error("Error receiving initial response from driver start")
return nil, err
}
if !initialResp.Success {
return nil, errors.New(initialResp.Error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to stream.CloseSend() here?
The server side should also be closed, so there may be no problem, but I'm not familiar enough with gRPC to determine whether it's a problem or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server side should also be closed, so there may be no problem, but I'm not familiar enough with gRPC to determine whether it's a problem or not.

Yes, no need to close here!

}

go func() {
<-ctx.Done()
if closeErr := stream.CloseSend(); closeErr != nil {
d.logger.WithError(closeErr).Warn("Failed to close stream")
}
}()

errCh := make(chan error, 1)
go func() {
for {
errorStream, err := stream.Recv()
respStream, err := stream.Recv()
if err != nil {
d.logger.Errorf("Error receiving response from driver: %v", err)
d.logger.Infof("Error receiving response from driver: %v", err)
return
}
d.logger.Debugf("Received response: %v", errorStream)
if !errorStream.Success {
errCh <- errors.New(errorStream.Error)
d.logger.Debugf("Received response: %v", respStream)
if !respStream.Success {
errCh <- errors.New(respStream.Error)
} else {
errCh <- nil
close(errCh)
return
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/driver/external/driver.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/driver/external/driver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ message InfoResponse{
bytes info_json = 1;
}

// StartResponse is a streamed response for Start() RPC. It tries to mimic
// errChan from pkg/driver/driver.go. The server sends an initial response
// with success=true when Start() is initiated. If errors occur, they are
// sent as success=false with the error field populated. When the error channel
// closes, a final success=true message is sent.
message StartResponse {
bool success = 1;
string error = 2;
Expand Down
11 changes: 11 additions & 0 deletions pkg/driver/external/server/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,20 @@ func (s *DriverServer) Start(_ *emptypb.Empty, stream pb.Driver_StartServer) err
errChan, err := s.driver.Start(stream.Context())
if err != nil {
s.logger.Errorf("Start failed: %v", err)
if sendErr := stream.Send(&pb.StartResponse{Success: false, Error: err.Error()}); sendErr != nil {
s.logger.Errorf("Failed to send error response: %v", sendErr)
return status.Errorf(codes.Internal, "failed to send error response: %v", sendErr)
}
return status.Errorf(codes.Internal, "failed to start driver: %v", err)
}

// First send a success response upon receiving the errChan to unblock the client
// and start receiving errors (if any).
if err := stream.Send(&pb.StartResponse{Success: true}); err != nil {
s.logger.Errorf("Failed to send success response: %v", err)
return status.Errorf(codes.Internal, "failed to send success response: %v", err)
}

for {
select {
case err, ok := <-errChan:
Expand Down
3 changes: 3 additions & 0 deletions pkg/driver/external/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ func handlePreConfiguredDriverAction(ctx context.Context, driver driver.Driver)
}
}

// Start begins the driver startup process. It sends an initial response to unblock
// the client and then streams subsequent errors(if any), as the driver initializes.
// A final success message is streamed upon successful completion.
func Start(extDriver *registry.ExternalDriver, instName string) error {
extDriver.Logger.Debugf("Starting external driver at %s", extDriver.Path)
if instName == "" {
Expand Down
Loading