diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 36f9446e8caa..5243f963c126 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -4374,22 +4374,28 @@ void CoreWorker::HandleDeleteSpilledObjects(rpc::DeleteSpilledObjectsRequest req void CoreWorker::HandleExit(rpc::ExitRequest request, rpc::ExitReply *reply, rpc::SendReplyCallback send_reply_callback) { - const bool own_objects = reference_counter_->OwnObjects(); + const size_t num_objects_with_references = reference_counter_->Size(); const size_t num_pending_tasks = task_manager_->NumPendingTasks(); const int64_t pins_in_flight = local_raylet_client_->GetPinsInFlight(); - // We consider the worker to be idle if it doesn't own any objects and it doesn't have - // any object pinning RPCs in flight and it doesn't have pending tasks. - bool is_idle = !own_objects && (pins_in_flight == 0) && (num_pending_tasks == 0); + // We consider the worker to be idle if it doesn't have object references and it doesn't + // have any object pinning RPCs in flight and it doesn't have pending tasks. + bool is_idle = (num_objects_with_references == 0) && (pins_in_flight == 0) && + (num_pending_tasks == 0); bool force_exit = request.force_exit(); RAY_LOG(DEBUG) << "Exiting: is_idle: " << is_idle << " force_exit: " << force_exit; - if (!is_idle && force_exit) { - RAY_LOG(INFO) << "Force exiting worker that owns object. This may cause other " - "workers that depends on the object to lose it. " - << "Own objects: " << own_objects - << " # Pins in flight: " << pins_in_flight - << " # pending tasks: " << num_pending_tasks; - } - bool will_exit = is_idle || force_exit; + if (!is_idle) { + RAY_LOG_EVERY_MS(INFO, 60000) + << "Worker is not idle: reference counter: " << reference_counter_->DebugString() + << " # pins in flight: " << pins_in_flight + << " # pending tasks: " << num_pending_tasks; + if (force_exit) { + RAY_LOG(INFO) << "Force exiting worker that's not idle. " + << "reference counter: " << reference_counter_->DebugString() + << " # Pins in flight: " << pins_in_flight + << " # pending tasks: " << num_pending_tasks; + } + } + const bool will_exit = is_idle || force_exit; reply->set_success(will_exit); send_reply_callback( Status::OK(), diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 4acbc0f2bacd..1e984f03753b 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -14,27 +14,17 @@ #include "ray/core_worker/reference_count.h" -#define PRINT_REF_COUNT(it) \ - RAY_LOG(DEBUG) << "REF " << it->first \ - << " borrowers: " << it->second.borrow().borrowers.size() \ - << " local_ref_count: " << it->second.local_ref_count \ - << " submitted_count: " << it->second.submitted_task_ref_count \ - << " contained_in_owned: " \ - << it->second.nested().contained_in_owned.size() \ - << " contained_in_borrowed: " \ - << (it)->second.nested().contained_in_borrowed_ids.size() \ - << " contains: " << it->second.nested().contains.size() \ - << " stored_in: " << it->second.borrow().stored_in_objects.size() \ - << " lineage_ref_count: " << it->second.lineage_ref_count; +#define PRINT_REF_COUNT(it) \ + RAY_LOG(DEBUG) << "REF " << it->first << ": " << it->second.DebugString(); namespace {} // namespace namespace ray { namespace core { -bool ReferenceCounter::OwnObjects() const { +size_t ReferenceCounter::Size() const { absl::MutexLock lock(&mutex_); - return !object_id_refs_.empty(); + return object_id_refs_.size(); } bool ReferenceCounter::OwnedByUs(const ObjectID &object_id) const { @@ -1617,6 +1607,31 @@ void ReferenceCounter::PublishObjectLocationSnapshot(const ObjectID &object_id) PushToLocationSubscribers(it); } +std::string ReferenceCounter::DebugString() const { + absl::MutexLock lock(&mutex_); + std::stringstream ss; + ss << "ReferenceTable{size: " << object_id_refs_.size(); + if (!object_id_refs_.empty()) { + ss << " sample: " << object_id_refs_.begin()->first << ":" + << object_id_refs_.begin()->second.DebugString(); + } + ss << "}"; + return ss.str(); +} + +std::string ReferenceCounter::Reference::DebugString() const { + std::stringstream ss; + ss << "Reference{borrowers: " << borrow().borrowers.size() + << " local_ref_count: " << local_ref_count + << " submitted_count: " << submitted_task_ref_count + << " contained_on_owned: " << nested().contained_in_owned.size() + << " contained_in_borrowed: " << nested().contained_in_borrowed_ids.size() + << " contains: " << nested().contains.size() + << " stored_in: " << borrow().stored_in_objects.size() + << " lineage_ref_count: " << lineage_ref_count << "}"; + return ss.str(); +} + ReferenceCounter::Reference ReferenceCounter::Reference::FromProto( const rpc::ObjectReferenceCount &ref_count) { Reference ref; diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 5a778be08efc..b188e2110693 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -86,13 +86,14 @@ class ReferenceCounter : public ReferenceCounterInterface, /// Wait for all object references to go out of scope, and then shutdown. /// /// \param shutdown The shutdown callback to call. - void DrainAndShutdown(std::function shutdown); + void DrainAndShutdown(std::function shutdown) ABSL_LOCKS_EXCLUDED(mutex_); - /// Return true if the worker owns any object. - bool OwnObjects() const; + /// Return the size of the reference count table + /// (i.e. the number of objects that have references). + size_t Size() const ABSL_LOCKS_EXCLUDED(mutex_); /// Return true if the object is owned by us. - bool OwnedByUs(const ObjectID &object_id) const; + bool OwnedByUs(const ObjectID &object_id) const ABSL_LOCKS_EXCLUDED(mutex_); /// Increase the reference count for the ObjectID by one. If there is no /// entry for the ObjectID, one will be created. The object ID will not have @@ -374,6 +375,8 @@ class ReferenceCounter : public ReferenceCounterInterface, std::unordered_map> GetAllReferenceCounts() const ABSL_LOCKS_EXCLUDED(mutex_); + std::string DebugString() const ABSL_LOCKS_EXCLUDED(mutex_); + /// Populate a table with ObjectIDs that we were or are still borrowing. /// This should be called when a task returns, and the argument should be any /// IDs that were passed by reference in the task spec or that were @@ -725,6 +728,8 @@ class ReferenceCounter : public ReferenceCounterInterface, return nested_reference_count.get(); } + std::string DebugString() const; + /// Description of the call site where the reference was created. std::string call_site = ""; /// Object size if known, otherwise -1;