Commit
fix: Regression of get_task_logs since storage-proxy adoption (#337)
* docs: Set news fragment to mark this PR as a follow-up of #312
* fix: Rewrite using the new "fetch-file" storage proxy API
* fix: The task-log filename should use the raw hex string (without hyphens)
  of the kernel ID (see the path-derivation sketch below).
* fix: Pass storage-proxy errors through to the client transparently
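A minimal sketch of the path derivation referenced above (the helper name and the example UUID are illustrative only, not part of this commit; it relies only on Python's standard `uuid` and `pathlib` modules):

```python
from pathlib import PurePosixPath
from uuid import UUID

def task_log_relpath(kernel_id: UUID) -> PurePosixPath:
    # UUID.hex yields the 32-character hex string without hyphens;
    # the log path shards it as task/<2 chars>/<2 chars>/<rest>.log,
    # matching the relpath built in the session.py diff below.
    kernel_id_str = kernel_id.hex
    return (
        PurePosixPath('task')
        / kernel_id_str[:2] / kernel_id_str[2:4]
        / f'{kernel_id_str[4:]}.log'
    )

# task_log_relpath(UUID('123e4567-e89b-12d3-a456-426614174000'))
# -> PurePosixPath('task/12/3e/4567e89b12d3a456426614174000.log')
```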

Co-authored-by: Joongi Kim <joongi@lablup.com>
adrysn and achimnol authored Oct 5, 2020
1 parent 76ff249 commit ddc125e
Showing 3 changed files with 61 additions and 27 deletions.
1 change: 1 addition & 0 deletions changes/337.feature
@@ -0,0 +1 @@
+**TO MERGE:** Update `get_task_logs` API to work with the storage proxy (follow-up to #312)
24 changes: 22 additions & 2 deletions src/ai/backend/gateway/exceptions.py
@@ -63,9 +63,11 @@ def __str__(self):
     def __repr__(self):
         lines = []
         if self.extra_msg:
-            lines.append(f'<{type(self).__name__}: {self.error_title} ({self.extra_msg})>')
+            lines.append(f'<{type(self).__name__}({self.status}): '
+                         f'{self.error_title} ({self.extra_msg})>')
         else:
-            lines.append(f'<{type(self).__name__}: {self.error_title}>')
+            lines.append(f'<{type(self).__name__}({self.status}): '
+                         f'{self.error_title}>')
         if self.extra_data:
             lines.append(' -> extra_data: ' + repr(self.extra_data))
         return '\n'.join(lines)
@@ -273,6 +275,24 @@ class ServerFrozen(BackendError, web.HTTPServiceUnavailable):
     error_title = 'The server is frozen due to maintenance. Please try again later.'
 
 
+class StorageProxyError(BackendError, web.HTTPError):
+    error_type = 'https://api.backend.ai/probs/storage-proxy-error'
+    error_title = 'The storage proxy returned an error.'
+
+    def __init__(self, status: int, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        # Currently there is no good public way to override the status code
+        # after initialization of aiohttp.web.StreamResponse objects. :(
+        self.status_code = status  # HTTPException uses self.status_code
+        self._status = status      # StreamResponse uses self._status
+        self.args = (status, self.args[1], self.args[2])
+
+    @property
+    def status(self) -> int:
+        # override the status property again to refer the subclass' attribute.
+        return self.status_code
+
+
 class AgentError(RuntimeError):
     '''
     A dummy exception class to distinguish agent-side errors passed via
63 changes: 38 additions & 25 deletions src/ai/backend/gateway/session.py
@@ -11,7 +11,7 @@
 import json
 import logging
 import re
-from pathlib import Path
+from pathlib import PurePosixPath
 import secrets
 from typing import (
     Any,
@@ -55,7 +55,7 @@
     SessionTypes,
 )
 from ai.backend.common.plugin.monitor import GAUGE
-from ai.backend.common.utils import current_loop
+from .config import DEFAULT_CHUNK_SIZE
 from .defs import REDIS_STREAM_DB
 from .exceptions import (
     InvalidAPIParameters,
@@ -67,7 +67,8 @@
     TooManySessionsMatched,
     BackendError,
     InternalServerError,
-    TaskTemplateNotFound
+    TaskTemplateNotFound,
+    StorageProxyError,
 )
 from .auth import auth_required
 from .types import CORSOptions, WebMiddleware
@@ -1696,9 +1697,7 @@ async def get_task_logs(request: web.Request, params: Any) -> web.StreamResponse
     domain_name = request['user']['domain_name']
     user_role = request['user']['role']
     user_uuid = request['user']['uuid']
-    raw_kernel_id = params['kernel_id'].hex
-    mount_prefix = await request.app['config_server'].get('volumes/_mount')
-    fs_prefix = await request.app['config_server'].get('volumes/_fsprefix')
+    kernel_id_str = params['kernel_id'].hex
     async with request.app['dbpool'].acquire() as conn, conn.begin():
         matched_vfolders = await query_accessible_vfolders(
             conn, user_uuid,
@@ -1708,26 +1707,40 @@ async def get_task_logs(request: web.Request, params: Any) -> web.StreamResponse
         if not matched_vfolders:
             raise GenericNotFound('You do not have ".logs" vfolder for persistent task logs.')
         log_vfolder = matched_vfolders[0]
-        log_path = (
-            Path(mount_prefix) / log_vfolder['host'] / Path(fs_prefix.lstrip('/')) /
-            log_vfolder['id'].hex /
-            'task' / raw_kernel_id[:2] / raw_kernel_id[2:4] / f'{raw_kernel_id[4:]}.log'
-        )
 
-    def check_file():
-        if not log_path.is_file():
-            raise GenericNotFound('The requested log file or the task was not found.')
-        try:
-            with open(log_path, 'rb'):
-                pass
-        except IOError:
-            raise GenericNotFound('The requested log file is not readable.')
-
-    loop = current_loop()
-    await loop.run_in_executor(None, check_file)
-    return web.FileResponse(log_path, headers={
-        hdrs.CONTENT_TYPE: "text/plain",
-    })
+    storage_manager = request.app['storage_manager']
+    proxy_name, volume_name = storage_manager.split_host(log_vfolder['host'])
+    response = web.StreamResponse(status=200)
+    response.headers[hdrs.CONTENT_TYPE] = "text/plain"
+    prepared = False
+    try:
+        async with storage_manager.request(
+            log_vfolder['host'], 'POST', 'folder/file/fetch',
+            json={
+                'volume': volume_name,
+                'vfid': str(log_vfolder['id']),
+                'relpath': str(
+                    PurePosixPath('task')
+                    / kernel_id_str[:2] / kernel_id_str[2:4]
+                    / f'{kernel_id_str[4:]}.log'
+                ),
+            },
+            raise_for_status=True,
+        ) as (_, storage_resp):
+            while True:
+                chunk = await storage_resp.content.read(DEFAULT_CHUNK_SIZE)
+                if not chunk:
+                    break
+                if not prepared:
+                    await response.prepare(request)
+                    prepared = True
+                await response.write(chunk)
+    except aiohttp.ClientResponseError as e:
+        raise StorageProxyError(status=e.status, extra_msg=e.message)
+    finally:
+        if prepared:
+            await response.write_eof()
+    return response
 
 
 async def init(app: web.Application) -> None:
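The handler above defers `response.prepare()` until the first chunk arrives, so a storage-proxy failure can still be reported as a proper HTTP error (via `StorageProxyError`) instead of a truncated 200 body. A standalone sketch of that deferred-prepare pattern, assuming a hypothetical `fetch_chunks` async iterator standing in for the storage-proxy stream:

```python
from typing import AsyncIterator

from aiohttp import hdrs, web


async def stream_text(request: web.Request,
                      fetch_chunks: AsyncIterator[bytes]) -> web.StreamResponse:
    response = web.StreamResponse(status=200)
    response.headers[hdrs.CONTENT_TYPE] = "text/plain"
    prepared = False
    try:
        async for chunk in fetch_chunks:
            if not prepared:
                # Send headers only after the upstream fetch has produced data,
                # so earlier failures can still become a normal error response.
                await response.prepare(request)
                prepared = True
            await response.write(chunk)
    finally:
        if prepared:
            await response.write_eof()
    return response
```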
