Skip to content

Commit

Permalink
xcute: Allow to add filters when listing jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
AymericDu authored and fvennetier committed Dec 28, 2020
1 parent a051e6c commit 0c930e2
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 13 deletions.
64 changes: 62 additions & 2 deletions oio/cli/admin/xcute/job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2019-2020 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
Expand All @@ -13,9 +13,13 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from datetime import datetime

from oio.cli import Lister, ShowOne, flat_dict_from_dict
from oio.cli.admin.xcute import XcuteCommand
from oio.cli.common.utils import KeyValueAction
from oio.xcute.common.job import XcuteJobStatus
from oio.xcute.jobs import JOB_TYPES


class JobList(XcuteCommand, Lister):
Expand All @@ -25,8 +29,64 @@ class JobList(XcuteCommand, Lister):

columns = ('ID', 'Status', 'Type', 'Lock', 'Progress')

def get_parser(self, prog_name):
parser = super(JobList, self).get_parser(prog_name)
parser.add_argument(
'--date',
help='Filter jobs with the specified job date '
'(%%Y-%%m-%%dT%%H:%%M:%%S)')
parser.add_argument(
'--status',
choices=XcuteJobStatus.ALL,
help='Filter jobs with the specified job status')
parser.add_argument(
'--type',
choices=JOB_TYPES.keys(),
help='Filter jobs with the specified job type')
parser.add_argument(
'--lock',
help='Filter jobs with the specified job lock (wildcards allowed)')
return parser

def _take_action(self, parsed_args):
jobs = self.xcute.job_list()
prefix = None
if parsed_args.date:
datetime_input_format = ''
datetime_output_format = ''
datetime_info_split = parsed_args.date.split('T', 1)
date_info_split = datetime_info_split[0].split('-', 2)
if len(date_info_split) > 0:
datetime_input_format += '%Y'
datetime_output_format += '%Y'
if len(date_info_split) > 1:
datetime_input_format += '-%m'
datetime_output_format += '%m'
if len(date_info_split) > 2:
datetime_input_format += '-%d'
datetime_output_format += '%d'
if len(datetime_info_split) > 1:
if len(date_info_split) != 3:
raise ValueError('Wrong date format')
time_info_split = datetime_info_split[1].split(':', 2)
if len(time_info_split) > 0:
datetime_input_format += 'T%H'
datetime_output_format += '%H'
if len(time_info_split) > 1:
datetime_input_format += ':%M'
datetime_output_format += '%M'
if len(time_info_split) > 2:
datetime_input_format += ':%S'
datetime_output_format += '%S'
try:
job_date = datetime.strptime(parsed_args.date,
datetime_input_format)
except ValueError:
raise ValueError('Wrong date format')
prefix = job_date.strftime(datetime_output_format)

jobs = self.xcute.job_list(
prefix=prefix, job_status=parsed_args.status,
job_type=parsed_args.type, job_lock=parsed_args.lock)
for job_info in jobs:
job_main_info = job_info['job']
job_tasks = job_info['tasks']
Expand Down
10 changes: 8 additions & 2 deletions oio/xcute/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,15 @@ def xcute_request(self, method, action, params=None, **kwargs):
raise exc_info[0], exc_info[1], exc_info[2]
return resp, body

def job_list(self, limit=None, marker=None):
def job_list(self, limit=None, prefix=None, marker=None,
job_status=None, job_type=None, job_lock=None):
_, data = self.xcute_request(
'GET', '/job/list', params={'limit': limit, 'marker': marker})
'GET', '/job/list', params={'limit': limit,
'prefix': prefix,
'marker': marker,
'status': job_status,
'type': job_type,
'lock': job_lock})
return data

def job_create(self, job_type, job_config=None):
Expand Down
32 changes: 27 additions & 5 deletions oio/xcute/common/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import redis
import random
from fnmatch import fnmatchcase

from oio.common.easy_value import true_value
from oio.common.exceptions import Forbidden, NotFound
Expand All @@ -27,6 +28,9 @@
from oio.common.timestamp import Timestamp


END_MARKER = u"\U0010fffd"


def handle_redis_exceptions(func):
@wraps(func)
def handle_redis_exceptions(self, *args, **kwargs):
Expand Down Expand Up @@ -590,20 +594,30 @@ def status(self):
status = {'job_count': job_count}
return status

def list_jobs(self, marker=None, limit=1000):
def list_jobs(self, prefix=None, marker=None, limit=1000,
job_status=None, job_type=None, job_lock=None):
limit = limit or self.DEFAULT_LIMIT

if job_status:
job_status = job_status.upper().strip()
if job_type:
job_type = job_type.lower().strip()
if job_lock:
job_lock = job_lock.lower().strip()

jobs = list()
while True:
limit_ = limit - len(jobs)
if limit_ <= 0:
break

range_min = '-'
if marker:
range_max = '(' + marker
else:
range_max = '+'
range_max = '+'
if prefix:
range_min = '[' + prefix
range_max = '[' + prefix + END_MARKER
if marker and (not prefix or marker > prefix):
range_min = '(' + marker

job_ids = self.conn.zrevrangebylex(
self.key_job_ids, range_max, range_min, 0, limit_)
Expand All @@ -618,6 +632,14 @@ def list_jobs(self, marker=None, limit=1000):
# The job can be deleted between two requests
continue

if job_status and job_info['job.status'] != job_status:
continue
if job_type and job_info['job.type'] != job_type:
continue
if job_lock and not fnmatchcase(
job_info.get('job.lock') or '', job_lock):
continue

jobs.append(self._unmarshal_job_info(job_info))

if len(job_ids) < limit_:
Expand Down
18 changes: 18 additions & 0 deletions oio/xcute/common/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@
from oio.common.logger import get_logger


class XcuteJobStatus(object):
"""Enum class for job type names."""

WAITING = 'WAITING'
RUNNING = 'RUNNING'
PAUSED = 'PAUSED'
FINISHED = 'FINISHED'
FAILED = 'FAILED'

ALL = (
WAITING,
RUNNING,
PAUSED,
FINISHED,
FAILED
)


class XcuteTask(object):

def __init__(self, conf, job_params, logger=None):
Expand Down
7 changes: 4 additions & 3 deletions oio/xcute/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
ConnectionError, DEFAULT_TTR
from oio.event.evob import EventTypes
from oio.xcute.common.backend import XcuteBackend
from oio.xcute.common.job import XcuteJobStatus
from oio.xcute.jobs import JOB_TYPES


Expand Down Expand Up @@ -287,7 +288,7 @@ def dispatch_tasks(self, job_id, job_type, job_info, job):
'[job_id=%s] Job could not abort '
'the last sent tasks: %s', job_id, exc)
break
if job_status == 'PAUSED':
if job_status == XcuteJobStatus.PAUSED:
self.logger.info('Job %s is paused', job_id)
return

Expand Down Expand Up @@ -329,13 +330,13 @@ def dispatch_tasks(self, job_id, job_type, job_info, job):
'the last sent tasks: %s', job_id, exc)
break
else:
if job_status == 'FINISHED':
if job_status == XcuteJobStatus.FINISHED:
self.logger.info('Job %s is finished', job_id)

self.logger.info(
'Finished dispatching job (job_id=%s)', job_id)
return
if job_status == 'PAUSED':
if job_status == XcuteJobStatus.PAUSED:
self.logger.info('Job %s is paused', job_id)
return

Expand Down
8 changes: 7 additions & 1 deletion oio/xcute/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,15 @@ def on_status(self, req):
@handle_exceptions
def on_job_list(self, req):
limit = int_value(req.args.get('limit'), None)
prefix = req.args.get('prefix')
marker = req.args.get('marker')
job_status = req.args.get('status')
job_type = req.args.get('type')
job_lock = req.args.get('lock')

job_infos = self.backend.list_jobs(limit=limit, marker=marker)
job_infos = self.backend.list_jobs(
limit=limit, prefix=prefix, marker=marker,
job_status=job_status, job_type=job_type, job_lock=job_lock)
return Response(
json.dumps(job_infos), mimetype='application/json')

Expand Down

0 comments on commit 0c930e2

Please # to comment.