
Commit 6b81ce9

Add trace_alltomany.c to evaluate alltoall vs issend/irecv
This program uses the trace file 'trace_1024p_253n.dat.gz', collected from a WRF run on 1024 MPI processes over a grid of size 5200 x 7600.
1 parent 23349ec commit 6b81ce9
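
For reference, the small standalone sketch below (not part of this commit) reads and prints the trace file header, following the layout that main() in MPI/trace_alltomany.c expects: two 4-byte integers (number of processes and number of iterations), then a per-rank array of block lengths, then one block of integers per rank holding the send and receive patterns. The file name check_trace.c is an illustrative assumption.

/* check_trace.c: print the header of the trace file used by trace_alltomany.c
 * Layout is inferred from how main() in MPI/trace_alltomany.c reads the file.
 *
 * To compile:  cc -O2 check_trace.c -o check_trace
 * To run:      ./check_trace trace_1024p_253n.dat
 */
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv) {
    if (argc != 2) {
        fprintf(stderr, "Usage: %s <trace file>\n", argv[0]);
        return 1;
    }
    FILE *fp = fopen(argv[1], "rb");
    if (fp == NULL) { perror(argv[1]); return 1; }

    /* header: number of ranks and number of iterations, both 4-byte ints */
    int nprocs, ntimes;
    if (fread(&nprocs, sizeof(int), 1, fp) != 1 ||
        fread(&ntimes, sizeof(int), 1, fp) != 1) {
        fprintf(stderr, "short read on header\n");
        fclose(fp);
        return 1;
    }

    /* per-rank block lengths, in number of ints */
    int *block_lens = (int*) malloc(sizeof(int) * nprocs);
    if (fread(block_lens, sizeof(int), nprocs, fp) != (size_t)nprocs) {
        fprintf(stderr, "short read on block_lens\n");
        free(block_lens);
        fclose(fp);
        return 1;
    }
    size_t total = 0;
    for (int i = 0; i < nprocs; i++) total += block_lens[i];

    printf("nprocs = %d  ntimes = %d  total block ints = %zu\n",
           nprocs, ntimes, total);
    free(block_lens);
    fclose(fp);
    return 0;
}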

File tree

2 files changed, +382 -0 lines changed


MPI/trace_1024p_253n.dat.gz

1.22 MB
Binary file not shown.

MPI/trace_alltomany.c

+382
@@ -0,0 +1,382 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
 *
 * Copyright (C) 2025, Northwestern University
 * See COPYRIGHT notice in top-level directory.
 *
 * Evaluate performance of all-to-many personalized communication implemented
 * with MPI_Alltoallw() and MPI_Issend()/MPI_Irecv().
 *
 * To compile:
 *   % mpicc -O2 trace_alltomany.c -o trace_alltomany
 *
 * Usage: this program requires an input trace file as its only argument.
 *   A trace file 'trace_1024p_253n.dat.gz' is provided. Run command
 *   'gunzip trace_1024p_253n.dat.gz' before using it.
 *   This program can run with 1024 or fewer MPI processes.
 *
 * Example run command and output on screen:
 *   % mpiexec -n 1024 ./trace_alltomany
 *   number of MPI processes = 1024
 *   number of iterations = 253
 *   Comm amount using MPI_Issend/Irecv = 129074.16 MB
 *   Time for using MPI_Issend/Irecv = 2.81 sec
 *       Time bucket[1] = 0.31 sec
 *       Time bucket[2] = 0.31 sec
 *       Time bucket[3] = 0.29 sec
 *       Time bucket[4] = 0.30 sec
 *       Time bucket[5] = 0.29 sec
 *       Time bucket[6] = 0.29 sec
 *       Time bucket[7] = 0.29 sec
 *       Time bucket[8] = 0.30 sec
 *       Time bucket[9] = 0.19 sec
 *   Comm amount using MPI_alltoallw = 129074.16 MB
 *   Time for using MPI_alltoallw = 7.48 sec
 *       Time bucket[1] = 0.83 sec
 *       Time bucket[2] = 0.79 sec
 *       Time bucket[3] = 0.77 sec
 *       Time bucket[4] = 0.79 sec
 *       Time bucket[5] = 0.77 sec
 *       Time bucket[6] = 0.77 sec
 *       Time bucket[7] = 0.80 sec
 *       Time bucket[8] = 0.77 sec
 *       Time bucket[9] = 0.53 sec
 *
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>

#include <mpi.h>

#define NTIMES 253
#define NPROCS 1024

#define ERR \
    if (err != MPI_SUCCESS) { \
        int errorStringLen; \
        char errorString[MPI_MAX_ERROR_STRING]; \
        MPI_Error_string(err, errorString, &errorStringLen); \
        printf("Error at line %d: %s\n",__LINE__,errorString); \
        goto err_out; \
    }

typedef struct {
    int  nprocs;  /* number of peers with non-zero amount */
    int *ranks;   /* rank IDs of peers with non-zero amount */
    int *amnts;   /* amounts of peers with non-zero amount */
} trace;

/* all-to-many personalized communication by calling MPI_Alltoallw() */
void run_alltoallw(int ntimes,
                   trace *sender,
                   trace *recver,
                   char **sendBuf,
                   char **recvBuf)
{
    int i, j, err, nprocs, rank, bucket_len;
    int *sendCounts, *recvCounts, *sendDisps, *recvDisps;
    MPI_Datatype *sendTypes, *recvTypes;
    MPI_Offset amnt, sum_amnt;
    double start_t, end_t, timing[10], maxt[10];

    for (i=0; i<10; i++) timing[i]=0;
    bucket_len = ntimes / 10;
    if (ntimes % 10) bucket_len++;

    MPI_Barrier(MPI_COMM_WORLD);
    timing[0] = MPI_Wtime();

    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    sendTypes = (MPI_Datatype*) malloc(sizeof(MPI_Datatype) * nprocs * 2);
    recvTypes = sendTypes + nprocs;
    for (i=0; i<nprocs * 2; i++) sendTypes[i] = MPI_BYTE;

    sendCounts = (int*) malloc(sizeof(int) * nprocs * 4);
    recvCounts = sendCounts + nprocs;
    sendDisps  = recvCounts + nprocs;
    recvDisps  = sendDisps + nprocs;

    start_t = MPI_Wtime();
    amnt = 0;
    for (j=0; j<ntimes; j++) {
        int disp, peer;
        /* set up sendcounts and sdispls arguments of MPI_Alltoallw() */
        for (i=0; i<nprocs*4; i++)
            sendCounts[i] = 0;

        disp = 0;
        for (i=0; i<sender[j].nprocs; i++) {
            peer = sender[j].ranks[i];
            if (peer >= nprocs) continue;
            sendCounts[peer] = sender[j].amnts[i];
            sendDisps[peer] = disp;
            disp += sendCounts[peer];
        }
        disp = 0;
        for (i=0; i<recver[j].nprocs; i++) {
            peer = recver[j].ranks[i];
            if (peer >= nprocs) continue;
            recvCounts[peer] = recver[j].amnts[i];
            recvDisps[peer] = disp;
            disp += recvCounts[peer];
        }
        amnt += disp;

        err = MPI_Alltoallw(sendBuf[j], sendCounts, sendDisps, sendTypes,
                            recvBuf[j], recvCounts, recvDisps, recvTypes,
                            MPI_COMM_WORLD); ERR

        /* record timing */
        if (j > 0 && j % bucket_len == 0) {
            end_t = MPI_Wtime();
            timing[j / bucket_len] = end_t - start_t;
            start_t = end_t;
        }
    }
    end_t = MPI_Wtime();
    timing[9] = end_t - start_t;
    timing[0] = end_t - timing[0]; /* end-to-end time */

err_out:
    free(sendTypes);
    free(sendCounts);

    MPI_Reduce(timing, maxt, 10, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
    MPI_Reduce(&amnt, &sum_amnt, 1, MPI_OFFSET, MPI_SUM, 0, MPI_COMM_WORLD);
    if (rank == 0) {
        printf("Comm amount using MPI_alltoallw = %.2f MB\n",
               (float)sum_amnt/1048576.0);
        printf("Time for using MPI_alltoallw = %.2f sec\n", maxt[0]);
        for (i=1; i<10; i++)
            printf("\tTime bucket[%d] = %.2f sec\n", i, maxt[i]);
        fflush(stdout);
    }
}

/* all-to-many personalized communication by calling MPI_Issend/Irecv() */
void run_async_send_recv(int ntimes,
                         trace *sender,
                         trace *recver,
                         char **sendBuf,
                         char **recvBuf)
{
    char *sendPtr, *recvPtr;
    int i, j, err, nprocs, rank, nreqs, bucket_len;
    MPI_Request *reqs;
    MPI_Status *st;
    MPI_Offset amnt, sum_amnt;
    double start_t, end_t, timing[10], maxt[10];

    for (i=0; i<10; i++) timing[i]=0;
    bucket_len = ntimes / 10;
    if (ntimes % 10) bucket_len++;

    MPI_Barrier(MPI_COMM_WORLD);
    timing[0] = MPI_Wtime();

    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* allocate MPI_Request and MPI_Status arrays */
    reqs = (MPI_Request*) malloc(sizeof(MPI_Request) * 2 * nprocs);
    st   = (MPI_Status*)  malloc(sizeof(MPI_Status)  * 2 * nprocs);

    start_t = MPI_Wtime();
    amnt = 0;
    for (j=0; j<ntimes; j++) {
        nreqs = 0;

        /* receivers */
        recvPtr = recvBuf[j];
        for (i=0; i<recver[j].nprocs; i++) {
            if (recver[j].ranks[i] >= nprocs) continue;
            err = MPI_Irecv(recvPtr, recver[j].amnts[i], MPI_BYTE,
                            recver[j].ranks[i], 0, MPI_COMM_WORLD,
                            &reqs[nreqs++]);
            ERR
            recvPtr += recver[j].amnts[i];
            amnt += recver[j].amnts[i];
        }
        /* senders */
        sendPtr = sendBuf[j];
        for (i=0; i<sender[j].nprocs; i++) {
            if (sender[j].ranks[i] >= nprocs) continue;
            err = MPI_Issend(sendPtr, sender[j].amnts[i], MPI_BYTE,
                             sender[j].ranks[i], 0, MPI_COMM_WORLD,
                             &reqs[nreqs++]);
            ERR
            sendPtr += sender[j].amnts[i];
        }

        err = MPI_Waitall(nreqs, reqs, st); ERR

        if (j > 0 && j % bucket_len == 0) {
            end_t = MPI_Wtime();
            timing[j / bucket_len] = end_t - start_t;
            start_t = end_t;
        }
    }
    end_t = MPI_Wtime();
    timing[9] = end_t - start_t;
    timing[0] = end_t - timing[0]; /* end-to-end time */

err_out:
    free(st);
    free(reqs);

    MPI_Reduce(timing, maxt, 10, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
    MPI_Reduce(&amnt, &sum_amnt, 1, MPI_OFFSET, MPI_SUM, 0, MPI_COMM_WORLD);
    if (rank == 0) {
        printf("Comm amount using MPI_Issend/Irecv = %.2f MB\n",
               (float)sum_amnt/1048576.0);
        printf("Time for using MPI_Issend/Irecv = %.2f sec\n", maxt[0]);
        for (i=1; i<10; i++)
            printf("\tTime bucket[%d] = %.2f sec\n", i, maxt[i]);
        fflush(stdout);
    }
}

/*----< main() >------------------------------------------------------------*/
int main(int argc, char **argv) {
    int i, j, fd, rank, nprocs, ntimes;
    char **sendBuf, **recvBuf;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (argc != 2) {
        if (rank == 0) printf("Input trace file is required\n");
        goto err_out;
    }

    if (nprocs > NPROCS) {
        if (rank == 0) printf("Number of MPI processes must be <= %d\n",NPROCS);
        goto err_out;
    }

    ntimes = NTIMES;
    if (rank == 0) {
        printf("number of MPI processes = %d\n", nprocs);
        printf("number of iterations = %d\n", ntimes);
    }

    if ((fd = open(argv[1], O_RDONLY, 0600)) == -1) {
        printf("Error! open() failed %s (error: %s)\n",argv[1],strerror(errno));
        goto err_out;
    }

    /* trace file was generated using 1024 MPI processes, running 253
     * iterations. nprocs can be less than 1024
     */
    /* read nprocs */
    int in_nprocs;
    read(fd, &in_nprocs, sizeof(int));
    assert(in_nprocs == NPROCS);

    /* read ntimes */
    int in_ntimes;
    read(fd, &in_ntimes, sizeof(int));
    assert(in_ntimes == NTIMES);

    /* read block_lens[NPROCS] */
    int *block_lens = (int*) malloc(sizeof(int) * NPROCS);
    read(fd, block_lens, sizeof(int) * NPROCS);

    /* read block 'rank' */
    int *file_block = (int*) malloc(sizeof(int) * block_lens[rank]);
    off_t off=0;
    for (i=0; i<rank; i++) off += block_lens[i];
    off *= sizeof(int);
    lseek(fd, off, SEEK_CUR);
    read(fd, file_block, sizeof(int) * block_lens[rank]);

    free(block_lens);

    /* close input file */
    close(fd);

    /* allocate buffer for storing pairwise communication amounts */
    trace *sender = (trace*) malloc(sizeof(trace) * NTIMES);
    trace *recver = (trace*) malloc(sizeof(trace) * NTIMES);

    int *nonzero_nprocs, *ptr=file_block;

    /* populate sender communication pattern */
    nonzero_nprocs = ptr;
    ptr += NTIMES;
    for (i=0; i<NTIMES; i++) {
        sender[i].nprocs = nonzero_nprocs[i];
        sender[i].ranks = ptr;
        ptr += nonzero_nprocs[i];
        sender[i].amnts = ptr;
        ptr += nonzero_nprocs[i];
    }

    /* populate receiver communication pattern */
    nonzero_nprocs = ptr;
    ptr += NTIMES;
    for (i=0; i<NTIMES; i++) {
        recver[i].nprocs = nonzero_nprocs[i];
        recver[i].ranks = ptr;
        ptr += nonzero_nprocs[i];
        recver[i].amnts = ptr;
        ptr += nonzero_nprocs[i];
    }

    /* allocate send and receive message buffers */
    sendBuf = (char**) malloc(sizeof(char*) * ntimes);
    for (i=0; i<ntimes; i++) {
        size_t amnt=0;  /* reset per iteration, matching the recvBuf loop below */
        for (j=0; j<sender[i].nprocs; j++) {
            if (sender[i].ranks[j] >= nprocs) break;
            amnt += sender[i].amnts[j];
        }
        sendBuf[i] = (amnt == 0) ? NULL : (char*) malloc(amnt);
        for (j=0; j<amnt; j++) sendBuf[i][j] = (rank+j)%128;
    }
    recvBuf = (char**) malloc(sizeof(char*) * ntimes);
    for (i=0; i<ntimes; i++) {
        size_t amnt=0;
        for (j=0; j<recver[i].nprocs; j++) {
            if (recver[i].ranks[j] >= nprocs) break;
            amnt += recver[i].amnts[j];
        }
        recvBuf[i] = (amnt == 0) ? NULL : (char*) malloc(amnt);
    }

    for (i=0; i<3; i++) {

        /* perform all-to-many communication using MPI_Issend/Irecv */
        MPI_Barrier(MPI_COMM_WORLD);
        run_async_send_recv(ntimes, sender, recver, sendBuf, recvBuf);

        /* perform all-to-many communication using MPI_Alltoallw */
        MPI_Barrier(MPI_COMM_WORLD);
        run_alltoallw(ntimes, sender, recver, sendBuf, recvBuf);

    }

    for (i=0; i<ntimes; i++) if (recvBuf[i] != NULL) free(recvBuf[i]);
    for (i=0; i<ntimes; i++) if (sendBuf[i] != NULL) free(sendBuf[i]);
    free(sendBuf);
    free(recvBuf);
    free(sender);
    free(recver);
    free(file_block);

err_out:
    MPI_Finalize();
    return 0;
}
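
The main difference between the two timed methods is the calling convention of MPI_Alltoallw(), which takes per-peer counts, byte displacements, and datatypes. The minimal, self-contained sketch below (not part of the commit) isolates that calling pattern with every datatype set to MPI_BYTE, the same convention run_alltoallw() uses; the file name alltoallw_bytes.c and the toy traffic pattern (each rank sends peer+1 bytes to every peer) are assumptions for illustration.

/* alltoallw_bytes.c: minimal MPI_Alltoallw example with MPI_BYTE datatypes,
 * so counts and displacements are both expressed in bytes.
 *
 * To compile:  mpicc -O2 alltoallw_bytes.c -o alltoallw_bytes
 * To run:      mpiexec -n 4 ./alltoallw_bytes
 */
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int main(int argc, char **argv) {
    int i, rank, nprocs;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* one malloc holds the four integer argument arrays, like run_alltoallw() */
    int *sendCounts = (int*) malloc(sizeof(int) * nprocs * 4);
    int *recvCounts = sendCounts + nprocs;
    int *sendDisps  = recvCounts + nprocs;
    int *recvDisps  = sendDisps  + nprocs;
    MPI_Datatype *types = (MPI_Datatype*) malloc(sizeof(MPI_Datatype) * nprocs);
    for (i=0; i<nprocs; i++) types[i] = MPI_BYTE;

    /* this rank sends (i+1) bytes to peer i and receives (rank+1) bytes back */
    int sendTotal = 0, recvTotal = 0;
    for (i=0; i<nprocs; i++) {
        sendCounts[i] = i + 1;
        sendDisps[i]  = sendTotal;   /* byte offsets into the packed buffers */
        sendTotal    += sendCounts[i];
        recvCounts[i] = rank + 1;
        recvDisps[i]  = recvTotal;
        recvTotal    += recvCounts[i];
    }

    char *sendBuf = (char*) malloc(sendTotal);
    char *recvBuf = (char*) malloc(recvTotal);
    for (i=0; i<sendTotal; i++) sendBuf[i] = rank % 128;

    MPI_Alltoallw(sendBuf, sendCounts, sendDisps, types,
                  recvBuf, recvCounts, recvDisps, types,
                  MPI_COMM_WORLD);

    /* each block received from peer i should contain (rank+1) copies of i */
    for (i=0; i<nprocs; i++)
        if (recvBuf[recvDisps[i]] != i % 128)
            printf("rank %d: unexpected byte from peer %d\n", rank, i);

    free(types);
    free(sendCounts);
    free(sendBuf);
    free(recvBuf);
    MPI_Finalize();
    return 0;
}

Because all datatypes are MPI_BYTE, the displacement arrays are plain byte offsets into contiguous send and receive buffers, which is what lets run_alltoallw() reuse the per-iteration buffers laid out in main().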
