Skip to content

Commit 816daaf

Browse files
committed
Small refactoring
1 parent b3299b9 commit 816daaf

File tree

7 files changed

+150
-135
lines changed

7 files changed

+150
-135
lines changed

cmd/localstack/main.go

Lines changed: 73 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ import (
1111
"runtime/debug"
1212
"strconv"
1313
"strings"
14-
"sync"
15-
"time"
14+
"syscall"
1615

1716
"github.com/aws/aws-sdk-go-v2/config"
1817
"github.com/localstack/lambda-runtime-init/internal/aws/lambda"
@@ -22,16 +21,17 @@ import (
2221
"github.com/localstack/lambda-runtime-init/internal/hotreloading"
2322
"github.com/localstack/lambda-runtime-init/internal/localstack"
2423
"github.com/localstack/lambda-runtime-init/internal/logging"
24+
"github.com/localstack/lambda-runtime-init/internal/sandbox"
2525
"github.com/localstack/lambda-runtime-init/internal/server"
2626

2727
"github.com/localstack/lambda-runtime-init/internal/supervisor"
2828
"github.com/localstack/lambda-runtime-init/internal/tracing"
2929
"github.com/localstack/lambda-runtime-init/internal/utils"
3030
log "github.com/sirupsen/logrus"
3131
"go.amzn.com/lambda/core/directinvoke"
32+
"go.amzn.com/lambda/extensions"
3233
"go.amzn.com/lambda/interop"
33-
"go.amzn.com/lambda/rapidcore"
34-
supv "go.amzn.com/lambda/supervisor"
34+
"go.amzn.com/lambda/rapid"
3535
)
3636

3737
func InitLsOpts() *localstack.Config {
@@ -66,7 +66,7 @@ func InitFunctionConfig() lambda.FunctionConfig {
6666
InitializationType: utils.GetEnvWithDefault("AWS_LAMBDA_INITIALIZATION_TYPE", "on-demand"),
6767
LogGroupName: utils.GetEnvWithDefault("AWS_LAMBDA_LOG_GROUP_NAME", "/aws/lambda/Functions"),
6868
LogStreamName: utils.GetEnvWithDefault("AWS_LAMBDA_LOG_STREAM_NAME", "$LATEST"),
69-
FunctionMemorySizeMb: utils.GetEnvWithDefault("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "3008"),
69+
FunctionMemorySizeMb: utils.GetEnvWithDefault("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "128"),
7070
FunctionHandler: utils.GetEnvWithDefault("AWS_LAMBDA_FUNCTION_HANDLER", os.Getenv("_HANDLER")),
7171
}
7272
}
@@ -101,6 +101,10 @@ func UnsetLsEnvs() {
101101
}
102102
}
103103

104+
type closer struct{ fn func() }
105+
106+
func (c *closer) Close() error { c.fn(); return nil }
107+
104108
func main() {
105109
// we're setting this to the same value as in the official RIE
106110
debug.SetGCPercent(33)
@@ -116,31 +120,19 @@ func main() {
116120
// set up logging following the Logrus logging levels: https://github.com/sirupsen/logrus#level-logging
117121
log.SetReportCaller(true)
118122
// https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-configuration.html
119-
xRayLogLevel := "info"
120-
switch lsOpts.InitLogLevel {
121-
case "trace":
123+
124+
logLevel, err := log.ParseLevel(lsOpts.InitLogLevel)
125+
if err != nil {
126+
log.Fatal("Invalid value for LOCALSTACK_INIT_LOG_LEVEL")
127+
}
128+
log.SetLevel(logLevel)
129+
130+
xRayLogLevel := lsOpts.InitLogLevel
131+
switch logLevel {
132+
case log.TraceLevel:
122133
log.SetFormatter(&log.JSONFormatter{})
123-
log.SetLevel(log.TraceLevel)
124-
xRayLogLevel = "debug"
125-
case "debug":
126-
log.SetLevel(log.DebugLevel)
127-
xRayLogLevel = "debug"
128-
case "info":
129-
log.SetLevel(log.InfoLevel)
130-
case "warn":
131-
log.SetLevel(log.WarnLevel)
132-
xRayLogLevel = "warn"
133-
case "error":
134-
log.SetLevel(log.ErrorLevel)
135-
xRayLogLevel = "error"
136-
case "fatal":
137-
log.SetLevel(log.FatalLevel)
138-
xRayLogLevel = "error"
139-
case "panic":
140-
log.SetLevel(log.PanicLevel)
134+
case log.ErrorLevel, log.FatalLevel, log.PanicLevel:
141135
xRayLogLevel = "error"
142-
default:
143-
log.Fatal("Invalid value for LOCALSTACK_INIT_LOG_LEVEL")
144136
}
145137

146138
// patch MaxPayloadSize
@@ -183,52 +175,62 @@ func main() {
183175
}
184176
}
185177

186-
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
178+
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
187179
defer stop()
188180

189-
// file watcher for hot-reloading
190-
fileWatcherContext, cancelFileWatcher := context.WithCancel(ctx)
191-
defer cancelFileWatcher()
192-
193-
// Custom Interop Server
194-
defaultServer := rapidcore.NewServer()
181+
// LocalStack client used for sending callbacks
195182
lsClient := localstack.NewLocalStackClient(lsOpts.RuntimeEndpoint, lsOpts.RuntimeId)
196-
interopServer := server.NewInteropServer(defaultServer, lsClient)
197183

198184
// Services required for Sandbox environment
185+
interopServer := server.NewInteropServer(lsClient)
186+
defer interopServer.Close()
187+
199188
logCollector := logging.NewLogCollector()
200189
localStackLogsEgressApi := logging.NewLocalStackLogsEgressAPI(logCollector)
201190
tracer := tracing.NewLocalStackTracer()
202-
eventsListener := events.NewLocalStackEventsAPI(lsClient)
203-
204-
defaultSupv := supv.NewLocalSupervisor()
205-
localStackSupv := supervisor.NewLocalStackSupervisor(ctx, defaultSupv, eventsListener)
191+
lsEventsAPI := events.NewLocalStackEventsAPI(lsClient)
192+
localStackSupv := supervisor.NewLocalStackSupervisor(ctx, lsEventsAPI)
206193

207194
// build sandbox
208-
exitChan := make(chan struct{})
209-
sandbox := rapidcore.
210-
NewSandboxBuilder().
211-
AddShutdownFunc(func() {
212-
log.Debugln("Stopping file watcher")
213-
cancelFileWatcher()
214-
}).
215-
AddShutdownFunc(func() {
216-
exitChan <- struct{}{}
217-
}).
218-
SetExtensionsFlag(true).
219-
SetInitCachingFlag(true).
220-
SetLogsEgressAPI(localStackLogsEgressApi).
221-
SetTracer(tracer).
222-
SetInteropServer(interopServer).
223-
SetSupervisor(localStackSupv).
224-
SetHandler(handler)
195+
sandboxConfig := rapid.Sandbox{
196+
EnableTelemetryAPI: false,
197+
StandaloneMode: true,
198+
InitCachingEnabled: true,
199+
200+
Tracer: tracer,
201+
EventsAPI: lsEventsAPI,
202+
Supervisor: localStackSupv,
203+
InteropServer: interopServer,
204+
LogsEgressAPI: localStackLogsEgressApi,
205+
206+
Handler: handler,
207+
208+
RuntimeFsRootPath: "/",
209+
RuntimeAPIHost: "127.0.0.1",
210+
RuntimeAPIPort: 9001,
211+
}
212+
213+
extensions.Enable()
214+
215+
rapidCtx, internalStateFn, addr := rapid.Start(ctx, &sandboxConfig)
216+
sandboxCtx, err := sandbox.CreateSandboxContext(rapidCtx, handler, addr)
217+
if err != nil {
218+
log.Fatalf("fatal error encountered when creating SandboxContext: %w", err)
219+
}
220+
221+
// Populate our interop server
222+
interopServer.SetSandboxContext(sandboxCtx)
223+
interopServer.SetInternalStateGetter(internalStateFn)
225224

226225
// Start daemons
227226

228-
// Start hot-reloading watcher
227+
// file watcher for hot-reloading
228+
fileWatcherContext, cancelFileWatcher := context.WithCancel(ctx)
229+
defer cancelFileWatcher()
230+
229231
go hotreloading.RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)
230232

231-
// xray daemon
233+
// Start xray daemon
232234
endpoint := "http://" + net.JoinHostPort(lsOpts.LocalstackIP, lsOpts.EdgePort)
233235
xrayConfig := xray.NewConfig(endpoint, xRayLogLevel)
234236
d := xray.NewDaemon(xrayConfig, lsOpts.EnableXRayTelemetry == "1")
@@ -240,15 +242,11 @@ func main() {
240242
}()
241243
d.Run() // served async
242244

243-
// initialize all flows and start runtime API
244-
sandboxContext, internalStateFn := sandbox.Create()
245-
// Populate our interop server
246-
interopServer.SetSandboxContext(sandboxContext)
247-
interopServer.SetInternalStateGetter(internalStateFn)
248-
245+
// Create the LocalStack service
249246
localStackService := server.NewLocalStackService(
250247
interopServer, logCollector, lsClient, localStackSupv, xrayConfig.Endpoint, lsOpts, functionConf, awsEnvConf,
251248
)
249+
defer localStackService.Close()
252250

253251
// start runtime init. It is important to start `InitHandler` synchronously because we need to ensure the
254252
// notification channels and status fields are properly initialized before `AwaitInitialized`
@@ -258,26 +256,18 @@ func main() {
258256
}
259257

260258
invokeServer := server.NewServer(lsOpts.InteropPort, localStackService)
261-
invokeServer.RegisterOnShutdown(localStackService.Close)
259+
defer invokeServer.Close()
262260

263-
defer invokeServer.Shutdown(context.Background())
264-
265-
var wg sync.WaitGroup
266-
267-
wg.Add(1)
261+
serverErr := make(chan error, 1)
268262
go func() {
269-
defer wg.Done()
270263
listener, err := net.Listen("tcp", fmt.Sprintf(":%s", lsOpts.InteropPort))
271-
272264
if err != nil {
273-
log.Fatalf("failed to start listener for custom interops server: %s", err)
265+
log.Fatalf("failed to start LocalStack Lambda Runtime Interface server: %s", err)
274266
}
275-
go invokeServer.Serve(listener)
267+
go func() { serverErr <- invokeServer.Serve(listener); close(serverErr) }()
276268
log.Debugf("LocalStack API gateway listening on %s", listener.Addr().String())
277269
}()
278270

279-
wg.Wait()
280-
281271
log.Debugln("Awaiting initialization of runtime init.")
282272
if err := interopServer.AwaitInitialized(); err != nil {
283273
// Error cases: ErrInitDoneFailed or ErrInitResetReceived
@@ -292,16 +282,13 @@ func main() {
292282
}
293283
}
294284

285+
// Block until context is cancelled OR the server errors out
295286
select {
296287
case <-ctx.Done():
297-
case <-exitChan:
298-
}
299-
300-
gracefulCtx, cancel := context.WithTimeout(ctx, time.Millisecond*500)
301-
defer cancel()
302-
303-
if err := localStackService.AwaitCompleted(gracefulCtx); err != nil {
304-
log.Warnf("Did not gracefully complete: %w", err)
288+
log.Info("Shutdown signal received.")
289+
case <-serverErr:
290+
if err != nil {
291+
log.Errorf("Server error: %v", err)
292+
}
305293
}
306-
307294
}

internal/hotreloading/reloader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type Resetter interface {
1212
Reset(reason string, timeoutMs int64) (*statejson.ResetDescription, error)
1313
}
1414

15+
// TODO: Rework this into a struct/service where the ChangeListener is passed via dependency injection
1516
func RunHotReloadingListener(server Resetter, targetPaths []string, ctx context.Context, fileWatcherStrategy string) {
1617
if len(targetPaths) == 1 && targetPaths[0] == "" {
1718
log.Debugln("Hot reloading disabled.")
@@ -31,5 +32,4 @@ func RunHotReloadingListener(server Resetter, targetPaths []string, ctx context.
3132

3233
<-ctx.Done()
3334
log.Infoln("Closing down filewatcher.")
34-
3535
}

internal/sandbox/sandbox.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package sandbox
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
"unsafe"
7+
8+
"go.amzn.com/lambda/interop"
9+
"go.amzn.com/lambda/rapidcore"
10+
)
11+
12+
// CreateSandboxContext creates a SandboxContext using reflection
13+
// This is a dirty hack since the SandboxBuilder sets quite a few elements by default that cause issues
14+
func CreateSandboxContext(rapidCtx interop.RapidContext, handler string, runtimeAPIAddress string) (interop.SandboxContext, error) {
15+
sandboxCtx := &rapidcore.SandboxContext{}
16+
v := reflect.ValueOf(sandboxCtx).Elem()
17+
18+
setField := func(fieldName string, value interface{}) {
19+
field := v.FieldByName(fieldName)
20+
if !field.IsValid() {
21+
return
22+
}
23+
24+
var settableField reflect.Value
25+
if field.CanSet() {
26+
settableField = field
27+
} else {
28+
29+
settableField = reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem()
30+
}
31+
32+
settableField.Set(reflect.ValueOf(value))
33+
}
34+
35+
setField("rapidCtx", rapidCtx)
36+
setField("handler", handler)
37+
setField("runtimeAPIAddress", runtimeAPIAddress)
38+
39+
if sandboxCtx == nil || reflect.ValueOf(*sandboxCtx).IsZero() {
40+
return nil, fmt.Errorf("failed to dynamically create SandboxContext.")
41+
}
42+
43+
return sandboxCtx, nil
44+
}

internal/server/interop.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ type LocalStackInteropsServer struct {
2424
localStackAdapter *localstack.LocalStackClient
2525
}
2626

27-
func NewInteropServer(server *rapidcore.Server, ls *localstack.LocalStackClient) *LocalStackInteropsServer {
27+
func NewInteropServer(ls *localstack.LocalStackClient) *LocalStackInteropsServer {
2828
return &LocalStackInteropsServer{
29-
Server: server,
29+
Server: rapidcore.NewServer(),
3030
localStackAdapter: ls,
3131
}
3232
}
@@ -166,3 +166,9 @@ func (c *LocalStackInteropsServer) SendInitErrorResponse(resp *interop.ErrorInvo
166166

167167
return c.Server.SendInitErrorResponse(resp)
168168
}
169+
170+
func (c *LocalStackInteropsServer) Close() error {
171+
log.Info("Shutting down...")
172+
_, err := c.Reset("SandboxTerminated", 2000)
173+
return err
174+
}

internal/server/service.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,9 +259,10 @@ func (ls *LocalStackService) AfterInvoke(ctx context.Context) error {
259259
return nil
260260
}
261261

262-
func (ls *LocalStackService) Close() {
262+
func (ls *LocalStackService) Close() error {
263263
ls.isShuttingDown.Store(true)
264264
log.Debug("Shutdown of LocalStackService triggered.")
265+
return nil
265266
}
266267

267268
func (ls *LocalStackService) shutdownTriggered() bool {
@@ -277,7 +278,10 @@ func (ls *LocalStackService) AwaitCompleted(ctx context.Context) error {
277278
select {
278279
case <-ctx.Done():
279280
pending := atomic.LoadInt64(&ls.pendingCallbacks)
280-
return fmt.Errorf("failed to gracefully complete all callbacks to LocalStack: %d remaining", pending)
281+
if pending > 0 {
282+
return fmt.Errorf("failed to gracefully complete all callbacks to LocalStack: %d remaining", pending)
283+
}
284+
return nil
281285
case <-ls.allDone:
282286
if atomic.LoadInt64(&ls.pendingCallbacks) == 0 {
283287
return nil

internal/supervisor/supervisor.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/sirupsen/logrus"
1111
"go.amzn.com/lambda/fatalerror"
1212
"go.amzn.com/lambda/interop"
13+
"go.amzn.com/lambda/supervisor"
1314
"go.amzn.com/lambda/supervisor/model"
1415
)
1516

@@ -24,10 +25,10 @@ type LocalStackSupervisor struct {
2425
isShuttingDown *atomic.Bool
2526
}
2627

27-
func NewLocalStackSupervisor(ctx context.Context, supv model.ProcessSupervisor, evs interop.EventsAPI) *LocalStackSupervisor {
28+
func NewLocalStackSupervisor(ctx context.Context, evs interop.EventsAPI) *LocalStackSupervisor {
2829
var isShuttingDown atomic.Bool
2930
ls := &LocalStackSupervisor{
30-
ProcessSupervisor: supv,
31+
ProcessSupervisor: supervisor.NewLocalSupervisor(),
3132
eventsAPI: evs,
3233
eventsChan: make(chan model.Event),
3334
isShuttingDown: &isShuttingDown,

0 commit comments

Comments
 (0)