Skip to content

Commit

Permalink
EventDispatcher create and destroy UserData
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Apr 7, 2024
1 parent e9c747a commit 1078125
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 109 deletions.
32 changes: 1 addition & 31 deletions src/brpc/event_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) {
}

int EventData::OnCreated(const EventDataOptions& options) {
if (options.user_id == INVALID_VREF_ID) {
LOG(ERROR) << "Invalid user_id=-1";
return -1;
}
if (!options.input_cb) {
LOG(ERROR) << "Invalid input_cb=NULL";
return -1;
Expand All @@ -90,33 +86,7 @@ int EventData::OnCreated(const EventDataOptions& options) {
}

void EventData::BeforeRecycled() {
_options = {INVALID_EVENT_DATA_ID, NULL, NULL};
}

void MakeEventDataIdInvalid(EventDataId& id) {
EventData::SetFailedById(id);
id = INVALID_EVENT_DATA_ID;
}

int EventDispatcher::CallInputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr) {
EventDataUniquePtr data;
if (EventData::Address(event_data_id, &data) != 0) {
return -1;
}
data->CallInputEventCallback(events, thread_attr);
return 0;
}

int EventDispatcher::CallOutputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr) {
EventDataUniquePtr data;
if (EventData::Address(event_data_id, &data) != 0) {
return -1;
}
return data->CallOutputEventCallback(events, thread_attr);
_options = { NULL, NULL, NULL };
}

} // namespace brpc
Expand Down
137 changes: 127 additions & 10 deletions src/brpc/event_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ class EventData;
typedef VersionedRefWithIdUniquePtr<EventData> EventDataUniquePtr;

// User callback type of input event and output event.
typedef int (*InputEventCallback) (VRefId id, uint32_t events,
typedef int (*InputEventCallback) (void* id, uint32_t events,
const bthread_attr_t& thread_attr);
typedef InputEventCallback OutputEventCallback;

struct EventDataOptions {
// Find user object to handle event by `user_id'.
uint64_t user_id;
// Callback for input event.
InputEventCallback input_cb;
// Callback for output event.
OutputEventCallback output_cb;
// User data.
void* user_data;
};

// EventDispatcher finds EventData by EventDataId which is
Expand All @@ -60,18 +60,18 @@ class EventData : public VersionedRefWithId<EventData> {
public:
explicit EventData(Forbidden f)
: VersionedRefWithId<EventData>(f)
, _options{INVALID_EVENT_DATA_ID, NULL, NULL} {}
, _options{ NULL, NULL, NULL } {}

DISALLOW_COPY_AND_ASSIGN(EventData);

int CallInputEventCallback(uint32_t events,
const bthread_attr_t& thread_attr) {
return _options.input_cb(_options.user_id, events, thread_attr);
return _options.input_cb(_options.user_data, events, thread_attr);
}

int CallOutputEventCallback(uint32_t events,
const bthread_attr_t& thread_attr) {
return _options.output_cb(_options.user_id, events, thread_attr);
return _options.output_cb(_options.user_data, events, thread_attr);
}

private:
Expand All @@ -83,8 +83,6 @@ friend class VersionedRefWithId<EventData>;
EventDataOptions _options;
};

void MakeEventDataIdInvalid(EventDataId& id);

namespace rdma {
class RdmaEndpoint;
}
Expand All @@ -94,6 +92,7 @@ class RdmaEndpoint;
class EventDispatcher {
friend class Socket;
friend class rdma::RdmaEndpoint;
template <typename T> friend class IOEvent;
public:
EventDispatcher();

Expand Down Expand Up @@ -147,12 +146,29 @@ friend class rdma::RdmaEndpoint;
int RemoveConsumer(int fd);

// Call user callback of input event and output event.
template<bool IsInputEvent>
static int OnEvent(EventDataId event_data_id, uint32_t events,
const bthread_attr_t& thread_attr) {
EventDataUniquePtr data;
if (EventData::Address(event_data_id, &data) != 0) {
return -1;
}
return IsInputEvent ?
data->CallInputEventCallback(events, thread_attr) :
data->CallOutputEventCallback(events, thread_attr);
}

static int CallInputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr);
const bthread_attr_t& thread_attr) {
return OnEvent<true>(event_data_id, events, thread_attr);
}

static int CallOutputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr);
const bthread_attr_t& thread_attr) {
return OnEvent<false>(event_data_id, events, thread_attr);
}

// The epoll/kqueue fd to watch events.
int _event_dispatcher_fd;
Expand All @@ -172,6 +188,107 @@ friend class rdma::RdmaEndpoint;

EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);

// IOEvent is used to manage the IO events of a file descriptor conveniently.
template <typename T>
class IOEvent {
public:
IOEvent()
: _init(false)
, _event_data_id(INVALID_EVENT_DATA_ID)
, _bthread_tag(bthread_self_tag()) {}

int Init(void* user_data) {
if (_init) {
LOG(WARNING) << "IOEvent has been initialized";
return 0;
}
EventDataOptions options{ OnInputEvent, OnOutputEvent, user_data };
if (EventData::Create(&_event_data_id, options) != 0) {
LOG(ERROR) << "Fail to create EventData";
return -1;
}
_init = true;
return 0;
}

void Reset() {
if (_init) {
EventData::SetFailedById(_event_data_id);
_init = false;
}
}

int AddConsumer(int fd) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag)
.AddConsumer(_event_data_id, fd);
}


int RemoveConsumer(int fd) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag).RemoveConsumer(fd);
}


int RegisterEvent(int fd, bool pollin) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag)
.RegisterEvent(_event_data_id, fd, pollin);
}

int UnregisterEvent(int fd, bool pollin) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag)
.UnregisterEvent(_event_data_id, fd, pollin);
}

void set_bthread_tag(bthread_tag_t bthread_tag) {
_bthread_tag = bthread_tag;
}
bthread_tag_t bthread_tag() const {
return _bthread_tag;
}

private:

static int OnInputEvent(void* user_data, uint32_t events,
const bthread_attr_t& thread_attr) {
static_assert(
butil::is_callable_return_int<decltype(&T::OnInputEvent),
void*, uint32_t,
bthread_attr_t>::value,
"T::OnInputEvent signature mismatch");
return T::OnInputEvent(user_data, events, thread_attr);
}

static int OnOutputEvent(void* user_data, uint32_t events,
const bthread_attr_t& thread_attr) {
static_assert(
butil::is_callable_return_int<decltype(&T::OnOutputEvent),
void*, uint32_t,
bthread_attr_t>::value,
"T::OnInputEvent signature mismatch");
return T::OnOutputEvent(user_data, events, thread_attr);
}

bool _init;
EventDataId _event_data_id;
bthread_tag_t _bthread_tag;
};

} // namespace brpc


Expand Down
Loading

0 comments on commit 1078125

Please # to comment.