-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
add support for rwlock #1031
add support for rwlock #1031
Conversation
这个实现性能有很大优化吧。这个无脑 butex 唤醒有问题吧?理想状态下应该有个 flag 标识有没有真的有 bthread 在等待。 |
我的实现是这样的,由于 bthread 的 butex 没有按 tag 唤醒,有可能产生读写者同时唤醒(惊群) #pragma once
#include <bthread/bthread.h>
#include <bthread/butex.h>
#include <butil/atomicops.h>
#include <cassert>
namespace bcache {
class SingleWriterBthreadRWLock {
public:
SingleWriterBthreadRWLock() : flag_(bthread::butex_create_checked<unsigned int>()) { *flag_ = 0; }
~SingleWriterBthreadRWLock() { bthread::butex_destroy(flag_); }
void lock_shared() { CHECK_EQ(0, ReadLock()); }
void unlock_shared() { ReadUnlock(); }
void lock() { CHECK_EQ(0, WriteLock()); }
void unlock() { WriteUnlock(); }
private:
int ReadLock() {
auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
unsigned int state;
wait_stage:
while ((state = f.load(std::memory_order_relaxed)) & kLockMask) {
unsigned int after = state | kWaitMask;
while (!(state & kWaitMask) && !f.compare_exchange_weak(state, after, std::memory_order_relaxed,
std::memory_order_relaxed)) {
if (!(state & kLockMask)) {
goto lock_stage;
}
after = state | kWaitMask;
}
if (bthread::butex_wait(flag_, after, nullptr) < 0 && errno != EWOULDBLOCK && errno != EINTR) {
return errno;
}
}
lock_stage:
if (f.fetch_add(1, std::memory_order_acquire) & kLockMask) {
state = f.fetch_sub(1, std::memory_order_relaxed) - 1;
if ((state & (kWaitMask | kReaderCountMask)) == kWaitMask) {
state = f.fetch_and(~kWaitMask, std::memory_order_relaxed);
if (state & kWaitMask) {
bthread::butex_wake_all(flag_);
}
}
if (state & kLockMask) {
goto wait_stage;
} else {
goto lock_stage;
}
}
return 0;
}
void ReadUnlock() {
auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
unsigned int state = f.fetch_sub(1, std::memory_order_release) - 1;
if ((state & (kWaitMask | kReaderCountMask)) == kWaitMask) {
assert(state & kLockMask);
state = f.fetch_and(~kWaitMask, std::memory_order_relaxed);
if (state & kWaitMask) {
bthread::butex_wake_all(flag_);
}
}
}
int WriteLock() {
auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
unsigned int state;
if ((state = f.fetch_or(kLockMask, std::memory_order_acquire))) {
assert(!(state & (kLockMask | kWaitMask)));
state |= kLockMask;
do {
assert(state & kLockMask);
unsigned int after = state | kWaitMask;
while (!(state & kWaitMask) &&
!f.compare_exchange_weak(state, after, std::memory_order_relaxed,
std::memory_order_acquire)) {
assert(state & kLockMask);
if (!(state & kReaderCountMask)) {
return 0;
}
after = state | kWaitMask;
}
if (bthread::butex_wait(flag_, after, nullptr) < 0 && errno != EWOULDBLOCK &&
errno != EINTR) {
WriteUnlock();
return errno;
}
} while ((state = f.load(std::memory_order_acquire)) & kReaderCountMask);
}
return 0;
}
void WriteUnlock() {
auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
unsigned int state = f.fetch_and(~(kLockMask | kWaitMask), std::memory_order_release);
assert(state & kLockMask);
if (state & kWaitMask) {
bthread::butex_wake_all(flag_);
}
}
private:
static constexpr unsigned int kLockMask = 1U << 31U;
static constexpr unsigned int kWaitMask = 1U << 30U;
static constexpr unsigned int kReaderCountMask = (1U << 30U) - 1U;
unsigned int* const flag_;
};
class GeneralBthreadRWLock {
public:
void lock_shared() { internal_rwlock_.lock_shared(); }
void unlock_shared() { internal_rwlock_.unlock_shared(); }
void lock() {
writer_mtx_.lock();
internal_rwlock_.lock();
}
void unlock() {
internal_rwlock_.unlock();
writer_mtx_.unlock();
}
private:
bthread::Mutex writer_mtx_;
SingleWriterBthreadRWLock internal_rwlock_;
};
} // namespace bcache
|
src/bthread/rwlock.cpp
Outdated
if(r != 0) { | ||
if(bthread::butex_wait(whole, r, NULL) < 0 && | ||
errno != EWOULDBLOCK && errno != EINTR) { | ||
whole->fetch_sub(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里需要减一的为啥是whole
?我感觉应该是w_wait_count
吧?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是的,应该是w_wait_count,我们内部版本很久以前改过了, 很久没来这里更新了,抱歉
@hairet 这个PR和master冲突了,可以解决一下吗 |
Closed this as completed in #2752. |
补充写优先读写锁实现,变量和状态应该算是比较简洁的了,参考了内核写优先设计。
UT里有性能相关测试