diff --git a/core/environment.c b/core/environment.c
index e51667a67..8cef44e70 100644
--- a/core/environment.c
+++ b/core/environment.c
@@ -50,16 +50,9 @@ static void environment_init_threaded(environment_t* env, int num_workers) {
     env->barrier.horizon = FOREVER_TAG;

     // Initialize synchronization objects.
-    if (lf_mutex_init(&env->mutex) != 0) {
-        lf_print_error_and_exit("Could not initialize environment mutex");
-    }
-    if (lf_cond_init(&env->event_q_changed, &env->mutex) != 0) {
-        lf_print_error_and_exit("Could not initialize environment event queue condition variable");
-    }
-    if (lf_cond_init(&env->global_tag_barrier_requestors_reached_zero, &env->mutex)) {
-        lf_print_error_and_exit("Could not initialize environment tag barrier condition variable");
-    }
-
+    LF_MUTEX_INIT(&env->mutex);
+    LF_COND_INIT(&env->event_q_changed, &env->mutex);
+    LF_COND_INIT(&env->global_tag_barrier_requestors_reached_zero, &env->mutex);
 #endif
 }
diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c
index 700304aea..ba1661a19 100644
--- a/core/federated/RTI/main.c
+++ b/core/federated/RTI/main.c
@@ -311,7 +311,7 @@ int main(int argc, const char* argv[]) {
     if (rti.base.tracing_enabled) {
         _lf_number_of_workers = rti.base.number_of_scheduling_nodes;
         rti.base.trace = trace_new(NULL, rti_trace_file_name);
-        LF_ASSERT(rti.base.trace, "Out of memory");
+        LF_ASSERT_NON_NULL(rti.base.trace);
         start_trace(rti.base.trace);
         lf_print("Tracing the RTI execution in %s file.", rti_trace_file_name);
     }
diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c
index 33049db50..e7a4e01ba 100644
--- a/core/federated/RTI/rti_common.c
+++ b/core/federated/RTI/rti_common.c
@@ -60,7 +60,7 @@ void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) {
 void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) {
     // FIXME: Consolidate this message with NET to get NMR (Next Message Request).
     // Careful with handling startup and shutdown.
-    lf_mutex_lock(rti_common->mutex);
+    LF_MUTEX_LOCK(rti_common->mutex);

     enclave->completed = completed;

@@ -78,7 +78,7 @@ void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) {
         free(visited);
     }

-    lf_mutex_unlock(rti_common->mutex);
+    LF_MUTEX_UNLOCK(rti_common->mutex);
 }

 tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) {
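For reference, the convenience macros that replace the hand-rolled checks above live in reactor-c's low-level utility headers. A minimal sketch of their assumed semantics follows; the exact definitions and error messages in the real headers may differ:

    /* Sketch only: assumed expansions of the checked-call macros.
       The real definitions are in reactor-c's utility headers. */
    #define LF_ASSERT_NON_NULL(pointer) \
        do { \
            if ((pointer) == NULL) { \
                lf_print_error_and_exit("Out of memory (%s:%d).", __FILE__, __LINE__); \
            } \
        } while (0)

    #define LF_MUTEX_INIT(mutex) \
        do { \
            if (lf_mutex_init(mutex) != 0) { \
                lf_print_error_and_exit("Failed to initialize mutex (%s:%d).", __FILE__, __LINE__); \
            } \
        } while (0)

    #define LF_COND_INIT(cond, mutex) \
        do { \
            if (lf_cond_init((cond), (mutex)) != 0) { \
                lf_print_error_and_exit("Failed to initialize condition variable (%s:%d).", __FILE__, __LINE__); \
            } \
        } while (0)

Centralizing the error handling this way keeps every init/lock/unlock call site to a single line while still aborting loudly on failure.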
diff --git a/core/federated/RTI/rti_local.c b/core/federated/RTI/rti_local.c
index c75605426..538a5b2be 100644
--- a/core/federated/RTI/rti_local.c
+++ b/core/federated/RTI/rti_local.c
@@ -37,10 +37,10 @@ lf_mutex_t rti_mutex;
 void initialize_local_rti(environment_t *envs, int num_envs) {
     rti_local = (rti_local_t*)calloc(1, sizeof(rti_local_t));
-    LF_ASSERT(rti_local, "Out of memory");
+    LF_ASSERT_NON_NULL(rti_local);

     initialize_rti_common(&rti_local->base);
-    LF_ASSERT(lf_mutex_init(&rti_mutex) == 0, "Could not create mutex");
+    LF_MUTEX_INIT(&rti_mutex);
     rti_local->base.mutex = &rti_mutex;
     rti_local->base.number_of_scheduling_nodes = num_envs;
     rti_local->base.tracing_enabled = (envs[0].trace != NULL);
@@ -73,7 +73,7 @@ void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t * e
     enclave->env = env;

     // Initialize the next event condition variable.
-    LF_ASSERT(lf_cond_init(&enclave->next_event_condition, &rti_mutex) == 0, "Could not create cond var");
+    LF_COND_INIT(&enclave->next_event_condition, &rti_mutex);
 }

 tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
@@ -86,8 +86,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
     }
     // This is called from a critical section within the source enclave. Leave
     // this critical section and acquire the RTI mutex.
-    LF_ASSERT(lf_mutex_unlock(&e->env->mutex) == 0, "Could not unlock mutex");
-    LF_ASSERT(lf_mutex_lock(rti_local->base.mutex) == 0, "Could not lock mutex");
+    LF_MUTEX_UNLOCK(&e->env->mutex);
+    LF_MUTEX_LOCK(rti_local->base.mutex);
     tracepoint_federate_to_rti(e->env->trace, send_NET, e->base.id, &next_event_tag);
     // First, update the enclave data structure to record this next_event_tag,
     // and notify any downstream scheduling_nodes, and unblock them if appropriate.
@@ -105,8 +105,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
         next_event_tag.time - lf_time_start(), next_event_tag.microstep);
     tracepoint_federate_from_rti(e->env->trace, receive_TAG, e->base.id, &next_event_tag);
     // Release RTI mutex and re-enter the critical section of the source enclave before returning.
-    LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
-    LF_ASSERT(lf_mutex_lock(&e->env->mutex) == 0, "Could not lock mutex");
+    LF_MUTEX_UNLOCK(rti_local->base.mutex);
+    LF_MUTEX_LOCK(&e->env->mutex);
     return next_event_tag;
 }

@@ -136,8 +136,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
         e->base.id, e->base.next_event.time - lf_time_start(), e->base.next_event.microstep);
     tracepoint_federate_from_rti(e->env->trace, receive_TAG, e->base.id, &result.tag);
     // Release RTI mutex and re-enter the critical section of the source enclave.
-    LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
-    LF_ASSERT(lf_mutex_lock(&e->env->mutex) == 0, "Could not lock mutex");
+    LF_MUTEX_UNLOCK(rti_local->base.mutex);
+    LF_MUTEX_LOCK(&e->env->mutex);
     return result.tag;
 }

@@ -146,24 +146,24 @@ void rti_logical_tag_complete_locked(enclave_info_t* enclave, tag_t completed) {
         return;
     }
     // Release the enclave mutex while doing the local RTI work.
-    LF_ASSERT(lf_mutex_unlock(&enclave->env->mutex) == 0, "Could not unlock mutex");
+    LF_MUTEX_UNLOCK(&enclave->env->mutex);
     tracepoint_federate_to_rti(enclave->env->trace, send_LTC, enclave->base.id, &completed);

     _logical_tag_complete(&enclave->base, completed);

     // Acquire the enclave mutex again before returning.
-    LF_ASSERT(lf_mutex_lock(&enclave->env->mutex) == 0, "Could not lock mutex");
+    LF_MUTEX_LOCK(&enclave->env->mutex);
 }

 void rti_update_other_net_locked(enclave_info_t* src, enclave_info_t * target, tag_t net) {
     // Here we do NOT leave the critical section of the target enclave before we
     // acquire the RTI mutex. This means that we cannot block within this function.
-    LF_ASSERT(lf_mutex_lock(rti_local->base.mutex) == 0, "Could not lock mutex");
+    LF_MUTEX_LOCK(rti_local->base.mutex);
     tracepoint_federate_to_federate(src->env->trace, send_TAGGED_MSG, src->base.id, target->base.id, &net);

     // If our proposed NET is less than the current NET, update it.
     if (lf_tag_compare(net, target->base.next_event) < 0) {
         target->base.next_event = net;
     }
-    LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
+    LF_MUTEX_UNLOCK(rti_local->base.mutex);
 }

 ///////////////////////////////////////////////////////////////////////////////
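The rti_local.c changes above preserve a deliberate locking discipline: rti_next_event_tag_locked and rti_logical_tag_complete_locked are entered holding the enclave's environment mutex, trade it for the RTI mutex while negotiating, and restore the original lock before returning, so the two locks are almost never held together. A condensed sketch of the handoff, with a hypothetical helper name:

    /* Sketch of the enclave <-> local-RTI mutex handoff (hypothetical helper).
       Precondition and postcondition: e->env->mutex is held by the caller. */
    static tag_t negotiate_with_local_rti(enclave_info_t* e, tag_t net) {
        LF_MUTEX_UNLOCK(&e->env->mutex);       // Leave the enclave's critical section.
        LF_MUTEX_LOCK(rti_local->base.mutex);  // Enter the RTI's critical section.
        tag_t granted = net;                   // ... tag negotiation happens here ...
        LF_MUTEX_UNLOCK(rti_local->base.mutex);
        LF_MUTEX_LOCK(&e->env->mutex);         // Restore the caller's critical section.
        return granted;
    }

The one exception is rti_update_other_net_locked, which takes the RTI mutex while still inside the target enclave's critical section; that is exactly why its comment insists the function must not block.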
diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c
index 373a109df..96d02e2a2 100644
--- a/core/federated/RTI/rti_remote.c
+++ b/core/federated/RTI/rti_remote.c
@@ -330,13 +330,13 @@ void handle_port_absent_message(federate_info_t *sending_federate, unsigned char
     // Need to acquire the mutex lock to ensure that the thread handling
     // messages coming from the socket connected to the destination does not
     // issue a TAG before this message has been forwarded.
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);

     // If the destination federate is no longer connected, issue a warning
     // and return.
     federate_info_t *fed = GET_FED_INFO(federate_id);
     if (fed->enclave.state == NOT_CONNECTED) {
-        LF_MUTEX_UNLOCK(rti_mutex);
+        LF_MUTEX_UNLOCK(&rti_mutex);
         lf_print_warning("RTI: Destination federate %d is no longer connected. Dropping message.",
                 federate_id);
         LF_PRINT_LOG("Fed status: next_event (" PRINTF_TIME ", %d), "
@@ -373,7 +373,7 @@ void handle_port_absent_message(federate_info_t *sending_federate, unsigned char
     write_to_socket_fail_on_error(&fed->socket, message_size + 1, buffer, &rti_mutex,
             "RTI failed to forward message to federate %d.", federate_id);

-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);
 }

 void handle_timed_message(federate_info_t *sending_federate, unsigned char *buffer) {
@@ -423,13 +423,13 @@ void handle_timed_message(federate_info_t *sending_federate, unsigned char *buff
     // Need to acquire the mutex lock to ensure that the thread handling
     // messages coming from the socket connected to the destination does not
     // issue a TAG before this message has been forwarded.
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);

     // If the destination federate is no longer connected, issue a warning
     // and return.
     federate_info_t *fed = GET_FED_INFO(federate_id);
     if (fed->enclave.state == NOT_CONNECTED) {
-        LF_MUTEX_UNLOCK(rti_mutex);
+        LF_MUTEX_UNLOCK(&rti_mutex);
         lf_print_warning("RTI: Destination federate %d is no longer connected. Dropping message.",
                 federate_id);
         LF_PRINT_LOG("Fed status: next_event (" PRINTF_TIME ", %d), "
@@ -519,7 +519,7 @@ void handle_timed_message(federate_info_t *sending_federate, unsigned char *buff
         update_federate_next_event_tag_locked(federate_id, intended_tag);
     }

-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);
 }

 void handle_latest_tag_complete(federate_info_t *fed) {
@@ -534,10 +534,10 @@ void handle_latest_tag_complete(federate_info_t *fed) {
     _logical_tag_complete(&(fed->enclave), completed);

     // FIXME: Should this function be in the enclave version?
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);
     // See if we can remove any of the recorded in-transit messages for this.
     pqueue_tag_remove_up_to(fed->in_transit_message_tags, completed);
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);
 }

 void handle_next_event_tag(federate_info_t *fed) {
@@ -548,7 +548,7 @@ void handle_next_event_tag(federate_info_t *fed) {
     // Acquire a mutex lock to ensure that this state does not change while a
     // message is in transport or being used to determine a TAG.
-    LF_MUTEX_LOCK(rti_mutex);  // FIXME: Instead of using a mutex, it might be more efficient to use a
+    LF_MUTEX_LOCK(&rti_mutex);  // FIXME: Instead of using a mutex, it might be more efficient to use a
                                // select() mechanism to read and process federates' buffers in an orderly fashion.

     tag_t intended_tag = extract_tag(buffer);
@@ -561,7 +561,7 @@ void handle_next_event_tag(federate_info_t *fed) {
     update_federate_next_event_tag_locked(
         fed->enclave.id,
         intended_tag);
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);
 }

 /////////////////// STOP functions ////////////////////
@@ -676,7 +676,7 @@ void handle_stop_request_message(federate_info_t *fed) {
     // Acquire a mutex lock to ensure that this state does not change while a
     // message is in transport or being used to determine a TAG.
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);

     // Check whether we have already received a stop_tag
     // from this federate
@@ -685,7 +685,7 @@ void handle_stop_request_message(federate_info_t *fed) {
         if (rti_remote->stop_in_progress) {
             mark_federate_requesting_stop(fed);
         }
-        LF_MUTEX_UNLOCK(rti_mutex);
+        LF_MUTEX_UNLOCK(&rti_mutex);
         return;
     }
@@ -697,7 +697,7 @@ void handle_stop_request_message(federate_info_t *fed) {
     // If all federates have replied, send stop request granted.
     if (mark_federate_requesting_stop(fed)) {
         // Have sent stop request granted to all federates. Nothing more to do.
-        LF_MUTEX_UNLOCK(rti_mutex);
+        LF_MUTEX_UNLOCK(&rti_mutex);
         return;
     }
@@ -710,7 +710,7 @@ void handle_stop_request_message(federate_info_t *fed) {
     // Iterate over federates and send each the MSG_TYPE_STOP_REQUEST message
     // if we do not have a stop_time already for them. Do not do this more than once.
     if (rti_remote->stop_in_progress) {
-        LF_MUTEX_UNLOCK(rti_mutex);
+        LF_MUTEX_UNLOCK(&rti_mutex);
         return;
     }
     rti_remote->stop_in_progress = true;
@@ -735,7 +735,7 @@ void handle_stop_request_message(federate_info_t *fed) {
     LF_PRINT_LOG("RTI forwarded to federates MSG_TYPE_STOP_REQUEST with tag (" PRINTF_TIME ", %u).",
                 rti_remote->base.max_stop_tag.time - start_time,
                 rti_remote->base.max_stop_tag.microstep);
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);
 }

 void handle_stop_request_reply(federate_info_t *fed) {
@@ -756,13 +756,13 @@ void handle_stop_request_reply(federate_info_t *fed) {
             federate_stop_tag.microstep);

     // Acquire the mutex lock so that we can change the state of the RTI
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);
     // If the federate has not requested stop before, count the reply
     if (lf_tag_compare(federate_stop_tag, rti_remote->base.max_stop_tag) > 0) {
         rti_remote->base.max_stop_tag = federate_stop_tag;
     }
     mark_federate_requesting_stop(fed);
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);
 }

 //////////////////////////////////////////////////
@@ -793,7 +793,7 @@ void handle_address_query(uint16_t fed_id) {
     federate_info_t *remote_fed = GET_FED_INFO(remote_fed_id);

     // Send the port number (which could be -1).
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);
     encode_int32(remote_fed->server_port, (unsigned char *)&buffer[1]);
     write_to_socket_fail_on_error(
             &fed->socket, sizeof(int32_t) + 1, (unsigned char *)buffer, &rti_mutex,
@@ -804,7 +804,7 @@ void handle_address_query(uint16_t fed_id) {
             &fed->socket, sizeof(remote_fed->server_ip_addr), (unsigned char *)&remote_fed->server_ip_addr, &rti_mutex,
             "Failed to write ip address to socket of federate %d.", fed_id);
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);

     LF_PRINT_DEBUG("Replied to address query from federate %d with address %s:%d.",
             fed_id, remote_fed->server_hostname, remote_fed->server_port);
@@ -823,9 +823,9 @@ void handle_address_ad(uint16_t federate_id) {

     assert(server_port < 65536);

-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);
     fed->server_port = server_port;
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);

     LF_PRINT_LOG("Received address advertisement with port %d from federate %d.", server_port, federate_id);
     if (rti_remote->base.tracing_enabled) {
@@ -846,7 +846,7 @@ void handle_timestamp(federate_info_t *my_fed) {
     }
     LF_PRINT_DEBUG("RTI received timestamp message with time: " PRINTF_TIME ".", timestamp);

-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);
     rti_remote->num_feds_proposed_start++;
     if (timestamp > rti_remote->max_start_time) {
         rti_remote->max_start_time = timestamp;
@@ -863,7 +863,7 @@ void handle_timestamp(federate_info_t *my_fed) {
         }
     }

-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);

     // Send back to the federate the maximum time plus an offset on a TIMESTAMP
     // message.
@@ -881,14 +881,14 @@ void handle_timestamp(federate_info_t *my_fed) {
         lf_print_error("Failed to send the starting time to federate %d.", my_fed->enclave.id);
     }

-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);
     // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP
     // message has been sent. That MSG_TYPE_TIMESTAMP message grants time advance to
     // the federate to the start time.
     my_fed->enclave.state = GRANTED;
     lf_cond_broadcast(&sent_start_time);
     LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id);
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);
 }

 void send_physical_clock(unsigned char message_type, federate_info_t *fed, socket_type_t socket_type) {
@@ -917,11 +917,11 @@ void send_physical_clock(unsigned char message_type, federate_info_t *fed, socke
     }
     else if (socket_type == TCP) {
         LF_PRINT_DEBUG("Clock sync: RTI sending TCP message type %u.", buffer[0]);
-        LF_MUTEX_LOCK(rti_mutex);
+        LF_MUTEX_LOCK(&rti_mutex);
         write_to_socket_fail_on_error(&fed->socket, 1 + sizeof(int64_t), buffer, &rti_mutex,
                 "Clock sync: RTI failed to send physical time to federate %d.", fed->enclave.id);
-        LF_MUTEX_UNLOCK(rti_mutex);
+        LF_MUTEX_UNLOCK(&rti_mutex);
     }
     LF_PRINT_DEBUG("Clock sync: RTI sent PHYSICAL_TIME_SYNC_MESSAGE with timestamp " PRINTF_TIME " to federate %d.",
@@ -932,7 +932,7 @@ void handle_physical_clock_sync_message(federate_info_t *my_fed, socket_type_t s
     // Lock the mutex to prevent interference between sending the two
     // coded probe messages.
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);
     // Reply with a T4 type message
     send_physical_clock(MSG_TYPE_CLOCK_SYNC_T4, my_fed, socket_type);
     // Send the corresponding coded probe immediately after,
@@ -940,18 +940,18 @@ void handle_physical_clock_sync_message(federate_info_t *my_fed, socket_type_t s
     if (socket_type == UDP) {
         send_physical_clock(MSG_TYPE_CLOCK_SYNC_CODED_PROBE, my_fed, socket_type);
     }
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);
 }

 void *clock_synchronization_thread(void *noargs) {
     // Wait until all federates have been notified of the start time.
     // FIXME: Use lf_ version of this when merged with master.
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);
     while (rti_remote->num_feds_proposed_start < rti_remote->base.number_of_scheduling_nodes) {
         lf_cond_wait(&received_start_times);
     }
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);

     // Wait until the start time before starting clock synchronization.
     // The above wait ensures that start_time has been set.
@@ -1051,7 +1051,7 @@
  */
 static void handle_federate_failed(federate_info_t *my_fed) {
     // Nothing more to do. Close the socket and exit.
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);

     if (rti_remote->base.tracing_enabled) {
         tracepoint_rti_from_federate(rti_remote->base.trace, receive_FAILED, my_fed->enclave.id, NULL);
@@ -1082,7 +1082,7 @@ static void handle_federate_failed(federate_info_t *my_fed) {
     notify_downstream_advance_grant_if_safe(&(my_fed->enclave), visited);
     free(visited);

-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);
 }

 /**
@@ -1099,7 +1099,7 @@ static void handle_federate_failed(federate_info_t *my_fed) {
  */
 static void handle_federate_resign(federate_info_t *my_fed) {
     // Nothing more to do. Close the socket and exit.
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);

     if (rti_remote->base.tracing_enabled) {
         tracepoint_rti_from_federate(rti_remote->base.trace, receive_RESIGN, my_fed->enclave.id, NULL);
@@ -1134,7 +1134,7 @@ static void handle_federate_resign(federate_info_t *my_fed) {
     notify_downstream_advance_grant_if_safe(&(my_fed->enclave), visited);
     free(visited);

-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);
 }

 void *federate_info_thread_TCP(void *fed) {
@@ -1205,9 +1205,9 @@ void *federate_info_thread_TCP(void *fed) {
     // Nothing more to do. Close the socket and exit.
     // Prevent multiple threads from closing the same socket at the same time.
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);
     close(my_fed->socket); // from unistd.h
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);

     return NULL;
 }

@@ -1216,7 +1216,7 @@ void send_reject(int *socket_id, unsigned char error_code) {
     unsigned char response[2];
     response[0] = MSG_TYPE_REJECT;
     response[1] = error_code;
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);
     // NOTE: Ignore errors on this response.
     if (write_to_socket(*socket_id, 2, response)) {
         lf_print_warning("RTI failed to write MSG_TYPE_REJECT message on the socket.");
     }
@@ -1225,7 +1225,7 @@ void send_reject(int *socket_id, unsigned char error_code) {
     shutdown(*socket_id, SHUT_RDWR);
     close(*socket_id);
     *socket_id = -1;
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);
 }

 /**
@@ -1356,13 +1356,13 @@ static int32_t receive_and_check_fed_id_message(int *socket_id, struct sockaddr_
     if (rti_remote->base.tracing_enabled) {
         tracepoint_rti_to_federate(rti_remote->base.trace, send_ACK, fed_id, NULL);
     }
-    LF_MUTEX_LOCK(rti_mutex);
+    LF_MUTEX_LOCK(&rti_mutex);
     if (write_to_socket_close_on_error(&fed->socket, 1, &ack_message)) {
-        LF_MUTEX_UNLOCK(rti_mutex);
+        LF_MUTEX_UNLOCK(&rti_mutex);
         lf_print_error("RTI failed to write MSG_TYPE_ACK message to federate %d.", fed_id);
         return -1;
     }
-    LF_MUTEX_UNLOCK(rti_mutex);
+    LF_MUTEX_UNLOCK(&rti_mutex);

     LF_PRINT_DEBUG("RTI sent MSG_TYPE_ACK to federate %d.", fed_id);

@@ -1797,9 +1797,9 @@ void initialize_RTI(rti_remote_t *rti) {
     rti_remote = rti;

     // Initialize thread synchronization primitives
-    LF_MUTEX_INIT(rti_mutex);
-    LF_COND_INIT(received_start_times, rti_mutex);
-    LF_COND_INIT(sent_start_time, rti_mutex);
+    LF_MUTEX_INIT(&rti_mutex);
+    LF_COND_INIT(&received_start_times, &rti_mutex);
+    LF_COND_INIT(&sent_start_time, &rti_mutex);

     initialize_rti_common(&rti_remote->base);
     rti_remote->base.mutex = &rti_mutex;
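Note the signature convention running through rti_remote.c: call sites now pass &rti_mutex rather than rti_mutex, i.e., the macros take an lf_mutex_t* just like the underlying platform functions. A sketch of the assumed forwarding (error text hypothetical):

    /* Sketch: LF_MUTEX_LOCK/UNLOCK are assumed to forward a pointer to the
       platform layer and abort on failure. */
    #define LF_MUTEX_LOCK(mutex) \
        do { \
            if (lf_mutex_lock(mutex) != 0) { \
                lf_print_error_and_exit("Failed to lock mutex (%s:%d).", __FILE__, __LINE__); \
            } \
        } while (0)

    lf_mutex_t rti_mutex;                 /* a global object: call sites pass &rti_mutex */
    /* LF_MUTEX_LOCK(&rti_mutex);           pointer-valued fields pass through unchanged: */
    /* LF_MUTEX_LOCK(rti_common->mutex);                                                  */

That is also why rti_common.c earlier passes rti_common->mutex as-is: that field is already a pointer.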
" @@ -400,7 +400,7 @@ static trigger_handle_t schedule_message_received_from_network_locked( * @param flag 0 if an EOF was received, -1 if a socket error occurred, 1 otherwise. */ static void close_inbound_socket(int fed_id, int flag) { - LF_MUTEX_LOCK(socket_mutex); + LF_MUTEX_LOCK(&socket_mutex); if (_fed.sockets_for_inbound_p2p_connections[fed_id] >= 0) { if (flag >= 0) { if (flag > 0) { @@ -413,7 +413,7 @@ static void close_inbound_socket(int fed_id, int flag) { close(_fed.sockets_for_inbound_p2p_connections[fed_id]); _fed.sockets_for_inbound_p2p_connections[fed_id] = -1; } - LF_MUTEX_UNLOCK(socket_mutex); + LF_MUTEX_UNLOCK(&socket_mutex); } /** @@ -590,7 +590,7 @@ static int handle_tagged_message(int* socket, int fed_id) { // The following is only valid for string messages. // LF_PRINT_DEBUG("Message received: %s.", message_contents); - LF_MUTEX_LOCK(env->mutex); + LF_MUTEX_LOCK(&env->mutex); action->trigger->physical_time_of_arrival = time_of_arrival; @@ -676,7 +676,7 @@ static int handle_tagged_message(int* socket, int fed_id) { // logical time has been removed to avoid // the need for unecessary lock and unlock // operations. - LF_MUTEX_UNLOCK(env->mutex); + LF_MUTEX_UNLOCK(&env->mutex); return 0; } @@ -720,9 +720,9 @@ static int handle_port_absent_message(int* socket, int fed_id) { environment_t *env; _lf_get_environments(&env); - LF_MUTEX_LOCK(env->mutex); + LF_MUTEX_LOCK(&env->mutex); update_last_known_status_on_input_port(env, intended_tag, port_id); - LF_MUTEX_UNLOCK(env->mutex); + LF_MUTEX_UNLOCK(&env->mutex); return 0; } @@ -830,7 +830,7 @@ static void* listen_to_federates(void* _args) { static void close_outbound_socket(int fed_id, int flag) { assert (fed_id >= 0 && fed_id < NUMBER_OF_FEDERATES); if (_lf_normal_termination) { - LF_MUTEX_LOCK(lf_outbound_socket_mutex); + LF_MUTEX_LOCK(&lf_outbound_socket_mutex); } if (_fed.sockets_for_outbound_p2p_connections[fed_id] >= 0) { // Close the socket by sending a FIN packet indicating that no further writes @@ -850,7 +850,7 @@ static void close_outbound_socket(int fed_id, int flag) { _fed.sockets_for_outbound_p2p_connections[fed_id] = -1; } if (_lf_normal_termination) { - LF_MUTEX_UNLOCK(lf_outbound_socket_mutex); + LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex); } } @@ -1042,7 +1042,7 @@ static void handle_tag_advance_grant(void) { // Trace the event when tracing is enabled tracepoint_federate_from_rti(_fed.trace, receive_TAG, _lf_my_fed_id, &TAG); - LF_MUTEX_LOCK(env->mutex); + LF_MUTEX_LOCK(&env->mutex); // Update the last known status tag of all network input ports // to the TAG received from the RTI. Here we assume that the RTI @@ -1059,7 +1059,7 @@ static void handle_tag_advance_grant(void) { LF_PRINT_LOG("Received Time Advance Grant (TAG): " PRINTF_TAG ".", _fed.last_TAG.time - start_time, _fed.last_TAG.microstep); } else { - LF_MUTEX_UNLOCK(env->mutex); + LF_MUTEX_UNLOCK(&env->mutex); lf_print_error("Received a TAG " PRINTF_TAG " that wasn't larger " "than the previous TAG or PTAG " PRINTF_TAG ". Ignoring the TAG.", TAG.time - start_time, TAG.microstep, @@ -1069,7 +1069,7 @@ static void handle_tag_advance_grant(void) { // Notify everything that is blocked. lf_cond_broadcast(&env->event_q_changed); - LF_MUTEX_UNLOCK(env->mutex); + LF_MUTEX_UNLOCK(&env->mutex); } #ifdef FEDERATED_DECENTRALIZED @@ -1115,7 +1115,7 @@ static void* update_ports_from_staa_offsets(void* args) { // input ports. 
     environment_t *env;
     int num_envs = _lf_get_environments(&env);
-    LF_MUTEX_LOCK(env->mutex);
+    LF_MUTEX_LOCK(&env->mutex);
     while (1) {
         LF_PRINT_DEBUG("**** (update thread) starting");
         tag_t tag_when_started_waiting = lf_tag(env);
@@ -1213,7 +1213,7 @@ static void* update_ports_from_staa_offsets(void* args) {
                 lf_tag(env).time - lf_time_start(), lf_tag(env).microstep,
                 tag_when_started_waiting.time -lf_time_start(), tag_when_started_waiting.microstep);
     }
-    LF_MUTEX_UNLOCK(env->mutex);
+    LF_MUTEX_UNLOCK(&env->mutex);
 }
 #endif // FEDERATED_DECENTRALIZED

@@ -1250,12 +1250,12 @@ static void handle_provisional_tag_advance_grant() {
     // get updated to a PTAG value because a PTAG does not indicate that
     // the RTI knows about the status of all ports up to and _including_
     // the value of PTAG. Only a TAG message indicates that.
-    LF_MUTEX_LOCK(env->mutex);
+    LF_MUTEX_LOCK(&env->mutex);

     // Sanity check
     if (lf_tag_compare(PTAG, _fed.last_TAG) < 0
             || (lf_tag_compare(PTAG, _fed.last_TAG) == 0 && !_fed.is_last_TAG_provisional)) {
-        LF_MUTEX_UNLOCK(env->mutex);
+        LF_MUTEX_UNLOCK(&env->mutex);
         lf_print_error_and_exit("Received a PTAG " PRINTF_TAG " that is equal or earlier "
                 "than an already received TAG " PRINTF_TAG ".",
                 PTAG.time, PTAG.microstep,
@@ -1289,7 +1289,7 @@ static void handle_provisional_tag_advance_grant() {
         // it is already treating the current tag as PTAG cycle (e.g. at the
         // start time) or it will be completing the current cycle and sending
         // a LTC message shortly. In either case, there is nothing more to do.
-        LF_MUTEX_UNLOCK(env->mutex);
+        LF_MUTEX_UNLOCK(&env->mutex);
         return;
     } else if (lf_tag_compare(env->current_tag, PTAG) > 0) {
         // Current tag is greater than the PTAG.
@@ -1303,7 +1303,7 @@ static void handle_provisional_tag_advance_grant() {
         // Send an LTC to indicate absent outputs.
         lf_latest_tag_complete(PTAG);
         // Nothing more to do.
-        LF_MUTEX_UNLOCK(env->mutex);
+        LF_MUTEX_UNLOCK(&env->mutex);
         return;
     } else if (PTAG.time == env->current_tag.time) {
         // We now know env->current_tag < PTAG, but the times are equal.
@@ -1324,7 +1324,7 @@ static void handle_provisional_tag_advance_grant() {
         pqueue_insert(env->event_q, dummy);
     }

-    LF_MUTEX_UNLOCK(env->mutex);
+    LF_MUTEX_UNLOCK(&env->mutex);
 }

 /**
@@ -1353,7 +1353,7 @@ static void handle_stop_granted_message() {
     int num_environments = _lf_get_environments(&env);

     for (int i = 0; i < num_environments; i++) {
-        LF_MUTEX_LOCK(env[i].mutex);
+        LF_MUTEX_LOCK(&env[i].mutex);

         // Sanity check.
         if (lf_tag_compare(received_stop_tag, env[i].current_tag) <= 0) {
@@ -1371,7 +1371,7 @@ static void handle_stop_granted_message() {
         if (env[i].barrier.requestors) _lf_decrement_tag_barrier_locked(&env[i]);
         lf_cond_broadcast(&env[i].event_q_changed);
-        LF_MUTEX_UNLOCK(env[i].mutex);
+        LF_MUTEX_UNLOCK(&env[i].mutex);
     }
 }

@@ -1395,14 +1395,14 @@ static void handle_stop_request_message() {
     extern bool lf_stop_requested;
     bool already_blocked = false;

-    LF_MUTEX_LOCK(global_mutex);
+    LF_MUTEX_LOCK(&global_mutex);
     if (lf_stop_requested) {
         LF_PRINT_LOG("Ignoring MSG_TYPE_STOP_REQUEST from RTI because lf_request_stop has been called locally.");
         already_blocked = true;
     }
     // Treat the stop request from the RTI as if a local stop request had been received.
     lf_stop_requested = true;
-    LF_MUTEX_UNLOCK(global_mutex);
+    LF_MUTEX_UNLOCK(&global_mutex);

     // If we have previously received from the RTI a stop request,
     // or we have previously sent a stop request to the RTI,
@@ -1411,7 +1411,7 @@ static void handle_stop_request_message() {
     // is guarded by the outbound socket mutex.
     // The second is guarded by the global mutex.
     // Note that the RTI should not send stop requests more than once to federates.
-    LF_MUTEX_LOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_LOCK(&lf_outbound_socket_mutex);
     if (_fed.received_stop_request_from_rti) {
         LF_PRINT_LOG("Redundant MSG_TYPE_STOP_REQUEST from RTI. Ignoring it.");
         already_blocked = true;
@@ -1420,7 +1420,7 @@ static void handle_stop_request_message() {
         // prevent lf_request_stop from sending.
         _fed.received_stop_request_from_rti = true;
     }
-    LF_MUTEX_UNLOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);

     if (already_blocked) {
         // Either we have sent a stop request to the RTI ourselves,
@@ -1435,7 +1435,7 @@ static void handle_stop_request_message() {
     environment_t *env;
     int num_environments = _lf_get_environments(&env);
     for (int i = 0; i < num_environments; i++) {
-        LF_MUTEX_LOCK(env[i].mutex);
+        LF_MUTEX_LOCK(&env[i].mutex);
         if (lf_tag_compare(tag_to_stop, env[i].current_tag) <= 0) {
             // Can't stop at the requested tag. Make a counteroffer.
             tag_to_stop = env->current_tag;
@@ -1444,7 +1444,7 @@ static void handle_stop_request_message() {
         // Set a barrier to prevent the enclave from advancing past the so-far tag to stop.
         _lf_increment_tag_barrier_locked(&env[i], tag_to_stop);

-        LF_MUTEX_UNLOCK(env[i].mutex);
+        LF_MUTEX_UNLOCK(&env[i].mutex);
     }
     // Send the reply, which is the least tag at which we can stop.
     unsigned char outgoing_buffer[MSG_TYPE_STOP_REQUEST_REPLY_LENGTH];
     tracepoint_federate_to_rti(_fed.trace, send_STOP_REQ_REP, _lf_my_fed_id, &tag_to_stop);
     // Send the current logical time to the RTI.
-    LF_MUTEX_LOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_LOCK(&lf_outbound_socket_mutex);
     write_to_socket_fail_on_error(
         &_fed.socket_TCP_RTI, MSG_TYPE_STOP_REQUEST_REPLY_LENGTH, outgoing_buffer, &lf_outbound_socket_mutex,
         "Failed to send the answer to MSG_TYPE_STOP_REQUEST to RTI.");
-    LF_MUTEX_UNLOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);

     LF_PRINT_DEBUG("Sent MSG_TYPE_STOP_REQUEST_REPLY to RTI with tag " PRINTF_TAG,
             tag_to_stop.time, tag_to_stop.microstep);
@@ -1471,11 +1471,11 @@ static void send_resign_signal(environment_t* env) {
     size_t bytes_to_write = 1;
     unsigned char buffer[bytes_to_write];
     buffer[0] = MSG_TYPE_RESIGN;
-    LF_MUTEX_LOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_LOCK(&lf_outbound_socket_mutex);
     write_to_socket_fail_on_error(
         &_fed.socket_TCP_RTI, bytes_to_write, &(buffer[0]), &lf_outbound_socket_mutex,
         "Failed to send MSG_TYPE_RESIGN.");
-    LF_MUTEX_UNLOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);
     LF_PRINT_LOG("Resigned.");
 }

@@ -1748,12 +1748,12 @@ void lf_connect_to_federate(uint16_t remote_federate_id) {
         // Trace the event when tracing is enabled
         tracepoint_federate_to_rti(_fed.trace, send_ADR_QR, _lf_my_fed_id, NULL);

-        LF_MUTEX_LOCK(lf_outbound_socket_mutex);
+        LF_MUTEX_LOCK(&lf_outbound_socket_mutex);
         write_to_socket_fail_on_error(
             &_fed.socket_TCP_RTI, sizeof(uint16_t) + 1, buffer, &lf_outbound_socket_mutex,
             "Failed to send address query for federate %d to RTI.", remote_federate_id);
-        LF_MUTEX_UNLOCK(lf_outbound_socket_mutex);
+        LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);

         // Read RTI's response.
         read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(int32_t) + 1, buffer, NULL,
@@ -2240,14 +2240,14 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) {
         // Trace the event when tracing is enabled
         tracepoint_federate_to_federate(_fed.trace, send_ACK, _lf_my_fed_id, remote_fed_id, NULL);

-        LF_MUTEX_LOCK(lf_outbound_socket_mutex);
+        LF_MUTEX_LOCK(&lf_outbound_socket_mutex);
         write_to_socket_fail_on_error(
             &_fed.sockets_for_inbound_p2p_connections[remote_fed_id], 1, (unsigned char*)&response,
             &lf_outbound_socket_mutex,
             "Failed to write MSG_TYPE_ACK in response to federate %d.", remote_fed_id);
-        LF_MUTEX_UNLOCK(lf_outbound_socket_mutex);
+        LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);

         // Start a thread to listen for incoming messages from other federates.
         // The fed_id is a uint16_t, which we assume can be safely cast to and from void*.
@@ -2258,12 +2258,12 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) {
             fed_id_arg);
         if (result != 0) {
             // Failed to create a listening thread.
-            LF_MUTEX_LOCK(socket_mutex);
+            LF_MUTEX_LOCK(&socket_mutex);
             if (_fed.sockets_for_inbound_p2p_connections[remote_fed_id] != -1) {
                 close(socket_id);
                 _fed.sockets_for_inbound_p2p_connections[remote_fed_id] = -1;
             }
-            LF_MUTEX_UNLOCK(socket_mutex);
+            LF_MUTEX_UNLOCK(&socket_mutex);
             lf_print_error_and_exit(
                 "Failed to create a thread to listen for incoming physical connection. Error code: %d.",
                 result
@@ -2369,7 +2369,7 @@ int lf_send_message(int message_type,
     const int header_length = 1 + sizeof(uint16_t) + sizeof(uint16_t) + sizeof(int32_t);

     // Use a mutex lock to prevent multiple threads from simultaneously sending.
-    LF_MUTEX_LOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_LOCK(&lf_outbound_socket_mutex);

     int* socket = &_fed.sockets_for_outbound_p2p_connections[federate];
@@ -2385,7 +2385,7 @@ int lf_send_message(int message_type,
         // Message did not send. Since this is used for physical connections, this is not critical.
         lf_print_warning("Failed to send message to %s. Dropping the message.", next_destination_str);
     }
-    LF_MUTEX_UNLOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);

     return result;
 }

@@ -2580,9 +2580,9 @@ void lf_send_port_absent_to_federate(
             _fed.trace, send_PORT_ABS, _lf_my_fed_id, fed_ID, &current_message_intended_tag);
     }

-    LF_MUTEX_LOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_LOCK(&lf_outbound_socket_mutex);
     int result = write_to_socket_close_on_error(socket, message_length, buffer);
-    LF_MUTEX_UNLOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);

     if (result != 0) {
         // Write failed. Response depends on whether coordination is centralized.
@@ -2606,7 +2606,7 @@ int lf_send_stop_request_to_rti(tag_t stop_tag) {
     stop_tag.microstep++;
     ENCODE_STOP_REQUEST(buffer, stop_tag.time, stop_tag.microstep);

-    LF_MUTEX_LOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_LOCK(&lf_outbound_socket_mutex);
     // Do not send a stop request if a stop request has been previously received from the RTI.
     if (!_fed.received_stop_request_from_rti) {
         LF_PRINT_LOG("Sending to RTI a MSG_TYPE_STOP_REQUEST message with tag " PRINTF_TAG ".",
@@ -2615,7 +2615,7 @@ int lf_send_stop_request_to_rti(tag_t stop_tag) {
         if (_fed.socket_TCP_RTI < 0) {
             lf_print_warning("Socket is no longer connected. Dropping message.");
-            LF_MUTEX_UNLOCK(lf_outbound_socket_mutex);
+            LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);
             return -1;
         }
         // Trace the event when tracing is enabled
@@ -2627,10 +2627,10 @@ int lf_send_stop_request_to_rti(tag_t stop_tag) {
         // Treat this sending as equivalent to having received a stop request from the RTI.
         _fed.received_stop_request_from_rti = true;
-        LF_MUTEX_UNLOCK(lf_outbound_socket_mutex);
+        LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);
         return 0;
     } else {
-        LF_MUTEX_UNLOCK(lf_outbound_socket_mutex);
+        LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);
         return 1;
     }
 }

@@ -2693,7 +2693,7 @@ int lf_send_tagged_message(environment_t* env,
             next_destination_str);

     // Use a mutex lock to prevent multiple threads from simultaneously sending.
-    LF_MUTEX_LOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_LOCK(&lf_outbound_socket_mutex);

     int* socket;
     if (message_type == MSG_TYPE_P2P_TAGGED_MESSAGE) {
@@ -2718,7 +2718,7 @@ int lf_send_tagged_message(environment_t* env,
                 next_destination_str);
         }
     }
-    LF_MUTEX_UNLOCK(lf_outbound_socket_mutex);
+    LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);
     return result;
 }

@@ -2738,13 +2738,13 @@ void lf_spawn_staa_thread(){

 void lf_stall_advance_level_federation(environment_t* env, size_t level) {
     LF_PRINT_DEBUG("Acquiring the environment mutex.");
-    LF_MUTEX_LOCK(env->mutex);
+    LF_MUTEX_LOCK(&env->mutex);
     LF_PRINT_DEBUG("Waiting on MLAA with next_reaction_level %zu and MLAA %d.", level, max_level_allowed_to_advance);
     while (((int) level) >= max_level_allowed_to_advance) {
         lf_cond_wait(&lf_port_status_changed);
     };
     LF_PRINT_DEBUG("Exiting wait with MLAA %d and next_reaction_level %zu.", max_level_allowed_to_advance, level);
-    LF_MUTEX_UNLOCK(env->mutex);
+    LF_MUTEX_UNLOCK(&env->mutex);
 }

 void lf_synchronize_with_other_federates(void) {
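A recurring shape throughout federate.c above: take lf_outbound_socket_mutex, then pass that same mutex into write_to_socket_fail_on_error() so that on a write failure the helper can release the lock before reporting a fatal error (the net_util.c hunks below show the helper doing exactly that). A condensed usage sketch, with a hypothetical buffer:

    /* Usage sketch of the send-under-lock pattern (buffer and sizes hypothetical). */
    unsigned char buffer[1 + sizeof(int64_t)];
    size_t bytes_to_write = sizeof(buffer);
    LF_MUTEX_LOCK(&lf_outbound_socket_mutex);
    if (_fed.socket_TCP_RTI < 0) {                   /* Connection may already be gone. */
        lf_print_warning("Socket is no longer connected. Dropping message.");
        LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);  /* Every early return must unlock. */
        return;
    }
    write_to_socket_fail_on_error(
        &_fed.socket_TCP_RTI, bytes_to_write, buffer,
        &lf_outbound_socket_mutex,                   /* Released by the helper on failure. */
        "Failed to send message to the RTI.");
    LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);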
diff --git a/core/federated/network/net_util.c b/core/federated/network/net_util.c
index 754a28ada..813e5cd07 100644
--- a/core/federated/network/net_util.c
+++ b/core/federated/network/net_util.c
@@ -145,7 +145,7 @@ void read_from_socket_fail_on_error(
     if (read_failed) {
         // Read failed.
         if (mutex != NULL) {
-            lf_mutex_unlock(mutex);
+            LF_MUTEX_UNLOCK(mutex);
         }
         if (format != NULL) {
             lf_print_error_system_failure(format, args);
@@ -214,7 +214,7 @@ void write_to_socket_fail_on_error(
     if (result) {
         // Write failed.
         if (mutex != NULL) {
-            lf_mutex_unlock(mutex);
+            LF_MUTEX_UNLOCK(mutex);
         }
         if (format != NULL) {
             lf_print_error_system_failure(format, args);
diff --git a/core/lf_token.c b/core/lf_token.c
index b309f0a2f..8e4b5ae43 100644
--- a/core/lf_token.c
+++ b/core/lf_token.c
@@ -30,12 +30,14 @@
  * Functions supporting token types. See lf_token.h for docs.
  */

+#if !defined NDEBUG
 /**
  * Counter used to issue a warning if memory is
  * allocated for message payloads and never freed.
  */
 int _lf_count_payload_allocations;
 int _lf_count_token_allocations;
+#endif

 #include <assert.h>
 #include <string.h>

@@ -72,6 +74,9 @@ static hashset_t _lf_token_recycling_bin = NULL;
  */
 static hashset_t _lf_token_templates = NULL;

+// Forward declarations
+static lf_token_t* _lf_writable_copy_locked(lf_port_base_t* port);
+
 ////////////////////////////////////////////////////////////////////
 //// Functions that users may call.

@@ -80,6 +85,16 @@ lf_token_t* lf_new_token(void* port_or_action, void* val, size_t len) {
 }

 lf_token_t* lf_writable_copy(lf_port_base_t* port) {
+    LF_CRITICAL_SECTION_ENTER(port->source_reactor->environment);
+    lf_token_t* token = _lf_writable_copy_locked(port);
+    LF_CRITICAL_SECTION_EXIT(port->source_reactor->environment);
+    return token;
+}
+
+////////////////////////////////////////////////////////////////////
+//// Internal functions.
+
+static lf_token_t* _lf_writable_copy_locked(lf_port_base_t* port) {
     assert(port != NULL);

     lf_token_t* token = port->tmplt.token;
@@ -115,7 +130,11 @@ lf_token_t* lf_writable_copy(lf_port_base_t* port) {
     LF_PRINT_DEBUG("lf_writable_copy: Allocated memory for payload (token value): %p", copy);

     // Count allocations to issue a warning if this is never freed.
+    #if !defined NDEBUG
+    LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT);
     _lf_count_payload_allocations++;
+    LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT);
+    #endif

     // Create a new, dynamically allocated token.
     lf_token_t* result = _lf_new_token((token_type_t*)port, copy, token->length);
@@ -128,12 +147,14 @@ lf_token_t* lf_writable_copy(lf_port_base_t* port) {
     return result;
 }

-////////////////////////////////////////////////////////////////////
-//// Internal functions.
-
-void _lf_free_token_value(lf_token_t* token) {
+static void _lf_free_token_value(lf_token_t* token) {
     if (token->value != NULL) {
         // Count frees to issue a warning if this is never freed.
+        #if !defined NDEBUG
+        LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT);
         _lf_count_payload_allocations--;
+        LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT);
+        #endif
         // Free the value field (the payload).
         LF_PRINT_DEBUG("_lf_free_token_value: Freeing allocated memory for payload (token value): %p",
             token->value);
@@ -161,13 +182,11 @@ token_freed _lf_free_token(lf_token_t* token) {
     // Tokens that are created at the start of execution and associated with
     // output ports or actions persist until they are overwritten.
     // Need to acquire a mutex to access the recycle bin.
-    if (lf_critical_section_enter(GLOBAL_ENVIRONMENT) != 0) {
-        lf_print_error_and_exit("Could not enter critical section");
-    }
+    LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT);
     if (_lf_token_recycling_bin == NULL) {
         _lf_token_recycling_bin = hashset_create(4); // Initial size is 16.
         if (_lf_token_recycling_bin == NULL) {
-            lf_critical_section_exit(GLOBAL_ENVIRONMENT);
+            LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT);
             lf_print_error_and_exit("Out of memory: failed to setup _lf_token_recycling_bin");
         }
     }
@@ -182,10 +201,10 @@ token_freed _lf_free_token(lf_token_t* token) {
         LF_PRINT_DEBUG("_lf_free_token: Freeing allocated memory for token: %p", token);
         free(token);
     }
-    if(lf_critical_section_exit(GLOBAL_ENVIRONMENT) != 0) {
-        lf_print_error_and_exit("Could not leave critical section");
-    }
+    #if !defined NDEBUG
     _lf_count_token_allocations--;
+    #endif
+    LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT);
     result &= TOKEN_FREED;

     return result;
@@ -194,9 +213,7 @@ lf_token_t* _lf_new_token(token_type_t* type, void* value, size_t length) {
     lf_token_t* result = NULL;
     // Check the recycling bin.
-    if (lf_critical_section_enter(GLOBAL_ENVIRONMENT) != 0) {
-        lf_print_error_and_exit("Could not enter critical section");
-    }
+    LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT);
     if (_lf_token_recycling_bin != NULL) {
         hashset_itr_t iterator = hashset_iterator(_lf_token_recycling_bin);
         if (hashset_iterator_next(iterator) >= 0) {
@@ -206,9 +223,14 @@ lf_token_t* _lf_new_token(token_type_t* type, void* value, size_t length) {
         }
         free(iterator);
     }
-    if(lf_critical_section_exit(GLOBAL_ENVIRONMENT) != 0) {
-        lf_print_error_and_exit("Could not leave critical section");
-    }
+
+    // Count the token allocation to catch memory leaks.
+    #if !defined NDEBUG
+    _lf_count_token_allocations++;
+    #endif
+
+    LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT);
+
     if (result == NULL) {
         // Nothing found on the recycle bin.
         result = (lf_token_t*)calloc(1, sizeof(lf_token_t));
@@ -242,16 +264,12 @@ lf_token_t* _lf_get_token(token_template_t* tmplt) {

 void _lf_initialize_template(token_template_t* tmplt, size_t element_size) {
     assert(tmplt != NULL);
-    if (lf_critical_section_enter(GLOBAL_ENVIRONMENT) != 0) {
-        lf_print_error_and_exit("Could not enter critical section");
-    }
+    LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT);
     if (_lf_token_templates == NULL) {
         _lf_token_templates = hashset_create(4); // Initial size is 16.
     }
     hashset_add(_lf_token_templates, tmplt);
-    if(lf_critical_section_exit(GLOBAL_ENVIRONMENT) != 0) {
-        lf_print_error_and_exit("Could not leave critical section");
-    }
+    LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT);
     if (tmplt->token != NULL) {
         if (tmplt->token->ref_count == 1 && tmplt->token->type->element_size == element_size) {
             // Template token is already set.
@@ -276,7 +294,11 @@ lf_token_t* _lf_initialize_token_with_value(token_template_t* tmplt, void* value
     lf_token_t* result = _lf_get_token(tmplt);
     result->value = value;
     // Count allocations to issue a warning if this is never freed.
+    #if !defined NDEBUG
+    LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT);
     _lf_count_payload_allocations++;
+    LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT);
+    #endif
     result->length = length;
     return result;
 }
@@ -291,9 +313,7 @@ lf_token_t* _lf_initialize_token(token_template_t* tmplt, size_t length) {

 void _lf_free_all_tokens() {
     // Free template tokens.
-    if (lf_critical_section_enter(GLOBAL_ENVIRONMENT) != 0) {
-        lf_print_error_and_exit("Could not enter critical section");
-    }
+    LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT);
     // It is possible for a token to be a template token for more than one port
     // or action because the same token may be sent to multiple output ports.
     if (_lf_token_templates != NULL) {
@@ -319,9 +339,7 @@ void _lf_free_all_tokens() {
         hashset_destroy(_lf_token_recycling_bin);
         _lf_token_recycling_bin = NULL;
     }
-    if(lf_critical_section_exit(GLOBAL_ENVIRONMENT) != 0) {
-        lf_print_error_and_exit("Could not leave critical section");
-    }
+    LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT);
 }

 void _lf_replace_template_token(token_template_t* tmplt, lf_token_t* newtoken) {
diff --git a/core/platform/lf_rp2040_support.c b/core/platform/lf_rp2040_support.c
index 045c6d6d9..61f839646 100644
--- a/core/platform/lf_rp2040_support.c
+++ b/core/platform/lf_rp2040_support.c
@@ -138,14 +138,14 @@ int _lf_interruptable_sleep_until_locked(environment_t* env, instant_t wakeup_ti
     // create us boot wakeup time
     target = from_us_since_boot((uint64_t) (wakeup_time / 1000));
     // allow interrupts
-    lf_critical_section_exit(env);
+    LF_CRITICAL_SECTION_EXIT(env);
     // blocked sleep
     // return on timeout or on processor event
     if(sem_acquire_block_until(&_lf_sem_irq_event, target)) {
         ret_code = -1;
     }
     // remove interrupts
-    lf_critical_section_enter(env);
+    LF_CRITICAL_SECTION_ENTER(env);
     return ret_code;
 }

diff --git a/core/platform/lf_zephyr_clock_counter.c b/core/platform/lf_zephyr_clock_counter.c
index f6110b19b..f4a419723 100644
--- a/core/platform/lf_zephyr_clock_counter.c
+++ b/core/platform/lf_zephyr_clock_counter.c
@@ -180,9 +180,9 @@ int _lf_interruptable_sleep_until_locked(environment_t* env, instant_t wakeup) {
         lf_print_error_and_exit("Could not setup alarm for sleeping. Errno %i", err);
     }

-    lf_critical_section_exit(env);
+    LF_CRITICAL_SECTION_EXIT(env);
     k_sem_take(&semaphore, K_FOREVER);
-    lf_critical_section_enter(env);
+    LF_CRITICAL_SECTION_ENTER(env);

     // Then calculating remaining sleep, unless we got woken up by an event
     if (!async_event) {
diff --git a/core/platform/lf_zephyr_clock_kernel.c b/core/platform/lf_zephyr_clock_kernel.c
index 281cfe3db..ff87610a6 100644
--- a/core/platform/lf_zephyr_clock_kernel.c
+++ b/core/platform/lf_zephyr_clock_kernel.c
@@ -75,12 +75,12 @@ int _lf_clock_now(instant_t* t) {

 int _lf_interruptable_sleep_until_locked(environment_t* env, instant_t wakeup) {
     async_event=false;
-    lf_critical_section_exit(env);
+    LF_CRITICAL_SECTION_EXIT(env);
     instant_t now;
     do {
         _lf_clock_now(&now);
     } while ((now < wakeup) && !async_event);
[...]
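The platform files above use the environment-based critical-section macros rather than a mutex directly because on bare-metal targets (the RP2040 and Zephyr ports) a critical section means masking interrupts, not locking. That is why each sleep routine exits the critical section before blocking and re-enters it before returning to its caller. A sketch of the assumed mapping; the real macros in the reactor-c headers also handle GLOBAL_ENVIRONMENT (a NULL environment designating runtime-global state) and nesting, so details may differ:

    /* Sketch of assumed critical-section semantics (not the real definitions). */
    #if defined(LF_SINGLE_THREADED)
        /* Bare-metal/single-threaded: mask and unmask interrupts. */
        #define LF_CRITICAL_SECTION_ENTER(env) lf_disable_interrupts_nested()
        #define LF_CRITICAL_SECTION_EXIT(env)  lf_enable_interrupts_nested()
    #else
        /* Threaded runtime: take the environment's mutex. */
        #define LF_CRITICAL_SECTION_ENTER(env) LF_MUTEX_LOCK(&(env)->mutex)
        #define LF_CRITICAL_SECTION_EXIT(env)  LF_MUTEX_UNLOCK(&(env)->mutex)
    #endif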
diff --git a/core/reactor.c b/core/reactor.c
[...]
     event_t* event = (event_t*)pqueue_peek(env->event_q);
     //pqueue_dump(event_q, event_q->prt);
     // If there is no next event and -keepalive has been specified
@@ -278,9 +276,7 @@ int next(environment_t* env) {
         // gets scheduled from an interrupt service routine.
         // In this case, check the event queue again to make sure to
         // advance time to the correct tag.
-        if(lf_critical_section_exit(env) != 0) {
-            lf_print_error_and_exit("Could not leave critical section");
-        }
+        LF_CRITICAL_SECTION_EXIT(env);
         return 1;
     }
     // Advance current time to match that of the first event on the queue.
@@ -301,9 +297,7 @@ int next(environment_t* env) {
     // extract all the reactions triggered by these events, and
     // stick them into the reaction queue.
     _lf_pop_events(env);
-    if(lf_critical_section_exit(env) != 0) {
-        lf_print_error_and_exit("Could not leave critical section");
-    }
+    LF_CRITICAL_SECTION_EXIT(env);

     return _lf_do_step(env);
 }
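The reactor_common.c hunks below extend the NDEBUG discipline introduced in lf_token.c above: the leak-detection counters, and the termination() warnings that read them, are compiled only into debug builds, so a release build (compiled with -DNDEBUG) carries no accounting overhead. The shape of the pattern, as a sketch with a hypothetical helper:

    #include <stdlib.h>
    /* Requires the reactor-c runtime headers for the macros used below. */

    #if !defined NDEBUG
    extern int _lf_count_payload_allocations;  /* defined in lf_token.c */
    #endif

    /* Hypothetical allocation helper illustrating the guarded accounting. */
    void* allocate_payload(size_t size) {
        void* payload = calloc(1, size);
    #if !defined NDEBUG
        /* The counter is shared across threads, hence the global critical section. */
        LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT);
        _lf_count_payload_allocations++;
        LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT);
    #endif
        return payload;
    }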
diff --git a/core/reactor_common.c b/core/reactor_common.c
index 938202909..041181ba4 100644
--- a/core/reactor_common.c
+++ b/core/reactor_common.c
@@ -72,8 +72,10 @@ extern watchdog_t* _lf_watchdogs;
 // Global variable defined in tag.c:
 extern instant_t start_time;

+#if !defined NDEBUG
 // Global variable defined in lf_token.c:
 extern int _lf_count_payload_allocations;
+#endif

 /**
  * Indicator of whether to wait for physical time to match logical time.
@@ -1224,15 +1226,11 @@ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t*
 trigger_handle_t _lf_schedule_token(lf_action_base_t* action, interval_t extra_delay, lf_token_t* token) {
     environment_t* env = action->parent->environment;

-    if (lf_critical_section_enter(env) != 0) {
-        lf_print_error_and_exit("Could not enter critical section");
-    }
+    LF_CRITICAL_SECTION_ENTER(env);
     int return_value = _lf_schedule(env, action->trigger, extra_delay, token);
     // Notify the main thread in case it is waiting for physical time to elapse.
     lf_notify_of_event(env);
-    if(lf_critical_section_exit(env) != 0) {
-        lf_print_error_and_exit("Could not leave critical section");
-    }
+    LF_CRITICAL_SECTION_EXIT(env);
     return return_value;
 }

@@ -1251,9 +1249,7 @@ trigger_handle_t _lf_schedule_copy(lf_action_base_t* action, interval_t offset,
         lf_print_error("schedule: Invalid element size.");
         return -1;
     }
-    if (lf_critical_section_enter(env) != 0) {
-        lf_print_error_and_exit("Could not enter critical section");
-    }
+    LF_CRITICAL_SECTION_ENTER(env);
     // Initialize token with an array size of length and a reference count of 0.
     lf_token_t* token = _lf_initialize_token(template, length);
     // Copy the value into the newly allocated memory.
@@ -1262,9 +1258,7 @@ trigger_handle_t _lf_schedule_copy(lf_action_base_t* action, interval_t offset,
     trigger_handle_t result = _lf_schedule(env, action->trigger, offset, token);
     // Notify the main thread in case it is waiting for physical time to elapse.
     lf_notify_of_event(env);
-    if(lf_critical_section_exit(env) != 0) {
-        lf_print_error_and_exit("Could not leave critical section");
-    }
+    LF_CRITICAL_SECTION_EXIT(env);
     return result;
 }

@@ -1276,16 +1270,12 @@ trigger_handle_t _lf_schedule_value(lf_action_base_t* action, interval_t extra_d
     token_template_t* template = (token_template_t*)action;
     environment_t* env = action->parent->environment;
-    if (lf_critical_section_enter(env) != 0) {
-        lf_print_error_and_exit("Could not enter critical section");
-    }
+    LF_CRITICAL_SECTION_ENTER(env);
     lf_token_t* token = _lf_initialize_token_with_value(template, value, length);
     int return_value = _lf_schedule(env, action->trigger, extra_delay, token);
     // Notify the main thread in case it is waiting for physical time to elapse.
     lf_notify_of_event(env);
-    if(lf_critical_section_exit(env) != 0) {
-        lf_print_error_and_exit("Could not leave critical section");
-    }
+    LF_CRITICAL_SECTION_EXIT(env);
     return return_value;
 }

@@ -1355,7 +1345,7 @@ void _lf_invoke_reaction(environment_t* env, reaction_t* reaction, int worker) {

 #if !defined(LF_SINGLE_THREADED)
     if (((self_base_t*) reaction->self)->reactor_mutex != NULL) {
-        lf_mutex_lock((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex);
+        LF_MUTEX_LOCK((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex);
     }
 #endif

@@ -1368,7 +1358,7 @@ void _lf_invoke_reaction(environment_t* env, reaction_t* reaction, int worker) {
 #if !defined(LF_SINGLE_THREADED)
     if (((self_base_t*) reaction->self)->reactor_mutex != NULL) {
-        lf_mutex_unlock((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex);
+        LF_MUTEX_UNLOCK((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex);
     }
 #endif
 }

@@ -1725,8 +1715,10 @@ int process_args(int argc, const char* argv[]) {
  * `_lf_initialize_trigger_objects` function
  */
 void initialize_global(void) {
+    #if !defined NDEBUG
     _lf_count_payload_allocations = 0;
     _lf_count_token_allocations = 0;
+    #endif

     environment_t *envs;
     int num_envs = _lf_get_environments(&envs);
@@ -1816,6 +1808,7 @@ void termination(void) {
     // Skip most cleanup on abnormal termination.
     if (_lf_normal_termination) {
         _lf_free_all_tokens(); // Must be done before freeing reactors.
+#if !defined NDEBUG
         // Issue a warning if a memory leak has been detected.
         if (_lf_count_payload_allocations > 0) {
             lf_print_warning("Memory allocated for messages has not been freed.");
@@ -1825,6 +1818,7 @@ void termination(void) {
             lf_print_warning("Memory allocated for tokens has not been freed!");
             lf_print_warning("Number of unfreed tokens: %d.", _lf_count_token_allocations);
         }
+#endif
 #if !defined(LF_SINGLE_THREADED)
         for (int i = 0; i < _lf_watchdog_count; i++) {
             if (_lf_watchdogs[i].base->reactor_mutex != NULL) {
diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c
index b24d70390..3120a0405 100644
--- a/core/threaded/reactor_threaded.c
+++ b/core/threaded/reactor_threaded.c
@@ -117,9 +117,9 @@ void _lf_increment_tag_barrier_locked(environment_t *env, tag_t future_tag) {
 void _lf_increment_tag_barrier(environment_t *env, tag_t future_tag) {
     assert(env != GLOBAL_ENVIRONMENT);
-    lf_mutex_lock(&env->mutex);
+    LF_MUTEX_LOCK(&env->mutex);
     _lf_increment_tag_barrier_locked(env, future_tag);
-    lf_mutex_unlock(&env->mutex);
+    LF_MUTEX_UNLOCK(&env->mutex);
 }

 void _lf_decrement_tag_barrier_locked(environment_t* env) {
@@ -580,14 +580,14 @@ bool lf_stop_requested = false;
 void lf_request_stop() {
     // If a requested stop is pending, return without doing anything.
     LF_PRINT_LOG("lf_request_stop() has been called.");
-    lf_mutex_lock(&global_mutex);
+    LF_MUTEX_LOCK(&global_mutex);
     if (lf_stop_requested) {
-        lf_mutex_unlock(&global_mutex);
+        LF_MUTEX_UNLOCK(&global_mutex);
         LF_PRINT_LOG("Ignoring redundant lf_request_stop() call.");
         return;
     }
     lf_stop_requested = true;
-    lf_mutex_unlock(&global_mutex);
+    LF_MUTEX_UNLOCK(&global_mutex);

     // Iterate over scheduling enclaves to find their maximum current tag
     // and set a barrier for tag advancement for each enclave.
@@ -595,13 +595,13 @@ void lf_request_stop() {
     environment_t* env;
     int num_environments = _lf_get_environments(&env);
     for (int i = 0; i < num_environments; i++) {
-        lf_mutex_lock(&env[i].mutex);
+        LF_MUTEX_LOCK(&env[i].mutex);
         if (lf_tag_compare(env[i].current_tag, max_current_tag) > 0) {
             max_current_tag = env[i].current_tag;
         }
         // Set a barrier to prevent the enclave from advancing past the so-far maximum current tag.
         _lf_increment_tag_barrier_locked(&env[i], max_current_tag);
-        lf_mutex_unlock(&env[i].mutex);
+        LF_MUTEX_UNLOCK(&env[i].mutex);
     }

 #ifdef FEDERATED
@@ -612,16 +612,16 @@ void lf_request_stop() {
         // Message was not sent to the RTI.
         // Decrement the barriers to reverse our previous increment.
         for (int i = 0; i < num_environments; i++) {
-            lf_mutex_lock(&env[i].mutex);
+            LF_MUTEX_LOCK(&env[i].mutex);
             _lf_decrement_tag_barrier_locked(&env[i]);
-            lf_mutex_unlock(&env[i].mutex);
+            LF_MUTEX_UNLOCK(&env[i].mutex);
         }
     }
 #else
     // In a non-federated program, the stop_tag will be the next microstep after max_current_tag.
     // Iterate over environments to set their stop tag and release their barrier.
     for (int i = 0; i < num_environments; i++) {
-        lf_mutex_lock(&env[i].mutex);
+        LF_MUTEX_LOCK(&env[i].mutex);
         _lf_set_stop_tag(&env[i],
             (tag_t) {.time = max_current_tag.time, .microstep = max_current_tag.microstep+1});
         // Release the barrier on tag advancement.
         _lf_decrement_tag_barrier_locked(&env[i]);
@@ -630,7 +630,7 @@ void lf_request_stop() {
         // one worker thread can call wait_until at a given time because
         // the call to wait_until is protected by a mutex lock
         lf_cond_signal(&env->event_q_changed);
-        lf_mutex_unlock(&env[i].mutex);
+        LF_MUTEX_UNLOCK(&env[i].mutex);
     }
 #endif
 }

@@ -999,7 +999,7 @@ void _lf_worker_do_work(environment_t *env, int worker_number) {
  */
 void* worker(void* arg) {
     environment_t *env = (environment_t* ) arg;
-    lf_mutex_lock(&env->mutex);
+    LF_MUTEX_LOCK(&env->mutex);

     int worker_number = env->worker_thread_count++;
     LF_PRINT_LOG("Environment %u: Worker thread %d started.",env->id, worker_number);
@@ -1020,9 +1020,9 @@ void* worker(void* arg) {
 #endif

     // Release mutex and start working.
-    lf_mutex_unlock(&env->mutex);
+    LF_MUTEX_UNLOCK(&env->mutex);
     _lf_worker_do_work(env, worker_number);
-    lf_mutex_lock(&env->mutex);
+    LF_MUTEX_LOCK(&env->mutex);

     // This thread is exiting, so don't count it anymore.
     env->worker_thread_count--;
@@ -1045,7 +1045,7 @@ void* worker(void* arg) {
     lf_cond_signal(&env->event_q_changed);

     LF_PRINT_DEBUG("Worker %d: Stop requested. Exiting.", worker_number);
-    lf_mutex_unlock(&env->mutex);
+    LF_MUTEX_UNLOCK(&env->mutex);
     // timeout has been requested.
     return NULL;
 }

@@ -1164,9 +1164,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
     _lf_create_environments();

     // Initialize the one global mutex
-    if (lf_mutex_init(&global_mutex) != 0) {
-        lf_print_error_and_exit("Could not initialize global mutex");
-    }
+    LF_MUTEX_INIT(&global_mutex);

     // Initialize the global payload and token allocation counts and the trigger table
     // as well as starting tracing subsystem
@@ -1199,9 +1197,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
         // Lock mutex and spawn threads. This must be done before `_lf_initialize_start_tag` since it is using
This must be done before `_lf_initialize_start_tag` since it uses // a condition variable - if (lf_mutex_lock(&env->mutex) != 0) { - lf_print_error_and_exit("Could not lock environment mutex"); - } + LF_MUTEX_LOCK(&env->mutex); // Initialize start tag lf_print("Environment %u: ---- Initializing start tag", env->id); @@ -1210,7 +1206,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) { lf_print("Environment %u: ---- Spawning %d workers.",env->id, env->num_workers); start_threads(env); // Unlock mutex and allow threads to proceed - lf_mutex_unlock(&env->mutex); + LF_MUTEX_UNLOCK(&env->mutex); } for (int i = 0; i < num_envs; i++) { diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c size_t reaction_level = LF_LEVEL(reaction->index); LF_PRINT_DEBUG("Scheduler: Trying to lock the mutex for level %zu.", reaction_level); - lf_mutex_lock( - &scheduler->array_of_mutexes[reaction_level]); + LF_MUTEX_LOCK(&scheduler->array_of_mutexes[reaction_level]); LF_PRINT_DEBUG("Scheduler: Locked the mutex for level %zu.", reaction_level); pqueue_insert(((pqueue_t**)scheduler ->triggered_reactions)[reaction_level], (void*)reaction); - lf_mutex_unlock( - &scheduler->array_of_mutexes[reaction_level]); + LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[reaction_level]); } /** @@ -157,17 +155,17 @@ void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { while (true) { if (scheduler->next_reaction_level == (scheduler->max_reaction_level + 1)) { scheduler->next_reaction_level = 0; - lf_mutex_lock(&env->mutex); + LF_MUTEX_LOCK(&env->mutex); // Nothing more happening at this tag. LF_PRINT_DEBUG("Scheduler: Advancing tag."); // This worker thread will take charge of advancing tag. if (_lf_sched_advance_tag_locked(scheduler)) { LF_PRINT_DEBUG("Scheduler: Reached stop tag."); _lf_sched_signal_stop(scheduler); - lf_mutex_unlock(&env->mutex); + LF_MUTEX_UNLOCK(&env->mutex); break; } - lf_mutex_unlock(&env->mutex); + LF_MUTEX_UNLOCK(&env->mutex); } if (_lf_sched_distribute_ready_reactions(scheduler) > 0) { @@ -258,7 +256,7 @@ void lf_sched_init( get_reaction_position, set_reaction_position, reaction_matches, print_reaction); // Initialize the mutexes for the reaction queues - lf_mutex_init(&scheduler->array_of_mutexes[i]); + LF_MUTEX_INIT(&scheduler->array_of_mutexes[i]); } scheduler->executing_reactions = @@ -300,14 +298,12 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu LF_PRINT_DEBUG( "Scheduler: Worker %d trying to lock the mutex for level %zu.", worker_number, current_level); - lf_mutex_lock( - &scheduler->array_of_mutexes[current_level]); + LF_MUTEX_LOCK(&scheduler->array_of_mutexes[current_level]); LF_PRINT_DEBUG("Scheduler: Worker %d locked the mutex for level %zu.", worker_number, current_level); reaction_t* reaction_to_return = (reaction_t*)pqueue_pop( (pqueue_t*)scheduler->executing_reactions); - lf_mutex_unlock( - &scheduler->array_of_mutexes[current_level]); + LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[current_level]); if (reaction_to_return != NULL) { // Got a reaction diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 9856525db..b3346e143 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -79,8 +79,7 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t * scheduler, reactio if (reaction_level == current_level) { LF_PRINT_DEBUG("Scheduler: Trying to lock the mutex for level %zu.", reaction_level); - lf_mutex_lock( - &scheduler->array_of_mutexes[reaction_level]); + LF_MUTEX_LOCK(&scheduler->array_of_mutexes[reaction_level]); LF_PRINT_DEBUG("Scheduler: Locked the mutex for level %zu.", reaction_level); } //
The level index for the current level can sometimes become negative. Set @@ -103,8 +102,7 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t * scheduler, reactio reaction_q_level_index); #ifdef FEDERATED if (reaction_level == current_level) { - lf_mutex_unlock( - &scheduler->array_of_mutexes[reaction_level]); + LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[reaction_level]); } #endif } @@ -199,17 +197,17 @@ void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { if (scheduler->next_reaction_level == (scheduler->max_reaction_level + 1)) { scheduler->next_reaction_level = 0; - lf_mutex_lock(&env->mutex); + LF_MUTEX_LOCK(&env->mutex); // Nothing more happening at this tag. LF_PRINT_DEBUG("Scheduler: Advancing tag."); // This worker thread will take charge of advancing tag. if (_lf_sched_advance_tag_locked(scheduler)) { LF_PRINT_DEBUG("Scheduler: Reached stop tag."); _lf_sched_signal_stop(scheduler); - lf_mutex_unlock(&env->mutex); + LF_MUTEX_UNLOCK(&env->mutex); break; } - lf_mutex_unlock(&env->mutex); + LF_MUTEX_UNLOCK(&env->mutex); } if (_lf_sched_distribute_ready_reactions(scheduler) > 0) { @@ -316,7 +314,7 @@ void lf_sched_init( ); // Initialize the mutexes for the reaction vectors - lf_mutex_init(&env->scheduler->array_of_mutexes[i]); + LF_MUTEX_INIT(&env->scheduler->array_of_mutexes[i]); } @@ -364,8 +362,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu #ifdef FEDERATED // Need to lock the mutex because federate.c could trigger reactions at // the current level (if there is a causality loop) - lf_mutex_lock( - &scheduler->array_of_mutexes[current_level]); + LF_MUTEX_LOCK(&scheduler->array_of_mutexes[current_level]); #endif int current_level_q_index = lf_atomic_add_fetch( &scheduler->indexes[current_level], -1); diff --git a/core/threaded/scheduler_adaptive.c b/core/threaded/scheduler_adaptive.c index c01136122..5024e5218 100644 --- a/core/threaded/scheduler_adaptive.c +++ b/core/threaded/scheduler_adaptive.c @@ -330,7 +330,7 @@ static void worker_states_init(lf_scheduler_t* scheduler, size_t number_of_worke worker_states->cumsum_of_worker_group_sizes[i] += worker_states->cumsum_of_worker_group_sizes[i - 1]; } for (int i = 0; i < num_conds; i++) { - lf_cond_init(worker_states->worker_conds + i, &scheduler->env->mutex); + LF_COND_INIT(worker_states->worker_conds + i, &scheduler->env->mutex); } worker_states->num_loose_threads = scheduler->number_of_workers; } @@ -361,7 +361,7 @@ static void worker_states_awaken_locked(lf_scheduler_t* scheduler, size_t worker size_t max_cond = cond_of(greatest_worker_number_to_awaken); if (!worker_states->mutex_held[worker]) { worker_states->mutex_held[worker] = true; - lf_mutex_lock(&scheduler->env->mutex); + LF_MUTEX_LOCK(&scheduler->env->mutex); } // The predicate of the condition variable depends on num_awakened and level_counter, so // this is a critical section. 
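The worker_states_lock/worker_states_unlock pair in the hunks above avoids double-acquisition of the shared environment mutex by recording ownership in mutex_held. A minimal sketch of that idiom under the new macros; worker_state_t and the two helper functions are hypothetical stand-ins for illustration, not code from this patch:

#include <stdbool.h>

typedef struct {
    lf_mutex_t* mutex; // The shared environment mutex.
    bool held;         // Whether this worker currently holds it.
} worker_state_t;

// Acquire the mutex only if this worker does not already hold it.
static void state_lock(worker_state_t* ws) {
    if (!ws->held) {
        LF_MUTEX_LOCK(ws->mutex); // Exits on failure; unchecked under NDEBUG.
        ws->held = true;
    }
}

// Release the mutex only if held, so this is safe to call unconditionally.
static void state_unlock(worker_state_t* ws) {
    if (!ws->held) return;
    ws->held = false;
    LF_MUTEX_UNLOCK(ws->mutex);
}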
@@ -382,7 +382,7 @@ static void worker_states_lock(lf_scheduler_t* scheduler, size_t worker) { assert(worker_states->num_loose_threads <= worker_assignments->max_num_workers); size_t lt = worker_states->num_loose_threads; if (lt > 1 || !fast) { // FIXME: Lock should be partially optimized out even when !fast - lf_mutex_lock(&scheduler->env->mutex); + LF_MUTEX_LOCK(&scheduler->env->mutex); assert(!worker_states->mutex_held[worker]); worker_states->mutex_held[worker] = true; } @@ -393,7 +393,7 @@ static void worker_states_unlock(lf_scheduler_t* scheduler, size_t worker) { worker_states_t* worker_states = scheduler->custom_data->worker_states; if (!worker_states->mutex_held[worker]) return; worker_states->mutex_held[worker] = false; - lf_mutex_unlock(&scheduler->env->mutex); + LF_MUTEX_UNLOCK(&scheduler->env->mutex); } /** @@ -433,7 +433,7 @@ static void worker_states_sleep_and_unlock(lf_scheduler_t* scheduler, size_t wor assert(worker < worker_assignments->max_num_workers); assert(worker_states->num_loose_threads <= worker_assignments->max_num_workers); if (!worker_states->mutex_held[worker]) { - lf_mutex_lock(&scheduler->env->mutex); + LF_MUTEX_LOCK(&scheduler->env->mutex); } worker_states->mutex_held[worker] = false; // This will be true soon, upon call to lf_cond_wait. size_t cond = cond_of(worker); @@ -445,7 +445,7 @@ static void worker_states_sleep_and_unlock(lf_scheduler_t* scheduler, size_t wor } while (level_counter_snapshot == scheduler->custom_data->level_counter || worker >= worker_states->num_awakened); } assert(!worker_states->mutex_held[worker]); // This thread holds the mutex, but it did not report that. - lf_mutex_unlock(&scheduler->env->mutex); + LF_MUTEX_UNLOCK(&scheduler->env->mutex); } /** diff --git a/core/threaded/scheduler_instance.c b/core/threaded/scheduler_instance.c index 77f8a2124..c09d95d7a 100644 --- a/core/threaded/scheduler_instance.c +++ b/core/threaded/scheduler_instance.c @@ -16,16 +16,16 @@ bool init_sched_instance( LF_ASSERT(env, "`init_sched_instance` called without env pointer being set"); // Check if the instance is already initialized - lf_critical_section_enter(env); + LF_CRITICAL_SECTION_ENTER(env); if (*instance != NULL) { // Already initialized - lf_critical_section_exit(env); + LF_CRITICAL_SECTION_EXIT(env); return false; } else { *instance = (lf_scheduler_t*)calloc(1, sizeof(lf_scheduler_t)); } - lf_mutex_unlock(&env->mutex); + LF_MUTEX_UNLOCK(&env->mutex); if (params == NULL || params->num_reactions_per_level_size == 0) { (*instance)->max_reaction_level = DEFAULT_MAX_REACTION_LEVEL; diff --git a/core/threaded/watchdog.c b/core/threaded/watchdog.c index e6044809b..f2a1423b5 100644 --- a/core/threaded/watchdog.c +++ b/core/threaded/watchdog.c @@ -9,6 +9,7 @@ #include <assert.h> #include "watchdog.h" +#include "util.h" // Defines macros LF_MUTEX_LOCK, etc.
extern int _lf_watchdog_count; extern watchdog_t* _lf_watchdogs; @@ -23,7 +24,7 @@ void _lf_initialize_watchdog_mutexes() { for (int i = 0; i < _lf_watchdog_count; i++) { self_base_t* current_base = _lf_watchdogs[i].base; if (current_base->reactor_mutex != NULL) { - lf_mutex_init((lf_mutex_t*)(current_base->reactor_mutex)); + LF_MUTEX_INIT((lf_mutex_t*)(current_base->reactor_mutex)); } } } @@ -45,13 +46,13 @@ void* _lf_run_watchdog(void* arg) { self_base_t* base = watchdog->base; assert(base->reactor_mutex != NULL); - lf_mutex_lock((lf_mutex_t*)(base->reactor_mutex)); + LF_MUTEX_LOCK((lf_mutex_t*)(base->reactor_mutex)); instant_t physical_time = lf_time_physical(); while (physical_time < watchdog->expiration) { interval_t T = watchdog->expiration - physical_time; - lf_mutex_unlock((lf_mutex_t*)base->reactor_mutex); + LF_MUTEX_UNLOCK((lf_mutex_t*)base->reactor_mutex); lf_sleep(T); - lf_mutex_lock((lf_mutex_t*)(base->reactor_mutex)); + LF_MUTEX_LOCK((lf_mutex_t*)(base->reactor_mutex)); physical_time = lf_time_physical(); } @@ -61,7 +62,7 @@ void* _lf_run_watchdog(void* arg) { } watchdog->thread_active = false; - lf_mutex_unlock((lf_mutex_t*)(base->reactor_mutex)); + LF_MUTEX_UNLOCK((lf_mutex_t*)(base->reactor_mutex)); return NULL; } diff --git a/core/trace.c b/core/trace.c index 34b7cd5d2..1aef9a837 100644 --- a/core/trace.c +++ b/core/trace.c @@ -84,9 +84,9 @@ void trace_free(trace_t *trace) { int _lf_register_trace_event(trace_t* trace, void* pointer1, void* pointer2, _lf_trace_object_t type, char* description) { - lf_critical_section_enter(trace->env); + LF_CRITICAL_SECTION_ENTER(trace->env); if (trace->_lf_trace_object_descriptions_size >= TRACE_OBJECT_TABLE_SIZE) { - lf_critical_section_exit(trace->env); + LF_CRITICAL_SECTION_EXIT(trace->env); fprintf(stderr, "WARNING: Exceeded trace object table size. Trace file will be incomplete.\n"); return 0; } @@ -95,7 +95,7 @@ int _lf_register_trace_event(trace_t* trace, void* pointer1, void* pointer2, _lf trace->_lf_trace_object_descriptions[trace->_lf_trace_object_descriptions_size].type = type; trace->_lf_trace_object_descriptions[trace->_lf_trace_object_descriptions_size].description = description; trace->_lf_trace_object_descriptions_size++; - lf_critical_section_exit(trace->env); + LF_CRITICAL_SECTION_EXIT(trace->env); return 1; } @@ -238,9 +238,9 @@ void flush_trace_locked(trace_t* trace, int worker) { void flush_trace(trace_t* trace, int worker) { // To avoid having more than one worker writing to the file at the same time, // enter a critical section. - lf_critical_section_enter(GLOBAL_ENVIRONMENT); + LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT); flush_trace_locked(trace, worker); - lf_critical_section_exit(GLOBAL_ENVIRONMENT); + LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT); } void start_trace(trace_t* trace) { @@ -385,9 +385,9 @@ void tracepoint_user_event(void* self, char* description) { LF_ASSERT(self, "A pointer to the self struct is needed to trace an event"); environment_t *env = ((self_base_t *)self)->environment; trace_t *trace = env->trace; - lf_critical_section_enter(env); + LF_CRITICAL_SECTION_ENTER(env); tracepoint(trace, user_event, description, NULL, -1, -1, -1, NULL, NULL, 0, false); - lf_critical_section_exit(env); + LF_CRITICAL_SECTION_EXIT(env); } /** @@ -413,9 +413,9 @@ void tracepoint_user_value(void* self, char* description, long long value) { // There will be a performance hit for this. 
environment_t *env = ((self_base_t *)self)->environment; trace_t *trace = env->trace; - lf_critical_section_enter(env); + LF_CRITICAL_SECTION_ENTER(env); tracepoint(trace, user_value, description, NULL, -1, -1, -1, NULL, NULL, value, false); - lf_critical_section_exit(env); + LF_CRITICAL_SECTION_EXIT(env); } /** @@ -460,9 +460,9 @@ void tracepoint_reaction_deadline_missed(trace_t* trace, reaction_t *reaction, i } void stop_trace(trace_t* trace) { - lf_critical_section_enter(trace->env); + LF_CRITICAL_SECTION_ENTER(trace->env); stop_trace_locked(trace); - lf_critical_section_exit(trace->env); + LF_CRITICAL_SECTION_EXIT(trace->env); } void stop_trace_locked(trace_t* trace) { diff --git a/core/utils/lf_semaphore.c b/core/utils/lf_semaphore.c index e2b807e5c..a4295d47b 100644 --- a/core/utils/lf_semaphore.c +++ b/core/utils/lf_semaphore.c @@ -33,6 +33,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "lf_semaphore.h" #include <assert.h> +#include "util.h" // Defines macros LF_MUTEX_LOCK, etc. /** * @brief Create a new semaphore. @@ -42,8 +43,8 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ lf_semaphore_t* lf_semaphore_new(int count) { lf_semaphore_t* semaphore = (lf_semaphore_t*)malloc(sizeof(lf_semaphore_t)); - lf_mutex_init(&semaphore->mutex); - lf_cond_init(&semaphore->cond, &semaphore->mutex); + LF_MUTEX_INIT(&semaphore->mutex); + LF_COND_INIT(&semaphore->cond, &semaphore->mutex); semaphore->count = count; return semaphore; } @@ -56,10 +57,10 @@ lf_semaphore_t* lf_semaphore_new(int count) { */ void lf_semaphore_release(lf_semaphore_t* semaphore, int i) { assert(semaphore != NULL); - lf_mutex_lock(&semaphore->mutex); + LF_MUTEX_LOCK(&semaphore->mutex); semaphore->count += i; lf_cond_broadcast(&semaphore->cond); - lf_mutex_unlock(&semaphore->mutex); + LF_MUTEX_UNLOCK(&semaphore->mutex); } /** @@ -69,12 +70,12 @@ void lf_semaphore_release(lf_semaphore_t* semaphore, int i) { */ void lf_semaphore_acquire(lf_semaphore_t* semaphore) { assert(semaphore != NULL); - lf_mutex_lock(&semaphore->mutex); + LF_MUTEX_LOCK(&semaphore->mutex); while (semaphore->count == 0) { lf_cond_wait(&semaphore->cond); } semaphore->count--; - lf_mutex_unlock(&semaphore->mutex); + LF_MUTEX_UNLOCK(&semaphore->mutex); } /** @@ -84,11 +85,11 @@ void lf_semaphore_acquire(lf_semaphore_t* semaphore) { */ void lf_semaphore_wait(lf_semaphore_t* semaphore) { assert(semaphore != NULL); - lf_mutex_lock(&semaphore->mutex); + LF_MUTEX_LOCK(&semaphore->mutex); while (semaphore->count == 0) { lf_cond_wait(&semaphore->cond); } - lf_mutex_unlock(&semaphore->mutex); + LF_MUTEX_UNLOCK(&semaphore->mutex); } /** diff --git a/include/core/utils/util.h b/include/core/utils/util.h index 728880e0f..dc9ddf39a 100644 --- a/include/core/utils/util.h +++ b/include/core/utils/util.h @@ -279,11 +279,16 @@ void lf_register_print_function(print_message_function_t* function, int log_leve * The LF_ASSERT version requires that the condition evaluate to true * (non-zero), whereas the LF_ASSERTN version requires that the condition * evaluate to false (zero). - * These are optimized away if the NDEBUG flag is defined. + * These are optimized to execute the condition argument but not + * check the result if the NDEBUG flag is defined. + * + * LF_ASSERT_NON_NULL can be used to verify that a pointer is not NULL. + * It differs from LF_ASSERT in that it does nothing at all if the NDEBUG flag is defined. */ #if defined(NDEBUG) #define LF_ASSERT(condition, format, ...)
(void)(condition) #define LF_ASSERTN(condition, format, ...) (void)(condition) +#define LF_ASSERT_NON_NULL(pointer) #else #define LF_ASSERT(condition, format, ...) \ do { \ @@ -297,18 +302,55 @@ void lf_register_print_function(print_message_function_t* function, int log_leve lf_print_error_and_exit(format, ##__VA_ARGS__); \ } \ } while(0) +#define LF_ASSERT_NON_NULL(pointer) \ + do { \ + if (!(pointer)) { \ + lf_print_error_and_exit("Assertion failed: pointer is NULL. Out of memory?"); \ + } \ + } while(0) #endif // NDEBUG /** - * Checking mutex locking and unlocking. + * Initialize mutex with error checking. * If the NDEBUG flag is defined, the operation is still performed but the result is not checked. + * @param mutex Pointer to the mutex to initialize. */ -#define LF_MUTEX_INIT(mutex) LF_ASSERTN(lf_mutex_init(&mutex), "Mutex init failed.") +#define LF_MUTEX_INIT(mutex) LF_ASSERTN(lf_mutex_init(mutex), "Mutex init failed.") -#define LF_MUTEX_LOCK(mutex) LF_ASSERTN(lf_mutex_lock(&mutex), "Mutex lock failed.") +/** + * Lock mutex with error checking. + * If the NDEBUG flag is defined, the operation is still performed but the result is not checked. + * @param mutex Pointer to the mutex to lock. + */ +#define LF_MUTEX_LOCK(mutex) LF_ASSERTN(lf_mutex_lock(mutex), "Mutex lock failed.") -#define LF_MUTEX_UNLOCK(mutex) LF_ASSERTN(lf_mutex_unlock(&mutex), "Mutex unlock failed.") +/** + * Unlock mutex with error checking. + * If the NDEBUG flag is defined, the operation is still performed but the result is not checked. + * @param mutex Pointer to the mutex to unlock. + */ +#define LF_MUTEX_UNLOCK(mutex) LF_ASSERTN(lf_mutex_unlock(mutex), "Mutex unlock failed.") -#define LF_COND_INIT(cond, mutex) LF_ASSERTN(lf_cond_init(&cond, &mutex), "Condition variable init failed.") +/** + * Initialize condition variable with error checking. + * If the NDEBUG flag is defined, the operation is still performed but the result is not checked. + * @param cond Pointer to the condition variable to initialize. + * @param mutex Pointer to the mutex to associate with the condition variable. + */ +#define LF_COND_INIT(cond, mutex) LF_ASSERTN(lf_cond_init(cond, mutex), "Condition variable init failed.") + +/** + * Enter critical section with error checking. + * If the NDEBUG flag is defined, the operation is still performed but the result is not checked. + * @param env Pointer to the environment. + */ +#define LF_CRITICAL_SECTION_ENTER(env) LF_ASSERT(!lf_critical_section_enter(env), "Could not enter critical section") + +/** + * Exit critical section with error checking. + * If the NDEBUG flag is defined, the operation is still performed but the result is not checked. + * @param env Pointer to the environment. + */ +#define LF_CRITICAL_SECTION_EXIT(env) LF_ASSERT(!lf_critical_section_exit(env), "Could not exit critical section") #endif /* UTIL_H */ diff --git a/util/sensor_simulator.c b/util/sensor_simulator.c index 6f404b95e..03bdcaa00 100644 --- a/util/sensor_simulator.c +++ b/util/sensor_simulator.c @@ -170,7 +170,7 @@ void _lf_start_print_window(int above, int right) { * @param body The message, or NULL for exit type. */ void _lf_sensor_post_message(enum _lf_sensor_message_type type, char* body) { - lf_mutex_lock(&_lf_sensor_mutex); + LF_MUTEX_LOCK(&_lf_sensor_mutex); _lf_sensor_message_t* message = _lf_sensor.message_recycle_q; if (message == NULL) { // Create a new message struct. @@ -198,7 +198,7 @@ void _lf_sensor_post_message(enum _lf_sensor_message_type type, char* body) { } } lf_cond_signal(&_lf_sensor_simulator_cond_var); - lf_mutex_unlock(&_lf_sensor_mutex); + LF_MUTEX_UNLOCK(&_lf_sensor_mutex); } /** @@ -256,7 +256,7 @@ void* _lf_sensor_read_input(void* ignored) { * message window.
*/ void* _lf_sensor_simulator_thread(void* ignored) { - lf_mutex_lock(&_lf_sensor_mutex); + LF_MUTEX_LOCK(&_lf_sensor_mutex); _lf_sensor.thread_created = 1; // Clean up any previous curses state. if (!isendwin()) { @@ -301,7 +301,7 @@ void* _lf_sensor_simulator_thread(void* ignored) { if (_lf_sensor.message_q->type == _lf_sensor_close_windows) { lf_register_print_function(NULL, -1); endwin(); - lf_mutex_unlock(&_lf_sensor_mutex); + LF_MUTEX_UNLOCK(&_lf_sensor_mutex); return NULL; } else if (_lf_sensor.message_q->type == _lf_sensor_tick) { wmove(_lf_sensor.tick_window, _lf_sensor.tick_cursor_y, _lf_sensor.tick_cursor_x); @@ -341,7 +341,7 @@ void* _lf_sensor_simulator_thread(void* ignored) { _lf_sensor.message_recycle_q->next = tmp_recycle; } } - lf_mutex_unlock(&_lf_sensor_mutex); + LF_MUTEX_UNLOCK(&_lf_sensor_mutex); return NULL; } @@ -375,7 +375,7 @@ int start_sensor_simulator( _lf_sensor.message_q = NULL; _lf_sensor.message_recycle_q = NULL; _lf_sensor.thread_created = 0; - lf_cond_init(&_lf_sensor_simulator_cond_var, &_lf_sensor_mutex); + LF_COND_INIT(&_lf_sensor_simulator_cond_var, &_lf_sensor_mutex); if (_lf_sensor.thread_created == 0) { // Thread has not been created. // Zero out the trigger table. @@ -427,7 +427,7 @@ int register_sensor_key(char key, void* action) { return 2; } int result = 0; - lf_mutex_lock(&_lf_sensor_mutex); + LF_MUTEX_LOCK(&_lf_sensor_mutex); if (key == '\n') { if (_lf_sensor_sensor_newline_trigger != NULL) { result = 1; @@ -446,6 +446,6 @@ int register_sensor_key(char key, void* action) { } else { _lf_sensor_trigger_table[index] = action; } - lf_mutex_unlock(&_lf_sensor_mutex); + LF_MUTEX_UNLOCK(&_lf_sensor_mutex); return result; }
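Taken together, the util.h hunk defines the conventions that the rest of this diff applies mechanically. A minimal end-to-end sketch under those macros, modeled on lf_semaphore_new above; the gate_t type and its functions are illustrative, not part of the patch:

#include <stdlib.h>
#include "util.h" // LF_ASSERT_NON_NULL, LF_MUTEX_INIT, LF_COND_INIT, etc.

typedef struct {
    lf_mutex_t mutex;
    lf_cond_t cond;
    int count;
} gate_t;

gate_t* gate_new(int count) {
    gate_t* gate = (gate_t*)malloc(sizeof(gate_t));
    LF_ASSERT_NON_NULL(gate);    // Exits on allocation failure; expands to nothing under NDEBUG.
    LF_MUTEX_INIT(&gate->mutex); // The macros take pointers, matching lf_mutex_init itself.
    LF_COND_INIT(&gate->cond, &gate->mutex);
    gate->count = count;
    return gate;
}

void gate_open(gate_t* gate) {
    LF_MUTEX_LOCK(&gate->mutex); // The lock still happens under NDEBUG; only the check is dropped.
    gate->count++;
    lf_cond_broadcast(&gate->cond); // The patch adds no wrapper for condition signaling.
    LF_MUTEX_UNLOCK(&gate->mutex);
}

Passing pointers rather than objects keeps each macro call textually identical to the lf_* call it wraps, which is what makes the uniform substitution throughout this diff possible.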