### Handling Big Data ###
setwd("~/Desktop")
# Start by setting the seed so our results are reproducible
set.seed(12345)
# Next create some big fake sparse (lots of zeros) data:
big_data <- matrix(round(rnorm(100000000) * runif(100000000, min = 0, max = 0.2)),
                   nrow = 1000000,
                   ncol = 100)
# Find out how many entries are just zeros:
length(which(big_data == 0))
# Now let's represent this as a simple triplet matrix:
# install.packages("slam", dependencies = TRUE)
library(slam)
# Now we can use this function provided by slam to transform our dense matrix
# into a sparse matrix:
sparse_big_data <- slam::as.simple_triplet_matrix(big_data)
# Let's check the value at this index in the dense matrix object:
big_data[1436,1]
# Now we can try the same thing for the sparse matrix object:
as.matrix(sparse_big_data[1436,1])
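# A quick sanity check on the memory savings (a rough sketch; exact sizes will
# vary by machine and R version) -- compare the in-memory size of the two
# representations:
object.size(big_data)
object.size(sparse_big_data)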
# Now let's see what happens when we save the data as a .csv vs. as an .RData
# file:
# install.packages("rio", dependencies = TRUE)
library(rio)
setwd("~/Desktop")
# Try saving in both formats and look at the file sizes:
export(big_data, file = "Data.csv")
save(big_data, file = "Data.RData")
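# As a rough sketch of how you might compare the resulting file sizes without
# leaving R, file.info() reports sizes in bytes (exact numbers will differ
# from machine to machine):
file.info("Data.csv")$size
file.info("Data.RData")$size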
### Parallelization ###
# Let's start with an example of parallelization using the foreach package in R:
# First we create some toy data:
num_cols <- 300
my_data <- matrix(rnorm(100000 * num_cols),
                  nrow = 100000,
                  ncol = num_cols)
# Define a function that we are going to run in parallel:
my_function <- function(col_number,
                        my_data){
  temp <- 0
  # Loop over the rows of the data. Whenever the value in the chosen column
  # falls strictly between 0.1 and 0.5, add the sum of that entire row to a
  # running total:
  for (j in 1:nrow(my_data)) {
    if (my_data[j, col_number] > 0.1) {
      if (my_data[j, col_number] < 0.5) {
        temp <- temp + sum(my_data[j, ])
      }
    }
  }
  return(temp)
}
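# As a quick sanity check (a sketch -- the exact value depends on the random
# draws above), we can run the function on a single column in serial:
my_function(1, my_data)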
# We will rely on a couple of packages here, so make sure you have them
# installed:
# install.packages(c("doParallel","foreach"), dependencies = TRUE)
library(doParallel)
library(foreach)
# First we specify the number of cores we want to use:
cores <- 4
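# Note that 4 is just an example. If you are not sure how many cores your
# machine has, one common approach (a suggestion, not part of the original
# workshop code) is to detect them and leave one free for the operating system:
# cores <- max(1, parallel::detectCores() - 1)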
# Then we create a cluster:
cl <- makePSOCKcluster(cores)
# Next we register the cluster with doParallel:
registerDoParallel(cl)
# Run analysis in serial
system.time({
  serial_results <- rep(0, num_cols)
  for (i in 1:num_cols) {
    if (i %% 10 == 0) {
      print(i)
    }
    serial_results[i] <- my_function(i, my_data)
  }
})
# Run analysis in parallel:
system.time({
  parallel_results <- foreach(i = 1:num_cols, .combine = rbind) %dopar% {
    my_function(i, my_data)
  }
})
stopCluster(cl)
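# As a sanity check (a sketch), confirm that the serial and parallel runs give
# the same answers. foreach() with .combine = rbind returns a one-column
# matrix, so we coerce both objects to plain numeric vectors before comparing:
all.equal(as.numeric(serial_results), as.numeric(parallel_results))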
# Now let's try to redo one of our earlier examples managing multiple datasets:
read_in_dataset <- function(filename, # the name of the file we want to read in.
                            has_header, # option for whether the file includes a header.
                            columns_to_keep) { # number of columns to keep in the dataset.
  # Assumes a .csv file, but could be modified to deal with other file types:
  temp <- read.csv(filename,
                   stringsAsFactors = FALSE,
                   header = has_header)
  # Let's set some column names using 'paste()' for fun:
  colnames(temp) <- paste("Bill", 1:ncol(temp), sep = "_")
  # If columns_to_keep is NULL, set it to the number of columns in the dataset:
  if (is.null(columns_to_keep)) {
    columns_to_keep <- ncol(temp)
  }
  temp <- temp[, 1:columns_to_keep]
  return(temp)
}
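# A hypothetical usage example (the filename below is made up -- substitute one
# of your own cosponsorship .csv files before running):
# bill_data <- read_in_dataset("cosponsorship_matrix.csv",
#                              has_header = FALSE,
#                              columns_to_keep = 200)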
# Now let's define a function for extracting cosponsorship information from a
# dataset and putting it in a cosponsorship matrix:
generate_cosponsorship_matrix <- function(raw_data) {
  # Get the number of legislators:
  num_legislators <- nrow(raw_data)
  # Create a sociomatrix:
  sociomatrix <- matrix(0,
                        ncol = num_legislators,
                        nrow = num_legislators)
  # Loop through bills (columns):
  for (j in 1:ncol(raw_data)) {
    # If there are a lot of bills, then we may want to check in with the
    # user periodically to let them know about progress:
    if (j %% 1000 == 0) {
      cat("Working on bill", j, "of", ncol(raw_data), "\n")
    }
    # Find out who the bill sponsor is (coded as a 1):
    for (i in 1:nrow(raw_data)) {
      if (raw_data[i, j] == 1) {
        sponsor <- i
      }
    }
    # Find all of the cosponsors (coded as a 2):
    for (i in 1:nrow(raw_data)) {
      if (raw_data[i, j] == 2) {
        sociomatrix[i, sponsor] <- sociomatrix[i, sponsor] + 1
      }
    }
  }
  return(sociomatrix)
}
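# To see what this function does, here is a tiny made-up example (a sketch, not
# real data): 4 legislators and 2 bills, where a 1 codes the sponsor and a 2
# codes a cosponsor.
toy_data <- matrix(c(1, 2, 0, 2,   # bill 1: legislator 1 sponsors; 2 and 4 cosponsor
                     0, 2, 1, 0),  # bill 2: legislator 3 sponsors; 2 cosponsors
                   nrow = 4,
                   ncol = 2)
generate_cosponsorship_matrix(toy_data)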
# Now we define a function that calls these two functions to do everything:
preprocess_data <- function(filenames,
                            has_header = FALSE,
                            columns_to_keep = NULL) {
  # Create a list object to store the raw and preprocessed data:
  data_list <- vector(mode = "list",
                      length = length(filenames))
  # Loop over files and load/preprocess them:
  for (i in 1:length(filenames)) {
    cat("Reading in file", i, "\n")
    data_list[[i]]$raw_data <- read_in_dataset(filenames[i],
                                               has_header,
                                               columns_to_keep)
    cat("Preprocessing file", i, "\n")
    data_list[[i]]$sociomatrix <- generate_cosponsorship_matrix(data_list[[i]]$raw_data)
  }
  # Give list entries some basic names:
  names(data_list) <- paste("Dataset", 1:length(filenames), sep = "_")
  # Now just return everything:
  return(data_list)
}
# Here we are going to go one step further and try this in parallel:
wrapper_function <- function(filename,
                             has_header,
                             columns_to_keep,
                             fun1,
                             fun2) {
  # Create a blank list object:
  data_list <- list()
  # Load in the data and save it to the list object:
  data_list$raw_data <- fun1(filename,
                             has_header,
                             columns_to_keep)
  # Preprocess the data:
  data_list$sociomatrix <- fun2(data_list$raw_data)
  # Return the list object:
  return(data_list)
}
preprocess_parallel <- function(filenames,
                                cores = 1,
                                has_header = FALSE,
                                columns_to_keep = NULL,
                                fun1 = read_in_dataset,
                                fun2 = generate_cosponsorship_matrix) {
  # Create a cluster instance:
  cl <- parallel::makeCluster(getOption("cl.cores", cores))
  # Run our wrapper function on the cluster. clusterApplyLB() hands each file
  # to the next free worker, which load-balances when files take different
  # amounts of time to process:
  data_list <- parallel::clusterApplyLB(
    cl = cl,
    x = filenames,
    fun = wrapper_function,
    has_header = has_header,
    columns_to_keep = columns_to_keep,
    fun1 = fun1,
    fun2 = fun2)
  # Stop the cluster when we are done:
  parallel::stopCluster(cl)
  # Give list entries some basic names:
  names(data_list) <- paste("Dataset", 1:length(filenames), sep = "_")
  # Now just return everything:
  return(data_list)
}
# Let's try it out!
setwd("~/Desktop/Data")
filenames <- list.files()
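# Note: relative filenames like these work for the serial version below because
# we just set the working directory, but the parallel workers created later may
# not inherit it. If you hit file-not-found errors in parallel, one fix (an
# assumption about your folder layout) is to use full paths instead:
# filenames <- list.files("~/Desktop/Data", full.names = TRUE)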
# Time the standard (serial) way of doing it:
system.time({
  serial_data <- preprocess_data(filenames,
                                 columns_to_keep = 200)
})
# Now try it out in parallel on 4 cores:
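# A sketch of what that call might look like (timings will vary by machine and
# by the number and size of your data files):
system.time({
  parallel_data <- preprocess_parallel(filenames,
                                       cores = 4,
                                       columns_to_keep = 200)
})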