Skip to content

Commit a78d4c7

Browse files
rmclarenCoryMartin-NOAAEmily Liu
authored
Add parallel parsing with MPI (#9)
* got some of netcdf stuff to work * writing netcdf vars * working on better netcdf support. * cut ties between the DataObject and the encoders * added some attributes to the netcdf file * removed commented code * updated python code, cleaned up the YAML files * add chunking from description * added fill value on dimension vars * fixed some things with the encoder * fixed some unit tests * Fixed special case where dimension source has no data. * got python interface working for netcdf * Python interface improvements, support for memory only data. * added some help to error string * uncommented python cache test * removed unneeded code from bufr2ioda * added comments * updated some documentation * Moved Description to the encoders namespace and made it more generic * updated the package and lib names * fixed hera compile bugs * fixed compiler warnings for log functions * fixed some small unit test issues * formatting * removed a test that used ioda * bufr2ioda.x is now bufr2netcdf.x * bufr2ioda.x is now bufr2netcdf.x * fixed some comments * now installing python interface in install directory * Added parallel version of parse method * got a preliminary working with mpi * tasks now writing the netcdf files * now successfully gathering the data * attempting to fix dimensions when gathering data where dimensions are not identical. * added parallel switch * some python api work, fixed initialization problem * added python interface for mpi functions * small name change * removed some unneeded comments * added comment * handle some uncommon types when gathering data. Changed method name for clarity. * Changed test method name for clarity. * Updated some documentation. * Updated some documentation. 
* updated api calls for mpi * updated name of gather of DataObject * small doc changes * Update core/include/bufr/BufrParser.h Co-authored-by: Cory Martin <cory.r.martin@noaa.gov> * fixed spelling of remainder * Fixed unintended CMake file changes * Now printing more detailed timing data when running bufr2netcdf * sending print statements to log info * renamed function * renamed function * tweak, changed time elapsed msg * tweak, changed time elapsed msg * tweak, changed time elapsed msg * Encoder now logs the time it takes to encode. * fix for string gather * fix for string gather * fixed another gather problem for str fields * some code cleanup * fixed broken test * removed git attributes * Prevent show queries from crashing. * added --no-gather flag to bufr2netcdf app * bufr2netcdf help tweak * bufr2netcdf help tweak * improved satwnd reading problem * more mpi reading fixes * another change for mpi reading * fixed bug where show queries would not print queries for all subsets, sometimes --------- Co-authored-by: Cory Martin <cory.r.martin@noaa.gov> Co-authored-by: Emily Liu <eliu@hercules-login-4.hpc.msstate.edu>
1 parent 97367fc commit a78d4c7

31 files changed

+776
-87
lines changed

CMakeLists.txt

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ set( CMAKE_DIRECTORY_LABELS ${PROJECT_NAME} )
2020
include( ${PROJECT_NAME}_compiler_flags )
2121

2222
## Dependencies
23-
find_package( eckit 1.23.0 REQUIRED )
23+
find_package( OpenMP REQUIRED)
24+
find_package( MPI REQUIRED)
25+
find_package( eckit 1.24.4 REQUIRED COMPONENTS MPI )
2426
find_package( Eigen3 REQUIRED NO_MODULE HINTS
2527
$ENV{Eigen3_ROOT} $ENV{EIGEN3_ROOT} $ENV{Eigen_ROOT} $ENV{EIGEN_ROOT}
2628
$ENV{Eigen3_PATH} $ENV{EIGEN3_PATH} $ENV{Eigen_PATH} $ENV{EIGEN_PATH} )

core/CMakeLists.txt

+4-2
Original file line numberDiff line numberDiff line change
@@ -129,18 +129,20 @@ ecbuild_add_library( TARGET bufr_query
129129
## Linking options
130130
target_link_libraries(bufr_query PUBLIC gsl::gsl-lite)
131131
target_link_libraries(bufr_query PUBLIC Eigen3::Eigen)
132-
target_link_libraries(bufr_query PUBLIC eckit)
132+
target_link_libraries(bufr_query PUBLIC MPI::MPI_CXX)
133133
target_link_libraries(bufr_query PUBLIC bufr::bufr_4)
134134
target_link_libraries(bufr_query PUBLIC NetCDF::NetCDF_CXX)
135+
target_link_libraries(bufr_query PUBLIC eckit eckit_mpi)
135136

136137

137138
## Public include files
138-
139139
target_include_directories(bufr_query PUBLIC
140140
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
141141
$<INSTALL_INTERFACE:$<INSTALL_PREFIX>/${CMAKE_INSTALL_INCLUDEDIR}>
142142
)
143143

144+
## Install
145+
144146
install(DIRECTORY include/bufr
145147
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
146148
COMPONENT Headers)

core/include/bufr/BufrParser.h

+8
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77
#include <string>
88
#include <vector>
99

10+
#include <mpi.h>
11+
1012
#include "eckit/config/LocalConfiguration.h"
13+
#include "eckit/mpi/Comm.h"
14+
1115

1216
#include "File.h"
1317
#include "BufrTypes.h"
@@ -38,6 +42,10 @@ namespace bufr {
3842
/// \param maxMsgsToParse Messages to parse (0 for everything)
3943
std::shared_ptr<DataContainer> parse(const size_t maxMsgsToParse = 0);
4044

45+
/// \brief Uses the provided description to parse the BUFR file using MPI.
46+
/// \param comm The eckit MPI comm object
47+
std::shared_ptr<DataContainer> parse(const eckit::mpi::Comm&);
48+
4149
/// \brief Start over from beginning of the BUFR file
4250
void reset();
4351

core/include/bufr/DataContainer.h

+6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include "BufrTypes.h"
1111
#include "DataObject.h"
1212

13+
#include "eckit/mpi/Comm.h"
14+
1315
namespace bufr {
1416
/// List of possible category strings (for splitting data)
1517
typedef std::vector<std::string> SubCategory;
@@ -90,6 +92,10 @@ namespace bufr {
9092
/// \param other DataContainer to append.
9193
void append(const DataContainer& other);
9294

95+
/// \brief Gather data from all ranks into rank 0.
96+
/// \param comm MPI communicator to use.
97+
void gather(const eckit::mpi::Comm& comm);
98+
9399
private:
94100
/// Category map given (see constructor).
95101
CategoryMap categoryMap_;

core/include/bufr/DataObject.h

+265-1
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@
77

88
#pragma once
99

10+
11+
#include <type_traits>
1012
#include <memory>
1113
#include <iostream>
1214
#include <vector>
1315
#include <netcdf>
1416

17+
#include "eckit/mpi/Comm.h"
18+
1519
#include "QueryParser.h"
1620
#include "Data.h"
1721

@@ -155,6 +159,10 @@ namespace bufr {
155159
/// \param writer The writer to use.
156160
virtual void write(std::shared_ptr<ObjectWriterBase> writer) = 0;
157161

162+
/// \brief Do an MPI Gather operation and accumulate the data into the root process.
163+
/// \param comm The MPI communicator to use.
164+
virtual void gather(const eckit::mpi::Comm& comm) = 0;
165+
158166
/// \brief Makes a new dimension scale using this data object as the source
159167
/// \param name The name of the dimension variable.
160168
/// \param dimIdx The idx of the data dimension to use.
@@ -175,7 +183,7 @@ namespace bufr {
175183

176184
// Compute the index into the data array
177185
size_t index = 0;
178-
for (int dim_idx = loc.size() - 1; dim_idx >= 0; --dim_idx) {
186+
for (int dim_idx = static_cast<int>(loc.size() - 1); dim_idx >= 0; --dim_idx) {
179187
index += dim_prod * loc[dim_idx];
180188
dim_prod *= dims_[dim_idx];
181189
}
@@ -416,6 +424,126 @@ namespace bufr {
416424
}
417425
}
418426

427+
/// \brief Do an MPI Gather operation and accumalate the data into the root process.
428+
/// \param comm The MPI communicator to use.
429+
void gather(const eckit::mpi::Comm& comm) final
430+
{
431+
size_t numDims = dims_.size();
432+
comm.reduce(numDims, numDims, eckit::mpi::Operation::MAX, 0);
433+
434+
// Ensure all ranks have the same number of dimensions
435+
if (numDims != dims_.size())
436+
{
437+
int missingDims = numDims - dims_.size();
438+
for (int idx = 0; idx < missingDims; ++idx)
439+
{
440+
dims_.insert(dims_.end() - 1, 1);
441+
}
442+
}
443+
444+
std::vector<int> rcvDims = dims_;
445+
comm.reduce(rcvDims[0], rcvDims[0], eckit::mpi::Operation::SUM, 0);
446+
447+
for (size_t i = 1; i < numDims; ++i)
448+
{
449+
comm.allReduce(rcvDims[i], rcvDims[i], eckit::mpi::Operation::MAX);
450+
}
451+
452+
size_t sendSize = dims_[0];
453+
for (size_t idx = 1; idx < rcvDims.size(); idx++)
454+
{
455+
sendSize *= rcvDims[idx];
456+
}
457+
458+
size_t rcvSize = 1;
459+
for (size_t idx = 0; idx < rcvDims.size(); idx++)
460+
{
461+
rcvSize *= rcvDims[idx];
462+
}
463+
464+
// Fix my send buffer if the global extra dimensions (not the first one) differ from my own
465+
// (resize and fill with missing values where necessary). This will involve creating a send
466+
// array and copying data into the correct indices.
467+
468+
// Do the extra dimensions from the different ranks match?
469+
bool adjustDims = false;
470+
for (size_t idx = 1; idx < rcvDims.size(); idx++)
471+
{
472+
adjustDims = (rcvDims[idx] != getDims()[idx]);
473+
}
474+
475+
// Resize the dimensions to match the global dimensions
476+
if (adjustDims)
477+
{
478+
std::vector<T> sendBuffer(sendSize, missingValue());
479+
480+
// Map the local data into the sendBuffer using the dimensions
481+
for (size_t i = 0; i < data_.size(); ++i)
482+
{
483+
Location loc;
484+
485+
// Compute the location coordinate in the old data
486+
size_t idx = i;
487+
for (size_t dimIdx = 0; dimIdx < dims_.size(); ++dimIdx)
488+
{
489+
loc.push_back(idx % dims_[dimIdx]);
490+
idx /= dims_[dimIdx];
491+
}
492+
493+
// Map that location into the new data (compute the new index)
494+
idx = 0;
495+
for (size_t dimIdx = 0; dimIdx < rcvDims.size(); ++dimIdx)
496+
{
497+
idx += loc[dimIdx] * rcvDims[dimIdx];
498+
}
499+
500+
sendBuffer[idx] = data_[i];
501+
}
502+
503+
data_ = std::move(sendBuffer);
504+
}
505+
506+
auto sizeArray = std::vector<int>(comm.size());
507+
comm.allGather(static_cast<int>(size()), sizeArray.begin(), sizeArray.end());
508+
509+
std::vector<T> rcvBuffer(rcvSize, missingValue());
510+
auto rcvCounts = std::vector<int>(comm.size());
511+
512+
std::vector<int> displacement(comm.size(), 0);
513+
for (size_t i = 1; i < comm.size(); i++)
514+
{
515+
displacement[i] = displacement[i - 1] + sizeArray[i - 1];
516+
}
517+
518+
if constexpr (!std::is_same_v<T, unsigned long long> && !std::is_same_v<T, unsigned int>)
519+
{
520+
comm.gatherv(data_, rcvBuffer, sizeArray, displacement, 0);
521+
}
522+
else
523+
{
524+
// Use unsigned long as the type and use that to gatherv back to the correct type. This is
525+
// necessary because eckit MPI does not support unsigned long long or unsigned int
526+
std::vector<unsigned long> ulData(data_.begin(), data_.end());
527+
std::vector<unsigned long> ulRcvBuffer(rcvSize, DataObject<unsigned long>::missingValue());
528+
comm.gatherv(ulData, ulRcvBuffer, sizeArray, displacement, 0);
529+
530+
// manually copy preserving missing values
531+
for (size_t i = 0; i < rcvSize; i++)
532+
{
533+
if (ulRcvBuffer[i] != DataObject<unsigned long>::missingValue())
534+
{
535+
rcvBuffer[i] = static_cast<T>(ulRcvBuffer[i]);
536+
}
537+
}
538+
}
539+
540+
if (comm.rank() == 0)
541+
{
542+
dims_ = rcvDims;
543+
data_ = std::move(rcvBuffer);
544+
}
545+
}
546+
419547
/// \brief Append the data from another DataObject to this one.
420548
/// \param data The data object to append.
421549
void append(const std::shared_ptr<DataObjectBase>& data) final
@@ -698,6 +826,142 @@ namespace bufr {
698826
}
699827
}
700828

829+
/// \brief Do an MPI Gather operation and accumalate the data into the root process.
830+
/// \param comm The MPI communicator to use.
831+
void gather(const eckit::mpi::Comm& comm) final
832+
{
833+
size_t numDims = dims_.size();
834+
comm.reduce(numDims, numDims, eckit::mpi::Operation::MAX, 0);
835+
836+
// Ensure all ranks have the same number of dimensions
837+
if (numDims != dims_.size())
838+
{
839+
int missingDims = numDims - dims_.size();
840+
for (int idx = 0; idx < missingDims; ++idx)
841+
{
842+
dims_.insert(dims_.end() - 1, 1);
843+
}
844+
}
845+
846+
std::vector<int> rcvDims = dims_;
847+
comm.reduce(rcvDims[0], rcvDims[0], eckit::mpi::Operation::SUM, 0);
848+
849+
for (size_t i = 1; i < numDims; ++i)
850+
{
851+
comm.allReduce(rcvDims[i], rcvDims[i], eckit::mpi::Operation::MAX);
852+
}
853+
854+
size_t sendSize = dims_[0];
855+
for (size_t idx = 1; idx < rcvDims.size(); idx++)
856+
{
857+
sendSize *= rcvDims[idx];
858+
}
859+
860+
// Fix my send buffer if the global extra dimensions (not the first one) differ from my own
861+
// (resize and fill with missing values where necessary). This will involve creating a send
862+
// array and copying data into the correct indices.
863+
864+
// Do the extra dimensions from the different ranks match?
865+
bool adjustDims = false;
866+
for (size_t idx = 1; idx < rcvDims.size(); idx++)
867+
{
868+
adjustDims = (rcvDims[idx] != getDims()[idx]);
869+
}
870+
871+
// Resize the dimensions to match the global dimensions
872+
if (adjustDims)
873+
{
874+
std::vector<std::string> sendBuffer(sendSize, missingValue());
875+
876+
// Map the local data into the sendBuffer using the dimensions
877+
for (size_t i = 0; i < data_.size(); ++i)
878+
{
879+
Location loc;
880+
881+
// Compute the location coordinate in the old data
882+
size_t idx = i;
883+
for (size_t dimIdx = 0; dimIdx < dims_.size(); ++dimIdx)
884+
{
885+
loc.push_back(idx % dims_[dimIdx]);
886+
idx /= dims_[dimIdx];
887+
}
888+
889+
// Map that location into the new data (compute the new index)
890+
idx = 0;
891+
for (size_t dimIdx = 0; dimIdx < rcvDims.size(); ++dimIdx)
892+
{
893+
idx += loc[dimIdx] * rcvDims[dimIdx];
894+
}
895+
896+
sendBuffer[idx] = data_[i];
897+
}
898+
899+
data_ = std::move(sendBuffer);
900+
}
901+
902+
size_t charsToSend = 0;
903+
for (const auto& str : data_)
904+
{
905+
charsToSend += str.size();
906+
}
907+
908+
size_t charsToReceive = charsToSend;
909+
comm.reduce(charsToReceive, charsToReceive, eckit::mpi::Operation::SUM, 0);
910+
911+
auto sizeArray = std::vector<int>(comm.size());
912+
comm.allGather(static_cast<int>(charsToSend), sizeArray.begin(), sizeArray.end());
913+
914+
std::vector<char> rcvBuffer(charsToReceive, 0);
915+
auto rcvCounts = std::vector<int>(comm.size());
916+
917+
std::vector<int> displacement(comm.size(), 0);
918+
for (size_t i = 1; i < comm.size(); i++)
919+
{
920+
displacement[i] = displacement[i - 1] + sizeArray[i - 1];
921+
}
922+
923+
std::vector<char> charSendBuffer;
924+
for (const auto& str : data_)
925+
{
926+
charSendBuffer.insert(charSendBuffer.end(), str.begin(), str.end());
927+
}
928+
929+
comm.gatherv(charSendBuffer, rcvBuffer, sizeArray, displacement, 0);
930+
931+
std::vector<int> myStrSizes(data_.size());
932+
for (size_t idx=0; idx < data_.size(); ++idx)
933+
{
934+
myStrSizes[idx] = data_[idx].size();
935+
}
936+
937+
comm.allGather(static_cast<int>(myStrSizes.size()), sizeArray.begin(), sizeArray.end());
938+
939+
for (size_t i = 1; i < comm.size(); i++)
940+
{
941+
displacement[i] = displacement[i - 1] + sizeArray[i - 1];
942+
}
943+
944+
size_t numStrs = data_.size();
945+
comm.reduce(numStrs, numStrs, eckit::mpi::Operation::SUM, 0);
946+
std::vector<int> strSizes(numStrs);
947+
comm.gatherv(myStrSizes, strSizes, sizeArray, displacement, 0);
948+
949+
if (comm.rank() == 0)
950+
{
951+
dims_ = rcvDims;
952+
953+
// write rcvBuffer back to data
954+
data_.resize(numStrs);
955+
size_t offset = 0;
956+
for (size_t idx = 0; idx < numStrs; ++idx)
957+
{
958+
std::string str(rcvBuffer.begin() + offset, rcvBuffer.begin() + offset + strSizes[idx]);
959+
data_[idx] = str;
960+
offset += strSizes[idx];
961+
}
962+
}
963+
}
964+
701965
/// \brief Append the data from another DataObject to this one.
702966
/// \param data The data object to append.
703967
void append(const std::shared_ptr<DataObjectBase>& data) final

0 commit comments

Comments
 (0)