Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions packages/api/internal/api/api.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

189 changes: 95 additions & 94 deletions packages/api/internal/api/spec.gen.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions packages/api/internal/api/types.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 14 additions & 16 deletions packages/api/internal/cache/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewInstanceInfo(
Metadata: Metadata,
MaxInstanceLength: MaxInstanceLength,
StartTime: StartTime,
endTime: endTime,
EndTime: endTime,
VCpu: VCpu,
TotalDiskSizeMB: TotalDiskSizeMB,
RamMB: RamMB,
Expand All @@ -77,7 +77,6 @@ func NewInstanceInfo(
AutoPause: atomic.Bool{},
Pausing: utils.NewSetOnce[string](),
BaseTemplateID: BaseTemplateID,
mu: sync.RWMutex{},
}

instance.AutoPause.Store(AutoPause)
Expand All @@ -98,7 +97,7 @@ type InstanceInfo struct {
Metadata map[string]string
MaxInstanceLength time.Duration
StartTime time.Time
endTime time.Time
EndTime time.Time
VCpu int64
TotalDiskSizeMB int64
RamMB int64
Expand All @@ -122,29 +121,28 @@ func (i *InstanceInfo) LoggerMetadata() sbxlogger.SandboxMetadata {
}
}

func (i *InstanceInfo) IsExpired() bool {
i.mu.RLock()
defer i.mu.RUnlock()
// Lock takes the write lock on the instance's mutex.
// Callers must pair it with Unlock.
func (i *InstanceInfo) Lock() {
i.mu.Lock()
}

return time.Now().After(i.endTime)
// Unlock releases the write lock on the instance's mutex previously taken
// with Lock.
func (i *InstanceInfo) Unlock() {
i.mu.Unlock()
}

func (i *InstanceInfo) GetEndTime() time.Time {
func (i *InstanceInfo) RLock() {
i.mu.RLock()
defer i.mu.RUnlock()

return i.endTime
}

func (i *InstanceInfo) SetEndTime(endTime time.Time) {
i.mu.Lock()
defer i.mu.Unlock()
// RUnlock releases a read lock on the instance's mutex previously taken
// with RLock.
func (i *InstanceInfo) RUnlock() {
i.mu.RUnlock()
}

i.endTime = endTime
// IsExpired reports whether the instance's EndTime lies before the current
// time. It reads EndTime directly and does not take the instance's lock.
// NOTE(review): callers presumably hold the lock while calling this — confirm.
func (i *InstanceInfo) IsExpired() bool {
	return i.EndTime.Before(time.Now())
}

func (i *InstanceInfo) SetExpired() {
i.SetEndTime(time.Now())
i.EndTime = time.Now()
}

type InstanceCache struct {
Expand Down
20 changes: 8 additions & 12 deletions packages/api/internal/cache/instance/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c *InstanceCache) Add(ctx context.Context, instance *InstanceInfo, newlyCr
sbxlogger.I(instance).Debug("Adding sandbox to cache",
zap.Bool("newly_created", newlyCreated),
zap.Time("start_time", instance.StartTime),
zap.Time("end_time", instance.GetEndTime()),
zap.Time("end_time", instance.EndTime),
)

if instance.SandboxID == "" {
Expand All @@ -79,14 +79,14 @@ func (c *InstanceCache) Add(ctx context.Context, instance *InstanceInfo, newlyCr
return fmt.Errorf("instance %s is missing env ID", instance.TemplateID)
}

endTime := instance.GetEndTime()
endTime := instance.EndTime

if instance.StartTime.IsZero() || endTime.IsZero() || instance.StartTime.After(endTime) {
return fmt.Errorf("instance %s has invalid start(%s)/end(%s) times", instance.SandboxID, instance.StartTime, endTime)
}

if endTime.Sub(instance.StartTime) > instance.MaxInstanceLength {
instance.SetEndTime(instance.StartTime.Add(instance.MaxInstanceLength))
instance.EndTime = instance.StartTime.Add(instance.MaxInstanceLength)
}

c.Set(instance.SandboxID, instance, newlyCreated)
Expand All @@ -99,17 +99,13 @@ func (c *InstanceCache) Add(ctx context.Context, instance *InstanceInfo, newlyCr
}

// Delete the instance and remove it from the cache.
func (c *InstanceCache) Delete(instanceID string, pause bool) bool {
value, found := c.cache.GetAndRemove(instanceID)
if found {
value.AutoPause.Store(pause)
func (c *InstanceCache) Delete(instance *InstanceInfo, pause bool) {
defer c.cache.Remove(instance.SandboxID)
instance.AutoPause.Store(pause)

if pause {
c.MarkAsPausing(value)
}
if pause {
c.MarkAsPausing(instance)
}

return found
}

func (c *InstanceCache) Items() []*InstanceInfo {
Expand Down
2 changes: 1 addition & 1 deletion packages/api/internal/cache/instance/reservation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestReservation_ResumeAlreadyRunningSandbox(t *testing.T) {

TeamID: teamID,
StartTime: time.Now(),
endTime: time.Now().Add(time.Hour),
EndTime: time.Now().Add(time.Hour),
MaxInstanceLength: time.Hour,
}
err := cache.Add(context.Background(), info, false)
Expand Down
72 changes: 41 additions & 31 deletions packages/api/internal/cache/instance/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,27 @@ func getMaxAllowedTTL(now time.Time, startTime time.Time, duration, maxInstanceL
}

// KeepAliveFor updates the instance's expiration time based on the requested duration.
func (c *InstanceCache) KeepAliveFor(instanceID string, duration time.Duration, allowShorter bool) (*InstanceInfo, *api.APIError) {
instance, err := c.Get(instanceID)
if err != nil {
return nil, &api.APIError{Code: http.StatusNotFound, ClientMsg: fmt.Sprintf("Sandbox '%s' not found", instanceID), Err: err}
}

func (c *InstanceCache) KeepAliveFor(sandbox *InstanceInfo, duration time.Duration, allowShorter bool) (*InstanceInfo, *api.APIError) {
now := time.Now()

endTime := instance.GetEndTime()
endTime := sandbox.EndTime
if !allowShorter && endTime.After(now.Add(duration)) {
return instance, nil
return sandbox, nil
}

if (time.Since(instance.StartTime)) > instance.MaxInstanceLength {
c.cache.Remove(instanceID)
if (time.Since(sandbox.StartTime)) > sandbox.MaxInstanceLength {
c.cache.Remove(sandbox.SandboxID)

msg := fmt.Sprintf("Sandbox '%s' reached maximal allowed uptime", instanceID)
msg := fmt.Sprintf("Sandbox '%s' reached maximal allowed uptime", sandbox.SandboxID)
return nil, &api.APIError{Code: http.StatusForbidden, ClientMsg: msg, Err: errors.New(msg)}
} else {
maxAllowedTTL := getMaxAllowedTTL(now, instance.StartTime, duration, instance.MaxInstanceLength)
maxAllowedTTL := getMaxAllowedTTL(now, sandbox.StartTime, duration, sandbox.MaxInstanceLength)

newEndTime := now.Add(maxAllowedTTL)
instance.SetEndTime(newEndTime)
sandbox.EndTime = newEndTime
}

return instance, nil
return sandbox, nil
}

func (c *InstanceCache) Sync(ctx context.Context, instances []*InstanceInfo, nodeID string) {
Expand All @@ -65,26 +60,41 @@ func (c *InstanceCache) Sync(ctx context.Context, instances []*InstanceInfo, nod

// Delete instances that are not in Orchestrator anymore
for _, item := range c.cache.Items() {
if item.NodeID != nodeID {
continue
}
if time.Since(item.StartTime) <= syncSandboxRemoveGracePeriod {
continue
}
_, found := instanceMap[item.SandboxID]
if !found {
c.cache.Remove(item.SandboxID)
}
c.checkInstance(instanceMap, nodeID, item)
}

// Add instances that are not in the cache with the default TTL
for _, instance := range instances {
if c.Exists(instance.SandboxID) {
continue
}
err := c.Add(ctx, instance, false)
if err != nil {
zap.L().Error("error adding instance to cache", zap.Error(err))
}
c.loadInstance(ctx, instance)
}
}

// loadInstance adds the given instance to the cache unless an entry with the
// same SandboxID already exists. The instance's write lock is held for the
// whole call. A failure from Add is logged and not returned to the caller.
func (c *InstanceCache) loadInstance(ctx context.Context, instance *InstanceInfo) {
	instance.Lock()
	defer instance.Unlock()

	// Only instances missing from the cache need to be added.
	if !c.Exists(instance.SandboxID) {
		if addErr := c.Add(ctx, instance, false); addErr != nil {
			zap.L().Error("error adding instance to cache", zap.Error(addErr))
		}
	}
}

// checkInstance removes a cached instance when all of the following hold:
// it belongs to nodeID, it started more than syncSandboxRemoveGracePeriod
// ago, and its SandboxID is absent from instanceMap. The instance's write
// lock is held for the whole call.
func (c *InstanceCache) checkInstance(instanceMap map[string]*InstanceInfo, nodeID string, instance *InstanceInfo) {
	instance.Lock()
	defer instance.Unlock()

	switch {
	case instance.NodeID != nodeID:
		// Instances on other nodes are not covered by this check.
		return
	case time.Since(instance.StartTime) <= syncSandboxRemoveGracePeriod:
		// Still within the grace period after start; leave it cached.
		return
	}

	if _, present := instanceMap[instance.SandboxID]; !present {
		c.cache.Remove(instance.SandboxID)
	}
}
36 changes: 20 additions & 16 deletions packages/api/internal/handlers/sandbox_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,39 @@ func (a *APIStore) GetSandboxesSandboxID(c *gin.Context, id string) {
}

// Try to get the running sandbox first
info, err := a.orchestrator.GetInstance(ctx, sandboxId)
sbx, err := a.orchestrator.GetInstance(ctx, sandboxId)
if err == nil {
sbx.RLock()
defer sbx.RUnlock()

// Check if sandbox belongs to the team
if info.TeamID != team.ID {
if sbx.TeamID != team.ID {
telemetry.ReportCriticalError(ctx, fmt.Sprintf("sandbox '%s' doesn't belong to team '%s'", sandboxId, team.ID.String()), nil)
a.sendAPIStoreError(c, http.StatusNotFound, fmt.Sprintf("sandbox \"%s\" doesn't exist or you don't have access to it", id))

return
}

// Sandbox exists and belongs to the team - return running sandbox info
// Sandbox exists and belongs to the team - return running sandbox info
sandbox := api.SandboxDetail{
ClientID: info.ClientID,
TemplateID: info.TemplateID,
Alias: info.Alias,
SandboxID: info.SandboxID,
StartedAt: info.StartTime,
CpuCount: api.CPUCount(info.VCpu),
MemoryMB: api.MemoryMB(info.RamMB),
DiskSizeMB: api.DiskSizeMB(info.TotalDiskSizeMB),
EndAt: info.GetEndTime(),
ClientID: sbx.ClientID,
TemplateID: sbx.TemplateID,
Alias: sbx.Alias,
SandboxID: sbx.SandboxID,
StartedAt: sbx.StartTime,
CpuCount: api.CPUCount(sbx.VCpu),
MemoryMB: api.MemoryMB(sbx.RamMB),
DiskSizeMB: api.DiskSizeMB(sbx.TotalDiskSizeMB),
EndAt: sbx.EndTime,
State: api.Running,
EnvdVersion: info.EnvdVersion,
EnvdAccessToken: info.EnvdAccessToken,
EnvdVersion: sbx.EnvdVersion,
EnvdAccessToken: sbx.EnvdAccessToken,
Domain: sbxDomain,
}

if info.Metadata != nil {
meta := api.SandboxMetadata(info.Metadata)
metadata := sbx.Metadata
if metadata != nil {
meta := api.SandboxMetadata(metadata)
sandbox.Metadata = &meta
}

Expand Down
11 changes: 4 additions & 7 deletions packages/api/internal/handlers/sandbox_kill.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func (a *APIStore) DeleteSandboxesSandboxID(

sbx, err := a.orchestrator.GetSandbox(sandboxID)
if err == nil {
sbx.Lock()
defer sbx.Unlock()

if sbx.TeamID != teamID {
telemetry.ReportCriticalError(ctx, "sandbox does not belong to team", fmt.Errorf("sandbox '%s' does not belong to team '%s'", sandboxID, teamID.String()))

Expand All @@ -93,13 +96,7 @@ func (a *APIStore) DeleteSandboxesSandboxID(
}

// remove running sandbox from the orchestrator
sandboxExists := a.orchestrator.DeleteInstance(ctx, sandboxID, false)
if !sandboxExists {
telemetry.ReportError(ctx, "sandbox not found", fmt.Errorf("sandbox '%s' not found", sandboxID), telemetry.WithSandboxID(sandboxID))
a.sendAPIStoreError(c, http.StatusNotFound, fmt.Sprintf("Error deleting sandbox - sandbox '%s' was not found", sandboxID))

return
}
a.orchestrator.DeleteInstance(ctx, sbx, false)

// remove any snapshots of the sandbox
err := a.deleteSnapshot(ctx, sandboxID, teamID, team.ClusterID)
Expand Down
9 changes: 4 additions & 5 deletions packages/api/internal/handlers/sandbox_pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func (a *APIStore) PostSandboxesSandboxIDPause(c *gin.Context, sandboxID api.San
return
}

sbx.Lock()
defer sbx.Unlock()

if sbx.TeamID != teamID {
telemetry.ReportCriticalError(ctx, "sandbox does not belong to team", fmt.Errorf("sandbox '%s' does not belong to team '%s'", sandboxID, teamID.String()))

Expand All @@ -57,11 +60,7 @@ func (a *APIStore) PostSandboxesSandboxIDPause(c *gin.Context, sandboxID api.San
return
}

found := a.orchestrator.DeleteInstance(ctx, sandboxID, true)
if !found {
a.sendAPIStoreError(c, http.StatusNotFound, fmt.Sprintf("Error pausing sandbox - sandbox '%s' was not found", sandboxID))
return
}
a.orchestrator.DeleteInstance(ctx, sbx, true)

_, err = sbx.Pausing.WaitWithContext(ctx)
if err != nil {
Expand Down
Loading
Loading