Skip to content

Commit

Permalink
fix group count not decreasing, needs refactoring
Browse files Browse the repository at this point in the history
- currently, example `benchmark.c` never finishes.
  • Loading branch information
TheTechsTech committed Oct 28, 2024
1 parent a4350cb commit 5a13aa9
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 30 deletions.
3 changes: 1 addition & 2 deletions include/coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ struct routine_s {
bool is_address;
bool is_waiting;
bool is_group_finish;
bool is_multi_wait;
bool is_group;
bool is_channeling;
bool is_plain;
bool flagged;
Expand Down Expand Up @@ -883,7 +883,6 @@ C_API u32 sched_id(void);
C_API int sched_count(void);
C_API void sched_dec(void);
C_API int sched_group_count(void);
C_API void sched_group_dec(void);

C_API void preempt_init(u32 usecs);
C_API void preempt_disable(void);
Expand Down
5 changes: 1 addition & 4 deletions src/coroutine.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ wait_result_t *wait_for(wait_group_t *wg) {
has_completed = true;
break;
} else if (gq_sys.is_multi && co->tid != sched_id()) {
/* TODO: rework thread's local run queue setup, for large capacities thread will spend to much time skipping */
co_yield();
continue;
} else if (!co_terminated(co)) {
Expand All @@ -280,17 +281,13 @@ wait_result_t *wait_for(wait_group_t *wg) {
if (co->is_event_err) {
hash_remove(wg, key);
wg->has_erred = true;
if (gq_sys.is_multi)
sched_group_dec();
continue;
}

if (co->interrupt_active)
co_deferred_free(co);

hash_delete(wg, key);
if (gq_sys.is_multi)
sched_group_dec();
}
}
}
Expand Down
39 changes: 15 additions & 24 deletions src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,7 @@ routine_t *co_create(size_t size, callable_t func, void_t args) {
co->is_plain = false;
co->is_address = false;
co->is_waiting = false;
co->is_multi_wait = false;
co->is_group = false;
co->is_group_finish = true;
co->event_err_code = 0;
co->args = args;
Expand Down Expand Up @@ -1650,6 +1650,7 @@ static void sched_steal_available(void) {
break;

t->taken = true;
t->is_group = true;
t->tid = thread()->thrd_id;
count++;
thread()->used_count++;
Expand Down Expand Up @@ -1720,7 +1721,6 @@ u32 create_routine(callable_t fn, void_t arg, u32 stack, run_states code) {
} else if (c->wait_active && !is_empty(c->wait_group) && !c->is_group_finish) {
t->is_waiting = true;
t->is_address = true;
t->is_multi_wait = gq_sys.is_multi;
hash_put(c->wait_group, co_itoa(id), t);
}

Expand Down Expand Up @@ -1914,10 +1914,6 @@ CO_FORCE_INLINE int sched_group_count(void) {
return thread()->group_count;
}

CO_FORCE_INLINE void sched_group_dec(void) {
--thread()->group_count;
}

CO_FORCE_INLINE bool sched_is_main(void) {
return thread()->is_main;
}
Expand Down Expand Up @@ -1978,7 +1974,7 @@ void co_interrupt_off(void) {
sched_shutdown_interrupt(true);
atomic_thread_fence(memory_order_seq_cst);
gq_sys.is_interruptable = false;
gq_sys.stacksize = CO_STACK_SIZE;
gq_sys.stacksize = CO_STACK_SIZE + 1024;
}

void coroutine_system(void) {
Expand Down Expand Up @@ -2156,7 +2152,7 @@ static int thrd_scheduler(void) {
l = EMPTY;
continue;
}
} else if (gq_sys.is_multi && !sched_is_main() && (thread()->sleeping_counted == 0 || gq_sys.is_finish || l == EMPTY || !gq_sys.is_errorless)) {
} else if (gq_sys.is_multi && !sched_is_main() && (sched_empty() || gq_sys.is_finish || l == EMPTY || !gq_sys.is_errorless)) {
atomic_store(&gq_sys.count[thread()->thrd_id], NULL);
RAII_INFO("Thrd #%lx waiting to exit.\n", co_async_self());
/* Wait for global exit signal */
Expand All @@ -2172,9 +2168,6 @@ static int thrd_scheduler(void) {
RAII_INFO("Thrd #%lx exiting, %d runnable coroutines.\n", co_async_self(), sched_count());
return thread()->exiting;
}
} else if (sched_empty()) {
l = EMPTY;
continue;
}
}

Expand All @@ -2193,8 +2186,11 @@ static int thrd_scheduler(void) {

thread()->running = NULL;
if (t->halt || t->exiting) {
if (!t->system)
if (!t->system) {
--thread()->used_count;
if (gq_sys.is_multi && t->is_group)
--thread()->group_count;
}

if (gq_sys.is_multi && t->run_code == RUN_THRD) {
thread()->used_count++;
Expand Down Expand Up @@ -2236,11 +2232,17 @@ static void thrd_wait_for(u32 wait_count) {
while (atomic_load(&wg->size) != 0 && !has_completed) {
capacity = (u32)atomic_load(&wg->capacity);
for (i = 0; i < capacity; i++) {
if (sched_group_count() == 0) {
has_completed = true;
break;
}

pair = atomic_get(oa_pair *, &wg->buckets[i]);
if (!is_empty(pair) && !is_empty(pair->value)) {
co = (routine_t *)pair->value;
key = pair->key;
if (co->tid != sched_id()) {
/* TODO: rework thread's local run queue setup, for large capacities thread will spend to much time skipping */
co_yield();
continue;
} else if (!co_terminated(co)) {
Expand All @@ -2265,24 +2267,13 @@ static void thrd_wait_for(u32 wait_count) {
if (co->is_event_err) {
hash_remove(wg, key);
wg->has_erred = true;
sched_group_dec();
if (sched_group_count() == 0) {
has_completed = true;
break;
}

continue;
}

if (co->interrupt_active)
co_deferred_free(co);

hash_delete(wg, key);
sched_group_dec();
if (sched_group_count() == 0) {
has_completed = true;
break;
}
}
}
}
Expand Down Expand Up @@ -2322,7 +2313,7 @@ static int thrd_main(void_t args) {

atomic_thread_fence(memory_order_seq_cst);
if (gq_sys.is_multi) {
create_routine(thrd_main_main, NULL, gq_sys.stacksize * 2, RUN_THRD);
create_routine(thrd_main_main, NULL, gq_sys.stacksize * 3, RUN_THRD);
res = thrd_scheduler();
sched_shutdown_interrupt(false);
chan_collector_free();
Expand Down

0 comments on commit 5a13aa9

Please # to comment.