diff --git a/README.md b/README.md
index 5e577188..aea53e54 100644
--- a/README.md
+++ b/README.md
@@ -13,15 +13,11 @@ Hello Wo**RL**d!!:hand: **Join Our Reinforcement Learning framework for Develop
 - Distributed RL algorithms are provided using [ray](https://github.com/ray-project/ray)
 - Benchmark of the algorithms is conducted in many RL environment
 
-## :exclamation:Notification
+## :heavy_check_mark: Tested
 
-Currently, JORLDY is pre-release version. It supports Linux only, but all the scripts can be run on Windows and Mac in the following ways.
-- Windows: Docker or WSL
-- Mac: Docker
-
-However, you can use only (single, sync_distributed)_train_nomp.py and eval.py on a local environment in Windows and Mac. We will address these issues as soon as possible.
-
-**\* (single, sync_distributed)_train_nomp.py: these scripts don't use multiprocessing library. In detail, the manage process is included in the main process. So it can be a bit slow.**
+| Python | Windows | Mac | Linux |
+| :----: | :---------: | :-----: | :------: |
+| 3.8 | :heavy_check_mark: | :heavy_check_mark: | WSL, Ubuntu 18.04 |
 
 ## :arrow_down: Installation
 
diff --git a/docs/How_to_use.md b/docs/How_to_use.md
index 30ec72ea..c3c05df9 100644
--- a/docs/How_to_use.md
+++ b/docs/How_to_use.md
@@ -5,7 +5,6 @@
 - sync_distributed_train.py: train with sychronous distributed setting.
 - async_distributed_train.py: train with asychronous distributed setting.
 - eval.py: evaluate with trained agent.
-- (single, sync_distributed)_train_nomp.py: this scripts don't use multiprocessing library. In detail, the manage process is included in the main process. So it can be a bit slow.
 
 if you want to know the specific process of each script, please refer to [Distributed Architecture](./Distributed_Architecture.md)
 ## How to Check Implemented List
diff --git a/jorldy/async_distributed_train.py b/jorldy/async_distributed_train.py
index bbc295e6..61a61990 100644
--- a/jorldy/async_distributed_train.py
+++ b/jorldy/async_distributed_train.py
@@ -26,7 +26,7 @@
     if config.train.distributed_batch_size:
         agent_config["batch_size"] = config.train.distributed_batch_size
 
-    trans_queue = mp.Queue()
+    trans_queue = mp.Queue(10)
     interact_sync_queue = mp.Queue(1)
     result_queue = mp.Queue()
     manage_sync_queue = mp.Queue(1)
@@ -58,7 +58,7 @@
     step, _step, print_stamp, save_stamp = 0, 0, 0, 0
     while step < config.train.run_step:
         transitions = []
-        while (_step == 0 or trans_queue.qsize() > 0) and\
+        while (_step == 0 or not trans_queue.empty()) and\
             (_step - step < config.train.update_period):
             _step, _transitions = trans_queue.get()
             transitions += _transitions
diff --git a/jorldy/manager/config_manager.py b/jorldy/manager/config_manager.py
index d9634603..4d39e221 100644
--- a/jorldy/manager/config_manager.py
+++ b/jorldy/manager/config_manager.py
@@ -60,6 +60,12 @@ class CustomDict(dict):
     def __init__(self, init_dict={}):
         self.update(init_dict)
 
+    def __getstate__(self):
+        return self.__dict__
+
+    def __setstate__(self, d):
+        self.__dict__.update(d)
+
 def type_cast(var):
     try:
         return int(var)
diff --git a/jorldy/process.py b/jorldy/process.py
index 776a2ef6..0fe12279 100644
--- a/jorldy/process.py
+++ b/jorldy/process.py
@@ -13,9 +13,9 @@ def interact_process(DistributedManager, distributed_manager_config,
             delta_t = len(transitions) / num_workers
             step += delta_t
             trans_queue.put((int(step), transitions))
-            if sync_queue.qsize() > 0:
+            if sync_queue.full():
                 distributed_manager.sync(sync_queue.get())
-            while trans_queue.qsize() == 10:
+            while trans_queue.full():
                time.sleep(0.1)
     except Exception as e:
         traceback.print_exc()
@@ -39,7 +39,7 @@ def manage_process(Agent, agent_config,
     try:
         while step < run_step:
             wait = True
-            while wait or result_queue.qsize() > 0:
+            while wait or not result_queue.empty():
                 _step, result = result_queue.get()
                 metric_manager.append(result)
                 wait = False
diff --git a/jorldy/single_train_nomp.py b/jorldy/single_train_nomp.py
deleted file mode 100644
index 9d5b92d4..00000000
--- a/jorldy/single_train_nomp.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import argparse
-
-from core import *
-from manager import *
-
-# default_config_path = "config.YOUR_AGENT.YOUR_ENV"
-default_config_path = "config.dqn.cartpole"
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--config', type=str, help='config.dqn.cartpole')
-    args, unknown = parser.parse_known_args()
-    config_path = args.config if args.config else default_config_path
-    config_manager = ConfigManager(config_path, unknown)
-    config = config_manager.config
-
-    env = Env(**config.env)
-    agent = Agent(state_size=env.state_size,
-                  action_size=env.action_size,
-                  optim_config=config.optim,
-                  **config.agent)
-
-    if config.train.load_path:
-        agent.load(config.train.load_path)
-
-    record_period = config.train.record_period if config.train.record_period else config.train.run_step//10
-    eval_manager = EvalManager(Env(**config.env), config.train.eval_iteration,
-                               config.train.record, record_period)
-    metric_manager = MetricManager()
-    log_id = config.train.id if config.train.id else config.agent.name
-    log_manager = LogManager(config.env.name, log_id, config.train.experiment)
-    config_manager.dump(log_manager.path)
-
-    episode = 0
-    state = env.reset()
-
-    for step in range(1, config.train.run_step+1):
-        action_dict = agent.act(state, config.train.training)
-        next_state, reward, done = env.step(action_dict['action'])
-
-        if config.train.training:
-            transition = {'state': state, 'next_state': next_state,
-                          'reward': reward, 'done': done}
-            transition.update(action_dict)
-            transition = agent.interact_callback(transition)
-            if transition:
-                result = agent.process([transition], step)
-                metric_manager.append(result)
-        state = next_state
-
-        if done:
-            episode += 1
-            state = env.reset()
-
-        if step % config.train.print_period == 0:
-            score, frames = eval_manager.evaluate(agent, step)
-            metric_manager.append({"score": score})
-            statistics = metric_manager.get_statistics()
-            print(f"{episode} Episode / Step : {step} / {statistics}")
-            log_manager.write(statistics, frames, score, step)
-
-        if config.train.training and \
-            (step % config.train.save_period == 0 or step == config.train.run_step - 1):
-            agent.save(log_manager.path)
-
-    env.close()
diff --git a/jorldy/sync_distributed_train_nomp.py b/jorldy/sync_distributed_train_nomp.py
deleted file mode 100644
index ddd0166e..00000000
--- a/jorldy/sync_distributed_train_nomp.py
+++ /dev/null
@@ -1,60 +0,0 @@
-import argparse
-
-from core import *
-from manager import *
-from process import *
-
-# default_config_path = "config.YOUR_AGENT.YOUR_ENV"
-default_config_path = "config.dqn.cartpole"
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--config', type=str, help='config.dqn.cartpole')
-    args, unknown = parser.parse_known_args()
-    config_path = args.config if args.config else default_config_path
-    config_manager = ConfigManager(config_path, unknown)
-    config = config_manager.config
-
-    env = Env(**config.env)
-    agent_config = {'state_size': env.state_size,
-                    'action_size': env.action_size,
-                    'optim_config': config.optim,
-                    'num_workers': config.train.num_workers}
-    agent_config.update(config.agent)
-    if config.train.distributed_batch_size:
-        agent_config["batch_size"] = config.train.distributed_batch_size
-    agent = Agent(**agent_config)
-
-    if config.train.load_path:
-        agent.load(config.train.load_path)
-
-    record_period = config.train.record_period if config.train.record_period else config.train.run_step//10
-
-    eval_manager = EvalManager(Env(**config.env), config.train.eval_iteration, config.train.record, record_period)
-    metric_manager = MetricManager()
-    log_id = config.train.id if config.train.id else config.agent.name
-    log_manager = LogManager(config.env.name, log_id, config.train.experiment)
-    config_manager.dump(log_manager.path)
-
-    distributed_manager = DistributedManager(Env, config.env, Agent, {'device':'cpu', **agent_config}, config.train.num_workers, 'sync')
-
-    step, print_stamp, save_stamp = 0, 0, 0
-    while step < config.train.run_step:
-        transitions = distributed_manager.run(config.train.update_period)
-        step += config.train.update_period
-        print_stamp += config.train.update_period
-        save_stamp += config.train.update_period
-        result = agent.process(transitions, step)
-        distributed_manager.sync(agent.sync_out())
-        metric_manager.append(result)
-        if print_stamp >= config.train.print_period or step >= config.train.run_step:
-            score, frames = eval_manager.evaluate(agent, step)
-            metric_manager.append({"score": score})
-            statistics = metric_manager.get_statistics()
-            print(f"Step : {step} / {statistics}")
-            log_manager.write(statistics, frames, score, step)
-            print_stamp = 0
-        if save_stamp >= config.train.save_period or step >= config.train.run_step:
-            agent.save(log_manager.path)
-            save_stamp = 0
-    env.close()
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index e5bc9bbc..05b307b2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
 torch==1.8.1
 tensorboard==2.5.0
-ray==1.7.0
+ray==1.8.0
 opencv-python==4.5.2.52
 pygifsicle==1.0.4
 gym==0.21.0
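
For reference, the queue changes in jorldy/process.py and jorldy/async_distributed_train.py follow the standard bounded-queue handoff pattern: the worker throttles with Queue.full()/Queue.empty() on a queue bounded at 10 rather than Queue.qsize(), which raises NotImplementedError on macOS. Below is a minimal self-contained sketch of that pattern, not JORLDY code; the producer function, the placeholder transition dicts, and run_step=100 are illustrative assumptions.

```python
# Minimal sketch (not JORLDY code): hand transition batches from a worker process
# to the trainer through a bounded multiprocessing.Queue, throttling with
# full()/empty() instead of qsize(), which raises NotImplementedError on macOS.
import multiprocessing as mp
import time


def producer(trans_queue, run_step):
    step = 0
    while step < run_step:
        step += 1
        transitions = [{"step": step}]      # placeholder transition batch
        trans_queue.put((step, transitions))
        while trans_queue.full():           # back off while the trainer catches up
            time.sleep(0.1)


if __name__ == "__main__":
    run_step = 100
    trans_queue = mp.Queue(10)              # bounded, mirroring mp.Queue(10) above
    worker = mp.Process(target=producer, args=(trans_queue, run_step))
    worker.start()

    step = 0
    while step < run_step:
        step, transitions = trans_queue.get()   # block until one batch arrives
        while not trans_queue.empty():          # then drain whatever else is queued
            step, extra = trans_queue.get()
            transitions += extra
        # ... update the agent with `transitions` here ...
    worker.join()
```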