Fix cordon/uncordon logic #48

Open · wants to merge 3 commits into master
1 change: 1 addition & 0 deletions README.md
@@ -112,6 +112,7 @@ $ python main.py [options]
- --spare-agents: Number of agents per pool that should always stay up (default is 1)
- --acs-deployment: The name of the deployment used to deploy the kubernetes cluster initially
- --idle-threshold: Maximum duration (in seconds) an agent can stay idle before being deleted
- --util-threshold: Utilization of a node, in percent, below which it is considered underutilized and should be cordoned (default is 30)
- --over-provision: Number of extra agents to create when scaling up (default is 0)
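
A typical invocation with the new flag might look like this (values are illustrative; flag spellings not shown in this hunk are inferred from main.py's parameter names):

```sh
$ python main.py --resource-group my-rg --kubeconfig ~/.kube/config \
    --spare-agents 1 --idle-threshold 1800 --util-threshold 30
```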

## Windows Machine Pools
17 changes: 11 additions & 6 deletions autoscaler/agent_pool.py
@@ -4,6 +4,7 @@
from azure.mgmt.compute import ComputeManagementClient
import logging
import autoscaler.utils as utils
import time

from autoscaler.capacity import get_capacity_for_instance_type

@@ -32,16 +33,20 @@ def reclaim_unschedulable_nodes(self, new_desired_capacity):
Try to bring the number of schedulable nodes up by uncordoning nodes if we don't have enough before scaling
"""
desired_capacity = min(self.max_size, new_desired_capacity)
num_unschedulable = len(self.unschedulable_nodes)
num_schedulable = self.actual_capacity - num_unschedulable

if num_schedulable < desired_capacity:
# because of how acs-engine works (offset etc.), the desired capacity is the number of nodes
# existing in the pool (even unschedulable ones) plus the number of additional nodes we need.

reclaimed = 0
if (self.actual_capacity + reclaimed) < desired_capacity:
for node in self.unschedulable_nodes:
if node.uncordon():
num_schedulable += 1
# give k8s some time to assign any pending pod to the newly uncordoned node
time.sleep(10)
reclaimed += 1
# Uncordon only what we need
if num_schedulable == desired_capacity:
if (self.actual_capacity + reclaimed) == desired_capacity:
break
return (self.actual_capacity + reclaimed)

def has_node_with_index(self, index):
for node in self.nodes:
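
A minimal, runnable sketch of the new reclaim-then-report flow (`FakeNode` is a hypothetical stub; the real method walks `self.unschedulable_nodes` and returns the capacity now available so the caller can decide whether scaling is still needed):

```python
import time

class FakeNode:
    """Hypothetical stub for a cordoned Kubernetes node."""
    def uncordon(self):
        return True  # pretend the API call succeeded

def reclaim_unschedulable_nodes(unschedulable_nodes, actual_capacity, desired_capacity):
    reclaimed = 0
    if actual_capacity + reclaimed < desired_capacity:
        for node in unschedulable_nodes:
            if node.uncordon():
                time.sleep(10)  # give k8s time to assign pending pods to the node
                reclaimed += 1
                # uncordon only what we need
                if actual_capacity + reclaimed == desired_capacity:
                    break
    # the caller compares the result with the requested size to decide whether to scale
    return actual_capacity + reclaimed

print(reclaim_unschedulable_nodes([FakeNode(), FakeNode()], 2, 3))  # -> 3
```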
10 changes: 6 additions & 4 deletions autoscaler/cluster.py
@@ -30,7 +30,7 @@
logger = logging.getLogger(__name__)

class Cluster(object):
def __init__(self, kubeconfig, idle_threshold, spare_agents,
def __init__(self, kubeconfig, idle_threshold, util_threshold, spare_agents,
service_principal_app_id, service_principal_secret, service_principal_tenant_id,
kubeconfig_private_key, client_private_key, ca_private_key,
instance_init_time, resource_group, notifier, ignore_pools,
@@ -54,6 +54,7 @@ def __init__(self, kubeconfig, idle_threshold, spare_agents,
self.instance_init_time = instance_init_time
self.spare_agents = spare_agents
self.idle_threshold = idle_threshold
self.util_threshold = util_threshold
self.over_provision = over_provision
self.scale_up = scale_up
self.maintainance = maintainance
@@ -113,8 +114,8 @@ def loop(self, debug):
return self.loop_logic()
except CloudError as e:
logger.error(e)
except:
logger.warn("Unexpected error: {}".format(sys.exc_info()[0]))
except BaseException as e:
logger.warn("Unexpected error: {}".format(e))
return False
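
For context on the handler change: the old bare `except:` logged only the exception type from `sys.exc_info()[0]`, while binding the exception also captures its message. A quick illustration:

```python
import sys

try:
    raise ValueError("pool size must be positive")
except BaseException as e:
    print("old: {}".format(sys.exc_info()[0]))  # old: <class 'ValueError'>
    print("new: {}".format(e))                  # new: pool size must be positive
```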

def create_kube_node(self, node):
@@ -142,6 +143,7 @@ def loop_logic(self):
over_provision=self.over_provision,
spare_count=self.spare_agents,
idle_threshold=self.idle_threshold,
util_threshold=self.util_threshold,
notifier=self.notifier)

pods = list(map(KubePod, pykube.Pod.objects(self.api)))
@@ -178,7 +180,7 @@ def get_pending_pods(self, pods, nodes):
for pod in pods:
fitting = None
for node in nodes:
if node.can_fit(pod.resources):
if node.can_fit(pod.resources) and not node.unschedulable:
fitting = node
break
if fitting is None:
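
With the `get_pending_pods` change, a cordoned node with spare capacity no longer masks a pending pod. A toy illustration (hypothetical stubs; CPU requests reduced to plain floats):

```python
class Node:
    def __init__(self, free_cpu, unschedulable=False):
        self.free_cpu = free_cpu
        self.unschedulable = unschedulable

    def can_fit(self, cpu):
        return cpu <= self.free_cpu

def pending(pod_requests, nodes):
    # a pod is pending only if no *schedulable* node can fit it
    return [p for p in pod_requests
            if not any(n.can_fit(p) and not n.unschedulable for n in nodes)]

# before the fix, a cordoned 4-CPU node would "fit" the pod and hide it:
print(pending([2.0], [Node(4.0, unschedulable=True)]))  # -> [2.0]
```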
15 changes: 8 additions & 7 deletions autoscaler/engine_scaler.py
@@ -19,17 +19,16 @@

logger = logging.getLogger(__name__)


class EngineScaler(Scaler):

def __init__(
self, resource_group, nodes,
over_provision, spare_count, idle_threshold, dry_run,
over_provision, spare_count, idle_threshold, util_threshold, dry_run,
deployments, arm_template, arm_parameters, ignore_pools, notifier):

Scaler.__init__(
self, resource_group, nodes, over_provision,
spare_count, idle_threshold, dry_run, deployments, notifier)
spare_count, idle_threshold, util_threshold, dry_run, deployments, notifier)

self.arm_parameters = arm_parameters
self.arm_template = arm_template
@@ -139,7 +138,11 @@ def scale_pools(self, new_pool_sizes):

if not self.dry_run:
if new_size > pool.actual_capacity:
pool.reclaim_unschedulable_nodes(new_size)
available_size = pool.reclaim_unschedulable_nodes(new_size)
if new_size == available_size:
# we uncordoned enough nodes, no need to scale
return

else:
logger.info("[Dry run] Would have scaled pool '{}' to {} agent(s) (currently at {})".format(
pool.name, new_size, pool.actual_capacity))
@@ -222,7 +225,7 @@ def maintain(self, pods_to_schedule, running_or_pending_assigned_pods):
else:
logger.info(
'[Dry run] Would have drained and cordoned %s', node)
elif state == ClusterNodeState.IDLE_SCHEDULABLE:
elif state in (ClusterNodeState.IDLE_SCHEDULABLE, ClusterNodeState.UNDER_UTILIZED_UNDRAINABLE):
if not self.dry_run:
node.cordon()
else:
@@ -238,8 +241,6 @@ def maintain(self, pods_to_schedule, running_or_pending_assigned_pods):
delete_queue.append({'node': node, 'pool': pool})
else:
logger.info('[Dry run] Would have scaled in %s', node)
elif state == ClusterNodeState.UNDER_UTILIZED_UNDRAINABLE:
pass
else:
raise Exception("Unhandled state: {}".format(state))

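
Net effect of the `maintain()` hunks: `UNDER_UTILIZED_UNDRAINABLE` nodes, previously a silent no-op, are now cordoned like idle schedulable ones. A self-contained sketch of that predicate (in the real code `ClusterNodeState` is a plain class of string constants; an Enum is used here only to keep the sketch runnable):

```python
from enum import Enum

class ClusterNodeState(Enum):
    # assumed subset of the real state names
    IDLE_SCHEDULABLE = "idle-schedulable"
    UNDER_UTILIZED_UNDRAINABLE = "under-utilized-undrainable"

def should_cordon(state):
    # after this PR, both states fall through to the same cordon branch
    return state in (ClusterNodeState.IDLE_SCHEDULABLE,
                     ClusterNodeState.UNDER_UTILIZED_UNDRAINABLE)

print(should_cordon(ClusterNodeState.UNDER_UTILIZED_UNDRAINABLE))  # True (was a no-op before)
```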
5 changes: 4 additions & 1 deletion autoscaler/kube.py
@@ -65,7 +65,9 @@ def is_in_drain_grace_period(self):
determines whether the pod is in a grace period for draining
this prevents us from draining pods that are too new
"""
return not self.start_time or (datetime.datetime.now(self.start_time.tzinfo) - self.start_time) < self._DRAIN_GRACE_PERIOD
#return not self.start_time or (datetime.datetime.now(self.start_time.tzinfo) - self.start_time) < self._DRAIN_GRACE_PERIOD
# disable the pod grace period, as it seems to cause more problems than benefits for most users
return True

def is_drainable(self):
return self.is_replicated() and not self.is_critical() and not self.is_in_drain_grace_period()
@@ -138,6 +140,7 @@ def uncordon(self):
try:
self.original.reload()
self.original.obj['spec']['unschedulable'] = False
self.original.obj['metadata']['labels'][_CORDON_LABEL] = 'false'
self.original.update()
logger.info("uncordoned %s", self)
return True
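
`uncordon()` now also resets a `_CORDON_LABEL` marker on the node. Presumably `cordon()` (not visible in this diff) sets the same label to 'true'; a sketch of the symmetric method under that assumption, mirroring the `reload()`/`update()` pattern above (the except branch is likewise assumed):

```python
def cordon(self):
    try:
        self.original.reload()
        self.original.obj['spec']['unschedulable'] = True
        self.original.obj['metadata']['labels'][_CORDON_LABEL] = 'true'
        self.original.update()
        logger.info("cordoned %s", self)
        return True
    except Exception as ex:
        # hypothetical error handling; the real method may differ
        logger.warn("failed to cordon %s: %s", self, ex)
        return False
```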
14 changes: 8 additions & 6 deletions autoscaler/scaler.py
@@ -30,19 +30,19 @@ class ClusterNodeState(object):


class Scaler(object):
# UTIL_THRESHOLD = 0.3

# the utilization threshold under which to consider a node
# under utilized and drainable
UTIL_THRESHOLD = 0.3

def __init__(self, resource_group, nodes, over_provision, spare_count, idle_threshold, dry_run, deployments, notifier):
def __init__(self, resource_group, nodes, over_provision, spare_count, idle_threshold, util_threshold, dry_run, deployments, notifier):
self.resource_group_name = resource_group
self.over_provision = over_provision
self.spare_count = spare_count
self.idle_threshold = idle_threshold
self.dry_run = dry_run
self.deployments = deployments
self.notifier = notifier
# the utilization threshold under which to consider a node
# under utilized
self.util_threshold = util_threshold / 100

# ACS support up to 100 agents today
# TODO: how to handle case where cluster has 0 node? How to get unit
@@ -83,7 +83,7 @@ def get_node_state(self, node, node_pods, pods_to_schedule):
p.is_drainable() or 'kube-proxy' in p.name)]

utilization = sum((p.resources for p in busy_list), KubeResource())
under_utilized = (self.UTIL_THRESHOLD *
under_utilized = (self.util_threshold *
node.capacity - utilization).possible
drainable = not undrainable_list

@@ -179,7 +179,9 @@ def fulfill_pending(self, pods):
if num_unaccounted:
logger.warn('Failed to scale sufficiently.')
self.notifier.notify_failed_to_scale(selectors_hash, pods)

self.scale_pools(new_pool_sizes)

if self.notifier:
self.notifier.notify_scale(new_pool_sizes, pods, current_pool_sizes)
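
To make the percent-to-fraction conversion concrete: with the default `--util-threshold 30`, a node with 4 CPUs is considered underutilized while its busy pods request less than 1.2 CPUs (illustrative numbers; a scalar stands in for `KubeResource`):

```python
util_threshold = 30 / 100.0   # as in Scaler.__init__ (written float-safe here)
node_capacity_cpu = 4.0       # hypothetical node capacity
utilization_cpu = 0.8         # sum of busy pods' CPU requests

# mirrors: (self.util_threshold * node.capacity - utilization).possible
under_utilized = (util_threshold * node_capacity_cpu - utilization_cpu) > 0
print(under_utilized)         # True: 1.2 - 0.8 > 0, so the node is a cordon candidate
```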

4 changes: 3 additions & 1 deletion main.py
@@ -26,6 +26,7 @@
# How many agents should we keep even if the cluster is not utilized? The autoscaler will currently break if --spare-agents == 0
@click.option("--spare-agents", default=1, help='number of agent per pool that should always stay up')
@click.option("--idle-threshold", default=1800, help='time in seconds an agent can stay idle')
@click.option("--util-threshold", default=30, help='Utilization of a node in percent under which it is considered under utilized and should be cordoned')
@click.option("--service-principal-app-id", default=None, envvar='AZURE_SP_APP_ID')
@click.option("--service-principal-secret", default=None, envvar='AZURE_SP_SECRET')
@click.option("--service-principal-tenant-id", default=None, envvar='AZURE_SP_TENANT_ID')
@@ -50,7 +51,7 @@
def main(resource_group, acs_deployment, sleep, kubeconfig,
service_principal_app_id, service_principal_secret,
kubeconfig_private_key, client_private_key, ca_private_key,
service_principal_tenant_id, spare_agents, idle_threshold,
service_principal_tenant_id, spare_agents, idle_threshold, util_threshold,
no_scale, over_provision, no_maintenance, ignore_pools, slack_hook,
dry_run, verbose, debug):
logger_handler = logging.StreamHandler(sys.stderr)
@@ -81,6 +82,7 @@ def main(resource_group, acs_deployment, sleep, kubeconfig,
instance_init_time=instance_init_time,
spare_agents=spare_agents,
idle_threshold=idle_threshold,
util_threshold=util_threshold,
resource_group=resource_group,
acs_deployment=acs_deployment,
service_principal_app_id=service_principal_app_id,
1 change: 1 addition & 0 deletions test/test_cluster.py
@@ -39,6 +39,7 @@ def setUp(self):
self.cluster = Cluster(
kubeconfig='~/.kube/config',
idle_threshold=60,
util_threshold=30,
spare_agents=1,
instance_init_time=60,
resource_group='my-rg',
1 change: 1 addition & 0 deletions test/utils.py
@@ -17,4 +17,5 @@ def create_scaler(nodes):
arm_template=template,
ignore_pools='',
idle_threshold=0,
util_threshold=30,
notifier='')