Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

feat: Allow mounting vfolder subdirectory into session container #537

Merged
merged 22 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e960b3d
feat: Allow mounting subdirectory into session container
SonMinWoo Feb 17, 2022
547a27a
fix: Add change log
SonMinWoo Feb 17, 2022
f3bbc3f
fix: Unify list element type
SonMinWoo Feb 17, 2022
e2b29bb
fix: Edit mount folder verification
SonMinWoo Feb 18, 2022
5c1db36
fix: Edit readability and remove unnecessary data
SonMinWoo Feb 18, 2022
6299dbe
fix: Edit code style for flake8
SonMinWoo Feb 18, 2022
42a0b72
fix: Edit code style for flake8
SonMinWoo Feb 18, 2022
43b0dff
Merge branch 'main' into feature/allow-mount-subdirectory
achimnol Mar 8, 2022
c46a99f
Merge branch 'main' into feature/allow-mount-subdirectory
achimnol Mar 8, 2022
c4bab89
refactor: Clean up many cluttered codes
achimnol Mar 9, 2022
a6286e9
fix: lint/test errors
achimnol Mar 9, 2022
dda0abd
fix: Update comment and remove "B" from column name
achimnol Mar 9, 2022
07752b9
fix: missing commit
achimnol Mar 9, 2022
9c3573e
fix: Allow mounting different subdirs of a vfolder with overlap check
achimnol Mar 9, 2022
ab75b90
fix: minor mistake
achimnol Mar 9, 2022
d91a9b9
fix: typo [skip ci]
achimnol Mar 9, 2022
505858a
setup: Update aiotools to get error propagation of enqueue_session()
achimnol Mar 10, 2022
3a393d0
fix: Utilize lablup/backend.ai-storage-proxy#41
achimnol Mar 10, 2022
9d045a1
refactor: Make early failures more consistent
achimnol Mar 10, 2022
9437867
docs: Update news fragment [skip ci]
achimnol Mar 10, 2022
3e5ea0d
fix: Add more fields to `VFolderMount` objects
achimnol Mar 10, 2022
5aa69ac
fix: type error
achimnol Mar 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/537.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow mounting subpath of vfolders if specified as relative paths appended to vfolder names and improve storage error propagation. Also introduce `StructuredJSONObjectColumn` and `StructuredJSONObjectListColumn` to define database columns based on `common.types.JSONSerializableMixin`.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ install_requires =
aiohttp_sse~=2.0
aiomonitor~=0.4.5
aioredis[hiredis]~=2.0
aiotools~=1.5.3
aiotools~=1.5.4
alembic~=1.6.5
async_timeout~=4.0
asyncache>=0.1.1
Expand Down
11 changes: 7 additions & 4 deletions src/ai/backend/manager/api/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
domains, groups, query_allowed_sgroups,
association_groups_users as agus,
)
from ..types import UserScope
from .auth import admin_required
from .exceptions import InvalidAPIParameters
from .manager import ALL_ALLOWED, READ_ALLOWED, server_status_required
Expand Down Expand Up @@ -427,10 +428,12 @@ async def import_image(request: web.Request, params: Any) -> web.Response:
None,
SessionTypes.BATCH,
resource_policy,
domain_name=request['user']['domain_name'],
group_id=group_id,
user_uuid=request['user']['uuid'],
user_role=request['user']['role'],
user_scope=UserScope(
domain_name=request['user']['domain_name'],
group_id=group_id,
user_uuid=request['user']['uuid'],
user_role=request['user']['role'],
),
internal_data={
'domain_socket_proxies': ['/var/run/docker.sock'],
'docker_credentials': docker_creds,
Expand Down
22 changes: 13 additions & 9 deletions src/ai/backend/manager/api/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@

from ..config import DEFAULT_CHUNK_SIZE
from ..defs import DEFAULT_ROLE, REDIS_STREAM_DB
from ..types import UserScope
from ..models import (
domains,
association_groups_users as agus, groups,
Expand Down Expand Up @@ -511,13 +512,14 @@ async def _create(request: web.Request, params: Any) -> web.Response:
params['config']['scaling_group'],
params['session_type'],
resource_policy,
domain_name=params['domain'], # type: ignore # params always have it
group_id=group_id,
user_uuid=owner_uuid,
user_role=request['user']['role'],
user_scope=UserScope(
domain_name=params['domain'], # type: ignore # params always have it
group_id=group_id,
user_uuid=owner_uuid,
user_role=request['user']['role'],
),
cluster_mode=params['cluster_mode'],
cluster_size=params['cluster_size'],
startup_command=params['startup_command'],
session_tag=params['tag'],
starts_at=starts_at,
agent_list=params['config']['agent_list'],
Expand Down Expand Up @@ -1006,10 +1008,12 @@ async def create_cluster(request: web.Request, params: Any) -> web.Response:
params['scaling_group'],
params['sess_type'],
resource_policy,
domain_name=params['domain'], # type: ignore
group_id=group_id,
user_uuid=owner_uuid,
user_role=request['user']['role'],
user_scope=UserScope(
domain_name=params['domain'], # type: ignore
group_id=group_id,
user_uuid=owner_uuid,
user_role=request['user']['role'],
),
session_tag=params['tag'],
),
))
Expand Down
3 changes: 3 additions & 0 deletions src/ai/backend/manager/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from . import keypair as _keypair
from . import user as _user
from . import vfolder as _vfolder
from . import dotfile as _dotfile
from . import resource_policy as _rpolicy
from . import resource_preset as _rpreset
from . import scaling_group as _sgroup
Expand All @@ -25,6 +26,7 @@
*_keypair.__all__,
*_user.__all__,
*_vfolder.__all__,
*_dotfile.__all__,
*_rpolicy.__all__,
*_rpreset.__all__,
*_sgroup.__all__,
Expand All @@ -41,6 +43,7 @@
from .keypair import * # noqa
from .user import * # noqa
from .vfolder import * # noqa
from .dotfile import * # noqa
from .resource_policy import * # noqa
from .resource_preset import * # noqa
from .scaling_group import * # noqa
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""add-vfolder-mounts-to-kernels

Revision ID: 7dd1d81c3204
Revises: 60a1effa77d2
Create Date: 2022-03-09 16:41:48.304128

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '7dd1d81c3204'
down_revision = '60a1effa77d2'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('kernels', sa.Column('vfolder_mounts', sa.JSON(), nullable=True))
op.drop_index('ix_keypairs_resource_policy', table_name='keypairs')
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_index('ix_keypairs_resource_policy', 'keypairs', ['resource_policy'], unique=False)
op.drop_column('kernels', 'vfolder_mounts')
# ### end Alembic commands ###
54 changes: 51 additions & 3 deletions src/ai/backend/manager/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
KernelId,
ResourceSlot,
SessionId,
JSONSerializableMixin,
)

from ai.backend.manager.models.utils import execute_with_retry
Expand Down Expand Up @@ -192,9 +193,9 @@ def copy(self):
return ResourceSlotColumn()


class StructuredJSONBColumn(TypeDecorator):
class StructuredJSONColumn(TypeDecorator):
"""
A column type check scheduler_opts's validation using trafaret.
A column type to convert JSON values back and forth using a Trafaret.
"""

impl = JSONB
Expand All @@ -213,7 +214,54 @@ def process_result_value(self, raw_value, dialect):
return self._schema.check(raw_value)

def copy(self):
return StructuredJSONBColumn(self._schema)
return StructuredJSONColumn(self._schema)


class StructuredJSONObjectColumn(TypeDecorator):
"""
A column type to convert JSON values back and forth using JSONSerializableMixin.
"""

impl = JSONB
cache_ok = True

def __init__(self, attr_cls: Type[JSONSerializableMixin]) -> None:
super().__init__()
self._attr_cls = attr_cls

def process_bind_param(self, value, dialect):
return self._attr_cls.to_json(value)

def process_result_value(self, raw_value, dialect):
return self._attr_cls.from_json(raw_value)

def copy(self):
return StructuredJSONObjectColumn(self._attr_cls)


class StructuredJSONObjectListColumn(TypeDecorator):
"""
A column type to convert JSON values back and forth using JSONSerializableMixin,
but store and load a list of the objects.
"""

impl = JSONB
cache_ok = True

def __init__(self, attr_cls: Type[JSONSerializableMixin]) -> None:
super().__init__()
self._attr_cls = attr_cls

def process_bind_param(self, value, dialect):
return [self._attr_cls.to_json(item) for item in value]

def process_result_value(self, raw_value, dialect):
if raw_value is None:
return []
return [self._attr_cls.from_json(item) for item in raw_value]

def copy(self):
return StructuredJSONObjectListColumn(self._attr_cls)


class CurrencyTypes(enum.Enum):
Expand Down
84 changes: 84 additions & 0 deletions src/ai/backend/manager/models/dotfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from __future__ import annotations

from pathlib import PurePosixPath
from typing import Any, Mapping, Sequence, TYPE_CHECKING

import sqlalchemy as sa
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import (
AsyncConnection as SAConnection,
)

from ai.backend.common import msgpack
from ai.backend.common.types import VFolderMount

from ..api.exceptions import BackendError
from ..types import UserScope
from .keypair import keypairs
from .domain import query_domain_dotfiles
from .group import query_group_dotfiles

__all__ = (
'prepare_dotfiles',
)


async def prepare_dotfiles(
conn: SAConnection,
user_scope: UserScope,
access_key: str,
vfolder_mounts: Sequence[VFolderMount],
) -> Mapping[str, Any]:
# Feed SSH keypair and dotfiles if exists.
internal_data = {}
query = (
sa.select([
keypairs.c.ssh_public_key,
keypairs.c.ssh_private_key,
keypairs.c.dotfiles,
])
.select_from(keypairs)
.where(keypairs.c.access_key == access_key)
)
result = await conn.execute(query)
row = result.first()
dotfiles = msgpack.unpackb(row['dotfiles'])
internal_data.update({'dotfiles': dotfiles})
if row['ssh_public_key'] and row['ssh_private_key']:
internal_data['ssh_keypair'] = {
'public_key': row['ssh_public_key'],
'private_key': row['ssh_private_key'],
}
# use dotfiles in the priority of keypair > group > domain
dotfile_paths = set(map(lambda x: x['path'], dotfiles))
# add keypair dotfiles
internal_data.update({'dotfiles': list(dotfiles)})
# add group dotfiles
dotfiles, _ = await query_group_dotfiles(conn, user_scope.group_id)
for dotfile in dotfiles:
if dotfile['path'] not in dotfile_paths:
internal_data['dotfiles'].append(dotfile)
dotfile_paths.add(dotfile['path'])
# add domain dotfiles
dotfiles, _ = await query_domain_dotfiles(conn, user_scope.domain_name)
for dotfile in dotfiles:
if dotfile['path'] not in dotfile_paths:
internal_data['dotfiles'].append(dotfile)
dotfile_paths.add(dotfile['path'])
# reverse the dotfiles list so that higher priority can overwrite
# in case the actual path is the same
internal_data['dotfiles'].reverse()

# check if there is no name conflict of dotfile and vfolder
vfolder_kernel_paths = {m.kernel_path for m in vfolder_mounts}
for dotfile in internal_data.get('dotfiles', []):
dotfile_path = PurePosixPath(dotfile['path'])
if not dotfile_path.is_absolute():
dotfile_path = PurePosixPath('/home/work', dotfile['path'])
if dotfile_path in vfolder_kernel_paths:
raise BackendError(
f"There is a kernel-side path from vfolders that conflicts with "
f"a dotfile '{dotfile['path']}'.",
)

return internal_data
11 changes: 7 additions & 4 deletions src/ai/backend/manager/models/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
SlotName,
RedisConnectionInfo,
ResourceSlot,
VFolderMount,
)

from ..defs import DEFAULT_ROLE
Expand All @@ -53,6 +54,7 @@
PaginatedList,
ResourceSlotColumn,
SessionIDColumnType,
StructuredJSONObjectListColumn,
batch_result,
batch_multiresult,
metadata,
Expand All @@ -65,7 +67,7 @@
if TYPE_CHECKING:
from .gql import GraphQueryContext

__all__: Sequence[str] = (
__all__ = (
'kernels',
'session_dependencies',
'KernelStatus',
Expand Down Expand Up @@ -183,8 +185,9 @@ def default_hostname(context) -> str:
sa.Column('occupied_slots', ResourceSlotColumn(), nullable=False),
sa.Column('occupied_shares', pgsql.JSONB(), nullable=False, default={}), # legacy
sa.Column('environ', sa.ARRAY(sa.String), nullable=True),
sa.Column('mounts', sa.ARRAY(sa.String), nullable=True), # list of list
sa.Column('mount_map', pgsql.JSONB(), nullable=True, default={}),
sa.Column('mounts', sa.ARRAY(sa.String), nullable=True), # list of list; legacy since 22.03
sa.Column('mount_map', pgsql.JSONB(), nullable=True, default={}), # legacy since 22.03
sa.Column('vfolder_mounts', StructuredJSONObjectListColumn(VFolderMount), nullable=True),
sa.Column('attached_devices', pgsql.JSONB(), nullable=True, default={}),
sa.Column('resource_opts', pgsql.JSONB(), nullable=True, default={}),
sa.Column('bootstrap_script', sa.String(length=16 * 1024), nullable=True),
Expand Down Expand Up @@ -1292,7 +1295,7 @@ def parse_row(cls, ctx: GraphQueryContext, row: Row) -> Mapping[str, Any]:
'result': row['result'].name,
'service_ports': row['service_ports'],
'occupied_slots': row['occupied_slots'].to_json(),
'mounts': row['mounts'],
'vfolder_mounts': row['vfolder_mounts'],
'resource_opts': row['resource_opts'],
'num_queries': row['num_queries'],
# optionally hidden
Expand Down
4 changes: 2 additions & 2 deletions src/ai/backend/manager/models/scaling_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
set_if_set,
batch_result,
batch_multiresult,
StructuredJSONBColumn,
StructuredJSONColumn,
)
from .group import resolve_group_name_or_id
from .user import UserRole
Expand Down Expand Up @@ -68,7 +68,7 @@
sa.Column('driver', sa.String(length=64), nullable=False),
sa.Column('driver_opts', pgsql.JSONB(), nullable=False, default={}),
sa.Column('scheduler', sa.String(length=64), nullable=False),
sa.Column('scheduler_opts', StructuredJSONBColumn(
sa.Column('scheduler_opts', StructuredJSONColumn(
t.Dict({
t.Key('allowed_session_types', default=['interactive', 'batch']):
t.List(tx.Enum(SessionTypes), min_length=1),
Expand Down
14 changes: 12 additions & 2 deletions src/ai/backend/manager/models/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from contextvars import ContextVar
import itertools
import logging
from pathlib import PurePosixPath
from typing import (
Any,
AsyncIterator,
Expand Down Expand Up @@ -120,12 +121,18 @@ async def _fetch(
_ctx_volumes_cache.set(results)
return results

async def get_mount_path(self, vfolder_host: str, vfolder_id: UUID) -> str:
async def get_mount_path(
self,
vfolder_host: str,
vfolder_id: UUID,
subpath: PurePosixPath = PurePosixPath("."),
) -> str:
async with self.request(
vfolder_host, 'GET', 'folder/mount',
json={
'volume': self.split_host(vfolder_host)[1],
'vfid': str(vfolder_id),
'subpath': str(subpath),
},
) as (_, resp):
reply = await resp.json()
Expand Down Expand Up @@ -157,7 +164,10 @@ async def request(
if client_resp.status // 100 != 2:
try:
error_data = await client_resp.json()
raise VFolderOperationFailed(extra_data=error_data)
raise VFolderOperationFailed(
extra_msg=error_data.pop("msg"),
extra_data=error_data,
)
except aiohttp.ClientResponseError:
# when the response body is not JSON, just raise with status info.
raise VFolderOperationFailed(
Expand Down
Loading