Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Log why core worker is not idle during HandleExit #47300

Merged
merged 2 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4374,22 +4374,28 @@ void CoreWorker::HandleDeleteSpilledObjects(rpc::DeleteSpilledObjectsRequest req
void CoreWorker::HandleExit(rpc::ExitRequest request,
rpc::ExitReply *reply,
rpc::SendReplyCallback send_reply_callback) {
const bool own_objects = reference_counter_->OwnObjects();
const size_t num_objects_with_references = reference_counter_->Size();
const size_t num_pending_tasks = task_manager_->NumPendingTasks();
const int64_t pins_in_flight = local_raylet_client_->GetPinsInFlight();
// We consider the worker to be idle if it doesn't own any objects and it doesn't have
// any object pinning RPCs in flight and it doesn't have pending tasks.
bool is_idle = !own_objects && (pins_in_flight == 0) && (num_pending_tasks == 0);
// We consider the worker to be idle if it doesn't have object references and it doesn't
// have any object pinning RPCs in flight and it doesn't have pending tasks.
bool is_idle = (num_objects_with_references == 0) && (pins_in_flight == 0) &&
(num_pending_tasks == 0);
bool force_exit = request.force_exit();
RAY_LOG(DEBUG) << "Exiting: is_idle: " << is_idle << " force_exit: " << force_exit;
if (!is_idle && force_exit) {
RAY_LOG(INFO) << "Force exiting worker that owns object. This may cause other "
"workers that depends on the object to lose it. "
<< "Own objects: " << own_objects
<< " # Pins in flight: " << pins_in_flight
<< " # pending tasks: " << num_pending_tasks;
}
bool will_exit = is_idle || force_exit;
if (!is_idle) {
RAY_LOG_EVERY_MS(INFO, 60000)
<< "Worker is not idle: reference counter: " << reference_counter_->DebugString()
<< " # pins in flight: " << pins_in_flight
<< " # pending tasks: " << num_pending_tasks;
if (force_exit) {
RAY_LOG(INFO) << "Force exiting worker that's not idle. "
<< "reference counter: " << reference_counter_->DebugString()
<< " # Pins in flight: " << pins_in_flight
<< " # pending tasks: " << num_pending_tasks;
}
}
const bool will_exit = is_idle || force_exit;
reply->set_success(will_exit);
send_reply_callback(
Status::OK(),
Expand Down
43 changes: 29 additions & 14 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,17 @@

#include "ray/core_worker/reference_count.h"

#define PRINT_REF_COUNT(it) \
RAY_LOG(DEBUG) << "REF " << it->first \
<< " borrowers: " << it->second.borrow().borrowers.size() \
<< " local_ref_count: " << it->second.local_ref_count \
<< " submitted_count: " << it->second.submitted_task_ref_count \
<< " contained_in_owned: " \
<< it->second.nested().contained_in_owned.size() \
<< " contained_in_borrowed: " \
<< (it)->second.nested().contained_in_borrowed_ids.size() \
<< " contains: " << it->second.nested().contains.size() \
<< " stored_in: " << it->second.borrow().stored_in_objects.size() \
<< " lineage_ref_count: " << it->second.lineage_ref_count;
#define PRINT_REF_COUNT(it) \
RAY_LOG(DEBUG) << "REF " << it->first << ": " << it->second.DebugString();

namespace {} // namespace

namespace ray {
namespace core {

bool ReferenceCounter::OwnObjects() const {
size_t ReferenceCounter::Size() const {
absl::MutexLock lock(&mutex_);
return !object_id_refs_.empty();
return object_id_refs_.size();
}

bool ReferenceCounter::OwnedByUs(const ObjectID &object_id) const {
Expand Down Expand Up @@ -1617,6 +1607,31 @@ void ReferenceCounter::PublishObjectLocationSnapshot(const ObjectID &object_id)
PushToLocationSubscribers(it);
}

/// Summarize the reference table for logging: its size plus, when the table
/// is non-empty, the first entry as a sample. Takes `mutex_` for the duration
/// of the call.
std::string ReferenceCounter::DebugString() const {
  absl::MutexLock lock(&mutex_);
  std::stringstream out;
  out << "ReferenceTable{size: " << object_id_refs_.size();
  // Only one entry is printed so the log line stays bounded regardless of
  // how many references are tracked.
  const auto sample_it = object_id_refs_.begin();
  if (sample_it != object_id_refs_.end()) {
    out << " sample: " << sample_it->first << ":" << sample_it->second.DebugString();
  }
  out << "}";
  return out.str();
}

/// Render this reference's bookkeeping as a single-line string for logs.
///
/// Only counters and the sizes of the borrower / nesting sets are printed,
/// not the object IDs contained in those sets.
///
/// \return A string of the form "Reference{<label>: <value> ...}".
std::string ReferenceCounter::Reference::DebugString() const {
  std::stringstream ss;
  ss << "Reference{borrowers: " << borrow().borrowers.size()
     << " local_ref_count: " << local_ref_count
     << " submitted_count: " << submitted_task_ref_count
     // Label matches the field it reports (`contained_in_owned`).
     << " contained_in_owned: " << nested().contained_in_owned.size()
     << " contained_in_borrowed: " << nested().contained_in_borrowed_ids.size()
     << " contains: " << nested().contains.size()
     << " stored_in: " << borrow().stored_in_objects.size()
     << " lineage_ref_count: " << lineage_ref_count << "}";
  return ss.str();
}

ReferenceCounter::Reference ReferenceCounter::Reference::FromProto(
const rpc::ObjectReferenceCount &ref_count) {
Reference ref;
Expand Down
13 changes: 9 additions & 4 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,14 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// Wait for all object references to go out of scope, and then shutdown.
///
/// \param shutdown The shutdown callback to call.
void DrainAndShutdown(std::function<void()> shutdown);
void DrainAndShutdown(std::function<void()> shutdown) ABSL_LOCKS_EXCLUDED(mutex_);

/// Return true if the worker owns any object.
bool OwnObjects() const;
/// Return the size of the reference count table
/// (i.e. the number of objects that have references).
size_t Size() const ABSL_LOCKS_EXCLUDED(mutex_);

/// Return true if the object is owned by us.
bool OwnedByUs(const ObjectID &object_id) const;
bool OwnedByUs(const ObjectID &object_id) const ABSL_LOCKS_EXCLUDED(mutex_);

/// Increase the reference count for the ObjectID by one. If there is no
/// entry for the ObjectID, one will be created. The object ID will not have
Expand Down Expand Up @@ -374,6 +375,8 @@ class ReferenceCounter : public ReferenceCounterInterface,
std::unordered_map<ObjectID, std::pair<size_t, size_t>> GetAllReferenceCounts() const
ABSL_LOCKS_EXCLUDED(mutex_);

std::string DebugString() const ABSL_LOCKS_EXCLUDED(mutex_);

/// Populate a table with ObjectIDs that we were or are still borrowing.
/// This should be called when a task returns, and the argument should be any
/// IDs that were passed by reference in the task spec or that were
Expand Down Expand Up @@ -725,6 +728,8 @@ class ReferenceCounter : public ReferenceCounterInterface,
return nested_reference_count.get();
}

std::string DebugString() const;

/// Description of the call site where the reference was created.
std::string call_site = "<unknown>";
/// Object size if known, otherwise -1;
Expand Down