271 changes: 158 additions & 113 deletions pkg/app/instances/docker.go
@@ -35,6 +35,7 @@ import (
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/google/uuid"
)

const DockerIMType IMType = "docker"
@@ -54,18 +55,12 @@ const uaMountTarget = "/var/lib/cuttlefish-common/userartifacts"

// Docker implementation of the instance manager.
type DockerInstanceManager struct {
Config Config
Client *client.Client
mutexes sync.Map
Config Config
Client *client.Client
mutexes sync.Map
@ser-io (Member) commented on Nov 10, 2025:
The CO IM is designed to be stateless.

Why do you need to add this new state to the Docker IM implementation? What are the actual operations you need this for? You should be relying on Docker Engine to query whether a container was created, deleted and so on, to track such operations.

Author (Collaborator) replied:
It was motivated by the REST API taking a long time (> 1 min) to respond to requests such as POST /hosts and DELETE /hosts.

The CO IM is designed to be stateless.

Then this PR is definitely my fault. How should I get that information? I think these circumstances will keep coming up, and I'd like to learn how we manage them before working on this.
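
For reference, a minimal sketch of the stateless approach described above, assuming the same Docker Engine Go client this file already uses; the package and function name are illustrative, not part of the PR:

package instances

import (
	"context"
	"fmt"

	"github.com/docker/docker/client"
)

// hostCreateDone reports whether the container backing `host` is running,
// which is what a stateless "is the create-host operation done?" check
// reduces to when Docker Engine is treated as the source of truth.
func hostCreateDone(ctx context.Context, cli *client.Client, host string) (bool, error) {
	res, err := cli.ContainerInspect(ctx, host)
	if err != nil {
		if client.IsErrNotFound(err) {
			// No such container yet: the create operation has not finished
			// (or the container was already deleted).
			return false, nil
		}
		return false, fmt.Errorf("failed to inspect docker container: %w", err)
	}
	return res.State.Running, nil
}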

operations sync.Map
}

type OPType string

const (
CreateHostOPType OPType = "createhost"
DeleteHostOPType OPType = "deletehost"
)

func NewDockerInstanceManager(cfg Config, cli *client.Client) *DockerInstanceManager {
return &DockerInstanceManager{
Config: cfg,
@@ -85,26 +80,14 @@ func (m *DockerInstanceManager) CreateHost(zone string, _ *apiv1.CreateHostReque
if zone != "local" {
return nil, errors.NewBadRequestError("Invalid zone. It should be 'local'.", nil)
}
mu := m.getRWMutex(user)
mu.RLock()
defer mu.RUnlock()
ctx := context.TODO()
if err := m.downloadDockerImageIfNeeded(ctx); err != nil {
return nil, fmt.Errorf("failed to retrieve docker image name: %w", err)
}
// A docker volume is shared across all hosts under each user. If no volume
// exists for given user, create it.
if err := m.createDockerVolumeIfNeeded(ctx, user); err != nil {
return nil, fmt.Errorf("failed to prepare docker volume: %w", err)
}
host, err := m.createDockerContainer(ctx, user)
if err != nil {
return nil, fmt.Errorf("failed to prepare docker container: %w", err)
}
return &apiv1.Operation{
Name: EncodeOperationName(CreateHostOPType, host),
Done: true,
}, nil
op := m.newOperation()
go func() {
val, err := m.createHost(user)
if opErr := m.completeOperation(op.Name, &operationResult{Error: err, Value: val}); opErr != nil {
log.Printf("error completing operation %q: %v\n", op.Name, opErr)
}
}()
return &op, nil
}

func (m *DockerInstanceManager) ListHosts(zone string, user accounts.User, _ *ListHostsRequest) (*apiv1.ListHostsResponse, error) {
@@ -141,88 +124,25 @@ func (m *DockerInstanceManager) DeleteHost(zone string, user accounts.User, host
if zone != "local" {
return nil, errors.NewBadRequestError("Invalid zone. It should be 'local'.", nil)
}
ctx := context.TODO()
if err := m.deleteDockerContainer(ctx, user, host); err != nil {
return nil, fmt.Errorf("failed to delete docker container: %w", err)
}
// A docker volume is shared across all hosts under each user. If no host
// exists for given user, delete volume afterwards to cleanup.
if err := m.deleteDockerVolumeIfNeeded(ctx, user); err != nil {
return nil, fmt.Errorf("failed to cleanup docker volume: %w", err)
}
return &apiv1.Operation{
Name: EncodeOperationName(DeleteHostOPType, host),
Done: true,
}, nil
}

func EncodeOperationName(opType OPType, host string) string {
return string(opType) + "_" + host
}

func DecodeOperationName(name string) (OPType, string, error) {
words := strings.SplitN(name, "_", 2)
if len(words) == 2 {
return OPType(words[0]), words[1], nil
} else {
return "", "", errors.NewBadRequestError(fmt.Sprintf("cannot parse operation from %q", name), nil)
}
}

func (m *DockerInstanceManager) waitCreateHostOperation(host string) (*apiv1.HostInstance, error) {
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Minute)
defer cancel()
for {
select {
case <-ctx.Done():
return nil, errors.NewServiceUnavailableError("Wait for operation timed out", nil)
default:
res, err := m.Client.ContainerInspect(ctx, host)
if err != nil {
return nil, fmt.Errorf("failed to inspect docker container: %w", err)
}
if res.State.Running {
return &apiv1.HostInstance{
Name: host,
}, nil
}
time.Sleep(time.Second)
op := m.newOperation()
go func() {
val, err := m.deleteHost(user, host)
if opErr := m.completeOperation(op.Name, &operationResult{Error: err, Value: val}); opErr != nil {
log.Printf("error completing operation %q: %v\n", op.Name, opErr)
}
}
}

func (m *DockerInstanceManager) waitDeleteHostOperation(host string) (*apiv1.HostInstance, error) {
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Minute)
defer cancel()
resCh, errCh := m.Client.ContainerWait(ctx, host, "")
select {
case <-ctx.Done():
return nil, errors.NewServiceUnavailableError("Wait for operation timed out", nil)
case err := <-errCh:
return nil, fmt.Errorf("error is thrown while waiting for deleting host: %w", err)
case <-resCh:
return &apiv1.HostInstance{
Name: host,
}, nil
}
}()
return &op, nil
}

func (m *DockerInstanceManager) WaitOperation(zone string, _ accounts.User, name string) (any, error) {
if zone != "local" {
return nil, errors.NewBadRequestError("Invalid zone. It should be 'local'.", nil)
}
opType, host, err := DecodeOperationName(name)
val, err := m.waitOperation(name, 3*time.Minute)
if err != nil {
return nil, err
}
switch opType {
case CreateHostOPType:
return m.waitCreateHostOperation(host)
case DeleteHostOPType:
return m.waitDeleteHostOperation(host)
default:
return nil, errors.NewBadRequestError(fmt.Sprintf("operation type %s not found.", opType), nil)
}
return val.Value, val.Error
}

func (m *DockerInstanceManager) getIpAddr(container *types.Container) (string, error) {
@@ -318,6 +238,28 @@ func (m *DockerInstanceManager) getContainerLabel(host string, key string) (stri
return value, nil
}

func (m *DockerInstanceManager) createHost(user accounts.User) (*apiv1.HostInstance, error) {
mu := m.getRWMutex(user)
mu.RLock()
defer mu.RUnlock()
ctx := context.TODO()
if err := m.downloadDockerImageIfNeeded(ctx); err != nil {
return nil, fmt.Errorf("failed to retrieve docker image name: %w", err)
}
// A docker volume is shared across all hosts under each user. If no volume
// exists for given user, create it.
if err := m.createDockerVolumeIfNeeded(ctx, user); err != nil {
return nil, fmt.Errorf("failed to prepare docker volume: %w", err)
}
host, err := m.createDockerContainer(ctx, user)
if err != nil {
return nil, fmt.Errorf("failed to prepare docker container: %w", err)
}
return &apiv1.HostInstance{
Name: host,
}, nil
}

func (m *DockerInstanceManager) downloadDockerImageIfNeeded(ctx context.Context) error {
listRes, err := m.Client.ImageList(ctx, image.ListOptions{})
if err != nil {
@@ -364,6 +306,8 @@ func (m *DockerInstanceManager) createDockerVolumeIfNeeded(ctx context.Context,
return nil
}

const containerInspectRetryLimit = 5

func (m *DockerInstanceManager) createDockerContainer(ctx context.Context, user accounts.User) (string, error) {
config := &container.Config{
AttachStdin: true,
@@ -401,7 +345,31 @@ func (m *DockerInstanceManager) createDockerContainer(ctx context.Context, user
if err := m.Client.ContainerExecStart(ctx, execRes.ID, container.ExecStartOptions{}); err != nil {
return "", fmt.Errorf("failed to start container execution %q: %w", strings.Join(execConfig.Cmd, " "), err)
}
return createRes.ID, nil
for i := 0; ; i++ {
res, err := m.Client.ContainerInspect(ctx, createRes.ID)
if err == nil && res.State.Running {
return createRes.ID, nil
}
if i >= containerInspectRetryLimit {
return "", fmt.Errorf("failed to inspect docker container: %w", err)
}
time.Sleep(time.Second)
}
}

func (m *DockerInstanceManager) deleteHost(user accounts.User, host string) (*apiv1.HostInstance, error) {
ctx := context.TODO()
if err := m.deleteDockerContainer(ctx, user, host); err != nil {
return nil, fmt.Errorf("failed to delete docker container: %w", err)
}
// A docker volume is shared across all hosts under each user. If no host
// exists for given user, delete volume afterwards to cleanup.
if err := m.deleteDockerVolumeIfNeeded(ctx, user); err != nil {
return nil, fmt.Errorf("failed to cleanup docker volume: %w", err)
}
return &apiv1.HostInstance{
Name: host,
}, nil
}

func (m *DockerInstanceManager) deleteDockerContainer(ctx context.Context, user accounts.User, host string) error {
@@ -419,16 +387,9 @@ func (m *DockerInstanceManager) deleteDockerContainer(ctx context.Context, user
return nil
}

func (m *DockerInstanceManager) deleteDockerVolumeIfNeeded(ctx context.Context, user accounts.User) error {
containerListOpts := container.ListOptions{Filters: dockerFilter(user)}
listRes, err := m.Client.ContainerList(ctx, containerListOpts)
if err != nil {
return fmt.Errorf("failed to list docker containers: %w", err)
}
if len(listRes) > 0 {
return nil
}
const volumeInspectRetryCount = 3

func (m *DockerInstanceManager) deleteDockerVolumeIfNeeded(ctx context.Context, user accounts.User) error {
mu := m.getRWMutex(user)
if locked := mu.TryLock(); !locked {
// If it can't acquire lock on this mutex, there's ongoing host
@@ -437,6 +398,15 @@ func (m *DockerInstanceManager) deleteDockerVolumeIfNeeded(ctx context.Context,
return nil
}
defer mu.Unlock()

containerListOpts := container.ListOptions{Filters: dockerFilter(user)}
listRes, err := m.Client.ContainerList(ctx, containerListOpts)
if err != nil {
return fmt.Errorf("failed to list docker containers: %w", err)
}
if len(listRes) > 0 {
return nil
}
listOpts := volume.ListOptions{Filters: dockerFilter(user)}
volumeListRes, err := m.Client.VolumeList(ctx, listOpts)
if err != nil {
@@ -447,10 +417,85 @@ func (m *DockerInstanceManager) deleteDockerVolumeIfNeeded(ctx context.Context,
return fmt.Errorf("failed to remove docker volume: %w", err)
}
}
return nil
// Ensure the deletion of docker volumes
for i := 0; i < volumeInspectRetryCount; i++ {
volumeListRes, err := m.Client.VolumeList(ctx, listOpts)
if err == nil && len(volumeListRes.Volumes) == 0 {
return nil
}
}
return fmt.Errorf("removed docker volume but still exists")
}

func (m *DockerInstanceManager) getRWMutex(user accounts.User) *sync.RWMutex {
mu, _ := m.mutexes.LoadOrStore(user.Username(), &sync.RWMutex{})
return mu.(*sync.RWMutex)
}

type operationResult struct {
Error error
Value interface{}
}

type operationEntry struct {
op apiv1.Operation
result *operationResult
mutex sync.RWMutex
done chan struct{}
}

const newOperationRetryLimit = 100

func (m *DockerInstanceManager) newOperation() apiv1.Operation {
for i := 0; i < newOperationRetryLimit; i++ {
name := uuid.New().String()
newEntry := &operationEntry{
op: apiv1.Operation{
Name: name,
Done: false,
},
mutex: sync.RWMutex{},
done: make(chan struct{}),
}
entry, loaded := m.operations.LoadOrStore(name, newEntry)
if !loaded {
// It succeeded to store a new operation entry.
return entry.(*operationEntry).op
}
}
panic("Reached newOperationRetryLimit")
Collaborator commented:
Should this be a panic? It looks like it stops the service if newOperation fails; how about just logging instead?

Author (Collaborator) replied:
I think it's okay to panic, since it only retries when a UUID conflict happens. With a large retry count, I believe the chance of reaching this is essentially 0%, even lower than a SHA-256 hash collision.

Member replied:
Leave a comment explaining that the chances of hitting this are practically zero, so it is not worth changing the API to return an error, and that in the unlikely scenario this does trigger, a panic is better than running forever or returning an invalid/nil operation.
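
A sketch of how that comment might read (wording is illustrative, not the author's final text):

// Operation names are freshly generated UUIDs, so failing to store a new
// entry newOperationRetryLimit times in a row is practically impossible.
// That makes it not worth changing the API to return an error; if this is
// ever reached, panicking is better than looping forever or returning an
// invalid/nil operation.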

}

func (m *DockerInstanceManager) completeOperation(name string, result *operationResult) error {
val, loaded := m.operations.Load(name)
if !loaded {
return fmt.Errorf("operation not found for %q", name)
}
entry := val.(*operationEntry)

entry.mutex.Lock()
defer entry.mutex.Unlock()
entry.op.Done = true
entry.result = result
close(entry.done)
return nil
}

func (m *DockerInstanceManager) waitOperation(name string, dt time.Duration) (*operationResult, error) {
val, loaded := m.operations.Load(name)
if !loaded {
return nil, fmt.Errorf("operation not found for %q", name)
}
entry := val.(*operationEntry)
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Minute)
Collaborator commented:
Can context.TODO() be context.Background()?
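
For reference, a sketch of the suggested form (same behavior here; context.Background is the conventional root context when no request-scoped context is available):

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)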

defer cancel()
select {
case <-entry.done:
entry.mutex.RLock()
result := entry.result
entry.mutex.RUnlock()
return result, nil
case <-ctx.Done():
return nil, fmt.Errorf("reached timeout for %q", name)
}
}