From ddc125e7e220949b90e664bfd3c003d944633289 Mon Sep 17 00:00:00 2001 From: Jonghyun Park Date: Mon, 5 Oct 2020 23:32:34 +0900 Subject: [PATCH] fix: Regression of get_task_logs since storage-proxy adoption (#337) * docs: Set news fragment to mark this PR as a follow-up of #312 * fix: Rewrite using the new "fetch-file" storage proxy API * fix: The task-log filename should use the raw hex string (without hyphens) of the kernel ID. * fix: Be transparent to storage-proxy errors Co-authored-by: Joongi Kim --- changes/337.feature | 1 + src/ai/backend/gateway/exceptions.py | 24 ++++++++++- src/ai/backend/gateway/session.py | 63 +++++++++++++++++----------- 3 files changed, 61 insertions(+), 27 deletions(-) create mode 100644 changes/337.feature diff --git a/changes/337.feature b/changes/337.feature new file mode 100644 index 000000000..0de3c0775 --- /dev/null +++ b/changes/337.feature @@ -0,0 +1 @@ +**TO MERGE:** Update `get_task_logs` API to work with the storage proxy (follow-up to #312) diff --git a/src/ai/backend/gateway/exceptions.py b/src/ai/backend/gateway/exceptions.py index a103d2314..e71e38b11 100644 --- a/src/ai/backend/gateway/exceptions.py +++ b/src/ai/backend/gateway/exceptions.py @@ -63,9 +63,11 @@ def __str__(self): def __repr__(self): lines = [] if self.extra_msg: - lines.append(f'<{type(self).__name__}: {self.error_title} ({self.extra_msg})>') + lines.append(f'<{type(self).__name__}({self.status}): ' + f'{self.error_title} ({self.extra_msg})>') else: - lines.append(f'<{type(self).__name__}: {self.error_title}>') + lines.append(f'<{type(self).__name__}({self.status}): ' + f'{self.error_title}>') if self.extra_data: lines.append(' -> extra_data: ' + repr(self.extra_data)) return '\n'.join(lines) @@ -273,6 +275,24 @@ class ServerFrozen(BackendError, web.HTTPServiceUnavailable): error_title = 'The server is frozen due to maintenance. Please try again later.' +class StorageProxyError(BackendError, web.HTTPError): + error_type = 'https://api.backend.ai/probs/storage-proxy-error' + error_title = 'The storage proxy returned an error.' + + def __init__(self, status: int, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + # Currently there is no good public way to override the status code + # after initialization of aiohttp.web.StreamResponse objects. :( + self.status_code = status # HTTPException uses self.status_code + self._status = status # StreamResponse uses self._status + self.args = (status, self.args[1], self.args[2]) + + @property + def status(self) -> int: + # override the status property again to refer the subclass' attribute. + return self.status_code + + class AgentError(RuntimeError): ''' A dummy exception class to distinguish agent-side errors passed via diff --git a/src/ai/backend/gateway/session.py b/src/ai/backend/gateway/session.py index befe95d97..a0707257b 100644 --- a/src/ai/backend/gateway/session.py +++ b/src/ai/backend/gateway/session.py @@ -11,7 +11,7 @@ import json import logging import re -from pathlib import Path +from pathlib import PurePosixPath import secrets from typing import ( Any, @@ -55,7 +55,7 @@ SessionTypes, ) from ai.backend.common.plugin.monitor import GAUGE -from ai.backend.common.utils import current_loop +from .config import DEFAULT_CHUNK_SIZE from .defs import REDIS_STREAM_DB from .exceptions import ( InvalidAPIParameters, @@ -67,7 +67,8 @@ TooManySessionsMatched, BackendError, InternalServerError, - TaskTemplateNotFound + TaskTemplateNotFound, + StorageProxyError, ) from .auth import auth_required from .types import CORSOptions, WebMiddleware @@ -1696,9 +1697,7 @@ async def get_task_logs(request: web.Request, params: Any) -> web.StreamResponse domain_name = request['user']['domain_name'] user_role = request['user']['role'] user_uuid = request['user']['uuid'] - raw_kernel_id = params['kernel_id'].hex - mount_prefix = await request.app['config_server'].get('volumes/_mount') - fs_prefix = await request.app['config_server'].get('volumes/_fsprefix') + kernel_id_str = params['kernel_id'].hex async with request.app['dbpool'].acquire() as conn, conn.begin(): matched_vfolders = await query_accessible_vfolders( conn, user_uuid, @@ -1708,26 +1707,40 @@ async def get_task_logs(request: web.Request, params: Any) -> web.StreamResponse if not matched_vfolders: raise GenericNotFound('You do not have ".logs" vfolder for persistent task logs.') log_vfolder = matched_vfolders[0] - log_path = ( - Path(mount_prefix) / log_vfolder['host'] / Path(fs_prefix.lstrip('/')) / - log_vfolder['id'].hex / - 'task' / raw_kernel_id[:2] / raw_kernel_id[2:4] / f'{raw_kernel_id[4:]}.log' - ) - def check_file(): - if not log_path.is_file(): - raise GenericNotFound('The requested log file or the task was not found.') - try: - with open(log_path, 'rb'): - pass - except IOError: - raise GenericNotFound('The requested log file is not readable.') - - loop = current_loop() - await loop.run_in_executor(None, check_file) - return web.FileResponse(log_path, headers={ - hdrs.CONTENT_TYPE: "text/plain", - }) + storage_manager = request.app['storage_manager'] + proxy_name, volume_name = storage_manager.split_host(log_vfolder['host']) + response = web.StreamResponse(status=200) + response.headers[hdrs.CONTENT_TYPE] = "text/plain" + prepared = False + try: + async with storage_manager.request( + log_vfolder['host'], 'POST', 'folder/file/fetch', + json={ + 'volume': volume_name, + 'vfid': str(log_vfolder['id']), + 'relpath': str( + PurePosixPath('task') + / kernel_id_str[:2] / kernel_id_str[2:4] + / f'{kernel_id_str[4:]}.log' + ), + }, + raise_for_status=True, + ) as (_, storage_resp): + while True: + chunk = await storage_resp.content.read(DEFAULT_CHUNK_SIZE) + if not chunk: + break + if not prepared: + await response.prepare(request) + prepared = True + await response.write(chunk) + except aiohttp.ClientResponseError as e: + raise StorageProxyError(status=e.status, extra_msg=e.message) + finally: + if prepared: + await response.write_eof() + return response async def init(app: web.Application) -> None: