From f598bf8db6c7c4eceb372e7a2d8e0396a43ea53f Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Fri, 10 Oct 2025 09:01:19 -0700 Subject: [PATCH 01/16] create a strongly typed sandboxes map this allows us to put sandbox-specific code here, to execute on insert, on delete, etc --- packages/orchestrator/benchmark_test.go | 3 +- .../orchestrator/cmd/build-template/main.go | 3 +- .../hyperloopserver/handlers/store.go | 5 +-- .../internal/hyperloopserver/server.go | 3 +- .../internal/metrics/sandboxes.go | 5 +-- packages/orchestrator/internal/proxy/proxy.go | 3 +- .../internal/sandbox/container.go | 45 +++++++++++++++++++ packages/orchestrator/internal/server/main.go | 5 +-- .../orchestrator/internal/server/sandboxes.go | 14 +----- .../internal/server/sandboxes_test.go | 5 +-- .../internal/service/service_info.go | 5 +-- .../internal/template/build/builder.go | 5 +-- .../template/build/layer/layer_executor.go | 7 ++- .../internal/template/server/main.go | 3 +- packages/orchestrator/main.go | 3 +- 15 files changed, 68 insertions(+), 46 deletions(-) create mode 100644 packages/orchestrator/internal/sandbox/container.go diff --git a/packages/orchestrator/benchmark_test.go b/packages/orchestrator/benchmark_test.go index fc7f000e61..51a6f95681 100644 --- a/packages/orchestrator/benchmark_test.go +++ b/packages/orchestrator/benchmark_test.go @@ -36,7 +36,6 @@ import ( featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" "github.com/e2b-dev/infra/packages/shared/pkg/limit" sbxlogger "github.com/e2b-dev/infra/packages/shared/pkg/logger/sandbox" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" "github.com/e2b-dev/infra/packages/shared/pkg/utils" @@ -210,7 +209,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) { var proxyPort uint16 = 5007 - sandboxes := smap.New[*sandbox.Sandbox]() + sandboxes := sandbox.NewSandboxesMap() sandboxProxy, err := proxy.NewSandboxProxy(noop.MeterProvider{}, proxyPort, sandboxes) require.NoError(b, err) diff --git a/packages/orchestrator/cmd/build-template/main.go b/packages/orchestrator/cmd/build-template/main.go index dee1275705..fe0ef594de 100644 --- a/packages/orchestrator/cmd/build-template/main.go +++ b/packages/orchestrator/cmd/build-template/main.go @@ -28,7 +28,6 @@ import ( featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" l "github.com/e2b-dev/infra/packages/shared/pkg/logger" sbxlogger "github.com/e2b-dev/infra/packages/shared/pkg/logger/sandbox" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" "github.com/e2b-dev/infra/packages/shared/pkg/storage" ) @@ -87,7 +86,7 @@ func buildTemplate( // The sandbox map is shared between the server and the proxy // to propagate information about sandbox routing. 
- sandboxes := smap.New[*sandbox.Sandbox]() + sandboxes := sandbox.NewSandboxesMap() sandboxProxy, err := proxy.NewSandboxProxy(noop.MeterProvider{}, proxyPort, sandboxes) if err != nil { diff --git a/packages/orchestrator/internal/hyperloopserver/handlers/store.go b/packages/orchestrator/internal/hyperloopserver/handlers/store.go index ae173a4988..e8cce80b59 100644 --- a/packages/orchestrator/internal/hyperloopserver/handlers/store.go +++ b/packages/orchestrator/internal/hyperloopserver/handlers/store.go @@ -12,20 +12,19 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" api "github.com/e2b-dev/infra/packages/shared/pkg/http/hyperloop" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" ) const CollectorExporterTimeout = 10 * time.Second type APIStore struct { logger *zap.Logger - sandboxes *smap.Map[*sandbox.Sandbox] + sandboxes *sandbox.SandboxesMap collectorClient http.Client collectorAddr string } -func NewHyperloopStore(logger *zap.Logger, sandboxes *smap.Map[*sandbox.Sandbox], sandboxCollectorAddr string) *APIStore { +func NewHyperloopStore(logger *zap.Logger, sandboxes *sandbox.SandboxesMap, sandboxCollectorAddr string) *APIStore { return &APIStore{ logger: logger, sandboxes: sandboxes, diff --git a/packages/orchestrator/internal/hyperloopserver/server.go b/packages/orchestrator/internal/hyperloopserver/server.go index 602ee3e724..ad44b19878 100644 --- a/packages/orchestrator/internal/hyperloopserver/server.go +++ b/packages/orchestrator/internal/hyperloopserver/server.go @@ -15,12 +15,11 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" "github.com/e2b-dev/infra/packages/shared/pkg/env" api "github.com/e2b-dev/infra/packages/shared/pkg/http/hyperloop" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" ) const maxUploadLimit = 1 << 28 // 256 MiB -func NewHyperloopServer(ctx context.Context, port uint16, logger *zap.Logger, sandboxes *smap.Map[*sandbox.Sandbox]) (*http.Server, error) { +func NewHyperloopServer(ctx context.Context, port uint16, logger *zap.Logger, sandboxes *sandbox.SandboxesMap) (*http.Server, error) { sandboxCollectorAddr := env.LogsCollectorAddress() store := handlers.NewHyperloopStore(logger, sandboxes, sandboxCollectorAddr) swagger, err := api.GetSwagger() diff --git a/packages/orchestrator/internal/metrics/sandboxes.go b/packages/orchestrator/internal/metrics/sandboxes.go index 7ce2b0f4f8..77a32d4feb 100644 --- a/packages/orchestrator/internal/metrics/sandboxes.go +++ b/packages/orchestrator/internal/metrics/sandboxes.go @@ -19,7 +19,6 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" "github.com/e2b-dev/infra/packages/shared/pkg/logger" sbxlogger "github.com/e2b-dev/infra/packages/shared/pkg/logger/sandbox" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" "github.com/e2b-dev/infra/packages/shared/pkg/utils" ) @@ -51,7 +50,7 @@ type SandboxObserver struct { registration metric.Registration exportInterval time.Duration - sandboxes *smap.Map[*sandbox.Sandbox] + sandboxes *sandbox.SandboxesMap meter metric.Meter cpuTotal metric.Int64ObservableGauge @@ -62,7 +61,7 @@ type SandboxObserver struct { diskUsed metric.Int64ObservableGauge } -func NewSandboxObserver(ctx context.Context, nodeID, serviceName, serviceCommit, serviceVersion, serviceInstanceID string, sandboxes *smap.Map[*sandbox.Sandbox]) (*SandboxObserver, error) { +func NewSandboxObserver(ctx context.Context, nodeID, serviceName, serviceCommit, serviceVersion, 
serviceInstanceID string, sandboxes *sandbox.SandboxesMap) (*SandboxObserver, error) { deltaTemporality := otlpmetricgrpc.WithTemporalitySelector(func(kind sdkmetric.InstrumentKind) metricdata.Temporality { // Use delta temporality for gauges and cumulative for all other instrument kinds. // This is used to prevent reporting sandbox metrics indefinitely. diff --git a/packages/orchestrator/internal/proxy/proxy.go b/packages/orchestrator/internal/proxy/proxy.go index 5a2f431513..09270de678 100644 --- a/packages/orchestrator/internal/proxy/proxy.go +++ b/packages/orchestrator/internal/proxy/proxy.go @@ -14,7 +14,6 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/logger" reverseproxy "github.com/e2b-dev/infra/packages/shared/pkg/proxy" "github.com/e2b-dev/infra/packages/shared/pkg/proxy/pool" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) @@ -29,7 +28,7 @@ type SandboxProxy struct { proxy *reverseproxy.Proxy } -func NewSandboxProxy(meterProvider metric.MeterProvider, port uint16, sandboxes *smap.Map[*sandbox.Sandbox]) (*SandboxProxy, error) { +func NewSandboxProxy(meterProvider metric.MeterProvider, port uint16, sandboxes *sandbox.SandboxesMap) (*SandboxProxy, error) { proxy := reverseproxy.New( port, idleTimeout, diff --git a/packages/orchestrator/internal/sandbox/container.go b/packages/orchestrator/internal/sandbox/container.go new file mode 100644 index 0000000000..c2694b2b82 --- /dev/null +++ b/packages/orchestrator/internal/sandbox/container.go @@ -0,0 +1,45 @@ +package sandbox + +import "github.com/e2b-dev/infra/packages/shared/pkg/smap" + +type SandboxesMap struct { + sandboxes *smap.Map[*Sandbox] +} + +func (m *SandboxesMap) Items() map[string]*Sandbox { + return m.sandboxes.Items() +} + +func (m *SandboxesMap) Count() int { + return m.sandboxes.Count() +} + +func (m *SandboxesMap) Get(sandboxID string) (*Sandbox, bool) { + return m.sandboxes.Get(sandboxID) +} + +func (m *SandboxesMap) Insert(sbx *Sandbox) { + m.sandboxes.Insert(sbx.Runtime.SandboxID, sbx) +} + +func (m *SandboxesMap) Remove(sandboxID string) { + m.sandboxes.Remove(sandboxID) +} + +func (m *SandboxesMap) RemoveByExecutionID(sandboxID, executionID string) { + m.sandboxes.RemoveCb(sandboxID, func(_ string, v *Sandbox, exists bool) bool { + if !exists { + return false + } + + if v == nil { + return false + } + + return v.Runtime.ExecutionID == executionID + }) +} + +func NewSandboxesMap() *SandboxesMap { + return &SandboxesMap{sandboxes: smap.New[*Sandbox]()} +} diff --git a/packages/orchestrator/internal/server/main.go b/packages/orchestrator/internal/server/main.go index 7783efdd15..537669b241 100644 --- a/packages/orchestrator/internal/server/main.go +++ b/packages/orchestrator/internal/server/main.go @@ -19,7 +19,6 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/events/event" featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) @@ -29,7 +28,7 @@ type server struct { sandboxFactory *sandbox.Factory info *service.ServiceInfo - sandboxes *smap.Map[*sandbox.Sandbox] + sandboxes *sandbox.SandboxesMap proxy *proxy.SandboxProxy networkPool *network.Pool templateCache *template.Cache @@ -58,7 +57,7 @@ type ServiceConfig struct { Info *service.ServiceInfo Proxy *proxy.SandboxProxy 
SandboxFactory *sandbox.Factory - Sandboxes *smap.Map[*sandbox.Sandbox] + Sandboxes *sandbox.SandboxesMap Persistence storage.StorageProvider FeatureFlags *featureflags.Client SbxEventsService events.EventsService[event.SandboxEvent] diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index 1dacedf55e..f37e277eb4 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -134,7 +134,7 @@ func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ return nil, status.Errorf(codes.Internal, "failed to create sandbox: %s", err) } - s.sandboxes.Insert(req.GetSandbox().GetSandboxId(), sbx) + s.sandboxes.Insert(sbx) go func() { ctx, childSpan := tracer.Start(context.WithoutCancel(ctx), "sandbox-create-stop", trace.WithNewRoot()) defer childSpan.End() @@ -152,17 +152,7 @@ func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ // Remove the sandbox from cache only if the cleanup IDs match. // This prevents us from accidentally removing started sandbox (via resume) from the cache if cleanup is taking longer than the request timeout. // This could have caused the "invisible" sandboxes that are not in orchestrator or API, but are still on client. - s.sandboxes.RemoveCb(req.GetSandbox().GetSandboxId(), func(_ string, v *sandbox.Sandbox, exists bool) bool { - if !exists { - return false - } - - if v == nil { - return false - } - - return sbx.Runtime.ExecutionID == v.Runtime.ExecutionID - }) + s.sandboxes.RemoveByExecutionID(req.GetSandbox().GetSandboxId(), sbx.Runtime.ExecutionID) // Remove the proxies assigned to the sandbox from the pool to prevent them from being reused. s.proxy.RemoveFromPool(sbx.Runtime.ExecutionID) diff --git a/packages/orchestrator/internal/server/sandboxes_test.go b/packages/orchestrator/internal/server/sandboxes_test.go index 10b9570a7f..9f897aa52d 100644 --- a/packages/orchestrator/internal/server/sandboxes_test.go +++ b/packages/orchestrator/internal/server/sandboxes_test.go @@ -12,7 +12,6 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/service" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" "github.com/e2b-dev/infra/packages/shared/pkg/id" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" ) var ( @@ -68,11 +67,11 @@ func Test_server_List(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &server{ - sandboxes: smap.New[*sandbox.Sandbox](), + sandboxes: sandbox.NewSandboxesMap(), info: &service.ServiceInfo{}, } for _, sbx := range tt.data { - s.sandboxes.Insert(sbx.Runtime.SandboxID, sbx) + s.sandboxes.Insert(sbx) } got, err := s.List(t.Context(), tt.args.in1) if (err != nil) != tt.wantErr { diff --git a/packages/orchestrator/internal/service/service_info.go b/packages/orchestrator/internal/service/service_info.go index 82cac525de..a42ea150b8 100644 --- a/packages/orchestrator/internal/service/service_info.go +++ b/packages/orchestrator/internal/service/service_info.go @@ -11,17 +11,16 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/metrics" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" orchestratorinfo "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator-info" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" ) type Server struct { orchestratorinfo.UnimplementedInfoServiceServer info *ServiceInfo - sandboxes *smap.Map[*sandbox.Sandbox] + sandboxes *sandbox.SandboxesMap } 
-func NewInfoService(_ context.Context, grpc *grpc.Server, info *ServiceInfo, sandboxes *smap.Map[*sandbox.Sandbox]) *Server { +func NewInfoService(_ context.Context, grpc *grpc.Server, info *ServiceInfo, sandboxes *sandbox.SandboxesMap) *Server { s := &Server{ info: info, sandboxes: sandboxes, diff --git a/packages/orchestrator/internal/template/build/builder.go b/packages/orchestrator/internal/template/build/builder.go index 48c28a693c..6e71321506 100644 --- a/packages/orchestrator/internal/template/build/builder.go +++ b/packages/orchestrator/internal/template/build/builder.go @@ -31,7 +31,6 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/template/constants" artifactsregistry "github.com/e2b-dev/infra/packages/shared/pkg/artifacts-registry" "github.com/e2b-dev/infra/packages/shared/pkg/dockerhub" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" @@ -50,7 +49,7 @@ type Builder struct { artifactRegistry artifactsregistry.ArtifactsRegistry dockerhubRepository dockerhub.RemoteRepository proxy *proxy.SandboxProxy - sandboxes *smap.Map[*sandbox.Sandbox] + sandboxes *sandbox.SandboxesMap templateCache *sbxtemplate.Cache metrics *metrics.BuildMetrics } @@ -63,7 +62,7 @@ func NewBuilder( artifactRegistry artifactsregistry.ArtifactsRegistry, dockerhubRepository dockerhub.RemoteRepository, proxy *proxy.SandboxProxy, - sandboxes *smap.Map[*sandbox.Sandbox], + sandboxes *sandbox.SandboxesMap, templateCache *sbxtemplate.Cache, buildMetrics *metrics.BuildMetrics, ) *Builder { diff --git a/packages/orchestrator/internal/template/build/layer/layer_executor.go b/packages/orchestrator/internal/template/build/layer/layer_executor.go index 5eeecdf07c..5c36a8b2f9 100644 --- a/packages/orchestrator/internal/template/build/layer/layer_executor.go +++ b/packages/orchestrator/internal/template/build/layer/layer_executor.go @@ -15,7 +15,6 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/template/build/sandboxtools" "github.com/e2b-dev/infra/packages/orchestrator/internal/template/build/storage/cache" "github.com/e2b-dev/infra/packages/orchestrator/internal/template/metadata" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" "github.com/e2b-dev/infra/packages/shared/pkg/storage" ) @@ -28,7 +27,7 @@ type LayerExecutor struct { templateCache *sbxtemplate.Cache proxy *proxy.SandboxProxy - sandboxes *smap.Map[*sandbox.Sandbox] + sandboxes *sandbox.SandboxesMap templateStorage storage.StorageProvider buildStorage storage.StorageProvider index cache.Index @@ -39,7 +38,7 @@ func NewLayerExecutor( logger *zap.Logger, templateCache *sbxtemplate.Cache, proxy *proxy.SandboxProxy, - sandboxes *smap.Map[*sandbox.Sandbox], + sandboxes *sandbox.SandboxesMap, templateStorage storage.StorageProvider, buildStorage storage.StorageProvider, index cache.Index, @@ -80,7 +79,7 @@ func (lb *LayerExecutor) BuildLayer( defer sbx.Close(ctx) // Add to proxy so we can call envd commands - lb.sandboxes.Insert(sbx.Runtime.SandboxID, sbx) + lb.sandboxes.Insert(sbx) defer func() { lb.sandboxes.Remove(sbx.Runtime.SandboxID) lb.proxy.RemoveFromPool(sbx.Runtime.ExecutionID) diff --git a/packages/orchestrator/internal/template/server/main.go b/packages/orchestrator/internal/template/server/main.go index 0cfd2bbc35..058626cc08 100644 --- a/packages/orchestrator/internal/template/server/main.go +++ 
b/packages/orchestrator/internal/template/server/main.go @@ -23,7 +23,6 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/env" templatemanager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager" "github.com/e2b-dev/infra/packages/shared/pkg/limit" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" "github.com/e2b-dev/infra/packages/shared/pkg/storage" ) @@ -56,7 +55,7 @@ func New( grpc *grpcserver.GRPCServer, sandboxFactory *sandbox.Factory, proxy *proxy.SandboxProxy, - sandboxes *smap.Map[*sandbox.Sandbox], + sandboxes *sandbox.SandboxesMap, templateCache *sbxtemplate.Cache, templatePersistence storage.StorageProvider, limiter *limit.Limiter, diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 780c679775..6feb6e0705 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -44,7 +44,6 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/logger" sbxlogger "github.com/e2b-dev/infra/packages/shared/pkg/logger/sandbox" "github.com/e2b-dev/infra/packages/shared/pkg/pubsub" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) @@ -206,7 +205,7 @@ func run(config cfg.Config) (success bool) { // The sandbox map is shared between the server and the proxy // to propagate information about sandbox routing. - sandboxes := smap.New[*sandbox.Sandbox]() + sandboxes := sandbox.NewSandboxesMap() sandboxProxy, err := proxy.NewSandboxProxy(tel.MeterProvider, config.ProxyPort, sandboxes) if err != nil { From 44029082069337bf00b4b2b871843c18cd2601ea Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Fri, 10 Oct 2025 09:19:14 -0700 Subject: [PATCH 02/16] rename struct, add subscribers --- .../hyperloopserver/handlers/store.go | 4 +- .../internal/hyperloopserver/server.go | 2 +- .../internal/metrics/sandboxes.go | 4 +- packages/orchestrator/internal/proxy/proxy.go | 2 +- .../internal/sandbox/container.go | 45 ----------- packages/orchestrator/internal/sandbox/map.go | 75 +++++++++++++++++++ packages/orchestrator/internal/server/main.go | 4 +- .../internal/service/service_info.go | 4 +- .../internal/template/build/builder.go | 4 +- .../template/build/layer/layer_executor.go | 4 +- .../internal/template/server/main.go | 2 +- 11 files changed, 90 insertions(+), 60 deletions(-) delete mode 100644 packages/orchestrator/internal/sandbox/container.go create mode 100644 packages/orchestrator/internal/sandbox/map.go diff --git a/packages/orchestrator/internal/hyperloopserver/handlers/store.go b/packages/orchestrator/internal/hyperloopserver/handlers/store.go index e8cce80b59..255ee61cca 100644 --- a/packages/orchestrator/internal/hyperloopserver/handlers/store.go +++ b/packages/orchestrator/internal/hyperloopserver/handlers/store.go @@ -18,13 +18,13 @@ const CollectorExporterTimeout = 10 * time.Second type APIStore struct { logger *zap.Logger - sandboxes *sandbox.SandboxesMap + sandboxes *sandbox.Map collectorClient http.Client collectorAddr string } -func NewHyperloopStore(logger *zap.Logger, sandboxes *sandbox.SandboxesMap, sandboxCollectorAddr string) *APIStore { +func NewHyperloopStore(logger *zap.Logger, sandboxes *sandbox.Map, sandboxCollectorAddr string) *APIStore { return &APIStore{ logger: logger, sandboxes: sandboxes, diff --git a/packages/orchestrator/internal/hyperloopserver/server.go b/packages/orchestrator/internal/hyperloopserver/server.go index ad44b19878..1e6cc158ab 100644 --- 
a/packages/orchestrator/internal/hyperloopserver/server.go +++ b/packages/orchestrator/internal/hyperloopserver/server.go @@ -19,7 +19,7 @@ import ( const maxUploadLimit = 1 << 28 // 256 MiB -func NewHyperloopServer(ctx context.Context, port uint16, logger *zap.Logger, sandboxes *sandbox.SandboxesMap) (*http.Server, error) { +func NewHyperloopServer(ctx context.Context, port uint16, logger *zap.Logger, sandboxes *sandbox.Map) (*http.Server, error) { sandboxCollectorAddr := env.LogsCollectorAddress() store := handlers.NewHyperloopStore(logger, sandboxes, sandboxCollectorAddr) swagger, err := api.GetSwagger() diff --git a/packages/orchestrator/internal/metrics/sandboxes.go b/packages/orchestrator/internal/metrics/sandboxes.go index 77a32d4feb..bc0bf6aed4 100644 --- a/packages/orchestrator/internal/metrics/sandboxes.go +++ b/packages/orchestrator/internal/metrics/sandboxes.go @@ -50,7 +50,7 @@ type SandboxObserver struct { registration metric.Registration exportInterval time.Duration - sandboxes *sandbox.SandboxesMap + sandboxes *sandbox.Map meter metric.Meter cpuTotal metric.Int64ObservableGauge @@ -61,7 +61,7 @@ type SandboxObserver struct { diskUsed metric.Int64ObservableGauge } -func NewSandboxObserver(ctx context.Context, nodeID, serviceName, serviceCommit, serviceVersion, serviceInstanceID string, sandboxes *sandbox.SandboxesMap) (*SandboxObserver, error) { +func NewSandboxObserver(ctx context.Context, nodeID, serviceName, serviceCommit, serviceVersion, serviceInstanceID string, sandboxes *sandbox.Map) (*SandboxObserver, error) { deltaTemporality := otlpmetricgrpc.WithTemporalitySelector(func(kind sdkmetric.InstrumentKind) metricdata.Temporality { // Use delta temporality for gauges and cumulative for all other instrument kinds. // This is used to prevent reporting sandbox metrics indefinitely. 
diff --git a/packages/orchestrator/internal/proxy/proxy.go b/packages/orchestrator/internal/proxy/proxy.go index 09270de678..8b783e241c 100644 --- a/packages/orchestrator/internal/proxy/proxy.go +++ b/packages/orchestrator/internal/proxy/proxy.go @@ -28,7 +28,7 @@ type SandboxProxy struct { proxy *reverseproxy.Proxy } -func NewSandboxProxy(meterProvider metric.MeterProvider, port uint16, sandboxes *sandbox.SandboxesMap) (*SandboxProxy, error) { +func NewSandboxProxy(meterProvider metric.MeterProvider, port uint16, sandboxes *sandbox.Map) (*SandboxProxy, error) { proxy := reverseproxy.New( port, idleTimeout, diff --git a/packages/orchestrator/internal/sandbox/container.go b/packages/orchestrator/internal/sandbox/container.go deleted file mode 100644 index c2694b2b82..0000000000 --- a/packages/orchestrator/internal/sandbox/container.go +++ /dev/null @@ -1,45 +0,0 @@ -package sandbox - -import "github.com/e2b-dev/infra/packages/shared/pkg/smap" - -type SandboxesMap struct { - sandboxes *smap.Map[*Sandbox] -} - -func (m *SandboxesMap) Items() map[string]*Sandbox { - return m.sandboxes.Items() -} - -func (m *SandboxesMap) Count() int { - return m.sandboxes.Count() -} - -func (m *SandboxesMap) Get(sandboxID string) (*Sandbox, bool) { - return m.sandboxes.Get(sandboxID) -} - -func (m *SandboxesMap) Insert(sbx *Sandbox) { - m.sandboxes.Insert(sbx.Runtime.SandboxID, sbx) -} - -func (m *SandboxesMap) Remove(sandboxID string) { - m.sandboxes.Remove(sandboxID) -} - -func (m *SandboxesMap) RemoveByExecutionID(sandboxID, executionID string) { - m.sandboxes.RemoveCb(sandboxID, func(_ string, v *Sandbox, exists bool) bool { - if !exists { - return false - } - - if v == nil { - return false - } - - return v.Runtime.ExecutionID == executionID - }) -} - -func NewSandboxesMap() *SandboxesMap { - return &SandboxesMap{sandboxes: smap.New[*Sandbox]()} -} diff --git a/packages/orchestrator/internal/sandbox/map.go b/packages/orchestrator/internal/sandbox/map.go new file mode 100644 index 0000000000..0079a03b79 --- /dev/null +++ b/packages/orchestrator/internal/sandbox/map.go @@ -0,0 +1,75 @@ +package sandbox + +import "github.com/e2b-dev/infra/packages/shared/pkg/smap" + +type MapSubscriber interface { + OnInsert(sandbox *Sandbox) + OnRemove(sandboxID string) +} + +type Map struct { + sandboxes *smap.Map[*Sandbox] + subscribers []MapSubscriber +} + +func (m *Map) Subscribe(subscriber MapSubscriber) { + m.subscribers = append(m.subscribers, subscriber) +} + +func (m *Map) trigger(fn func(MapSubscriber)) { + for _, subscriber := range m.subscribers { + fn(subscriber) + } +} + +func (m *Map) Items() map[string]*Sandbox { + return m.sandboxes.Items() +} + +func (m *Map) Count() int { + return m.sandboxes.Count() +} + +func (m *Map) Get(sandboxID string) (*Sandbox, bool) { + return m.sandboxes.Get(sandboxID) +} + +func (m *Map) Insert(sbx *Sandbox) { + m.sandboxes.Insert(sbx.Runtime.SandboxID, sbx) + + go m.trigger(func(s MapSubscriber) { + s.OnInsert(sbx) + }) +} + +func (m *Map) Remove(sandboxID string) { + m.sandboxes.Remove(sandboxID) + + go m.trigger(func(s MapSubscriber) { + s.OnRemove(sandboxID) + }) +} + +func (m *Map) RemoveByExecutionID(sandboxID, executionID string) { + removed := m.sandboxes.RemoveCb(sandboxID, func(_ string, v *Sandbox, exists bool) bool { + if !exists { + return false + } + + if v == nil { + return false + } + + return v.Runtime.ExecutionID == executionID + }) + + if removed { + go m.trigger(func(s MapSubscriber) { + s.OnRemove(sandboxID) + }) + } +} + +func NewSandboxesMap() *Map { + 
return &Map{sandboxes: smap.New[*Sandbox]()} +} diff --git a/packages/orchestrator/internal/server/main.go b/packages/orchestrator/internal/server/main.go index 537669b241..c22e78041c 100644 --- a/packages/orchestrator/internal/server/main.go +++ b/packages/orchestrator/internal/server/main.go @@ -28,7 +28,7 @@ type server struct { sandboxFactory *sandbox.Factory info *service.ServiceInfo - sandboxes *sandbox.SandboxesMap + sandboxes *sandbox.Map proxy *proxy.SandboxProxy networkPool *network.Pool templateCache *template.Cache @@ -57,7 +57,7 @@ type ServiceConfig struct { Info *service.ServiceInfo Proxy *proxy.SandboxProxy SandboxFactory *sandbox.Factory - Sandboxes *sandbox.SandboxesMap + Sandboxes *sandbox.Map Persistence storage.StorageProvider FeatureFlags *featureflags.Client SbxEventsService events.EventsService[event.SandboxEvent] diff --git a/packages/orchestrator/internal/service/service_info.go b/packages/orchestrator/internal/service/service_info.go index a42ea150b8..96c34ebeed 100644 --- a/packages/orchestrator/internal/service/service_info.go +++ b/packages/orchestrator/internal/service/service_info.go @@ -17,10 +17,10 @@ type Server struct { orchestratorinfo.UnimplementedInfoServiceServer info *ServiceInfo - sandboxes *sandbox.SandboxesMap + sandboxes *sandbox.Map } -func NewInfoService(_ context.Context, grpc *grpc.Server, info *ServiceInfo, sandboxes *sandbox.SandboxesMap) *Server { +func NewInfoService(_ context.Context, grpc *grpc.Server, info *ServiceInfo, sandboxes *sandbox.Map) *Server { s := &Server{ info: info, sandboxes: sandboxes, diff --git a/packages/orchestrator/internal/template/build/builder.go b/packages/orchestrator/internal/template/build/builder.go index 6e71321506..bcae443095 100644 --- a/packages/orchestrator/internal/template/build/builder.go +++ b/packages/orchestrator/internal/template/build/builder.go @@ -49,7 +49,7 @@ type Builder struct { artifactRegistry artifactsregistry.ArtifactsRegistry dockerhubRepository dockerhub.RemoteRepository proxy *proxy.SandboxProxy - sandboxes *sandbox.SandboxesMap + sandboxes *sandbox.Map templateCache *sbxtemplate.Cache metrics *metrics.BuildMetrics } @@ -62,7 +62,7 @@ func NewBuilder( artifactRegistry artifactsregistry.ArtifactsRegistry, dockerhubRepository dockerhub.RemoteRepository, proxy *proxy.SandboxProxy, - sandboxes *sandbox.SandboxesMap, + sandboxes *sandbox.Map, templateCache *sbxtemplate.Cache, buildMetrics *metrics.BuildMetrics, ) *Builder { diff --git a/packages/orchestrator/internal/template/build/layer/layer_executor.go b/packages/orchestrator/internal/template/build/layer/layer_executor.go index 5c36a8b2f9..9d6c708cff 100644 --- a/packages/orchestrator/internal/template/build/layer/layer_executor.go +++ b/packages/orchestrator/internal/template/build/layer/layer_executor.go @@ -27,7 +27,7 @@ type LayerExecutor struct { templateCache *sbxtemplate.Cache proxy *proxy.SandboxProxy - sandboxes *sandbox.SandboxesMap + sandboxes *sandbox.Map templateStorage storage.StorageProvider buildStorage storage.StorageProvider index cache.Index @@ -38,7 +38,7 @@ func NewLayerExecutor( logger *zap.Logger, templateCache *sbxtemplate.Cache, proxy *proxy.SandboxProxy, - sandboxes *sandbox.SandboxesMap, + sandboxes *sandbox.Map, templateStorage storage.StorageProvider, buildStorage storage.StorageProvider, index cache.Index, diff --git a/packages/orchestrator/internal/template/server/main.go b/packages/orchestrator/internal/template/server/main.go index 058626cc08..7090df60d5 100644 --- 
a/packages/orchestrator/internal/template/server/main.go +++ b/packages/orchestrator/internal/template/server/main.go @@ -55,7 +55,7 @@ func New( grpc *grpcserver.GRPCServer, sandboxFactory *sandbox.Factory, proxy *proxy.SandboxProxy, - sandboxes *sandbox.SandboxesMap, + sandboxes *sandbox.Map, templateCache *sbxtemplate.Cache, templatePersistence storage.StorageProvider, limiter *limit.Limiter, From fa22f6aeff099f8618913f83b34e9d98eb4d6c50 Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Fri, 10 Oct 2025 10:05:59 -0700 Subject: [PATCH 03/16] add locking to the subs list --- packages/orchestrator/internal/sandbox/map.go | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/packages/orchestrator/internal/sandbox/map.go b/packages/orchestrator/internal/sandbox/map.go index 0079a03b79..ac493a0770 100644 --- a/packages/orchestrator/internal/sandbox/map.go +++ b/packages/orchestrator/internal/sandbox/map.go @@ -1,6 +1,10 @@ package sandbox -import "github.com/e2b-dev/infra/packages/shared/pkg/smap" +import ( + "sync" + + "github.com/e2b-dev/infra/packages/shared/pkg/smap" +) type MapSubscriber interface { OnInsert(sandbox *Sandbox) @@ -8,16 +12,24 @@ type MapSubscriber interface { } type Map struct { - sandboxes *smap.Map[*Sandbox] - subscribers []MapSubscriber + sandboxes *smap.Map[*Sandbox] + + subs []MapSubscriber + subsLock sync.RWMutex } func (m *Map) Subscribe(subscriber MapSubscriber) { - m.subscribers = append(m.subscribers, subscriber) + m.subsLock.Lock() + defer m.subsLock.Unlock() + + m.subs = append(m.subs, subscriber) } func (m *Map) trigger(fn func(MapSubscriber)) { - for _, subscriber := range m.subscribers { + m.subsLock.RLock() + defer m.subsLock.RUnlock() + + for _, subscriber := range m.subs { fn(subscriber) } } From 80ed359d52f8f059196df92e353887d88370e1fd Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Fri, 10 Oct 2025 10:02:23 -0700 Subject: [PATCH 04/16] collect/publish metrics across peers --- packages/orchestrator/internal/cfg/model.go | 27 +- .../orchestrator/internal/metrics/tracker.go | 269 ++++++++++++++++++ .../internal/metrics/tracker_test.go | 196 +++++++++++++ packages/orchestrator/internal/server/main.go | 49 ++-- .../orchestrator/internal/server/sandboxes.go | 34 +-- .../internal/service/service_info.go | 35 +-- packages/orchestrator/main.go | 13 +- 7 files changed, 548 insertions(+), 75 deletions(-) create mode 100644 packages/orchestrator/internal/metrics/tracker.go create mode 100644 packages/orchestrator/internal/metrics/tracker_test.go diff --git a/packages/orchestrator/internal/cfg/model.go b/packages/orchestrator/internal/cfg/model.go index a3a00a74c9..322609acef 100644 --- a/packages/orchestrator/internal/cfg/model.go +++ b/packages/orchestrator/internal/cfg/model.go @@ -1,23 +1,28 @@ package cfg import ( + "time" + "github.com/caarlos0/env/v11" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network" ) type Config struct { - AllowSandboxInternet bool `env:"ALLOW_SANDBOX_INTERNET" envDefault:"true"` - ClickhouseConnectionString string `env:"CLICKHOUSE_CONNECTION_STRING"` - ForceStop bool `env:"FORCE_STOP"` - GRPCPort uint16 `env:"GRPC_PORT" envDefault:"5008"` - LaunchDarklyAPIKey string `env:"LAUNCH_DARKLY_API_KEY"` - OrchestratorBasePath string `env:"ORCHESTRATOR_BASE_PATH" envDefault:"/orchestrator"` - OrchestratorLockPath string `env:"ORCHESTRATOR_LOCK_PATH" envDefault:"/orchestrator.lock"` - ProxyPort uint16 `env:"PROXY_PORT" envDefault:"5007"` - RedisClusterURL string `env:"REDIS_CLUSTER_URL"` - 
RedisURL string `env:"REDIS_URL"` - Services []string `env:"ORCHESTRATOR_SERVICES" envDefault:"orchestrator"` + AllowSandboxInternet bool `env:"ALLOW_SANDBOX_INTERNET" envDefault:"true"` + ClickhouseConnectionString string `env:"CLICKHOUSE_CONNECTION_STRING"` + ForceStop bool `env:"FORCE_STOP"` + GRPCPort uint16 `env:"GRPC_PORT" envDefault:"5008"` + LaunchDarklyAPIKey string `env:"LAUNCH_DARKLY_API_KEY"` + OrchestratorBasePath string `env:"ORCHESTRATOR_BASE_PATH" envDefault:"/orchestrator"` + OrchestratorLockPath string `env:"ORCHESTRATOR_LOCK_PATH" envDefault:"/orchestrator.lock"` + ProxyPort uint16 `env:"PROXY_PORT" envDefault:"5007"` + RedisClusterURL string `env:"REDIS_CLUSTER_URL"` + RedisURL string `env:"REDIS_URL"` + Services []string `env:"ORCHESTRATOR_SERVICES" envDefault:"orchestrator"` + MetricsDirectory string `env:"METRICS_DIRECTORY" envDefault:"/orchestrator/metrics"` + MetricsWriteInterval time.Duration `env:"METRICS_WRITE_INTERVAL" envDefault:"1m"` + MaxStartingInstances int64 `env:"MAX_STARTING_INSTANCES" envDefault:"3"` NetworkConfig network.Config } diff --git a/packages/orchestrator/internal/metrics/tracker.go b/packages/orchestrator/internal/metrics/tracker.go new file mode 100644 index 0000000000..ca5f0833aa --- /dev/null +++ b/packages/orchestrator/internal/metrics/tracker.go @@ -0,0 +1,269 @@ +package metrics + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "github.com/fsnotify/fsnotify" + "go.uber.org/zap" + "golang.org/x/sync/semaphore" + + "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" + featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" + "github.com/e2b-dev/infra/packages/shared/pkg/smap" + "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" +) + +type Tracker struct { + featureFlags *featureflags.Client + watcher *fsnotify.Watcher + startingSandboxes *semaphore.Weighted + + selfPath string + selfSandboxResources *smap.Map[sandbox.Config] + selfWriteInterval time.Duration + otherMetrics map[int]Allocations + otherLock sync.RWMutex +} + +func (t *Tracker) OnInsert(sandbox *sandbox.Sandbox) { + t.selfSandboxResources.Insert(sandbox.Metadata.Runtime.SandboxID, sandbox.Config) +} + +func (t *Tracker) OnRemove(sandboxID string) { + t.selfSandboxResources.Remove(sandboxID) +} + +func NewTracker(maxStartingInstancesPerNode int64, directory string, selfWriteInterval time.Duration, featureFlags *featureflags.Client) (*Tracker, error) { + filename := fmt.Sprintf("%d.json", os.Getpid()) + selfPath := filepath.Join(directory, filename) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, fmt.Errorf("failed to create watcher: %w", err) + } + + if err = watcher.Add(directory); err != nil { + if err2 := watcher.Close(); err2 != nil { + err = errors.Join(err, fmt.Errorf("failed to close watcher: %w", err2)) + } + return nil, fmt.Errorf("failed to watch %q: %w", directory, err) + } + + return &Tracker{ + featureFlags: featureFlags, + watcher: watcher, + otherMetrics: map[int]Allocations{}, + + selfPath: selfPath, + selfWriteInterval: selfWriteInterval, + selfSandboxResources: smap.New[sandbox.Config](), + startingSandboxes: semaphore.NewWeighted(maxStartingInstancesPerNode), + }, nil +} + +func (t *Tracker) getSelfAllocated() Allocations { + var allocated Allocations + for _, item := range t.selfSandboxResources.Items() { + allocated.VCPUs += uint32(item.Vcpu) + allocated.MemoryBytes += uint64(item.RamMB) * 1024 * 1024 + 
allocated.DiskBytes += uint64(item.TotalDiskSizeMB) * 1024 * 1024 + allocated.Sandboxes++ + } + return allocated +} + +func (t *Tracker) removeSelfFile() { + if err := os.Remove(t.selfPath); err != nil { + zap.L().Error("Failed to remove self file", zap.Error(err), zap.String("path", t.selfPath)) + } +} + +func (t *Tracker) Run(ctx context.Context) error { + defer t.removeSelfFile() + + writeTicks := time.Tick(t.selfWriteInterval) + + for { + select { + case <-writeTicks: + if err := t.handleWriteSelf(); err != nil { + zap.L().Error("Failed to write allocations", + zap.Error(err), + zap.String("path", t.selfPath)) + } + case <-ctx.Done(): + err := ctx.Err() + if err2 := t.watcher.Close(); err2 != nil { + err = errors.Join(err, fmt.Errorf("failed to close watcher: %w", err2)) + } + return err + case event := <-t.watcher.Events: + switch { + case event.Name == t.selfPath: + // ignore our writes + case event.Has(fsnotify.Write), event.Has(fsnotify.Create): + if err := t.handleOtherWrite(event.Name); err != nil { + zap.L().Error("Failed to handle other write", + zap.Error(err), + zap.String("path", event.Name)) + } + case event.Has(fsnotify.Remove): + if err := t.handleOtherRemove(event.Name); err != nil { + zap.L().Error("Failed to handle other remove", + zap.Error(err), + zap.String("path", event.Name)) + } + } + } + } +} + +func getPIDFromFilename(path string) (int, bool) { + basePath := filepath.Base(path) + dotIndex := strings.Index(basePath, ".") + if dotIndex == -1 { + zap.L().Warn("Ignoring file without extension", zap.String("file", path)) + return 0, false + } + + pidStr := basePath[:dotIndex] + pid, err := strconv.Atoi(pidStr) + if err != nil { + zap.L().Error("Filename is not a number", zap.String("path", path), zap.Error(err)) + return 0, false + } + + return pid, true +} + +func (t *Tracker) handleOtherRemove(name string) error { + pid, ok := getPIDFromFilename(name) + if !ok { + return errInvalidMetricsFilename + } + + t.otherLock.Lock() + defer t.otherLock.Unlock() + + delete(t.otherMetrics, pid) + + return nil +} + +var errInvalidMetricsFilename = errors.New("invalid metrics filename") + +func (t *Tracker) handleOtherWrite(name string) error { + pid, ok := getPIDFromFilename(name) + if !ok { + return errInvalidMetricsFilename + } + + data, err := os.ReadFile(name) + if err != nil { + return fmt.Errorf("failed to read file: %w", err) + } + + var allocations Allocations + if err := json.Unmarshal(data, &allocations); err != nil { + return fmt.Errorf("failed to unmarshal file: %w", err) + } + + t.otherLock.Lock() + defer t.otherLock.Unlock() + + t.otherMetrics[pid] = allocations + + return nil +} + +var ErrTooManyStarting = errors.New("too many starting sandboxes") + +type TooManySandboxesRunningError struct { + Current, Max int +} + +func (t TooManySandboxesRunningError) Error() string { + return fmt.Sprintf("max number of running sandboxes on node reached (%d), please retry", t.Max) +} + +var _ error = TooManySandboxesRunningError{} + +type TooManySandboxesStartingError struct { + Current, Max int +} + +var _ error = TooManySandboxesStartingError{} + +func (t TooManySandboxesStartingError) Error() string { + return fmt.Sprintf("max number of starting sandboxes on node reached (%d), please retry", t.Max) +} + +func (t *Tracker) AcquireStarting(ctx context.Context) error { + maxRunningSandboxesPerNode, err := t.featureFlags.IntFlag(ctx, featureflags.MaxSandboxesPerNode) + if err != nil { + zap.L().Error("Failed to get MaxSandboxesPerNode flag", zap.Error(err)) + } + + 
runningSandboxes := t.selfSandboxResources.Count() + if runningSandboxes >= maxRunningSandboxesPerNode { + telemetry.ReportEvent(ctx, "max number of running sandboxes reached") + + return TooManySandboxesRunningError{runningSandboxes, maxRunningSandboxesPerNode} + } + + // Check if we've reached the max number of starting instances on this node + acquired := t.startingSandboxes.TryAcquire(1) + if !acquired { + telemetry.ReportEvent(ctx, "too many starting sandboxes on node") + return ErrTooManyStarting + } + + return nil +} + +func (t *Tracker) ReleaseStarting() { + defer t.startingSandboxes.Release(1) +} + +type Allocations struct { + DiskBytes uint64 `json:"disk_bytes"` + MemoryBytes uint64 `json:"memory_bytes"` + Sandboxes uint32 `json:"sandboxes"` + VCPUs uint32 `json:"vcpus"` +} + +func (t *Tracker) TotalAllocated() Allocations { + allocated := t.getSelfAllocated() + + t.otherLock.RLock() + for _, item := range t.otherMetrics { + allocated.VCPUs += item.VCPUs + allocated.MemoryBytes += item.MemoryBytes + allocated.DiskBytes += item.DiskBytes + allocated.Sandboxes += item.Sandboxes + } + t.otherLock.RUnlock() + + return allocated +} + +func (t *Tracker) handleWriteSelf() error { + selfAllocated := t.getSelfAllocated() + data, err := json.Marshal(selfAllocated) + if err != nil { + return fmt.Errorf("failed to marshal allocations: %w", err) + } + if err := os.WriteFile(t.selfPath, data, 0o644); err != nil { + return fmt.Errorf("failed to write allocations: %w", err) + } + return nil +} diff --git a/packages/orchestrator/internal/metrics/tracker_test.go b/packages/orchestrator/internal/metrics/tracker_test.go new file mode 100644 index 0000000000..b6a8b7e4f2 --- /dev/null +++ b/packages/orchestrator/internal/metrics/tracker_test.go @@ -0,0 +1,196 @@ +package metrics + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" + featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" +) + +func TestTrackerRoundTrip(t *testing.T) { + tempDir := t.TempDir() + + flags, err := featureflags.NewClient() + require.NoError(t, err) + + tracker, err := NewTracker(1, tempDir, time.Millisecond*100, flags) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + // start the tracker in the background + go func() { + err := tracker.Run(ctx) + assert.ErrorIs(t, err, context.Canceled) + }() + + // write one file + otherJSON1 := toJSON(t, Allocations{ + DiskBytes: 1 * megabytes, + MemoryBytes: 2 * megabytes, + Sandboxes: 3, + VCPUs: 4, + }) + err = os.WriteFile(filepath.Join(tempDir, "999.json"), otherJSON1, 0o644) + require.NoError(t, err) + + // wait for the watcher to pick up the changes + time.Sleep(time.Millisecond * 100) + + allocated := tracker.TotalAllocated() + assert.Equal(t, Allocations{ + DiskBytes: 1 * megabytes, + MemoryBytes: 2 * megabytes, + Sandboxes: 3, + VCPUs: 4, + }, allocated) + + // write a second file + otherJSON2 := toJSON(t, Allocations{ + DiskBytes: 1 * megabytes, + MemoryBytes: 2 * megabytes, + Sandboxes: 3, + VCPUs: 4, + }) + err = os.WriteFile(filepath.Join(tempDir, "998.json"), otherJSON2, 0o644) + require.NoError(t, err) + + // wait for the watcher to pick up the changes + time.Sleep(time.Millisecond * 100) + + // verify the total is the combination of both json files + allocated = tracker.TotalAllocated() + assert.Equal(t, Allocations{ + 
DiskBytes: 2 * megabytes, + MemoryBytes: 4 * megabytes, + Sandboxes: 6, + VCPUs: 8, + }, allocated) + + // modify the second file + otherJSON2 = toJSON(t, Allocations{ + DiskBytes: 3 * megabytes, + MemoryBytes: 4 * megabytes, + Sandboxes: 5, + VCPUs: 6, + }) + err = os.WriteFile(filepath.Join(tempDir, "998.json"), otherJSON2, 0o644) + require.NoError(t, err) + + // wait for the watcher to pick up the changes + time.Sleep(time.Millisecond * 100) + + // verify the total is the combination of both json files + allocated = tracker.TotalAllocated() + assert.Equal(t, Allocations{ + DiskBytes: 4 * megabytes, + MemoryBytes: 6 * megabytes, + Sandboxes: 8, + VCPUs: 10, + }, allocated) + + // add a local sandbox + tracker.OnInsert(&sandbox.Sandbox{ + Metadata: &sandbox.Metadata{ + Config: sandbox.Config{ + Vcpu: 1, + RamMB: 2, + TotalDiskSizeMB: 3, + }, + }, + }) + + // wait for the watcher to pick up the changes + time.Sleep(time.Millisecond * 100) + + // verify the total is the combination of both json files + allocated = tracker.TotalAllocated() + assert.Equal(t, Allocations{ + DiskBytes: 7 * megabytes, + MemoryBytes: 8 * megabytes, + Sandboxes: 9, + VCPUs: 11, + }, allocated) + + err = os.Remove(filepath.Join(tempDir, "998.json")) + require.NoError(t, err) + + // wait for the watcher to pick up the changes + time.Sleep(time.Millisecond * 100) + + // ensure metrics are removed + allocated = tracker.TotalAllocated() + assert.Equal(t, Allocations{ + DiskBytes: 4 * megabytes, + MemoryBytes: 4 * megabytes, + Sandboxes: 4, + VCPUs: 5, + }, allocated) + + // ensure the self file has been created + _, err = os.Stat(tracker.selfPath) + require.NoError(t, err) + + cancel() + + time.Sleep(time.Millisecond * 100) + + // ensure the self file has been removed + _, err = os.Stat(tracker.selfPath) + assert.ErrorIs(t, err, os.ErrNotExist) +} + +func TestTracker_handleWriteSelf(t *testing.T) { + tempDir := t.TempDir() + + flags, err := featureflags.NewClient() + require.NoError(t, err) + + tracker, err := NewTracker(1, tempDir, 10*time.Second, flags) + require.NoError(t, err) + + tracker.OnInsert(&sandbox.Sandbox{ + Metadata: &sandbox.Metadata{ + Config: sandbox.Config{ + Vcpu: 1, + RamMB: 2, + TotalDiskSizeMB: 3, + }, + }, + }) + + err = tracker.handleWriteSelf() + require.NoError(t, err) + + data, err := os.ReadFile(tracker.selfPath) + require.NoError(t, err) + + var allocations Allocations + err = json.Unmarshal(data, &allocations) + require.NoError(t, err) + assert.Equal(t, Allocations{ + DiskBytes: 3 * megabytes, + MemoryBytes: 2 * megabytes, + Sandboxes: 1, + VCPUs: 1, + }, allocations) +} + +const megabytes = 1024 * 1024 + +func toJSON[T any](t *testing.T, model T) []byte { + t.Helper() + + data, err := json.Marshal(model) + require.NoError(t, err) + return data +} diff --git a/packages/orchestrator/internal/server/main.go b/packages/orchestrator/internal/server/main.go index c22e78041c..3984d42003 100644 --- a/packages/orchestrator/internal/server/main.go +++ b/packages/orchestrator/internal/server/main.go @@ -6,10 +6,10 @@ import ( "go.opentelemetry.io/otel/metric" "go.uber.org/zap" - "golang.org/x/sync/semaphore" "github.com/e2b-dev/infra/packages/orchestrator/internal/events" "github.com/e2b-dev/infra/packages/orchestrator/internal/grpcserver" + "github.com/e2b-dev/infra/packages/orchestrator/internal/metrics" "github.com/e2b-dev/infra/packages/orchestrator/internal/proxy" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" 
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd" @@ -26,18 +26,18 @@ import ( type server struct { orchestrator.UnimplementedSandboxServiceServer - sandboxFactory *sandbox.Factory - info *service.ServiceInfo - sandboxes *sandbox.Map - proxy *proxy.SandboxProxy - networkPool *network.Pool - templateCache *template.Cache - pauseMu sync.Mutex - devicePool *nbd.DevicePool - persistence storage.StorageProvider - featureFlags *featureflags.Client - sbxEventsService events.EventsService[event.SandboxEvent] - startingSandboxes *semaphore.Weighted + metricsTracker *metrics.Tracker + sandboxFactory *sandbox.Factory + info *service.ServiceInfo + sandboxes *sandbox.Map + proxy *proxy.SandboxProxy + networkPool *network.Pool + templateCache *template.Cache + pauseMu sync.Mutex + devicePool *nbd.DevicePool + persistence storage.StorageProvider + featureFlags *featureflags.Client + sbxEventsService events.EventsService[event.SandboxEvent] } type Service struct { @@ -55,6 +55,7 @@ type ServiceConfig struct { DevicePool *nbd.DevicePool TemplateCache *template.Cache Info *service.ServiceInfo + MetricsTracker *metrics.Tracker Proxy *proxy.SandboxProxy SandboxFactory *sandbox.Factory Sandboxes *sandbox.Map @@ -70,17 +71,17 @@ func New(cfg ServiceConfig) *Service { persistence: cfg.Persistence, } srv.server = &server{ - sandboxFactory: cfg.SandboxFactory, - info: cfg.Info, - proxy: srv.proxy, - sandboxes: cfg.Sandboxes, - networkPool: cfg.NetworkPool, - templateCache: cfg.TemplateCache, - devicePool: cfg.DevicePool, - persistence: cfg.Persistence, - featureFlags: cfg.FeatureFlags, - sbxEventsService: cfg.SbxEventsService, - startingSandboxes: semaphore.NewWeighted(maxStartingInstancesPerNode), + sandboxFactory: cfg.SandboxFactory, + info: cfg.Info, + proxy: srv.proxy, + sandboxes: cfg.Sandboxes, + networkPool: cfg.NetworkPool, + templateCache: cfg.TemplateCache, + devicePool: cfg.DevicePool, + persistence: cfg.Persistence, + featureFlags: cfg.FeatureFlags, + sbxEventsService: cfg.SbxEventsService, + metricsTracker: cfg.MetricsTracker, } meter := cfg.Tel.MeterProvider.Meter("orchestrator.sandbox") diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index f37e277eb4..7247169268 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -18,6 +18,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" clickhouse "github.com/e2b-dev/infra/packages/clickhouse/pkg" + "github.com/e2b-dev/infra/packages/orchestrator/internal/metrics" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" "github.com/e2b-dev/infra/packages/shared/pkg/events/event" featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" @@ -67,25 +68,24 @@ func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ Build(), ) - maxRunningSandboxesPerNode, err := s.featureFlags.IntFlag(ctx, featureflags.MaxSandboxesPerNode) - if err != nil { - zap.L().Error("Failed to get MaxSandboxesPerNode flag", zap.Error(err)) - } - - runningSandboxes := s.sandboxes.Count() - if runningSandboxes >= maxRunningSandboxesPerNode { - telemetry.ReportEvent(ctx, "max number of running sandboxes reached") - - return nil, status.Errorf(codes.ResourceExhausted, "max number of running sandboxes on node reached (%d), please retry", maxRunningSandboxesPerNode) - } - // Check if we've reached the max number of starting instances on this node - acquired := 
s.startingSandboxes.TryAcquire(1)
-	if !acquired {
-		telemetry.ReportEvent(ctx, "too many starting sandboxes on node")
-		return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes starting on this node, please retry")
+	if err := s.metricsTracker.AcquireStarting(ctx); err != nil {
+		var tooManyRunning metrics.TooManySandboxesRunningError
+
+		switch {
+		case errors.As(err, &tooManyRunning):
+			telemetry.ReportEvent(ctx, "max number of running sandboxes reached")
+			return nil, status.Errorf(codes.ResourceExhausted, "max number of running sandboxes on node reached (%d>=%d), please retry", tooManyRunning.Current, tooManyRunning.Max)
+		case errors.Is(err, metrics.ErrTooManyStarting):
+			telemetry.ReportEvent(ctx, "too many starting sandboxes on node")
+			return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes starting on this node, please retry")
+		default:
+			return nil, fmt.Errorf("unexpected error while acquiring starting lock: %w", err)
+		}
 	}
-	defer s.startingSandboxes.Release(1)
+	defer func() {
+		s.metricsTracker.ReleaseStarting()
+	}()
 
 	template, err := s.templateCache.GetTemplate(
 		ctx,
diff --git a/packages/orchestrator/internal/service/service_info.go b/packages/orchestrator/internal/service/service_info.go
index 96c34ebeed..fccef2d56a 100644
--- a/packages/orchestrator/internal/service/service_info.go
+++ b/packages/orchestrator/internal/service/service_info.go
@@ -9,21 +9,20 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"github.com/e2b-dev/infra/packages/orchestrator/internal/metrics"
-	"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox"
 	orchestratorinfo "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator-info"
 )
 
 type Server struct {
 	orchestratorinfo.UnimplementedInfoServiceServer
 
-	info      *ServiceInfo
-	sandboxes *sandbox.Map
+	info    *ServiceInfo
+	tracker *metrics.Tracker
 }
 
-func NewInfoService(_ context.Context, grpc *grpc.Server, info *ServiceInfo, sandboxes *sandbox.Map) *Server {
+func NewInfoService(_ context.Context, grpc *grpc.Server, info *ServiceInfo, tracker *metrics.Tracker) *Server {
 	s := &Server{
-		info:      info,
-		sandboxes: sandboxes,
+		info:    info,
+		tracker: tracker,
 	}
 
 	orchestratorinfo.RegisterInfoServiceServer(grpc, s)
@@ -53,15 +52,7 @@ func (s *Server) ServiceInfo(_ context.Context, _ *emptypb.Empty) (*orchestrator
 	}
 
 	// Calculate sandbox resource allocation
-	sandboxVCpuAllocated := uint32(0)
-	sandboxMemoryAllocated := uint64(0)
-	sandboxDiskAllocated := uint64(0)
-
-	for _, item := range s.sandboxes.Items() {
-		sandboxVCpuAllocated += uint32(item.Config.Vcpu)
-		sandboxMemoryAllocated += uint64(item.Config.RamMB) * 1024 * 1024
-		sandboxDiskAllocated += uint64(item.Config.TotalDiskSizeMB) * 1024 * 1024
-	}
+	allocated := s.tracker.TotalAllocated()
 
 	return &orchestratorinfo.ServiceInfoResponse{
 		NodeId: info.ClientId,
@@ -75,10 +66,10 @@ func (s *Server) ServiceInfo(_ context.Context, _ *emptypb.Empty) (*orchestrator
 		ServiceRoles: info.Roles,
 
 		// Allocated resources to sandboxes
-		MetricCpuAllocated:         sandboxVCpuAllocated,
-		MetricMemoryAllocatedBytes: sandboxMemoryAllocated,
-		MetricDiskAllocatedBytes:   sandboxDiskAllocated,
-		MetricSandboxesRunning:     uint32(s.sandboxes.Count()),
+		MetricCpuAllocated:         allocated.VCPUs,
+		MetricMemoryAllocatedBytes: allocated.MemoryBytes,
+		MetricDiskAllocatedBytes:   allocated.DiskBytes,
+		MetricSandboxesRunning:     allocated.Sandboxes,
 
 		// Host system usage metrics
 		MetricCpuPercent:
uint32(cpuMetrics.UsedPercent), @@ -92,9 +83,9 @@ func (s *Server) ServiceInfo(_ context.Context, _ *emptypb.Empty) (*orchestrator MetricDisks: convertDiskMetrics(diskMetrics), // TODO: Remove when migrated - MetricVcpuUsed: int64(sandboxVCpuAllocated), - MetricMemoryUsedMb: int64(sandboxMemoryAllocated / (1024 * 1024)), - MetricDiskMb: int64(sandboxDiskAllocated / (1024 * 1024)), + MetricVcpuUsed: int64(allocated.VCPUs), + MetricMemoryUsedMb: int64(allocated.MemoryBytes / (1024 * 1024)), + MetricDiskMb: int64(allocated.DiskBytes / (1024 * 1024)), }, nil } diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 6feb6e0705..1709ebd864 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -406,7 +406,18 @@ func run(config cfg.Config) (success bool) { closers = append([]Closeable{tmpl}, closers...) } - service.NewInfoService(ctx, grpcSrv.GRPCServer(), serviceInfo, sandboxes) + metricsTracker, err := metrics.NewTracker(config.MaxStartingInstances, config.MetricsDirectory, config.MetricsWriteInterval, featureFlags) + if err != nil { + zap.L().Fatal("failed to create metrics tracker", zap.Error(err)) + } + sandboxes.Subscribe(metricsTracker) + go func() { + if err := metricsTracker.Run(ctx); err != nil { + zap.L().Error("metrics tracker failed", zap.Error(err)) + } + }() + + service.NewInfoService(ctx, grpcSrv.GRPCServer(), serviceInfo, metricsTracker) g.Go(func() error { zap.L().Info("Starting session proxy") From 2fba68d1da04bd24b2109144c39a1ca15a8a7f28 Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Fri, 10 Oct 2025 10:16:01 -0700 Subject: [PATCH 05/16] use the errgroup.Group to run the metrics tracker --- packages/orchestrator/main.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 1709ebd864..94d0c5fee5 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -411,11 +411,12 @@ func run(config cfg.Config) (success bool) { zap.L().Fatal("failed to create metrics tracker", zap.Error(err)) } sandboxes.Subscribe(metricsTracker) - go func() { + g.Go(func() error { if err := metricsTracker.Run(ctx); err != nil { zap.L().Error("metrics tracker failed", zap.Error(err)) } - }() + return nil + }) service.NewInfoService(ctx, grpcSrv.GRPCServer(), serviceInfo, metricsTracker) From 5bababb45ed21ad81b5cf7cf10c92d62af562d78 Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Fri, 10 Oct 2025 10:44:34 -0700 Subject: [PATCH 06/16] split up the limiter from the tracker --- .../orchestrator/internal/metrics/tracker.go | 72 ++++------------- .../internal/metrics/tracker_test.go | 11 +-- .../orchestrator/internal/server/limiter.go | 81 +++++++++++++++++++ packages/orchestrator/internal/server/main.go | 3 + .../orchestrator/internal/server/sandboxes.go | 14 ++-- packages/orchestrator/main.go | 35 ++++---- 6 files changed, 124 insertions(+), 92 deletions(-) create mode 100644 packages/orchestrator/internal/server/limiter.go diff --git a/packages/orchestrator/internal/metrics/tracker.go b/packages/orchestrator/internal/metrics/tracker.go index ca5f0833aa..e2e5d10f2c 100644 --- a/packages/orchestrator/internal/metrics/tracker.go +++ b/packages/orchestrator/internal/metrics/tracker.go @@ -14,18 +14,13 @@ import ( "github.com/fsnotify/fsnotify" "go.uber.org/zap" - "golang.org/x/sync/semaphore" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" - featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" 
"github.com/e2b-dev/infra/packages/shared/pkg/smap" - "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) type Tracker struct { - featureFlags *featureflags.Client - watcher *fsnotify.Watcher - startingSandboxes *semaphore.Weighted + watcher *fsnotify.Watcher selfPath string selfSandboxResources *smap.Map[sandbox.Config] @@ -42,7 +37,7 @@ func (t *Tracker) OnRemove(sandboxID string) { t.selfSandboxResources.Remove(sandboxID) } -func NewTracker(maxStartingInstancesPerNode int64, directory string, selfWriteInterval time.Duration, featureFlags *featureflags.Client) (*Tracker, error) { +func NewTracker(directory string, selfWriteInterval time.Duration) (*Tracker, error) { filename := fmt.Sprintf("%d.json", os.Getpid()) selfPath := filepath.Join(directory, filename) @@ -59,17 +54,27 @@ func NewTracker(maxStartingInstancesPerNode int64, directory string, selfWriteIn } return &Tracker{ - featureFlags: featureFlags, watcher: watcher, otherMetrics: map[int]Allocations{}, selfPath: selfPath, selfWriteInterval: selfWriteInterval, selfSandboxResources: smap.New[sandbox.Config](), - startingSandboxes: semaphore.NewWeighted(maxStartingInstancesPerNode), }, nil } +func (t *Tracker) TotalRunningCount() int { + count := t.selfSandboxResources.Count() + + t.otherLock.RLock() + for _, item := range t.otherMetrics { + count += int(item.Sandboxes) + } + t.otherLock.RUnlock() + + return count +} + func (t *Tracker) getSelfAllocated() Allocations { var allocated Allocations for _, item := range t.selfSandboxResources.Items() { @@ -185,55 +190,6 @@ func (t *Tracker) handleOtherWrite(name string) error { return nil } -var ErrTooManyStarting = errors.New("too many starting sandboxes") - -type TooManySandboxesRunningError struct { - Current, Max int -} - -func (t TooManySandboxesRunningError) Error() string { - return fmt.Sprintf("max number of running sandboxes on node reached (%d), please retry", t.Max) -} - -var _ error = TooManySandboxesRunningError{} - -type TooManySandboxesStartingError struct { - Current, Max int -} - -var _ error = TooManySandboxesStartingError{} - -func (t TooManySandboxesStartingError) Error() string { - return fmt.Sprintf("max number of starting sandboxes on node reached (%d), please retry", t.Max) -} - -func (t *Tracker) AcquireStarting(ctx context.Context) error { - maxRunningSandboxesPerNode, err := t.featureFlags.IntFlag(ctx, featureflags.MaxSandboxesPerNode) - if err != nil { - zap.L().Error("Failed to get MaxSandboxesPerNode flag", zap.Error(err)) - } - - runningSandboxes := t.selfSandboxResources.Count() - if runningSandboxes >= maxRunningSandboxesPerNode { - telemetry.ReportEvent(ctx, "max number of running sandboxes reached") - - return TooManySandboxesRunningError{runningSandboxes, maxRunningSandboxesPerNode} - } - - // Check if we've reached the max number of starting instances on this node - acquired := t.startingSandboxes.TryAcquire(1) - if !acquired { - telemetry.ReportEvent(ctx, "too many starting sandboxes on node") - return ErrTooManyStarting - } - - return nil -} - -func (t *Tracker) ReleaseStarting() { - defer t.startingSandboxes.Release(1) -} - type Allocations struct { DiskBytes uint64 `json:"disk_bytes"` MemoryBytes uint64 `json:"memory_bytes"` diff --git a/packages/orchestrator/internal/metrics/tracker_test.go b/packages/orchestrator/internal/metrics/tracker_test.go index b6a8b7e4f2..7516af40fe 100644 --- a/packages/orchestrator/internal/metrics/tracker_test.go +++ b/packages/orchestrator/internal/metrics/tracker_test.go @@ -12,16 +12,12 @@ import ( 
"github.com/stretchr/testify/require" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" - featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" ) func TestTrackerRoundTrip(t *testing.T) { tempDir := t.TempDir() - flags, err := featureflags.NewClient() - require.NoError(t, err) - - tracker, err := NewTracker(1, tempDir, time.Millisecond*100, flags) + tracker, err := NewTracker(tempDir, time.Millisecond*100) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -152,10 +148,7 @@ func TestTrackerRoundTrip(t *testing.T) { func TestTracker_handleWriteSelf(t *testing.T) { tempDir := t.TempDir() - flags, err := featureflags.NewClient() - require.NoError(t, err) - - tracker, err := NewTracker(1, tempDir, 10*time.Second, flags) + tracker, err := NewTracker(tempDir, 10*time.Second) require.NoError(t, err) tracker.OnInsert(&sandbox.Sandbox{ diff --git a/packages/orchestrator/internal/server/limiter.go b/packages/orchestrator/internal/server/limiter.go new file mode 100644 index 0000000000..ab7e1d2ea6 --- /dev/null +++ b/packages/orchestrator/internal/server/limiter.go @@ -0,0 +1,81 @@ +package server + +import ( + "context" + "errors" + "fmt" + + "go.uber.org/zap" + "golang.org/x/sync/semaphore" + + "github.com/e2b-dev/infra/packages/orchestrator/internal/metrics" + featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" + "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" +) + +type Limiter struct { + featureFlags *featureflags.Client + startingSandboxes *semaphore.Weighted + metricsTracker *metrics.Tracker +} + +func NewLimiter( + maxStartingSandboxes int64, + featureFlags *featureflags.Client, + metricsTracker *metrics.Tracker, +) *Limiter { + return &Limiter{ + featureFlags: featureFlags, + metricsTracker: metricsTracker, + startingSandboxes: semaphore.NewWeighted(maxStartingSandboxes), + } +} + +var ErrTooManyStarting = errors.New("too many starting sandboxes") + +type TooManySandboxesRunningError struct { + Current, Max int +} + +func (t TooManySandboxesRunningError) Error() string { + return fmt.Sprintf("max number of running sandboxes on node reached (%d), please retry", t.Max) +} + +var _ error = TooManySandboxesRunningError{} + +type TooManySandboxesStartingError struct { + Current, Max int +} + +var _ error = TooManySandboxesStartingError{} + +func (t TooManySandboxesStartingError) Error() string { + return fmt.Sprintf("max number of starting sandboxes on node reached (%d), please retry", t.Max) +} + +func (t *Limiter) AcquireStarting(ctx context.Context) error { + maxRunningSandboxesPerNode, err := t.featureFlags.IntFlag(ctx, featureflags.MaxSandboxesPerNode) + if err != nil { + zap.L().Error("Failed to get MaxSandboxesPerNode flag", zap.Error(err)) + } + + runningSandboxes := t.metricsTracker.TotalRunningCount() + if runningSandboxes >= maxRunningSandboxesPerNode { + telemetry.ReportEvent(ctx, "max number of running sandboxes reached") + + return TooManySandboxesRunningError{runningSandboxes, maxRunningSandboxesPerNode} + } + + // Check if we've reached the max number of starting instances on this node + acquired := t.startingSandboxes.TryAcquire(1) + if !acquired { + telemetry.ReportEvent(ctx, "too many starting sandboxes on node") + return ErrTooManyStarting + } + + return nil +} + +func (t *Limiter) ReleaseStarting() { + defer t.startingSandboxes.Release(1) +} diff --git a/packages/orchestrator/internal/server/main.go b/packages/orchestrator/internal/server/main.go index 3984d42003..48bd1fc12c 100644 --- 
a/packages/orchestrator/internal/server/main.go +++ b/packages/orchestrator/internal/server/main.go @@ -26,6 +26,7 @@ import ( type server struct { orchestrator.UnimplementedSandboxServiceServer + limiter *Limiter metricsTracker *metrics.Tracker sandboxFactory *sandbox.Factory info *service.ServiceInfo @@ -62,6 +63,7 @@ type ServiceConfig struct { Persistence storage.StorageProvider FeatureFlags *featureflags.Client SbxEventsService events.EventsService[event.SandboxEvent] + Limiter *Limiter } func New(cfg ServiceConfig) *Service { @@ -82,6 +84,7 @@ func New(cfg ServiceConfig) *Service { featureFlags: cfg.FeatureFlags, sbxEventsService: cfg.SbxEventsService, metricsTracker: cfg.MetricsTracker, + limiter: cfg.Limiter, } meter := cfg.Tel.MeterProvider.Meter("orchestrator.sandbox") diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index 7247169268..6fd389a6df 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -18,7 +18,6 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" clickhouse "github.com/e2b-dev/infra/packages/clickhouse/pkg" - "github.com/e2b-dev/infra/packages/orchestrator/internal/metrics" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" "github.com/e2b-dev/infra/packages/shared/pkg/events/event" featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" @@ -32,10 +31,7 @@ import ( var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/orchestrator/internal/server") -const ( - requestTimeout = 60 * time.Second - maxStartingInstancesPerNode = 3 -) +const requestTimeout = 60 * time.Second func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequest) (*orchestrator.SandboxCreateResponse, error) { // set max request timeout for this request @@ -69,9 +65,9 @@ func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ ) // Check if we've reached the max number of starting instances on this node - if err := s.metricsTracker.AcquireStarting(ctx); err != nil { - var tooManyRunning metrics.TooManySandboxesRunningError - var tooManyStarting metrics.TooManySandboxesStartingError + if err := s.limiter.AcquireStarting(ctx); err != nil { + var tooManyRunning TooManySandboxesRunningError + var tooManyStarting TooManySandboxesStartingError switch { case errors.As(err, &tooManyRunning): telemetry.ReportEvent(ctx, "max number of running sandboxes reached") @@ -84,7 +80,7 @@ func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ } } defer func() { - s.metricsTracker.ReleaseStarting() + s.limiter.ReleaseStarting() }() template, err := s.templateCache.GetTemplate( diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 94d0c5fee5..39d6baedb1 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -229,12 +229,12 @@ func run(config cfg.Config) (success bool) { zap.L().Fatal("failed to create feature flags client", zap.Error(err)) } - limiter, err := limit.New(ctx, featureFlags) + googleStorageLimiter, err := limit.New(ctx, featureFlags) if err != nil { zap.L().Fatal("failed to create limiter", zap.Error(err)) } - persistence, err := storage.GetTemplateStorageProvider(ctx, limiter) + persistence, err := storage.GetTemplateStorageProvider(ctx, googleStorageLimiter) if err != nil { zap.L().Fatal("failed to create template storage provider", zap.Error(err)) } @@ -334,6 +334,20 @@ func run(config 
cfg.Config) (success bool) { sandboxFactory := sandbox.NewFactory(networkPool, devicePool, featureFlags, defaultAllowSandboxInternet) + metricsTracker, err := metrics.NewTracker(config.MetricsDirectory, config.MetricsWriteInterval) + if err != nil { + zap.L().Fatal("failed to create metrics tracker", zap.Error(err)) + } + sandboxes.Subscribe(metricsTracker) + g.Go(func() error { + if err := metricsTracker.Run(ctx); err != nil { + zap.L().Error("metrics tracker failed", zap.Error(err)) + } + return nil + }) + + limiter := server.NewLimiter(config.MaxStartingInstances, featureFlags, metricsTracker) + server.New(server.ServiceConfig{ SandboxFactory: sandboxFactory, GRPC: grpcSrv, @@ -347,6 +361,7 @@ func run(config cfg.Config) (success bool) { Persistence: persistence, FeatureFlags: featureFlags, SbxEventsService: sbxEventsService, + Limiter: limiter, }) tmplSbxLoggerExternal := sbxlogger.NewLogger( @@ -379,7 +394,7 @@ func run(config cfg.Config) (success bool) { sandboxProxy, featureFlags, sandboxObserver, - limiter, + googleStorageLimiter, sandboxEventBatcher, ) @@ -396,7 +411,7 @@ func run(config cfg.Config) (success bool) { sandboxes, templateCache, persistence, - limiter, + googleStorageLimiter, serviceInfo, ) if err != nil { @@ -406,18 +421,6 @@ func run(config cfg.Config) (success bool) { closers = append([]Closeable{tmpl}, closers...) } - metricsTracker, err := metrics.NewTracker(config.MaxStartingInstances, config.MetricsDirectory, config.MetricsWriteInterval, featureFlags) - if err != nil { - zap.L().Fatal("failed to create metrics tracker", zap.Error(err)) - } - sandboxes.Subscribe(metricsTracker) - g.Go(func() error { - if err := metricsTracker.Run(ctx); err != nil { - zap.L().Error("metrics tracker failed", zap.Error(err)) - } - return nil - }) - service.NewInfoService(ctx, grpcSrv.GRPCServer(), serviceInfo, metricsTracker) g.Go(func() error { From 753a5d5d239766c68367c33aa2b0d92dfd1d94c4 Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Fri, 10 Oct 2025 10:49:01 -0700 Subject: [PATCH 07/16] make tidy --- packages/orchestrator/go.mod | 1 + packages/orchestrator/go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/packages/orchestrator/go.mod b/packages/orchestrator/go.mod index 616f3d68bc..02e6631fa2 100644 --- a/packages/orchestrator/go.mod +++ b/packages/orchestrator/go.mod @@ -27,6 +27,7 @@ require ( github.com/e2b-dev/infra/packages/shared v0.0.0 github.com/edsrzf/mmap-go v1.2.0 github.com/firecracker-microvm/firecracker-go-sdk v1.0.0 + github.com/fsnotify/fsnotify v1.9.0 github.com/gin-contrib/size v1.0.2 github.com/gin-gonic/gin v1.10.1 github.com/go-openapi/strfmt v0.23.0 diff --git a/packages/orchestrator/go.sum b/packages/orchestrator/go.sum index 2bd38097b5..8dcf3299e3 100644 --- a/packages/orchestrator/go.sum +++ b/packages/orchestrator/go.sum @@ -425,6 +425,8 @@ github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoD github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa/go.mod h1:KnogPXtdwXqoenmZCw6S+25EAm2MkxbG0deNDu4cbSA= github.com/gabriel-vasile/mimetype v1.4.8 
h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= From 8d31aa56c84f14c9ae9c1c7f3e3052774b25cf83 Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Fri, 10 Oct 2025 11:30:19 -0700 Subject: [PATCH 08/16] shrink the tracker a bit --- .../orchestrator/internal/metrics/tracker.go | 89 ++++++++++++------- .../internal/metrics/tracker_test.go | 49 +++++++--- packages/orchestrator/main.go | 4 +- 3 files changed, 94 insertions(+), 48 deletions(-) diff --git a/packages/orchestrator/internal/metrics/tracker.go b/packages/orchestrator/internal/metrics/tracker.go index e2e5d10f2c..81aab7a0bf 100644 --- a/packages/orchestrator/internal/metrics/tracker.go +++ b/packages/orchestrator/internal/metrics/tracker.go @@ -20,9 +20,6 @@ import ( ) type Tracker struct { - watcher *fsnotify.Watcher - - selfPath string selfSandboxResources *smap.Map[sandbox.Config] selfWriteInterval time.Duration otherMetrics map[int]Allocations @@ -37,27 +34,10 @@ func (t *Tracker) OnRemove(sandboxID string) { t.selfSandboxResources.Remove(sandboxID) } -func NewTracker(directory string, selfWriteInterval time.Duration) (*Tracker, error) { - filename := fmt.Sprintf("%d.json", os.Getpid()) - selfPath := filepath.Join(directory, filename) - - watcher, err := fsnotify.NewWatcher() - if err != nil { - return nil, fmt.Errorf("failed to create watcher: %w", err) - } - - if err = watcher.Add(directory); err != nil { - if err2 := watcher.Close(); err2 != nil { - err = errors.Join(err, fmt.Errorf("failed to close watcher: %w", err2)) - } - return nil, fmt.Errorf("failed to watch %q: %w", directory, err) - } - +func NewTracker(selfWriteInterval time.Duration) (*Tracker, error) { return &Tracker{ - watcher: watcher, otherMetrics: map[int]Allocations{}, - selfPath: selfPath, selfWriteInterval: selfWriteInterval, selfSandboxResources: smap.New[sandbox.Config](), }, nil @@ -86,34 +66,77 @@ func (t *Tracker) getSelfAllocated() Allocations { return allocated } -func (t *Tracker) removeSelfFile() { - if err := os.Remove(t.selfPath); err != nil { - zap.L().Error("Failed to remove self file", zap.Error(err), zap.String("path", t.selfPath)) +func (t *Tracker) removeSelfFile(path string) { + if err := os.Remove(path); err != nil { + zap.L().Error("Failed to remove self file", zap.Error(err), zap.String("path", path)) } } -func (t *Tracker) Run(ctx context.Context) error { - defer t.removeSelfFile() +func (t *Tracker) makeSelfPath(directory string) string { + filename := fmt.Sprintf("%d.json", os.Getpid()) + selfPath := filepath.Join(directory, filename) + return selfPath +} +func (t *Tracker) Run(ctx context.Context, directory string) error { + if err := os.MkdirAll(directory, 0o777); err != nil { + return fmt.Errorf("failed to create directory: %w", err) + } + + // set up the self file + selfPath := t.makeSelfPath(directory) + defer t.removeSelfFile(selfPath) writeTicks := time.Tick(t.selfWriteInterval) + // set up the file watcher + watcher, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("failed to create watcher: %w", err) + } + if err = watcher.Add(directory); err != nil { + if err2 := watcher.Close(); err2 != nil { + err = errors.Join(err, fmt.Errorf("failed to close watcher: %w", err2)) + } + return fmt.Errorf("failed to watch %q: %w", directory, err) + } + + // read existing files + fullPaths, err := filepath.Glob(filepath.Join(directory, "*.json")) + if err != nil { + return fmt.Errorf("failed to read directory: %w", err) + } + for 
_, fullPath := range fullPaths { + // fullPath := filepath.Join(directory, fullPath) + if err = t.handleOtherWrite(fullPath); err != nil { + zap.L().Error("Failed to handle other write", + zap.Error(err), + zap.String("path", fullPath)) + } + } + + // main loop + // 1. read allocations from other processes + // 2. write our allocations to a file + // 3. return when context is canceled for { select { case <-writeTicks: - if err := t.handleWriteSelf(); err != nil { + if err := t.handleWriteSelf(selfPath); err != nil { zap.L().Error("Failed to write allocations", zap.Error(err), - zap.String("path", t.selfPath)) + zap.String("path", selfPath)) } case <-ctx.Done(): err := ctx.Err() - if err2 := t.watcher.Close(); err2 != nil { + if err2 := watcher.Close(); err2 != nil { err = errors.Join(err, fmt.Errorf("failed to close watcher: %w", err2)) } return err - case event := <-t.watcher.Events: + case event := <-watcher.Events: switch { - case event.Name == t.selfPath: + default: + zap.L().Warn("Unknown event", zap.Any("event", event)) + case event.Name == selfPath: // ignore our writes case event.Has(fsnotify.Write), event.Has(fsnotify.Create): if err := t.handleOtherWrite(event.Name); err != nil { @@ -212,13 +235,13 @@ func (t *Tracker) TotalAllocated() Allocations { return allocated } -func (t *Tracker) handleWriteSelf() error { +func (t *Tracker) handleWriteSelf(selfPath string) error { selfAllocated := t.getSelfAllocated() data, err := json.Marshal(selfAllocated) if err != nil { return fmt.Errorf("failed to marshal allocations: %w", err) } - if err := os.WriteFile(t.selfPath, data, 0o644); err != nil { + if err := os.WriteFile(selfPath, data, 0o644); err != nil { return fmt.Errorf("failed to write allocations: %w", err) } return nil diff --git a/packages/orchestrator/internal/metrics/tracker_test.go b/packages/orchestrator/internal/metrics/tracker_test.go index 7516af40fe..48ef3a4800 100644 --- a/packages/orchestrator/internal/metrics/tracker_test.go +++ b/packages/orchestrator/internal/metrics/tracker_test.go @@ -14,18 +14,37 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" ) +func TestTracker_CreateMissingDirectory(t *testing.T) { + tempDir := t.TempDir() + metricsDir := filepath.Join(tempDir, "metrics") + + selfWriteInterval := time.Millisecond * 100 + + tracker, err := NewTracker(selfWriteInterval) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(t.Context(), selfWriteInterval*2) + t.Cleanup(cancel) + + // run the tracker for a bit + err = tracker.Run(ctx, metricsDir) + require.ErrorIs(t, err, context.DeadlineExceeded) +} + func TestTrackerRoundTrip(t *testing.T) { tempDir := t.TempDir() - tracker, err := NewTracker(tempDir, time.Millisecond*100) + os.WriteFile(filepath.Join(tempDir, "990.json"), []byte(`{"diskBytes": 0, "memoryBytes": 0, "sandboxes": 0, "vcpus": 1}`), 0o644) + + tracker, err := NewTracker(time.Millisecond * 100) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) - defer cancel() + t.Cleanup(cancel) // start the tracker in the background go func() { - err := tracker.Run(ctx) + err := tracker.Run(ctx, tempDir) assert.ErrorIs(t, err, context.Canceled) }() @@ -47,7 +66,7 @@ func TestTrackerRoundTrip(t *testing.T) { DiskBytes: 1 * megabytes, MemoryBytes: 2 * megabytes, Sandboxes: 3, - VCPUs: 4, + VCPUs: 5, }, allocated) // write a second file @@ -69,7 +88,7 @@ func TestTrackerRoundTrip(t *testing.T) { DiskBytes: 2 * megabytes, MemoryBytes: 4 * megabytes, Sandboxes: 6, - VCPUs: 8, + VCPUs: 9, }, allocated) // 
modify the second file @@ -91,7 +110,7 @@ func TestTrackerRoundTrip(t *testing.T) { DiskBytes: 4 * megabytes, MemoryBytes: 6 * megabytes, Sandboxes: 8, - VCPUs: 10, + VCPUs: 11, }, allocated) // add a local sandbox @@ -114,7 +133,7 @@ func TestTrackerRoundTrip(t *testing.T) { DiskBytes: 7 * megabytes, MemoryBytes: 8 * megabytes, Sandboxes: 9, - VCPUs: 11, + VCPUs: 12, }, allocated) err = os.Remove(filepath.Join(tempDir, "998.json")) @@ -129,11 +148,13 @@ func TestTrackerRoundTrip(t *testing.T) { DiskBytes: 4 * megabytes, MemoryBytes: 4 * megabytes, Sandboxes: 4, - VCPUs: 5, + VCPUs: 6, }, allocated) + selfPath := tracker.makeSelfPath(tempDir) + // ensure the self file has been created - _, err = os.Stat(tracker.selfPath) + _, err = os.Stat(selfPath) require.NoError(t, err) cancel() @@ -141,14 +162,14 @@ func TestTrackerRoundTrip(t *testing.T) { time.Sleep(time.Millisecond * 100) // ensure the self file has been removed - _, err = os.Stat(tracker.selfPath) + _, err = os.Stat(selfPath) assert.ErrorIs(t, err, os.ErrNotExist) } func TestTracker_handleWriteSelf(t *testing.T) { tempDir := t.TempDir() - tracker, err := NewTracker(tempDir, 10*time.Second) + tracker, err := NewTracker(10 * time.Second) require.NoError(t, err) tracker.OnInsert(&sandbox.Sandbox{ @@ -161,10 +182,12 @@ func TestTracker_handleWriteSelf(t *testing.T) { }, }) - err = tracker.handleWriteSelf() + selfPath := tracker.makeSelfPath(tempDir) + + err = tracker.handleWriteSelf(selfPath) require.NoError(t, err) - data, err := os.ReadFile(tracker.selfPath) + data, err := os.ReadFile(selfPath) require.NoError(t, err) var allocations Allocations diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 39d6baedb1..e3919f431a 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -334,13 +334,13 @@ func run(config cfg.Config) (success bool) { sandboxFactory := sandbox.NewFactory(networkPool, devicePool, featureFlags, defaultAllowSandboxInternet) - metricsTracker, err := metrics.NewTracker(config.MetricsDirectory, config.MetricsWriteInterval) + metricsTracker, err := metrics.NewTracker(config.MetricsWriteInterval) if err != nil { zap.L().Fatal("failed to create metrics tracker", zap.Error(err)) } sandboxes.Subscribe(metricsTracker) g.Go(func() error { - if err := metricsTracker.Run(ctx); err != nil { + if err := metricsTracker.Run(ctx, config.MetricsDirectory); err != nil { zap.L().Error("metrics tracker failed", zap.Error(err)) } return nil From e8db442d4ed6b3704aac2b0948c793d16a8bfae5 Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Wed, 15 Oct 2025 16:56:16 -0700 Subject: [PATCH 09/16] rename some variables and fields --- packages/orchestrator/internal/server/main.go | 6 +++--- packages/orchestrator/internal/server/sandboxes.go | 4 ++-- packages/orchestrator/main.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/orchestrator/internal/server/main.go b/packages/orchestrator/internal/server/main.go index ebed8549e9..3c8731b9b4 100644 --- a/packages/orchestrator/internal/server/main.go +++ b/packages/orchestrator/internal/server/main.go @@ -24,7 +24,7 @@ import ( type Server struct { orchestrator.UnimplementedSandboxServiceServer - limiter *Limiter + sandboxLimiter *Limiter sandboxFactory *sandbox.Factory info *service.ServiceInfo sandboxes *sandbox.Map @@ -50,7 +50,7 @@ type ServiceConfig struct { Persistence storage.StorageProvider FeatureFlags *featureflags.Client SbxEventsService events.EventsService[event.SandboxEvent] - Limiter *Limiter + 
SandboxLimiter *Limiter } func New(cfg ServiceConfig) *Server { @@ -65,7 +65,7 @@ func New(cfg ServiceConfig) *Server { persistence: cfg.Persistence, featureFlags: cfg.FeatureFlags, sbxEventsService: cfg.SbxEventsService, - limiter: cfg.Limiter, + sandboxLimiter: cfg.SandboxLimiter, } meter := cfg.Tel.MeterProvider.Meter("orchestrator.sandbox") diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index 1fa586882f..c5625d1cdc 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -65,7 +65,7 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ ) // Check if we've reached the max number of starting instances on this node - if err := s.limiter.AcquireStarting(ctx); err != nil { + if err := s.sandboxLimiter.AcquireStarting(ctx); err != nil { var tooManyRunning TooManySandboxesRunningError var tooManyStarting TooManySandboxesStartingError switch { @@ -80,7 +80,7 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ } } defer func() { - s.limiter.ReleaseStarting() + s.sandboxLimiter.ReleaseStarting() }() template, err := s.templateCache.GetTemplate( diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 04c5af8b3e..34bca5ba61 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -363,7 +363,7 @@ func run(config cfg.Config) (success bool) { return nil }) - limiter := server.NewLimiter(config.MaxStartingInstances, featureFlags, metricsTracker) + sandboxLimiter := server.NewLimiter(config.MaxStartingInstances, featureFlags, metricsTracker) orchestratorService := server.New(server.ServiceConfig{ SandboxFactory: sandboxFactory, @@ -377,7 +377,7 @@ func run(config cfg.Config) (success bool) { Persistence: persistence, FeatureFlags: featureFlags, SbxEventsService: sbxEventsService, - Limiter: limiter, + SandboxLimiter: sandboxLimiter, }) // template manager sandbox logger From ec9c0d2e1dbc7379acceb1f2446626103909256a Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Thu, 16 Oct 2025 17:52:44 -0700 Subject: [PATCH 10/16] remove obsolete error --- packages/orchestrator/internal/server/limiter.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/orchestrator/internal/server/limiter.go b/packages/orchestrator/internal/server/limiter.go index ab7e1d2ea6..749214877c 100644 --- a/packages/orchestrator/internal/server/limiter.go +++ b/packages/orchestrator/internal/server/limiter.go @@ -2,7 +2,6 @@ package server import ( "context" - "errors" "fmt" "go.uber.org/zap" @@ -31,8 +30,6 @@ func NewLimiter( } } -var ErrTooManyStarting = errors.New("too many starting sandboxes") - type TooManySandboxesRunningError struct { Current, Max int } @@ -70,7 +67,7 @@ func (t *Limiter) AcquireStarting(ctx context.Context) error { acquired := t.startingSandboxes.TryAcquire(1) if !acquired { telemetry.ReportEvent(ctx, "too many starting sandboxes on node") - return ErrTooManyStarting + return TooManySandboxesStartingError{runningSandboxes, maxRunningSandboxesPerNode} } return nil From 94882c5737322bf69c2ad4079391b92fcaedbcdb Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Thu, 16 Oct 2025 18:02:27 -0700 Subject: [PATCH 11/16] clean up error --- packages/orchestrator/internal/server/limiter.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/orchestrator/internal/server/limiter.go 
b/packages/orchestrator/internal/server/limiter.go index 749214877c..a19e6fe8ab 100644 --- a/packages/orchestrator/internal/server/limiter.go +++ b/packages/orchestrator/internal/server/limiter.go @@ -13,6 +13,8 @@ import ( ) type Limiter struct { + maxStartingSandboxes int64 + featureFlags *featureflags.Client startingSandboxes *semaphore.Weighted metricsTracker *metrics.Tracker @@ -24,9 +26,10 @@ func NewLimiter( metricsTracker *metrics.Tracker, ) *Limiter { return &Limiter{ - featureFlags: featureFlags, - metricsTracker: metricsTracker, - startingSandboxes: semaphore.NewWeighted(maxStartingSandboxes), + featureFlags: featureFlags, + metricsTracker: metricsTracker, + maxStartingSandboxes: maxStartingSandboxes, + startingSandboxes: semaphore.NewWeighted(maxStartingSandboxes), } } @@ -41,7 +44,7 @@ func (t TooManySandboxesRunningError) Error() string { var _ error = TooManySandboxesRunningError{} type TooManySandboxesStartingError struct { - Current, Max int + Max int64 } var _ error = TooManySandboxesStartingError{} @@ -67,7 +70,7 @@ func (t *Limiter) AcquireStarting(ctx context.Context) error { acquired := t.startingSandboxes.TryAcquire(1) if !acquired { telemetry.ReportEvent(ctx, "too many starting sandboxes on node") - return TooManySandboxesStartingError{runningSandboxes, maxRunningSandboxesPerNode} + return TooManySandboxesStartingError{t.maxStartingSandboxes} } return nil From fb54ef7f9333e02a73b1f90e97163b13c6f3eec0 Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Thu, 16 Oct 2025 18:13:28 -0700 Subject: [PATCH 12/16] fix error message --- packages/orchestrator/internal/server/sandboxes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index c5625d1cdc..1ba9f8b1cb 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -74,7 +74,7 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes on node reached (%d>=%d), please retry", tooManyRunning.Current, tooManyRunning.Max) case errors.As(err, &tooManyStarting): telemetry.ReportEvent(ctx, "too many starting sandboxes on node") - return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes starting on this node (%d>=%d, please retry", tooManyStarting.Current, tooManyStarting.Max) + return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes starting on this node (%d), please retry", tooManyStarting.Max) default: return nil, fmt.Errorf("unexpected error while acquiring starting lock: %w", err) } From d5ab23445f21a81cd5858d222318a13d6e2db5c5 Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Mon, 20 Oct 2025 19:05:32 -0700 Subject: [PATCH 13/16] bring back new error message --- packages/orchestrator/internal/server/sandboxes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index 5fd2073880..82c7790aa4 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -74,7 +74,7 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes on node reached (%d>=%d), please retry", tooManyRunning.Current, tooManyRunning.Max) case errors.As(err, &tooManyStarting): 
telemetry.ReportEvent(ctx, "too many starting sandboxes on node") - return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes starting on this node (%d), please retry", tooManyStarting.Max) + return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes starting on this node, please retry") default: return nil, fmt.Errorf("unexpected error while acquiring starting lock: %w", err) } From f7d02fc4d3e82c8e18f62b88c56339aa767962af Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Mon, 20 Oct 2025 19:11:17 -0700 Subject: [PATCH 14/16] linting --- packages/api/internal/handlers/template_build_status.go | 1 + packages/api/internal/sandbox/store/memory/reservation.go | 4 ++++ .../api/internal/sandbox/store/memory/reservation_test.go | 1 + packages/orchestrator/internal/metrics/tracker.go | 7 +++++++ packages/orchestrator/internal/metrics/tracker_test.go | 1 + packages/orchestrator/internal/server/limiter.go | 1 + packages/orchestrator/internal/server/sandboxes.go | 2 ++ .../internal/template/build/commands/copy_test.go | 3 +++ .../internal/template/metadata/template_metadata.go | 1 + packages/orchestrator/main.go | 1 + 10 files changed, 22 insertions(+) diff --git a/packages/api/internal/handlers/template_build_status.go b/packages/api/internal/handlers/template_build_status.go index 69e5e0a81b..212b2740af 100644 --- a/packages/api/internal/handlers/template_build_status.go +++ b/packages/api/internal/handlers/template_build_status.go @@ -109,6 +109,7 @@ func (a *APIStore) GetTemplatesTemplateIDBuildsBuildIDStatus(c *gin.Context, tem if err != nil { telemetry.ReportError(ctx, "error when comparing versions", err, telemetry.WithTemplateID(templateID), telemetry.WithBuildID(buildID)) a.sendAPIStoreError(c, http.StatusInternalServerError, "Error when processing build logs") + return } diff --git a/packages/api/internal/sandbox/store/memory/reservation.go b/packages/api/internal/sandbox/store/memory/reservation.go index ea9ce2697f..bbef59f18c 100644 --- a/packages/api/internal/sandbox/store/memory/reservation.go +++ b/packages/api/internal/sandbox/store/memory/reservation.go @@ -46,16 +46,19 @@ func (s *ReservationStorage) Reserve(teamID, sandboxID string, limit int64) (fin if sbx, ok := teamSandboxes[sandboxID]; ok { alreadyPresent = true startResult = sbx.start + return teamSandboxes } if limit >= 0 && len(teamSandboxes) >= int(limit) { limitExceeded = true + return teamSandboxes } startResult = utils.NewSetOnce[sandbox.Sandbox]() teamSandboxes[sandboxID] = newSandboxReservation(startResult) + return teamSandboxes }) @@ -87,6 +90,7 @@ func (s *ReservationStorage) Remove(teamID, sandboxID string) { } delete(ts, sandboxID) + return len(ts) == 0 }) } diff --git a/packages/api/internal/sandbox/store/memory/reservation_test.go b/packages/api/internal/sandbox/store/memory/reservation_test.go index f0a5bde588..9c487e2fdd 100644 --- a/packages/api/internal/sandbox/store/memory/reservation_test.go +++ b/packages/api/internal/sandbox/store/memory/reservation_test.go @@ -411,6 +411,7 @@ func TestReservation_ConcurrentWaitAndFinish(t *testing.T) { } waiters[i] = waitForStart + return nil }) } diff --git a/packages/orchestrator/internal/metrics/tracker.go b/packages/orchestrator/internal/metrics/tracker.go index 81aab7a0bf..a775d5a5c1 100644 --- a/packages/orchestrator/internal/metrics/tracker.go +++ b/packages/orchestrator/internal/metrics/tracker.go @@ -63,6 +63,7 @@ func (t *Tracker) getSelfAllocated() Allocations { allocated.DiskBytes += uint64(item.TotalDiskSizeMB) * 1024 * 1024 
allocated.Sandboxes++ } + return allocated } @@ -75,6 +76,7 @@ func (t *Tracker) removeSelfFile(path string) { func (t *Tracker) makeSelfPath(directory string) string { filename := fmt.Sprintf("%d.json", os.Getpid()) selfPath := filepath.Join(directory, filename) + return selfPath } @@ -97,6 +99,7 @@ func (t *Tracker) Run(ctx context.Context, directory string) error { if err2 := watcher.Close(); err2 != nil { err = errors.Join(err, fmt.Errorf("failed to close watcher: %w", err2)) } + return fmt.Errorf("failed to watch %q: %w", directory, err) } @@ -131,6 +134,7 @@ func (t *Tracker) Run(ctx context.Context, directory string) error { if err2 := watcher.Close(); err2 != nil { err = errors.Join(err, fmt.Errorf("failed to close watcher: %w", err2)) } + return err case event := <-watcher.Events: switch { @@ -160,6 +164,7 @@ func getPIDFromFilename(path string) (int, bool) { dotIndex := strings.Index(basePath, ".") if dotIndex == -1 { zap.L().Warn("Ignoring file without extension", zap.String("file", path)) + return 0, false } @@ -167,6 +172,7 @@ func getPIDFromFilename(path string) (int, bool) { pid, err := strconv.Atoi(pidStr) if err != nil { zap.L().Error("Filename is not a number", zap.String("path", path), zap.Error(err)) + return 0, false } @@ -244,5 +250,6 @@ func (t *Tracker) handleWriteSelf(selfPath string) error { if err := os.WriteFile(selfPath, data, 0o644); err != nil { return fmt.Errorf("failed to write allocations: %w", err) } + return nil } diff --git a/packages/orchestrator/internal/metrics/tracker_test.go b/packages/orchestrator/internal/metrics/tracker_test.go index 48ef3a4800..e3b609fc1b 100644 --- a/packages/orchestrator/internal/metrics/tracker_test.go +++ b/packages/orchestrator/internal/metrics/tracker_test.go @@ -208,5 +208,6 @@ func toJSON[T any](t *testing.T, model T) []byte { data, err := json.Marshal(model) require.NoError(t, err) + return data } diff --git a/packages/orchestrator/internal/server/limiter.go b/packages/orchestrator/internal/server/limiter.go index a19e6fe8ab..521a69475d 100644 --- a/packages/orchestrator/internal/server/limiter.go +++ b/packages/orchestrator/internal/server/limiter.go @@ -70,6 +70,7 @@ func (t *Limiter) AcquireStarting(ctx context.Context) error { acquired := t.startingSandboxes.TryAcquire(1) if !acquired { telemetry.ReportEvent(ctx, "too many starting sandboxes on node") + return TooManySandboxesStartingError{t.maxStartingSandboxes} } diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index 82c7790aa4..561ae30c9f 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -71,9 +71,11 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ switch { case errors.As(err, &tooManyRunning): telemetry.ReportEvent(ctx, "max number of running sandboxes reached") + return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes on node reached (%d>=%d), please retry", tooManyRunning.Current, tooManyRunning.Max) case errors.As(err, &tooManyStarting): telemetry.ReportEvent(ctx, "too many starting sandboxes on node") + return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes starting on this node, please retry") default: return nil, fmt.Errorf("unexpected error while acquiring starting lock: %w", err) diff --git a/packages/orchestrator/internal/template/build/commands/copy_test.go b/packages/orchestrator/internal/template/build/commands/copy_test.go index 
2486b688c4..7675199ace 100644 --- a/packages/orchestrator/internal/template/build/commands/copy_test.go +++ b/packages/orchestrator/internal/template/build/commands/copy_test.go @@ -62,6 +62,7 @@ func executeScript(t *testing.T, script string, workDir string) (stdout, stderr func getCurrentUser() (uid, gid int) { uid = os.Getuid() gid = os.Getgid() + return uid, gid } @@ -70,6 +71,7 @@ func getFilePermissions(t *testing.T, path string) os.FileMode { t.Helper() info, err := os.Stat(path) require.NoError(t, err, "Failed to stat file") + return info.Mode().Perm() } @@ -79,6 +81,7 @@ func renderTemplate(t *testing.T, data copyScriptData) string { var buf bytes.Buffer err := copyScriptTemplate.Execute(&buf, data) require.NoError(t, err, "Template execution should not fail") + return buf.String() } diff --git a/packages/orchestrator/internal/template/metadata/template_metadata.go b/packages/orchestrator/internal/template/metadata/template_metadata.go index 066df74abb..d3c2bc0b69 100644 --- a/packages/orchestrator/internal/template/metadata/template_metadata.go +++ b/packages/orchestrator/internal/template/metadata/template_metadata.go @@ -34,6 +34,7 @@ type Context struct { func (c Context) WithUser(user string) Context { c.User = user + return c } diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 6b4bee4871..ee55aeb2c3 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -357,6 +357,7 @@ func run(config cfg.Config) (success bool) { if err := metricsTracker.Run(ctx, config.MetricsDirectory); err != nil { zap.L().Error("metrics tracker failed", zap.Error(err)) } + return nil }) From 39a2b946540cba1d6649e946c8fb395cb4d56eff Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Tue, 21 Oct 2025 09:59:15 -0700 Subject: [PATCH 15/16] rename metrics tracker to shared state manager --- packages/orchestrator/internal/cfg/model.go | 4 +- .../orchestrator/internal/server/limiter.go | 14 +++---- .../internal/service/service_info.go | 13 +++--- .../{metrics => sharedstate}/tracker.go | 42 +++++++++---------- .../{metrics => sharedstate}/tracker_test.go | 8 ++-- .../internal/sharedstate/tracker_test.go~ | 0 packages/orchestrator/main.go | 13 +++--- 7 files changed, 48 insertions(+), 46 deletions(-) rename packages/orchestrator/internal/{metrics => sharedstate}/tracker.go (88%) rename packages/orchestrator/internal/{metrics => sharedstate}/tracker_test.go (96%) create mode 100644 packages/orchestrator/internal/sharedstate/tracker_test.go~ diff --git a/packages/orchestrator/internal/cfg/model.go b/packages/orchestrator/internal/cfg/model.go index f8b5383e83..f166d65898 100644 --- a/packages/orchestrator/internal/cfg/model.go +++ b/packages/orchestrator/internal/cfg/model.go @@ -32,8 +32,8 @@ type Config struct { RedisClusterURL string `env:"REDIS_CLUSTER_URL"` RedisURL string `env:"REDIS_URL"` Services []string `env:"ORCHESTRATOR_SERVICES" envDefault:"orchestrator"` - MetricsDirectory string `env:"METRICS_DIRECTORY" envDefault:"/orchestrator/metrics"` - MetricsWriteInterval time.Duration `env:"METRICS_WRITE_INTERVAL" envDefault:"1m"` + SharedStateDirectory string `env:"SHARED_STATE_DIRECTORY" envDefault:"/orchestrator/state"` + SharedStateWriteInterval time.Duration `env:"SHARED_STATE_WRITE_INTERVAL" envDefault:"1m"` MaxStartingInstances int64 `env:"MAX_STARTING_INSTANCES" envDefault:"3"` } diff --git a/packages/orchestrator/internal/server/limiter.go b/packages/orchestrator/internal/server/limiter.go index 521a69475d..157af05ad3 100644 --- 
a/packages/orchestrator/internal/server/limiter.go +++ b/packages/orchestrator/internal/server/limiter.go @@ -7,7 +7,7 @@ import ( "go.uber.org/zap" "golang.org/x/sync/semaphore" - "github.com/e2b-dev/infra/packages/orchestrator/internal/metrics" + "github.com/e2b-dev/infra/packages/orchestrator/internal/sharedstate" featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) @@ -15,19 +15,19 @@ import ( type Limiter struct { maxStartingSandboxes int64 - featureFlags *featureflags.Client - startingSandboxes *semaphore.Weighted - metricsTracker *metrics.Tracker + featureFlags *featureflags.Client + startingSandboxes *semaphore.Weighted + sharedStateManager *sharedstate.Manager } func NewLimiter( maxStartingSandboxes int64, featureFlags *featureflags.Client, - metricsTracker *metrics.Tracker, + sharedStateManager *sharedstate.Manager, ) *Limiter { return &Limiter{ featureFlags: featureFlags, - metricsTracker: metricsTracker, + sharedStateManager: sharedStateManager, maxStartingSandboxes: maxStartingSandboxes, startingSandboxes: semaphore.NewWeighted(maxStartingSandboxes), } @@ -59,7 +59,7 @@ func (t *Limiter) AcquireStarting(ctx context.Context) error { zap.L().Error("Failed to get MaxSandboxesPerNode flag", zap.Error(err)) } - runningSandboxes := t.metricsTracker.TotalRunningCount() + runningSandboxes := t.sharedStateManager.TotalRunningCount() if runningSandboxes >= maxRunningSandboxesPerNode { telemetry.ReportEvent(ctx, "max number of running sandboxes reached") diff --git a/packages/orchestrator/internal/service/service_info.go b/packages/orchestrator/internal/service/service_info.go index 85f983889b..5826fd36bf 100644 --- a/packages/orchestrator/internal/service/service_info.go +++ b/packages/orchestrator/internal/service/service_info.go @@ -8,20 +8,21 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/e2b-dev/infra/packages/orchestrator/internal/metrics" + "github.com/e2b-dev/infra/packages/orchestrator/internal/sharedstate" orchestratorinfo "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator-info" ) type Server struct { orchestratorinfo.UnimplementedInfoServiceServer - info *ServiceInfo - tracker *metrics.Tracker + info *ServiceInfo + sharedState *sharedstate.Manager } -func NewInfoService(info *ServiceInfo, tracker *metrics.Tracker) *Server { +func NewInfoService(info *ServiceInfo, tracker *sharedstate.Manager) *Server { s := &Server{ - info: info, - tracker: tracker, + info: info, + sharedState: tracker, } return s @@ -50,7 +51,7 @@ func (s *Server) ServiceInfo(_ context.Context, _ *emptypb.Empty) (*orchestrator } // Calculate sandbox resource allocation - allocated := s.tracker.TotalAllocated() + allocated := s.sharedState.TotalAllocated() return &orchestratorinfo.ServiceInfoResponse{ NodeId: info.ClientId, diff --git a/packages/orchestrator/internal/metrics/tracker.go b/packages/orchestrator/internal/sharedstate/tracker.go similarity index 88% rename from packages/orchestrator/internal/metrics/tracker.go rename to packages/orchestrator/internal/sharedstate/tracker.go index a775d5a5c1..443e816e8d 100644 --- a/packages/orchestrator/internal/metrics/tracker.go +++ b/packages/orchestrator/internal/sharedstate/tracker.go @@ -1,4 +1,4 @@ -package metrics +package sharedstate import ( "context" @@ -19,23 +19,15 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/smap" ) -type Tracker struct { +type Manager struct { selfSandboxResources *smap.Map[sandbox.Config] 
selfWriteInterval time.Duration otherMetrics map[int]Allocations otherLock sync.RWMutex } -func (t *Tracker) OnInsert(sandbox *sandbox.Sandbox) { - t.selfSandboxResources.Insert(sandbox.Metadata.Runtime.SandboxID, sandbox.Config) -} - -func (t *Tracker) OnRemove(sandboxID string) { - t.selfSandboxResources.Remove(sandboxID) -} - -func NewTracker(selfWriteInterval time.Duration) (*Tracker, error) { - return &Tracker{ +func New(selfWriteInterval time.Duration) (*Manager, error) { + return &Manager{ otherMetrics: map[int]Allocations{}, selfWriteInterval: selfWriteInterval, @@ -43,7 +35,15 @@ func NewTracker(selfWriteInterval time.Duration) (*Tracker, error) { }, nil } -func (t *Tracker) TotalRunningCount() int { +func (t *Manager) OnInsert(sandbox *sandbox.Sandbox) { + t.selfSandboxResources.Insert(sandbox.Metadata.Runtime.SandboxID, sandbox.Config) +} + +func (t *Manager) OnRemove(sandboxID string) { + t.selfSandboxResources.Remove(sandboxID) +} + +func (t *Manager) TotalRunningCount() int { count := t.selfSandboxResources.Count() t.otherLock.RLock() @@ -55,7 +55,7 @@ func (t *Tracker) TotalRunningCount() int { return count } -func (t *Tracker) getSelfAllocated() Allocations { +func (t *Manager) getSelfAllocated() Allocations { var allocated Allocations for _, item := range t.selfSandboxResources.Items() { allocated.VCPUs += uint32(item.Vcpu) @@ -67,20 +67,20 @@ func (t *Tracker) getSelfAllocated() Allocations { return allocated } -func (t *Tracker) removeSelfFile(path string) { +func (t *Manager) removeSelfFile(path string) { if err := os.Remove(path); err != nil { zap.L().Error("Failed to remove self file", zap.Error(err), zap.String("path", path)) } } -func (t *Tracker) makeSelfPath(directory string) string { +func (t *Manager) makeSelfPath(directory string) string { filename := fmt.Sprintf("%d.json", os.Getpid()) selfPath := filepath.Join(directory, filename) return selfPath } -func (t *Tracker) Run(ctx context.Context, directory string) error { +func (t *Manager) Run(ctx context.Context, directory string) error { if err := os.MkdirAll(directory, 0o777); err != nil { return fmt.Errorf("failed to create directory: %w", err) } @@ -179,7 +179,7 @@ func getPIDFromFilename(path string) (int, bool) { return pid, true } -func (t *Tracker) handleOtherRemove(name string) error { +func (t *Manager) handleOtherRemove(name string) error { pid, ok := getPIDFromFilename(name) if !ok { return errInvalidMetricsFilename @@ -195,7 +195,7 @@ func (t *Tracker) handleOtherRemove(name string) error { var errInvalidMetricsFilename = errors.New("invalid metrics filename") -func (t *Tracker) handleOtherWrite(name string) error { +func (t *Manager) handleOtherWrite(name string) error { pid, ok := getPIDFromFilename(name) if !ok { return errInvalidMetricsFilename @@ -226,7 +226,7 @@ type Allocations struct { VCPUs uint32 `json:"vcpus"` } -func (t *Tracker) TotalAllocated() Allocations { +func (t *Manager) TotalAllocated() Allocations { allocated := t.getSelfAllocated() t.otherLock.RLock() @@ -241,7 +241,7 @@ func (t *Tracker) TotalAllocated() Allocations { return allocated } -func (t *Tracker) handleWriteSelf(selfPath string) error { +func (t *Manager) handleWriteSelf(selfPath string) error { selfAllocated := t.getSelfAllocated() data, err := json.Marshal(selfAllocated) if err != nil { diff --git a/packages/orchestrator/internal/metrics/tracker_test.go b/packages/orchestrator/internal/sharedstate/tracker_test.go similarity index 96% rename from packages/orchestrator/internal/metrics/tracker_test.go rename to 
packages/orchestrator/internal/sharedstate/tracker_test.go index e3b609fc1b..8a16481291 100644 --- a/packages/orchestrator/internal/metrics/tracker_test.go +++ b/packages/orchestrator/internal/sharedstate/tracker_test.go @@ -1,4 +1,4 @@ -package metrics +package sharedstate import ( "context" @@ -20,7 +20,7 @@ func TestTracker_CreateMissingDirectory(t *testing.T) { selfWriteInterval := time.Millisecond * 100 - tracker, err := NewTracker(selfWriteInterval) + tracker, err := New(selfWriteInterval) require.NoError(t, err) ctx, cancel := context.WithTimeout(t.Context(), selfWriteInterval*2) @@ -36,7 +36,7 @@ func TestTrackerRoundTrip(t *testing.T) { os.WriteFile(filepath.Join(tempDir, "990.json"), []byte(`{"diskBytes": 0, "memoryBytes": 0, "sandboxes": 0, "vcpus": 1}`), 0o644) - tracker, err := NewTracker(time.Millisecond * 100) + tracker, err := New(time.Millisecond * 100) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -169,7 +169,7 @@ func TestTrackerRoundTrip(t *testing.T) { func TestTracker_handleWriteSelf(t *testing.T) { tempDir := t.TempDir() - tracker, err := NewTracker(10 * time.Second) + tracker, err := New(10 * time.Second) require.NoError(t, err) tracker.OnInsert(&sandbox.Sandbox{ diff --git a/packages/orchestrator/internal/sharedstate/tracker_test.go~ b/packages/orchestrator/internal/sharedstate/tracker_test.go~ new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 90b75e0598..f967833e07 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -37,6 +37,7 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/template" "github.com/e2b-dev/infra/packages/orchestrator/internal/server" "github.com/e2b-dev/infra/packages/orchestrator/internal/service" + "github.com/e2b-dev/infra/packages/orchestrator/internal/sharedstate" "github.com/e2b-dev/infra/packages/orchestrator/internal/template/constants" tmplserver "github.com/e2b-dev/infra/packages/orchestrator/internal/template/server" "github.com/e2b-dev/infra/packages/shared/pkg/env" @@ -347,20 +348,20 @@ func run(config cfg.Config) (success bool) { // sandbox factory sandboxFactory := sandbox.NewFactory(config.BuilderConfig, networkPool, devicePool, featureFlags) - metricsTracker, err := metrics.NewTracker(config.MetricsWriteInterval) + sharedStateManager, err := sharedstate.New(config.SharedStateWriteInterval) if err != nil { zap.L().Fatal("failed to create metrics tracker", zap.Error(err)) } - sandboxes.Subscribe(metricsTracker) + sandboxes.Subscribe(sharedStateManager) g.Go(func() error { - if err := metricsTracker.Run(ctx, config.MetricsDirectory); err != nil { - zap.L().Error("metrics tracker failed", zap.Error(err)) + if err := sharedStateManager.Run(ctx, config.SharedStateDirectory); err != nil { + zap.L().Error("shared state manager failed", zap.Error(err)) } return nil }) - sandboxLimiter := server.NewLimiter(config.MaxStartingInstances, featureFlags, metricsTracker) + sandboxLimiter := server.NewLimiter(config.MaxStartingInstances, featureFlags, sharedStateManager) orchestratorService := server.New(server.ServiceConfig{ SandboxFactory: sandboxFactory, @@ -441,7 +442,7 @@ func run(config cfg.Config) (success bool) { closers = append(closers, closer{"template server", tmpl.Close}) } - infoService := service.NewInfoService(serviceInfo, metricsTracker) + infoService := service.NewInfoService(serviceInfo, sharedStateManager) orchestratorinfo.RegisterInfoServiceServer(grpcServer, 
infoService) grpcHealth := health.NewServer() From 835ee09fd8a776787cd71b2d053c1197c9c4d975 Mon Sep 17 00:00:00 2001 From: Joe Lombrozo Date: Tue, 21 Oct 2025 15:02:14 -0700 Subject: [PATCH 16/16] protect against nils --- packages/orchestrator/internal/sharedstate/tracker.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/orchestrator/internal/sharedstate/tracker.go b/packages/orchestrator/internal/sharedstate/tracker.go index 443e816e8d..a9a772b965 100644 --- a/packages/orchestrator/internal/sharedstate/tracker.go +++ b/packages/orchestrator/internal/sharedstate/tracker.go @@ -36,7 +36,14 @@ func New(selfWriteInterval time.Duration) (*Manager, error) { } func (t *Manager) OnInsert(sandbox *sandbox.Sandbox) { - t.selfSandboxResources.Insert(sandbox.Metadata.Runtime.SandboxID, sandbox.Config) + metadata := sandbox.Metadata + if metadata == nil { + zap.L().Warn("Ignoring sandbox without metadata") + + return + } + + t.selfSandboxResources.Insert(metadata.Runtime.SandboxID, metadata.Config) } func (t *Manager) OnRemove(sandboxID string) {
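
Note on the cross-process accounting this series converges on: every orchestrator
process serializes its current Allocations to <pid>.json in a shared directory,
removes that file on shutdown, and discovers its peers by watching the same
directory, so node-wide totals are simply the local view plus the sum of every
peer file. The stdlib-only sketch below illustrates that file contract; it is an
editor's illustration, not code from the series — the helper names writeSelf and
totalAllocated are invented, and it re-reads the directory on demand where the
real Manager reacts to fsnotify events and caches peer data keyed by PID.

	// Sketch of the per-process accounting scheme (assumed helper names).
	package main

	import (
		"encoding/json"
		"fmt"
		"os"
		"path/filepath"
	)

	// Allocations mirrors the JSON schema written by handleWriteSelf.
	type Allocations struct {
		DiskBytes   uint64 `json:"disk_bytes"`
		MemoryBytes uint64 `json:"memory_bytes"`
		Sandboxes   uint32 `json:"sandboxes"`
		VCPUs       uint32 `json:"vcpus"`
	}

	// writeSelf persists this process's allocations to <dir>/<pid>.json,
	// matching the selfPath convention used by makeSelfPath in the patches.
	func writeSelf(dir string, a Allocations) error {
		data, err := json.Marshal(a)
		if err != nil {
			return fmt.Errorf("marshal allocations: %w", err)
		}
		path := filepath.Join(dir, fmt.Sprintf("%d.json", os.Getpid()))
		return os.WriteFile(path, data, 0o644)
	}

	// totalAllocated sums every *.json file in the directory, our own included.
	// The real Manager keeps this sum incrementally via fsnotify; globbing here
	// just keeps the sketch self-contained.
	func totalAllocated(dir string) (Allocations, error) {
		var total Allocations
		paths, err := filepath.Glob(filepath.Join(dir, "*.json"))
		if err != nil {
			return total, err
		}
		for _, p := range paths {
			data, err := os.ReadFile(p)
			if err != nil {
				continue // a peer may have exited and removed its file
			}
			var a Allocations
			if err := json.Unmarshal(data, &a); err != nil {
				continue // skip malformed peer files
			}
			total.DiskBytes += a.DiskBytes
			total.MemoryBytes += a.MemoryBytes
			total.Sandboxes += a.Sandboxes
			total.VCPUs += a.VCPUs
		}
		return total, nil
	}

	func main() {
		dir, err := os.MkdirTemp("", "shared-state")
		if err != nil {
			panic(err)
		}
		defer os.RemoveAll(dir)

		if err := writeSelf(dir, Allocations{MemoryBytes: 512 << 20, Sandboxes: 1, VCPUs: 2}); err != nil {
			panic(err)
		}
		total, err := totalAllocated(dir)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%+v\n", total)
	}

Because each file is owned by exactly one PID there is no write contention; the
trade-off, visible in the series, is that a process that dies before its deferred
removeSelfFile runs leaves a stale <pid>.json behind, which is presumably why the
Manager prunes entries on fsnotify Remove events and ignores files whose names do
not parse as a PID.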