diff --git a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowRouter.scala b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowRouter.scala index eda47295a3..4c1acf1b99 100644 --- a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowRouter.scala +++ b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowRouter.scala @@ -8,11 +8,14 @@ import java.nio.ByteBuffer import java.util.Optional import java.util.concurrent.CompletionStage import java.util.function.{ Function => JFunc } + import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters.RichOptional import scala.jdk.FutureConverters._ +import scala.util.Try + import com.google.api.HttpBody import com.google.protobuf.any.{ Any => ScalaPbAny } import kalix.javasdk.DeferredCall @@ -176,31 +179,38 @@ abstract class WorkflowRouter[S, W <: AbstractWorkflow[S]](protected val workflo case None => null // to meet a signature of supplier expressed as a function } - val defCall = call.callFunc - .asInstanceOf[JFunc[Any, DeferredCall[Any, Any]]] - .apply(decodedInput) - - val (commandName, serviceName) = - defCall match { - case grpcDefCall: GrpcDeferredCall[_, _] => - (grpcDefCall.methodName, grpcDefCall.fullServiceName) - case restDefCall: RestDeferredCall[_, _] => - (restDefCall.methodName, restDefCall.fullServiceName) - case _ => - // should never happen, but needs to make compiler happy - throw new IllegalStateException("Unknown DeferredCall implementation") + Future + .fromTry(Try { + call.callFunc + .asInstanceOf[JFunc[Any, DeferredCall[Any, Any]]] + .apply(decodedInput) + }) + .map { defCall => + + val (commandName, serviceName) = + defCall match { + case grpcDefCall: GrpcDeferredCall[_, _] => + (grpcDefCall.methodName, grpcDefCall.fullServiceName) + case restDefCall: RestDeferredCall[_, _] => + (restDefCall.methodName, restDefCall.fullServiceName) + case _ => + // should never happen, but needs to make compiler happy + throw new IllegalStateException("Unknown DeferredCall implementation") + } + + val stepDefCall = + StepDeferredCall( + serviceName, + commandName, + payload = Some(messageCodec.encodeScala(defCall.message())), + metadata = MetadataImpl.toProtocol(defCall.metadata())) + + StepResponse(commandId, stepName, StepResponse.Response.DeferredCall(stepDefCall)) + } + .recover { case t: Throwable => + log.error("Workflow call failed.", t) + StepResponse(commandId, stepName, StepResponse.Response.ExecutionFailed(StepExecutionFailed(t.getMessage))) } - - val stepDefCall = - StepDeferredCall( - serviceName, - commandName, - payload = Some(messageCodec.encodeScala(defCall.message())), - metadata = MetadataImpl.toProtocol(defCall.metadata())) - - Future.successful { - StepResponse(commandId, stepName, StepResponse.Response.DeferredCall(stepDefCall)) - } case Some(call: AsyncCallStep[_, _, _]) => val decodedInput = input match { @@ -208,12 +218,12 @@ abstract class WorkflowRouter[S, W <: AbstractWorkflow[S]](protected val workflo case None => null // to meet a signature of supplier expressed as a function } - val future = call.callFunc - .asInstanceOf[JFunc[Any, CompletionStage[Any]]] - .apply(decodedInput) - .asScala - - future + Future { + call.callFunc + .asInstanceOf[JFunc[Any, CompletionStage[Any]]] + .apply(decodedInput) + .asScala + }.flatten .map { res => val encoded = messageCodec.encodeScala(res) val executedRes = StepExecuted(Some(encoded))