Skip to content

Commit

Permalink
support bthread primitive cross different worker pools
Browse files Browse the repository at this point in the history
  • Loading branch information
yanglimingcn committed Mar 2, 2024
1 parent 24fc31e commit ba3eac5
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ void bthread_flush() {
}
}

int bthread_interrupt(bthread_t tid) {
return bthread::TaskGroup::interrupt(tid, bthread::get_task_control());
int bthread_interrupt(bthread_t tid, bthread_tag_t tag) {
return bthread::TaskGroup::interrupt(tid, bthread::get_task_control(), tag);
}

int bthread_stop(bthread_t tid) {
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/bthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ extern int bthread_start_background(bthread_t* __restrict tid,
// bthread_interrupt() guarantees that Thread2 is woken up reliably no matter
// how the 2 threads are interleaved.
// Returns 0 on success, errno otherwise.
extern int bthread_interrupt(bthread_t tid);
extern int bthread_interrupt(bthread_t tid, bthread_tag_t tag = BTHREAD_TAG_DEFAULT);

// Make bthread_stopped() on the bthread return true and interrupt the bthread.
// Note that current bthread_stop() solely sets the built-in "stop flag" and
Expand Down
53 changes: 33 additions & 20 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "butil/atomicops.h" // butil::atomic
#include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK
#include "butil/macros.h"
#include "butil/containers/flat_map.h"
#include "butil/containers/linked_list.h" // LinkNode
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
#include "butil/memory/singleton_on_pthread_once.h"
Expand Down Expand Up @@ -101,6 +102,7 @@ struct ButexBthreadWaiter : public ButexWaiter {
Butex* initial_butex;
TaskControl* control;
const timespec* abstime;
bthread_tag_t tag;
};

// pthread_task or main_task allocates this structure on stack and queue it
Expand Down Expand Up @@ -272,17 +274,22 @@ void butex_destroy(void* butex) {
butil::return_object(b);
}

inline TaskGroup* get_task_group(TaskControl* c, bool nosignal = false) {
TaskGroup* g = tls_task_group;
// if TaskGroup g is belong tag
inline bool is_same_tag(TaskGroup* g, bthread_tag_t tag) {
return g && g->tag() == tag;
}

inline TaskGroup* get_task_group(TaskControl* c, bthread_tag_t tag, bool nosignal = false) {
auto g = is_same_tag(tls_task_group, tag) ? tls_task_group : NULL;
if (nosignal) {
if (NULL == tls_task_group_nosignal) {
g = g ? g : c->choose_one_group();
g = g ? g : c->choose_one_group(tag);
tls_task_group_nosignal = g;
} else {
g = tls_task_group_nosignal;
}
} else {
g = g ? g : c->choose_one_group();
g = g ? g : c->choose_one_group(tag);
}
return g;
}
Expand Down Expand Up @@ -313,7 +320,7 @@ int butex_wake(void* arg, bool nosignal) {
}
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
TaskGroup* g = get_task_group(bbw->control, nosignal);
TaskGroup* g = get_task_group(bbw->control, bbw->tag, nosignal);
if (g == tls_task_group) {
run_in_local_task_group(g, bbw->tid, nosignal);
} else {
Expand Down Expand Up @@ -352,26 +359,32 @@ int butex_wake_all(void* arg, bool nosignal) {
if (bthread_waiters.empty()) {
return nwakeup;
}
butil::FlatMap<bthread_tag_t, TaskGroup*> nwakeups;
nwakeups.init(FLAGS_task_group_ntags);
// We will exchange with first waiter in the end.
ButexBthreadWaiter* next = static_cast<ButexBthreadWaiter*>(
bthread_waiters.head()->value());
next->RemoveFromList();
unsleep_if_necessary(next, get_global_timer_thread());
++nwakeup;
TaskGroup* g = get_task_group(next->control, nosignal);
const int saved_nwakeup = nwakeup;
while (!bthread_waiters.empty()) {
// pop reversely
ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
bthread_waiters.tail()->value());
w->RemoveFromList();
unsleep_if_necessary(w, get_global_timer_thread());
auto g = get_task_group(w->control, w->tag, nosignal);
g->ready_to_run_general(w->tid, true);
nwakeups[g->tag()] = g;
++nwakeup;
}
if (!nosignal && saved_nwakeup != nwakeup) {
g->flush_nosignal_tasks_general();
if (!nosignal) {
for (auto it = nwakeups.begin(); it != nwakeups.end(); ++it) {
auto g = it->second;
g->flush_nosignal_tasks_general();
}
}
auto g = get_task_group(next->control, next->tag, nosignal);
if (g == tls_task_group) {
run_in_local_task_group(g, next->tid, nosignal);
} else {
Expand Down Expand Up @@ -422,21 +435,20 @@ int butex_wake_except(void* arg, bthread_t excluded_bthread) {
if (bthread_waiters.empty()) {
return nwakeup;
}
ButexBthreadWaiter* front = static_cast<ButexBthreadWaiter*>(
bthread_waiters.head()->value());

TaskGroup* g = get_task_group(front->control);
const int saved_nwakeup = nwakeup;
butil::FlatMap<bthread_tag_t, TaskGroup*> nwakeups;
nwakeups.init(FLAGS_task_group_ntags);
do {
// pop reversely
ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
bthread_waiters.tail()->value());
ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(bthread_waiters.tail()->value());
w->RemoveFromList();
unsleep_if_necessary(w, get_global_timer_thread());
auto g = get_task_group(w->control, w->tag);
g->ready_to_run_general(w->tid, true);
nwakeups[g->tag()] = g;
++nwakeup;
} while (!bthread_waiters.empty());
if (saved_nwakeup != nwakeup) {
for (auto it = nwakeups.begin(); it != nwakeups.end(); ++it) {
auto g = it->second;
g->flush_nosignal_tasks_general();
}
return nwakeup;
Expand Down Expand Up @@ -473,11 +485,11 @@ int butex_requeue(void* arg, void* arg2) {
}
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
TaskGroup* g = tls_task_group;
auto g = is_same_tag(tls_task_group, bbw->tag) ? tls_task_group : NULL;
if (g) {
TaskGroup::exchange(&g, front->tid);
} else {
bbw->control->choose_one_group()->ready_to_run_remote(front->tid);
bbw->control->choose_one_group(bbw->tag)->ready_to_run_remote(front->tid);
}
return 1;
}
Expand Down Expand Up @@ -515,7 +527,7 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
if (erased && wakeup) {
if (bw->tid) {
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw);
get_task_group(bbw->control)->ready_to_run_general(bw->tid);
get_task_group(bbw->control, bbw->tag)->ready_to_run_general(bw->tid);
} else {
ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
wakeup_pthread(pw);
Expand Down Expand Up @@ -658,6 +670,7 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
bbw.initial_butex = b;
bbw.control = g->control();
bbw.abstime = abstime;
bbw.tag = g->tag();

if (abstime != NULL) {
// Schedule timer before queueing. If the timer is triggered before
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/task_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class TaskControl {

// Choose one TaskGroup (randomly right now).
// If this method is called after init(), it never returns NULL.
TaskGroup* choose_one_group(bthread_tag_t tag = BTHREAD_TAG_DEFAULT);
TaskGroup* choose_one_group(bthread_tag_t tag);

private:
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ static int set_butex_waiter(bthread_t tid, ButexWaiter* w) {
// by race conditions.
// TODO: bthreads created by BTHREAD_ATTR_PTHREAD blocking on bthread_usleep()
// can't be interrupted.
int TaskGroup::interrupt(bthread_t tid, TaskControl* c) {
int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) {
// Consume current_waiter in the TaskMeta, wake it up then set it back.
ButexWaiter* w = NULL;
uint64_t sleep_id = 0;
Expand Down Expand Up @@ -905,7 +905,7 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c) {
if (!c) {
return EINVAL;
}
c->choose_one_group()->ready_to_run_remote(tid);
c->choose_one_group(tag)->ready_to_run_remote(tid);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class TaskGroup {

// Wake up blocking ops in the thread.
// Returns 0 on success, errno otherwise.
static int interrupt(bthread_t tid, TaskControl* c);
static int interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag);

// Get the meta associate with the task.
static TaskMeta* address_meta(bthread_t tid);
Expand Down

0 comments on commit ba3eac5

Please # to comment.