From 0c38637b15142057552fd5c31030bd0ecf5f5166 Mon Sep 17 00:00:00 2001 From: Soroush Bateni Date: Fri, 12 Nov 2021 10:29:50 -0600 Subject: [PATCH 1/5] Resurrected the benchmark runner reactor --- benchmark/C/Savina/src/BenchmarkRunner.lf | 204 ++++++++++++++++++++++ 1 file changed, 204 insertions(+) create mode 100644 benchmark/C/Savina/src/BenchmarkRunner.lf diff --git a/benchmark/C/Savina/src/BenchmarkRunner.lf b/benchmark/C/Savina/src/BenchmarkRunner.lf new file mode 100644 index 0000000000..61d1e10caf --- /dev/null +++ b/benchmark/C/Savina/src/BenchmarkRunner.lf @@ -0,0 +1,204 @@ +target C; + +/** + * Reactor that starts the kernel of a benchmark, measures its runtime and outputs + * the results for a given number of iterations. + * + * This reactor is instantiated by the main reactor of a benchmark and + * the startup reaction of this reactor is the starting point for that benchmark. + * The reactor runs a given number of iterations of the benchmark, measures + * the runtime of each iteration and outputs them. The benchmark itself is responsible + * to reset its state between the iterations. + * A benchmark can have an optional initialization phase that is run once before + * the first iteration and is not measured. + * A benchmark can have an optional cleanup phase after each iteration before + * the next iteration start which is not considered in the runtime measurement. + * + * How to use: + * - Instantiate this reactor in the main reactor of the benchmark. + * - Connect the ports inStart, outIterationStart, inIterationFinish with + * the appropriate reactors of the benchmark. + * - Optionally connect the ports for initialization and cleanup. 
+ * - Create a startup reaction in the main reactor that calls printBenchmarkInfo() and printSystemInfo() and then sets runner.inStart. + * + * Prototype startup reaction in the main reactor of a benchmark: + * runner = new BenchmarkRunner(num_iterations=num_iterations); + * reaction(startup) -> runner.inStart {= + * printBenchmarkInfo("ThreadRingReactorLFCppBenchmark"); + * printSystemInfo(); + * SET(runner.inStart, true); + * =} + * + * @param num_iterations How many times to execute the kernel of the benchmark to measure. + * @param use_init Benchmark needs initialization and handles the corresponding signals. + * @param use_cleanup_iteration Benchmark needs cleanup after each iteration and handles the corresponding signals. + * + * @author Hannes Klein + * @author Shaokai Lin + */ +reactor BenchmarkRunner(num_iterations:int(12), use_init:bool(false), use_cleanup_iteration:bool(false)) { + + /** Signal to start execution. Set this input from a startup reaction in the main reactor. */ + input inStart:bool; + + /** Signals for starting and finishing the kernel and runtime measurement. */ + output outIterationStart:bool; + input inIterationFinish:bool; + + /** Signals for initializations that are not part of the measured kernel. */ + output outInitializeStart:bool; + input inInitializeFinish:bool; + + /** Signals for cleanup operations after each iteration of the kernel. */ + output outCleanupIterationStart:bool; + input inCleanupIterationFinish:bool; + + /** Events to switch between the phases of running the iterations. */ + logical action initBenchmark:bool; + logical action cleanupIteration:bool; + logical action nextIteration:bool; + logical action finish:bool; + + /** Number of iterations already executed. */ + state count:unsigned(0); + + /** Start time for runtime measurement. */ + state startTime:instant_t; + + /** Runtime measurements. 
*/ + state measuredTimes:interval_t[]; + + + reaction(startup) {= + // Initialize an array of interval_t + self->measuredTimes = calloc(self->num_iterations, sizeof(interval_t)); + =} + + reaction(inStart) -> nextIteration, initBenchmark {= + if(self->use_init) { + schedule(initBenchmark, 0); + } else { + schedule(nextIteration, 0); + } + =} + + reaction(initBenchmark) -> outInitializeStart {= + SET(outInitializeStart, true); + =} + + reaction(inInitializeFinish) -> nextIteration {= + schedule(nextIteration, 0); + =} + + reaction(cleanupIteration) -> outCleanupIterationStart {= + SET(outCleanupIterationStart, true); + =} + + reaction(inCleanupIterationFinish) -> nextIteration {= + schedule(nextIteration, 0); + =} + + reaction(nextIteration) -> outIterationStart, finish {= + if (self->count < self->num_iterations) { + self->startTime = get_physical_time(); + SET(outIterationStart, true); + } else { + schedule(finish, 0); + } + =} + + reaction(inIterationFinish) -> nextIteration, cleanupIteration {= + interval_t end_time = get_physical_time(); + interval_t duration = end_time - self->startTime; + self->measuredTimes[self->count] = duration; + self->count += 1; + + printf("Iteration: %u\t Duration: %.3f msec\n", self->count, toMS(duration)); + + if(self->use_cleanup_iteration) { + schedule(cleanupIteration, 0); + } else { + schedule(nextIteration, 0); + } + =} + + reaction(finish) {= + double* measuredMSTimes = getMSMeasurements(self->measuredTimes, self->num_iterations); + qsort(measuredMSTimes, self->num_iterations, sizeof(double), comp); + + printf("Execution - Summary:\n"); + printf("Best Time:\t %.3f msec\n", measuredMSTimes[0]); + printf("Worst Time:\t %.3f msec\n", measuredMSTimes[self->num_iterations - 1]); + printf("Median Time:\t %.3f msec\n", median(measuredMSTimes, self->num_iterations)); + request_stop(); + =} + + preamble {= + + static double toMS(interval_t t) { + return t / 1000000.0; + } + + int comp (const void * elem1, const void * elem2) { /* qsort comparator, ascending; compare as doubles because narrowing to int made all times within the same whole millisecond compare equal */ + double f
= *((const double*)elem1); + double s = *((const double*)elem2); + if (f > s) return 1; + if (f < s) return -1; + return 0; + } + + static double median(double* execTimes, int size) { + if (size == 0) { + return 0.0; + } + + int middle = size / 2; + if(size % 2 == 1) { + return execTimes[middle]; + } else { + return (execTimes[middle-1] + execTimes[middle]) / 2; + } + } + + static double* getMSMeasurements(interval_t* measured_times, int num_iterations) { + + double* msMeasurements = calloc(num_iterations, sizeof(double)); + for (int i = 0; i < num_iterations; i++) { + msMeasurements[i] = toMS(measured_times[i]); + } + + return msMeasurements; + } + =} + + preamble {= + + void printBenchmarkInfo(char* benchmarkId) { + printf("Benchmark: %s\n", benchmarkId); + } + + void printSystemInfo() { + + printf("System information\n"); + printf("O/S Name: "); + + #ifdef _WIN64 /* must be tested first: _WIN32 is also defined on 64-bit Windows */ + printf("Windows 64-bit"); + #elif defined(_WIN32) + printf("Windows 32-bit"); + #elif __APPLE__ || __MACH__ + printf("Mac OSX"); + #elif __linux__ + printf("Linux"); + #elif __FreeBSD__ + printf("FreeBSD"); + #elif __unix || __unix__ + printf("Unix"); + #else + printf("Other"); + #endif + + printf("\n"); + } + =} +} \ No newline at end of file From febe824e0680cb03ab7fb4450541f96b23496620 Mon Sep 17 00:00:00 2001 From: Soroush Bateni Date: Fri, 12 Nov 2021 10:30:43 -0600 Subject: [PATCH 2/5] Updated RecMatMul to use the BenchmarkRunner reactor --- benchmark/C/Savina/src/parallelism/MatMul.lf | 37 ++++++++++++++++--- .../savina_parallelism_recmatmul.yaml | 3 ++ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/benchmark/C/Savina/src/parallelism/MatMul.lf b/benchmark/C/Savina/src/parallelism/MatMul.lf index 02d9b91239..b4e7a9a2e9 100644 --- a/benchmark/C/Savina/src/parallelism/MatMul.lf +++ b/benchmark/C/Savina/src/parallelism/MatMul.lf @@ -16,9 +16,12 @@ target C { threads: 0, /// [[[end]]] cmake-include: "../lib/matrix.cmake", - files: ["../lib/matrix.c", "../include/matrix.h"] + files: ["../lib/matrix.c", 
"../include/matrix.h"], + build-type: RelWithDebInfo }; +import BenchmarkRunner from "../BenchmarkRunner.lf"; + preamble {= #include #include @@ -144,6 +147,10 @@ reactor Manager(num_workers: int(20), data_length: size_t(1024)) { input[num_workers] more_work: {= work_item_t* =}; + // Ports to interact with the benchmarkRunner reactor + input start: bool; + output finished: bool; + logical action next; logical action done; @@ -156,17 +163,20 @@ reactor Manager(num_workers: int(20), data_length: size_t(1024)) { state work_stack: work_stack_t; - reaction (startup) -> data, next {= + reaction (startup) {= // Fill both input arrays with data self->A = mat_new_d(self->data_length, self->data_length); self->B = mat_new_d(self->data_length, self->data_length); - self->C = mat_new_d(self->data_length, self->data_length); for (size_t i = 0; i < self->data_length; ++i) { for (size_t j = 0; j < self->data_length; ++j) { *mat_at_d(self->A, i, j) = i; *mat_at_d(self->B, i, j) = j; } } + =} + + reaction (start) -> data, next {= + self->C = mat_new_d(self->data_length, self->data_length); SET_NEW_ARRAY(data, 3); data->value[0] = self->A; data->value[1] = self->B; @@ -209,13 +219,16 @@ reactor Manager(num_workers: int(20), data_length: size_t(1024)) { } =} - reaction (done) {= + reaction (done) -> finished {= check_if_valid(self->C, self->data_length); work_stack_free(self->work_stack); + mat_destroy_d(self->C); + SET_PRESENT(finished); + =} + + reaction(shutdown) {= mat_destroy_d(self->A); mat_destroy_d(self->B); - mat_destroy_d(self->C); - request_stop(); =} } @@ -275,20 +288,32 @@ reactor Worker(threshold: size_t(16384), data_length: size_t(1024)) { main reactor ( /*[[[cog + cog.outl(f'num_iterations: size_t({num_iterations}),') cog.outl(f'data_length: size_t({data_length}),') cog.outl(f'block_threshold: int({block_threshold}),') cog.outl(f'priorities: int({priorities}),') cog.outl(f'num_workers: int({num_workers})') ]]] */ + num_iterations: size_t(12), data_length: 
size_t(1024), block_threshold: int(16384), priorities: int(10), num_workers: int(20) /// [[[end]]] ) { + runner = new BenchmarkRunner(num_iterations=num_iterations); manager = new Manager(num_workers=num_workers, data_length=data_length); workers = new[num_workers] Worker(threshold=block_threshold, data_length=data_length); + reaction(startup) -> runner.inStart {= + printBenchmarkInfo("ThreadRingReactorLFCppBenchmark"); + printSystemInfo(); + SET(runner.inStart, true); + =} + + runner.outIterationStart -> manager.start; + manager.finished -> runner.inIterationFinish; + (manager.data)+ -> workers.data; manager.do_work -> workers.do_work; workers.more_work -> manager.more_work; diff --git a/benchmark/runner/conf/benchmark/savina_parallelism_recmatmul.yaml b/benchmark/runner/conf/benchmark/savina_parallelism_recmatmul.yaml index 2bbd954af3..48ed2f274b 100644 --- a/benchmark/runner/conf/benchmark/savina_parallelism_recmatmul.yaml +++ b/benchmark/runner/conf/benchmark/savina_parallelism_recmatmul.yaml @@ -5,6 +5,7 @@ params: data_length: 1024 block_threshold: 16384 priorities: 10 + num_iterations: 12 # target specific configuration targets: @@ -38,12 +39,14 @@ targets: workers: ["--numWorkers", ""] lf-c: copy_sources: + - "${lf_path}/benchmark/C/Savina/src/BenchmarkRunner.lf" - "${lf_path}/benchmark/C/Savina/src/parallelism/" - "${lf_path}/benchmark/C/Savina/src/lib/" - "${lf_path}/benchmark/C/Savina/src/include/" lf_file: "parallelism/MatMul.lf" binary: "MatMul" gen_args: + num_iterations: ["-D", "num_iterations="] data_length: ["-D", "data_length="] block_threshold: ["-D", "block_threshold="] priorities: ["-D", "priorities="] From 5a42cf81803518594f475b02e644229873bd6824 Mon Sep 17 00:00:00 2001 From: Soroush Bateni Date: Fri, 12 Nov 2021 11:11:05 -0600 Subject: [PATCH 3/5] Update benchmark/C/Savina/src/parallelism/MatMul.lf Co-authored-by: Peter Donovan <33707478+petervdonovan@users.noreply.github.com> --- benchmark/C/Savina/src/parallelism/MatMul.lf | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/C/Savina/src/parallelism/MatMul.lf b/benchmark/C/Savina/src/parallelism/MatMul.lf index b4e7a9a2e9..1be4f92fec 100644 --- a/benchmark/C/Savina/src/parallelism/MatMul.lf +++ b/benchmark/C/Savina/src/parallelism/MatMul.lf @@ -306,7 +306,7 @@ main reactor ( workers = new[num_workers] Worker(threshold=block_threshold, data_length=data_length); reaction(startup) -> runner.inStart {= - printBenchmarkInfo("ThreadRingReactorLFCppBenchmark"); + printBenchmarkInfo("RecMatMulBenchmark"); printSystemInfo(); SET(runner.inStart, true); =} From cc26429ad752e863ac52649c3a1a481c6c2a5d8f Mon Sep 17 00:00:00 2001 From: Peter Donovan Date: Tue, 16 Nov 2021 01:07:15 -0800 Subject: [PATCH 4/5] Transpose B matrix for C, C++ versions. --- benchmark/C/Savina/src/parallelism/MatMul.lf | 12 +++++++++-- .../Cpp/Savina/src/parallelism/MatMul.lf | 21 +++++++++++++++---- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/benchmark/C/Savina/src/parallelism/MatMul.lf b/benchmark/C/Savina/src/parallelism/MatMul.lf index 1be4f92fec..9903aec4f9 100644 --- a/benchmark/C/Savina/src/parallelism/MatMul.lf +++ b/benchmark/C/Savina/src/parallelism/MatMul.lf @@ -125,6 +125,14 @@ reactor Manager(num_workers: int(20), data_length: size_t(1024)) { #include #include "matrix.h" + /* + * Accesses the given matrix using the assumption that it is the + * transpose of the matrix that we wish to represent. + */ + double* transposed_mat_at_d(matrix_t matrix, size_t i, size_t j) { + return mat_at_d(matrix, j, i); + } + /* * Prints a message if there exists an incorrect entry in the given matrix. 
*/ @@ -170,7 +178,7 @@ reactor Manager(num_workers: int(20), data_length: size_t(1024)) { for (size_t i = 0; i < self->data_length; ++i) { for (size_t j = 0; j < self->data_length; ++j) { *mat_at_d(self->A, i, j) = i; - *mat_at_d(self->B, i, j) = j; + *transposed_mat_at_d(self->B, i, j) = j; } } =} @@ -277,7 +285,7 @@ reactor Worker(threshold: size_t(16384), data_length: size_t(1024)) { for (size_t k = 0; k < wi.dim; ++k) { *mat_at_d(self->C, i, j) += ( (*mat_at_d(self->A, i, wi.scA + k)) - * (*mat_at_d(self->B, wi.srB + k, j)) + * (*transposed_mat_at_d(self->B, wi.srB + k, j)) ); } } diff --git a/benchmark/Cpp/Savina/src/parallelism/MatMul.lf b/benchmark/Cpp/Savina/src/parallelism/MatMul.lf index 7f2addc46a..364cbaa164 100644 --- a/benchmark/Cpp/Savina/src/parallelism/MatMul.lf +++ b/benchmark/Cpp/Savina/src/parallelism/MatMul.lf @@ -43,6 +43,19 @@ public preamble {= #include #include #include "Matrix.hh" + + template + class TransposedMatrix { + private: + std::vector data; + size_t size_x; + + public: + TransposedMatrix(size_t size_x, size_t size_y) : data(size_x * size_y), size_x(size_x) {} + + const T& at(size_t x, size_t y) const { return data[y*size_x+x]; } + T& at(size_t x, size_t y) { return data[y*size_x+x]; } + }; struct WorkItem { size_t srA; // srA = start row in matrix A @@ -58,7 +71,7 @@ public preamble {= reactor Manager(numWorkers: size_t{20}, dataLength: size_t{1024}) { state A: Matrix(dataLength, dataLength); - state B: Matrix(dataLength, dataLength); + state B: TransposedMatrix(dataLength, dataLength); state C: Matrix(dataLength, dataLength); state workQueue: std::deque<{=reactor::ImmutableValuePtr=}>; @@ -69,7 +82,7 @@ reactor Manager(numWorkers: size_t{20}, dataLength: size_t{1024}) { input start: void; output finished: void; - output data: {=std::tuple*, const Matrix*, Matrix*>=}; + output data: {=std::tuple*, const TransposedMatrix*, Matrix*>=}; output[numWorkers] doWork: WorkItem; input[numWorkers] moreWork: {=std::array, 8>=}; @@ -149,10 
+162,10 @@ reactor Manager(numWorkers: size_t{20}, dataLength: size_t{1024}) { reactor Worker(threshold: size_t{16384}) { state A: {=const Matrix*=}; - state B: {=const Matrix*=}; + state B: {=const TransposedMatrix*=}; state C: {=Matrix*=}; - input data: {=std::tuple*, const Matrix*, Matrix*>=}; + input data: {=std::tuple*, const TransposedMatrix*, Matrix*>=}; input doWork: WorkItem; output moreWork: {=std::array, 8>=}; From 6dbcf1746f626ee637e1d96881c1f95556086d12 Mon Sep 17 00:00:00 2001 From: Peter Donovan Date: Tue, 16 Nov 2021 01:20:17 -0800 Subject: [PATCH 5/5] Add the original benchmark alongside this version. This is based on a suggestion from Christian, since the Python runner script does not parse the output of the modified benchmark correctly. The original benchmark has a couple of minor corrections that also appear in the one that uses the benchmark runner. --- benchmark/C/Savina/src/parallelism/MatMul.lf | 33 +- .../src/parallelism/MatMulBenchmarkRunner.lf | 328 ++++++++++++++++++ 2 files changed, 332 insertions(+), 29 deletions(-) create mode 100644 benchmark/C/Savina/src/parallelism/MatMulBenchmarkRunner.lf diff --git a/benchmark/C/Savina/src/parallelism/MatMul.lf b/benchmark/C/Savina/src/parallelism/MatMul.lf index 9903aec4f9..44c49e708d 100644 --- a/benchmark/C/Savina/src/parallelism/MatMul.lf +++ b/benchmark/C/Savina/src/parallelism/MatMul.lf @@ -20,8 +20,6 @@ target C { build-type: RelWithDebInfo }; -import BenchmarkRunner from "../BenchmarkRunner.lf"; - preamble {= #include #include @@ -155,10 +153,6 @@ reactor Manager(num_workers: int(20), data_length: size_t(1024)) { input[num_workers] more_work: {= work_item_t* =}; - // Ports to interact with the benchmarkRunner reactor - input start: bool; - output finished: bool; - logical action next; logical action done; @@ -171,20 +165,17 @@ reactor Manager(num_workers: int(20), data_length: size_t(1024)) { state work_stack: work_stack_t; - reaction (startup) {= + reaction (startup) -> data, next {= // 
Fill both input arrays with data self->A = mat_new_d(self->data_length, self->data_length); self->B = mat_new_d(self->data_length, self->data_length); + self->C = mat_new_d(self->data_length, self->data_length); for (size_t i = 0; i < self->data_length; ++i) { for (size_t j = 0; j < self->data_length; ++j) { *mat_at_d(self->A, i, j) = i; *transposed_mat_at_d(self->B, i, j) = j; } } - =} - - reaction (start) -> data, next {= - self->C = mat_new_d(self->data_length, self->data_length); SET_NEW_ARRAY(data, 3); data->value[0] = self->A; data->value[1] = self->B; @@ -227,16 +218,12 @@ reactor Manager(num_workers: int(20), data_length: size_t(1024)) { } =} - reaction (done) -> finished {= + reaction (done) {= check_if_valid(self->C, self->data_length); work_stack_free(self->work_stack); - mat_destroy_d(self->C); - SET_PRESENT(finished); - =} - - reaction(shutdown) {= mat_destroy_d(self->A); mat_destroy_d(self->B); + mat_destroy_d(self->C); =} } @@ -296,32 +283,20 @@ reactor Worker(threshold: size_t(16384), data_length: size_t(1024)) { main reactor ( /*[[[cog - cog.outl(f'num_iterations: size_t({num_iterations}),') cog.outl(f'data_length: size_t({data_length}),') cog.outl(f'block_threshold: int({block_threshold}),') cog.outl(f'priorities: int({priorities}),') cog.outl(f'num_workers: int({num_workers})') ]]] */ - num_iterations: size_t(12), data_length: size_t(1024), block_threshold: int(16384), priorities: int(10), num_workers: int(20) /// [[[end]]] ) { - runner = new BenchmarkRunner(num_iterations=num_iterations); manager = new Manager(num_workers=num_workers, data_length=data_length); workers = new[num_workers] Worker(threshold=block_threshold, data_length=data_length); - reaction(startup) -> runner.inStart {= - printBenchmarkInfo("RecMatMulBenchmark"); - printSystemInfo(); - SET(runner.inStart, true); - =} - - runner.outIterationStart -> manager.start; - manager.finished -> runner.inIterationFinish; - (manager.data)+ -> workers.data; manager.do_work -> workers.do_work; 
workers.more_work -> manager.more_work; diff --git a/benchmark/C/Savina/src/parallelism/MatMulBenchmarkRunner.lf b/benchmark/C/Savina/src/parallelism/MatMulBenchmarkRunner.lf new file mode 100644 index 0000000000..9903aec4f9 --- /dev/null +++ b/benchmark/C/Savina/src/parallelism/MatMulBenchmarkRunner.lf @@ -0,0 +1,328 @@ +/** + * This benchmark is a C implementation of the parallel matrix multiplication + * algorithm that appears in the Savina suite, including the same race condition. + * + * For details on this benchmark, see the Cpp version from which it was derived: + * https://github.com/lf-lang/lingua-franca/blob/master/benchmark/Cpp/Savina/src/parallelism/MatMul.lf + */ + +target C { + /* [[[cog + if (threaded_runtime=="True"): + cog.outl(f"threads: {threads},") + else: + cog.outl("threads: 0,") + ]]] */ + threads: 0, + /// [[[end]]] + cmake-include: "../lib/matrix.cmake", + files: ["../lib/matrix.c", "../include/matrix.h"], + build-type: RelWithDebInfo +}; + +import BenchmarkRunner from "../BenchmarkRunner.lf"; + +preamble {= + #include + #include + + typedef struct work_item_t { + size_t srA; // srA = start row in matrix A + size_t scA; // scA = start column in matrix A + size_t srB; + size_t scB; + size_t srC; + size_t scC; + size_t num_blocks; // total number of elements per block in both dimensions + size_t dim; // number of elements in one dimension in one block + } work_item_t; + + typedef struct work_stack_t { + work_item_t** stacks; + size_t current_n_stacks; + // Invariant: current_stack <= next_item <= current_stack_end + // Invariant: 8 << current_n_stacks == current_stack_end - current_stack, unless + // current_n_stacks == 0 + // current_stack_end is never a valid memory location to use + work_item_t* current_stack; + work_item_t* next_item; + work_item_t* current_stack_end; + } work_stack_t; + + /* + * Allocates and initializes a work stack. 
+ */ + work_stack_t work_stack_new() { + // 2^53 - 8 WorkItems is more than can fit in a computer, and it is fine for + // work stacks to be initialized with space for 50 pointers because for every + // work_stack_t that exists, at least one expensive parallel computation is + // performed, and the number of expensive computations that can be performed is + // limited. + work_item_t** stacks = malloc(50 * sizeof(work_item_t*)); + assert(stacks != NULL); + *stacks = NULL; + return (work_stack_t) { stacks, 0, NULL, NULL, NULL }; + } + + /* + * Frees the memory required by the work stack. This operation invalidates the work + * stack. + */ + void work_stack_free(work_stack_t work) { + for (size_t i = 1; i <= work.current_n_stacks; ++i) { + free((work.stacks)[i]); + } + } + + /* + * Pushes a work item to the given work stack. + * @param work A work stack. + * @param w A new work item. + */ + void work_stack_push(work_stack_t* work, work_item_t w) { + if (work->next_item == work->current_stack_end) { + size_t current_height = 8L << (++work->current_n_stacks); + work_item_t* current_stack = (work_item_t*) malloc( + current_height * sizeof(work_item_t) + ); + assert(work->current_n_stacks < 50); + (work->stacks)[work->current_n_stacks] = current_stack; + work->current_stack = current_stack; + work->next_item = current_stack; + work->current_stack_end = (work_item_t*) current_stack + current_height; + } + *(work->next_item++) = w; + } + + /* + * Pops (removes) an item from the given work stack. + * @return The work item at the top of the stack. 
+ */ + work_item_t work_stack_pop(work_stack_t* work) { + if (work->next_item == work->current_stack) { + assert(work->current_n_stacks > 1); + size_t current_height = 8L << (--work->current_n_stacks); + free(work->current_stack); + work_item_t* current_stack = (work->stacks)[work->current_n_stacks]; + work_item_t* end = (work_item_t*) current_stack + current_height; + work->current_stack = current_stack; + work->next_item = end; + work->current_stack_end = end; + } + return *(--work->next_item); + } + + /* + * Returns whether the given work stack is empty. + */ + bool work_stack_empty(work_stack_t* work) { + return work->current_n_stacks <= 1 && (work->next_item == work->current_stack); + } +=} + +reactor Manager(num_workers: int(20), data_length: size_t(1024)) { + preamble {= + #include + #include "matrix.h" + + /* + * Accesses the given matrix using the assumption that it is the + * transpose of the matrix that we wish to represent. + */ + double* transposed_mat_at_d(matrix_t matrix, size_t i, size_t j) { + return mat_at_d(matrix, j, i); + } + + /* + * Prints a message if there exists an incorrect entry in the given matrix. 
+ */ + void check_if_valid(matrix_t C, size_t data_length) { + for (size_t i = 0; i < data_length; ++i) { + for (size_t j = 0; j < data_length; ++j) { + double actual = *mat_at_d(C, i, j); + double expected = 1.0 * data_length * i * j; + if (fabs(actual - expected) > 0.0001) { + printf( + "Validation failed for (i,j)=(%zu, %zu) with (%f, %f)\n", /* %zu: i and j are size_t */ + i, j, actual, expected + ); + return; + } + } + } + } + =} + + input[num_workers] more_work: {= work_item_t* =}; + + // Ports to interact with the benchmarkRunner reactor + input start: bool; + output finished: bool; + + logical action next; + logical action done; + + output data: matrix_t[]; + output[num_workers] do_work: work_item_t; + + state A: matrix_t; + state B: matrix_t; + state C: matrix_t; + + state work_stack: work_stack_t; + + reaction (startup) {= + // Fill both input arrays with data + self->A = mat_new_d(self->data_length, self->data_length); + self->B = mat_new_d(self->data_length, self->data_length); + for (size_t i = 0; i < self->data_length; ++i) { + for (size_t j = 0; j < self->data_length; ++j) { + *mat_at_d(self->A, i, j) = i; + *transposed_mat_at_d(self->B, i, j) = j; + } + } + =} + + reaction (start) -> data, next {= + self->C = mat_new_d(self->data_length, self->data_length); + SET_NEW_ARRAY(data, 3); + data->value[0] = self->A; + data->value[1] = self->B; + data->value[2] = self->C; + size_t num_blocks = self->data_length * self->data_length; + work_stack_t ws = work_stack_new(); + work_stack_push(&ws, (work_item_t) { + 0, 0, 0, 0, 0, 0, num_blocks, self->data_length + }); + self->work_stack = ws; + schedule(next, 0); + =} + + reaction (next) -> next, done, do_work {= + if (work_stack_empty(&(self->work_stack))) { + schedule(done, 0); + } else { + for ( + int i = 0; + i < do_work_width && !work_stack_empty(&(self->work_stack)); + ++i + ) { + work_item_t ws = work_stack_pop(&(self->work_stack)); + SET((do_work[i]), ws); + } + schedule(next, 0); + } + =} + + reaction (more_work) {= + // append all 
work items received from the workers to the internal work queue + for (int i = 0; i < more_work_width; ++i) { + if (more_work[i]->is_present) { + work_item_t* items = more_work[i]->value; + // If the port is present, then it certainly has exactly 8 WorkItems. + for (int j = 0; j < 8; ++j) { + work_stack_push(&(self->work_stack), items[j]); + } + } + } + =} + + reaction (done) -> finished {= + check_if_valid(self->C, self->data_length); + work_stack_free(self->work_stack); + mat_destroy_d(self->C); + SET_PRESENT(finished); + =} + + reaction(shutdown) {= + mat_destroy_d(self->A); + mat_destroy_d(self->B); + =} +} + +reactor Worker(threshold: size_t(16384), data_length: size_t(1024)) { + input data: matrix_t[]; + input do_work: work_item_t; + output more_work: {= work_item_t* =}; + + state A: {= matrix_t =}; + state B: {= matrix_t =}; + state C: {= matrix_t =}; + + reaction (data) {= + matrix_t* matrices = data->value; + self->A = matrices[0]; + self->B = matrices[1]; + self->C = matrices[2]; + =} + + reaction (do_work) -> more_work {= + work_item_t wi = do_work->value; + // If the number of blocks to process is above the threshold, + // then we split the problem into smaller chunks and generate more work items + if (wi.num_blocks > self->threshold) { + size_t dim = wi.dim / 2; + size_t num_blocks = wi.num_blocks / 4; + + SET_NEW_ARRAY(more_work, 8); + + more_work->value[0] = (work_item_t) {wi.srA , wi.scA , wi.srB , wi.scB , wi.srC , wi.scC , num_blocks, dim}; + more_work->value[1] = (work_item_t) {wi.srA , wi.scA + dim, wi.srB + dim, wi.scB , wi.srC , wi.scC , num_blocks, dim}; + more_work->value[2] = (work_item_t) {wi.srA , wi.scA , wi.srB , wi.scB + dim, wi.srC , wi.scC + dim, num_blocks, dim}; + more_work->value[3] = (work_item_t) {wi.srA , wi.scA + dim, wi.srB + dim, wi.scB + dim, wi.srC , wi.scC + dim, num_blocks, dim}; + more_work->value[4] = (work_item_t) {wi.srA + dim, wi.scA , wi.srB , wi.scB , wi.srC + dim, wi.scC , num_blocks, dim}; + more_work->value[5] 
= (work_item_t) {wi.srA + dim, wi.scA + dim, wi.srB + dim, wi.scB , wi.srC + dim, wi.scC , num_blocks, dim}; + more_work->value[6] = (work_item_t) {wi.srA + dim, wi.scA , wi.srB , wi.scB + dim, wi.srC + dim, wi.scC + dim, num_blocks, dim}; + more_work->value[7] = (work_item_t) {wi.srA + dim, wi.scA + dim, wi.srB + dim, wi.scB + dim, wi.srC + dim, wi.scC + dim, num_blocks, dim}; + + } else { + // otherwise we compute the result directly + size_t end_r = wi.srC + wi.dim; + size_t end_c = wi.scC + wi.dim; + + for (size_t i = wi.srC; i < end_r; ++i) { + for (size_t j = wi.scC; j < end_c; ++j) { + for (size_t k = 0; k < wi.dim; ++k) { + *mat_at_d(self->C, i, j) += ( + (*mat_at_d(self->A, i, wi.scA + k)) + * (*transposed_mat_at_d(self->B, wi.srB + k, j)) + ); + } + } + } + } + =} +} + +main reactor ( + /*[[[cog + cog.outl(f'num_iterations: size_t({num_iterations}),') + cog.outl(f'data_length: size_t({data_length}),') + cog.outl(f'block_threshold: int({block_threshold}),') + cog.outl(f'priorities: int({priorities}),') + cog.outl(f'num_workers: int({num_workers})') + ]]] */ + num_iterations: size_t(12), + data_length: size_t(1024), + block_threshold: int(16384), + priorities: int(10), + num_workers: int(20) + /// [[[end]]] +) { + runner = new BenchmarkRunner(num_iterations=num_iterations); + manager = new Manager(num_workers=num_workers, data_length=data_length); + workers = new[num_workers] Worker(threshold=block_threshold, data_length=data_length); + + reaction(startup) -> runner.inStart {= + printBenchmarkInfo("RecMatMulBenchmark"); + printSystemInfo(); + SET(runner.inStart, true); + =} + + runner.outIterationStart -> manager.start; + manager.finished -> runner.inIterationFinish; + + (manager.data)+ -> workers.data; + manager.do_work -> workers.do_work; + workers.more_work -> manager.more_work; +}