Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

bthread task group add tag #2358

Merged
merged 1 commit into from
Dec 20, 2023

Conversation

yanglimingcn
Copy link
Contributor

What problem does this PR solve?

Issue Number:

Problem Summary:

What is changed and the side effects?

Changed:

Side effects:

  • Performance effects(性能影响):

  • Breaking backward compatibility(向后兼容性):


Check List:

  • Please make sure your changes are compilable(请确保你的更改可以通过编译).
  • When providing us with a new feature, it is best to add related tests(如果你向我们增加一个新的功能, 请添加相关测试).
  • Please follow Contributor Covenant Code of Conduct.(请遵循贡献者准则).

@yanglimingcn
Copy link
Contributor Author

这个是划分线程池的一个新的想法,我觉得这样实现对性能损失最少,现在提上来的代码是一个DEMO,总结一下:
1、在brpc worker层面给worker线程打标签的方式划分了线程池。
2、使用bthread的时候可以指定tag,就会把bthread调度到相应的worker池子。
3、只需要给epoll协程打上tag,正常处理请求不需要指定tag,自然就在epoll所在的worker池子上运行。
这个改的比较底层,不知道是否社区能接受。另外请求要想在指定的worker池子运行也需要带上标签,我现在想的是给socket打标签,这样请求就不用每个请求打标签了,连接打标签需要建立连接的时候发送一个标识,这块又对客户端、服务端交互有改变。不知道是否有更好的方式。
@wwbmmm 辛苦有时间看一下,给点建议。

@yanglimingcn yanglimingcn force-pushed the feature/brpc_worker_tag branch 4 times, most recently from 1ff809d to 9998367 Compare August 22, 2023 02:38
@chenBright
Copy link
Contributor

进一步,是不是可以像flare一样支持NUMA架构?
https://github.com/Tencent/flare/blob/master/flare/doc/scheduling-group.md

@yanglimingcn
Copy link
Contributor Author

进一步,是不是可以像flare一样支持NUMA架构?
https://github.com/Tencent/flare/blob/master/flare/doc/scheduling-group.md
嗯,如果这个思路大家觉得没问题,可以往这个方向迭代。

@yanglimingcn yanglimingcn force-pushed the feature/brpc_worker_tag branch 8 times, most recently from d65c234 to c70890f Compare August 29, 2023 03:33
@yanglimingcn yanglimingcn force-pushed the feature/brpc_worker_tag branch 14 times, most recently from ad2fc96 to 0fad86c Compare September 4, 2023 11:54
@yanglimingcn
Copy link
Contributor Author

yanglimingcn commented Dec 11, 2023

大佬们这个现在有啥合并 master 的计划吗?有了 tag 也好支持 NUMA 分组了,不然大机型搞起来还有点费劲

@wwbmmm 再review评估一下吧,看起来有需求的用户还挺多的。

@vinllen
Copy link

vinllen commented Dec 13, 2023

坐等合入

@huntinux
Copy link

大佬们这个现在有啥合并 master 的计划吗?有了 tag 也好支持 NUMA 分组了,不然大机型搞起来还有点费劲

@justmao945 @yanglimingcn @wwbmmm 我请教个小问题,大机型下,如果划分容器时直接绑定numa,这样就没有 remote access 了。在软件层面实现 numa aware 似乎只能逼近这个状态。所以为什么不直接用容器绑定numa?是因为内存的限制吗?还是因为有些场景不能接受使用容器,必须要用物理整机?

@yanglimingcn yanglimingcn force-pushed the feature/brpc_worker_tag branch 3 times, most recently from c8616de to 8dd7426 Compare December 14, 2023 01:41
@yanglimingcn
Copy link
Contributor Author

大佬们这个现在有啥合并 master 的计划吗?有了 tag 也好支持 NUMA 分组了,不然大机型搞起来还有点费劲

@justmao945 @yanglimingcn @wwbmmm 我请教个小问题,大机型下,如果划分容器时直接绑定numa,这样就没有 remote access 了。在软件层面实现 numa aware 似乎只能逼近这个状态。所以为什么不直接用容器绑定numa?是因为内存的限制吗?还是因为有些场景不能接受使用容器,必须要用物理整机?

的确有些场景不能或者不方便使用容器,我所接触的,比如,一些使用了dpdk、spdk、rdma等等这些对性能要求比较极致的应用,都绕过了内核,相当于自己直接管理硬件了。

@justmao945
Copy link

大佬们这个现在有啥合并 master 的计划吗?有了 tag 也好支持 NUMA 分组了,不然大机型搞起来还有点费劲

@justmao945 @yanglimingcn @wwbmmm 我请教个小问题,大机型下,如果划分容器时直接绑定numa,这样就没有 remote access 了。在软件层面实现 numa aware 似乎只能逼近这个状态。所以为什么不直接用容器绑定numa?是因为内存的限制吗?还是因为有些场景不能接受使用容器,必须要用物理整机?

大内存应用 😢

@yanglimingcn yanglimingcn force-pushed the feature/brpc_worker_tag branch from 8dd7426 to 4963aa3 Compare December 19, 2023 11:20
@wwbmmm wwbmmm merged commit 73b307a into apache:master Dec 20, 2023
@r-value r-value mentioned this pull request Dec 25, 2023
@vinllen
Copy link

vinllen commented Jan 17, 2024

@yanglimingcn 大佬请教个问题,我在你代码基础上修改了一下,一个server内部分service,然后每个service绑定一个bthread group tag,收到消息后根据tag进行分发。现在有一个问题,发现bthread_count监控数值不对,比如3个tag,总和是对的,但是有些group tag不对,看起来应该是task原来在group1上运行,但是退出是跑到group2上:

# curl -s 127.0.0.1:47791/vars | grep bthread_count
bthread_count : 14
bthread_count_0 : 118431
bthread_count_1 : 1
bthread_count_2 : -118418

我的处理是在void ProcessRpcRequest(InputMessageBase* msg_base)函数中,在svc->CallMethod之前判断tag,然后进行分发到对应的tag:

void ProcessRpcRequest(InputMessageBase* msg_base) {
    ....
    if (!FLAGS_usercode_in_pthread) {
            // check bthread worker tag
            LOG(WARNING) << "bthread tag: " << mp->bthread_worker_tag << ", service_name:" << svc_name << ", method_name:" << mp->method->name() << std::endl;
            if (mp->bthread_worker_tag != BTHREAD_TAG_DEFAULT) {
                bthread_t th;
                bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
                attr.tag = mp->bthread_worker_tag;

                bthread::CountdownEvent event(1);
                auto func = [method, &cntl, &req, &res, done, svc, &event]() {
                    svc->CallMethod(method, cntl.release(),
                                    req.release(), res.release(), done);
                    event.signal();
                };
                std::function<void()> func_cp(func);
                LOG(WARNING) << "BthreadCallMethodRun before";
                int ret = bthread_start_background(&th, &attr, BthreadCallMethodRun, &func_cp);
                // int ret = 0;
                // (*BthreadCallMethodRun)(&func_cp);
                LOG(WARNING) << "BthreadCallMethodRun ret: " << ret;
                event.wait();
                LOG(WARNING) << "BthreadCallMethodRun after wait: " << ret;
                if (ret != 0) {
                    LOG(FATAL) << "Fail to start bthread_worker_tag";
                    // return svc->CallMethod(method, cntl.release(),
                    //                        req.release(), res.release(), done);
                }
                return;
            } else {
                return svc->CallMethod(method, cntl.release(),
                                       req.release(), res.release(), done);
            }
        }
}

tag是service注册时打上的。
大佬帮忙看看是我修改的问题哪里没考虑到吗?

@yanglimingcn
Copy link
Contributor Author

yanglimingcn commented Jan 17, 2024

@yanglimingcn 大佬请教个问题,我在你代码基础上修改了一下,一个server内部分service,然后每个service绑定一个bthread group tag,收到消息后根据tag进行分发。现在有一个问题,发现bthread_count监控数值不对,比如3个tag,总和是对的,但是有些group tag不对,看起来应该是task原来在group1上运行,但是退出是跑到group2上:

# curl -s 127.0.0.1:47791/vars | grep bthread_count
bthread_count : 14
bthread_count_0 : 118431
bthread_count_1 : 1
bthread_count_2 : -118418

我的处理是在void ProcessRpcRequest(InputMessageBase* msg_base)函数中,在svc->CallMethod之前判断tag,然后进行分发到对应的tag:

void ProcessRpcRequest(InputMessageBase* msg_base) {
    ....
    if (!FLAGS_usercode_in_pthread) {
            // check bthread worker tag
            LOG(WARNING) << "bthread tag: " << mp->bthread_worker_tag << ", service_name:" << svc_name << ", method_name:" << mp->method->name() << std::endl;
            if (mp->bthread_worker_tag != BTHREAD_TAG_DEFAULT) {
                bthread_t th;
                bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
                attr.tag = mp->bthread_worker_tag;

                bthread::CountdownEvent event(1);
                auto func = [method, &cntl, &req, &res, done, svc, &event]() {
                    svc->CallMethod(method, cntl.release(),
                                    req.release(), res.release(), done);
                    event.signal();
                };
                std::function<void()> func_cp(func);
                LOG(WARNING) << "BthreadCallMethodRun before";
                int ret = bthread_start_background(&th, &attr, BthreadCallMethodRun, &func_cp);
                // int ret = 0;
                // (*BthreadCallMethodRun)(&func_cp);
                LOG(WARNING) << "BthreadCallMethodRun ret: " << ret;
                event.wait();
                LOG(WARNING) << "BthreadCallMethodRun after wait: " << ret;
                if (ret != 0) {
                    LOG(FATAL) << "Fail to start bthread_worker_tag";
                    // return svc->CallMethod(method, cntl.release(),
                    //                        req.release(), res.release(), done);
                }
                return;
            } else {
                return svc->CallMethod(method, cntl.release(),
                                       req.release(), res.release(), done);
            }
        }
}

tag是service注册时打上的。 大佬帮忙看看是我修改的问题哪里没考虑到吗?

你这个func_cp里面执行什么内容了呢,如果执行done->Run可能会有tag切换吧,因为你这么改的话,epoll_wait这个线程只能属于tag0吧,所以它的执行上下文都是tag0的。
#2476 这个PR你代码里面合入了吗?
你可以分段调试一下,比如 func_cp里面,先答应一下bthread_self_tag(),不执行别的,没问题,再执行done->Run,看看问题发生在哪个阶段

@vinllen
Copy link

vinllen commented Jan 18, 2024

done我感觉应该没问题的,是跑在对应tag的bthread里面,这个func_cp就是调用服务端的代码了。我合一下 #2476 这个PR再试试

@vinllen
Copy link

vinllen commented Jan 19, 2024

@yanglimingcn 我知道问题所在了,的确是done的调用问题,我们用了另外一个braft的库,里面调用done是塞入到bthread::ExecutionQueue里面,这块代码没有改导致的。

@ketor
Copy link
Contributor

ketor commented Feb 13, 2024

请问这个PR的改动,如果使用了bthread::ExecutionQueue,ExecutionQueue里面的任务如何区分按照哪个tag去调度的呢? 还是整个ExecutionQueue对应于一个tag呢?

@yanglimingcn
Copy link
Contributor Author

请问这个PR的改动,如果使用了bthread::ExecutionQueue,ExecutionQueue里面的任务如何区分按照哪个tag去调度的呢? 还是整个ExecutionQueue对应于一个tag呢?

最好还是一个tag启动一个execq比较合适

@ketor
Copy link
Contributor

ketor commented Feb 14, 2024

请问这个PR的改动,如果使用了bthread::ExecutionQueue,ExecutionQueue里面的任务如何区分按照哪个tag去调度的呢? 还是整个ExecutionQueue对应于一个tag呢?

最好还是一个tag启动一个execq比较合适
谢谢!

启动ExecutionQueue的时候,在attr里面指定上tag,就是整个ExecutionQueue都会在这个tag所在的线程池里执行了是么?
另外现在的实现,如果使用了bthread_mutex/cond这些同步原语,跨线程池的时候,这些同步原语的工作是否会有问题呢? 看了2个PR里的discussion,似乎没有给出明确的答案,所以请教一下。

@yanglimingcn
Copy link
Contributor Author

请问这个PR的改动,如果使用了bthread::ExecutionQueue,ExecutionQueue里面的任务如何区分按照哪个tag去调度的呢? 还是整个ExecutionQueue对应于一个tag呢?

最好还是一个tag启动一个execq比较合适
谢谢!

启动ExecutionQueue的时候,在attr里面指定上tag,就是整个ExecutionQueue都会在这个tag所在的线程池里执行了是么? 另外现在的实现,如果使用了bthread_mutex/cond这些同步原语,跨线程池的时候,这些同步原语的工作是否会有问题呢? 看了2个PR里的discussion,似乎没有给出明确的答案,所以请教一下。

1、execq会启动协程,这个协程设置好attribute的tag就可以了
2、跨线程池不要使用bthread原语会有问题,可以使用pthread原语。

@ketor
Copy link
Contributor

ketor commented Feb 16, 2024

感觉ExecutionQueue这个用起来还是有点问题,比如in_place执行的场景,task是在当前bthread执行的,就有可与初始化execq时指定的attr的tag不一致,如果task里面用到了bthread_mutex,就可能存在跨线程池唤醒的问题。

在我们的项目里面还有一个场景,感觉也有可能有问题,具体如下:
对外服务是一个端口,braft使用的是另一个端口,期望对外服务和braft如果分别使用不同的tag调度。
但是在业务流程中,存在对外服务的service函数中,要执行提交到braft状态机,然后阻塞到bthread_cond上,然后等待raft apply之后的在closure中对bthread_cond进行signal。这个操作就相当于是用bthread_cond实现了异步转同步的调用。

由于对外服务和braft使用了不同的tag,braft的on_apply调用是在execq中执行,按这个PR前面的讨论,会指定attr的tag为braft使用的tag,那么on_apply执行所使用的bthread就会是braft的tag,在on_apply中调用bthread_cond_signal,就会发生跨线程池使用bthread同步原语的问题了。

如果是这种场景,根据前面的讨论,应该是不适合让对外服务和braft服务使用不同的tag来调度了。不知道这样的理解是否正确?

@yanglimingcn
Copy link
Contributor Author

感觉ExecutionQueue这个用起来还是有点问题,比如in_place执行的场景,task是在当前bthread执行的,就有可与初始化execq时指定的attr的tag不一致,如果task里面用到了bthread_mutex,就可能存在跨线程池唤醒的问题。

在我们的项目里面还有一个场景,感觉也有可能有问题,具体如下: 对外服务是一个端口,braft使用的是另一个端口,期望对外服务和braft如果分别使用不同的tag调度。 但是在业务流程中,存在对外服务的service函数中,要执行提交到braft状态机,然后阻塞到bthread_cond上,然后等待raft apply之后的在closure中对bthread_cond进行signal。这个操作就相当于是用bthread_cond实现了异步转同步的调用。

由于对外服务和braft使用了不同的tag,braft的on_apply调用是在execq中执行,按这个PR前面的讨论,会指定attr的tag为braft使用的tag,那么on_apply执行所使用的bthread就会是braft的tag,在on_apply中调用bthread_cond_signal,就会发生跨线程池使用bthread同步原语的问题了。

如果是这种场景,根据前面的讨论,应该是不适合让对外服务和braft服务使用不同的tag来调度了。不知道这样的理解是否正确?

划分线程池的目的是为了隔离,感觉你对外服务和braft已经在上层通过execq做了隔离,是不是就没必要再使用这个功能了呢?
如果要使用可能要做一些改造,attribute可以按照你的情况制定,但是bthread原语确实不能跨线程池。

@ketor
Copy link
Contributor

ketor commented Feb 17, 2024

braft目前用起来有一个有点棘手的问题,每个raft group都会开启4个execq,包括fsm caller、log manager、node apply、raft meta。当raft group数量比较多,或者fsm caller中有长耗时操作,或者fsm caller的调用内有pthread阻塞,braft实际上就会把brpc的所有线程用满,导致整个进程都无法处理新的message,导致braft本身的心跳也会超时。

如前文所述,对外服务的业务逻辑涉及到写braft状态机,并等待fsm caller执行完成后的callback再返回response给调用方。因此就会存在对外服务的线程池与braft的execq之间需要使用bthread同步原语。除了这种情况之外,服务本身还有一些业务逻辑的处理也涉及到提交raft状态机等待状态写入成功后才能继续执行的逻辑,也会涉及到跨线程池同步。

表达能力比较差,可能描述得不好:)。

划分线程池的目的是为了隔离,感觉你对外服务和braft已经在上层通过execq做了隔离,是不是就没必要再使用这个功能了呢?
如果要使用可能要做一些改造,attribute可以按照你的情况制定,但是bthread原语确实不能跨线程池。

这个地方比较难处理的点是braft的逻辑几乎都是通过execq来执行的,当execq的数量变大之后,实际上一个execq就相当于一个并发,可以跑满一个线程。execq数量多了之后,线程数就不够用了,这个问题是最早希望使用tag调度来限制braft使用的资源的一个主要原因。但是使用了tag调度之后,跨线程池使用bthread原语又会有问题,就有点不好处理。

我们现在通过一些方法已经基本实现了控制braft的并发度了,结合tag调度这个feature,应该能扩展一些新的方法来做这个事情。

技术上bthread原语跨线程池的使用,以后是否有办法支持呢?

@yanglimingcn
Copy link
Contributor Author

braft目前用起来有一个有点棘手的问题,每个raft group都会开启4个execq,包括fsm caller、log manager、node apply、raft meta。当raft group数量比较多,或者fsm caller中有长耗时操作,或者fsm caller的调用内有pthread阻塞,braft实际上就会把brpc的所有线程用满,导致整个进程都无法处理新的message,导致braft本身的心跳也会超时。

如前文所述,对外服务的业务逻辑涉及到写braft状态机,并等待fsm caller执行完成后的callback再返回response给调用方。因此就会存在对外服务的线程池与braft的execq之间需要使用bthread同步原语。除了这种情况之外,服务本身还有一些业务逻辑的处理也涉及到提交raft状态机等待状态写入成功后才能继续执行的逻辑,也会涉及到跨线程池同步。

表达能力比较差,可能描述得不好:)。

划分线程池的目的是为了隔离,感觉你对外服务和braft已经在上层通过execq做了隔离,是不是就没必要再使用这个功能了呢?
如果要使用可能要做一些改造,attribute可以按照你的情况制定,但是bthread原语确实不能跨线程池。

这个地方比较难处理的点是braft的逻辑几乎都是通过execq来执行的,当execq的数量变大之后,实际上一个execq就相当于一个并发,可以跑满一个线程。execq数量多了之后,线程数就不够用了,这个问题是最早希望使用tag调度来限制braft使用的资源的一个主要原因。但是使用了tag调度之后,跨线程池使用bthread原语又会有问题,就有点不好处理。

我们现在通过一些方法已经基本实现了控制braft的并发度了,结合tag调度这个feature,应该能扩展一些新的方法来做这个事情。

技术上bthread原语跨线程池的使用,以后是否有办法支持呢?

技术上是能实现的,但是做起来也有些复杂,同步的代码确实更好一些,否则execq里面就得调用callback了吧?

@ketor
Copy link
Contributor

ketor commented Feb 20, 2024

技术上是能实现的,但是做起来也有些复杂,同步的代码确实更好一些,否则execq里面就得调用callback了吧?

确实还是有同步代码的需求。有些场景如果用callback,就会使callback里的逻辑很重,虽然技术上是可以的,但是代码看起来就很难看了。

@yanglimingcn
Copy link
Contributor Author

技术上是能实现的,但是做起来也有些复杂,同步的代码确实更好一些,否则execq里面就得调用callback了吧?

确实还是有同步代码的需求。有些场景如果用callback,就会使callback里的逻辑很重,虽然技术上是可以的,但是代码看起来就很难看了。

ok,这块后续可以支持。

@yanglimingcn
Copy link
Contributor Author

#2551

@yyweii
Copy link
Contributor

yyweii commented Mar 1, 2024

@yanglimingcn 我有几个疑问。比如我有一个server,tag是tag1。

  1. 处理service请求的bthread是tag1?
  2. service的所有io bthread是tag1?
  3. 请求下游服务时创建的io bthread也是tag1吗?

@yanglimingcn
Copy link
Contributor Author

@yanglimingcn 我有几个疑问。比如我有一个server,tag是tag1。

  1. 处理service请求的bthread是tag1?
  2. service的所有io bthread是tag1?
  3. 请求下游服务时创建的io bthread也是tag1吗?

是的

@yanglimingcn
Copy link
Contributor Author

技术上是能实现的,但是做起来也有些复杂,同步的代码确实更好一些,否则execq里面就得调用callback了吧?

确实还是有同步代码的需求。有些场景如果用callback,就会使callback里的逻辑很重,虽然技术上是可以的,但是代码看起来就很难看了。

#2551

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants