Skip to content

Commit

Permalink
xcute: Save sent tasks before they are processed
Browse files Browse the repository at this point in the history
In some cases, a task could be processed before the list of
sent tasks had been updated, losing track of the task.
  • Loading branch information
AymericDu committed Dec 11, 2020
1 parent 9c50d0e commit 2198629
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 35 deletions.
66 changes: 60 additions & 6 deletions oio/xcute/common/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ class XcuteBackend(RedisConnection):
return redis.error_reply('no_job');
end;
local old_last_sent = redis.call('HGET', info_key, 'tasks.last_sent');
local nb_tasks_sent = 0;
if tasks_sent_length > 0 then
nb_tasks_sent = redis.call(
Expand All @@ -321,11 +323,10 @@ class XcuteBackend(RedisConnection):
job_id);
local total_tasks_processed = redis.call(
'HGET', 'xcute:job:info:' .. job_id, 'tasks.processed');
'HGET', info_key, 'tasks.processed');
if tonumber(total_tasks_processed) >= tonumber(
total_tasks_sent) then
redis.call('HSET', 'xcute:job:info:' .. job_id,
'job.status', 'FINISHED');
redis.call('HSET', info_key, 'job.status', 'FINISHED');
redis.call('HDEL', 'xcute:locks', lock);
end;
else
Expand All @@ -344,7 +345,52 @@ class XcuteBackend(RedisConnection):
end;
end;
""" + _lua_update_mtime + """
return {nb_tasks_sent, redis.call('HGET', info_key, 'job.status')};
return {nb_tasks_sent, redis.call('HGET', info_key, 'job.status'),
old_last_sent};
"""

lua_abort_tasks_sent = """
local mtime = KEYS[1];
local job_id = KEYS[2];
local old_last_sent = KEYS[3];
local tasks_sent = ARGV;
local tasks_sent_length = #tasks_sent;
local info_key = 'xcute:job:info:' .. job_id;
local status = redis.call('HGET', info_key, 'job.status');
if status == nil or status == false then
return redis.error_reply('no_job');
end;
if tonumber(tasks_sent_length) == 0 then
return;
end;
redis.call('HSET', info_key, 'tasks.all_sent', 'False');
redis.call('HINCRBY', info_key, 'tasks.sent', -tasks_sent_length);
redis.call(
'SREM', 'xcute:tasks:running:' .. job_id, unpack(tasks_sent));
if old_last_sent == 'None' then
redis.call('HDEL', info_key, 'tasks.last_sent');
else
redis.call('HSET', info_key, 'tasks.last_sent', old_last_sent);
end;
local request_pause = redis.call(
'HGET', info_key, 'job.request_pause');
if request_pause == 'True' then
-- if waiting pause, pause the job
redis.call('HMSET', info_key,
'job.status', 'PAUSED',
'job.request_pause', 'False');
local orchestrator_id = redis.call(
'HGET', info_key, 'orchestrator.id');
redis.call(
'SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,
job_id);
end;
""" + _lua_update_mtime + """
return {redis.call('HGET', info_key, 'job.status')};
"""

lua_update_tasks_processed = """
Expand Down Expand Up @@ -503,6 +549,8 @@ def __init__(self, conf, logger=None):
self.lua_resume)
self.script_update_tasks_sent = self.register_script(
self.lua_update_tasks_sent)
self.script_abort_tasks_sent = self.register_script(
self.lua_abort_tasks_sent)
self.script_update_tasks_processed = self.register_script(
self.lua_update_tasks_processed)
self.script_incr_total = self.register_script(
Expand Down Expand Up @@ -623,13 +671,19 @@ def resume(self, job_id):

@handle_redis_exceptions
def update_tasks_sent(self, job_id, task_ids, all_tasks_sent=False):
nb_tasks_sent, status = self.script_update_tasks_sent(
nb_tasks_sent, status, old_last_sent = self.script_update_tasks_sent(
keys=[self._get_timestamp(), job_id, str(all_tasks_sent)],
args=task_ids, client=self.conn)
if nb_tasks_sent != len(task_ids):
self.logger.warn('%s tasks were sent several times',
len(task_ids) - nb_tasks_sent)
return status
return status, old_last_sent

@handle_redis_exceptions
def abort_tasks_sent(self, job_id, task_ids, old_last_sent):
return self.script_abort_tasks_sent(
keys=[self._get_timestamp(), job_id, str(old_last_sent)],
args=task_ids, client=self.conn)

@handle_redis_exceptions
def update_tasks_processed(self, job_id, task_ids,
Expand Down
107 changes: 78 additions & 29 deletions oio/xcute/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,50 +262,94 @@ def dispatch_tasks(self, job_id, job_type, job_info, job):
tasks_run_time = ratelimit(
tasks_run_time, batch_per_second)

sent = self.dispatch_tasks_batch(
beanstalkd_workers,
job_id, job_type, job_config, tasks)
if sent:
job_status, exc = self.handle_backend_errors(
# Make sure that the sent tasks will be saved
# before being processed
exc = None
sent = False
while not sent:
(job_status, old_last_sent), exc = self.handle_backend_errors(
self.backend.update_tasks_sent, job_id, tasks.keys())
tasks.clear()
if exc is not None:
self.logger.warn(
'[job_id=%s] Job has not been updated '
'with the sent tasks: %s', job_id, exc)
'[job_id=%s] Job could not updated '
'the sent tasks: %s', job_id, exc)
break
sent = self.dispatch_tasks_batch(
beanstalkd_workers, job_id, job_type, job_config, tasks)
if not sent:
self.logger.warn(
'[job_id=%s] Job aborting the last sent tasks', job_id)
job_status, exc = self.handle_backend_errors(
self.backend.abort_tasks_sent, job_id, tasks.keys(),
old_last_sent)
if exc is not None:
self.logger.warn(
'[job_id=%s] Job could not aborted '
'the last sent tasks: %s', job_id, exc)
break
if job_status == 'PAUSED':
self.logger.info('Job %s is paused', job_id)
return

if not self.running:
if not self.running:
break
sleep(1)

if exc is not None and not self.running:
break
tasks.clear()
else:
sent = True
if tasks:
sent = self.dispatch_tasks_batch(
beanstalkd_workers,
job_id, job_type, job_config, tasks)
if sent:
job_status, exc = self.handle_backend_errors(
# Make sure that the sent tasks will be saved
# before being processed
sent = False
while not sent:
(job_status, old_last_sent), exc = self.handle_backend_errors(
self.backend.update_tasks_sent, job_id, tasks.keys(),
all_tasks_sent=True)
if exc is None:
if exc is not None:
self.logger.warn(
'[job_id=%s] Job could not updated '
'the sent tasks: %s', job_id, exc)
break
if tasks:
sent = self.dispatch_tasks_batch(
beanstalkd_workers, job_id, job_type, job_config,
tasks)
else:
sent = True
if not sent:
self.logger.warn(
'[job_id=%s] Job aborting the last sent tasks', job_id)
job_status, exc = self.handle_backend_errors(
self.backend.abort_tasks_sent, job_id, tasks.keys(),
old_last_sent)
if exc is not None:
self.logger.warn(
'[job_id=%s] Job could not aborted '
'the last sent tasks: %s', job_id, exc)
break
else:
if job_status == 'FINISHED':
self.logger.info('Job %s is finished', job_id)

self.logger.info(
'Finished dispatching job (job_id=%s)', job_id)
return
else:
self.logger.warn(
'[job_id=%s] Job has not been updated '
'with the last sent tasks: %s', job_id, exc)
if job_status == 'PAUSED':
self.logger.info('Job %s is paused', job_id)
return

if not self.running:
break
sleep(1)

self.logger.warn(
'[job_id=%s] Job was stopped before it was finished', job_id)

_, exc = self.handle_backend_errors(self.backend.free, job_id)
if exc is not None:
self.logger.warn(
'[job_id=%s] Job has not been freed: %s', job_id, exc)
'[job_id=%s] Job could not freed: %s', job_id, exc)

def dispatch_tasks_batch(self, beanstalkd_workers,
job_id, job_type, job_config, tasks):
Expand All @@ -323,12 +367,16 @@ def dispatch_tasks_batch(self, beanstalkd_workers,
# max 2 minutes per task
ttr = len(tasks) * DEFAULT_TTR

while self.running:
for beanstalkd_worker in beanstalkd_workers:
if not self.running:
return False
if beanstalkd_worker is not None:
i = 0
for beanstalkd_worker in beanstalkd_workers:
if not self.running:
return False
i += 1
if beanstalkd_worker is None:
# Try for at least 30 seconds
if i > 30:
break
continue

try:
beanstalkd_worker.put(beanstalkd_payload, ttr=ttr)
Expand All @@ -344,6 +392,7 @@ def dispatch_tasks_batch(self, beanstalkd_workers,
# and wait for a few errors in a row
# to happen before marking it as broken.
beanstalkd_worker.is_broken = True
sleep(1)
return False

def make_beanstalkd_payload(self, job_id, job_type, job_config,
Expand Down Expand Up @@ -626,8 +675,8 @@ def get_beanstalkd_workers(self):
while True:
if not self.beanstalkd_workers:
self.logger.info('No beanstalkd worker available')
sleep(1)
yield None
sleep(1)
continue

if id(self.beanstalkd_workers) != beanstalkd_workers_id:
Expand All @@ -652,8 +701,8 @@ def get_beanstalkd_workers(self):
if not yielded:
self.logger.info(
'All beanstalkd workers available are broken')
sleep(1)
yield None
sleep(1)

def exit_gracefully(self, *args, **kwargs):
if self.running:
Expand Down

0 comments on commit 2198629

Please # to comment.