54
54
import io .micronaut .json .JsonMapper ;
55
55
import io .micronaut .logging .LogLevel ;
56
56
57
+ import java .io .InputStream ;
58
+ import java .io .ByteArrayInputStream ;
59
+ import java .io .ByteArrayOutputStream ;
57
60
import java .io .Closeable ;
58
61
import java .io .IOException ;
59
62
import java .io .PrintWriter ;
@@ -354,6 +357,22 @@ protected HandlerRequestType createHandlerRequest(RequestType request) throws IO
354
357
return null ;
355
358
}
356
359
360
+ /**
361
+ * Creates a GET request for the {@value #NEXT_INVOCATION_URI} endpoint.
362
+ * If a bean of type {@link UserAgentProvider} exists, it adds an HTTP Header User-Agent to the request.
363
+ * @param userAgentProvider UseAgent Provider
364
+ * @param <T> The Http request type
365
+ * @return a Mutable HTTP Request to the {@value #NEXT_INVOCATION_URI} endpoint.
366
+ */
367
+ @ NonNull
368
+ protected <T > MutableHttpRequest <T > createNextInvocationHttpRequest (@ Nullable UserAgentProvider userAgentProvider ) {
369
+ MutableHttpRequest <T > nextInvocationHttpRequest = HttpRequest .GET (AwsLambdaRuntimeApi .NEXT_INVOCATION_URI );
370
+ if (userAgentProvider != null ) {
371
+ nextInvocationHttpRequest .header (USER_AGENT , userAgentProvider .userAgent ());
372
+ }
373
+ return nextInvocationHttpRequest ;
374
+ }
375
+
357
376
/**
358
377
* Starts the runtime API event loop.
359
378
*
@@ -371,83 +390,142 @@ protected void startRuntimeApiEventLoop(@NonNull URL runtimeApiURL,
371
390
if (applicationContext == null ) {
372
391
throw new ConfigurationException ("Application Context is null" );
373
392
}
393
+ UserAgentProvider userAgentProvider = applicationContext .findBean (UserAgentProvider .class ).orElse (null );
374
394
populateUserAgent ();
375
395
final DefaultHttpClientConfiguration config = new DefaultHttpClientConfiguration ();
376
396
config .setReadIdleTimeout (null );
377
397
config .setReadTimeout (null );
378
398
config .setConnectTimeout (null );
379
- final HttpClient endpointClient = applicationContext .createBean (
380
- HttpClient .class ,
381
- runtimeApiURL ,
382
- config );
383
- final BlockingHttpClient blockingHttpClient = endpointClient .toBlocking ();
384
- try {
385
- while (loopUntil .test (runtimeApiURL )) {
386
- MutableHttpRequest <?> nextInvocationHttpRequest = HttpRequest .GET (AwsLambdaRuntimeApi .NEXT_INVOCATION_URI );
387
- applicationContext .findBean (UserAgentProvider .class )
388
- .ifPresent (userAgentProvider -> nextInvocationHttpRequest .header (USER_AGENT , userAgentProvider .userAgent ()));
389
- final HttpResponse <RequestType > response = blockingHttpClient .exchange (nextInvocationHttpRequest , Argument .of (requestType ));
390
- final RequestType request = response .body ();
391
- if (request != null ) {
392
- logn (LogLevel .DEBUG , "request body " , request );
393
-
394
- HandlerRequestType handlerRequest = createHandlerRequest (request );
395
- final HttpHeaders headers = response .getHeaders ();
396
- propagateTraceId (headers );
397
-
398
- final Context context = new RuntimeContext (headers );
399
- final String requestId = context .getAwsRequestId ();
400
- logn (LogLevel .DEBUG , "request id " , requestId , " found" );
401
- try {
402
- if (StringUtils .isNotEmpty (requestId )) {
403
- log (LogLevel .TRACE , "invoking handler\n " );
404
- HandlerResponseType handlerResponse = null ;
405
- if (handler instanceof RequestHandler ) {
406
- handlerResponse = ((RequestHandler <HandlerRequestType , HandlerResponseType >) handler ).handleRequest (handlerRequest , context );
407
- }
408
- log (LogLevel .TRACE , "handler response received\n " );
409
- final ResponseType functionResponse = (handlerResponse == null || handlerResponse instanceof Void ) ? null : createResponse (handlerResponse );
410
- log (LogLevel .TRACE , "sending function response\n " );
411
- blockingHttpClient .exchange (decorateWithUserAgent (invocationResponseRequest (requestId , functionResponse == null ? "" : functionResponse )));
412
- } else {
413
- log (LogLevel .WARN , "request id is empty\n " );
414
- }
415
-
416
- } catch (Throwable e ) {
417
- final StringWriter sw = new StringWriter ();
418
- e .printStackTrace (new PrintWriter (sw ));
419
- logn (LogLevel .WARN , "Invocation with requestId [" , requestId , "] failed: " , e .getMessage (), sw );
420
- try {
421
- blockingHttpClient .exchange (decorateWithUserAgent (invocationErrorRequest (requestId , e .getMessage (), null , null )));
422
- } catch (Throwable e2 ) {
423
- // swallow, nothing we can do...
424
- }
399
+ try (HttpClient endpointClient = applicationContext .createBean (HttpClient .class , runtimeApiURL , config )) {
400
+ final BlockingHttpClient blockingHttpClient = endpointClient .toBlocking ();
401
+ try {
402
+ while (loopUntil .test (runtimeApiURL )) {
403
+ MutableHttpRequest <?> nextInvocationHttpRequest = createNextInvocationHttpRequest (userAgentProvider );
404
+ if (handler instanceof RequestStreamHandler ) {
405
+ handleInvocationForRequestStreamHandler (blockingHttpClient , nextInvocationHttpRequest );
406
+ } else if (handler instanceof RequestHandler <?, ?>) {
407
+ handleInvocationForRequestHandler (blockingHttpClient , nextInvocationHttpRequest );
425
408
}
426
409
}
427
- }
428
- } finally {
429
- if (handler instanceof Closeable closeable ) {
430
- closeable .close ();
431
- }
432
- if (endpointClient != null ) {
433
- endpointClient .close ();
410
+ } finally {
411
+ if (handler instanceof Closeable closeable ) {
412
+ closeable .close ();
413
+ }
434
414
}
435
415
}
436
- } catch (Throwable e ) {
416
+ } catch (Exception e ) {
437
417
e .printStackTrace ();
438
418
logn (LogLevel .ERROR , "Request loop failed with: " , e .getMessage ());
439
419
reportInitializationError (runtimeApiURL , e );
440
420
}
441
421
}
442
422
423
+ /**
424
+ * It handles an invocation event with a handler of type {@link RequestHandler}.
425
+ * @param blockingHttpClient Blocking HTTP Client
426
+ * @param nextInvocationHttpRequest Next Invocation HTTP Request
427
+ * @throws IOException Exception thrown while invoking the handler
428
+ */
429
+ protected void handleInvocationForRequestHandler (@ NonNull BlockingHttpClient blockingHttpClient ,
430
+ @ NonNull MutableHttpRequest <?> nextInvocationHttpRequest ) throws IOException {
431
+ final HttpResponse <RequestType > response = blockingHttpClient .exchange (nextInvocationHttpRequest , Argument .of (requestType ));
432
+ final RequestType request = response .body ();
433
+ if (request != null && handler instanceof RequestHandler ) {
434
+ logn (LogLevel .DEBUG , "request body " , request );
435
+ Context context = createRuntimeContext (response );
436
+ final String requestId = context .getAwsRequestId ();
437
+ HandlerRequestType handlerRequest = createHandlerRequest (request );
438
+ try {
439
+ if (StringUtils .isEmpty (requestId )) {
440
+ log (LogLevel .WARN , "request id is empty\n " );
441
+ return ;
442
+ }
443
+ log (LogLevel .TRACE , "invoking handler\n " );
444
+ HandlerResponseType handlerResponse = null ;
445
+ handlerResponse = ((RequestHandler <HandlerRequestType , HandlerResponseType >) handler ).handleRequest (handlerRequest , context );
446
+ log (LogLevel .TRACE , "handler response received\n " );
447
+ final ResponseType functionResponse = (handlerResponse == null || handlerResponse instanceof Void ) ? null : createResponse (handlerResponse );
448
+ log (LogLevel .TRACE , "sending function response\n " );
449
+ blockingHttpClient .exchange (decorateWithUserAgent (invocationResponseRequest (requestId , functionResponse == null ? "" : functionResponse )));
450
+ } catch (Exception e ) {
451
+ handleInvocationException (blockingHttpClient , requestId , e );
452
+ }
453
+ }
454
+ }
455
+
456
+ /**
457
+ *
458
+ * @param blockingHttpClient Blocking HTTP Client
459
+ * @param requestId AWS Request ID retried via {@link Context#getAwsRequestId()}
460
+ * @param exception Execption thrown invoking the handler
461
+ */
462
+ protected void handleInvocationException (@ NonNull BlockingHttpClient blockingHttpClient ,
463
+ @ NonNull String requestId ,
464
+ @ NonNull Exception exception ) {
465
+ final StringWriter sw = new StringWriter ();
466
+ exception .printStackTrace (new PrintWriter (sw ));
467
+ logn (LogLevel .WARN , "Invocation with requestId [" , requestId , "] failed: " , exception .getMessage (), sw );
468
+ try {
469
+ blockingHttpClient .exchange (decorateWithUserAgent (invocationErrorRequest (requestId , exception .getMessage (), null , null )));
470
+ } catch (Exception e2 ) {
471
+ // swallow, nothing we can do...
472
+ }
473
+ }
474
+
475
+ /**
476
+ *
477
+ * @param response Next Invocation Response
478
+ * @return a new {@link Context} backed by a {@link RuntimeContext} populated with the HTTP Headers of the Invocation Response.
479
+ */
480
+ protected Context createRuntimeContext (HttpResponse <?> response ) {
481
+ final HttpHeaders headers = response .getHeaders ();
482
+ propagateTraceId (headers );
483
+ final Context context = new RuntimeContext (headers );
484
+ final String requestId = context .getAwsRequestId ();
485
+ logn (LogLevel .DEBUG , "request id " , requestId , " found" );
486
+ return context ;
487
+ }
488
+
489
+ /**
490
+ * It handles an invocation event with a handler of type {@link RequestStreamHandler}.
491
+ * @param blockingHttpClient Blocking HTTP Client
492
+ * @param nextInvocationHttpRequest Next Invocation HTTP Request
493
+ */
494
+ protected void handleInvocationForRequestStreamHandler (@ NonNull BlockingHttpClient blockingHttpClient ,
495
+ MutableHttpRequest <?> nextInvocationHttpRequest ) {
496
+ if (handler instanceof RequestStreamHandler requestStreamHandler ) {
497
+ final HttpResponse <byte []> response = blockingHttpClient .exchange (nextInvocationHttpRequest , byte [].class );
498
+ final byte [] request = response .body ();
499
+ if (request != null ) {
500
+ Context context = createRuntimeContext (response );
501
+ String requestId = context .getAwsRequestId ();
502
+ if (StringUtils .isNotEmpty (requestId )) {
503
+ try (InputStream inputStream = new ByteArrayInputStream (request )) {
504
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream ();
505
+ log (LogLevel .TRACE , "invoking handler\n " );
506
+ requestStreamHandler .handleRequest (inputStream , outputStream , context );
507
+ log (LogLevel .TRACE , "handler response received\n " );
508
+ byte [] handlerResponse = outputStream .toByteArray ();
509
+ log (LogLevel .TRACE , "sending function response\n " );
510
+ blockingHttpClient .exchange (decorateWithUserAgent (invocationResponseRequest (requestId , handlerResponse )));
511
+ } catch (Exception e ) {
512
+ handleInvocationException (blockingHttpClient , requestId , e );
513
+ }
514
+ }
515
+ } else {
516
+ log (LogLevel .WARN , "request id is empty\n " );
517
+ }
518
+ }
519
+ }
520
+
443
521
/**
444
522
* If the request is {@link MutableHttpRequest} and {@link AbstractMicronautLambdaRuntime#userAgent} is not null,
445
523
* it adds an HTTP Header User-Agent.
446
524
* @param request HTTP Request
447
525
* @return The HTTP Request decorated
448
526
*/
449
527
protected HttpRequest decorateWithUserAgent (HttpRequest <?> request ) {
450
- if (userAgent != null && request instanceof MutableHttpRequest mutableHttpRequest ) {
528
+ if (userAgent != null && request instanceof MutableHttpRequest <?> mutableHttpRequest ) {
451
529
return mutableHttpRequest .header (USER_AGENT , userAgent );
452
530
}
453
531
return request ;
0 commit comments