diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index c62765d53b10..fe12238018f2 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -58,6 +58,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation, 0, local_mode_ray_tuntime_.GetCurrentTaskId(), address, + -1, 1, required_resources, required_placement_resources, diff --git a/java/api/src/main/java/io/ray/api/call/ActorTaskCaller.java b/java/api/src/main/java/io/ray/api/call/ActorTaskCaller.java index 2ee68e70168d..04d8ff555fe1 100644 --- a/java/api/src/main/java/io/ray/api/call/ActorTaskCaller.java +++ b/java/api/src/main/java/io/ray/api/call/ActorTaskCaller.java @@ -28,6 +28,11 @@ public ActorTaskCaller setConcurrencyGroup(String name) { return self(); } + public ActorTaskCaller setForwardObjectToParentTask(boolean ifForward) { + builder.setForwardObjectToParentTask(ifForward); + return self(); + } + private ActorTaskCaller self() { return this; } diff --git a/java/api/src/main/java/io/ray/api/options/CallOptions.java b/java/api/src/main/java/io/ray/api/options/CallOptions.java index e646887ec09d..f1f5cf1b8922 100644 --- a/java/api/src/main/java/io/ray/api/options/CallOptions.java +++ b/java/api/src/main/java/io/ray/api/options/CallOptions.java @@ -13,6 +13,7 @@ public class CallOptions extends BaseTaskOptions { public final int bundleIndex; public final String concurrencyGroupName; private final String serializedRuntimeEnvInfo; + public final boolean forwardObjectToParentTask; private CallOptions( String name, @@ -20,13 +21,15 @@ private CallOptions( PlacementGroup group, int bundleIndex, String concurrencyGroupName, - RuntimeEnv runtimeEnv) { + RuntimeEnv runtimeEnv, + boolean forwardObjectToParentTask) { super(resources); this.name = name; this.group = group; this.bundleIndex = bundleIndex; this.concurrencyGroupName = concurrencyGroupName; this.serializedRuntimeEnvInfo = runtimeEnv == null ? "" : runtimeEnv.toJsonBytes(); + this.forwardObjectToParentTask = forwardObjectToParentTask; } /** This inner class for building CallOptions. */ @@ -38,6 +41,7 @@ public static class Builder { private int bundleIndex; private String concurrencyGroupName = ""; private RuntimeEnv runtimeEnv = null; + private boolean forwardObjectToParentTask = false; /** * Set a name for this task. @@ -98,8 +102,14 @@ public Builder setRuntimeEnv(RuntimeEnv runtimeEnv) { return this; } + public Builder setForwardObjectToParentTask(boolean ifForward) { + this.forwardObjectToParentTask = ifForward; + return this; + } + public CallOptions build() { - return new CallOptions(name, resources, group, bundleIndex, concurrencyGroupName, runtimeEnv); + return new CallOptions(name, resources, group, bundleIndex, concurrencyGroupName, + runtimeEnv, forwardObjectToParentTask); } } } diff --git a/java/build-jar-multiplatform.sh b/java/build-jar-multiplatform.sh index 417b5b556035..6f5ec70eef51 100755 --- a/java/build-jar-multiplatform.sh +++ b/java/build-jar-multiplatform.sh @@ -79,12 +79,12 @@ build_jars_multiplatform() { return fi fi - if download_jars "ray-runtime-$version.jar"; then - prepare_native - build_jars multiplatform false - else - echo "download_jars failed, skip building multiplatform jars" - fi + # if download_jars "ray-runtime-$version.jar"; then + prepare_native + build_jars linux + # else + # echo "download_jars failed, skip building multiplatform jars" + # fi } # Download darwin/windows ray-related jar from s3 @@ -124,7 +124,8 @@ download_jars() { # prepare native binaries and libraries. prepare_native() { - for os in 'darwin' 'linux'; do + # for os in 'darwin' 'linux'; do + for os in 'linux'; do cd "$JAR_BASE_DIR/$os" jar xf "ray-runtime-$version.jar" "native/$os" local native_dir="$WORKSPACE_DIR/java/runtime/native_dependencies/native/$os" @@ -137,7 +138,8 @@ prepare_native() { # Return 0 if native bianries and libraries exist and 1 if not. native_files_exist() { local os - for os in 'darwin' 'linux'; do + # for os in 'darwin' 'linux'; do + for os in 'linux'; do native_dirs=() native_dirs+=("$WORKSPACE_DIR/java/runtime/native_dependencies/native/$os") for native_dir in "${native_dirs[@]}"; do diff --git a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java index 3ff7fbff8bff..b5c278179666 100644 --- a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java @@ -323,7 +323,8 @@ private ObjectRef callNormalFunction( ObjectRefImpl impl = new ObjectRefImpl<>(); /// Mapping the object id to the object ref. - List preparedReturnIds = getCurrentReturnIds(numReturns, ActorId.NIL); + List preparedReturnIds = getCurrentReturnIds(numReturns, ActorId.NIL, + options.forwardObjectToParentTask); if (rayConfig.runMode == RunMode.CLUSTER && numReturns > 0) { ObjectRefImpl.registerObjectRefImpl(preparedReturnIds.get(0), impl); } @@ -354,7 +355,9 @@ private ObjectRef callActorFunction( ObjectRefImpl impl = new ObjectRefImpl<>(); /// Mapping the object id to the object ref. - List preparedReturnIds = getCurrentReturnIds(numReturns, rayActor.getId()); + System.err.println(options.forwardObjectToParentTask); + List preparedReturnIds = getCurrentReturnIds(numReturns, rayActor.getId(), + options.forwardObjectToParentTask); if (rayConfig.runMode == RunMode.CLUSTER && numReturns > 0) { ObjectRefImpl.registerObjectRefImpl(preparedReturnIds.get(0), impl); } @@ -394,7 +397,7 @@ private BaseActorHandle createActorImpl( return actor; } - abstract List getCurrentReturnIds(int numReturns, ActorId actorId); + abstract List getCurrentReturnIds(int numReturns, ActorId actorId, boolean ifForward); public WorkerContext getWorkerContext() { return workerContext; diff --git a/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java index 8280642bae2d..6734e8b05b67 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java @@ -93,7 +93,7 @@ public Map> getAvailableResourceIds() { } @Override - List getCurrentReturnIds(int numReturns, ActorId actorId) { + List getCurrentReturnIds(int numReturns, ActorId actorId, boolean ifForward) { return null; } diff --git a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java index 4f471b890f8e..7bd523f5dea8 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java @@ -217,8 +217,8 @@ public void killActor(BaseActorHandle actor, boolean noRestart) { } @Override - List getCurrentReturnIds(int numReturns, ActorId actorId) { - List ret = nativeGetCurrentReturnIds(numReturns, actorId.getBytes()); + List getCurrentReturnIds(int numReturns, ActorId actorId, boolean ifForward) { + List ret = nativeGetCurrentReturnIds(numReturns, actorId.getBytes(), ifForward); return ret.stream().map(ObjectId::new).collect(Collectors.toList()); } @@ -291,7 +291,7 @@ private static native void nativeInitialize( private static native String nativeGetNamespace(); - private static native List nativeGetCurrentReturnIds(int numReturns, byte[] actorId); + private static native List nativeGetCurrentReturnIds(int numReturns, byte[] actorId, boolean ifForward); private static native byte[] nativeGetCurrentNodeId(); } diff --git a/src/mock/ray/core_worker/core_worker.h b/src/mock/ray/core_worker/core_worker.h index 01afa150cc46..15f7c8180d39 100644 --- a/src/mock/ray/core_worker/core_worker.h +++ b/src/mock/ray/core_worker/core_worker.h @@ -165,6 +165,12 @@ class MockCoreWorker : public CoreWorker { rpc::AssignObjectOwnerReply *reply, rpc::SendReplyCallback send_reply_callback), (override)); + MOCK_METHOD(void, + HandleUpdateForwardedObject, + (const rpc::UpdateForwardedObjectRequest &request, + rpc::UpdateForwardedObjectReply *reply, + rpc::SendReplyCallback send_reply_callback), + (override)); }; } // namespace core diff --git a/src/mock/ray/rpc/worker/core_worker_client.h b/src/mock/ray/rpc/worker/core_worker_client.h index a101da1c19e5..e946214ce6f1 100644 --- a/src/mock/ray/rpc/worker/core_worker_client.h +++ b/src/mock/ray/rpc/worker/core_worker_client.h @@ -129,6 +129,11 @@ class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientIn (const AssignObjectOwnerRequest &request, const ClientCallback &callback), (override)); + MOCK_METHOD(void, + UpdateForwardedObject, + (const UpdateForwardedObjectRequest &request, + const ClientCallback &callback), + (override)); MOCK_METHOD(int64_t, ClientProcessedUpToSeqno, (), (override)); }; diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 4b608d9bf5c1..e9c050fc31b7 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -198,7 +198,16 @@ size_t TaskSpecification::NumArgs() const { return message_->args_size(); } size_t TaskSpecification::NumReturns() const { return message_->num_returns(); } ObjectID TaskSpecification::ReturnId(size_t return_index) const { - return ObjectID::FromIndex(TaskId(), return_index + 1); + auto parent_num_returns = this->message_->parent_num_returns(); + if (parent_num_returns < 0) { + return ObjectID::FromIndex(TaskId(), return_index + 1); + } else { + return ObjectID::FromIndex(ParentTaskId(), parent_num_returns + return_index + 1); + } +} + +bool TaskSpecification::ForwardToParent() const { + return this->message_->parent_num_returns() >= 0; } bool TaskSpecification::ArgByRef(size_t arg_index) const { diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index debe14267864..8d56dfaa1736 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -221,6 +221,8 @@ class TaskSpecification : public MessageWrapper { ObjectID ReturnId(size_t return_index) const; + bool ForwardToParent() const; + const uint8_t *ArgData(size_t arg_index) const; size_t ArgDataSize(size_t arg_index) const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 4f5305f9b5aa..32a17b984356 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -107,6 +107,7 @@ class TaskSpecBuilder { uint64_t parent_counter, const TaskID &caller_id, const rpc::Address &caller_address, + int parent_num_returns, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, @@ -124,6 +125,7 @@ class TaskSpecBuilder { message_->set_parent_counter(parent_counter); message_->set_caller_id(caller_id.Binary()); message_->mutable_caller_address()->CopyFrom(caller_address); + message_->set_parent_num_returns(parent_num_returns); message_->set_num_returns(num_returns); message_->mutable_required_resources()->insert(required_resources.begin(), required_resources.end()); diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 0460494f3201..5854db0f3e11 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -60,12 +60,14 @@ struct TaskOptions { int num_returns, std::unordered_map &resources, const std::string &concurrency_group_name = "", - const std::string &serialized_runtime_env_info = "{}") + const std::string &serialized_runtime_env_info = "{}", + bool use_parent_task_id = false) : name(name), num_returns(num_returns), resources(resources), concurrency_group_name(concurrency_group_name), - serialized_runtime_env_info(serialized_runtime_env_info) {} + serialized_runtime_env_info(serialized_runtime_env_info), + use_parent_task_id(use_parent_task_id) {} /// The name of this task. std::string name; @@ -79,6 +81,8 @@ struct TaskOptions { /// fields which not contained in Runtime Env, such as eager_install. /// Propagated to child actors and tasks. std::string serialized_runtime_env_info; + + bool use_parent_task_id = false; }; /// Options for actor creation tasks. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9c3cfcff9e76..22b9de3c8f79 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -277,6 +277,24 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ double timestamp) { return PushError(job_id, type, error_message, timestamp); }; + auto forward_object_callback = [this](const ObjectID &object_id, + const std::vector &contained_object_ids, + const rpc::Address &borrower_address, + const rpc::Address &owner_address, + const size_t object_size) { + return ForwardToOtherWorker(object_id, + contained_object_ids, + borrower_address, + owner_address, + object_size); + }; + auto update_forwarded_object_callback = [this](const rpc::PushTaskReply &reply, + const std::string &raylet_id, + const rpc::Address &owner_address) { + return UpdateForwardedObject(reply, + raylet_id, + owner_address); + }; task_manager_.reset(new TaskManager( memory_store_, reference_counter_, @@ -308,6 +326,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ } }, push_error_callback, + forward_object_callback, + update_forwarded_object_callback, RayConfig::instance().max_lineage_bytes())); // Create an entry for the driver task in the task table. This task is @@ -950,6 +970,68 @@ Status CoreWorker::Put(const RayObject &object, return PutInLocalPlasmaStore(object, object_id, pin_object); } +Status CoreWorker::UpdateForwardedObject(const rpc::PushTaskReply &reply, + const std::string &raylet_id, + const rpc::Address &owner_address) { + rpc::UpdateForwardedObjectRequest request; + RAY_LOG(ERROR) << "update forward object callback: " << ObjectID::FromBinary(reply.return_objects(0).object_id()); + for (int i = 0; i < reply.return_objects_size(); i++) { + request.add_return_objects()->CopyFrom(reply.return_objects(i)); + } + request.set_pinned_at_raylet_id(raylet_id); + auto conn = core_worker_client_pool_->GetOrConnect(owner_address); + std::promise status_promise; + RAY_LOG(ERROR) << "point 1"; + conn->UpdateForwardedObject(request, + [&status_promise](const Status &returned_status, + const rpc::UpdateForwardedObjectReply &reply) { + status_promise.set_value(returned_status); + }); + RAY_LOG(ERROR) << "point 2"; + // Block until the remote call `UpdateForwardedObject` returns. + auto status = status_promise.get_future().get(); + RAY_LOG(ERROR) << "update forwarded object has status " << status; + return status; +} + +Status CoreWorker::ForwardToOtherWorker(const ObjectID &object_id, + const std::vector &contained_object_ids, + const rpc::Address &borrower_address, + const rpc::Address &owner_address, + const size_t object_size) { + // Because in the remote worker's `HandleAssignObjectOwner`, + // a `WaitForRefRemoved` RPC request will be sent back to + // the current worker. So we need to make sure ref count is > 0 + // by invoking `AddLocalReference` first. Note that in worker.py we set + // skip_adding_local_ref=True to avoid double referencing the object. + AddLocalReference(object_id); + RAY_UNUSED( + reference_counter_->AddBorrowedObject(object_id, + ObjectID::Nil(), + owner_address, + /*foreign_owner_already_monitoring=*/true)); + + // Remote call `AssignObjectOwner()`. + rpc::AssignObjectOwnerRequest request; + request.set_object_id(object_id.Binary()); + request.mutable_borrower_address()->CopyFrom(borrower_address); + request.set_call_site(CurrentCallSite()); + request.set_is_reconstructable(true); + for (auto &contained_object_id : contained_object_ids) { + request.add_contained_object_ids(contained_object_id.Binary()); + } + request.set_object_size(object_size); + auto conn = core_worker_client_pool_->GetOrConnect(owner_address); + std::promise status_promise; + conn->AssignObjectOwner(request, + [&status_promise](const Status &returned_status, + const rpc::AssignObjectOwnerReply &reply) { + status_promise.set_value(returned_status); + }); + // Block until the remote call `AssignObjectOwner` returns. + return status_promise.get_future().get(); +} + Status CoreWorker::CreateOwnedAndIncrementLocalRef( const std::shared_ptr &metadata, const size_t data_size, @@ -978,37 +1060,11 @@ Status CoreWorker::CreateOwnedAndIncrementLocalRef( /*add_local_ref=*/true, NodeID::FromBinary(rpc_address_.raylet_id())); } else { - // Because in the remote worker's `HandleAssignObjectOwner`, - // a `WaitForRefRemoved` RPC request will be sent back to - // the current worker. So we need to make sure ref count is > 0 - // by invoking `AddLocalReference` first. Note that in worker.py we set - // skip_adding_local_ref=True to avoid double referencing the object. - AddLocalReference(*object_id); - RAY_UNUSED( - reference_counter_->AddBorrowedObject(*object_id, - ObjectID::Nil(), - real_owner_address, - /*foreign_owner_already_monitoring=*/true)); - - // Remote call `AssignObjectOwner()`. - rpc::AssignObjectOwnerRequest request; - request.set_object_id(object_id->Binary()); - request.mutable_borrower_address()->CopyFrom(rpc_address_); - request.set_call_site(CurrentCallSite()); - - for (auto &contained_object_id : contained_object_ids) { - request.add_contained_object_ids(contained_object_id.Binary()); - } - request.set_object_size(data_size + metadata->Size()); - auto conn = core_worker_client_pool_->GetOrConnect(real_owner_address); - std::promise status_promise; - conn->AssignObjectOwner(request, - [&status_promise](const Status &returned_status, - const rpc::AssignObjectOwnerReply &reply) { - status_promise.set_value(returned_status); - }); - // Block until the remote call `AssignObjectOwner` returns. - status = status_promise.get_future().get(); + status = ForwardToOtherWorker(*object_id, + contained_object_ids, + rpc_address_, + real_owner_address, + data_size + metadata->Size()); } if (options_.is_local_mode && owned_by_us && inline_small_object) { @@ -1530,6 +1586,7 @@ void CoreWorker::BuildCommonTaskSpec( const rpc::Address &address, const RayFunction &function, const std::vector> &args, + int parent_num_returns, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, @@ -1549,6 +1606,7 @@ void CoreWorker::BuildCommonTaskSpec( task_index, caller_id, address, + parent_num_returns, num_returns, required_resources, required_placement_resources, @@ -1597,13 +1655,14 @@ std::vector CoreWorker::SubmitTask( rpc_address_, function, args, + -1, task_options.num_returns, constrained_resources, required_resources, debugger_breakpoint, depth, task_options.serialized_runtime_env_info); - builder.SetNormalTaskSpec(max_retries, retry_exceptions, scheduling_strategy); + builder.SetNormalTaskSpec(1, retry_exceptions, scheduling_strategy); TaskSpecification task_spec = builder.Build(); RAY_LOG(DEBUG) << "Submitting normal task " << task_spec.DebugString(); std::vector returned_refs; @@ -1611,7 +1670,7 @@ std::vector CoreWorker::SubmitTask( returned_refs = ExecuteTaskLocalMode(task_spec); } else { returned_refs = task_manager_->AddPendingTask( - task_spec.CallerAddress(), task_spec, CurrentCallSite(), max_retries); + task_spec.CallerAddress(), task_spec, CurrentCallSite(), 1); io_service_.post( [this, task_spec]() { RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec)); @@ -1676,6 +1735,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, rpc_address_, function, args, + -1, 1, new_resource, new_placement_resources, @@ -1872,7 +1932,15 @@ std::optional> CoreWorker::SubmitActorTask( // Add one for actor cursor object id for tasks. const int num_returns = task_options.num_returns + 1; - + int parent_num_returns = -1; + auto caller_address = rpc_address_; + if (task_options.use_parent_task_id) { + auto current_task = worker_context_.GetCurrentTask(); + parent_num_returns = current_task->NumReturns(); + caller_address = current_task->CallerAddress(); + } + RAY_LOG(ERROR) << parent_num_returns; + RAY_LOG(ERROR) << task_options.use_parent_task_id; // Build common task spec. TaskSpecBuilder builder; const auto next_task_index = worker_context_.GetNextTaskIndex(); @@ -1896,9 +1964,10 @@ std::optional> CoreWorker::SubmitActorTask( worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), - rpc_address_, + caller_address, function, args, + parent_num_returns, num_returns, task_options.resources, required_resources, @@ -3200,6 +3269,7 @@ void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &re rpc::AssignObjectOwnerReply *reply, rpc::SendReplyCallback send_reply_callback) { ObjectID object_id = ObjectID::FromBinary(request.object_id()); + RAY_LOG(ERROR) << "Handling assign ownership for " << object_id; const auto &borrower_address = request.borrower_address(); std::string call_site = request.call_site(); // Get a list of contained object ids. @@ -3214,7 +3284,7 @@ void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &re rpc_address_, call_site, request.object_size(), - /*is_reconstructable=*/false, + request.is_reconstructable(), /*add_local_ref=*/false, /*pinned_at_raylet_id=*/NodeID::FromBinary(borrower_address.raylet_id())); reference_counter_->AddBorrowerAddress(object_id, borrower_address); @@ -3222,6 +3292,22 @@ void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &re send_reply_callback(Status::OK(), nullptr, nullptr); } +void CoreWorker::HandleUpdateForwardedObject(const rpc::UpdateForwardedObjectRequest &request, + rpc::UpdateForwardedObjectReply *reply, + rpc::SendReplyCallback send_reply_callback) { + RAY_LOG(ERROR) << "Start handling updateForwarded"; + auto pinned_at_raylet_id = NodeID::FromBinary(request.pinned_at_raylet_id()); + for (int i = 0; i < request.return_objects_size(); i++) { + const auto &return_object = request.return_objects(i); + ObjectID object_id = ObjectID::FromBinary(return_object.object_id()); + RAY_LOG(ERROR) << "Handling updating raylet for " << object_id; + reference_counter_->UpdateObjectSize(object_id, return_object.size()); + reference_counter_->UpdateObjectPinnedAtRaylet(object_id, pinned_at_raylet_id); + task_manager_->AddReconstructableObject(object_id); + } + send_reply_callback(Status::OK(), nullptr, nullptr); +} + void CoreWorker::YieldCurrentFiber(FiberEvent &event) { RAY_CHECK(worker_context_.CurrentActorIsAsync()); boost::this_fiber::yield(); @@ -3392,11 +3478,20 @@ Status CoreWorker::WaitForActorRegistered(const std::vector &ids) { } std::vector CoreWorker::GetCurrentReturnIds(int num_returns, - const ActorID &callee_actor_id) { + const ActorID &callee_actor_id, + bool use_parent_task_id) { std::vector return_ids(num_returns); const auto next_task_index = worker_context_.GetTaskIndex() + 1; TaskID task_id; - if (callee_actor_id.IsNil()) { + if (use_parent_task_id) { + task_id = worker_context_.GetCurrentTaskID(); + auto current_task = worker_context_.GetCurrentTask(); + size_t parent_task_num_returns = current_task->NumReturns(); + for (int i = 0; i < num_returns; i++) { + return_ids[i] = ObjectID::FromIndex(task_id, parent_task_num_returns + i + 1); + } + return return_ids; + } else if (callee_actor_id.IsNil()) { /// Return ids for normal task call. task_id = TaskID::ForNormalTask(worker_context_.GetCurrentJobID(), worker_context_.GetCurrentInternalTaskId(), diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index bfd43b6dddb7..ebe98a8e0d52 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -266,6 +266,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const ObjectID &object_id, bool pin_object = false); + Status ForwardToOtherWorker(const ObjectID &object_id, + const std::vector &contained_object_ids, + const rpc::Address &borrower_address, + const rpc::Address &owner_address, + const size_t object_size); + + Status UpdateForwardedObject(const rpc::PushTaskReply &reply, + const std::string &raylet_id, + const rpc::Address &owner_address); + /// Create and return a buffer in the object store that can be directly written /// into. After writing to the buffer, the caller must call `SealOwned()` to /// finalize the object. The `CreateOwnedAndIncrementLocalRef()` and @@ -673,7 +683,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Get the expected return ids of the next task. std::vector GetCurrentReturnIds(int num_returns, - const ActorID &callee_actor_id); + const ActorID &callee_actor_id, + bool use_parent_task_id); /// The following methods are handlers for the core worker's gRPC server, which follow /// a macro-generated call convention. These are executed on the io_service_ and @@ -784,6 +795,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { rpc::AssignObjectOwnerReply *reply, rpc::SendReplyCallback send_reply_callback) override; + void HandleUpdateForwardedObject(const rpc::UpdateForwardedObjectRequest &request, + rpc::UpdateForwardedObjectReply *reply, + rpc::SendReplyCallback send_reply_callback) override; /// /// Public methods related to async actor call. This should only be used when /// the actor is (1) direct actor and (2) using asyncio mode. @@ -844,6 +858,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const rpc::Address &address, const RayFunction &function, const std::vector> &args, + int parent_num_returns, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index a544847edf16..c64a21f5bb49 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -428,11 +428,12 @@ Java_io_ray_runtime_RayNativeRuntime_nativeGetCurrentNodeId(JNIEnv *env, jclass) } JNIEXPORT jobject JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeGetCurrentReturnIds( - JNIEnv *env, jclass, jint numReturns, jbyteArray actorIdByteArray) { + JNIEnv *env, jclass, jint numReturns, jbyteArray actorIdByteArray, jboolean ifForward) { auto &core_worker = CoreWorkerProcess::GetCoreWorker(); auto return_ids = core_worker.GetCurrentReturnIds( static_cast(numReturns), - JavaByteArrayToId(env, actorIdByteArray)); + JavaByteArrayToId(env, actorIdByteArray), + static_cast(ifForward)); return NativeIdVectorToJavaByteArrayList(env, return_ids); } diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h index a7e8a885469b..36730b2f9fe5 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h @@ -101,7 +101,7 @@ Java_io_ray_runtime_RayNativeRuntime_nativeGetNamespace(JNIEnv *, jclass); * Signature: (I[B)Ljava/util/List; */ JNIEXPORT jobject JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeGetCurrentReturnIds( - JNIEnv *, jclass, jint, jbyteArray); + JNIEnv *, jclass, jint, jbyteArray, jboolean); /* * Class: io_ray_runtime_RayNativeRuntime diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index 68536247030c..c4d203887eb4 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -124,6 +124,7 @@ inline TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject callOptio std::string concurrency_group_name = ""; std::string serialzied_runtime_env_info = ""; + bool if_forward = false; if (callOptions) { jobject java_resources = env->GetObjectField(callOptions, java_base_task_options_resources); @@ -150,8 +151,10 @@ inline TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject callOptio } } + if_forward = (bool) env->GetBooleanField(callOptions, java_call_options_if_forward); TaskOptions task_options{ - name, numReturns, resources, concurrency_group_name, serialzied_runtime_env_info}; + name, numReturns, resources, concurrency_group_name, + serialzied_runtime_env_info, if_forward}; return task_options; } diff --git a/src/ray/core_worker/lib/java/jni_init.cc b/src/ray/core_worker/lib/java/jni_init.cc index f51286f1548e..9e484b667af8 100644 --- a/src/ray/core_worker/lib/java/jni_init.cc +++ b/src/ray/core_worker/lib/java/jni_init.cc @@ -101,6 +101,7 @@ jfieldID java_task_creation_options_group; jfieldID java_task_creation_options_bundle_index; jfieldID java_call_options_concurrency_group_name; jfieldID java_call_options_serialized_runtime_env_info; +jfieldID java_call_options_if_forward; jclass java_actor_creation_options_class; jfieldID java_actor_creation_options_name; @@ -319,6 +320,9 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) { java_call_options_class, "concurrencyGroupName", "Ljava/lang/String;"); java_call_options_serialized_runtime_env_info = env->GetFieldID( java_call_options_class, "serializedRuntimeEnvInfo", "Ljava/lang/String;"); + java_call_options_if_forward = env->GetFieldID( + java_call_options_class, "forwardObjectToParentTask", "Z"); + java_placement_group_class = LoadClass(env, "io/ray/runtime/placementgroup/PlacementGroupImpl"); diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index 506b72aade40..18bddbb5e5cb 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -178,9 +178,12 @@ extern jfieldID java_task_creation_options_group; extern jfieldID java_task_creation_options_bundle_index; /// concurrencyGroupName field of CallOptions class extern jfieldID java_call_options_concurrency_group_name; + /// serializedRuntimeEnvInfo field of CallOptions class extern jfieldID java_call_options_serialized_runtime_env_info; +extern jfieldID java_call_options_if_forward; + /// ActorCreationOptions class extern jclass java_actor_creation_options_class; /// name field of ActorCreationOptions class diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index d8a5e5e6a00d..971ac95db574 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -197,7 +197,7 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, bool is_reconstructable, bool add_local_ref, const absl::optional &pinned_at_raylet_id) { - RAY_LOG(DEBUG) << "Adding owned object " << object_id; + RAY_LOG(ERROR) << "Adding owned object " << object_id; absl::MutexLock lock(&mutex_); RAY_CHECK(object_id_refs_.count(object_id) == 0) << "Tried to create an owned object that already exists: " << object_id; @@ -692,6 +692,7 @@ std::vector ReferenceCounter::FlushObjectsToRecover() { void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id, const NodeID &raylet_id) { absl::MutexLock lock(&mutex_); + RAY_LOG(ERROR) << "updating raylet of " << object_id; auto it = object_id_refs_.find(object_id); if (it != object_id_refs_.end()) { if (freed_objects_.count(object_id) > 0) { @@ -702,7 +703,7 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id, // The object is still in scope. Track the raylet location until the object // has gone out of scope or the raylet fails, whichever happens first. if (it->second.pinned_at_raylet_id.has_value()) { - RAY_LOG(INFO) << "Updating primary location for object " << object_id << " to node " + RAY_LOG(ERROR) << "Updating primary location for object " << object_id << " to node " << raylet_id << ", but it already has a primary location " << *it->second.pinned_at_raylet_id << ". This should only happen during reconstruction"; diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 614e6834a547..19446cf75d94 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -35,7 +35,7 @@ std::vector TaskManager::AddPendingTask( int max_retries) { RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId() << " with " << max_retries << " retries"; - + auto owner_address= spec.CallerAddress(); // Add references for the dependencies to the task. std::vector task_deps; for (size_t i = 0; i < spec.NumArgs(); i++) { @@ -66,29 +66,39 @@ std::vector TaskManager::AddPendingTask( for (size_t i = 0; i < num_returns; i++) { auto return_id = spec.ReturnId(i); if (!spec.IsActorCreationTask()) { - bool is_reconstructable = max_retries != 0; - // We pass an empty vector for inner IDs because we do not know the return - // value of the task yet. If the task returns an ID(s), the worker will - // publish the WaitForRefRemoved message that we are now a borrower for - // the inner IDs. Note that this message can be received *before* the - // PushTaskReply. - // NOTE(swang): We increment the local ref count to ensure that the - // object is considered in scope before we return the ObjectRef to the - // language frontend. Note that the language bindings should set - // skip_adding_local_ref=True to avoid double referencing the object. - reference_counter_->AddOwnedObject(return_id, - /*inner_ids=*/{}, - caller_address, - call_site, - -1, - /*is_reconstructable=*/is_reconstructable, - /*add_local_ref=*/true); + bool is_reconstructable = true; + if (spec.ForwardToParent()){ + auto contained_object_ids = std::vector(); + forward_object_callback_(return_id, + contained_object_ids, + caller_address, + owner_address, + -1); + } else { + // bool is_reconstructable = max_retries != 0; + // We pass an empty vector for inner IDs because we do not know the return + // value of the task yet. If the task returns an ID(s), the worker will + // publish the WaitForRefRemoved message that we are now a borrower for + // the inner IDs. Note that this message can be received *before* the + // PushTaskReply. + // NOTE(swang): We increment the local ref count to ensure that the + // object is considered in scope before we return the ObjectRef to the + // language frontend. Note that the language bindings should set + // skip_adding_local_ref=True to avoid double referencing the object. + reference_counter_->AddOwnedObject(return_id, + /*inner_ids=*/{}, + owner_address, + call_site, + -1, + /*is_reconstructable=*/is_reconstructable, + /*add_local_ref=*/true); + } } return_ids.push_back(return_id); rpc::ObjectReference ref; ref.set_object_id(spec.ReturnId(i).Binary()); - ref.mutable_owner_address()->CopyFrom(caller_address); + ref.mutable_owner_address()->CopyFrom(owner_address); ref.set_call_site(call_site); returned_refs.push_back(std::move(ref)); } @@ -98,7 +108,7 @@ std::vector TaskManager::AddPendingTask( { absl::MutexLock lock(&mu_); auto inserted = submissible_tasks_.emplace(spec.TaskId(), - TaskEntry(spec, max_retries, num_returns)); + TaskEntry(spec, 1, num_returns)); RAY_CHECK(inserted.second); num_pending_tasks_++; } @@ -113,12 +123,14 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector *tas { absl::MutexLock lock(&mu_); auto it = submissible_tasks_.find(task_id); + RAY_LOG(ERROR) << "finding in submissible tasks..."; if (it == submissible_tasks_.end()) { // This can happen when the task has already been // retried up to its max attempts. + RAY_LOG(ERROR) << "Cannot find the task in submissible tasks"; return false; } - + RAY_LOG(ERROR) << "found in submissible tasks"; if (!it->second.IsPending()) { resubmit = true; it->second.status = rpc::TaskStatus::WAITING_FOR_DEPENDENCIES; @@ -231,6 +243,8 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, // reference holders that are already scheduled at the raylet can retrieve // these objects through plasma. absl::flat_hash_set store_in_plasma_ids = {}; + bool forward_to_parent = false; + rpc::Address caller_address; { absl::MutexLock lock(&mu_); auto it = submissible_tasks_.find(task_id); @@ -239,12 +253,20 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, if (it->second.num_successful_executions > 0) { store_in_plasma_ids = it->second.reconstructable_return_ids; } + forward_to_parent = it->second.spec.ForwardToParent(); + caller_address = it->second.spec.CallerAddress(); } - std::vector direct_return_ids; + RAY_LOG(ERROR) << "task manager completing task " << task_id; + if (forward_to_parent) { + RAY_LOG(ERROR) << "updating forwarded object"; + update_forwarded_object_callback_(reply, worker_addr.raylet_id(), caller_address); + } else { + for (int i = 0; i < reply.return_objects_size(); i++) { const auto &return_object = reply.return_objects(i); ObjectID object_id = ObjectID::FromBinary(return_object.object_id()); + // in rpc reference_counter_->UpdateObjectSize(object_id, return_object.size()); RAY_LOG(DEBUG) << "Task return object " << object_id << " has size " << return_object.size(); @@ -256,6 +278,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, // it as local in the in-memory store so that the data locality policy // will choose the right raylet for any queued dependent tasks. const auto pinned_at_raylet_id = NodeID::FromBinary(worker_addr.raylet_id()); + // in rpc reference_counter_->UpdateObjectPinnedAtRaylet(object_id, pinned_at_raylet_id); // Mark it as in plasma with a dummy object. RAY_CHECK( @@ -301,7 +324,8 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, reference_counter_->AddNestedObjectIds(object_id, nested_ids, owner_address); } } - + } + RAY_LOG(ERROR) << "Complete pending task continued..."; TaskSpecification spec; bool release_lineage = true; int64_t min_lineage_bytes_to_evict = 0; @@ -331,8 +355,10 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, // A finished task can only be re-executed if it has some number of // retries left and returned at least one object that is still in use and // stored in plasma. + RAY_LOG(ERROR) << "task " << task_id << " state: num_retries_left=" << it->second.num_retries_left; bool task_retryable = it->second.num_retries_left != 0 && !it->second.reconstructable_return_ids.empty(); + RAY_LOG(ERROR) << "task " << task_id << "retryable is " << task_retryable; if (task_retryable) { // Pin the task spec if it may be retried again. release_lineage = false; @@ -360,6 +386,15 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, ShutdownIfNeeded(); } +void TaskManager::AddReconstructableObject(const ObjectID &object_id) { + absl::MutexLock lock(&mu_); + auto task_id = object_id.TaskId(); + auto it = submissible_tasks_.find(task_id); + RAY_CHECK(it != submissible_tasks_.end()) + << "Tried to add reconstructable object to finished task" << task_id; + it->second.reconstructable_return_ids.insert(object_id); +} + bool TaskManager::RetryTaskIfPossible(const TaskID &task_id) { int num_retries_left = 0; TaskSpecification spec; diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 044ee6b141bb..d87aaa80a74d 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -82,6 +82,14 @@ using PushErrorCallback = std::function; +using ForwardObjectCallback = std::function &contained_object_ids, + const rpc::Address &borrower_address, + const rpc::Address &owner_address, + const size_t object_size)>; +using UpdateForwardedObjectCallback = std::function; class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface { public: @@ -90,12 +98,16 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa PutInLocalPlasmaCallback put_in_local_plasma_callback, RetryTaskCallback retry_task_callback, PushErrorCallback push_error_callback, + ForwardObjectCallback forward_object_callback, + UpdateForwardedObjectCallback update_forwarded_object_callback, int64_t max_lineage_bytes) : in_memory_store_(in_memory_store), reference_counter_(reference_counter), put_in_local_plasma_callback_(put_in_local_plasma_callback), retry_task_callback_(retry_task_callback), push_error_callback_(push_error_callback), + forward_object_callback_(forward_object_callback), + update_forwarded_object_callback_(update_forwarded_object_callback), max_lineage_bytes_(max_lineage_bytes) { reference_counter_->SetReleaseLineageCallback( [this](const ObjectID &object_id, std::vector *ids_to_release) { @@ -272,6 +284,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// Fill every task information of the current worker to GetCoreWorkerStatsReply. void FillTaskInfo(rpc::GetCoreWorkerStatsReply *reply, const int64_t limit) const; + /// Add a reconstructable object to a task, used in forward + void AddReconstructableObject(const ObjectID &object_id); + private: struct TaskEntry { TaskEntry(const TaskSpecification &spec_arg, @@ -371,6 +386,12 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa // Called to push an error to the relevant driver. const PushErrorCallback push_error_callback_; + // Called to forward a returned object to another worker + const ForwardObjectCallback forward_object_callback_; + + // Called to forward a returned object to another worker + const UpdateForwardedObjectCallback update_forwarded_object_callback_; + const int64_t max_lineage_bytes_; // The number of task failures we have logged total. diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index fb48f1ceeb1d..9542e1999c18 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -559,6 +559,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { 0, RandomTaskId(), address, + -1, num_returns, resources, resources, diff --git a/src/ray/core_worker/test/dependency_resolver_test.cc b/src/ray/core_worker/test/dependency_resolver_test.cc index 4968dcc73ace..7fd68d1870fc 100644 --- a/src/ray/core_worker/test/dependency_resolver_test.cc +++ b/src/ray/core_worker/test/dependency_resolver_test.cc @@ -40,6 +40,7 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r 0, TaskID::Nil(), empty_address, + -1, 1, resources, resources, diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 551dfe2cb164..c8fd749f58c3 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -45,6 +45,7 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r 0, TaskID::Nil(), empty_address, + -1, 1, resources, resources, diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index ef005771b6dc..83f75135d26f 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -64,6 +64,14 @@ class TaskManagerTest : public ::testing::Test { const std::string &type, const std::string &error_message, double timestamp) { return Status::OK(); }, + [](const ObjectID &object_id, + const std::vector &contained_object_ids, + const rpc::Address &borrower_address, + const rpc::Address &owner_address, + const size_t object_size) { return Status::OK(); }, + [](const rpc::PushTaskReply &reply, + const std::string &raylet_id, + const rpc::Address &owner_address) { return Status::OK(); }, max_lineage_bytes) {} virtual void TearDown() { AssertNoLeaks(); } diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 31a911c1a3b9..e69f970ed038 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -104,7 +104,7 @@ void CoreWorkerDirectTaskReceiver::HandleTask( if (objects_valid) { for (size_t i = 0; i < return_objects.size(); i++) { auto return_object = reply->add_return_objects(); - ObjectID id = ObjectID::FromIndex(task_spec.TaskId(), /*index=*/i + 1); + ObjectID id = task_spec.ReturnId(i); return_object->set_object_id(id.Binary()); if (!return_objects[i]) { diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 7b7298ab29cd..f2096be2a0eb 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -54,6 +54,7 @@ struct Mocker { 0, TaskID::Nil(), owner_address, + -1, 1, required_resources, required_placement_resources, diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 3dc35c941ea4..36c398f3ae66 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -284,42 +284,44 @@ message TaskSpec { Address caller_address = 10; // Task arguments. repeated TaskArg args = 11; + // Number of parent task's return objects. + int32 parent_num_returns = 12; // Number of return objects. - uint64 num_returns = 12; + uint64 num_returns = 13; // Quantities of the different resources required by this task. - map required_resources = 13; + map required_resources = 14; // The resources required for placing this task on a node. If this is empty, // then the placement resources are equal to the required_resources. - map required_placement_resources = 14; + map required_placement_resources = 15; // Task specification for an actor creation task. // This field is only valid when `type == ACTOR_CREATION_TASK`. - ActorCreationTaskSpec actor_creation_task_spec = 15; + ActorCreationTaskSpec actor_creation_task_spec = 16; // Task specification for an actor task. // This field is only valid when `type == ACTOR_TASK`. - ActorTaskSpec actor_task_spec = 16; + ActorTaskSpec actor_task_spec = 17; // Number of times this task may be retried on worker failure. - int32 max_retries = 17; + int32 max_retries = 18; // Whether or not to skip the execution of this task. When it's true, // the receiver will not execute the task. This field is used by async actors // to guarantee task submission order after restart. - bool skip_execution = 21; + bool skip_execution = 22; // Breakpoint if this task should drop into the debugger when it starts executing // and "" if the task should not drop into the debugger. - bytes debugger_breakpoint = 22; + bytes debugger_breakpoint = 23; // Runtime environment for this task. - RuntimeEnvInfo runtime_env_info = 23; + RuntimeEnvInfo runtime_env_info = 24; // The concurrency group name in which this task will be performed. - string concurrency_group_name = 24; + string concurrency_group_name = 25; // Whether application-level errors (exceptions) should be retried. - bool retry_exceptions = 25; + bool retry_exceptions = 26; // The depth of the task. The driver has depth 0, anything it calls has depth // 1, etc. - int64 depth = 26; + int64 depth = 27; // Strategy about how to schedule this task. - SchedulingStrategy scheduling_strategy = 27; + SchedulingStrategy scheduling_strategy = 28; // A count of the number of times this task has been attempted so far. 0 // means this is the first execution. - uint64 attempt_number = 28; + uint64 attempt_number = 29; } message TaskInfoEntry { diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 78171ec8f07c..cb7161a74cf1 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -353,10 +353,21 @@ message AssignObjectOwnerRequest { Address borrower_address = 4; // Description of the call site where the reference was created. string call_site = 5; + // whether the object is reconstrutable + bool is_reconstructable = 6; } message AssignObjectOwnerReply {} +message UpdateForwardedObjectRequest { + // The returned objects. + repeated ReturnObject return_objects = 1; + // + bytes pinned_at_raylet_id = 2; +} + +message UpdateForwardedObjectReply {} + message RayletNotifyGCSRestartRequest {} message RayletNotifyGCSRestartReply {} @@ -414,4 +425,6 @@ service CoreWorkerService { rpc Exit(ExitRequest) returns (ExitReply); // Assign the owner of an object to the intended worker. rpc AssignObjectOwner(AssignObjectOwnerRequest) returns (AssignObjectOwnerReply); + // + rpc UpdateForwardedObject(UpdateForwardedObjectRequest) returns (UpdateForwardedObjectReply); } diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index cc0ad1efab1b..aa6b684b2044 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -161,6 +161,7 @@ RayTask CreateTask( 0, TaskID::Nil(), address, + -1, 0, required_resources, {}, diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index dc80444cf846..5f76508cf76f 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -193,6 +193,10 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface { const ClientCallback &callback) { } + virtual void UpdateForwardedObject(const UpdateForwardedObjectRequest &request, + const ClientCallback &callback) { + } + virtual void RayletNotifyGCSRestart( const RayletNotifyGCSRestartRequest &request, const ClientCallback &callback) {} @@ -331,6 +335,12 @@ class CoreWorkerClient : public std::enable_shared_from_this, /*method_timeout_ms*/ -1, override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, + UpdateForwardedObject, + grpc_client_, + /*method_timeout_ms*/ -1, + override) + void PushActorTask(std::unique_ptr request, bool skip_queue, const ClientCallback &callback) override { diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index a66ef4657745..dcf9912ec839 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -47,7 +47,8 @@ namespace rpc { RPC_SERVICE_HANDLER(CoreWorkerService, DeleteSpilledObjects, -1) \ RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady, -1) \ RPC_SERVICE_HANDLER(CoreWorkerService, Exit, -1) \ - RPC_SERVICE_HANDLER(CoreWorkerService, AssignObjectOwner, -1) + RPC_SERVICE_HANDLER(CoreWorkerService, AssignObjectOwner, -1) \ + RPC_SERVICE_HANDLER(CoreWorkerService, UpdateForwardedObject, -1) #define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PushTask) \ @@ -69,7 +70,8 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(DeleteSpilledObjects) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit) \ - DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignObjectOwner) + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignObjectOwner) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(UpdateForwardedObject) /// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`. class CoreWorkerServiceHandler {