-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathredev_adios_channel.h
192 lines (185 loc) · 6.63 KB
/
redev_adios_channel.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#ifndef REDEV_REDEV_ADIOS_CHANNEL_H
#define REDEV_REDEV_ADIOS_CHANNEL_H
#include "redev_assert.h"
#include "redev_profile.h"
#include <adios2.h>
#include <memory>
#include <string>
#include <utility>
namespace redev {
/**
 * Communication channel backed by a pair of ADIOS2 IO/Engine objects: one
 * named "s2c" and one named "c2s" (presumably server-to-client and
 * client-to-server).
 *
 * The channel opens both engines on construction and closes them on
 * destruction. It is move-constructible but neither copyable nor assignable.
 */
class AdiosChannel {
public:
  /**
   * Declare the two ADIOS2 IO objects, open their engines and exchange the
   * initial metadata.
   *
   * @param adios         ADIOS2 instance used to declare the IO objects.
   * @param comm          MPI communicator; the rank of the caller in this
   *                      communicator is queried and stored.
   * @param name          Channel name; combined with @p path to build the
   *                      names of the IO objects / engines.
   * @param params        ADIOS2 parameters applied to both IO objects.
   * @param transportType Requested engine type (BP4 or SST).
   * @param processType   Whether this process acts as client or server.
   * @param partition     Partition object; only a reference is stored, so it
   *                      must outlive the channel.
   * @param path          Prefix prepended to @p name when building the names.
   * @param noClients     When true and SST was requested, BP4 is used instead.
   */
  AdiosChannel(adios2::ADIOS &adios, MPI_Comm comm, std::string name,
               adios2::Params params, TransportType transportType,
               ProcessType processType, Partition &partition, std::string path,
               bool noClients = false)
      : comm_(comm), process_type_(processType), partition_(partition) {
    REDEV_FUNCTION_TIMER;
    MPI_Comm_rank(comm, &rank_);
    auto s2cName = path + name + "_s2c";
    auto c2sName = path + name + "_c2s";
    s2c_io_ = adios.DeclareIO(s2cName);
    c2s_io_ = adios.DeclareIO(c2sName);
    // SST is replaced by BP4 when the caller indicates there are no clients.
    if (transportType == TransportType::SST && noClients) {
      // TODO log message here
      transportType = TransportType::BP4;
    }
    std::string engineType;
    switch (transportType) {
    case TransportType::BP4:
      engineType = "BP4";
      // BP4 engine names carry a ".bp" suffix.
      s2cName += ".bp";
      c2sName += ".bp";
      break;
    case TransportType::SST:
      engineType = "SST";
      break;
      // no default case. This will cause a compiler error if we do not handle
      // an engine type that has been defined in the TransportType enum.
      // (-Werror=switch)
    }
    s2c_io_.SetEngine(engineType);
    c2s_io_.SetEngine(engineType);
    s2c_io_.SetParameters(params);
    c2s_io_.SetParameters(params);
    REDEV_ALWAYS_ASSERT(s2c_io_.EngineType() == c2s_io_.EngineType());
    switch (transportType) {
    case TransportType::SST:
      openEnginesSST(noClients, s2cName, c2sName, s2c_io_, c2s_io_,
                     s2c_engine_, c2s_engine_);
      break;
    case TransportType::BP4:
      openEnginesBP4(noClients, s2cName, c2sName, s2c_io_, c2s_io_,
                     s2c_engine_, c2s_engine_);
      break;
    }
    // TODO pull begin/end step out of Setup/SendReceive metadata functions
    // begin step
    // send metadata
    Setup(s2c_io_, s2c_engine_);
    num_server_ranks_ = SendServerCommSizeToClient(s2c_io_, s2c_engine_);
    num_client_ranks_ = SendClientCommSizeToServer(c2s_io_, c2s_engine_);
    // end step
  }

  // Copying is disabled: the destructor closes both engines, so a copy would
  // attempt to close the same engine handles a second time.
  AdiosChannel(const AdiosChannel &) = delete;
  AdiosChannel &operator=(const AdiosChannel &) = delete;

  // FIXME
  /**
   * Move constructor. The IO/engine handles of @p o are replaced with
   * default-constructed ones and its communicator with MPI_COMM_NULL, so the
   * destructor of the moved-from object does not close the engines.
   *
   * The initializers are listed in member declaration order, which is the
   * order in which they are actually executed.
   */
  AdiosChannel(AdiosChannel &&o)
      : s2c_io_(std::exchange(o.s2c_io_, adios2::IO())),
        c2s_io_(std::exchange(o.c2s_io_, adios2::IO())),
        s2c_engine_(std::exchange(o.s2c_engine_, adios2::Engine())),
        c2s_engine_(std::exchange(o.c2s_engine_, adios2::Engine())),
        num_client_ranks_(o.num_client_ranks_),
        num_server_ranks_(o.num_server_ranks_),
        comm_(std::exchange(o.comm_, MPI_COMM_NULL)),
        process_type_(o.process_type_), rank_(o.rank_),
        partition_(o.partition_) {
    REDEV_FUNCTION_TIMER;
  }
  AdiosChannel &operator=(AdiosChannel &&) = delete;

  // FIXME IMPL RULE OF 5
  /// Close both engines if they are still held by this object.
  ~AdiosChannel() {
    REDEV_FUNCTION_TIMER;
    // NEED TO CHECK that the engine exists before trying to close it because
    // it could be in a moved from state
    if (s2c_engine_) {
      s2c_engine_.Close();
    }
    if (c2s_engine_) {
      c2s_engine_.Close();
    }
  }

  /**
   * Create a bidirectional communicator on this channel.
   *
   * @tparam T   Element type of the communicator.
   * @param name Name passed to both underlying AdiosComm objects.
   * @param comm MPI communicator for the AdiosComm objects. When it is
   *             MPI_COMM_NULL a pair of NoOpComm objects is returned instead.
   * @return     A BidirectionalComm built from the c2s/s2c communicators; the
   *             order of the two arguments depends on the process type
   *             (client: c2s then s2c, server: s2c then c2s).
   */
  template <typename T>
  [[nodiscard]] BidirectionalComm<T> CreateComm(std::string name,
                                                MPI_Comm comm) {
    REDEV_FUNCTION_TIMER;
    // TODO, remove s2c/c2s destinction on variable names then use std::move
    // name
    if (comm != MPI_COMM_NULL) {
      auto s2c = std::make_unique<AdiosComm<T>>(comm, num_client_ranks_,
                                                s2c_engine_, s2c_io_, name);
      auto c2s = std::make_unique<AdiosComm<T>>(comm, num_server_ranks_,
                                                c2s_engine_, c2s_io_, name);
      switch (process_type_) {
      case ProcessType::Client:
        return {std::move(c2s), std::move(s2c)};
      case ProcessType::Server:
        return {std::move(s2c), std::move(c2s)};
      }
    }
    return {std::make_unique<NoOpComm<T>>(), std::make_unique<NoOpComm<T>>()};
  }

  // TODO s2c/c2s Engine/IO -> send/receive Engine/IO. This removes need for all
  // the switch statements...

  /**
   * Begin an ADIOS2 step on the engine this process sends on (c2s for a
   * client, s2c for a server) and assert that the step status is OK.
   */
  void BeginSendCommunicationPhase() {
    REDEV_FUNCTION_TIMER;
    // Start from a failing status so that an unhandled process type trips the
    // assertion below instead of reading an uninitialized value.
    adios2::StepStatus status = adios2::StepStatus::OtherError;
    switch (process_type_) {
    case ProcessType::Client:
      status = c2s_engine_.BeginStep();
      break;
    case ProcessType::Server:
      status = s2c_engine_.BeginStep();
      break;
    }
    REDEV_ALWAYS_ASSERT(status == adios2::StepStatus::OK);
  }

  /// End the ADIOS2 step on the engine this process sends on.
  void EndSendCommunicationPhase() {
    REDEV_FUNCTION_TIMER;
    switch (process_type_) {
    case ProcessType::Client:
      c2s_engine_.EndStep();
      break;
    case ProcessType::Server:
      s2c_engine_.EndStep();
      break;
    }
  }

  /**
   * Begin an ADIOS2 step on the engine this process receives on (s2c for a
   * client, c2s for a server) and assert that the step status is OK.
   */
  void BeginReceiveCommunicationPhase() {
    REDEV_FUNCTION_TIMER;
    // Start from a failing status so that an unhandled process type trips the
    // assertion below instead of reading an uninitialized value.
    adios2::StepStatus status = adios2::StepStatus::OtherError;
    switch (process_type_) {
    case ProcessType::Client:
      status = s2c_engine_.BeginStep();
      break;
    case ProcessType::Server:
      status = c2s_engine_.BeginStep();
      break;
    }
    REDEV_ALWAYS_ASSERT(status == adios2::StepStatus::OK);
  }

  /// End the ADIOS2 step on the engine this process receives on.
  void EndReceiveCommunicationPhase() {
    REDEV_FUNCTION_TIMER;
    switch (process_type_) {
    case ProcessType::Client:
      s2c_engine_.EndStep();
      break;
    case ProcessType::Server:
      c2s_engine_.EndStep();
      break;
    }
  }

private:
  // The helpers below are defined outside this header; the notes describe only
  // how they are used here.

  // Open both engines for the BP4 transport (called from the constructor).
  void openEnginesBP4(bool noClients, std::string s2cName, std::string c2sName,
                      adios2::IO &s2cIO, adios2::IO &c2sIO,
                      adios2::Engine &s2cEngine, adios2::Engine &c2sEngine);
  // Open both engines for the SST transport (called from the constructor).
  void openEnginesSST(bool noClients, std::string s2cName, std::string c2sName,
                      adios2::IO &s2cIO, adios2::IO &c2sIO,
                      adios2::Engine &s2cEngine, adios2::Engine &c2sEngine);
  // The result is stored as the number of server ranks.
  [[nodiscard]] redev::LO SendServerCommSizeToClient(adios2::IO &s2cIO,
                                                     adios2::Engine &s2cEngine);
  // The result is stored as the number of client ranks.
  [[nodiscard]] redev::LO SendClientCommSizeToServer(adios2::IO &c2sIO,
                                                     adios2::Engine &c2sEngine);
  [[nodiscard]] std::size_t
  SendPartitionTypeToClient(adios2::IO &s2cIO, adios2::Engine &s2cEngine);
  // Called once from the constructor after the engines have been opened.
  void Setup(adios2::IO &s2cIO, adios2::Engine &s2cEngine);
  void CheckVersion(adios2::Engine &eng, adios2::IO &io);
  void ConstructPartitionFromIndex(size_t partition_index);

  adios2::IO s2c_io_;
  adios2::IO c2s_io_;
  adios2::Engine s2c_engine_;
  adios2::Engine c2s_engine_;
  redev::LO num_client_ranks_;
  redev::LO num_server_ranks_;
  MPI_Comm comm_;
  ProcessType process_type_;
  int rank_; // rank of this process in the communicator given at construction
  Partition &partition_; // non-owning; refers to the caller's partition
};
} // namespace redev
#endif // REDEV_REDEV_ADIOS_CHANNEL_H