From 7237685c791e28702eb24ec8f64a161d8230f768 Mon Sep 17 00:00:00 2001 From: Seungjae Yoo Date: Mon, 10 Nov 2025 10:50:30 +0900 Subject: [PATCH 1/3] DeleteHosts should wait for the operation result --- pkg/client/client.go | 10 +++++++++- pkg/client/client_test.go | 23 ++++++++++------------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index e5eec526..0a75906a 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -158,7 +158,15 @@ func (c *clientImpl) DeleteHosts(names []string) error { wg.Add(1) go func(name string) { defer wg.Done() - if err := c.httpHelper.NewDeleteRequest("/hosts/" + name).JSONResDo(nil); err != nil { + var op apiv1.Operation + if err := c.httpHelper.NewDeleteRequest("/hosts/" + name).JSONResDo(&op); err != nil { + mu.Lock() + defer mu.Unlock() + merr = multierror.Append(merr, fmt.Errorf("delete host %q failed: %w", name, err)) + return + } + ins := &apiv1.HostInstance{} + if err := c.waitForOperation(&op, ins); err != nil { mu.Lock() defer mu.Unlock() merr = multierror.Append(merr, fmt.Errorf("delete host %q failed: %w", name, err)) diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 0aa15d3d..ef3b29e7 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -19,7 +19,6 @@ import ( "io" "net/http" "net/http/httptest" - "regexp" "testing" apiv1 "github.com/google/cloud-android-orchestration/api/v1" @@ -28,19 +27,17 @@ import ( ) func TestDeleteHosts(t *testing.T) { - existingNames := map[string]struct{}{"bar": {}, "baz": {}} ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != "DELETE" { - panic("unexpected method: " + r.Method) - } - re := regexp.MustCompile(`^/hosts/(.*)$`) - matches := re.FindStringSubmatch(r.URL.Path) - if len(matches) != 2 { - panic("unexpected path: " + r.URL.Path) - } - if _, ok := existingNames[matches[1]]; ok { - writeOK(w, "") - } else { + switch ep := r.Method + " " + r.URL.Path; ep { + case "DELETE /hosts/bar": + writeOK(w, apiv1.Operation{Name: "deletingbar"}) + case "DELETE /hosts/baz": + writeOK(w, apiv1.Operation{Name: "deletingbaz"}) + case "POST /operations/deletingbar/:wait": + writeOK(w, apiv1.HostInstance{Name: "bar"}) + case "POST /operations/deletingbaz/:wait": + writeOK(w, apiv1.HostInstance{Name: "baz"}) + default: writeErr(w, 404) } })) From ab60bf10bfa5bcb31b3e9274b14ffe42a871743b Mon Sep 17 00:00:00 2001 From: Seungjae Yoo Date: Mon, 10 Nov 2025 10:51:56 +0900 Subject: [PATCH 2/3] DockerIM should use operation mechanism properly --- pkg/app/instances/docker.go | 242 ++++++++++++++++++------------- pkg/app/instances/docker_test.go | 54 ------- 2 files changed, 139 insertions(+), 157 deletions(-) delete mode 100644 pkg/app/instances/docker_test.go diff --git a/pkg/app/instances/docker.go b/pkg/app/instances/docker.go index 849b8737..7a7d1063 100644 --- a/pkg/app/instances/docker.go +++ b/pkg/app/instances/docker.go @@ -35,6 +35,7 @@ import ( "github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/volume" "github.com/docker/docker/client" + "github.com/google/uuid" ) const DockerIMType IMType = "docker" @@ -54,18 +55,12 @@ const uaMountTarget = "/var/lib/cuttlefish-common/userartifacts" // Docker implementation of the instance manager. type DockerInstanceManager struct { - Config Config - Client *client.Client - mutexes sync.Map + Config Config + Client *client.Client + mutexes sync.Map + operations sync.Map } -type OPType string - -const ( - CreateHostOPType OPType = "createhost" - DeleteHostOPType OPType = "deletehost" -) - func NewDockerInstanceManager(cfg Config, cli *client.Client) *DockerInstanceManager { return &DockerInstanceManager{ Config: cfg, @@ -85,26 +80,14 @@ func (m *DockerInstanceManager) CreateHost(zone string, _ *apiv1.CreateHostReque if zone != "local" { return nil, errors.NewBadRequestError("Invalid zone. It should be 'local'.", nil) } - mu := m.getRWMutex(user) - mu.RLock() - defer mu.RUnlock() - ctx := context.TODO() - if err := m.downloadDockerImageIfNeeded(ctx); err != nil { - return nil, fmt.Errorf("failed to retrieve docker image name: %w", err) - } - // A docker volume is shared across all hosts under each user. If no volume - // exists for given user, create it. - if err := m.createDockerVolumeIfNeeded(ctx, user); err != nil { - return nil, fmt.Errorf("failed to prepare docker volume: %w", err) - } - host, err := m.createDockerContainer(ctx, user) - if err != nil { - return nil, fmt.Errorf("failed to prepare docker container: %w", err) - } - return &apiv1.Operation{ - Name: EncodeOperationName(CreateHostOPType, host), - Done: true, - }, nil + op := m.newOperation() + go func() { + val, err := m.createHost(user) + if opErr := m.completeOperation(op.Name, &operationResult{Error: err, Value: val}); opErr != nil { + log.Printf("error completing operation %q: %v\n", op.Name, opErr) + } + }() + return &op, nil } func (m *DockerInstanceManager) ListHosts(zone string, user accounts.User, _ *ListHostsRequest) (*apiv1.ListHostsResponse, error) { @@ -141,88 +124,25 @@ func (m *DockerInstanceManager) DeleteHost(zone string, user accounts.User, host if zone != "local" { return nil, errors.NewBadRequestError("Invalid zone. It should be 'local'.", nil) } - ctx := context.TODO() - if err := m.deleteDockerContainer(ctx, user, host); err != nil { - return nil, fmt.Errorf("failed to delete docker container: %w", err) - } - // A docker volume is shared across all hosts under each user. If no host - // exists for given user, delete volume afterwards to cleanup. - if err := m.deleteDockerVolumeIfNeeded(ctx, user); err != nil { - return nil, fmt.Errorf("failed to cleanup docker volume: %w", err) - } - return &apiv1.Operation{ - Name: EncodeOperationName(DeleteHostOPType, host), - Done: true, - }, nil -} - -func EncodeOperationName(opType OPType, host string) string { - return string(opType) + "_" + host -} - -func DecodeOperationName(name string) (OPType, string, error) { - words := strings.SplitN(name, "_", 2) - if len(words) == 2 { - return OPType(words[0]), words[1], nil - } else { - return "", "", errors.NewBadRequestError(fmt.Sprintf("cannot parse operation from %q", name), nil) - } -} - -func (m *DockerInstanceManager) waitCreateHostOperation(host string) (*apiv1.HostInstance, error) { - ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Minute) - defer cancel() - for { - select { - case <-ctx.Done(): - return nil, errors.NewServiceUnavailableError("Wait for operation timed out", nil) - default: - res, err := m.Client.ContainerInspect(ctx, host) - if err != nil { - return nil, fmt.Errorf("failed to inspect docker container: %w", err) - } - if res.State.Running { - return &apiv1.HostInstance{ - Name: host, - }, nil - } - time.Sleep(time.Second) + op := m.newOperation() + go func() { + val, err := m.deleteHost(user, host) + if opErr := m.completeOperation(op.Name, &operationResult{Error: err, Value: val}); opErr != nil { + log.Printf("error completing operation %q: %v\n", op.Name, opErr) } - } -} - -func (m *DockerInstanceManager) waitDeleteHostOperation(host string) (*apiv1.HostInstance, error) { - ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Minute) - defer cancel() - resCh, errCh := m.Client.ContainerWait(ctx, host, "") - select { - case <-ctx.Done(): - return nil, errors.NewServiceUnavailableError("Wait for operation timed out", nil) - case err := <-errCh: - return nil, fmt.Errorf("error is thrown while waiting for deleting host: %w", err) - case <-resCh: - return &apiv1.HostInstance{ - Name: host, - }, nil - } + }() + return &op, nil } func (m *DockerInstanceManager) WaitOperation(zone string, _ accounts.User, name string) (any, error) { if zone != "local" { return nil, errors.NewBadRequestError("Invalid zone. It should be 'local'.", nil) } - opType, host, err := DecodeOperationName(name) + val, err := m.waitOperation(name, 3*time.Minute) if err != nil { return nil, err } - switch opType { - case CreateHostOPType: - return m.waitCreateHostOperation(host) - case DeleteHostOPType: - return m.waitDeleteHostOperation(host) - default: - return nil, errors.NewBadRequestError(fmt.Sprintf("operation type %s not found.", opType), nil) - } + return val.Value, val.Error } func (m *DockerInstanceManager) getIpAddr(container *types.Container) (string, error) { @@ -318,6 +238,28 @@ func (m *DockerInstanceManager) getContainerLabel(host string, key string) (stri return value, nil } +func (m *DockerInstanceManager) createHost(user accounts.User) (*apiv1.HostInstance, error) { + mu := m.getRWMutex(user) + mu.RLock() + defer mu.RUnlock() + ctx := context.TODO() + if err := m.downloadDockerImageIfNeeded(ctx); err != nil { + return nil, fmt.Errorf("failed to retrieve docker image name: %w", err) + } + // A docker volume is shared across all hosts under each user. If no volume + // exists for given user, create it. + if err := m.createDockerVolumeIfNeeded(ctx, user); err != nil { + return nil, fmt.Errorf("failed to prepare docker volume: %w", err) + } + host, err := m.createDockerContainer(ctx, user) + if err != nil { + return nil, fmt.Errorf("failed to prepare docker container: %w", err) + } + return &apiv1.HostInstance{ + Name: host, + }, nil +} + func (m *DockerInstanceManager) downloadDockerImageIfNeeded(ctx context.Context) error { listRes, err := m.Client.ImageList(ctx, image.ListOptions{}) if err != nil { @@ -364,6 +306,8 @@ func (m *DockerInstanceManager) createDockerVolumeIfNeeded(ctx context.Context, return nil } +const containerInspectRetryLimit = 5 + func (m *DockerInstanceManager) createDockerContainer(ctx context.Context, user accounts.User) (string, error) { config := &container.Config{ AttachStdin: true, @@ -401,7 +345,31 @@ func (m *DockerInstanceManager) createDockerContainer(ctx context.Context, user if err := m.Client.ContainerExecStart(ctx, execRes.ID, container.ExecStartOptions{}); err != nil { return "", fmt.Errorf("failed to start container execution %q: %w", strings.Join(execConfig.Cmd, " "), err) } - return createRes.ID, nil + for i := 0; ; i++ { + res, err := m.Client.ContainerInspect(ctx, createRes.ID) + if err == nil && res.State.Running { + return createRes.ID, nil + } + if i >= containerInspectRetryLimit { + return "", fmt.Errorf("failed to inspect docker container: %w", err) + } + time.Sleep(time.Second) + } +} + +func (m *DockerInstanceManager) deleteHost(user accounts.User, host string) (*apiv1.HostInstance, error) { + ctx := context.TODO() + if err := m.deleteDockerContainer(ctx, user, host); err != nil { + return nil, fmt.Errorf("failed to delete docker container: %w", err) + } + // A docker volume is shared across all hosts under each user. If no host + // exists for given user, delete volume afterwards to cleanup. + if err := m.deleteDockerVolumeIfNeeded(ctx, user); err != nil { + return nil, fmt.Errorf("failed to cleanup docker volume: %w", err) + } + return &apiv1.HostInstance{ + Name: host, + }, nil } func (m *DockerInstanceManager) deleteDockerContainer(ctx context.Context, user accounts.User, host string) error { @@ -454,3 +422,71 @@ func (m *DockerInstanceManager) getRWMutex(user accounts.User) *sync.RWMutex { mu, _ := m.mutexes.LoadOrStore(user.Username(), &sync.RWMutex{}) return mu.(*sync.RWMutex) } + +type operationResult struct { + Error error + Value interface{} +} + +type operationEntry struct { + op apiv1.Operation + result *operationResult + mutex sync.RWMutex + done chan struct{} +} + +const newOperationRetryLimit = 100 + +func (m *DockerInstanceManager) newOperation() apiv1.Operation { + for i := 0; i < newOperationRetryLimit; i++ { + name := uuid.New().String() + newEntry := &operationEntry{ + op: apiv1.Operation{ + Name: name, + Done: false, + }, + mutex: sync.RWMutex{}, + done: make(chan struct{}), + } + entry, loaded := m.operations.LoadOrStore(name, newEntry) + if !loaded { + // It succeeded to store a new operation entry. + return entry.(*operationEntry).op + } + } + panic("Reached newOperationRetryLimit") +} + +func (m *DockerInstanceManager) completeOperation(name string, result *operationResult) error { + val, loaded := m.operations.Load(name) + if !loaded { + return fmt.Errorf("operation not found for %q", name) + } + entry := val.(*operationEntry) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + entry.op.Done = true + entry.result = result + close(entry.done) + return nil +} + +func (m *DockerInstanceManager) waitOperation(name string, dt time.Duration) (*operationResult, error) { + val, loaded := m.operations.Load(name) + if !loaded { + return nil, fmt.Errorf("operation not found for %q", name) + } + entry := val.(*operationEntry) + ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Minute) + defer cancel() + select { + case <-entry.done: + entry.mutex.RLock() + result := entry.result + entry.mutex.RUnlock() + return result, nil + case <-ctx.Done(): + return nil, fmt.Errorf("reached timeout for %q", name) + } +} diff --git a/pkg/app/instances/docker_test.go b/pkg/app/instances/docker_test.go deleted file mode 100644 index a03f578b..00000000 --- a/pkg/app/instances/docker_test.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package instances - -import ( - "testing" - - "github.com/google/go-cmp/cmp" -) - -func TestEncodeOperationNameSucceeds(t *testing.T) { - if diff := cmp.Diff("foo_bar", EncodeOperationName("foo", "bar")); diff != "" { - t.Errorf("encoded operation name mismatch (-want +got):\n%s", diff) - } -} - -func TestDecodeOperationSucceeds(t *testing.T) { - gotOpType, gotHost, err := DecodeOperationName("foo_bar") - if err != nil { - t.Errorf("got error while decoding operation name: %+v", err) - } - if diff := cmp.Diff(OPType("foo"), gotOpType); diff != "" { - t.Errorf("decoded operation type mismatch (-want +got):\n%s", diff) - } - if diff := cmp.Diff("bar", gotHost); diff != "" { - t.Errorf("decoded host mismatch (-want +got):\n%s", diff) - } -} - -func TestDecodeOperationFailsEmptyString(t *testing.T) { - _, _, err := DecodeOperationName("") - if err == nil { - t.Errorf("expected error") - } -} - -func TestDecodeOperationFailsMissingUnderscore(t *testing.T) { - _, _, err := DecodeOperationName("foobar") - if err == nil { - t.Errorf("expected error") - } -} From 4cfeb36523ec6992fd76a65546a7ab474a7269c5 Mon Sep 17 00:00:00 2001 From: Seungjae Yoo Date: Mon, 10 Nov 2025 11:28:33 +0900 Subject: [PATCH 3/3] Ensure docker volume deletion on DockerIM of CO --- pkg/app/instances/docker.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/pkg/app/instances/docker.go b/pkg/app/instances/docker.go index 7a7d1063..d82ce7b9 100644 --- a/pkg/app/instances/docker.go +++ b/pkg/app/instances/docker.go @@ -387,16 +387,9 @@ func (m *DockerInstanceManager) deleteDockerContainer(ctx context.Context, user return nil } -func (m *DockerInstanceManager) deleteDockerVolumeIfNeeded(ctx context.Context, user accounts.User) error { - containerListOpts := container.ListOptions{Filters: dockerFilter(user)} - listRes, err := m.Client.ContainerList(ctx, containerListOpts) - if err != nil { - return fmt.Errorf("failed to list docker containers: %w", err) - } - if len(listRes) > 0 { - return nil - } +const volumeInspectRetryCount = 3 +func (m *DockerInstanceManager) deleteDockerVolumeIfNeeded(ctx context.Context, user accounts.User) error { mu := m.getRWMutex(user) if locked := mu.TryLock(); !locked { // If it can't acquire lock on this mutex, there's ongoing host @@ -405,6 +398,15 @@ func (m *DockerInstanceManager) deleteDockerVolumeIfNeeded(ctx context.Context, return nil } defer mu.Unlock() + + containerListOpts := container.ListOptions{Filters: dockerFilter(user)} + listRes, err := m.Client.ContainerList(ctx, containerListOpts) + if err != nil { + return fmt.Errorf("failed to list docker containers: %w", err) + } + if len(listRes) > 0 { + return nil + } listOpts := volume.ListOptions{Filters: dockerFilter(user)} volumeListRes, err := m.Client.VolumeList(ctx, listOpts) if err != nil { @@ -415,7 +417,14 @@ func (m *DockerInstanceManager) deleteDockerVolumeIfNeeded(ctx context.Context, return fmt.Errorf("failed to remove docker volume: %w", err) } } - return nil + // Ensure the deletion of docker volumes + for i := 0; i < volumeInspectRetryCount; i++ { + volumeListRes, err := m.Client.VolumeList(ctx, listOpts) + if err == nil && len(volumeListRes.Volumes) == 0 { + return nil + } + } + return fmt.Errorf("removed docker volume but still exists") } func (m *DockerInstanceManager) getRWMutex(user accounts.User) *sync.RWMutex {