-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcollective_write.py
170 lines (137 loc) · 5.28 KB
/
collective_write.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#
# Copyright (C) 2024, Northwestern University and Argonne National Laboratory
# See COPYRIGHT notice in top-level directory.
#
"""
This example mimics coll_perf.c from ROMIO. It creates a netCDF file and
writes a number of 3D integer non-record variables. The measured write
bandwidth is reported at the end.
To run:
% mpiexec -n num_process python3 collective_write.py [test_file_name] [-q] [-k format] [-l len]
where len decides the size of each local array, which is len x len x len.
So, each non-record variable is of size len*len*len * nprocs * sizeof(int).
All variables are partitioned among all processes in a 3D block-block-block
fashion.
Example commands for MPI run and outputs from running ncmpidump on the
netCDF file produced by this example program:
% mpiexec -n 32 python3 collective_write.py tmp/test1.nc -l 100
% ncmpidump tmp/test1.nc
Example standard output:
MPI hint: cb_nodes = 2
MPI hint: cb_buffer_size = 16777216
MPI hint: striping_factor = 32
MPI hint: striping_unit = 1048576
Local array size 100 x 100 x 100 integers, size = 3.81 MB
Global array size 400 x 400 x 200 integers, write size = 0.30 GB
procs Global array size exec(sec) write(MB/s)
------- ------------------ --------- -----------
32 400 x 400 x 200 6.67 45.72
"""
import sys, os, argparse
import numpy as np
from mpi4py import MPI
import pnetcdf
def _subarray_layout(rank, psizes, length):
    """Return ``(start, count, gsizes)`` of *rank*'s block in a block-partitioned array.

    The process grid *psizes* (e.g. from ``MPI.Compute_dims``) is traversed
    with dimension 0 varying fastest; for 3 dimensions that means
    ``rank = c0 + psizes[0] * (c1 + psizes[1] * c2)``.  Every process owns
    ``length`` elements along each dimension, so the global size of
    dimension ``d`` is ``length * psizes[d]``.
    """
    ndims = len(psizes)
    start = np.zeros(ndims, dtype=np.int32)
    count = np.full(ndims, length, dtype=np.int32)
    gsizes = np.zeros(ndims, dtype=np.int32)
    stride = 1  # number of ranks spanned by one step along dimension d
    for d, p in enumerate(psizes):
        start[d] = ((rank // stride) % p) * length
        gsizes[d] = p * length
        stride *= p
    return start, count, gsizes


def _print_info(info_used):
    """Print the MPI-IO hints shown in the module docstring, skipping unset ones."""
    for key in ("cb_nodes", "cb_buffer_size", "striping_factor", "striping_unit"):
        value = info_used.Get(key)
        if value is not None:
            print("MPI hint: {:<15} = {}".format(key, value))


def pnetcdf_io(filename, file_format, length):
    """Collectively write NUM_VARS 3D integer variables and report the bandwidth.

    Relies on the module-level ``comm``, ``rank``, ``nprocs`` and ``verbose``
    globals assigned in the ``__main__`` section.

    Args:
        filename: name of the netCDF file to create; an existing file is
            clobbered (mode ``'w'``).
        file_format: value passed as ``format`` to ``pnetcdf.File``
            (``None``, ``"NC_64BIT_OFFSET"`` or ``"NC_64BIT_DATA"`` as mapped
            from the ``-k`` option).
        length: size of each dimension of the local subarray.
    """
    # number of dimensions
    NDIMS = 3
    # number of variables
    NUM_VARS = 10

    if verbose and rank == 0:
        print("Number of variables = ", NUM_VARS)
        print("Number of dimensions = ", NDIMS)

    # calculate local subarray access pattern
    psizes = MPI.Compute_dims(nprocs, NDIMS)
    start, count, gsizes = _subarray_layout(rank, psizes, length)
    bufsize = length ** NDIMS
    itemsize = np.dtype(np.int32).itemsize

    # One buffer per variable, shaped like the local subarray and filled with
    # non-zero values: element j (C order) of variable i holds rank*i + 123 + j.
    offsets = np.arange(bufsize, dtype=np.int32).reshape((length,) * NDIMS)
    buf = [offsets + np.int32(rank * i + 123) for i in range(NUM_VARS)]

    # Synchronize so the timer covers the whole create/define/write/close phase
    comm.Barrier()
    write_timing = MPI.Wtime()

    # Create the file using file clobber mode
    f = pnetcdf.File(filename=filename,
                     mode='w',
                     format=file_format,
                     comm=comm,
                     info=None)
    try:
        # Define dimensions x, y, z
        dims = [f.def_dim(chr(ord('x') + i), int(gsizes[i])) for i in range(NDIMS)]

        # Define variables
        variables = [f.def_var("var{}".format(i), pnetcdf.NC_INT, dims)
                     for i in range(NUM_VARS)]

        # Exit the define mode
        f.enddef()

        # Get the MPI-IO hint object, which contains all hints used
        info_used = f.inq_info()

        # Collectively write one variable at a time.  The Python-style form
        #   variables[i][start[0]:start[0]+count[0], ...] = buf[i]
        # is equivalent; only one form is used so each variable is written
        # exactly once (writing twice would double the I/O being timed).
        for var, data in zip(variables, buf):
            var.put_var_all(data, start=start, count=count)
    finally:
        # Close the file even if a definition or write step raised
        f.close()

    write_timing = MPI.Wtime() - write_timing

    # Total bytes written by all processes, and the slowest process's time
    write_size = bufsize * NUM_VARS * itemsize
    sum_write_size = comm.reduce(write_size, op=MPI.SUM, root=0)
    max_write_timing = comm.reduce(write_timing, op=MPI.MAX, root=0)

    if verbose and rank == 0:
        _print_info(info_used)
        subarray_mb = bufsize * itemsize / 1048576.0
        total_gb = sum_write_size / 1073741824.0
        bandwidth = (sum_write_size / 1048576.0 / max_write_timing
                     if max_write_timing > 0 else float("inf"))
        print("Local array size %d x %d x %d integers, size = %.2f MB"
              % (length, length, length, subarray_mb))
        print("Global array size %d x %d x %d integers, write size = %.2f GB"
              % (gsizes[0], gsizes[1], gsizes[2], total_gb))
        print("procs    Global array size  exec(sec)  write(MB/s)")
        print("-------  ------------------  ---------  -----------")
        print(" %4d    %4d x %4d x %4d %8.2f  %10.2f"
              % (nprocs, gsizes[0], gsizes[1], gsizes[2],
                 max_write_timing, bandwidth))

    # Release the info object returned by inq_info (assumed caller-owned, as
    # with ncmpi_inq_file_info in the C API -- NOTE(review): confirm)
    info_used.Free()
def parse_help():
    """Print usage on rank 0 when ``-h``/``--help`` is given; report whether it was.

    Reads ``sys.argv`` and the module-level ``rank`` global (the latter only
    when a help flag is present).

    Returns:
        True if ``-h`` or ``--help`` appears in ``sys.argv``, else False.
    """
    help_flag = "-h" in sys.argv or "--help" in sys.argv
    if help_flag and rank == 0:
        # The usage line lists every option documented below it.
        help_text = (
            "Usage: {} [-h] | [-q] [-k format] [-l len] [file_name]\n"
            "       [-h] Print help\n"
            "       [-q] Quiet mode (reports when fail)\n"
            "       [-k format] file format: 1 for CDF-1, 2 for CDF-2, 5 for CDF-5\n"
            "       [-l len] size of each dimension of the local array\n"
            "       [file_name] (Optional) output netCDF file name\n"
        ).format(sys.argv[0])
        print(help_text)
    return help_flag
if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    nprocs = comm.Get_size()

    if parse_help():
        MPI.Finalize()
        sys.exit(1)

    # Get command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("dir", nargs="?", type=str, metavar="file_name",
                        help="(Optional) output netCDF file name",
                        default="testfile.nc")
    parser.add_argument("-q", help="Quiet mode (reports when fail)",
                        action="store_true")
    # choices= makes argparse reject an unsupported kind with a usage error
    # instead of the script crashing later with a KeyError.
    parser.add_argument("-k", choices=["1", "2", "5"],
                        help="File format: 1 for CDF-1, 2 for CDF-2, 5 for CDF-5")
    # type=int makes argparse reject a non-integer length with a usage error.
    parser.add_argument("-l", type=int,
                        help="Size of each dimension of the local array")
    args = parser.parse_args()

    verbose = not args.q

    # Map -k to the format name understood by pnetcdf.File (None = CDF-1).
    kind_dict = {'1': None, '2': "NC_64BIT_OFFSET", '5': "NC_64BIT_DATA"}
    file_format = kind_dict[args.k] if args.k else None

    # A missing or non-positive -l falls back to the default of 10.
    length = args.l if args.l is not None and args.l > 0 else 10

    filename = args.dir

    if verbose and rank == 0:
        print("{}: example of collective writes".format(os.path.basename(__file__)))

    # Run I/O; report the failure (even in quiet mode) and re-raise it.
    try:
        pnetcdf_io(filename, file_format, length)
    except BaseException as err:
        print("Error: type:", type(err), str(err), file=sys.stderr)
        raise

    MPI.Finalize()