Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Merge pull request #4413 from golemfactory/mwu/list-work-dirs
Browse files Browse the repository at this point in the history
DockerCPUConfig Allow multiple work_dirs
  • Loading branch information
maaktweluit authored Jul 4, 2019
2 parents 608aae0 + 9f066c7 commit 1f281b5
Show file tree
Hide file tree
Showing 16 changed files with 193 additions and 128 deletions.
8 changes: 4 additions & 4 deletions golem/docker/hypervisor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, Optional, Iterable, Tuple
from typing import Dict, Iterable, List, Optional, Tuple

from golem.docker.commands.docker import DockerCommandHandler
from golem.docker.config import DOCKER_VM_NAME, GetConfigFunction, \
Expand All @@ -28,7 +28,7 @@ def __init__(self,

self._get_config = get_config
self._vm_name = vm_name
self._work_dir: Optional[Path] = None
self._work_dirs: List[Path] = []

@classmethod
@abstractmethod
Expand Down Expand Up @@ -177,8 +177,8 @@ def recover_ctx(self, name: Optional[str] = None):
with self.restart_ctx(name) as res:
yield res

def update_work_dir(self, work_dir: Path) -> None:
self._work_dir = work_dir
def update_work_dirs(self, work_dirs: List[Path]) -> None:
self._work_dirs = work_dirs

def create_volumes(self, binds: Iterable[DockerBind]) -> dict:
return {
Expand Down
27 changes: 18 additions & 9 deletions golem/docker/hypervisor/hyperv.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,11 @@ def constrain(self, name: Optional[str] = None, **params) -> None:

logger.info('Hyper-V: reconfiguration of VM "%s" finished', name)

def update_work_dir(self, work_dir: Path) -> None:
super().update_work_dir(work_dir)
def update_work_dirs(self, work_dirs: List[Path]) -> None:
super().update_work_dirs(work_dirs)
# Ensure that working directory is shared via SMB
smbshare.create_share(self.DOCKER_USER, work_dir)
for work_dir in work_dirs:
smbshare.create_share(self.DOCKER_USER, work_dir)

@contextmanager
@report_calls(Component.hypervisor, 'vm.reconfig')
Expand Down Expand Up @@ -414,15 +415,23 @@ def _log_and_publish_event(event_type: Events, **kwargs) -> None:
publish_event(event)

def _create_volume(self, hostname: str, shared_dir: Path) -> str:
assert self._work_dir is not None
try:
relpath = shared_dir.relative_to(self._work_dir)
except ValueError:
assert self._work_dirs, "Can not make volumes witout work_dirs"
work_dir = None
relpath = None
for check_dir in self._work_dirs:
try:
relpath = shared_dir.relative_to(check_dir)
work_dir = check_dir
break
except ValueError:
continue

if work_dir is None or relpath is None:
raise ValueError(
f'Cannot create docker volume: "{shared_dir}" is not a '
f'subdirectory of docker work dir ("{self._work_dir}")')
f'subdirectory of docker work dirs ("{self._work_dirs}")')

share_name = smbshare.get_share_name(self._work_dir)
share_name = smbshare.get_share_name(work_dir)
volume_name = f'{hostname}/{share_name}/{relpath.as_posix()}'

# Client must be created here, do it in __init__() will not work since
Expand Down
6 changes: 3 additions & 3 deletions golem/docker/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from contextlib import contextmanager
from pathlib import Path
from threading import Thread
from typing import Optional, Callable, Any, Iterable
from typing import Any, Callable, Iterable, List, Optional

from golem import hardware
from golem.core.common import is_linux, is_windows, is_osx
Expand Down Expand Up @@ -102,13 +102,13 @@ def update_config(
self,
status_callback: Callable[[], Any],
done_callback: Callable[[bool], Any],
work_dir: Path,
work_dirs: List[Path],
in_background: bool = True
) -> None:
self.check_environment()

if self.hypervisor:
self.hypervisor.update_work_dir(work_dir)
self.hypervisor.update_work_dirs(work_dirs)

if in_background:
thread = Thread(target=self._wait_for_tasks,
Expand Down
38 changes: 28 additions & 10 deletions golem/envs/docker/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@


class DockerCPUConfigData(NamedTuple):
work_dir: Path
# The directories this environment is allowed to work in
work_dirs: List[Path] = []
memory_mb: int = 1024
cpu_count: int = 1

Expand All @@ -47,13 +48,14 @@ class DockerCPUConfig(DockerCPUConfigData, EnvConfig):

def to_dict(self) -> Dict[str, Any]:
dict_ = self._asdict()
dict_['work_dir'] = str(dict_['work_dir'])
dict_['work_dirs'] = [str(work_dir) for work_dir in dict_['work_dirs']]
return dict_

@staticmethod
def from_dict(dict_: Dict[str, Any]) -> 'DockerCPUConfig':
work_dir = Path(dict_.pop('work_dir'))
return DockerCPUConfig(work_dir=work_dir, **dict_)
_work_dirs = dict_.pop('work_dirs')
work_dirs = [Path(work_dir) for work_dir in _work_dirs]
return DockerCPUConfig(work_dirs=work_dirs, **dict_)


class DockerOutput(RuntimeOutput):
Expand Down Expand Up @@ -485,6 +487,8 @@ def __init__(self, config: DockerCPUConfig) -> None:
raise EnvironmentError("No supported hypervisor found")
self._hypervisor = hypervisor_cls.instance(self._get_hypervisor_config)
self._port_mapper = ContainerPortMapper(self._hypervisor)
self._update_work_dirs(config.work_dirs)
self._constrain_hypervisor(config)

def _get_hypervisor_config(self) -> Dict[str, int]:
return {
Expand Down Expand Up @@ -617,27 +621,41 @@ def update_config(self, config: EnvConfig) -> None:
logger.info("Updating environment configuration...")

self._validate_config(config)
if config.work_dir != self._config.work_dir:
self._update_work_dir(config.work_dir)
if config.work_dirs != self._config.work_dirs:
self._update_work_dirs(config.work_dirs)
self._constrain_hypervisor(config)
self._config = DockerCPUConfig(*config)
self._config_updated(config)

@classmethod
def _validate_config(cls, config: DockerCPUConfig) -> None:
logger.info("Validating configuration...")
if not config.work_dir.is_dir():
raise ValueError(f"Invalid working directory: '{config.work_dir}'")
for work_dir in config.work_dirs:
if not work_dir.is_dir():
raise ValueError(f"Invalid working directory: '{work_dir}'")
# Check for duplicates, not allowed
if config.work_dirs.count(work_dir) > 1:
raise ValueError(f"Duplicate working directory: '{work_dir}'")
# Check for parents, not allowed
for check_dir in config.work_dirs:
if check_dir == work_dir:
continue
if work_dir in check_dir.parents:
raise ValueError("Working dir can not be parent: parent="
f"'{work_dir}', child='{check_dir}'")
if check_dir in work_dir.parents:
raise ValueError("Working dir can not be parent: parent="
f"'{check_dir}', child='{work_dir}'")
if config.memory_mb < cls.MIN_MEMORY_MB:
raise ValueError(f"Not enough memory: {config.memory_mb} MB")
if config.cpu_count < cls.MIN_CPU_COUNT:
raise ValueError(f"Not enough CPUs: {config.cpu_count}")
logger.info("Configuration positively validated.")

def _update_work_dir(self, work_dir: Path) -> None:
def _update_work_dirs(self, work_dirs: List[Path]) -> None:
logger.info("Updating hypervisor's working directory...")
try:
self._hypervisor.update_work_dir(work_dir)
self._hypervisor.update_work_dirs(work_dirs)
except Exception as e:
self._error_occurred(e, "Updating working directory failed.")
raise
Expand Down
10 changes: 5 additions & 5 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from pathlib import Path
from typing import Optional, TYPE_CHECKING
from typing import List, Optional, TYPE_CHECKING

import os
import time
Expand Down Expand Up @@ -248,7 +248,7 @@ def change_config(
return self.change_docker_config(
config_desc=config_desc,
run_benchmarks=run_benchmarks,
work_dir=Path(self.dir_manager.root_path),
work_dirs=[Path(self.dir_manager.root_path)],
in_background=in_background)

def config_changed(self):
Expand All @@ -260,7 +260,7 @@ def change_docker_config(
self,
config_desc: ClientConfigDescriptor,
run_benchmarks: bool,
work_dir: Path,
work_dirs: List[Path],
in_background: bool = True
) -> Deferred:

Expand All @@ -270,7 +270,7 @@ def change_docker_config(

yield self.docker_cpu_env.clean_up()
self.docker_cpu_env.update_config(DockerCPUConfig(
work_dir=work_dir,
work_dirs=work_dirs,
cpu_count=config_desc.num_cores,
memory_mb=scale_memory(
config_desc.max_memory_size,
Expand Down Expand Up @@ -311,7 +311,7 @@ def done_callback(config_differs):
dm.update_config(
status_callback=status_callback,
done_callback=done_callback,
work_dir=work_dir,
work_dirs=work_dirs,
in_background=in_background)

return (yield deferred)
Expand Down
2 changes: 1 addition & 1 deletion golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def __init__(self,

os.makedirs(self.get_task_computer_root(), exist_ok=True)
docker_cpu_config = DockerCPUConfig(
work_dir=Path(self.get_task_computer_root()))
work_dirs=[Path(self.get_task_computer_root())])
docker_cpu_env = NonHypervisedDockerCPUEnvironment(docker_cpu_config)
new_env_manager = EnvironmentManager()
new_env_manager.register_env(docker_cpu_env)
Expand Down
2 changes: 1 addition & 1 deletion tests/apps/blender/benchmark/test_blenderbenchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_run(self):
dm.update_config(
status_callback=mock.Mock(),
done_callback=mock.Mock(),
work_dir=self.new_path,
work_dirs=[self.new_path],
in_background=True)
benchmark = BlenderBenchmark()
task_definition = benchmark.task_definition
Expand Down
2 changes: 0 additions & 2 deletions tests/golem/docker/test_docker_blender_task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


from os import path
from unittest.mock import Mock

Expand Down
4 changes: 2 additions & 2 deletions tests/golem/docker/test_docker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ def done_cb(_):
dmm.check_environment()

dmm.update_config(
status_cb, done_cb, in_background=False, work_dir=None)
status_cb, done_cb, in_background=False, work_dirs=[])
dmm.update_config(
status_cb, done_cb, in_background=True, work_dir=None)
status_cb, done_cb, in_background=True, work_dirs=[])

def test_constrain_not_called(self):
dmm = MockDockerManager()
Expand Down
10 changes: 5 additions & 5 deletions tests/golem/docker/test_hyperv.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ def test_constraints_ok(self, get_info, get_memory):
})

@patch(PATCH_BASE + '.smbshare')
def test_update_work_dir(self, smbshare):
def test_update_work_dirs(self, smbshare):
path = Mock()
self.hyperv.update_work_dir(path)
self.hyperv.update_work_dirs([path])
smbshare.create_share.assert_called_once_with(
HyperVHypervisor.DOCKER_USER, path)

Expand Down Expand Up @@ -200,7 +200,7 @@ def _create_volume(my_ip, shared_dir):

def test_create_volume_wrong_dir(self):
tmp_dir = Path(tempfile.gettempdir())
self.hyperv._work_dir = tmp_dir / 'work_dir'
self.hyperv._work_dirs = [tmp_dir / 'work_dir']

with self.assertRaises(ValueError):
self.hyperv._create_volume('127.0.0.1', tmp_dir / 'shared_dir')
Expand All @@ -209,8 +209,8 @@ def test_create_volume_wrong_dir(self):
@patch(PATCH_BASE + '.smbshare')
def test_create_volume_ok(self, smbshare, local_client):
tmp_dir = Path(tempfile.gettempdir())
work_dir = self.hyperv._work_dir = tmp_dir / 'work_dir'
shared_dir = work_dir / 'task1' / 'res'
work_dirs = self.hyperv._work_dirs = [tmp_dir / 'work_dir']
shared_dir = work_dirs[0] / 'task1' / 'res'
smbshare.get_share_name.return_value = 'SHARE_NAME'

volume_name = self.hyperv._create_volume('127.0.0.1', shared_dir)
Expand Down
15 changes: 8 additions & 7 deletions tests/golem/envs/docker/cpu/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,29 @@ def test_missing_values(self):
def test_extra_values(self):
with self.assertRaises(TypeError):
DockerCPUConfig.from_dict({
'work_dir': '/tmp/golem',
'work_dirs': ['/tmp/golem'],
'memory_mb': 2000,
'cpu_count': 2,
'extra': 'value'
})

def test_default_values(self):
config = DockerCPUConfig.from_dict({
'work_dir': '/tmp/golem'
'work_dirs': ['/tmp/golem']
})

self.assertEqual(config.work_dir, Path('/tmp/golem'))
self.assertEqual(config.work_dirs, [Path('/tmp/golem')])
self.assertIsNotNone(config.memory_mb)
self.assertIsNotNone(config.cpu_count)

def test_custom_values(self):
config = DockerCPUConfig.from_dict({
'work_dir': '/tmp/golem',
'work_dirs': ['/tmp/golem'],
'memory_mb': 2137,
'cpu_count': 12
})

self.assertEqual(config.work_dir, Path('/tmp/golem'))
self.assertEqual(config.work_dirs, [Path('/tmp/golem')])
self.assertEqual(config.memory_mb, 2137)
self.assertEqual(config.cpu_count, 12)

Expand All @@ -44,13 +44,14 @@ class TestToDict(TestCase):

def test_to_dict(self):
config_dict = DockerCPUConfig(
work_dir=Path('/tmp/golem'),
work_dirs=[Path('/tmp/golem')],
memory_mb=2137,
cpu_count=12
).to_dict()

# We cannot assert exact path string because it depends on OS
self.assertEqual(Path(config_dict.pop('work_dir')), Path('/tmp/golem'))
_work_dirs = config_dict.pop('work_dirs')
self.assertEqual(Path(_work_dirs[0]), Path('/tmp/golem'))
self.assertEqual(config_dict, {
'memory_mb': 2137,
'cpu_count': 12
Expand Down
Loading

0 comments on commit 1f281b5

Please # to comment.