Skip to content

Commit

Permalink
Balancer for inbound connections is added
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Kraynyukhov committed May 2, 2018
1 parent 0d1f7e4 commit 6d2f8ef
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 3 deletions.
117 changes: 117 additions & 0 deletions include/Balancer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Copyright 2017-2018, Pavel Kraynyukhov <pavel.kraynyukhov@gmail.com>
*
* This file is a part of LAppS (Lua Application Server).
*
* LAppS is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* LAppS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with LAppS. If not, see <http://www.gnu.org/licenses/>.
*
* $Id: Balancer.h May 2, 2018 1:50 PM $
*
**/


#ifndef __BALANCER_H__
# define __BALANCER_H__

#include <TCPListener.h>
#include <WSWorkersPool.h>
#include <tsbqueue.h>
#include <sys/CancelableThread.h>
#include <atomic>
#include <memory>
#include <abstract/Runnable.h>

template <bool TLSEnable=true, bool StatsEnable=true>
class Balancer
: public ::itc::TCPListener::ViewType,
public ::itc::abstract::IRunnable
{
private:
float mConnectionWeight;
std::atomic<bool> mMayRun;
itc::tsbqueue<::itc::TCPListener::value_type> mInbound;
std::vector<std::shared_ptr<::abstract::Worker>> mWorkersCache;

public:
void onUpdate(const ::itc::TCPListener::value_type& data)
{
mInbound.send(data);
}

Balancer(const float connw=0.7):mConnectionWeight(connw),mMayRun(true)
{
itc::Singleton<WSWorkersPool<TLSEnable,StatsEnable>>::getInstance()->getWorkers(mWorkersCache);
}
void shutdown()
{
mMayRun.store(false);
}
void onCancel()
{
shutdown();
}
~Balancer()
{
this->shutdown();
}
void execute()
{
while(mMayRun)
{
try {
auto inbound_connection=mInbound.recv();

if(mWorkersCache.size()==0)
itc::Singleton<WSWorkersPool<TLSEnable,StatsEnable>>::getInstance()->getWorkers(mWorkersCache);

if(mWorkersCache.size()>0)
{
size_t choosen=0;
auto stats=mWorkersCache[0]->getStats();
for(size_t i=1;i<mWorkersCache.size();++i)
{
auto stats2=mWorkersCache[i]->getStats();
if(stats.mConnections>stats2.mConnections) // candidate i
{
if(stats.mEventQSize > stats2.mEventQSize)
{
stats=stats2;
choosen=i;
}
}
else
{
if(stats.mEventQSize > stats2.mConnections*mConnectionWeight)
{
stats=stats2;
choosen=i;
}
}
}
mWorkersCache[choosen]->update(inbound_connection);
}else
{
mMayRun.store(false);
}

}catch (const std::exception& e)
{
mMayRun.store(false);
}
}
}
};

#endif /* __BALANCER_H__ */

9 changes: 6 additions & 3 deletions include/wsServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#include <ApplicationRegistry.h>
#include <Application.h>
#include <IOWorker.h>

#include <Balancer.h>

// libressl
#include <tls.h>
Expand All @@ -80,6 +80,7 @@ class wsServer
WorkerStats mAllStats;

std::vector<TCPListenerThreadSPtr> mListenersPool;
itc::sys::CancelableThread<Balancer<TLSEnable,StatsEnable>> mBalancer;



Expand Down Expand Up @@ -179,7 +180,8 @@ class wsServer
std::make_shared<::itc::TCPListener>(
LAppSConfig::getInstance()->getWSConfig()["ip"],
LAppSConfig::getInstance()->getWSConfig()["port"],
WorkersPool::getInstance()->next()
//WorkersPool::getInstance()->next()
mBalancer.getRunnable()
)
));
}
Expand Down Expand Up @@ -210,7 +212,8 @@ class wsServer
}

wsServer()
: enableTLS(), enableStatsUpdate(), mWorkers(1), mAllStats{0,0,0,0,0,0,0}
: enableTLS(), enableStatsUpdate(), mWorkers(1), mAllStats{0,0,0,0,0,0,0,0},
mBalancer(std::make_shared<Balancer<TLSEnable, StatsEnable>>())
{
itc::getLog()->info(__FILE__,__LINE__,"Starting WS Server");

Expand Down

0 comments on commit 6d2f8ef

Please # to comment.