-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPthreads.cpp
474 lines (395 loc) · 14.6 KB
/
Pthreads.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
#include <algorithm>
#include <chrono>
#include <cmath>
#include <fstream>
#include <iostream>
#include <limits>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>
#define HAVE_STRUCT_TIMESPEC
#include <pthread.h>
using namespace std;
const int num_threads = 8;
const int best_record_each_thread = 5;
const int num_record_to_sort = num_threads * best_record_each_thread;
const int k_value = 3;
struct PthreadParams {
double** dataset;
const double* target;
double** distances;
int dataset_size;
int feature_size;
int start;
int end;
int thread_id;
};
struct quickSortParams {
double** distances;
int low;
int high;
};
class PthreadKnn {
private:
int neighbours_number;
public:
PthreadKnn(int k) : neighbours_number(k) {}
int predict_class(double* dataset[], const double* target, int dataset_size, int feature_size) {
double* distances[3];
int zeros_count = 0;
int ones_count = 0;
// Allocate memory for distances and index order
distances[0] = new double[dataset_size];
distances[1] = new double[dataset_size];
distances[2] = new double[dataset_size];
//chrono::steady_clock::time_point knnBegin = chrono::steady_clock::now();
get_knn(dataset, target, distances, dataset_size, feature_size);
//chrono::steady_clock::time_point knnEnd = chrono::steady_clock::now();
//cout << "KNN = " << chrono::duration_cast<chrono::microseconds>(knnEnd - knnBegin).count() << "[µs]" << endl;
//for (int i = 0; i < 100; i++) {
// cout << i + 1 <<") " << distances[0][i] << ", " << distances[1][i] << ", " << distances[2][i] << endl;
//}
#pragma region quickSorting
// creating 4 threads
pthread_t sortThreads[num_threads];
quickSortParams sortParams[num_threads];
int rows_per_thread = dataset_size / num_threads;
for (int i = 0; i < num_threads; i++) {
//calculate the start and end for each thread
int low = i * rows_per_thread;
int high = (i == num_threads - 1) ? dataset_size : (i + 1) * rows_per_thread;
sortParams[i] = { distances, low, high };
pthread_create(&sortThreads[i], NULL, quick_sort_parallel, &sortParams[i]);
}
for (int i = 0; i < num_threads; i++) {
pthread_join(sortThreads[i], nullptr);
}
double* finalSortedDistances[3];
finalSortedDistances[0] = new double[num_record_to_sort];
finalSortedDistances[1] = new double[num_record_to_sort];
finalSortedDistances[2] = new double[num_record_to_sort];
//extract first 5 from each thread (shortest distance for each thread)
//i < 3 because distance is 2d array and distance[3][i] is largest
for (int i = 0; i < 3; i++) {
// k < num_thread is to extract value from each thread
for (int k = 0; k < num_threads; k++) {
// to extract first 5 record from each thread
for (int j = 0; j < best_record_each_thread; j++) {
finalSortedDistances[i][(k * best_record_each_thread) + j] = distances[i][((dataset_size / num_threads) * k) + j];
}
}
}
//for (int i = 0; i < num_record_to_sort; i++) {
// cout << finalSortedDistances[0][i] << "," << finalSortedDistances[1][i] << "," << finalSortedDistances[2][i] << endl;
//}
//cout << "\n";
//sort again
//the number of record need to serial sort is rapidly decreased
quick_sort(finalSortedDistances, 0, num_record_to_sort - 1);
//for (int i = 0; i < num_record_to_sort; i++) {
// cout << finalSortedDistances[0][i] << "," << finalSortedDistances[1][i] << "," << finalSortedDistances[2][i] << endl;
//}
#pragma endregion
cout << "First K(" <<k_value<< ") value: " << endl;
//Count label occurrences in the K nearest neighbors
int count = 0;
for (int i = 0; count < neighbours_number; i++) {
if (finalSortedDistances[1][i] == 0 && finalSortedDistances[0][i] > 0) {
zeros_count += 1;
cout << "0: " << finalSortedDistances[0][i] << endl;
//cout << "0: " << distances[0][i] << "," << distances[2][i] << endl;
count++;
}
else if (finalSortedDistances[1][i] == 1 && finalSortedDistances[0][i] > 0) {
ones_count += 1;
cout << "1: " << finalSortedDistances[0][i] << endl;
//cout << "1: " << distances[0][i] << "," << distances[2][i] << endl;
count++;
}
}
//return prediction
int prediction = (zeros_count > ones_count) ? 0 : 1;
// Clean up memory
delete[] distances[0];
delete[] distances[1];
delete[] distances[2];
delete[] finalSortedDistances[0];
delete[] finalSortedDistances[1];
delete[] finalSortedDistances[2];
return prediction;
}
private:
static void* quick_sort_parallel(void* arg) {
//recieve parameter
quickSortParams* params = static_cast<quickSortParams*>(arg);
//call sort
quick_sort(params->distances, params->low, params->high - 1);
return nullptr;
}
// Function to partition the array into two sub-arrays and return the index of the pivot element
static int partition(double** distances, int low, int high) {
// Choose the last element as the pivot
double pivot = distances[0][high];
// Index of the smaller element
int i = low - 1;
//check if value of distance[0] is lesser than pivot value, if yes swap
//to move shorter distance to left
for (int j = low; j < high; j++) {
if (distances[0][j] <= pivot) {
i++;
swap(distances, i, j);
}
}
//placing the pivot in the correct position
swap(distances, i + 1, high);
return i + 1;
}
static void swap(double** distances, int i, int j) {
std::swap(distances[0][i], distances[0][j]);
std::swap(distances[1][i], distances[1][j]);
std::swap(distances[2][i], distances[2][j]);
}
static void quick_sort(double** distances, int low, int high) {
if (low < high) {
int pivotIndex = partition(distances, low, high);
// Recursively sort the sub-arrays
quick_sort(distances, low, pivotIndex - 1);
quick_sort(distances, pivotIndex + 1, high);
}
}
//to calculate euclidean distance
static double euclidean_distance(const double* x, const double* y, int feature_size) {
double l2 = 0.0;
//loop through each label in a row to calculate distance
for (int i = 1; i < feature_size; i++) {
l2 += pow((x[i] - y[i]), 2);
}
return sqrt(l2);
}
//function to be parse to pthread for multi-threading
static void* compute_distances(void* arg) {
//recieve parameters
PthreadParams* params = static_cast<PthreadParams*>(arg);
int count = 0;
//different thread is accessing different index range, so no race condition
for (int i = params->start; i < params->end; i++) {
if (params->dataset[i] == params->target) continue; // do not use the same point
params->distances[0][i] = euclidean_distance(params->target, params->dataset[i], params->feature_size);
params->distances[1][i] = params->dataset[i][0]; // Store outcome label
params->distances[2][i] = i; // Store index
count++;
}
//cout << "Thread " << params->thread_id << " - Number of euclidean run: " << count << endl;
return nullptr;
}
//the function to be call to get KNN
void get_knn(double* x[], const double* y, double* distances[3], int dataset_size, int feature_size) {
//create parameters to be parse to compute_distance function
PthreadParams knnParams[num_threads];
pthread_t knnThreads[num_threads];
//to calculate the number of dataset need to handled by each thread
int rows_per_thread = dataset_size / num_threads;
for (int i = 0; i < num_threads; i++) {
//assign start and end point for each thread
int start = i * rows_per_thread;
int end = (i == num_threads - 1) ? dataset_size : (i + 1) * rows_per_thread;
//store parameter
knnParams[i] = { x, y, distances, dataset_size, feature_size, start, end ,i };
//create and assign task to thread
pthread_create(&knnThreads[i], nullptr, compute_distances, &knnParams[i]);
}
//wait all thread to complete
for (int i = 0; i < num_threads; i++) {
pthread_join(knnThreads[i], nullptr);
}
}
};
class Knn {
private:
int neighbours_number;
public:
Knn(int k) : neighbours_number(k) {}
int predict_class(double* dataset[], const double* target, int dataset_size, int feature_size) {
double* distances[3];
int zeros_count = 0;
int ones_count = 0;
// Allocate memory for distances and index order
distances[0] = new double[dataset_size];
distances[1] = new double[dataset_size];
distances[2] = new double[dataset_size];
get_knn(dataset, target, distances, dataset_size, feature_size);
//for (int i = 0; i < 100; i++) {
// cout << i + 1 << ") " << distances[0][i] << ", " << distances[1][i] << ", " << distances[2][i] << endl;
//}
quick_sort(distances, 0, dataset_size - 1);
/*for (int i = 0; i < num_record_to_sort; i++) {
cout << distances[0][i] << "," << distances[1][i] << "," << distances[2][i] << endl;
}*/
cout << "First K(" << k_value << ") value: " << endl;
//Count label occurrences in the K nearest neighbors
int count = 0;
for (int i = 0; count < neighbours_number; i++) {
if (distances[1][i] == 0 && distances[0][i] > 0) {
zeros_count += 1;
cout << "0: " << distances[0][i] << endl;
//cout << "0: " << distances[0][i] << "," << distances[2][i] << endl;
count++;
}
else if (distances[1][i] == 1 && distances[0][i] > 0) {
ones_count += 1;
cout << "1: " << distances[0][i] << endl;
//cout << "1: " << distances[0][i] << "," << distances[2][i] << endl;
count++;
}
}
int prediction = (zeros_count > ones_count) ? 0 : 1;
// Clean up memory
delete[] distances[0];
delete[] distances[1];
delete[] distances[2];
return prediction;
}
private:
// Function to partition the array into two sub-arrays and return the index of the pivot element
static int partition(double** distances, int low, int high) {
// Choose the last element as the pivot
double pivot = distances[0][high];
// Index of the smaller element
int i = low - 1;
//check if value of distance[0] is lesser than pivot value, if yes swap
//to move shorter distance to left
for (int j = low; j < high; j++) {
if (distances[0][j] <= pivot) {
i++;
swap(distances, i, j);
}
}
//placing the pivot in the correct position
swap(distances, i + 1, high);
return i + 1;
}
static void swap(double** distances, int i, int j) {
std::swap(distances[0][i], distances[0][j]);
std::swap(distances[1][i], distances[1][j]);
std::swap(distances[2][i], distances[2][j]);
}
static void quick_sort(double** distances, int low, int high) {
if (low < high) {
int pivotIndex = partition(distances, low, high);
// Recursively sort the sub-arrays
quick_sort(distances, low, pivotIndex - 1);
quick_sort(distances, pivotIndex + 1, high);
}
}
double euclidean_distance(const double* x, const double* y, int feature_size) {
double l2 = 0.0;
for (int i = 1; i < feature_size; i++) {
l2 += pow((x[i] - y[i]), 2);
}
return sqrt(l2);
}
void get_knn(double* x[], const double* y, double* distances[3], int dataset_size, int feature_size) {
int count = 0;
for (int i = 0; i < dataset_size; i++) {
if (x[i] == y) continue; // do not use the same point
distances[0][count] = this->euclidean_distance(y, x[i], feature_size);
distances[1][count] = x[i][0]; // Store outcome label
distances[2][count] = i; // Store index
count++;
}
//cout << "Number of euclidean run:" << count << endl;
}
};
vector<double> parseLine(const string& line) {
vector<double> row;
istringstream iss(line);
string value;
while (getline(iss, value, ',')) {
try {
double num = stod(value);
row.push_back(num);
}
catch (const invalid_argument&) {
cerr << "Invalid data in CSV: " << value << endl;
}
}
return row;
}
int main() {
string filename = "diabetes_binary.csv";
//const int dataset_size = 30000;
//const int dataset_size = 100000;
const int dataset_size = 250000;
const int feature_size = 22;
double** dataset = new double* [dataset_size];
double target[feature_size] = { 0.0, 0.0, 0.0, 1.0, 24.0, 1.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 3.0, 0.0, 0.0, 0.0, 2.0, 5.0, 3.0 };
//double target[feature_size] = { 1.0, 1.0, 1.0, 1.0, 30.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 5.0, 30.0, 30.0, 1.0, 0.0, 9.0, 5.0, 1.0 };
//double target[feature_size] = { 1.0, 0.0, 0.0, 1.0, 25.0, 1.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 3.0, 0.0, 0.0, 0.0, 1.0, 13.0, 6.0, 8.0 };
//double target[feature_size] = { 0.0, 1.0, 1.0, 1.0, 28.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 4.0, 0.0, 10.0, 1.0, 0.0, 12.0, 6.0, 2.0 };
// Allocate memory for dataset and target
for (int i = 0; i < dataset_size; i++) {
dataset[i] = new double[feature_size];
}
// Read data from CSV and populate dataset and target
ifstream file(filename);
if (!file.is_open()) {
cerr << "Error opening file: " << filename << endl;
return 1;
}
string header;
//to eliminate first line which is the header
getline(file, header);
string line;
int index = 0;
while (getline(file, line) && index < dataset_size) {
vector<double> row = parseLine(line);
for (int j = 0; j < feature_size; j++) {
dataset[index][j] = row[j];
}
index++;
}
cout << "Number of records: " << index << endl;
//Pthread Knn
#pragma region PthreadKnn
cout << "\nPthread KNN + Quick Sort: " << endl;
chrono::steady_clock::time_point pthreadBegin = chrono::steady_clock::now();
PthreadKnn pthreadknn(k_value); // Use K=3
int pthreadPrediction = pthreadknn.predict_class(dataset, target, dataset_size, feature_size);
cout << "Pthread Prediction: " << pthreadPrediction << endl;
if (pthreadPrediction == 0) {
cout << "Predicted class: Negative" << endl;
}
else if (pthreadPrediction == 1) {
cout << "Predicted class: Prediabetes or Diabetes" << endl;
}
else {
cout << "Prediction could not be made." << endl;
}
chrono::steady_clock::time_point pthreadEnd = chrono::steady_clock::now();
cout << "Classification Time = " << chrono::duration_cast<chrono::microseconds>(pthreadEnd - pthreadBegin).count() << "[µs]" << endl;
#pragma endregion
//Knn
#pragma region Knn
cout << "\nKNN + Quick Sort: " << endl;
chrono::steady_clock::time_point knnBegin = chrono::steady_clock::now();
Knn knn(k_value); // Use K=3
int prediction = knn.predict_class(dataset, target, dataset_size, feature_size);
cout << "KNN Prediction: " << prediction << endl;
if (prediction == 0) {
cout << "Predicted class: Negative" << endl;
}
else if (prediction == 1) {
cout << "Predicted class: Prediabetes or Diabetes" << endl;
}
else {
cout << "Prediction could not be made." << endl;
}
chrono::steady_clock::time_point knnEnd = chrono::steady_clock::now();
cout << "Classification Time = " << chrono::duration_cast<chrono::microseconds>(knnEnd - knnBegin).count() << "[µs]" << endl;
#pragma endregion
//cout << "The speed of classification is " << (double)((knnEnd - knnBegin) / (pthreadEnd - pthreadBegin)) << " Times fasters" << endl;
// Deallocate memory for dataset
for (int i = 0; i < dataset_size; i++) {
delete[] dataset[i];
}
return 0;
}