Skip to content

Commit e237ae8

Browse files
committed
Add parallel Radix Sort
1 parent ea7604e commit e237ae8

7 files changed

+3682
-1
lines changed

results/2022-07-13/RadixSort_UltimateTest_parallel.txt

+3,275
Large diffs are not rendered by default.

src/main/java/eu/happycoders/sort/demos/UltimateTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import eu.happycoders.sort.method.quicksort.PivotStrategy;
1717
import eu.happycoders.sort.method.quicksort.QuicksortImproved;
1818
import eu.happycoders.sort.method.quicksort.QuicksortVariant1;
19+
import eu.happycoders.sort.method.radixsort.ParallelRadixSortWithArrays;
20+
import eu.happycoders.sort.method.radixsort.ParallelRecursiveMsdRadixSortWithArrays;
1921
import eu.happycoders.sort.method.radixsort.RadixSortWithArraysAndCustomBase;
2022
import eu.happycoders.sort.method.radixsort.RadixSortWithCountingSortAndCustomBase;
2123
import eu.happycoders.sort.method.radixsort.RadixSortWithDynamicListsAndCustomBase;
@@ -64,7 +66,9 @@ public class UltimateTest {
6466
new RadixSortWithCountingSortAndCustomBase(10),
6567
new RadixSortWithCountingSortAndCustomBase(100),
6668
new RecursiveMsdRadixSortWithArraysAndCustomBase(10),
67-
new RecursiveMsdRadixSortWithArraysAndCustomBase(100)
69+
new RecursiveMsdRadixSortWithArraysAndCustomBase(100),
70+
new ParallelRadixSortWithArrays(),
71+
new ParallelRecursiveMsdRadixSortWithArrays()
6872
};
6973

7074
private static final int WARM_UPS = 2;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package eu.happycoders.sort.method.radixsort;
2+
3+
final class ParallelRadixSortHelper {
4+
5+
private ParallelRadixSortHelper() {}
6+
7+
static Segment[] splitIntoSegments(int[] elements) {
8+
int processors = Runtime.getRuntime().availableProcessors();
9+
10+
// Let's put at least 10 elements into a segment, otherwise the overhead would be very high
11+
int maxProcessors = Math.max(elements.length / 10, 1);
12+
if (processors > maxProcessors) {
13+
processors = maxProcessors;
14+
}
15+
16+
Segment[] segments = new Segment[processors];
17+
for (int i = 0; i < processors; i++) {
18+
int start = i * elements.length / processors;
19+
int nextStart = (i + 1) * elements.length / processors;
20+
segments[i] = new Segment(start, nextStart);
21+
}
22+
23+
return segments;
24+
}
25+
26+
static class Segment {
27+
private final int start;
28+
private final int end;
29+
30+
private final int[] bucketCounts = new int[10];
31+
private final int[] bucketWritePositions = new int[10];
32+
33+
private Segment(int start, int end) {
34+
this.start = start;
35+
this.end = end;
36+
}
37+
38+
int getStart() {
39+
return start;
40+
}
41+
42+
int getEnd() {
43+
return end;
44+
}
45+
46+
int getBucketCount(int bucketIndex) {
47+
return bucketCounts[bucketIndex];
48+
}
49+
50+
void setCounts(int[] counts) {
51+
System.arraycopy(counts, 0, bucketCounts, 0, 10);
52+
}
53+
54+
int getBucketWritePosition(int bucketIndex) {
55+
return bucketWritePositions[bucketIndex];
56+
}
57+
58+
int getAndIncrementBucketWritePosition(int bucketIndex) {
59+
int position = bucketWritePositions[bucketIndex];
60+
bucketWritePositions[bucketIndex]++;
61+
return position;
62+
}
63+
64+
void setBucketWritePosition(int bucketIndex, int position) {
65+
bucketWritePositions[bucketIndex] = position;
66+
}
67+
68+
void resetBucketWritePositions() {
69+
for (int i = 0; i < 10; i++) {
70+
bucketWritePositions[i] = 0;
71+
}
72+
}
73+
}
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package eu.happycoders.sort.method.radixsort;
2+
3+
import static eu.happycoders.sort.method.radixsort.ParallelRadixSortHelper.splitIntoSegments;
4+
import static eu.happycoders.sort.method.radixsort.RadixSortHelper.calculateDivisor;
5+
import static eu.happycoders.sort.method.radixsort.RadixSortHelper.checkIfContainsNegatives;
6+
import static eu.happycoders.sort.method.radixsort.RadixSortHelper.getNumberOfDigits;
7+
import static eu.happycoders.sort.utils.ArrayUtils.getMaximum;
8+
9+
import eu.happycoders.sort.method.Counters;
10+
import eu.happycoders.sort.method.SortAlgorithm;
11+
import eu.happycoders.sort.method.radixsort.ParallelRadixSortHelper.Segment;
12+
import eu.happycoders.sort.utils.NotImplementedException;
13+
import java.util.Arrays;
14+
15+
/**
16+
* Parallel radix Sort implementation using arrays as buckets.
17+
*
18+
* @author <a href="sven@happycoders.eu">Sven Woltmann</a>
19+
*/
20+
@SuppressWarnings("PMD.TooManyStaticImports")
21+
public class ParallelRadixSortWithArrays implements SortAlgorithm {
22+
23+
@Override
24+
public void sort(int[] elements) {
25+
checkIfContainsNegatives(elements);
26+
int max = getMaximum(elements);
27+
int numberOfDigits = getNumberOfDigits(max);
28+
29+
// 1. Divide the input into segments to be processed in parallel
30+
Segment[] segments = splitIntoSegments(elements);
31+
32+
for (int digitIndex = 0; digitIndex < numberOfDigits; digitIndex++) {
33+
sortByDigit(elements, segments, digitIndex);
34+
}
35+
}
36+
37+
private void sortByDigit(int[] elements, Segment[] segments, int digitIndex) {
38+
Bucket[] buckets = partition(elements, segments, digitIndex);
39+
collect(buckets, elements);
40+
}
41+
42+
private Bucket[] partition(int[] elements, Segment[] segments, int digitIndex) {
43+
// 2. Calculate in parallel per segment how many elements have to be sorted into which buckets
44+
Arrays.stream(segments)
45+
.parallel()
46+
.forEach(
47+
segment -> {
48+
int[] counts = countDigits(elements, digitIndex, segment);
49+
segment.setCounts(counts);
50+
});
51+
52+
// 3. Calculate
53+
// a) the total number of elements per bucket and
54+
// b) the write position for each segment-bucket combination
55+
int[] counts = new int[10];
56+
for (int segmentIndex = 0; segmentIndex < segments.length; segmentIndex++) {
57+
Segment segment = segments[segmentIndex];
58+
segment.resetBucketWritePositions();
59+
for (int bucketIndex = 0; bucketIndex < 10; bucketIndex++) {
60+
counts[bucketIndex] += segment.getBucketCount(bucketIndex);
61+
62+
if (segmentIndex > 0) {
63+
int previousSegmentBucketWritePosition =
64+
segments[segmentIndex - 1].getBucketWritePosition(bucketIndex);
65+
int previousSegmentBucketCount = segments[segmentIndex - 1].getBucketCount(bucketIndex);
66+
segment.setBucketWritePosition(
67+
bucketIndex, previousSegmentBucketWritePosition + previousSegmentBucketCount);
68+
}
69+
}
70+
}
71+
72+
Bucket[] buckets = createBuckets(counts);
73+
74+
// 4. Distribute the elements of the segments in parallel to the buckets
75+
Arrays.stream(segments)
76+
.parallel()
77+
.forEach(segment -> distributeToBuckets(elements, segment, digitIndex, buckets));
78+
79+
return buckets;
80+
}
81+
82+
private int[] countDigits(int[] elements, int digitIndex, Segment segment) {
83+
int[] counts = new int[10];
84+
int divisor = calculateDivisor(digitIndex);
85+
86+
for (int i = segment.getStart(); i < segment.getEnd(); i++) {
87+
int element = elements[i];
88+
int digit = element / divisor % 10;
89+
counts[digit]++;
90+
}
91+
92+
return counts;
93+
}
94+
95+
private Bucket[] createBuckets(int[] counts) {
96+
Bucket[] buckets = new Bucket[10];
97+
for (int i = 0; i < 10; i++) {
98+
buckets[i] = new Bucket(counts[i]);
99+
}
100+
return buckets;
101+
}
102+
103+
private void distributeToBuckets(
104+
int[] elements, Segment segment, int digitIndex, Bucket[] buckets) {
105+
int divisor = calculateDivisor(digitIndex);
106+
107+
for (int i = segment.getStart(); i < segment.getEnd(); i++) {
108+
int element = elements[i];
109+
int digit = element / divisor % 10;
110+
int indexWithinTheBucket = segment.getAndIncrementBucketWritePosition(digit);
111+
buckets[digit].set(indexWithinTheBucket, element);
112+
}
113+
}
114+
115+
private void collect(Bucket[] buckets, int[] elements) {
116+
// 5. For each bucket, calculate the offset in the target array
117+
// (= prefix sum over counts)
118+
calculateTargetOffsets(buckets);
119+
120+
// 6. Collect the elements from the buckets for each bucket in parallel
121+
Arrays.stream(buckets)
122+
.parallel()
123+
.forEach(
124+
bucket -> {
125+
int targetOffset = bucket.getTargetOffset();
126+
int targetIndex = 0;
127+
for (int element : bucket.getElements()) {
128+
elements[targetOffset + targetIndex] = element;
129+
targetIndex++;
130+
}
131+
});
132+
}
133+
134+
private void calculateTargetOffsets(Bucket[] buckets) {
135+
for (int i = 0; i < 10; i++) {
136+
int targetOffset;
137+
if (i > 0) {
138+
Bucket previousBucket = buckets[i - 1];
139+
targetOffset = previousBucket.getTargetOffset() + previousBucket.getElements().length;
140+
} else {
141+
targetOffset = 0;
142+
}
143+
buckets[i].setTargetOffset(targetOffset);
144+
}
145+
}
146+
147+
@Override
148+
public void sortWithCounters(int[] elements, Counters counters) {
149+
throw new NotImplementedException();
150+
}
151+
152+
@Override
153+
public boolean supportsCounting() {
154+
return false;
155+
}
156+
157+
private static class Bucket {
158+
private final int[] elements;
159+
private int targetOffset;
160+
161+
private Bucket(int size) {
162+
elements = new int[size];
163+
}
164+
165+
private void set(int index, int element) {
166+
elements[index] = element;
167+
}
168+
169+
private int[] getElements() {
170+
return elements;
171+
}
172+
173+
private int getTargetOffset() {
174+
return targetOffset;
175+
}
176+
177+
private void setTargetOffset(int targetOffset) {
178+
this.targetOffset = targetOffset;
179+
}
180+
}
181+
}

0 commit comments

Comments
 (0)