-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathnonblocking_write_def.py
179 lines (142 loc) · 5.4 KB
/
nonblocking_write_def.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#
# Copyright (C) 2024, Northwestern University and Argonne National Laboratory
# See COPYRIGHT notice in top-level directory.
#
"""
This example is the same as nonblocking_write.py except that all nonblocking
write requests (calls to iput and bput) are posted in define mode.
It creates a netCDF file in CDF-5 format and writes a number of
3D integer non-record variables.
NOTE: this Python script, as written, only prints the number of variables
and dimensions (in verbose mode); it does not print the MPI hints, array
sizes, or timing/bandwidth lines shown in the sample output below.
To run (for example):
mpiexec -n num_processes python3 nonblocking_write_def.py [-q] [-l len] [filename]
where len decides the size of each local array, which is len x len x len.
So, each non-record variable is of size len*len*len * nprocs * sizeof(int).
All variables are partitioned among all processes in a 3D
block-block-block fashion. Below is an example standard output from the
command:
mpiexec -n 32 python3 nonblocking_write_def.py tmp/test1.nc -l 100
MPI hint: cb_nodes = 2
MPI hint: cb_buffer_size = 16777216
MPI hint: striping_factor = 32
MPI hint: striping_unit = 1048576
Local array size 100 x 100 x 100 integers, size = 3.81 MB
Global array size 400 x 400 x 200 integers, write size = 0.30 GB
procs Global array size exec(sec) write(MB/s)
------- ------------------ --------- -----------
32 400 x 400 x 200 6.67 45.72
"""
import sys, os, argparse, inspect
import numpy as np
from mpi4py import MPI
import pnetcdf
def pnetcdf_io(file_name, length):
    """Write NUM_VARS 3D integer variables using nonblocking iput and bput.

    All ``iput`` requests are posted while the file is still in define
    mode; they are committed by ``wait_all`` after ``enddef``.  The same
    data is then written a second time with buffered ``bput`` requests,
    and each request's error code is checked.

    The global array is partitioned block-block-block over the process
    grid returned by ``MPI.Compute_dims(nprocs, 3)``.  Each process writes
    one ``length x length x length`` subarray per variable.

    This function reads the module-level globals ``comm``, ``rank``,
    ``nprocs`` and ``verbose``, which are set by the script entry point.

    Args:
        file_name: path of the netCDF file to create (opened with mode 'w',
            format "NC_64BIT_DATA").
        length: size of each dimension of the local subarray.
    """
    NDIMS = 3
    NUM_VARS = 10

    if verbose and rank == 0:
        print("Number of variables = ", NUM_VARS)
        print("Number of dimensions = ", NDIMS)

    # Process grid and this rank's coordinates in it (dimension 0 varies
    # fastest). Dimension 1 must divide by psizes[0], not psizes[1];
    # otherwise ranks collide whenever psizes[0] != psizes[1].
    psizes = MPI.Compute_dims(nprocs, NDIMS)
    start = np.zeros(NDIMS, dtype=np.int32)
    start[0] = rank % psizes[0]
    start[1] = (rank // psizes[0]) % psizes[1]
    start[2] = (rank // (psizes[0] * psizes[1])) % psizes[2]
    start *= length  # grid coordinate -> element offset

    count = np.full(NDIMS, length, dtype=np.int32)
    gsizes = np.array([length * p for p in psizes], dtype=np.int32)
    bufsize = length ** NDIMS

    # Allocate buffers and initialize with non-zero numbers:
    # buf[i][j] = rank * i + 123 + j, built vectorized instead of per element.
    buf = [np.arange(bufsize, dtype=np.int32) + (rank * i + 123)
           for i in range(NUM_VARS)]

    # Create the file (uses the file_name argument, not a global).
    f = pnetcdf.File(filename=file_name,
                     mode='w',
                     format="NC_64BIT_DATA",
                     comm=comm,
                     info=None)

    # Define dimensions named x, y, z and the variables over them.
    dims = [f.def_dim(chr(ord('x') + i), gsizes[i]) for i in range(NDIMS)]
    variables = [f.def_var("var{}".format(i), pnetcdf.NC_INT, dims)
                 for i in range(NUM_VARS)]

    # Post one nonblocking iput per variable while still in define mode.
    for var, data in zip(variables, buf):
        var.iput_var(data, start=start, count=count)

    # Exit define mode and enter data mode, then commit all posted requests.
    f.enddef()
    f.wait_all(num=pnetcdf.NC_REQ_ALL)

    # Buffered nonblocking writes: attach enough space for all bput data.
    bbufsize = bufsize * NUM_VARS * np.dtype(np.int32).itemsize
    f.attach_buff(bbufsize)

    reqs = [var.bput_var(data, start=start, count=count)
            for var, data in zip(variables, buf)]
    # buf contents may be changed or freed here, since bput copied them.

    # Wait for the nonblocking I/O to complete and check each request.
    req_errs = [None] * NUM_VARS
    f.wait_all(NUM_VARS, reqs, req_errs)
    for i, err in enumerate(req_errs):
        if pnetcdf.strerrno(err) != "NC_NOERR":
            print(f"Error on request {i}:", pnetcdf.strerror(err))

    # Detach the temporary buffer and close the file.
    f.detach_buff()
    f.close()
def parse_help():
    """Print the usage message on rank 0 if ``-h`` or ``--help`` was given.

    Reads ``sys.argv`` and the module-level ``rank`` global.  ``rank`` is
    consulted only when a help flag is present.

    Returns:
        True if ``-h`` or ``--help`` appears in ``sys.argv``, else False.
    """
    help_flag = "-h" in sys.argv or "--help" in sys.argv
    if help_flag and rank == 0:
        # The synopsis lists every option the script accepts, including -l.
        help_text = (
            "Usage: {} [-h] | [-q] [-l len] [file_name]\n"
            " [-h] Print help\n"
            " [-q] Quiet mode (reports when fail)\n"
            " [-l len] size of each dimension of the local array\n"
            " [file_name] (Optional) output netCDF file name\n"
        ).format(sys.argv[0])
        print(help_text)
    return help_flag
if __name__ == "__main__":
    # Module-level globals read by parse_help() and pnetcdf_io().
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    nprocs = comm.Get_size()

    if parse_help():
        MPI.Finalize()
        sys.exit(1)

    # get command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("dir", nargs="?", type=str,
                        help="(Optional) output netCDF file name",
                        default="testfile.nc")
    parser.add_argument("-q", help="Quiet mode (reports when fail)",
                        action="store_true")
    # type=int lets argparse reject non-integer values with a usage error
    # instead of an unhandled ValueError later.
    parser.add_argument("-l", type=int,
                        help="Size of each dimension of the local array\n")
    args = parser.parse_args()

    verbose = not args.q
    # A missing or non-positive -l falls back to the default length of 10.
    length = args.l if args.l and args.l > 0 else 10
    filename = args.dir

    if verbose and rank == 0:
        print("{}: example of nonblocking APIs in define mode".format(
            os.path.basename(__file__)))

    try:
        pnetcdf_io(filename, length)
    except BaseException as err:
        print("Error: type:", type(err), str(err))
        raise

    MPI.Finalize()