Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix data races for _lf_count_payload_allocations and _lf_count_token_allocations #313

Merged
merged 5 commits into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,9 @@ static void environment_init_threaded(environment_t* env, int num_workers) {
env->barrier.horizon = FOREVER_TAG;

// Initialize synchronization objects.
if (lf_mutex_init(&env->mutex) != 0) {
lf_print_error_and_exit("Could not initialize environment mutex");
}
if (lf_cond_init(&env->event_q_changed, &env->mutex) != 0) {
lf_print_error_and_exit("Could not initialize environment event queue condition variable");
}
if (lf_cond_init(&env->global_tag_barrier_requestors_reached_zero, &env->mutex)) {
lf_print_error_and_exit("Could not initialize environment tag barrier condition variable");
}

LF_MUTEX_INIT(&env->mutex);
LF_COND_INIT(&env->event_q_changed, &env->mutex);
LF_COND_INIT(&env->global_tag_barrier_requestors_reached_zero, &env->mutex);

#endif
}
Expand Down
2 changes: 1 addition & 1 deletion core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ int main(int argc, const char* argv[]) {
if (rti.base.tracing_enabled) {
_lf_number_of_workers = rti.base.number_of_scheduling_nodes;
rti.base.trace = trace_new(NULL, rti_trace_file_name);
LF_ASSERT(rti.base.trace, "Out of memory");
LF_ASSERT_NON_NULL(rti.base.trace);
start_trace(rti.base.trace);
lf_print("Tracing the RTI execution in %s file.", rti_trace_file_name);
}
Expand Down
4 changes: 2 additions & 2 deletions core/federated/RTI/rti_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) {
void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) {
// FIXME: Consolidate this message with NET to get NMR (Next Message Request).
// Careful with handling startup and shutdown.
lf_mutex_lock(rti_common->mutex);
LF_MUTEX_LOCK(rti_common->mutex);

enclave->completed = completed;

Expand All @@ -78,7 +78,7 @@ void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) {
free(visited);
}

lf_mutex_unlock(rti_common->mutex);
LF_MUTEX_UNLOCK(rti_common->mutex);
}

tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) {
Expand Down
26 changes: 13 additions & 13 deletions core/federated/RTI/rti_local.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ lf_mutex_t rti_mutex;

void initialize_local_rti(environment_t *envs, int num_envs) {
rti_local = (rti_local_t*)calloc(1, sizeof(rti_local_t));
LF_ASSERT(rti_local, "Out of memory");
LF_ASSERT_NON_NULL(rti_local);

initialize_rti_common(&rti_local->base);
LF_ASSERT(lf_mutex_init(&rti_mutex) == 0, "Could not create mutex");
LF_MUTEX_INIT(&rti_mutex);
rti_local->base.mutex = &rti_mutex;
rti_local->base.number_of_scheduling_nodes = num_envs;
rti_local->base.tracing_enabled = (envs[0].trace != NULL);
Expand Down Expand Up @@ -73,7 +73,7 @@ void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t * e
enclave->env = env;

// Initialize the next event condition variable.
LF_ASSERT(lf_cond_init(&enclave->next_event_condition, &rti_mutex) == 0, "Could not create cond var");
LF_COND_INIT(&enclave->next_event_condition, &rti_mutex);
}

tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
Expand All @@ -86,8 +86,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
}
// This is called from a critical section within the source enclave. Leave
// this critical section and acquire the RTI mutex.
LF_ASSERT(lf_mutex_unlock(&e->env->mutex) == 0, "Could not unlock mutex");
LF_ASSERT(lf_mutex_lock(rti_local->base.mutex) == 0, "Could not lock mutex");
LF_MUTEX_UNLOCK(&e->env->mutex);
LF_MUTEX_LOCK(rti_local->base.mutex);
tracepoint_federate_to_rti(e->env->trace, send_NET, e->base.id, &next_event_tag);
// First, update the enclave data structure to record this next_event_tag,
// and notify any downstream scheduling_nodes, and unblock them if appropriate.
Expand All @@ -105,8 +105,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
next_event_tag.time - lf_time_start(), next_event_tag.microstep);
tracepoint_federate_from_rti(e->env->trace, receive_TAG, e->base.id, &next_event_tag);
// Release RTI mutex and re-enter the critical section of the source enclave before returning.
LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
LF_ASSERT(lf_mutex_lock(&e->env->mutex) == 0, "Could not lock mutex");
LF_MUTEX_UNLOCK(rti_local->base.mutex);
LF_MUTEX_LOCK(&e->env->mutex);
return next_event_tag;
}

Expand Down Expand Up @@ -136,8 +136,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
e->base.id, e->base.next_event.time - lf_time_start(), e->base.next_event.microstep);
tracepoint_federate_from_rti(e->env->trace, receive_TAG, e->base.id, &result.tag);
// Release RTI mutex and re-enter the critical section of the source enclave.
LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
LF_ASSERT(lf_mutex_lock(&e->env->mutex) == 0, "Could not lock mutex");
LF_MUTEX_UNLOCK(rti_local->base.mutex);
LF_MUTEX_LOCK(&e->env->mutex);
return result.tag;
}

Expand All @@ -146,24 +146,24 @@ void rti_logical_tag_complete_locked(enclave_info_t* enclave, tag_t completed) {
return;
}
// Release the enclave mutex while doing the local RTI work.
LF_ASSERT(lf_mutex_unlock(&enclave->env->mutex) == 0, "Could not unlock mutex");
LF_MUTEX_UNLOCK(&enclave->env->mutex);
tracepoint_federate_to_rti(enclave->env->trace, send_LTC, enclave->base.id, &completed);
_logical_tag_complete(&enclave->base, completed);
// Acquire the enclave mutex again before returning.
LF_ASSERT(lf_mutex_lock(&enclave->env->mutex) == 0, "Could not lock mutex");
LF_MUTEX_LOCK(&enclave->env->mutex);
}

void rti_update_other_net_locked(enclave_info_t* src, enclave_info_t * target, tag_t net) {
// Here we do NOT leave the critical section of the target enclave before we
// acquire the RTI mutex. This means that we cannot block within this function.
LF_ASSERT(lf_mutex_lock(rti_local->base.mutex) == 0, "Could not lock mutex");
LF_MUTEX_LOCK(rti_local->base.mutex);
tracepoint_federate_to_federate(src->env->trace, send_TAGGED_MSG, src->base.id, target->base.id, &net);

// If our proposed NET is less than the current NET, update it.
if (lf_tag_compare(net, target->base.next_event) < 0) {
target->base.next_event = net;
}
LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
LF_MUTEX_UNLOCK(rti_local->base.mutex);
}

///////////////////////////////////////////////////////////////////////////////
Expand Down
Loading
Loading