Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

F_40B: Resource Limits [feature to main] #881

Merged
merged 13 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion csm/conf/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ async def execute(self, command):

self.cluster_id = Conf.get(const.CONSUMER_INDEX,
self.conf_store_keys[const.KEY_CLUSTER_ID])
Configure.set_resource_limits()
Configure._set_csm_endpoint()
Configure._set_s3_info()
Configure.set_hax_endpoint()
Expand Down Expand Up @@ -119,7 +120,8 @@ def _prepare_and_validate_confstore_keys(self):
const.KEY_CLUSTER_ID:f"{const.NODE}>{self.machine_id}>{const.CLUSTER_ID}",
const.RGW_S3_AUTH_USER: f"{const.RGW_S3_AUTH_USER_KEY}",
const.RGW_S3_AUTH_ADMIN: f"{const.RGW_S3_AUTH_ADMIN_KEY}",
const.RGW_S3_AUTH_SECRET: f"{const.RGW_S3_AUTH_SECRET_KEY}"
const.RGW_S3_AUTH_SECRET: f"{const.RGW_S3_AUTH_SECRET_KEY}",
const.KEY_SERVICE_LIMITS: const.CSM_USAGE_LIMIT_SERVICES
})

Setup._validate_conf_store_keys(const.CONSUMER_INDEX, keylist = list(self.conf_store_keys.values()))
Expand Down Expand Up @@ -324,3 +326,62 @@ def _create_topics():
mb_admin = MessageBusAdmin(admin_id = Conf.get(const.CSM_GLOBAL_INDEX,const.MSG_BUS_ADMIN_ID))
Configure._create_perf_stat_topic(mb_admin)
Configure._create_cluster_stop_topic(mb_admin)

@staticmethod
def _calculate_request_quota(mem_min: int, mem_max: int, cpu_min: int, cpu_max: int) -> int:
"""
Calculate the maximum number of requests CSM handles at particular time.

:param mem_min: minimum memory limit.
:param mem_max: maximum memory limit.
:param cpu_min: minumem CPU limit.
:param cpu_max: maximum CPU limit.
:returns: number of requests.
"""
Log.info(f"CPU boundaries are currently not included in calculation: {cpu_min}:{cpu_max}")
# Minimum memroy limit is considered the bare minimem to run CSM only.
# The rest part up to maximum limit is for handling incoming requests.
# CSM also reserves some amount (const.CSM_USAGE_RESERVED_BUFFER_PERCENT) for future needs.
# MaxConcurrentRequest = MaxAvailableMemForRequest/RequestSize

reserved_mem = mem_max * const.CSM_USAGE_RESERVED_BUFFER_PERCENT // 100
quota = (mem_max - reserved_mem) // const.MAX_MEM_PER_REQUEST_MB
return quota

@staticmethod
def _mem_limit_to_int(mem: str) -> int:
"""
Convert memory limit from Conf to integer, e.g. '128Mi' -> 128.

:param mem: value from Conf as string.
:returns: integer value.
"""
numbers_only = mem[:mem.find('M')]
return int(numbers_only)

@staticmethod
def _cpu_limit_to_int(cpu: str) -> int:
"""
Convert CPU limit from Conf string to integer, e.g. '250m' -> 250.

:param cpu: value from Conf as string.
:returns: integer value.
"""
numbers_only = cpu[:cpu.find('m')]
return int(numbers_only)

@staticmethod
def set_resource_limits() -> None:
"""Set resource limits for CSM."""
limits = Conf.get(const.CONSUMER_INDEX, const.CSM_USAGE_LIMIT_SERVICES)
Conf.set(const.CSM_GLOBAL_INDEX, const.CSM_USAGE_LIMIT_SERVICES, limits)
limits = next(filter(lambda x: x[const.NAME] == "agent", limits), None)
if limits:
mem_min = Configure._mem_limit_to_int(limits["memory"]["min"])
mem_max = Configure._mem_limit_to_int(limits["memory"]["max"])
cpu_min = Configure._cpu_limit_to_int(limits["cpu"]["min"])
cpu_max = Configure._cpu_limit_to_int(limits["cpu"]["max"])

request_rate = Configure._calculate_request_quota(mem_min, mem_max, cpu_min, cpu_max)
Conf.set(const.CSM_GLOBAL_INDEX, const.USAGE_REQUEST_QUOTA, request_rate)

1 change: 1 addition & 0 deletions csm/conf/etc/csm/csm.conf
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ MESSAGEBUS:
offset: "earliest"
CSM_USERS:
max_users_allowed: 100
active_users_quota: -1

#Cluster admin credentials
CLUSTER_ADMIN:
Expand Down
3 changes: 2 additions & 1 deletion csm/conf/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ async def _create_cluster_admin(self, force_action=False):
conf['databases']["consul_db"]["config"][const.PORT])
db = DataBaseProvider(conf)
usr_mngr = UserManager(db)
usr_service = CsmUserService(usr_mngr)
max_users_allowed = int(Conf.get(const.CSM_GLOBAL_INDEX, const.CSM_MAX_USERS_ALLOWED))
usr_service = CsmUserService(usr_mngr, max_users_allowed)
if (not force_action) and \
(await usr_service.validate_cluster_admin_create(cluster_admin_user)):
Log.console("WARNING: Cortx cluster admin already created.\n"
Expand Down
37 changes: 34 additions & 3 deletions csm/core/agent/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ class ErrorResponseSchema(ValidateSchema):

class CsmRestApi(CsmApi, ABC):
"""REST Interface to communicate with CSM."""

__is_shutting_down = False
# __unsupported_features = None
__is_shutting_down = False
__nreq = 0
__nblocked = 0
__request_quota = 0

@staticmethod
def init():
Expand All @@ -113,8 +115,12 @@ def init():
CsmRestApi._bgtasks = []
CsmRestApi._wsclients = WeakSet()

CsmRestApi.__request_quota = int(Conf.get(const.CSM_GLOBAL_INDEX, const.USAGE_REQUEST_QUOTA))
Log.info(f"CSM request quota is set to {CsmRestApi.__request_quota}")

CsmRestApi._app = web.Application(
middlewares=[CsmRestApi.set_secure_headers,
middlewares=[CsmRestApi.throttler_middleware,
CsmRestApi.set_secure_headers,
CsmRestApi.rest_middleware,
CsmRestApi.session_middleware,
CsmRestApi.permission_middleware]
Expand Down Expand Up @@ -304,6 +310,31 @@ def _retrieve_config(request):
conf_key = CsmAuth.HYBRID_APIS[key]
return Conf.get(const.CSM_GLOBAL_INDEX, conf_key)

@staticmethod
@web.middleware
async def throttler_middleware(request, handler):
if CsmRestApi.__nreq >= CsmRestApi.__request_quota:
# This block get executed when number of request reaches the request quota
CsmRestApi.__nblocked += 1
msg = (f"The request is blocked because the number of requests reached threshold\n"
f"Number of requests blocked since the start is {CsmRestApi.__nblocked}")
Log.warn(msg)
return web.Response(status=429, text="Too many requests")
else:
# This block get executed when number of request are less than request quota
# Increment the counter for number of request executing
CsmRestApi.__nreq += 1

try:
# Here we call handler
# Handler can return json response or can raise an exception
res = await handler(request)
finally:
# Decrements the counter of number of request executing
# This block always gets executed
CsmRestApi.__nreq -= 1
return res

@staticmethod
@web.middleware
async def session_middleware(request, handler):
Expand Down
8 changes: 5 additions & 3 deletions csm/core/agent/csm_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ def init():
auth_service = AuthService()
user_manager = UserManager(db)
role_manager = RoleManager(roles)
session_manager = SessionManager(db)
active_users_quota = int(Conf.get(const.CSM_GLOBAL_INDEX, const.CSM_ACTIVE_USERS_QUOTA_KEY))
session_manager = QuotaSessionManager(db, active_users_quota)
CsmRestApi._app[const.SESSION_MGR_SERVICE ] = session_manager
CsmRestApi._app.login_service = LoginService(auth_service,
user_manager,
Expand All @@ -108,7 +109,8 @@ def init():
# S3 service
CsmAgent._configure_s3_services()

user_service = CsmUserService(user_manager)
max_users_allowed = int(Conf.get(const.CSM_GLOBAL_INDEX, const.CSM_MAX_USERS_ALLOWED))
user_service = CsmUserService(user_manager, max_users_allowed)
CsmRestApi._app[const.CSM_USER_SERVICE] = user_service
CsmRestApi._app[const.STORAGE_CAPACITY_SERVICE] = StorageCapacityService()
# CsmRestApi._app[const.UNSUPPORTED_FEATURES_SERVICE] = UnsupportedFeaturesService()
Expand Down Expand Up @@ -248,7 +250,7 @@ def run():
# from csm.core.services.stats import StatsAppService
from csm.core.services.users import CsmUserService, UserManager
from csm.core.services.roles import RoleManagementService, RoleManager
from csm.core.services.sessions import SessionManager, LoginService, AuthService
from csm.core.services.sessions import QuotaSessionManager, LoginService, AuthService
from csm.core.agent.api import CsmRestApi
# from csm.common.timeseries import TimelionProvider
from csm.common.ha_framework import CortxHAFramework
Expand Down
11 changes: 11 additions & 0 deletions csm/core/blogic/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@
CSM_USER_INTERFACES = ['cli', 'web', 'api']
CSM_CONF_URL = f"yaml://{CSM_CONF_PATH}/{CSM_CONF_FILE_NAME}"
CSM_MAX_USERS_ALLOWED = "CSM_USERS>max_users_allowed"
CSM_ACTIVE_USERS_QUOTA_KEY = "CSM_USERS>active_users_quota"

# Non root user
NON_ROOT_USER = 'csm'
Expand Down Expand Up @@ -769,6 +770,7 @@
KEY_HOSTNAME = "node_hostname_key"
KEY_DATA_NW_PUBLIC_FQDN = "data_nw_public_fqdn"
KEY_DATA_NW_PRIVATE_FQDN = "data_nw_private_fqdn"
KEY_SERVICE_LIMITS = "limits_services"

#CSM TEST Consts
DEFAULT_BROWSER = 'chrome'
Expand Down Expand Up @@ -963,6 +965,15 @@
LOCAL = 'local'
PERSISTENT = 'persistent'

# CSM usage quotas
CSM_ACTIVE_USERS_QUOTA = 50
CSM_ACTIVE_REQUESTS_QUOTA = 100
CSM_USAGE_RESERVED_BUFFER_PERCENT = 5
MAX_MEM_PER_REQUEST_MB = 2

USAGE_REQUEST_QUOTA = 'usage>request_quota'
CSM_USAGE_LIMIT_SERVICES = 'cortx>csm>limits>services'

# Swagger UI
SWAGGER_UI_DIST = '{}/templates/swagger-ui'.format(CSM_PATH)
SWAGGER_UI_INDEX_HTML = '{}/index.html'.format(SWAGGER_UI_DIST)
Expand Down
8 changes: 0 additions & 8 deletions csm/core/controllers/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,6 @@ def __init__(self, request):
self._service = self.request.app["csm_user_service"]
self._service_dispatch = {}

async def check_max_user_limit(self):
max_users_allowed = int(Conf.get(const.CSM_GLOBAL_INDEX, const.CSM_MAX_USERS_ALLOWED))
existing_users_count = await self._service.get_user_count()
if existing_users_count >= max_users_allowed:
raise CsmPermissionDenied("User creation failed. Maximum user limit reached.")

@CsmAuth.permissions({Resource.USERS: {Action.LIST}})
async def get(self):
"""GET REST implementation for fetching csm users."""
Expand Down Expand Up @@ -129,8 +123,6 @@ async def post(self):
except ValidationError as val_err:
raise InvalidRequest(f"Invalid request body: {val_err}")

await self.check_max_user_limit()

# TODO: Story has been taken for unsupported services
# The following commented lines will be removed by above story
# s3_account = await self.request.app["s3_account_service"].get_account(
Expand Down
Loading