Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.{ Function => JFunc }

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
import scala.jdk.FutureConverters._
import scala.util.Try

import com.google.api.HttpBody
import com.google.protobuf.any.{ Any => ScalaPbAny }
import kalix.javasdk.DeferredCall
Expand Down Expand Up @@ -176,44 +179,51 @@ abstract class WorkflowRouter[S, W <: AbstractWorkflow[S]](protected val workflo
case None => null // to meet a signature of supplier expressed as a function
}

val defCall = call.callFunc
.asInstanceOf[JFunc[Any, DeferredCall[Any, Any]]]
.apply(decodedInput)

val (commandName, serviceName) =
defCall match {
case grpcDefCall: GrpcDeferredCall[_, _] =>
(grpcDefCall.methodName, grpcDefCall.fullServiceName)
case restDefCall: RestDeferredCall[_, _] =>
(restDefCall.methodName, restDefCall.fullServiceName)
case _ =>
// should never happen, but needs to make compiler happy
throw new IllegalStateException("Unknown DeferredCall implementation")
Future
.fromTry(Try {
call.callFunc
.asInstanceOf[JFunc[Any, DeferredCall[Any, Any]]]
.apply(decodedInput)
})
.map { defCall =>

val (commandName, serviceName) =
defCall match {
case grpcDefCall: GrpcDeferredCall[_, _] =>
(grpcDefCall.methodName, grpcDefCall.fullServiceName)
case restDefCall: RestDeferredCall[_, _] =>
(restDefCall.methodName, restDefCall.fullServiceName)
case _ =>
// should never happen, but needs to make compiler happy
throw new IllegalStateException("Unknown DeferredCall implementation")
}

val stepDefCall =
StepDeferredCall(
serviceName,
commandName,
payload = Some(messageCodec.encodeScala(defCall.message())),
metadata = MetadataImpl.toProtocol(defCall.metadata()))

StepResponse(commandId, stepName, StepResponse.Response.DeferredCall(stepDefCall))
}
.recover { case t: Throwable =>
log.error("Workflow call failed.", t)
StepResponse(commandId, stepName, StepResponse.Response.ExecutionFailed(StepExecutionFailed(t.getMessage)))
}

val stepDefCall =
StepDeferredCall(
serviceName,
commandName,
payload = Some(messageCodec.encodeScala(defCall.message())),
metadata = MetadataImpl.toProtocol(defCall.metadata()))

Future.successful {
StepResponse(commandId, stepName, StepResponse.Response.DeferredCall(stepDefCall))
}

case Some(call: AsyncCallStep[_, _, _]) =>
val decodedInput = input match {
case Some(inputValue) => decodeInput(messageCodec, inputValue, call.callInputClass)
case None => null // to meet a signature of supplier expressed as a function
}

val future = call.callFunc
.asInstanceOf[JFunc[Any, CompletionStage[Any]]]
.apply(decodedInput)
.asScala

future
Future {
call.callFunc
.asInstanceOf[JFunc[Any, CompletionStage[Any]]]
.apply(decodedInput)
.asScala
}.flatten
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess wrapping is to make sure it always fails the future and doesn't throw on the calling thread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly

.map { res =>
val encoded = messageCodec.encodeScala(res)
val executedRes = StepExecuted(Some(encoded))
Expand Down
Loading