From e170408880488b7fed72a78aa058d1b6599d6767 Mon Sep 17 00:00:00 2001 From: Yang Liming Date: Thu, 21 Dec 2023 10:40:43 +0800 Subject: [PATCH] modify bthread attribute with tag --- src/brpc/acceptor.cpp | 2 ++ src/brpc/controller.cpp | 1 + src/brpc/details/http_message.h | 1 + src/brpc/parallel_channel.cpp | 2 ++ src/brpc/periodic_task.cpp | 3 +-- src/brpc/rdma/rdma_endpoint.cpp | 7 +++---- src/brpc/server.cpp | 5 ++++- src/brpc/socket.cpp | 9 +++------ src/brpc/stream.cpp | 16 ++++++++-------- 9 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp index 68d77082b7..54cc4908cf 100644 --- a/src/brpc/acceptor.cpp +++ b/src/brpc/acceptor.cpp @@ -76,6 +76,8 @@ int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec, return -1; } if (idle_timeout_sec > 0) { + bthread_attr_t tmp = BTHREAD_ATTR_NORMAL; + tmp.tag = _bthread_tag; if (bthread_start_background(&_close_idle_tid, NULL, CloseIdleConnections, this) != 0) { LOG(FATAL) << "Fail to start bthread"; diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index f49a27a92f..fc7625c7ac 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -718,6 +718,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, bthread_t bt; bthread_attr_t attr = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); + attr.tag = bthread_self_tag(); _tmp_completion_info = info; if (bthread_start_background(&bt, &attr, RunEndRPC, this) != 0) { LOG(FATAL) << "Fail to start bthread"; diff --git a/src/brpc/details/http_message.h b/src/brpc/details/http_message.h index dc999cfa91..c55cc8bb03 100644 --- a/src/brpc/details/http_message.h +++ b/src/brpc/details/http_message.h @@ -20,6 +20,7 @@ #define BRPC_HTTP_MESSAGE_H #include // std::string +#include // std::unique_ptr #include "butil/macros.h" #include "butil/iobuf.h" // butil::IOBuf #include "butil/scoped_lock.h" // butil::unique_lock diff --git a/src/brpc/parallel_channel.cpp b/src/brpc/parallel_channel.cpp index ca71bedcc1..fe6f117fed 100644 --- a/src/brpc/parallel_channel.cpp +++ b/src/brpc/parallel_channel.cpp @@ -288,6 +288,7 @@ class ParallelChannelDone : public google::protobuf::Closure { bthread_t bh; bthread_attr_t attr = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); + attr.tag = bthread_self_tag(); if (bthread_start_background(&bh, &attr, RunOnComplete, this) != 0) { LOG(FATAL) << "Fail to start bthread"; OnComplete(); @@ -708,6 +709,7 @@ void ParallelChannel::CallMethod( bthread_t bh; bthread_attr_t attr = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); + attr.tag = bthread_self_tag(); // Hack: save done in cntl->_done to remove a malloc of args. cntl->_done = done; if (bthread_start_background(&bh, &attr, RunDoneAndDestroy, cntl) == 0) { diff --git a/src/brpc/periodic_task.cpp b/src/brpc/periodic_task.cpp index 27ea3ec310..af89a6052b 100644 --- a/src/brpc/periodic_task.cpp +++ b/src/brpc/periodic_task.cpp @@ -38,8 +38,7 @@ static void* PeriodicTaskThread(void* arg) { static void RunPeriodicTaskThread(void* arg) { bthread_t th = 0; - int rc = bthread_start_background( - &th, &BTHREAD_ATTR_NORMAL, PeriodicTaskThread, arg); + int rc = bthread_start_background(&th, NULL, PeriodicTaskThread, arg); if (rc != 0) { LOG(ERROR) << "Fail to start PeriodicTaskThread"; static_cast(arg)->OnDestroyingTask(); diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 40b52a806f..9e336fc049 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -246,8 +246,8 @@ void RdmaConnect::StartConnect(const Socket* socket, _done = done; _data = data; bthread_t tid; - if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL, - RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0) { + if (bthread_start_background(&tid, NULL, RdmaEndpoint::ProcessHandshakeAtClient, + socket->_rdma_ep) < 0) { LOG(FATAL) << "Fail to start handshake bthread"; } else { s.release(); @@ -305,8 +305,7 @@ void RdmaEndpoint::OnNewDataFromTcp(Socket* m) { ep->_state = S_HELLO_WAIT; SocketUniquePtr s; m->ReAddress(&s); - if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL, - ProcessHandshakeAtServer, ep) < 0) { + if (bthread_start_background(&tid, NULL, ProcessHandshakeAtServer, ep) < 0) { ep->_state = UNINIT; LOG(FATAL) << "Fail to start handshake bthread"; } else { diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index ac8f29c9f3..6145e7d444 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -906,6 +906,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint, init_args[i].done = false; init_args[i].stop = false; bthread_attr_t tmp = BTHREAD_ATTR_NORMAL; + tmp.tag = _options.bthread_tag; tmp.keytable_pool = _keytable_pool; if (bthread_start_background( &init_args[i].th, &tmp, BthreadInitEntry, &init_args[i]) != 0) { @@ -1144,7 +1145,9 @@ int Server::StartInternal(const butil::EndPoint& endpoint, // Launch _derivative_thread. CHECK_EQ(INVALID_BTHREAD, _derivative_thread); - if (bthread_start_background(&_derivative_thread, NULL, + bthread_attr_t tmp = BTHREAD_ATTR_NORMAL; + tmp.tag = _options.bthread_tag; + if (bthread_start_background(&_derivative_thread, &tmp, UpdateDerivedVars, this) != 0) { LOG(ERROR) << "Fail to create _derivative_thread"; return -1; diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index acd1b54d8c..bbe9710092 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -1464,8 +1464,7 @@ void Socket::AfterAppConnected(int err, void* data) { // requests are not setup yet. check the comment on Setup() in Write() req->Setup(s); bthread_t th; - if (bthread_start_background( - &th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) != 0) { + if (bthread_start_background(&th, NULL, KeepWrite, req) != 0) { PLOG(WARNING) << "Fail to start KeepWrite"; KeepWrite(req); } @@ -1505,8 +1504,7 @@ int Socket::KeepWriteIfConnected(int fd, int err, void* data) { bthread_t th; std::unique_ptr thrd_func(brpc::NewCallback( Socket::CheckConnectedAndKeepWrite, fd, err, data)); - if ((err = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, - RunClosure, thrd_func.get())) == 0) { + if ((err = bthread_start_background(&th, NULL, RunClosure, thrd_func.get())) == 0) { thrd_func.release(); return 0; } else { @@ -1736,8 +1734,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) { KEEPWRITE_IN_BACKGROUND: ReAddress(&ptr_for_keep_write); req->socket = ptr_for_keep_write.release(); - if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, - KeepWrite, req) != 0) { + if (bthread_start_background(&th, NULL, KeepWrite, req) != 0) { LOG(FATAL) << "Fail to start KeepWrite"; KeepWrite(req); } diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index a9126537fd..6dcf6e26bf 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -363,11 +363,11 @@ int Stream::TriggerOnWritable(bthread_id_t id, void *data, int error_code) { } wm->error_code = error_code; if (wm->new_thread) { - const bthread_attr_t* attr = - FLAGS_usercode_in_pthread ? &BTHREAD_ATTR_PTHREAD - : &BTHREAD_ATTR_NORMAL; + bthread_attr_t attr = + FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; + attr.tag = bthread_self_tag(); bthread_t tid; - if (bthread_start_background(&tid, attr, RunOnWritable, wm) != 0) { + if (bthread_start_background(&tid, &attr, RunOnWritable, wm) != 0) { LOG(FATAL) << "Fail to start bthread" << berror(); RunOnWritable(wm); } @@ -706,11 +706,11 @@ void StreamWait(StreamId stream_id, const timespec *due_time, wm->has_timer = false; wm->on_writable = on_writable; wm->error_code = EINVAL; - const bthread_attr_t* attr = - FLAGS_usercode_in_pthread ? &BTHREAD_ATTR_PTHREAD - : &BTHREAD_ATTR_NORMAL; + bthread_attr_t attr = + FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; + attr.tag = bthread_self_tag(); bthread_t tid; - if (bthread_start_background(&tid, attr, Stream::RunOnWritable, wm) != 0) { + if (bthread_start_background(&tid, &attr, Stream::RunOnWritable, wm) != 0) { PLOG(FATAL) << "Fail to start bthread"; Stream::RunOnWritable(wm); }