From 7d8147a5e259ab8f7858dff2c4bb71125dee77d0 Mon Sep 17 00:00:00 2001
From: "gaodawei.gdw"
Date: Fri, 15 Jul 2022 15:42:56 +0800
Subject: [PATCH 1/8] wip

---
 federatedscope/core/trainers/torch_trainer.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/federatedscope/core/trainers/torch_trainer.py b/federatedscope/core/trainers/torch_trainer.py
index 5c7e4c404..fc7ba8185 100644
--- a/federatedscope/core/trainers/torch_trainer.py
+++ b/federatedscope/core/trainers/torch_trainer.py
@@ -142,6 +142,7 @@ def _hook_on_fit_start_init(self, ctx):
         setattr(ctx, "num_samples_{}".format(ctx.cur_data_split), 0)
         setattr(ctx, "{}_y_true".format(ctx.cur_data_split), [])
         setattr(ctx, "{}_y_prob".format(ctx.cur_data_split), [])
+        setattr(ctx, "{}_y_indx".format(ctx.cur_data_split), [])
 
     def _hook_on_fit_start_calculate_model_size(self, ctx):
         if not isinstance(self.ctx.monitor, Monitor):
@@ -280,6 +281,10 @@ def _hook_on_batch_end(self, ctx):
         ctx.get("{}_y_prob".format(ctx.cur_data_split)).append(
             ctx.y_prob.detach().cpu().numpy())
 
+        # TODO: for cikmcup to save predictions
+        if ctx.cfg.data == 'cikmcup' and ctx.cur_data_split == MODE.TEST:
+            ctx.predict_ids = [_.data_index for _ in ctx.data_batch]
+
         # clean temp ctx
         ctx.data_batch = None
         ctx.batch_size = None
@@ -289,6 +294,7 @@ def _hook_on_batch_end(self, ctx):
         ctx.y_true = None
         ctx.y_prob = None
 
+
     def _hook_on_fit_end(self, ctx):
         """Evaluate metrics.
 
@@ -302,6 +308,7 @@ def _hook_on_fit_end(self, ctx):
         results = self.metric_calculator.eval(ctx)
         setattr(ctx, 'eval_metrics', results)
 
+
     def save_model(self, path, cur_round=-1):
         assert self.ctx.model is not None
 

From ee044faf8021d87564fd491b0339136db2c8f44c Mon Sep 17 00:00:00 2001
From: "gaodawei.gdw"
Date: Fri, 15 Jul 2022 16:04:20 +0800
Subject: [PATCH 2/8] wip

---
 federatedscope/core/trainers/torch_trainer.py |  1 -
 federatedscope/gfl/trainer/graphtrainer.py    | 10 ++++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/federatedscope/core/trainers/torch_trainer.py b/federatedscope/core/trainers/torch_trainer.py
index fc7ba8185..ce104ad86 100644
--- a/federatedscope/core/trainers/torch_trainer.py
+++ b/federatedscope/core/trainers/torch_trainer.py
@@ -142,7 +142,6 @@ def _hook_on_fit_start_init(self, ctx):
         setattr(ctx, "num_samples_{}".format(ctx.cur_data_split), 0)
         setattr(ctx, "{}_y_true".format(ctx.cur_data_split), [])
         setattr(ctx, "{}_y_prob".format(ctx.cur_data_split), [])
-        setattr(ctx, "{}_y_indx".format(ctx.cur_data_split), [])
 
     def _hook_on_fit_start_calculate_model_size(self, ctx):
         if not isinstance(self.ctx.monitor, Monitor):
diff --git a/federatedscope/gfl/trainer/graphtrainer.py b/federatedscope/gfl/trainer/graphtrainer.py
index 9c7e1b2b5..16c1c6827 100644
--- a/federatedscope/gfl/trainer/graphtrainer.py
+++ b/federatedscope/gfl/trainer/graphtrainer.py
@@ -8,6 +8,10 @@
 
 
 class GraphMiniBatchTrainer(GeneralTorchTrainer):
+    def _hook_on_fit_start_init(self, ctx):
+        super()._hook_on_fit_start_init()
+        setattr(ctx, "{}_y_inds".format(ctx.cur_data_split), [])
+
     def _hook_on_batch_forward(self, ctx):
         batch = ctx.data_batch.to(ctx.device)
         pred = ctx.model(batch)
@@ -20,6 +24,12 @@ def _hook_on_batch_forward(self, ctx):
         ctx.y_true = label
         ctx.y_prob = pred
 
+        setattr(
+            ctx,
+            f'{ctx.cur_data_split}_y_inds',
+            ctx.get(f'{ctx.cur_data_split}_y_inds') + [_ for _ in ctx.data_batch.data_index]
+        )
+
     def _hook_on_batch_forward_flop_count(self, ctx):
         if not isinstance(self.ctx.monitor, Monitor):
             logger.warning(
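The hook added in PATCH 2/8 reads `ctx.data_batch.data_index`, i.e. it assumes every graph carries a `data_index` attribute that survives mini-batching. A minimal sketch of that mechanism, outside the patch series and assuming the competition dataset tags each graph this way: PyG `Data` objects accept arbitrary attributes and the mini-batch collater concatenates tensor attributes, but any key containing 'index' is shifted by `num_nodes` during collation unless `__inc__` says otherwise.

    import torch
    from torch_geometric.data import Data
    from torch_geometric.loader import DataLoader

    class IndexedGraph(Data):
        # PyG shifts any attribute whose key contains 'index' by num_nodes
        # when collating a mini-batch; returning 0 keeps data_index a plain
        # sample id instead.
        def __inc__(self, key, value, *args, **kwargs):
            if key == 'data_index':
                return 0
            return super().__inc__(key, value, *args, **kwargs)

    graphs = []
    for i in range(4):
        g = IndexedGraph(x=torch.randn(3, 8),
                         edge_index=torch.tensor([[0, 1], [1, 2]]),
                         y=torch.tensor([i % 2]))
        g.data_index = torch.tensor([i])  # per-graph sample index
        graphs.append(g)

    batch = next(iter(DataLoader(graphs, batch_size=2)))
    print(batch.data_index.tolist())  # [0, 1]

Whether the competition dataset actually needs the `__inc__` override depends on how it stores the attribute; the sketch only illustrates why the recorded indices can be used as sample ids later on.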
"gaodawei.gdw" Date: Fri, 15 Jul 2022 16:53:11 +0800 Subject: [PATCH 3/8] wip --- federatedscope/core/trainers/torch_trainer.py | 4 ---- federatedscope/core/worker/client.py | 8 +++++++ federatedscope/gfl/trainer/graphtrainer.py | 24 +++++++++++++++---- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/federatedscope/core/trainers/torch_trainer.py b/federatedscope/core/trainers/torch_trainer.py index ce104ad86..e3b36a242 100644 --- a/federatedscope/core/trainers/torch_trainer.py +++ b/federatedscope/core/trainers/torch_trainer.py @@ -280,10 +280,6 @@ def _hook_on_batch_end(self, ctx): ctx.get("{}_y_prob".format(ctx.cur_data_split)).append( ctx.y_prob.detach().cpu().numpy()) - # TODO: for cikmcup to save predictions - if ctx.cfg.data == 'cikmcup' and ctx.cur_data_split == MODE.TEST: - ctx.predict_ids = [_.data_index for _ in ctx.data_batch] - # clean temp ctx ctx.data_batch = None ctx.batch_size = None diff --git a/federatedscope/core/worker/client.py b/federatedscope/core/worker/client.py index 715df910c..a834f8645 100644 --- a/federatedscope/core/worker/client.py +++ b/federatedscope/core/worker/client.py @@ -444,6 +444,14 @@ def callback_funcs_for_finish(self, message: Message): self.trainer.update(message.content, strict=self._cfg.federate.share_local_model) + # TODO: more elegant here + # Save final prediction result + if self._cfg.data.type == 'cikmcup': + # Evaluate + self.trainer.evaluate(target_data_split_name='test') + # Save results + self.trainer.save_prediction(self.ID, self._cfg.model.task) + self._monitor.finish_fl() def callback_funcs_for_converged(self, message: Message): diff --git a/federatedscope/gfl/trainer/graphtrainer.py b/federatedscope/gfl/trainer/graphtrainer.py index 16c1c6827..3261c1a71 100644 --- a/federatedscope/gfl/trainer/graphtrainer.py +++ b/federatedscope/gfl/trainer/graphtrainer.py @@ -1,5 +1,7 @@ import logging +import numpy as np + from federatedscope.core.monitors import Monitor from federatedscope.register import register_trainer from federatedscope.core.trainers import GeneralTorchTrainer @@ -24,11 +26,13 @@ def _hook_on_batch_forward(self, ctx): ctx.y_true = label ctx.y_prob = pred - setattr( - ctx, - f'{ctx.cur_data_split}_y_inds', - ctx.get(f'{ctx.cur_data_split}_y_inds') + [_ for _ in ctx.data_batch.data_index] - ) + # record the index of the ${MODE} samples + if hasattr(batch, 'data_index'): + setattr( + ctx, + f'{ctx.cur_data_split}_y_inds', + ctx.get(f'{ctx.cur_data_split}_y_inds') + ctx.data_batch.data_index.cpu().numpy().tolist() + ) def _hook_on_batch_forward_flop_count(self, ctx): if not isinstance(self.ctx.monitor, Monitor): @@ -75,6 +79,16 @@ def _hook_on_batch_forward_flop_count(self, ctx): self.ctx.monitor.total_flops += self.ctx.monitor.flops_per_sample * \ ctx.batch_size + def save_prediction(self, client_id, task_type): + y_inds, y_probs = self.ctx.test_y_inds, self.ctx.test_y_prob + if 'classification' in task_type: + y_probs = np.argmax(y_probs, axis=-1) + # TODO: more feasible, for now we hard code it for cikmcup + with open('prediction/round_{}.csv', 'a') as file: + for y_ind, y_prob in zip(y_inds, y_probs): + line = [client_id, y_ind] + list(y_prob) + file.write(','.join(line) + '\n') + def call_graph_level_trainer(trainer_type): if trainer_type == 'graphminibatch_trainer': From 1a3770858390655342b90099a2286d322f89bb9d Mon Sep 17 00:00:00 2001 From: "gaodawei.gdw" Date: Fri, 15 Jul 2022 18:08:06 +0800 Subject: [PATCH 4/8] bug fix --- federatedscope/core/worker/client.py | 1 + 
From 1a3770858390655342b90099a2286d322f89bb9d Mon Sep 17 00:00:00 2001
From: "gaodawei.gdw"
Date: Fri, 15 Jul 2022 18:08:06 +0800
Subject: [PATCH 4/8] bug fix

---
 federatedscope/core/worker/client.py       |  1 +
 federatedscope/gfl/trainer/graphtrainer.py | 23 +++++++++++++++--------
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/federatedscope/core/worker/client.py b/federatedscope/core/worker/client.py
index a834f8645..ba6b2743d 100644
--- a/federatedscope/core/worker/client.py
+++ b/federatedscope/core/worker/client.py
@@ -451,6 +451,7 @@ def callback_funcs_for_finish(self, message: Message):
             self.trainer.evaluate(target_data_split_name='test')
             # Save results
             self.trainer.save_prediction(self.ID, self._cfg.model.task)
+            logger.info(f"Client #{self.ID} finished saving prediction results.")
 
         self._monitor.finish_fl()
 
diff --git a/federatedscope/gfl/trainer/graphtrainer.py b/federatedscope/gfl/trainer/graphtrainer.py
index f1dc0c28c..ab11d5528 100644
--- a/federatedscope/gfl/trainer/graphtrainer.py
+++ b/federatedscope/gfl/trainer/graphtrainer.py
@@ -1,4 +1,5 @@
 import logging
+import os
 
 import numpy as np
 
@@ -11,7 +12,7 @@
 
 class GraphMiniBatchTrainer(GeneralTorchTrainer):
     def _hook_on_fit_start_init(self, ctx):
-        super()._hook_on_fit_start_init()
+        super()._hook_on_fit_start_init(ctx)
         setattr(ctx, "{}_y_inds".format(ctx.cur_data_split), [])
 
     def _hook_on_batch_forward(self, ctx):
@@ -31,11 +32,11 @@ def _hook_on_batch_forward(self, ctx):
         ctx.y_prob = pred
 
         # record the index of the ${MODE} samples
-        if hasattr(batch, 'data_index'):
+        if hasattr(ctx.data_batch, 'data_index'):
             setattr(
                 ctx,
                 f'{ctx.cur_data_split}_y_inds',
-                ctx.get(f'{ctx.cur_data_split}_y_inds') + ctx.data_batch.data_index.cpu().numpy().tolist()
+                ctx.get(f'{ctx.cur_data_split}_y_inds') + ctx.data_batch.data_index.detach().cpu().numpy().tolist()
             )
 
     def _hook_on_batch_forward_flop_count(self, ctx):
@@ -85,13 +86,19 @@ def _hook_on_batch_forward_flop_count(self, ctx):
 
     def save_prediction(self, client_id, task_type):
         y_inds, y_probs = self.ctx.test_y_inds, self.ctx.test_y_prob
-        if 'classification' in task_type:
-            y_probs = np.argmax(y_probs, axis=-1)
+        os.makedirs('../prediction', exist_ok=True)
+
         # TODO: more feasible, for now we hard code it for cikmcup
-        with open('prediction/round_{}.csv', 'a') as file:
+        if 'classification' in task_type.lower():
+            y_probs = np.argmax(y_probs, axis=-1)
+
+        with open('../prediction/prediction.csv', 'a') as file:
             for y_ind, y_prob in zip(y_inds, y_probs):
-                line = [client_id, y_ind] + list(y_prob)
-                file.write(','.join(line) + '\n')
+                if 'classification' in task_type.lower():
+                    line = [client_id, y_ind] + [y_prob]
+                else:
+                    line = [client_id, y_ind] + list(y_prob)
+                file.write(','.join([str(_) for _ in line]) + '\n')
 
 
 def call_graph_level_trainer(trainer_type):
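In the classification branch above, `test_y_prob` stacks one row of class scores per test sample, so `np.argmax(..., axis=-1)` collapses each row to a predicted label. A tiny sketch with made-up scores:

    import numpy as np

    # Made-up scores: three test samples, two classes.
    y_probs = np.array([[0.1, 0.9],
                        [0.8, 0.2],
                        [0.3, 0.7]])
    print(np.argmax(y_probs, axis=-1))  # [1 0 1]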
From caec5a50ec642df11c644d6863d0b4ce3019d7dd Mon Sep 17 00:00:00 2001
From: "gaodawei.gdw"
Date: Mon, 18 Jul 2022 09:43:42 +0800
Subject: [PATCH 5/8] modified according to the comments

---
 federatedscope/gfl/trainer/graphtrainer.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/federatedscope/gfl/trainer/graphtrainer.py b/federatedscope/gfl/trainer/graphtrainer.py
index ab11d5528..49ca771be 100644
--- a/federatedscope/gfl/trainer/graphtrainer.py
+++ b/federatedscope/gfl/trainer/graphtrainer.py
@@ -89,15 +89,14 @@ def save_prediction(self, client_id, task_type):
         os.makedirs('../prediction', exist_ok=True)
 
         # TODO: more feasible, for now we hard code it for cikmcup
-        if 'classification' in task_type.lower():
-            y_probs = np.argmax(y_probs, axis=-1)
+        y_preds = np.argmax(y_probs, axis=-1) if 'classification' in task_type.lower() else y_probs
 
         with open('../prediction/prediction.csv', 'a') as file:
-            for y_ind, y_prob in zip(y_inds, y_probs):
+            for y_ind, y_pred in zip(y_inds, y_preds):
                 if 'classification' in task_type.lower():
-                    line = [client_id, y_ind] + [y_prob]
+                    line = [client_id, y_ind] + [y_pred]
                 else:
-                    line = [client_id, y_ind] + list(y_prob)
+                    line = [client_id, y_ind] + list(y_pred)
                 file.write(','.join([str(_) for _ in line]) + '\n')
 
 

From 55a3ff7b64b7b37857dcb84faaf8dd9b5a082821 Mon Sep 17 00:00:00 2001
From: "gaodawei.gdw"
Date: Mon, 18 Jul 2022 16:42:44 +0800
Subject: [PATCH 6/8] Modify the save directory to fit the name of the
 download contest data

---
 federatedscope/gfl/dataset/cikm_cup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/federatedscope/gfl/dataset/cikm_cup.py b/federatedscope/gfl/dataset/cikm_cup.py
index 4240b2a07..3dfeb730d 100644
--- a/federatedscope/gfl/dataset/cikm_cup.py
+++ b/federatedscope/gfl/dataset/cikm_cup.py
@@ -7,7 +7,7 @@
 logger = logging.getLogger(__name__)
 
 class CIKMCUPDataset(InMemoryDataset):
-    name = 'CIKM_CUP'
+    name = 'CIKM22Competition'
 
     def __init__(self, root):
         super(CIKMCUPDataset, self).__init__(root)

From 0e66ab03b77296561906ba27174b7a372f07d471 Mon Sep 17 00:00:00 2001
From: "gaodawei.gdw"
Date: Mon, 18 Jul 2022 19:18:23 +0800
Subject: [PATCH 7/8] Modify the client_num

---
 federatedscope/gfl/baseline/fedavg_gin_minibatch_on_cikmcup.yaml | 1 +
 .../gfl/baseline/isolated_gin_minibatch_on_cikmcup.yaml          | 1 +
 2 files changed, 2 insertions(+)

diff --git a/federatedscope/gfl/baseline/fedavg_gin_minibatch_on_cikmcup.yaml b/federatedscope/gfl/baseline/fedavg_gin_minibatch_on_cikmcup.yaml
index 957684f3d..f49950054 100644
--- a/federatedscope/gfl/baseline/fedavg_gin_minibatch_on_cikmcup.yaml
+++ b/federatedscope/gfl/baseline/fedavg_gin_minibatch_on_cikmcup.yaml
@@ -9,6 +9,7 @@ federate:
   make_global_eval: False
   total_round_num: 100
   share_local_model: False
+  client_num: 13
 data:
   root: data/
   type: cikmcup
diff --git a/federatedscope/gfl/baseline/isolated_gin_minibatch_on_cikmcup.yaml b/federatedscope/gfl/baseline/isolated_gin_minibatch_on_cikmcup.yaml
index 7880f4f3b..f4db08333 100644
--- a/federatedscope/gfl/baseline/isolated_gin_minibatch_on_cikmcup.yaml
+++ b/federatedscope/gfl/baseline/isolated_gin_minibatch_on_cikmcup.yaml
@@ -10,6 +10,7 @@ federate:
   make_global_eval: False
   total_round_num: 10
   share_local_model: False
+  client_num: 13
 data:
   batch_size: 64
   root: data/

From 0f8f1c1651dd9d1989149561e4ea7acf90359236 Mon Sep 17 00:00:00 2001
From: "gaodawei.gdw"
Date: Mon, 18 Jul 2022 19:21:15 +0800
Subject: [PATCH 8/8] bug fix

---
 federatedscope/gfl/trainer/graphtrainer.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/federatedscope/gfl/trainer/graphtrainer.py b/federatedscope/gfl/trainer/graphtrainer.py
index 49ca771be..07c746a74 100644
--- a/federatedscope/gfl/trainer/graphtrainer.py
+++ b/federatedscope/gfl/trainer/graphtrainer.py
@@ -86,12 +86,12 @@ def _hook_on_batch_forward_flop_count(self, ctx):
 
     def save_prediction(self, client_id, task_type):
         y_inds, y_probs = self.ctx.test_y_inds, self.ctx.test_y_prob
-        os.makedirs('../prediction', exist_ok=True)
+        os.makedirs('prediction', exist_ok=True)
 
         # TODO: more feasible, for now we hard code it for cikmcup
         y_preds = np.argmax(y_probs, axis=-1) if 'classification' in task_type.lower() else y_probs
 
-        with open('../prediction/prediction.csv', 'a') as file:
+        with open('prediction/prediction.csv', 'a') as file:
             for y_ind, y_pred in zip(y_inds, y_preds):
                 if 'classification' in task_type.lower():
                     line = [client_id, y_ind] + [y_pred]
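With `client_num: 13` pinned in both baseline configs and the dataset folder renamed to `CIKM22Competition`, the series is runnable end to end; a typical launch would be `python federatedscope/main.py --cfg federatedscope/gfl/baseline/fedavg_gin_minibatch_on_cikmcup.yaml`, though the exact entry point is assumed here rather than shown in the patches. One consequence of `save_prediction` opening `prediction/prediction.csv` in append mode is that all 13 clients can write into the same file, but a second run appends to the rows of the first. A small illustrative helper (hypothetical, not part of the patches) for clearing the file between runs:

    import os

    # save_prediction appends, so stale rows from an earlier run would mix
    # with fresh ones unless the file is removed first.
    def reset_prediction_file(path='prediction/prediction.csv'):
        if os.path.exists(path):
            os.remove(path)

    reset_prediction_file()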