Skip to content

Commit

Permalink
0.6.3 stability improvments, slight performance improvments
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Kraynyukhov committed May 16, 2018
1 parent facdd8b commit bb0a0a2
Show file tree
Hide file tree
Showing 12 changed files with 418 additions and 347 deletions.
50 changes: 42 additions & 8 deletions include/Application.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,31 +138,65 @@ namespace LAppS
itc::getLog()->error(__FILE__,__LINE__,"Can't enqueue request to application %s, exception: %s",mName.c_str(),e.what());
}
}

void enqueueDisconnect(const size_t wid, const int32_t sockfd)
{
try {
mEvents.send({wid,sockfd,{WebSocketProtocol::CLOSE,nullptr}});
//mAppContext.onDisconnect(wid,sockfd);
} catch (const std::exception& e)
{
itc::getLog()->error(__FILE__,__LINE__,"Can't enqueue request to application %s, exception: %s",mName.c_str(),e.what());
itc::getLog()->error(__FILE__,__LINE__,"Can't enqueue close event to application %s, exception: %s",mName.c_str(),e.what());
}
}
void enqueueDisconnect(const size_t wid, const int32_t sockfd, const MSGBufferTypeSPtr& message)
{
try {
mEvents.send({wid,sockfd,{WebSocketProtocol::OpCode::CLOSE,message}});
} catch (const std::exception& e)
{
itc::getLog()->error(__FILE__,__LINE__,"Can't enqueue close event to application %s, exception: %s",mName.c_str(),e.what());
}
}

void enqueuePong(const size_t wid, const int32_t sockfd, const MSGBufferTypeSPtr& pong)
{
mEvents.send({wid,sockfd,{WebSocketProtocol::OpCode::PONG,pong}});
}
void execute()
{
while(mMayRun)
{
try
{
auto te=mEvents.recv();
if(te.event.type == WebSocketProtocol::CLOSE)
mAppContext.onDisconnect(te.wid,te.sockfd);
else
switch(te.event.type)
{
const bool exec_result=mAppContext.onMessage(te.wid,te.sockfd,te.event);
if(!exec_result)
case WebSocketProtocol::OpCode::CLOSE:
mAppContext.onDisconnect(te.wid,te.sockfd);
if(te.event.message)
{
getWorker(te.wid)->submitResponse(te.sockfd,te.event.message);
}
break;
case WebSocketProtocol::OpCode::PONG:
if(te.event.message)
{
getWorker(te.wid)->submitResponse(te.sockfd,te.event.message);
}
break;
default:
{
getWorker(te.wid)->submitError(te.sockfd);
const bool exec_result=mAppContext.onMessage(te.wid,te.sockfd,te.event);
if(!exec_result)
{
MSGBufferTypeSPtr outBuffer=std::make_shared<MSGBufferType>();

WebSocketProtocol::ServerCloseMessage(*outBuffer,WebSocketProtocol::DefiniteCloseCode::SHUTDOWN);

getWorker(te.wid)->submitResponse(te.sockfd,outBuffer);

mMayRun.store(false);
}
}
}
}catch(std::exception& e)
Expand Down
5 changes: 5 additions & 0 deletions include/Balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ namespace LAppS
mInbound.send(data);
}

void onUpdate(const std::vector<::itc::TCPListener::value_type>& data)
{
mInbound.send(data);
}

Balancer(const float connw=0.7):mConnectionWeight(connw),mMayRun(true),mWorkersCache(0)
{
itc::Singleton<WSWorkersPool<TLSEnable,StatsEnable>>::getInstance()->getWorkers(mWorkersCache);
Expand Down
6 changes: 3 additions & 3 deletions include/Broadcast.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,19 @@ namespace LAppS
{

size_t wid=handler>>32;
int32_t fd=static_cast<int32_t>(handler&0x00000000FFFFFFFF);
const int32_t fd=static_cast<int32_t>(handler&0x00000000FFFFFFFF);

if(wid<mWorkersCache.size())
{
mWorkersCache[wid]->submitResponse({wid,fd,{WebSocketProtocol::BINARY,msg}});
mWorkersCache[wid]->submitResponse(fd,msg);
}
else // second attempt
{
mWorkersCache.clear();
itc::Singleton<WSWorkersPool<TLSEnable,StatsEnable>>::getInstance()->getWorkers(mWorkersCache);
if(wid<mWorkersCache.size())
{
mWorkersCache[wid]->submitResponse({wid,fd,{WebSocketProtocol::BINARY,msg}});
mWorkersCache[wid]->submitResponse(fd,msg);
} // not broadcasted, worker is down or never existed.
}
}
Expand Down
26 changes: 13 additions & 13 deletions include/EventBus.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,22 @@ namespace LAppS
explicit EventBus():mMutex(),mSem(0),mEvents(){}
EventBus(const EventBus&)=delete;
EventBus(EventBus&)=delete;
~EventBus()=default;

const size_t size() const
{
return mEvents.size();
}

void bachLock()
void push(const std::vector<Event>& e)
{
mMutex.lock();
}
void batchUnLock()
{
mMutex.unlock();
}
void unsecureBatchPush(const Event& e)
{
mEvents.push(e);
if(!mSem.post())
throw std::system_error(errno,std::system_category(),"Can't increment semaphore, system is going down or semaphore error");
SyncLock sync(mMutex);
for(size_t i=0;i<e.size();++i)
{
mEvents.push(e[i]);
if(!mSem.post())
throw std::system_error(errno,std::system_category(),"Can't increment semaphore, system is going down or semaphore error");
}
}

void push(const Event& e)
Expand All @@ -89,14 +86,17 @@ namespace LAppS
if(!mSem.post())
throw std::system_error(errno,std::system_category(),"Can't increment semaphore, system is going down or semaphore error");
}

void onEvent(std::function<void(const Event&)> processor)
{
if(!mSem.wait())
throw std::system_error(errno,std::system_category(),"Can't wait on semaphore");

Event event;

try{
SyncLock sync(mMutex);
event=mEvents.front();
event=std::move(mEvents.front());
mEvents.pop();
}catch(const std::exception& e)
{
Expand Down
Loading

0 comments on commit bb0a0a2

Please # to comment.