diff --git a/core/port.c b/core/port.c new file mode 100644 index 000000000..c74d764b6 --- /dev/null +++ b/core/port.c @@ -0,0 +1,147 @@ +/** + * @file + * @author Edward A. Lee (eal@berkeley.edu) + * + * @section LICENSE + * Copyright (c) 2022, The University of California at Berkeley. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * @section DESCRIPTION + * + * Header file for macros, functions, and structs for optimized sparse I/O + * through multiports. + * + */ +#include "port.h" +#include + +/** + * A vector of pointers to the size fields of instances of + * lf_sparse_io_record_t so that these can be set to 0 between iterations. + * The start field of this struct will be NULL initially, so calling + * vector_new(_lf_sparse_io_record_sizes) will be necessary to use this. + */ +struct vector_t _lf_sparse_io_record_sizes; + +/** + * Compare two non-negative integers pointed to. Return -1 if a < b, 0 if a == b, + * and 1 if a > b. + * @param a Pointer to the first integer. + * @param b Pointer to the second integer. + */ +int compare_sizes(const void* a, const void* b) { + if (*(size_t*)a < *(size_t*)b) { + return -1; + } else if (*(size_t*)a > *(size_t*)b) { + return 1; + } else { + return 0; + } +} + +/** + * Given an array of pointers to port structs, return an iterator + * that can be used to iterate over the present channels. + * @param port An array of pointers to port structs. + * @param width The width of the multiport (or a negative number if not + * a multiport). + */ +lf_multiport_iterator_t _lf_multiport_iterator_impl(lf_port_base_t** port, int width) { + // NOTE: Synchronization is not required because all writers must have + // completed by the time this is invoked. + struct lf_multiport_iterator_t result = (lf_multiport_iterator_t) { + .next = -1, + .idx = -1, // Indicate that lf_multiport_next() has not been called. + .port = port, + .width = width + }; + if (width <= 0) return result; + if (port[0]->sparse_record && port[0]->sparse_record->size >= 0) { + // Sparse record is enabled and ready to use. + if (port[0]->sparse_record->size > 0) { + // Need to sort it first (if the length is greater than 1). + if (port[0]->sparse_record->size > 1) { + qsort( + &port[0]->sparse_record->present_channels[0], + (size_t)port[0]->sparse_record->size, + sizeof(size_t), + &compare_sizes + ); + } + result.next = port[0]->sparse_record->present_channels[0]; + } + return result; + } + // Fallback is to iterate over all port structs representing channels. + int start = 0; + while(start < width) { + if (port[start]->is_present) { + result.next = start; + return result; + } + start++; + } + return result; +} + +/** + * Return the channel number of the next present input on the multiport + * or -1 if there are no more present channels. + * @param iterator The iterator. + */ +int lf_multiport_next(lf_multiport_iterator_t* iterator) { + // If the iterator has not been used, return next. + if (iterator->idx < 0) { + iterator->idx = 0; + return iterator->next; + } + // If the iterator is already exhausted, return. + if (iterator->next < 0 || iterator->width <= 0) { + return -1; + } + struct lf_sparse_io_record_t* sparse_record + = iterator->port[iterator->idx]->sparse_record; + if (sparse_record && sparse_record->size >= 0) { + // Sparse record is enabled and ready to use. + iterator->idx++; + if (iterator->idx >= sparse_record->size) { + // No more present channels. + iterator->next = -1; + } else { + iterator->next = sparse_record->present_channels[iterator->idx]; + } + return iterator->next; + } else { + // Fall back to iterate over all port structs representing channels. + int start = iterator->next + 1; + while(start < iterator->width) { + if (iterator->port[start]->is_present) { + iterator->next = start; + return iterator->next; + } + start++; + } + // No more present channels found. + iterator->next = -1; + return iterator->next; + } +} diff --git a/core/port.h b/core/port.h new file mode 100644 index 000000000..24aab4a14 --- /dev/null +++ b/core/port.h @@ -0,0 +1,152 @@ +/** + * @file + * @author Edward A. Lee (eal@berkeley.edu) + * + * @section LICENSE + * Copyright (c) 2022, The University of California at Berkeley. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * @section DESCRIPTION + * + * This header file is for macros, functions, and structs for optimized sparse + * input through multiports. When reading from a wide input multiport, + * before this optimization, it was necessary for a reactor to test each + * channel for presence each time a reaction was triggered by the multiport. + * If few of the input channels are present, this can be very inefficient. + * To more efficiently handle this situation, reactor authors should + * annotate the input port with the "@sparse" annotation and use + * lf_multiport_iterator() to read from an input multiport. For example: + * + * ``` + * @sparse + * input[100] in:int; + * reaction(in) {= + * struct lf_multiport_iterator_t i = lf_multiport_iterator(in); + * int channel = lf_multiport_next(&i); + * while(channel >= 0) { + * printf("Received %d on channel %d.\n", in[channel]->value, channel); + * channel = lf_multiport_next(&i); + * } + * =} + * ``` + * + * The `lf_multiport_iterator()` function constructs an iterator (which is + * a struct) that can be passed to the `lf_multiport_next()` function to + * obtain the first channel number with a present input. Subsequent calls + * to `lf_multiport_next()` return the next channel index that is present + * until there are no more, at which point they return -1. + * + * The way this works is that for each input multiport p1 that is marked + * @sparse, a struct s of type `lf_sparse_io_record_t` will + * be dynamically allocated and a pointer to this struct will be put on the + * self struct in a field named "portname__sparse". Each port channel struct + * within the multiport will be given a pointer to s. + */ + +#ifndef PORT_H +#define PORT_H + +#include +#include +#include "utils/vector.h" + +/** Threshold for width of multiport s.t. sparse reading is supported. */ +#define LF_SPARSE_WIDTH_THRESHOLD 10 + +/** + * Divide LF_SPARSE_WIDTH_THRESHOLD by this number to get the capacity of a + * sparse input record for a multiport. + */ +#define LF_SPARSE_CAPACITY_DIVIDER 10 + +/** + * A record of the subset of channels of a multiport that have present inputs. + */ +typedef struct lf_sparse_io_record_t { + int size; // -1 if overflowed. 0 if empty. + size_t capacity; // Max number of writes to be considered sparse. + size_t present_channels[]; // Array of channel indices that are present. +} lf_sparse_io_record_t; + +/** + * Port structs are customized types because their payloads are type + * specific. This struct represents their common features. Given any + * pointer to a port struct, it can be cast to lf_port_base_t and then + * these common fields can be accessed. + */ +typedef struct lf_port_base_t { + bool is_present; + lf_sparse_io_record_t* sparse_record; // NULL if there is no sparse record. + int destination_channel; // -1 if there is no destination. +} lf_port_base_t; + +/** + * An iterator over a record of the subset of channels of a multiport that + * have present inputs. To use this, create an iterator using the function + * lf_multiport_iterator(). That function returns a struct that can be + * passed to the lf_multiport_next() function to obtain the next channel + * number of a present input (or -1 if there is no next present input). + */ +typedef struct lf_multiport_iterator_t { + int next; + int idx; // Index in the record of next or -1 if lf_multiport_next has not been called. + lf_port_base_t** port; + int width; +} lf_multiport_iterator_t; + + +/** + * A vector of pointers to the size fields of instances of + * lf_sparse_io_record_t so that these can be set to 0 between iterations. + * The start field of this struct will be NULL initially, so calling + * vector_new(_lf_sparse_io_record_sizes) will be necessary to use this. + */ +extern struct vector_t _lf_sparse_io_record_sizes; + +/** + * Given an array of pointers to port structs, return an iterator + * that can be used to iterate over the present channels. + * @param port An array of pointers to port structs. + * @param width The width of the multiport (or a negative number if not + * a multiport). + */ +lf_multiport_iterator_t _lf_multiport_iterator_impl(lf_port_base_t** port, int width); + +/** + * Macro for creating an iterator over an input multiport. + * The argument is the port name. This returns an instance of + * lf_multiport_iterator_t on the stack, a pointer to which should be + * passed to lf_multiport_iterator_next() to advance. + */ +#define lf_multiport_iterator(in) (_lf_multiport_iterator_impl( \ + (lf_port_base_t**)self->_lf_ ## in, \ + self->_lf_ ## in ## _width)) + +/** + * Return the channel number of the next present input on the multiport + * or -1 if there are no more present channels. + * @param iterator The iterator. + */ +int lf_multiport_next(lf_multiport_iterator_t* iterator); + +#endif /* PORT_H */ +/** @} */ diff --git a/core/reactor.c b/core/reactor.c index e65489bd6..996a40c13 100644 --- a/core/reactor.c +++ b/core/reactor.c @@ -99,19 +99,32 @@ trigger_handle_t _lf_schedule_copy(void* action, interval_t offset, void* value, } /** - * Mark the given is_present field as true. This is_present field + * Mark the given port's is_present field as true. This is_present field * will later be cleaned up by _lf_start_time_step. - * This assumes that the mutex is not held. - * @param is_present_field A pointer to the is_present field that - * must be set. + * @param port A pointer to the port struct. */ -void _lf_set_present(bool* is_present_field) { +void _lf_set_present(lf_port_base_t* port) { + bool* is_present_field = &port->is_present; if (_lf_is_present_fields_abbreviated_size < _lf_is_present_fields_size) { _lf_is_present_fields_abbreviated[_lf_is_present_fields_abbreviated_size] = is_present_field; } _lf_is_present_fields_abbreviated_size++; *is_present_field = true; + + // Support for sparse destination multiports. + if(port->sparse_record + && port->destination_channel >= 0 + && port->sparse_record->size >= 0) { + int next = port->sparse_record->size++; + if (next >= port->sparse_record->capacity) { + // Buffer is full. Have to revert to the classic iteration. + port->sparse_record->size = -1; + } else { + port->sparse_record->present_channels[next] + = port->destination_channel; + } + } } /** diff --git a/core/reactor.h b/core/reactor.h index f2cff06b4..79a65ce97 100644 --- a/core/reactor.h +++ b/core/reactor.h @@ -62,6 +62,7 @@ #include "utils/util.h" #include "tag.h" // Time-related functions. #include "modal_models/modes.h" // Modal model support +#include "port.h" // The following file is also included, but must be included // after its requirements are met, so the #include appears at @@ -85,7 +86,7 @@ // problems with if ... else statements that do not use braces around the // two branches. -void _lf_set_present(bool* is_present_field); +void _lf_set_present(lf_port_base_t* port); /** * Set the specified output (or input of a contained reactor) @@ -106,7 +107,7 @@ do { \ /* We need to assign "val" to "out->value" since we need to give "val" an address */ \ /* even if it is a literal */ \ out->value = val; \ - _lf_set_present(&out->is_present); \ + _lf_set_present((lf_port_base_t*)out); \ if (out->token != NULL) { \ /* The cast "*((void**) &out->value)" is a hack to make the code */ \ /* compile with non-token types where val is not a pointer. */ \ @@ -139,7 +140,7 @@ do { \ #ifndef __cplusplus #define _LF_SET_ARRAY(out, val, length) \ do { \ - _lf_set_present(&out->is_present); \ + _lf_set_present((lf_port_base_t*)out); \ lf_token_t* token = _lf_initialize_token_with_value(out->token, val, length); \ token->ref_count = out->num_destinations; \ out->token = token; \ @@ -148,7 +149,7 @@ do { \ #else #define _LF_SET_ARRAY(out, val, length) \ do { \ - _lf_set_present(&out->is_present); \ + _lf_set_present((lf_port_base_t*)out); \ lf_token_t* token = _lf_initialize_token_with_value(out->token, val, length); \ token->ref_count = out->num_destinations; \ out->token = token; \ @@ -173,7 +174,7 @@ do { \ #ifndef __cplusplus #define _LF_SET_NEW(out) \ do { \ - _lf_set_present(&out->is_present); \ + _lf_set_present((lf_port_base_t*)out); \ lf_token_t* token = _lf_set_new_array_impl(out->token, 1, out->num_destinations); \ out->value = token->value; \ out->token = token; \ @@ -181,7 +182,7 @@ do { \ #else #define _LF_SET_NEW(out) \ do { \ - _lf_set_present(&out->is_present); \ + _lf_set_present((lf_port_base_t*)out); \ lf_token_t* token = _lf_set_new_array_impl(out->token, 1, out->num_destinations); \ out->value = static_castvalue)>(token->value); \ out->token = token; \ @@ -204,7 +205,7 @@ do { \ #ifndef __cplusplus #define _LF_SET_NEW_ARRAY(out, len) \ do { \ - _lf_set_present(&out->is_present); \ + _lf_set_present((lf_port_base_t*)out); \ lf_token_t* token = _lf_set_new_array_impl(out->token, len, out->num_destinations); \ out->value = token->value; \ out->token = token; \ @@ -213,7 +214,7 @@ do { \ #else #define _LF_SET_NEW_ARRAY(out, len) \ do { \ - _lf_set_present(&out->is_present); \ + _lf_set_present((lf_port_base_t*)out); \ lf_token_t* token = _lf_set_new_array_impl(out->token, len, out->num_destinations); \ out->value = static_castvalue)>(token->value); \ out->token = token; \ @@ -229,9 +230,9 @@ do { \ * after this is called. * @param out The output port (by name). */ -#define _LF_SET_PRESENT(out) \ +#define lf_set_present(out) \ do { \ - _lf_set_present(&out->is_present); \ + _lf_set_present((lf_port_base_t*)out); \ } while(0) /** @@ -246,7 +247,7 @@ do { \ #ifndef __cplusplus #define _LF_SET_TOKEN(out, newtoken) \ do { \ - _lf_set_present(&out->is_present); \ + _lf_set_present((lf_port_base_t*)out); \ out->value = newtoken->value; \ out->token = newtoken; \ newtoken->ref_count += out->num_destinations; \ @@ -255,7 +256,7 @@ do { \ #else #define _LF_SET_TOKEN(out, newtoken) \ do { \ - _lf_set_present(&out->is_present); \ + _lf_set_present((lf_port_base_t*)out); \ out->value = static_castvalue)>(newtoken->value); \ out->token = newtoken; \ newtoken->ref_count += out->num_destinations; \ diff --git a/core/reactor_common.c b/core/reactor_common.c index 3d8c045fd..843e438a0 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -38,9 +38,11 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "reactor.h" #include "tag.c" #include "utils/pqueue.c" +#include "utils/vector.c" #include "utils/pqueue_support.h" #include "utils/util.c" #include "modal_models/modes.c" +#include "port.c" /** * Indicator of whether to wait for physical time to match logical time. @@ -464,6 +466,19 @@ void _lf_start_time_step() { for(int i = 0; i < size; i++) { *is_present_fields[i] = false; } + // Reset sparse IO record sizes to 0, if any. + if (_lf_sparse_io_record_sizes.start != NULL) { + for (size_t i = 0; i < vector_size(&_lf_sparse_io_record_sizes); i++) { + // NOTE: vector_at does not return the element at + // the index, but rather returns a pointer to that element, which is + // itself a pointer. + int** size = (int**)vector_at(&_lf_sparse_io_record_sizes, i); + if (size != NULL && *size != NULL) { + **size = 0; + } + } + } + #ifdef FEDERATED_DECENTRALIZED for (int i = 0; i < _lf_is_present_fields_size; i++) { // FIXME: For now, an intended tag of (NEVER, 0) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 7faed135c..13365d37b 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -356,18 +356,33 @@ trigger_handle_t _lf_schedule_value(void* action, interval_t extra_delay, void* } /* - * Mark the given is_present field as true. This is_present field + * Mark the given port's is_present field as true. This is_present field * will later be cleaned up by _lf_start_time_step. * This assumes that the mutex is not held. - * @param is_present_field A pointer to the is_present field that + * @param port A pointer to the port struct. * must be set. */ -void _lf_set_present(bool* is_present_field) { +void _lf_set_present(lf_port_base_t* port) { + bool* is_present_field = &port->is_present; int ipfas = lf_atomic_fetch_add(&_lf_is_present_fields_abbreviated_size, 1); if (ipfas < _lf_is_present_fields_size) { _lf_is_present_fields_abbreviated[ipfas] = is_present_field; } *is_present_field = true; + + // Support for sparse destination multiports. + if(port->sparse_record + && port->destination_channel >= 0 + && port->sparse_record->size >= 0) { + int next = lf_atomic_fetch_add(&port->sparse_record->size, 1); + if (next >= port->sparse_record->capacity) { + // Buffer is full. Have to revert to the classic iteration. + port->sparse_record->size = -1; + } else { + port->sparse_record->present_channels[next] + = port->destination_channel; + } + } } /** diff --git a/core/threaded/scheduler_GEDF_NP_CI.c b/core/threaded/scheduler_GEDF_NP_CI.c index 037305516..e43b65645 100644 --- a/core/threaded/scheduler_GEDF_NP_CI.c +++ b/core/threaded/scheduler_GEDF_NP_CI.c @@ -43,10 +43,11 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "../platform.h" #include "../utils/pqueue_support.h" #include "../utils/semaphore.h" -#include "../utils/vector.c" +#include "../utils/vector.h" #include "scheduler.h" #include "scheduler_instance.h" #include "scheduler_sync_tag_advance.c" +#include #ifndef MAX_REACTION_LEVEL #define MAX_REACTION_LEVEL INITIAL_REACT_QUEUE_SIZE diff --git a/core/threaded/scheduler_PEDF_NP.c b/core/threaded/scheduler_PEDF_NP.c index 6610234d9..93e4f9c1d 100644 --- a/core/threaded/scheduler_PEDF_NP.c +++ b/core/threaded/scheduler_PEDF_NP.c @@ -43,7 +43,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "../platform.h" #include "../utils/pqueue_support.h" #include "../utils/semaphore.c" -#include "../utils/vector.c" +#include "../utils/vector.h" #include "scheduler_instance.h" #include "scheduler_sync_tag_advance.c" diff --git a/include/ctarget/set.h b/include/ctarget/set.h index 9a40f2be8..44c29aa1b 100644 --- a/include/ctarget/set.h +++ b/include/ctarget/set.h @@ -148,8 +148,8 @@ do { \ */ #define SET_PRESENT(out) \ do { \ - _Pragma ("Warning \"'SET_PRESENT' is deprecated.\""); \ - _LF_SET_PRESENT(out); \ + _Pragma ("Warning \"'SET_PRESENT' is deprecated.\""); \ + _lf_set_present((lf_port_base_t*)out); \ } while (0) /** @@ -210,4 +210,4 @@ do { \ } while (0) #endif // MODAL_REACTORS -#endif // CTARGET_SET \ No newline at end of file +#endif // CTARGET_SET diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index c2eb81041..e6bb74f09 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -2011ccacb472451651d7e0d391127c8060fc9e54 +794f4c467bd162de43b4bb0c7076e5d17956ce2b