Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xcute: Save sent tasks before they are processed #2101

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 60 additions & 6 deletions oio/xcute/common/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ class XcuteBackend(RedisConnection):
return redis.error_reply('no_job');
end;

local old_last_sent = redis.call('HGET', info_key, 'tasks.last_sent');

local nb_tasks_sent = 0;
if tasks_sent_length > 0 then
nb_tasks_sent = redis.call(
Expand All @@ -321,11 +323,10 @@ class XcuteBackend(RedisConnection):
job_id);

local total_tasks_processed = redis.call(
'HGET', 'xcute:job:info:' .. job_id, 'tasks.processed');
'HGET', info_key, 'tasks.processed');
if tonumber(total_tasks_processed) >= tonumber(
total_tasks_sent) then
redis.call('HSET', 'xcute:job:info:' .. job_id,
'job.status', 'FINISHED');
redis.call('HSET', info_key, 'job.status', 'FINISHED');
redis.call('HDEL', 'xcute:locks', lock);
end;
else
Expand All @@ -344,7 +345,52 @@ class XcuteBackend(RedisConnection):
end;
end;
""" + _lua_update_mtime + """
return {nb_tasks_sent, redis.call('HGET', info_key, 'job.status')};
return {nb_tasks_sent, redis.call('HGET', info_key, 'job.status'),
old_last_sent};
"""

# Redis Lua script that rolls back the bookkeeping of tasks which were
# recorded as sent but are now being aborted.
#
# Inputs:
#   KEYS[1]  mtime; only bound to a local in the visible script body
#            (presumably consumed by the appended _lua_update_mtime
#            snippet -- TODO confirm against its definition)
#   KEYS[2]  job ID, used to build the 'xcute:job:info:<job_id>' hash key
#   KEYS[3]  previous value of 'tasks.last_sent'; the literal string 'None'
#            means the field is deleted instead of restored
#   ARGV     IDs of the tasks to abort
#
# Behavior:
#   - replies with the error 'no_job' when the job hash has no 'job.status'
#   - returns nothing (and changes nothing) when ARGV is empty
#   - otherwise sets 'tasks.all_sent' to 'False', decrements 'tasks.sent' by
#     the number of IDs received, removes the IDs from
#     'xcute:tasks:running:<job_id>' and restores/deletes 'tasks.last_sent'
#   - if 'job.request_pause' is 'True', marks the job 'PAUSED', resets the
#     pause request and removes the job from its orchestrator's job set
#
# Reply: a ONE-ELEMENT table holding the current 'job.status' (not a bare
# string), so the Python side receives a list.
lua_abort_tasks_sent = """
local mtime = KEYS[1];
local job_id = KEYS[2];
local old_last_sent = KEYS[3];
local tasks_sent = ARGV;
local tasks_sent_length = #tasks_sent;
local info_key = 'xcute:job:info:' .. job_id;

local status = redis.call('HGET', info_key, 'job.status');
if status == nil or status == false then
return redis.error_reply('no_job');
end;

if tonumber(tasks_sent_length) == 0 then
return;
end;

redis.call('HSET', info_key, 'tasks.all_sent', 'False');
redis.call('HINCRBY', info_key, 'tasks.sent', -tasks_sent_length);
redis.call(
'SREM', 'xcute:tasks:running:' .. job_id, unpack(tasks_sent));
if old_last_sent == 'None' then
redis.call('HDEL', info_key, 'tasks.last_sent');
else
redis.call('HSET', info_key, 'tasks.last_sent', old_last_sent);
end;

local request_pause = redis.call(
'HGET', info_key, 'job.request_pause');
if request_pause == 'True' then
-- if waiting pause, pause the job
redis.call('HMSET', info_key,
'job.status', 'PAUSED',
'job.request_pause', 'False');
local orchestrator_id = redis.call(
'HGET', info_key, 'orchestrator.id');
redis.call(
'SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,
job_id);
end;
""" + _lua_update_mtime + """
return {redis.call('HGET', info_key, 'job.status')};
"""

lua_update_tasks_processed = """
Expand Down Expand Up @@ -503,6 +549,8 @@ def __init__(self, conf, logger=None):
self.lua_resume)
self.script_update_tasks_sent = self.register_script(
self.lua_update_tasks_sent)
self.script_abort_tasks_sent = self.register_script(
self.lua_abort_tasks_sent)
self.script_update_tasks_processed = self.register_script(
self.lua_update_tasks_processed)
self.script_incr_total = self.register_script(
Expand Down Expand Up @@ -623,13 +671,19 @@ def resume(self, job_id):

@handle_redis_exceptions
def update_tasks_sent(self, job_id, task_ids, all_tasks_sent=False):
nb_tasks_sent, status = self.script_update_tasks_sent(
nb_tasks_sent, status, old_last_sent = self.script_update_tasks_sent(
keys=[self._get_timestamp(), job_id, str(all_tasks_sent)],
args=task_ids, client=self.conn)
if nb_tasks_sent != len(task_ids):
self.logger.warn('%s tasks were sent several times',
len(task_ids) - nb_tasks_sent)
return status
return status, old_last_sent

@handle_redis_exceptions
def abort_tasks_sent(self, job_id, task_ids, old_last_sent):
    """
    Roll back the bookkeeping of tasks that were recorded as sent but
    could not be dispatched, by running the `lua_abort_tasks_sent` script.

    :param job_id: ID of the job the tasks belong to.
    :param task_ids: IDs of the tasks to abort (passed as script ARGV).
    :param old_last_sent: value of 'tasks.last_sent' before the aborted
        batch was recorded. It is serialized with `str()`, so `None`
        becomes the string 'None', which the script interprets as
        "delete the field".
    :returns: the job status after the rollback, as a scalar, or `None`
        when the script replied with nothing (empty task list).
    """
    reply = self.script_abort_tasks_sent(
        keys=[self._get_timestamp(), job_id, str(old_last_sent)],
        args=task_ids, client=self.conn)
    # The script replies with a one-element table wrapping the status,
    # which arrives here as a list. Callers compare the result directly
    # with status strings such as 'PAUSED', so unwrap it; a scalar or
    # empty reply is passed through unchanged.
    if isinstance(reply, (list, tuple)):
        return reply[0] if reply else None
    return reply

@handle_redis_exceptions
def update_tasks_processed(self, job_id, task_ids,
Expand Down
105 changes: 77 additions & 28 deletions oio/xcute/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,45 +262,89 @@ def dispatch_tasks(self, job_id, job_type, job_info, job):
tasks_run_time = ratelimit(
tasks_run_time, batch_per_second)

sent = self.dispatch_tasks_batch(
beanstalkd_workers,
job_id, job_type, job_config, tasks)
if sent:
job_status, exc = self.handle_backend_errors(
# Make sure that the sent tasks will be saved
# before being processed
exc = None
sent = False
while not sent:
(job_status, old_last_sent), exc = self.handle_backend_errors(
self.backend.update_tasks_sent, job_id, tasks.keys())
tasks.clear()
if exc is not None:
self.logger.warn(
'[job_id=%s] Job has not been updated '
'with the sent tasks: %s', job_id, exc)
'[job_id=%s] Job could not update '
'the sent tasks: %s', job_id, exc)
break
sent = self.dispatch_tasks_batch(
beanstalkd_workers, job_id, job_type, job_config, tasks)
if not sent:
self.logger.warn(
'[job_id=%s] Job aborting the last sent tasks', job_id)
job_status, exc = self.handle_backend_errors(
self.backend.abort_tasks_sent, job_id, tasks.keys(),
old_last_sent)
if exc is not None:
self.logger.warn(
'[job_id=%s] Job could not abort '
'the last sent tasks: %s', job_id, exc)
break
if job_status == 'PAUSED':
self.logger.info('Job %s is paused', job_id)
return

if not self.running:
if not self.running:
break
sleep(1)

if exc is not None and not self.running:
break
tasks.clear()
else:
sent = True
if tasks:
sent = self.dispatch_tasks_batch(
beanstalkd_workers,
job_id, job_type, job_config, tasks)
if sent:
job_status, exc = self.handle_backend_errors(
# Make sure that the sent tasks will be saved
# before being processed
sent = False
while not sent:
(job_status, old_last_sent), exc = self.handle_backend_errors(
self.backend.update_tasks_sent, job_id, tasks.keys(),
all_tasks_sent=True)
if exc is None:
if exc is not None:
self.logger.warn(
'[job_id=%s] Job could not update '
'the sent tasks: %s', job_id, exc)
break
if tasks:
sent = self.dispatch_tasks_batch(
beanstalkd_workers, job_id, job_type, job_config,
tasks)
else:
sent = True
if not sent:
self.logger.warn(
'[job_id=%s] Job aborting the last sent tasks', job_id)
job_status, exc = self.handle_backend_errors(
self.backend.abort_tasks_sent, job_id, tasks.keys(),
old_last_sent)
if exc is not None:
self.logger.warn(
'[job_id=%s] Job could not abort '
'the last sent tasks: %s', job_id, exc)
break
else:
if job_status == 'FINISHED':
self.logger.info('Job %s is finished', job_id)

self.logger.info(
'Finished dispatching job (job_id=%s)', job_id)
return
else:
self.logger.warn(
'[job_id=%s] Job has not been updated '
'with the last sent tasks: %s', job_id, exc)
if job_status == 'PAUSED':
self.logger.info('Job %s is paused', job_id)
return

if not self.running:
break
sleep(1)

self.logger.warn(
'[job_id=%s] Job was stopped before it was finished', job_id)

_, exc = self.handle_backend_errors(self.backend.free, job_id)
if exc is not None:
Expand All @@ -323,12 +367,16 @@ def dispatch_tasks_batch(self, beanstalkd_workers,
# max 2 minutes per task
ttr = len(tasks) * DEFAULT_TTR

while self.running:
for beanstalkd_worker in beanstalkd_workers:
if not self.running:
return False
if beanstalkd_worker is not None:
i = 0
for beanstalkd_worker in beanstalkd_workers:
if not self.running:
return False
i += 1
if beanstalkd_worker is None:
# Try for at least 30 seconds
if i > 30:
break
continue

try:
beanstalkd_worker.put(beanstalkd_payload, ttr=ttr)
Expand All @@ -344,6 +392,7 @@ def dispatch_tasks_batch(self, beanstalkd_workers,
# and wait for a few errors in a row
# to happen before marking it as broken.
beanstalkd_worker.is_broken = True
sleep(1)
return False

def make_beanstalkd_payload(self, job_id, job_type, job_config,
Expand Down Expand Up @@ -626,8 +675,8 @@ def get_beanstalkd_workers(self):
while True:
if not self.beanstalkd_workers:
self.logger.info('No beanstalkd worker available')
sleep(1)
yield None
sleep(1)
continue

if id(self.beanstalkd_workers) != beanstalkd_workers_id:
Expand All @@ -652,8 +701,8 @@ def get_beanstalkd_workers(self):
if not yielded:
self.logger.info(
'All beanstalkd workers available are broken')
sleep(1)
yield None
sleep(1)

def exit_gracefully(self, *args, **kwargs):
if self.running:
Expand Down