
Commit 7f694fc

Add trace_alltomany.c to evaluate alltoall vs issend/irecv
This program uses the trace file 'trace_1024p_253n.dat.gz', collected from a WRF run on 1024 MPI processes with a grid of size 5200 x 7600.
1 parent 23349ec commit 7f694fc

File tree

2 files changed: +392 −0 lines
MPI/trace_1024p_253n.dat.gz

1.22 MB
Binary file not shown.

MPI/trace_alltomany.c

+392
@@ -0,0 +1,392 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
 *
 * Copyright (C) 2025, Northwestern University
 * See COPYRIGHT notice in top-level directory.
 *
 * Evaluate performance of all-to-many personalized communication implemented
 * with MPI_Alltoallw() and MPI_Issend()/MPI_Irecv(). The communication pattern
 * uses a trace from one of PnetCDF's benchmark programs, WRF-IO, running the
 * following command on 8 CPU nodes, 128 MPI processes each.
 *     srun -n 1024 wrf_io -l 5200 -w 7600 output.nc
 * The run also used a Lustre striping count of 8, a striping size of 8 MB, and
 * MPI-IO hints cb_nodes 32 and cb_buffer_size 16 MB.
 *
 * To compile:
 *   % mpicc -O2 trace_alltomany.c -o trace_alltomany
 *
 * Usage: this program requires an input trace file as its argument.
 *        A trace file 'trace_1024p_253n.dat.gz' is provided. Run the command
 *        'gunzip trace_1024p_253n.dat.gz' before using it.
 *        This program can run with 1024 or fewer MPI processes.
 *
 * Example run command and output on screen:
 *   % mpiexec -n 1024 ./trace_alltomany
 *   number of MPI processes = 1024
 *   number of iterations = 253
 *   Comm amount using MPI_Issend/Irecv = 129074.16 MB
 *   Time for using MPI_Issend/Irecv = 2.81 sec
 *       Time bucket[1] = 0.31 sec
 *       Time bucket[2] = 0.31 sec
 *       Time bucket[3] = 0.29 sec
 *       Time bucket[4] = 0.30 sec
 *       Time bucket[5] = 0.29 sec
 *       Time bucket[6] = 0.29 sec
 *       Time bucket[7] = 0.29 sec
 *       Time bucket[8] = 0.30 sec
 *       Time bucket[9] = 0.19 sec
 *   Comm amount using MPI_alltoallw = 129074.16 MB
 *   Time for using MPI_alltoallw = 7.48 sec
 *       Time bucket[1] = 0.83 sec
 *       Time bucket[2] = 0.79 sec
 *       Time bucket[3] = 0.77 sec
 *       Time bucket[4] = 0.79 sec
 *       Time bucket[5] = 0.77 sec
 *       Time bucket[6] = 0.77 sec
 *       Time bucket[7] = 0.80 sec
 *       Time bucket[8] = 0.77 sec
 *       Time bucket[9] = 0.53 sec
 *
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>

#include <mpi.h>

#define NTIMES 253
#define NPROCS 1024

#define ERR \
    if (err != MPI_SUCCESS) { \
        int errorStringLen; \
        char errorString[MPI_MAX_ERROR_STRING]; \
        MPI_Error_string(err, errorString, &errorStringLen); \
        printf("Error at line %d: %s\n",__LINE__,errorString); \
        goto err_out; \
    }

typedef struct {
    int  nprocs; /* number of peers with non-zero amount */
    int *ranks;  /* rank IDs of peers with non-zero amount */
    int *amnts;  /* amounts of peers with non-zero amount */
} trace;

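/* Note (added for clarity): the expected binary layout of the input trace
 * file, as implied by the reading code in main() below, is:
 *
 *   int nprocs               number of ranks traced (must equal NPROCS)
 *   int ntimes               number of iterations traced (must equal NTIMES)
 *   int block_lens[nprocs]   length, in ints, of each rank's block
 *   int blocks[...]          one block per rank, stored back-to-back
 *
 * Each rank's block stores the send pattern followed by the receive pattern:
 * NTIMES counts of peers with non-zero amounts, then, for every iteration,
 * the peer rank IDs followed by the per-peer byte amounts. This is a sketch
 * derived from the code, not an authoritative format specification.
 */
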
/* all-to-many personalized communication by calling MPI_Alltoallw() */
void run_alltoallw(int ntimes,
                   trace *sender,
                   trace *recver,
                   char **sendBuf,
                   char **recvBuf)
{
    int i, j, err, nprocs, rank, bucket_len;
    int *sendCounts, *recvCounts, *sendDisps, *recvDisps;
    MPI_Datatype *sendTypes, *recvTypes;
    MPI_Offset amnt, sum_amnt;
    double start_t, end_t, timing[10], maxt[10];

    for (i=0; i<10; i++) timing[i]=0;
    bucket_len = ntimes / 10;
    if (ntimes % 10) bucket_len++;

    MPI_Barrier(MPI_COMM_WORLD);
    timing[0] = MPI_Wtime();

    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    sendTypes = (MPI_Datatype*) malloc(sizeof(MPI_Datatype) * nprocs * 2);
    recvTypes = sendTypes + nprocs;
    for (i=0; i<nprocs * 2; i++) sendTypes[i] = MPI_BYTE;

    sendCounts = (int*) malloc(sizeof(int) * nprocs * 4);
    recvCounts = sendCounts + nprocs;
    sendDisps  = recvCounts + nprocs;
    recvDisps  = sendDisps + nprocs;

    start_t = MPI_Wtime();
    amnt = 0;
    for (j=0; j<ntimes; j++) {
        int disp, peer;
        /* set up sendcounts and sdispls arguments of MPI_Alltoallw(). Because
         * all datatypes are MPI_BYTE, the counts and displacements are in
         * bytes, matching how sendBuf[j] and recvBuf[j] are packed
         * contiguously in the order peers appear in the trace.
         */
        for (i=0; i<nprocs*4; i++)
            sendCounts[i] = 0;

        disp = 0;
        for (i=0; i<sender[j].nprocs; i++) {
            peer = sender[j].ranks[i];
            if (peer >= nprocs) continue;
            sendCounts[peer] = sender[j].amnts[i];
            sendDisps[peer] = disp;
            disp += sendCounts[peer];
        }
        disp = 0;
        for (i=0; i<recver[j].nprocs; i++) {
            peer = recver[j].ranks[i];
            if (peer >= nprocs) continue;
            recvCounts[peer] = recver[j].amnts[i];
            recvDisps[peer] = disp;
            disp += recvCounts[peer];
        }
        amnt += disp;

        err = MPI_Alltoallw(sendBuf[j], sendCounts, sendDisps, sendTypes,
                            recvBuf[j], recvCounts, recvDisps, recvTypes,
                            MPI_COMM_WORLD); ERR

        /* record timing */
        if (j > 0 && j % bucket_len == 0) {
            end_t = MPI_Wtime();
            timing[j / bucket_len] = end_t - start_t;
            start_t = end_t;
        }
    }
    end_t = MPI_Wtime();
    timing[9] = end_t - start_t;
    timing[0] = end_t - timing[0]; /* end-to-end time */

err_out:
    free(sendTypes);
    free(sendCounts);

    MPI_Reduce(timing, maxt, 10, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
    MPI_Reduce(&amnt, &sum_amnt, 1, MPI_OFFSET, MPI_SUM, 0, MPI_COMM_WORLD);
    if (rank == 0) {
        printf("Comm amount using MPI_alltoallw = %.2f MB\n",
               (float)sum_amnt/1048576.0);
        printf("Time for using MPI_alltoallw = %.2f sec\n", maxt[0]);
        for (i=1; i<10; i++)
            printf("\tTime bucket[%d] = %.2f sec\n", i, maxt[i]);
        fflush(stdout);
    }
}

/* all-to-many personalized communication by calling MPI_Issend/Irecv() */
void run_async_send_recv(int ntimes,
                         trace *sender,
                         trace *recver,
                         char **sendBuf,
                         char **recvBuf)
{
    char *sendPtr, *recvPtr;
    int i, j, err, nprocs, rank, nreqs, bucket_len;
    MPI_Request *reqs;
    MPI_Status *st;
    MPI_Offset amnt, sum_amnt;
    double start_t, end_t, timing[10], maxt[10];

    for (i=0; i<10; i++) timing[i]=0;
    bucket_len = ntimes / 10;
    if (ntimes % 10) bucket_len++;

    MPI_Barrier(MPI_COMM_WORLD);
    timing[0] = MPI_Wtime();

    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* allocate MPI_Request and MPI_Status arrays */
    reqs = (MPI_Request*) malloc(sizeof(MPI_Request) * 2 * nprocs);
    st   = (MPI_Status*)  malloc(sizeof(MPI_Status)  * 2 * nprocs);

    start_t = MPI_Wtime();
    amnt = 0;
    for (j=0; j<ntimes; j++) {
        nreqs = 0;

        /* receivers */
        recvPtr = recvBuf[j];
        for (i=0; i<recver[j].nprocs; i++) {
            if (recver[j].ranks[i] >= nprocs) continue;
            err = MPI_Irecv(recvPtr, recver[j].amnts[i], MPI_BYTE,
                            recver[j].ranks[i], 0, MPI_COMM_WORLD,
                            &reqs[nreqs++]);
            ERR
            recvPtr += recver[j].amnts[i];
            amnt += recver[j].amnts[i];
        }
        /* senders */
        sendPtr = sendBuf[j];
        for (i=0; i<sender[j].nprocs; i++) {
            if (sender[j].ranks[i] >= nprocs) continue;
            err = MPI_Issend(sendPtr, sender[j].amnts[i], MPI_BYTE,
                             sender[j].ranks[i], 0, MPI_COMM_WORLD,
                             &reqs[nreqs++]);
            ERR
            sendPtr += sender[j].amnts[i];
        }

        err = MPI_Waitall(nreqs, reqs, st); ERR

        if (j > 0 && j % bucket_len == 0) {
            end_t = MPI_Wtime();
            timing[j / bucket_len] = end_t - start_t;
            start_t = end_t;
        }
    }
    end_t = MPI_Wtime();
    timing[9] = end_t - start_t;
    timing[0] = end_t - timing[0]; /* end-to-end time */

err_out:
    free(st);
    free(reqs);

    MPI_Reduce(timing, maxt, 10, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
    MPI_Reduce(&amnt, &sum_amnt, 1, MPI_OFFSET, MPI_SUM, 0, MPI_COMM_WORLD);
    if (rank == 0) {
        printf("Comm amount using MPI_Issend/Irecv = %.2f MB\n",
               (float)sum_amnt/1048576.0);
        printf("Time for using MPI_Issend/Irecv = %.2f sec\n", maxt[0]);
        for (i=1; i<10; i++)
            printf("\tTime bucket[%d] = %.2f sec\n", i, maxt[i]);
        fflush(stdout);
    }
}

/*----< main() >------------------------------------------------------------*/
int main(int argc, char **argv) {
    int i, j, fd, rank, nprocs, ntimes;
    char **sendBuf, **recvBuf;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (argc != 2) {
        if (rank == 0) printf("Input trace file is required\n");
        goto err_out;
    }

    if (nprocs > NPROCS) {
        if (rank == 0) printf("Number of MPI processes must be <= %d\n",NPROCS);
        goto err_out;
    }

    ntimes = NTIMES;
    if (rank == 0) {
        printf("number of MPI processes = %d\n", nprocs);
        printf("number of iterations = %d\n", ntimes);
    }

    if ((fd = open(argv[1], O_RDONLY, 0600)) == -1) {
        printf("Error! open() failed %s (error: %s)\n",argv[1],strerror(errno));
        goto err_out;
    }

    /* trace file was generated using 1024 MPI processes, running 253
     * iterations. nprocs can be less than 1024
     */
    /* read nprocs */
    int in_nprocs;
    read(fd, &in_nprocs, sizeof(int));
    assert(in_nprocs == NPROCS);

    /* read ntimes */
    int in_ntimes;
    read(fd, &in_ntimes, sizeof(int));
    assert(in_ntimes == NTIMES);

    /* read block_lens[NPROCS] */
    int *block_lens = (int*) malloc(sizeof(int) * NPROCS);
    read(fd, block_lens, sizeof(int) * NPROCS);

    /* read block 'rank' */
    int *file_block = (int*) malloc(sizeof(int) * block_lens[rank]);
    off_t off=0;
    for (i=0; i<rank; i++) off += block_lens[i];
    off *= sizeof(int);
    lseek(fd, off, SEEK_CUR);
    read(fd, file_block, sizeof(int) * block_lens[rank]);

    free(block_lens);

    /* close input file */
    close(fd);

    /* allocate buffer for storing pairwise communication amounts */
    trace *sender = (trace*) malloc(sizeof(trace) * NTIMES);
    trace *recver = (trace*) malloc(sizeof(trace) * NTIMES);

    int *nonzero_nprocs, *ptr=file_block;

    /* populate sender communication pattern */
    nonzero_nprocs = ptr;
    ptr += NTIMES;
    for (i=0; i<NTIMES; i++) {
        sender[i].nprocs = nonzero_nprocs[i];
        sender[i].ranks = ptr;
        ptr += nonzero_nprocs[i];
        sender[i].amnts = ptr;
        ptr += nonzero_nprocs[i];
    }

    /* populate receiver communication pattern */
    nonzero_nprocs = ptr;
    ptr += NTIMES;
    for (i=0; i<NTIMES; i++) {
        recver[i].nprocs = nonzero_nprocs[i];
        recver[i].ranks = ptr;
        ptr += nonzero_nprocs[i];
        recver[i].amnts = ptr;
        ptr += nonzero_nprocs[i];
    }

    /* allocate send and receive message buffers */
    sendBuf = (char**) malloc(sizeof(char*) * ntimes);
    for (i=0; i<ntimes; i++) {
        size_t amnt=0;
        for (j=0; j<sender[i].nprocs; j++) {
            if (sender[i].ranks[j] >= nprocs) break;
            amnt += sender[i].amnts[j];
        }
        sendBuf[i] = (amnt == 0) ? NULL : (char*) malloc(amnt);
        for (j=0; j<amnt; j++) sendBuf[i][j] = (rank+j)%128;
    }

    /* recv buffer is reused in each iteration */
    recvBuf = (char**) malloc(sizeof(char*) * ntimes);
    size_t recv_amnt = 0;
    for (i=0; i<ntimes; i++) {
        size_t amnt=0;
        for (j=0; j<recver[i].nprocs; j++) {
            if (recver[i].ranks[j] >= nprocs) break;
            amnt += recver[i].amnts[j];
        }
        if (amnt > recv_amnt) recv_amnt = amnt;
    }
    recvBuf[0] = (recv_amnt == 0) ? NULL : (char*) malloc(recv_amnt);
    for (i=1; i<ntimes; i++) recvBuf[i] = recvBuf[0];

    for (i=0; i<3; i++) {

        /* perform all-to-many communication using MPI_Issend/Irecv */
        MPI_Barrier(MPI_COMM_WORLD);
        run_async_send_recv(ntimes, sender, recver, sendBuf, recvBuf);

        /* perform all-to-many communication using MPI_Alltoallw */
        MPI_Barrier(MPI_COMM_WORLD);
        run_alltoallw(ntimes, sender, recver, sendBuf, recvBuf);

    }

    for (i=0; i<ntimes; i++) if (sendBuf[i] != NULL) free(sendBuf[i]);
    free(sendBuf);
    if (recvBuf[0] != NULL) free(recvBuf[0]);
    free(recvBuf);
    free(sender);
    free(recver);
    free(file_block);

err_out:
    MPI_Finalize();
    return 0;
}
