From 732476f00dcfb3a4eea426b5d40ccf55eb3f7658 Mon Sep 17 00:00:00 2001
From: John Helly
Date: Wed, 29 May 2024 16:24:52 +0100
Subject: [PATCH 1/7] Modify MaxConcurrentIO implementation for swift

The previous implementation split the MPI ranks into groups and prevented
any ranks in the next group from proceeding until all ranks in the current
group had finished. This wastes a lot of time if one (or a few) ranks are
very slow.

This new implementation tries to ensure that we always have MaxConcurrentIO
ranks reading: as soon as any one rank finishes, the next is allowed to
start immediately.
---
 src/io/swiftsim_io.cpp     | 139 ++++++++++++++-----------------
 src/task_limited_section.h | 131 ++++++++++++++++++++++++++++++++++
 2 files changed, 183 insertions(+), 87 deletions(-)
 create mode 100644 src/task_limited_section.h

diff --git a/src/io/swiftsim_io.cpp b/src/io/swiftsim_io.cpp
index bd440b0e..a8375ed2 100644
--- a/src/io/swiftsim_io.cpp
+++ b/src/io/swiftsim_io.cpp
@@ -16,6 +16,7 @@ using namespace std;
 #include "../hdf_wrapper.h"
 #include "../mymath.h"
 #include "../snapshot.h"
+#include "../task_limited_section.h"
 #include "swiftsim_io.h"
 #include "exchange_and_merge.h"
@@ -655,14 +656,6 @@ void SwiftSimReader_t::LoadSnapshot(MpiWorker_t &world, int snapshotId, vector

i1)
-    i1 = local_first_offset;
-
-    // Determine global offset of last particle to read from this file:
-    // This is the smaller of the offset to the last particle in this file
-    // and the offset of the last particle this rank is to read.
-    HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
-    if (local_last_offset < i2)
-      i2 = local_last_offset;
-
-    if (i2 >= i1)
+    // Determine global offset of first particle to read from this file:
+    // This is the larger of the offset of the first particle in the file
+    // and the offset of the first particle this rank is to read.
+    HBTInt i1 = offset_file[file_nr];
+    if (local_first_offset > i1)
+      i1 = local_first_offset;
+
+    // Determine global offset of last particle to read from this file:
+    // This is the smaller of the offset to the last particle in this file
+    // and the offset of the last particle this rank is to read.
+    HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
+    if (local_last_offset < i2)
+      i2 = local_last_offset;
+
+    if (i2 >= i1)
     {
       // We have particles to read from this file.
       HBTInt file_start = i1 - offset_file[file_nr]; // Offset to first particle to read
@@ -762,16 +750,9 @@ void SwiftSimReader_t::LoadSnapshot(MpiWorker_t &world, int snapshotId, vector

i1)
-    i1 = local_first_offset;
-
-    // Determine global offset of last particle to read from this file:
-    // This is the smaller of the offset to the last particle in this file
-    // and the offset of the last particle this rank is to read.
-    HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
-    if (local_last_offset < i2)
-      i2 = local_last_offset;
-
-    if (i2 >= i1)
+    // Determine global offset of first particle to read from this file:
+    // This is the larger of the offset of the first particle in the file
+    // and the offset of the first particle this rank is to read.
+    HBTInt i1 = offset_file[file_nr];
+    if (local_first_offset > i1)
+      i1 = local_first_offset;
+
+    // Determine global offset of last particle to read from this file:
+    // This is the smaller of the offset to the last particle in this file
+    // and the offset of the last particle this rank is to read.
+    HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
+    if (local_last_offset < i2)
+      i2 = local_last_offset;
+
+    if (i2 >= i1)
     {
       // We have particles to read from this file.
       HBTInt file_start = i1 - offset_file[file_nr]; // Offset to first particle to read
@@ -910,17 +882,10 @@ void SwiftSimReader_t::LoadGroups(MpiWorker_t &world, int snapshotId, vector

diff --git a/src/task_limited_section.h b/src/task_limited_section.h
new file mode 100644
--- /dev/null
+++ b/src/task_limited_section.h
@@ -0,0 +1,131 @@
+#include
+
+class TaskLimitedSection {
+
+private:
+
+  int max_nr_tasks;
+  MPI_Comm comm;
+  MPI_Win win;
+  int *buffer;
+  int controller_rank;
+  MPI_Request controller_rank_request;
+
+  const int CONTROLLER_RANK_TAG = 0;
+  const int GO_TAG = 1;
+  const int COMPLETION_TAG = 2;
+
+public:
+
+  TaskLimitedSection(MPI_Comm comm, const int max_nr_tasks) {
+
+    MPI_Comm_dup(comm, &(this->comm));
+    this->max_nr_tasks = max_nr_tasks;
+  }
+
+  void start() {
+
+    /* Get rank and number of ranks */
+    int comm_size;
+    MPI_Comm_size(comm, &comm_size);
+    int comm_rank;
+    MPI_Comm_rank(comm, &comm_rank);
+
+    /* If all ranks are allowed to run there's nothing to do */
+    if(max_nr_tasks >= comm_size)return;
+
+    /* Allocate and init counter for RMA */
+    MPI_Alloc_mem(sizeof(int), MPI_INFO_NULL, &buffer);
+    *buffer = 0;
+    MPI_Win_create(buffer, sizeof(int), sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
+
+    /* Post a receive to get controller task's rank (will be first rank to finish) */
+    MPI_Irecv(&controller_rank, 1, MPI_INT, MPI_ANY_SOURCE,
+              CONTROLLER_RANK_TAG, comm, &controller_rank_request);
+
+    /* The first max_nr_tasks ranks can proceed immediately */
+    if(comm_rank < max_nr_tasks)return;
+
+    /* Others need to wait for a message to proceed */
+    int go;
+    MPI_Recv(&go, 1, MPI_INT, MPI_ANY_SOURCE, GO_TAG, comm, MPI_STATUS_IGNORE);
+
+  }
+
+  void end() {
+
+    /* Get rank and number of ranks */
+    int comm_size;
+    MPI_Comm_size(comm, &comm_size);
+    int comm_rank;
+    MPI_Comm_rank(comm, &comm_rank);
+
+    /* If all ranks are allowed to run there's nothing to do */
+    if(max_nr_tasks >= comm_size)return;
+
+    MPI_Request *request = (MPI_Request *) malloc(sizeof(MPI_Request)*comm_size);
+
+    /*
+      Check if we're the first task to reach the end of the section:
+      We do this by doing an atomic fetch and increment on the count of
+      the number of ranks that have finished. If the count is zero we're
+      the first and will become responsible for signalling other ranks
+      to proceed.
+
+      Hopefully this shouldn't block because the rank with the counter
+      is sitting waiting at a blocking receive.
+    */
+    int completion_count = 0;
+    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, comm_size-1, 0, win);
+    int to_add = 1;
+    MPI_Get_accumulate(&to_add, 1, MPI_INT,
+                       &completion_count, 1, MPI_INT,
+                       comm_size-1, 0, 1, MPI_INT, MPI_SUM, win);
+    MPI_Win_unlock(comm_size-1, win);
+    if(completion_count == 0) {
+
+      /* This task is the first to reach the end of the section, so tell everyone */
+      for(int dest=0; dest 0) {
+        int done;
+        MPI_Status status;
+        MPI_Recv(&done, 1, MPI_INT, MPI_ANY_SOURCE, COMPLETION_TAG, comm, &status);
+      }
+
+      /* If there are tasks still waiting, send the next go signal */
+      if(nr_left > 0) {
+        int dest = comm_size - nr_left;
+        int go = 1;
+        MPI_Send(&go, 1, MPI_INT, dest, GO_TAG, comm);
+        nr_left -= 1;
+      }
+      }
+    }
+
+    /* Make sure we've received the controller rank */
+    MPI_Wait(&(controller_rank_request), MPI_STATUS_IGNORE);
+
+    /* Send completion message if we're not the controller */
+    if(completion_count > 0) {
+      int complete = 1;
+      MPI_Send(&complete, 1, MPI_INT, controller_rank, COMPLETION_TAG, comm);
+    }
+
+    /* Make sure all sends from the controller completed */
+    if(completion_count==0)
+      MPI_Waitall(comm_size, request, MPI_STATUSES_IGNORE);
+
+    /* Tidy up */
+    free(request);
+    MPI_Win_free(&(win));
+    MPI_Free_mem(buffer);
+  }
+
+};

From 9bc80bc6b3856593739047cd8ce1dcd554e4a308 Mon Sep 17 00:00:00 2001
From: John Helly
Date: Wed, 29 May 2024 16:54:32 +0100
Subject: [PATCH 2/7] Renumber MPI ranks in concurrent IO code

---
 src/task_limited_section.h | 40 ++++++++++++++++++++++++++++++++++++--
 1 file changed, 38 insertions(+), 2 deletions(-)

diff --git a/src/task_limited_section.h b/src/task_limited_section.h
index e047f69d..2fc10b35 100644
--- a/src/task_limited_section.h
+++ b/src/task_limited_section.h
@@ -1,5 +1,20 @@
 #include
+#include
+/*
+  Class to limit the number of MPI ranks executing a piece of code
+  simultaneously. Used to implement the MaxConcurrentIO option.
+
+  Example usage:
+
+  TaskLimitedSection section(MPI_COMM_WORLD, HBTConfig.MaxConcurrentIO);
+  section.start();
+  ...
+  (I/O code goes here!)
+  ...
+  section.end();
+
+*/
 class TaskLimitedSection {

 private:
@@ -10,7 +25,8 @@ class TaskLimitedSection {
   int *buffer;
   int controller_rank;
   MPI_Request controller_rank_request;
-
+  int order;
+
   const int CONTROLLER_RANK_TAG = 0;
   const int GO_TAG = 1;
   const int COMPLETION_TAG = 2;
@@ -19,9 +35,29 @@

   TaskLimitedSection(MPI_Comm comm, const int max_nr_tasks) {

-    MPI_Comm_dup(comm, &(this->comm));
+    int comm_size;
+    MPI_Comm_size(comm, &comm_size);
+    int comm_rank;
+    MPI_Comm_rank(comm, &comm_rank);
+
+    // Renumber ranks so we're not just allowing the first N to run initially -
+    // ideally we want to have the active ranks spread over all compute nodes.
+    int block_size = max_nr_tasks;
+    int position_in_block = comm_rank % block_size;
+    int block_index = comm_rank / block_size;
+    int nr_blocks = comm_size / max_nr_tasks;
+    if(comm_size % max_nr_tasks != 0)nr_blocks += 1;
+    assert(block_size*block_index+position_in_block == comm_rank);
+    order = position_in_block * nr_blocks + block_index;
+
+    // Create the reordered communicator
+    MPI_Comm_split(comm, 0, order, &(this->comm));
     this->max_nr_tasks = max_nr_tasks;
   }
+
+  ~TaskLimitedSection() {
+    MPI_Comm_free(&comm);
+  }

   void start() {

From dd798860825dde60e1891e2711cf3acfc71ab3a3 Mon Sep 17 00:00:00 2001
From: John Helly
Date: Thu, 30 May 2024 11:09:20 +0100
Subject: [PATCH 3/7] Fix wrong communicator in task_limited_section.h

---
 src/task_limited_section.h | 30 ++++++++++++++++++----------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff --git a/src/task_limited_section.h b/src/task_limited_section.h
index 2fc10b35..9a6bec65 100644
--- a/src/task_limited_section.h
+++ b/src/task_limited_section.h
@@ -73,7 +73,7 @@ class TaskLimitedSection {
     /* Allocate and init counter for RMA */
     MPI_Alloc_mem(sizeof(int), MPI_INFO_NULL, &buffer);
     *buffer = 0;
-    MPI_Win_create(buffer, sizeof(int), sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
+    MPI_Win_create(buffer, sizeof(int), sizeof(int), MPI_INFO_NULL, comm, &win);

     /* Post a receive to get controller task's rank (will be first rank to finish) */
     MPI_Irecv(&controller_rank, 1, MPI_INT, MPI_ANY_SOURCE,
               CONTROLLER_RANK_TAG, comm, &controller_rank_request);
@@ -108,16 +108,26 @@ class TaskLimitedSection {
       the first and will become responsible for signalling other ranks
       to proceed.

-      Hopefully this shouldn't block because the rank with the counter
-      is sitting waiting at a blocking receive.
+      We only need to check the completion count for the first max_nr_tasks
+      ranks, because others can't start until another rank finishes so they
+      can't be first to finish.
     */
     int completion_count = 0;
-    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, comm_size-1, 0, win);
-    int to_add = 1;
-    MPI_Get_accumulate(&to_add, 1, MPI_INT,
-                       &completion_count, 1, MPI_INT,
-                       comm_size-1, 0, 1, MPI_INT, MPI_SUM, win);
-    MPI_Win_unlock(comm_size-1, win);
+    if(comm_rank < max_nr_tasks) {
+      /* We're one of the ranks that started immediately, so we might be first
+         to complete */
+      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, comm_size-1, 0, win);
+      int to_add = 1;
+      MPI_Get_accumulate(&to_add, 1, MPI_INT,
+                         &completion_count, 1, MPI_INT,
+                         comm_size-1, 0, 1, MPI_INT, MPI_SUM, win);
+      MPI_Win_unlock(comm_size-1, win);
+    } else {
+      /* We aren't in the initial batch of max_nr_tasks so we can't be first to complete.
+         Skip the get_accumulate so we're not waiting for the last rank to respond
+         when it might be busy in non-MPI code.
+      */
+      completion_count = 1;
+    }
     if(completion_count == 0) {

       /* This task is the first to reach the end of the section, so tell everyone */
@@ -160,7 +170,7 @@ class TaskLimitedSection {

     /* Tidy up */
     free(request);
-    MPI_Win_free(&(win));
+    MPI_Win_free(&win);
     MPI_Free_mem(buffer);
   }

From 907a275bb31722c38e4aca7f155436a977e6dc4b Mon Sep 17 00:00:00 2001
From: John Helly
Date: Thu, 30 May 2024 11:09:35 +0100
Subject: [PATCH 4/7] Add unit test for task_limited_section.h

---
 unit_tests/CMakeLists.txt           |  1 +
 unit_tests/test_limited_section.cpp | 96 +++++++++++++++++++++++++++++
 2 files changed, 97 insertions(+)
 create mode 100644 unit_tests/test_limited_section.cpp

diff --git a/unit_tests/CMakeLists.txt b/unit_tests/CMakeLists.txt
index e4cb1daf..fa863053 100644
--- a/unit_tests/CMakeLists.txt
+++ b/unit_tests/CMakeLists.txt
@@ -52,6 +52,7 @@ foreach(TEST_NAME
     test_locate_ids_random
     test_mergertree
     test_myalltoall
+    test_limited_section
   )
   add_executable(${TEST_NAME} ${TEST_NAME}.cpp $ $)
diff --git a/unit_tests/test_limited_section.cpp b/unit_tests/test_limited_section.cpp
new file mode 100644
index 00000000..2b6a2272
--- /dev/null
+++ b/unit_tests/test_limited_section.cpp
@@ -0,0 +1,96 @@
+#include
+#include
+#include
+
+#include "task_limited_section.h"
+#include "verify.h"
+
+/*
+  Test code to limit number of tasks executing simultaneously.
+*/
+int main(int argc, char *argv[])
+{
+
+  MPI_Init(&argc, &argv);
+  int comm_size;
+  MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
+  int comm_rank;
+  MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
+
+  /* Skip this test if we're running on only one MPI rank */
+  if(comm_size==1)return 0;
+
+  /* Set up count of number of tasks executing */
+  int *count;
+  MPI_Alloc_mem(sizeof(int), MPI_INFO_NULL, &count);
+  *count = 0;
+  MPI_Win win;
+  MPI_Win_create(count, sizeof(int), sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
+
+  /* Split off one rank to maintain a count of tasks currently executing */
+  int color = (comm_rank==0) ? 0 : 1;
+  int key = comm_rank;
+  MPI_Comm split_comm;
+  MPI_Comm_split(MPI_COMM_WORLD, color, key, &split_comm);
+
+  if(color == 1) {
+    int split_comm_size = comm_size - 1;
+    for(int max_nr_tasks = 1; max_nr_tasks <= split_comm_size; max_nr_tasks += 1) {
+
+      TaskLimitedSection section(split_comm, max_nr_tasks);
+      section.start();
+
+      /* On starting, increment the counter */
+      int start_count = -1;
+      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, win);
+      int to_add = 1;
+      MPI_Get_accumulate(&to_add, 1, MPI_INT,
+                         &start_count, 1, MPI_INT,
+                         0, 0, 1, MPI_INT, MPI_SUM, win);
+      MPI_Win_unlock(0, win);
+      /* When we start, should have 0 to max_nr_tasks-1 other tasks running */
+      verify(start_count >= 0);
+      verify(start_count < max_nr_tasks);
+
+      /* Sleep for a bit */
+      struct timespec ts;
+      ts.tv_sec = 0;
+      ts.tv_nsec = 100 * 1000000; // 100 millisec
+      int res;
+      do {
+        res = nanosleep(&ts, &ts);
+      } while (res && errno == EINTR);
+
+      /* On finishing, decrement the counter */
+      int end_count = -1;
+      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, win);
+      to_add = -1;
+      MPI_Get_accumulate(&to_add, 1, MPI_INT,
+                         &end_count, 1, MPI_INT,
+                         0, 0, 1, MPI_INT, MPI_SUM, win);
+      MPI_Win_unlock(0, win);
+      /* When we finish, should have 1 to max_nr_tasks tasks running (including ourselves) */
+      verify(end_count > 0);
+      verify(end_count <= max_nr_tasks);
+
+      section.end();
+
+      // Report maximum counts:
+      // We should usually have start_count_max=max_nr_tasks-1 and end_count_max=max_nr_tasks,
+      // although this is not guaranteed (e.g. if system is busy and some tasks are delayed).
+      int start_count_max;
+      MPI_Allreduce(&start_count, &start_count_max, 1, MPI_INT, MPI_MAX, split_comm);
+      int end_count_max;
+      MPI_Allreduce(&end_count, &end_count_max, 1, MPI_INT, MPI_MAX, split_comm);
+      if(comm_rank==1)
+        std::cout << "Max ranks = " << max_nr_tasks << ", max start count = " <<
+          start_count_max << ", max end count = " << end_count_max << std::endl;
+    }
+  }
+
+  MPI_Win_free(&win);
+  MPI_Free_mem(count);
+  MPI_Finalize();
+
+  return 0;
+}

From 0cde664f5050c87a00d73931342a3d5b13ae9a03 Mon Sep 17 00:00:00 2001
From: John Helly
Date: Mon, 16 Dec 2024 16:06:36 +0000
Subject: [PATCH 5/7] Use TaskLimitedSection to limit ranks writing Sub/SrcSnap files

---
 src/io/subhalo_io.cpp | 52 ++++++++++++----------------------------
 1 file changed, 14 insertions(+), 38 deletions(-)

diff --git a/src/io/subhalo_io.cpp b/src/io/subhalo_io.cpp
index b239d3c3..f630e37a 100644
--- a/src/io/subhalo_io.cpp
+++ b/src/io/subhalo_io.cpp
@@ -8,6 +8,8 @@
 #include "../snapshot_number.h"
 #include "../subhalo.h"
 #include "../config_parser.h"
+#include "../task_limited_section.h"
+
 #include "git_version_info.h"

 void SubhaloSnapshot_t::BuildHDFDataType()
@@ -276,20 +278,14 @@ void SubhaloSnapshot_t::Save(MpiWorker_t &world)
   string subdir = GetSubDir();
   mkdir(subdir.c_str(), 0755);

-  /* Decide how many ranks per node write simultaneously */
-  int nr_nodes = (world.size() / world.MaxNodeSize);
-  int nr_writing = HBTConfig.MaxConcurrentIO / nr_nodes;
-  if (nr_writing < 1)
-    nr_writing = 1; // Always at least one per node
-
   /* Subhalo properties and bound particle lists. */
-  WriteBoundFiles(world, nr_writing);
+  WriteBoundFiles(world);

   /* Particles associated to each subhalo. Used for debugging and restarting. */
-  WriteSourceFiles(world, nr_writing);
+  WriteSourceFiles(world);
 }

-void SubhaloSnapshot_t::WriteBoundFiles(MpiWorker_t &world, const int &number_ranks_writing)
+void SubhaloSnapshot_t::WriteBoundFiles(MpiWorker_t &world)
 {
   /* Number of total subhalo entries */
   HBTInt NumSubsAll = 0, NumSubs = Subhalos.size();
@@ -299,39 +295,19 @@ void SubhaloSnapshot_t::WriteBoundFiles(MpiWorker_t &world, const int &number_ra
     cout << "saving " << NumSubsAll << " subhalos to " << GetSubDir() << endl;

   /* Allow a limited number of ranks per node to write simultaneously */
-  int writes_done = 0;
-  for (int rank_within_node = 0; rank_within_node < world.MaxNodeSize; rank_within_node += 1)
-  {
-    if (rank_within_node == world.NodeRank)
-    {
-      WriteBoundSubfile(world.rank(), world.size(), NumSubsAll);
-      writes_done += 1;
-    }
-    if (rank_within_node % number_ranks_writing == number_ranks_writing - 1)
-      MPI_Barrier(world.Communicator);
-  }
-
-  /* Every rank should have executed the writing code exactly once */
-  assert(writes_done == 1);
+  TaskLimitedSection section(MPI_COMM_WORLD, HBTConfig.MaxConcurrentIO);
+  section.start();
+  WriteBoundSubfile(world.rank(), world.size(), NumSubsAll);
+  section.end();
 }

-void SubhaloSnapshot_t::WriteSourceFiles(MpiWorker_t &world, const int &number_ranks_writing)
+void SubhaloSnapshot_t::WriteSourceFiles(MpiWorker_t &world)
 {
   /* Allow a limited number of ranks per node to write simultaneously */
-  int writes_done = 0;
-  for (int rank_within_node = 0; rank_within_node < world.MaxNodeSize; rank_within_node += 1)
-  {
-    if (rank_within_node == world.NodeRank)
-    {
-      WriteSourceSubfile(world.rank(), world.size());
-      writes_done += 1;
-    }
-    if (rank_within_node % number_ranks_writing == number_ranks_writing - 1)
-      MPI_Barrier(world.Communicator);
-  }
-
-  /* Every rank should have executed the writing code exactly once */
-  assert(writes_done == 1);
+  TaskLimitedSection section(MPI_COMM_WORLD, HBTConfig.MaxConcurrentIO);
+  section.start();
+  WriteSourceSubfile(world.rank(), world.size());
+  section.end();
 }

 void SubhaloSnapshot_t::WriteBoundSubfile(int iFile, int nfiles, HBTInt NumSubsAll)

From 60d84034794eeffd00ca0e278ede4ffc74cacddf Mon Sep 17 00:00:00 2001
From: John Helly
Date: Mon, 16 Dec 2024 16:33:06 +0000
Subject: [PATCH 6/7] Fix incorrect function prototypes

---
 src/subhalo.h | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/subhalo.h b/src/subhalo.h
index f094ea1a..7a45fed5 100644
--- a/src/subhalo.h
+++ b/src/subhalo.h
@@ -263,8 +263,8 @@ class SubhaloSnapshot_t : public Snapshot_t
   /* I/O methods */
   void ReadFile(int iFile, const SubReaderDepth_t depth);
-  void WriteBoundFiles(MpiWorker_t &world, const int &number_ranks_writing);
-  void WriteSourceFiles(MpiWorker_t &world, const int &number_ranks_writing);
+  void WriteBoundFiles(MpiWorker_t &world);
+  void WriteSourceFiles(MpiWorker_t &world);
   void WriteBoundSubfile(int iFile, int nfiles, HBTInt NumSubsAll);
   void WriteSourceSubfile(int iFile, int nfiles);

From 4be4eb592b2cc064468834cbd0534dbf70361933 Mon Sep 17 00:00:00 2001
From: John Helly
Date: Mon, 16 Dec 2024 16:33:16 +0000
Subject: [PATCH 7/7] Add missing includes

---
 src/task_limited_section.h          | 1 +
 unit_tests/test_limited_section.cpp | 1 +
 2 files changed, 2 insertions(+)

diff --git a/src/task_limited_section.h b/src/task_limited_section.h
index 9a6bec65..3229cfd5 100644
--- a/src/task_limited_section.h
+++ b/src/task_limited_section.h
@@ -1,5 +1,6 @@
 #include
 #include
+#include
 /*
   Class to limit the number of MPI ranks executing a piece of code
diff --git a/unit_tests/test_limited_section.cpp b/unit_tests/test_limited_section.cpp
index 2b6a2272..19f1093a 100644
--- a/unit_tests/test_limited_section.cpp
+++ b/unit_tests/test_limited_section.cpp
@@ -1,6 +1,7 @@
 #include
 #include
 #include
+#include

 #include "task_limited_section.h"
 #include "verify.h"
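
Note on the rank renumbering introduced in PATCH 2/7: the key passed to MPI_Comm_split is chosen so that
the first max_nr_tasks ranks of the reordered communicator are drawn from different blocks of consecutive
ranks, which spreads the initially active ranks across compute nodes. The standalone sketch below is an
illustration only and is not part of the patches; the values comm_size = 8 and max_nr_tasks = 3 are assumed
purely for the example. It prints the split key each rank would compute using the same formula as the
TaskLimitedSection constructor.

#include <cassert>
#include <cstdio>

int main()
{
  const int comm_size = 8;    // assumed total number of MPI ranks (illustrative)
  const int max_nr_tasks = 3; // assumed MaxConcurrentIO value (illustrative)

  // Number of blocks of max_nr_tasks consecutive ranks, rounded up
  int nr_blocks = comm_size / max_nr_tasks;
  if (comm_size % max_nr_tasks != 0)
    nr_blocks += 1;

  for (int comm_rank = 0; comm_rank < comm_size; comm_rank += 1)
  {
    const int block_size = max_nr_tasks;
    const int position_in_block = comm_rank % block_size;
    const int block_index = comm_rank / block_size;
    assert(block_size * block_index + position_in_block == comm_rank);

    // Same formula as in task_limited_section.h: with these example values,
    // old ranks 0, 3 and 6 receive the three smallest keys, i.e. one rank
    // from each block of three consecutive ranks starts immediately.
    const int order = position_in_block * nr_blocks + block_index;
    std::printf("old rank %d -> split key %d\n", comm_rank, order);
  }
  return 0;
}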