diff --git a/src/io/subhalo_io.cpp b/src/io/subhalo_io.cpp
index b239d3c3..f630e37a 100644
--- a/src/io/subhalo_io.cpp
+++ b/src/io/subhalo_io.cpp
@@ -8,6 +8,8 @@
#include "../snapshot_number.h"
#include "../subhalo.h"
#include "../config_parser.h"
+#include "../task_limited_section.h"
+
#include "git_version_info.h"
void SubhaloSnapshot_t::BuildHDFDataType()
@@ -276,20 +278,14 @@ void SubhaloSnapshot_t::Save(MpiWorker_t &world)
string subdir = GetSubDir();
mkdir(subdir.c_str(), 0755);
- /* Decide how many ranks per node write simultaneously */
- int nr_nodes = (world.size() / world.MaxNodeSize);
- int nr_writing = HBTConfig.MaxConcurrentIO / nr_nodes;
- if (nr_writing < 1)
- nr_writing = 1; // Always at least one per node
-
/* Subhalo properties and bound particle lists. */
- WriteBoundFiles(world, nr_writing);
+ WriteBoundFiles(world);
/* Particles associated to each subhalo. Used for debugging and restarting. */
- WriteSourceFiles(world, nr_writing);
+ WriteSourceFiles(world);
}
-void SubhaloSnapshot_t::WriteBoundFiles(MpiWorker_t &world, const int &number_ranks_writing)
+void SubhaloSnapshot_t::WriteBoundFiles(MpiWorker_t &world)
{
/* Number of total subhalo entries */
HBTInt NumSubsAll = 0, NumSubs = Subhalos.size();
@@ -299,39 +295,19 @@ void SubhaloSnapshot_t::WriteBoundFiles(MpiWorker_t &world, const int &number_ra
cout << "saving " << NumSubsAll << " subhalos to " << GetSubDir() << endl;
/* Allow a limited number of ranks per node to write simultaneously */
- int writes_done = 0;
- for (int rank_within_node = 0; rank_within_node < world.MaxNodeSize; rank_within_node += 1)
- {
- if (rank_within_node == world.NodeRank)
- {
- WriteBoundSubfile(world.rank(), world.size(), NumSubsAll);
- writes_done += 1;
- }
- if (rank_within_node % number_ranks_writing == number_ranks_writing - 1)
- MPI_Barrier(world.Communicator);
- }
-
- /* Every rank should have executed the writing code exactly once */
- assert(writes_done == 1);
+ TaskLimitedSection section(MPI_COMM_WORLD, HBTConfig.MaxConcurrentIO);
+ section.start();
+ WriteBoundSubfile(world.rank(), world.size(), NumSubsAll);
+ section.end();
}
-void SubhaloSnapshot_t::WriteSourceFiles(MpiWorker_t &world, const int &number_ranks_writing)
+void SubhaloSnapshot_t::WriteSourceFiles(MpiWorker_t &world)
{
/* Allow a limited number of ranks per node to write simultaneously */
- int writes_done = 0;
- for (int rank_within_node = 0; rank_within_node < world.MaxNodeSize; rank_within_node += 1)
- {
- if (rank_within_node == world.NodeRank)
- {
- WriteSourceSubfile(world.rank(), world.size());
- writes_done += 1;
- }
- if (rank_within_node % number_ranks_writing == number_ranks_writing - 1)
- MPI_Barrier(world.Communicator);
- }
-
- /* Every rank should have executed the writing code exactly once */
- assert(writes_done == 1);
+ TaskLimitedSection section(MPI_COMM_WORLD, HBTConfig.MaxConcurrentIO);
+ section.start();
+ WriteSourceSubfile(world.rank(), world.size());
+ section.end();
}
void SubhaloSnapshot_t::WriteBoundSubfile(int iFile, int nfiles, HBTInt NumSubsAll)
diff --git a/src/io/swiftsim_io.cpp b/src/io/swiftsim_io.cpp
index 13dd5984..1130e07a 100644
--- a/src/io/swiftsim_io.cpp
+++ b/src/io/swiftsim_io.cpp
@@ -16,6 +16,7 @@ using namespace std;
#include "../hdf_wrapper.h"
#include "../mymath.h"
#include "../snapshot.h"
+#include "../task_limited_section.h"
#include "swiftsim_io.h"
#include "exchange_and_merge.h"
@@ -639,14 +640,6 @@ void SwiftSimReader_t::LoadSnapshot(MpiWorker_t &world, int snapshotId, vector
- // Determine global offset of first particle to read from this file:
- // This is the larger of the offset of the first particle in the file
- // and the offset of the first particle this rank is to read.
- HBTInt i1 = offset_file[file_nr];
- if (local_first_offset > i1)
- i1 = local_first_offset;
-
- // Determine global offset of last particle to read from this file:
- // This is the smaller of the offset to the last particle in this file
- // and the offset of the last particle this rank is to read.
- HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
- if (local_last_offset < i2)
- i2 = local_last_offset;
-
- if (i2 >= i1)
+ // Determine global offset of first particle to read from this file:
+ // This is the larger of the offset of the first particle in the file
+ // and the offset of the first particle this rank is to read.
+ HBTInt i1 = offset_file[file_nr];
+ if (local_first_offset > i1)
+ i1 = local_first_offset;
+
+ // Determine global offset of last particle to read from this file:
+ // This is the smaller of the offset to the last particle in this file
+ // and the offset of the last particle this rank is to read.
+ HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
+ if (local_last_offset < i2)
+ i2 = local_last_offset;
+
+ if (i2 >= i1)
{
// We have particles to read from this file.
HBTInt file_start = i1 - offset_file[file_nr]; // Offset to first particle to read
@@ -748,16 +737,9 @@ void SwiftSimReader_t::LoadSnapshot(MpiWorker_t &world, int snapshotId, vector
- // Determine global offset of first particle to read from this file:
- // This is the larger of the offset of the first particle in the file
- // and the offset of the first particle this rank is to read.
- HBTInt i1 = offset_file[file_nr];
- if (local_first_offset > i1)
- i1 = local_first_offset;
-
- // Determine global offset of last particle to read from this file:
- // This is the smaller of the offset to the last particle in this file
- // and the offset of the last particle this rank is to read.
- HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
- if (local_last_offset < i2)
- i2 = local_last_offset;
-
- if (i2 >= i1)
+ // Determine global offset of first particle to read from this file:
+ // This is the larger of the offset of the first particle in the file
+ // and the offset of the first particle this rank is to read.
+ HBTInt i1 = offset_file[file_nr];
+ if (local_first_offset > i1)
+ i1 = local_first_offset;
+
+ // Determine global offset of last particle to read from this file:
+ // This is the smaller of the offset to the last particle in this file
+ // and the offset of the last particle this rank is to read.
+ HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
+ if (local_last_offset < i2)
+ i2 = local_last_offset;
+
+ if (i2 >= i1)
{
// We have particles to read from this file.
HBTInt file_start = i1 - offset_file[file_nr]; // Offset to first particle to read
@@ -896,17 +869,10 @@ void SwiftSimReader_t::LoadGroups(MpiWorker_t &world, int snapshotId, vector
diff --git a/src/task_limited_section.h b/src/task_limited_section.h
new file mode 100644
--- /dev/null
+++ b/src/task_limited_section.h
+#include <mpi.h>
+#include <cassert>
+#include <cstdlib>
+
+/*
+ Class to limit the number of MPI ranks executing a piece of code
+ simultaneously. Used to implement the MaxConcurrentIO option.
+
+ Example usage:
+
+ TaskLimitedSection section(MPI_COMM_WORLD, HBTConfig.MaxConcurrentIO);
+ section.start();
+ ...
+ (I/O code goes here!)
+ ...
+ section.end();
+
+*/
+class TaskLimitedSection {
+
+private:
+
+ int max_nr_tasks;
+ MPI_Comm comm;
+ MPI_Win win;
+ int *buffer;
+ int controller_rank;
+ MPI_Request controller_rank_request;
+ int order;
+
+ const int CONTROLLER_RANK_TAG = 0;
+ const int GO_TAG = 1;
+ const int COMPLETION_TAG = 2;
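+
+ /*
+ The tags above implement a simple hand-off protocol:
+ - CONTROLLER_RANK_TAG: the first rank to finish the section broadcasts its
+ rank, so every rank knows where to send its completion message.
+ - GO_TAG: the controller signals one waiting rank that it may enter the
+ section.
+ - COMPLETION_TAG: a rank that has finished notifies the controller, which
+ can then release the next waiting rank.
+ */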
+
+public:
+
+ TaskLimitedSection(MPI_Comm comm, const int max_nr_tasks) {
+
+ int comm_size;
+ MPI_Comm_size(comm, &comm_size);
+ int comm_rank;
+ MPI_Comm_rank(comm, &comm_rank);
+
+ // Renumber ranks so we're not just allowing the first N to run initially -
+ // ideally we want to have the active ranks spread over all compute nodes.
+ int block_size = max_nr_tasks;
+ int position_in_block = comm_rank % block_size;
+ int block_index = comm_rank / block_size;
+ int nr_blocks = comm_size / max_nr_tasks;
+ if(comm_size % max_nr_tasks != 0)nr_blocks += 1;
+ assert(block_size*block_index+position_in_block == comm_rank);
+ order = position_in_block * nr_blocks + block_index;
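+
+ /* Example: with comm_size=8 and max_nr_tasks=2, nr_blocks=4 and ranks 0-7
+ get order 0,4,1,5,2,6,3,7. The two ranks allowed to proceed first
+ (order 0 and 1) are then original ranks 0 and 2, so the initial batch of
+ active ranks is spread out rather than packed together. */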
+
+ // Create the reordered communicator
+ MPI_Comm_split(comm, 0, order, &(this->comm));
+ this->max_nr_tasks = max_nr_tasks;
+ }
+
+ ~TaskLimitedSection() {
+ MPI_Comm_free(&comm);
+ }
+
+ void start() {
+
+ /* Get rank and number of ranks */
+ int comm_size;
+ MPI_Comm_size(comm, &comm_size);
+ int comm_rank;
+ MPI_Comm_rank(comm, &comm_rank);
+
+ /* If all ranks are allowed to run there's nothing to do */
+ if(max_nr_tasks >= comm_size)return;
+
+ /* Allocate and init counter for RMA */
+ MPI_Alloc_mem(sizeof(int), MPI_INFO_NULL, &buffer);
+ *buffer = 0;
+ MPI_Win_create(buffer, sizeof(int), sizeof(int), MPI_INFO_NULL, comm, &win);
+
+ /* Post a receive to get controller task's rank (will be first rank to finish) */
+ MPI_Irecv(&controller_rank, 1, MPI_INT, MPI_ANY_SOURCE,
+ CONTROLLER_RANK_TAG, comm, &controller_rank_request);
+
+ /* The first max_nr_tasks ranks can proceed immediately */
+ if(comm_rank < max_nr_tasks)return;
+
+ /* Others need to wait for a message to proceed */
+ int go;
+ MPI_Recv(&go, 1, MPI_INT, MPI_ANY_SOURCE, GO_TAG, comm, MPI_STATUS_IGNORE);
+
+ }
+
+ void end() {
+
+ /* Get rank and number of ranks */
+ int comm_size;
+ MPI_Comm_size(comm, &comm_size);
+ int comm_rank;
+ MPI_Comm_rank(comm, &comm_rank);
+
+ /* If all ranks are allowed to run there's nothing to do */
+ if(max_nr_tasks >= comm_size)return;
+
+ MPI_Request *request = (MPI_Request *) malloc(sizeof(MPI_Request)*comm_size);
+
+ /*
+ Check if we're the first task to reach the end of the section:
+ We do this by doing an atomic fetch and increment on the count of
+ the number of ranks that have finished. If the count is zero we're
+ the first and will become responsible for signalling other ranks
+ to proceed.
+
+ We only need to check the completion count for the first max_nr_tasks
+ ranks, because others can't start until another rank finishes so they
+ can't be first to finish.
+ */
+ int completion_count = 0;
+ if(comm_rank < max_nr_tasks) {
+ /* We're one of the ranks that started immediately, so we might be first
+ to complete */
+ MPI_Win_lock(MPI_LOCK_EXCLUSIVE, comm_size-1, 0, win);
+ int to_add = 1;
+ MPI_Get_accumulate(&to_add, 1, MPI_INT,
+ &completion_count, 1, MPI_INT,
+ comm_size-1, 0, 1, MPI_INT, MPI_SUM, win);
+ MPI_Win_unlock(comm_size-1, win);
+ } else {
+ /* We aren't in the initial batch of max_nr_tasks so we can't be first to complete.
+ Skip the get_accumulate so we're not waiting for the last rank to respond
+ when it might be busy in non-MPI code. */
+ completion_count = 1;
+ }
+ if(completion_count == 0) {
+
+ /* This task is the first to reach the end of the section, so tell everyone */
+ for(int dest=0; dest<comm_size; dest+=1)
+ MPI_Isend(&comm_rank, 1, MPI_INT, dest, CONTROLLER_RANK_TAG, comm, &request[dest]);
+
+ /* Wait for the other ranks to finish the section, releasing one waiting
+ rank each time a completion message arrives */
+ int nr_left = comm_size - max_nr_tasks; /* ranks still waiting for a go signal */
+ int nr_running = comm_size - 1; /* ranks we still expect to hear from */
+ while(nr_left + nr_running > 0) {
+
+ /* Receive a completion message, if any are still outstanding */
+ if(nr_running > 0) {
+ int done;
+ MPI_Status status;
+ MPI_Recv(&done, 1, MPI_INT, MPI_ANY_SOURCE, COMPLETION_TAG, comm, &status);
+ nr_running -= 1;
+ }
+
+ /* If there are tasks still waiting, send the next go signal */
+ if(nr_left > 0) {
+ int dest = comm_size - nr_left;
+ int go = 1;
+ MPI_Send(&go, 1, MPI_INT, dest, GO_TAG, comm);
+ nr_left -= 1;
+ }
+ }
+ }
+
+ /* Make sure we've received the controller rank */
+ MPI_Wait(&(controller_rank_request), MPI_STATUS_IGNORE);
+
+ /* Send completion message if we're not the controller */
+ if(completion_count > 0) {
+ int complete = 1;
+ MPI_Send(&complete, 1, MPI_INT, controller_rank, COMPLETION_TAG, comm);
+ }
+
+ /* Make sure all sends from the controller completed */
+ if(completion_count==0)
+ MPI_Waitall(comm_size, request, MPI_STATUSES_IGNORE);
+
+ /* Tidy up */
+ free(request);
+ MPI_Win_free(&win);
+ MPI_Free_mem(buffer);
+ }
+
+};
diff --git a/unit_tests/CMakeLists.txt b/unit_tests/CMakeLists.txt
index e4cb1daf..fa863053 100644
--- a/unit_tests/CMakeLists.txt
+++ b/unit_tests/CMakeLists.txt
@@ -52,6 +52,7 @@ foreach(TEST_NAME
test_locate_ids_random
test_mergertree
test_myalltoall
+ test_limited_section
)
add_executable(${TEST_NAME} ${TEST_NAME}.cpp $ $)
diff --git a/unit_tests/test_limited_section.cpp b/unit_tests/test_limited_section.cpp
new file mode 100644
index 00000000..19f1093a
--- /dev/null
+++ b/unit_tests/test_limited_section.cpp
@@ -0,0 +1,97 @@
+#include <mpi.h>
+#include <iostream>
+#include <time.h>
+#include <errno.h>
+
+#include "task_limited_section.h"
+#include "verify.h"
+
+/*
+ Test code to limit number of tasks executing simultaneously.
+*/
+int main(int argc, char *argv[])
+{
+
+ MPI_Init(&argc, &argv);
+ int comm_size;
+ MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
+ int comm_rank;
+ MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
+
+ /* Skip this test if we're running on only one MPI rank */
+ if(comm_size==1) {
+ MPI_Finalize();
+ return 0;
+ }
+
+ /* Set up count of number of tasks executing */
+ int *count;
+ MPI_Alloc_mem(sizeof(int), MPI_INFO_NULL, &count);
+ *count = 0;
+ MPI_Win win;
+ MPI_Win_create(count, sizeof(int), sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
+
+ /* Split off one rank to maintain a count of tasks currently executing */
+ int color = (comm_rank==0) ? 0 : 1;
+ int key = comm_rank;
+ MPI_Comm split_comm;
+ MPI_Comm_split(MPI_COMM_WORLD, color, key, &split_comm);
+
+ if(color == 1) {
+ int split_comm_size = comm_size - 1;
+ for(int max_nr_tasks = 1; max_nr_tasks <= split_comm_size; max_nr_tasks += 1) {
+
+ TaskLimitedSection section(split_comm, max_nr_tasks);
+ section.start();
+
+ /* On starting, increment the counter */
+ int start_count = -1;
+ MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, win);
+ int to_add = 1;
+ MPI_Get_accumulate(&to_add, 1, MPI_INT,
+ &start_count, 1, MPI_INT,
+ 0, 0, 1, MPI_INT, MPI_SUM, win);
+ MPI_Win_unlock(0, win);
+ /* When we start, should have 0 to max_nr_tasks-1 other tasks running */
+ verify(start_count >= 0);
+ verify(start_count < max_nr_tasks);
+
+ /* Sleep for a bit */
+ struct timespec ts;
+ ts.tv_sec = 0;
+ ts.tv_nsec = 100 * 1000000; // 100 millisec
+ int res;
+ do {
+ res = nanosleep(&ts, &ts);
+ } while (res && errno == EINTR);
+
+ /* On finishing, decrement the counter */
+ int end_count = -1;
+ MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, win);
+ to_add = -1;
+ MPI_Get_accumulate(&to_add, 1, MPI_INT,
+ &end_count, 1, MPI_INT,
+ 0, 0, 1, MPI_INT, MPI_SUM, win);
+ MPI_Win_unlock(0, win);
+ /* When we finish, should have 1 to max_nr_tasks tasks running (including our self) */
+ verify(end_count > 0);
+ verify(end_count <= max_nr_tasks);
+
+ section.end();
+
+ // Report maximum counts:
+ // We should usually have start_count_max=max_nr_tasks-1 and end_count_max=max_nr_tasks,
+ // although this is not guaranteed (e.g. if system is busy and some tasks are delayed).
+ int start_count_max;
+ MPI_Allreduce(&start_count, &start_count_max, 1, MPI_INT, MPI_MAX, split_comm);
+ int end_count_max;
+ MPI_Allreduce(&end_count, &end_count_max, 1, MPI_INT, MPI_MAX, split_comm);
+ if(comm_rank==1)
+ std::cout << "Max ranks = " << max_nr_tasks << ", max start count = " <<
+ start_count_max << ", max end count = " << end_count_max << std::endl;
+ }
+ }
+
+ MPI_Win_free(&win);
+ MPI_Free_mem(count);
+ MPI_Finalize();
+
+ return 0;
+}