Skip to content

Commit

Permalink
Merge pull request #313 from lf-lang/count-token-allocations-race
Browse files Browse the repository at this point in the history
Fix data races for _lf_count_payload_allocations and _lf_count_token_allocations
  • Loading branch information
edwardalee authored Feb 4, 2024
2 parents a0ad33d + 32d3d91 commit d96cbce
Show file tree
Hide file tree
Showing 23 changed files with 313 additions and 281 deletions.
13 changes: 3 additions & 10 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,9 @@ static void environment_init_threaded(environment_t* env, int num_workers) {
env->barrier.horizon = FOREVER_TAG;

// Initialize synchronization objects.
if (lf_mutex_init(&env->mutex) != 0) {
lf_print_error_and_exit("Could not initialize environment mutex");
}
if (lf_cond_init(&env->event_q_changed, &env->mutex) != 0) {
lf_print_error_and_exit("Could not initialize environment event queue condition variable");
}
if (lf_cond_init(&env->global_tag_barrier_requestors_reached_zero, &env->mutex)) {
lf_print_error_and_exit("Could not initialize environment tag barrier condition variable");
}

LF_MUTEX_INIT(&env->mutex);
LF_COND_INIT(&env->event_q_changed, &env->mutex);
LF_COND_INIT(&env->global_tag_barrier_requestors_reached_zero, &env->mutex);

#endif
}
Expand Down
2 changes: 1 addition & 1 deletion core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ int main(int argc, const char* argv[]) {
if (rti.base.tracing_enabled) {
_lf_number_of_workers = rti.base.number_of_scheduling_nodes;
rti.base.trace = trace_new(NULL, rti_trace_file_name);
LF_ASSERT(rti.base.trace, "Out of memory");
LF_ASSERT_NON_NULL(rti.base.trace);
start_trace(rti.base.trace);
lf_print("Tracing the RTI execution in %s file.", rti_trace_file_name);
}
Expand Down
4 changes: 2 additions & 2 deletions core/federated/RTI/rti_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) {
void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) {
// FIXME: Consolidate this message with NET to get NMR (Next Message Request).
// Careful with handling startup and shutdown.
lf_mutex_lock(rti_common->mutex);
LF_MUTEX_LOCK(rti_common->mutex);

enclave->completed = completed;

Expand All @@ -78,7 +78,7 @@ void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) {
free(visited);
}

lf_mutex_unlock(rti_common->mutex);
LF_MUTEX_UNLOCK(rti_common->mutex);
}

tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) {
Expand Down
26 changes: 13 additions & 13 deletions core/federated/RTI/rti_local.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ lf_mutex_t rti_mutex;

void initialize_local_rti(environment_t *envs, int num_envs) {
rti_local = (rti_local_t*)calloc(1, sizeof(rti_local_t));
LF_ASSERT(rti_local, "Out of memory");
LF_ASSERT_NON_NULL(rti_local);

initialize_rti_common(&rti_local->base);
LF_ASSERT(lf_mutex_init(&rti_mutex) == 0, "Could not create mutex");
LF_MUTEX_INIT(&rti_mutex);
rti_local->base.mutex = &rti_mutex;
rti_local->base.number_of_scheduling_nodes = num_envs;
rti_local->base.tracing_enabled = (envs[0].trace != NULL);
Expand Down Expand Up @@ -73,7 +73,7 @@ void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t * e
enclave->env = env;

// Initialize the next event condition variable.
LF_ASSERT(lf_cond_init(&enclave->next_event_condition, &rti_mutex) == 0, "Could not create cond var");
LF_COND_INIT(&enclave->next_event_condition, &rti_mutex);
}

tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
Expand All @@ -86,8 +86,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
}
// This is called from a critical section within the source enclave. Leave
// this critical section and acquire the RTI mutex.
LF_ASSERT(lf_mutex_unlock(&e->env->mutex) == 0, "Could not unlock mutex");
LF_ASSERT(lf_mutex_lock(rti_local->base.mutex) == 0, "Could not lock mutex");
LF_MUTEX_UNLOCK(&e->env->mutex);
LF_MUTEX_LOCK(rti_local->base.mutex);
tracepoint_federate_to_rti(e->env->trace, send_NET, e->base.id, &next_event_tag);
// First, update the enclave data structure to record this next_event_tag,
// and notify any downstream scheduling_nodes, and unblock them if appropriate.
Expand All @@ -105,8 +105,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
next_event_tag.time - lf_time_start(), next_event_tag.microstep);
tracepoint_federate_from_rti(e->env->trace, receive_TAG, e->base.id, &next_event_tag);
// Release RTI mutex and re-enter the critical section of the source enclave before returning.
LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
LF_ASSERT(lf_mutex_lock(&e->env->mutex) == 0, "Could not lock mutex");
LF_MUTEX_UNLOCK(rti_local->base.mutex);
LF_MUTEX_LOCK(&e->env->mutex);
return next_event_tag;
}

Expand Down Expand Up @@ -136,8 +136,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
e->base.id, e->base.next_event.time - lf_time_start(), e->base.next_event.microstep);
tracepoint_federate_from_rti(e->env->trace, receive_TAG, e->base.id, &result.tag);
// Release RTI mutex and re-enter the critical section of the source enclave.
LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
LF_ASSERT(lf_mutex_lock(&e->env->mutex) == 0, "Could not lock mutex");
LF_MUTEX_UNLOCK(rti_local->base.mutex);
LF_MUTEX_LOCK(&e->env->mutex);
return result.tag;
}

Expand All @@ -146,24 +146,24 @@ void rti_logical_tag_complete_locked(enclave_info_t* enclave, tag_t completed) {
return;
}
// Release the enclave mutex while doing the local RTI work.
LF_ASSERT(lf_mutex_unlock(&enclave->env->mutex) == 0, "Could not unlock mutex");
LF_MUTEX_UNLOCK(&enclave->env->mutex);
tracepoint_federate_to_rti(enclave->env->trace, send_LTC, enclave->base.id, &completed);
_logical_tag_complete(&enclave->base, completed);
// Acquire the enclave mutex again before returning.
LF_ASSERT(lf_mutex_lock(&enclave->env->mutex) == 0, "Could not lock mutex");
LF_MUTEX_LOCK(&enclave->env->mutex);
}

void rti_update_other_net_locked(enclave_info_t* src, enclave_info_t * target, tag_t net) {
// Here we do NOT leave the critical section of the target enclave before we
// acquire the RTI mutex. This means that we cannot block within this function.
LF_ASSERT(lf_mutex_lock(rti_local->base.mutex) == 0, "Could not lock mutex");
LF_MUTEX_LOCK(rti_local->base.mutex);
tracepoint_federate_to_federate(src->env->trace, send_TAGGED_MSG, src->base.id, target->base.id, &net);

// If our proposed NET is less than the current NET, update it.
if (lf_tag_compare(net, target->base.next_event) < 0) {
target->base.next_event = net;
}
LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
LF_MUTEX_UNLOCK(rti_local->base.mutex);
}

///////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit d96cbce

Please # to comment.