Alternate implementation of MaxConcurrentIO parameter #41

Draft: wants to merge 8 commits into master
52 changes: 14 additions & 38 deletions src/io/subhalo_io.cpp
@@ -8,6 +8,8 @@
#include "../snapshot_number.h"
#include "../subhalo.h"
#include "../config_parser.h"
#include "../task_limited_section.h"

#include "git_version_info.h"

void SubhaloSnapshot_t::BuildHDFDataType()
@@ -276,20 +278,14 @@ void SubhaloSnapshot_t::Save(MpiWorker_t &world)
string subdir = GetSubDir();
mkdir(subdir.c_str(), 0755);

/* Decide how many ranks per node write simultaneously */
int nr_nodes = (world.size() / world.MaxNodeSize);
int nr_writing = HBTConfig.MaxConcurrentIO / nr_nodes;
if (nr_writing < 1)
nr_writing = 1; // Always at least one per node

/* Subhalo properties and bound particle lists. */
WriteBoundFiles(world, nr_writing);
WriteBoundFiles(world);

/* Particles associated to each subhalo. Used for debugging and restarting. */
WriteSourceFiles(world, nr_writing);
WriteSourceFiles(world);
}

void SubhaloSnapshot_t::WriteBoundFiles(MpiWorker_t &world, const int &number_ranks_writing)
void SubhaloSnapshot_t::WriteBoundFiles(MpiWorker_t &world)
{
/* Number of total subhalo entries */
HBTInt NumSubsAll = 0, NumSubs = Subhalos.size();
@@ -299,39 +295,19 @@ void SubhaloSnapshot_t::WriteBoundFiles(MpiWorker_t &world, const int &number_ra
cout << "saving " << NumSubsAll << " subhalos to " << GetSubDir() << endl;

/* Allow a limited number of ranks per node to write simultaneously */
int writes_done = 0;
for (int rank_within_node = 0; rank_within_node < world.MaxNodeSize; rank_within_node += 1)
{
if (rank_within_node == world.NodeRank)
{
WriteBoundSubfile(world.rank(), world.size(), NumSubsAll);
writes_done += 1;
}
if (rank_within_node % number_ranks_writing == number_ranks_writing - 1)
MPI_Barrier(world.Communicator);
}

/* Every rank should have executed the writing code exactly once */
assert(writes_done == 1);
TaskLimitedSection section(MPI_COMM_WORLD, HBTConfig.MaxConcurrentIO);
section.start();
WriteBoundSubfile(world.rank(), world.size(), NumSubsAll);
section.end();
}

void SubhaloSnapshot_t::WriteSourceFiles(MpiWorker_t &world, const int &number_ranks_writing)
void SubhaloSnapshot_t::WriteSourceFiles(MpiWorker_t &world)
{
/* Allow a limited number of ranks per node to write simultaneously */
int writes_done = 0;
for (int rank_within_node = 0; rank_within_node < world.MaxNodeSize; rank_within_node += 1)
{
if (rank_within_node == world.NodeRank)
{
WriteSourceSubfile(world.rank(), world.size());
writes_done += 1;
}
if (rank_within_node % number_ranks_writing == number_ranks_writing - 1)
MPI_Barrier(world.Communicator);
}

/* Every rank should have executed the writing code exactly once */
assert(writes_done == 1);
TaskLimitedSection section(MPI_COMM_WORLD, HBTConfig.MaxConcurrentIO);
section.start();
WriteSourceSubfile(world.rank(), world.size());
section.end();
}

void SubhaloSnapshot_t::WriteBoundSubfile(int iFile, int nfiles, HBTInt NumSubsAll)
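
The new header ../task_limited_section.h is included above, but its implementation is not part of this diff. Below is a minimal sketch of one way such a helper could work, assuming a simple token-passing scheme between ranks; the constructor arguments and the start()/end() interface are taken from the calls above, while the internals, member names and message tag are illustrative assumptions.

// task_limited_section.h (illustrative sketch only, not the code added by this PR)
#include <mpi.h>

class TaskLimitedSection
{
private:
  MPI_Comm comm;
  int max_concurrent; // e.g. HBTConfig.MaxConcurrentIO
  int rank, comm_size;
  static const int TOKEN_TAG = 42; // hypothetical message tag

public:
  TaskLimitedSection(MPI_Comm comm, int max_concurrent)
    : comm(comm), max_concurrent(max_concurrent)
  {
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &comm_size);
  }

  /* Rank i may only enter once rank i - max_concurrent has left, so at most
     max_concurrent ranks are inside the section at any time. */
  void start()
  {
    if (rank >= max_concurrent)
      MPI_Recv(nullptr, 0, MPI_BYTE, rank - max_concurrent, TOKEN_TAG, comm, MPI_STATUS_IGNORE);
  }

  /* On leaving, pass the token on so rank i + max_concurrent may enter. */
  void end()
  {
    if (rank + max_concurrent < comm_size)
      MPI_Send(nullptr, 0, MPI_BYTE, rank + max_concurrent, TOKEN_TAG, comm);
  }
};

Note that the section is constructed on MPI_COMM_WORLD, so MaxConcurrentIO appears to cap the number of ranks doing I/O across the whole run, rather than being divided among nodes as in the loop it replaces.
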
138 changes: 52 additions & 86 deletions src/io/swiftsim_io.cpp
@@ -16,6 +16,7 @@ using namespace std;
#include "../hdf_wrapper.h"
#include "../mymath.h"
#include "../snapshot.h"
#include "../task_limited_section.h"
#include "swiftsim_io.h"
#include "exchange_and_merge.h"

@@ -639,14 +640,6 @@ void SwiftSimReader_t::LoadSnapshot(MpiWorker_t &world, int snapshotId, vector<P
Cosmology_t &Cosmology)
{

MPI_Barrier(world.Communicator);

// Decide how many ranks per node read simultaneously
int nr_nodes = (world.size() / world.MaxNodeSize);
int nr_reading = HBTConfig.MaxConcurrentIO / nr_nodes;
if (nr_reading < 1)
nr_reading = 1; // Always at least one per node

SetSnapshot(snapshotId);

const int root = 0;
@@ -711,33 +704,29 @@
// Allocate storage for the particles
Particles.resize(np_local);

// Allow a limited number of ranks per node to read simultaneously
int reads_done = 0;
for (int rank_within_node = 0; rank_within_node < world.MaxNodeSize; rank_within_node += 1)
{
if (rank_within_node == world.NodeRank)
// Limit number of ranks doing I/O at once
TaskLimitedSection section(MPI_COMM_WORLD, HBTConfig.MaxConcurrentIO);
section.start();

// Loop over all files
HBTInt particle_offset = 0;
for (int file_nr = 0; file_nr < Header.NumberOfFiles; file_nr += 1)
{

// Loop over all files
HBTInt particle_offset = 0;
for (int file_nr = 0; file_nr < Header.NumberOfFiles; file_nr += 1)
{

// Determine global offset of first particle to read from this file:
// This is the larger of the offset of the first particle in the file
// and the offset of the first particle this rank is to read.
HBTInt i1 = offset_file[file_nr];
if (local_first_offset > i1)
i1 = local_first_offset;

// Determine global offset of last particle to read from this file:
// This is the smaller of the offset to the last particle in this file
// and the offset of the last particle this rank is to read.
HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
if (local_last_offset < i2)
i2 = local_last_offset;

if (i2 >= i1)
// Determine global offset of first particle to read from this file:
// This is the larger of the offset of the first particle in the file
// and the offset of the first particle this rank is to read.
HBTInt i1 = offset_file[file_nr];
if (local_first_offset > i1)
i1 = local_first_offset;

// Determine global offset of last particle to read from this file:
// This is the smaller of the offset to the last particle in this file
// and the offset of the last particle this rank is to read.
HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
if (local_last_offset < i2)
i2 = local_last_offset;

if (i2 >= i1)
{
// We have particles to read from this file.
HBTInt file_start = i1 - offset_file[file_nr]; // Offset to first particle to read
@@ -748,16 +737,9 @@
ReadSnapshot(file_nr, Particles.data() + particle_offset, file_start, file_count);
particle_offset += file_count;
}
} // Next file
assert(particle_offset == np_local); // Check we read the expected number of particles
reads_done += 1;
}
if (rank_within_node % nr_reading == nr_reading - 1)
MPI_Barrier(world.Communicator);
} // Next MPI rank within the node

// Every rank should have executed the reading code exactly once
assert(reads_done == 1);
} // Next file
assert(particle_offset == np_local); // Check we read the expected number of particles
section.end();

global_timer.Tick("snap_io", world.Communicator);

@@ -818,12 +800,6 @@ void SwiftSimReader_t::LoadGroups(MpiWorker_t &world, int snapshotId, vector<Hal
{ // read in particle properties at the same time, to avoid particle look-up at later stage.
SetSnapshot(snapshotId);

// Decide how many ranks per node read simultaneously
int nr_nodes = (world.size() / world.MaxNodeSize);
int nr_reading = HBTConfig.MaxConcurrentIO / nr_nodes;
if (nr_reading < 1)
nr_reading = 1; // Always at least one per node

const int root = 0;
if (world.rank() == root)
{
@@ -859,33 +835,30 @@

bool FlagReadId = true; //! HBTConfig.GroupLoadedIndex;

// Allow a limited number of ranks per node to read simultaneously
int reads_done = 0;
for (int rank_within_node = 0; rank_within_node < world.MaxNodeSize; rank_within_node += 1)
{
if (rank_within_node == world.NodeRank)
// Limit number of ranks doing I/O at once
TaskLimitedSection section(MPI_COMM_WORLD, HBTConfig.MaxConcurrentIO);
section.start();

// Loop over all files
HBTInt particle_offset = 0;
for (int file_nr = 0; file_nr < Header.NumberOfFiles; file_nr += 1)
{

// Loop over all files
HBTInt particle_offset = 0;
for (int file_nr = 0; file_nr < Header.NumberOfFiles; file_nr += 1)
{

// Determine global offset of first particle to read from this file:
// This is the larger of the offset of the first particle in the file
// and the offset of the first particle this rank is to read.
HBTInt i1 = offset_file[file_nr];
if (local_first_offset > i1)
i1 = local_first_offset;

// Determine global offset of last particle to read from this file:
// This is the smaller of the offset to the last particle in this file
// and the offset of the last particle this rank is to read.
HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
if (local_last_offset < i2)
i2 = local_last_offset;

if (i2 >= i1)
// Determine global offset of first particle to read from this file:
// This is the larger of the offset of the first particle in the file
// and the offset of the first particle this rank is to read.
HBTInt i1 = offset_file[file_nr];
if (local_first_offset > i1)
i1 = local_first_offset;

// Determine global offset of last particle to read from this file:
// This is the smaller of the offset to the last particle in this file
// and the offset of the last particle this rank is to read.
HBTInt i2 = offset_file[file_nr] + np_file[file_nr] - 1;
if (local_last_offset < i2)
i2 = local_last_offset;

if (i2 >= i1)
{
// We have particles to read from this file.
HBTInt file_start = i1 - offset_file[file_nr]; // Offset to first particle to read
@@ -896,17 +869,10 @@
ReadGroupParticles(file_nr, ParticleHosts.data() + particle_offset, file_start, file_count, FlagReadId);
particle_offset += file_count;
}
} // Next file
assert(particle_offset == np_local); // Check we read the expected number of particles
reads_done += 1;
}
if (rank_within_node % nr_reading == nr_reading - 1)
MPI_Barrier(world.Communicator);
} // Next MPI rank within the node

// Every rank should have executed the reading code exactly once
assert(reads_done == 1);

} // Next file

assert(particle_offset == np_local); // Check we read the expected number of particles
section.end();
global_timer.Tick("halo_io", world.Communicator);

// #define HALO_IO_TEST
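
Both LoadSnapshot and LoadGroups now use the same bracketing pattern. As a generic usage sketch (assuming only the interface exercised in this diff), any rank-local I/O region can be throttled like this:

// Throttle a rank-local I/O region (sketch; interface as used in this diff)
TaskLimitedSection section(MPI_COMM_WORLD, HBTConfig.MaxConcurrentIO);
section.start();
// ... open this rank's files and read or write its slice of the data ...
section.end();
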
4 changes: 2 additions & 2 deletions src/subhalo.h
@@ -263,8 +263,8 @@ class SubhaloSnapshot_t : public Snapshot_t

/* I/O methods */
void ReadFile(int iFile, const SubReaderDepth_t depth);
void WriteBoundFiles(MpiWorker_t &world, const int &number_ranks_writing);
void WriteSourceFiles(MpiWorker_t &world, const int &number_ranks_writing);
void WriteBoundFiles(MpiWorker_t &world);
void WriteSourceFiles(MpiWorker_t &world);
void WriteBoundSubfile(int iFile, int nfiles, HBTInt NumSubsAll);
void WriteSourceSubfile(int iFile, int nfiles);
