diff --git a/federatedscope/autotune/baseline/fedhpo_vfl.yaml b/federatedscope/autotune/baseline/fedhpo_vfl.yaml index bc2c52249..57f44275e 100644 --- a/federatedscope/autotune/baseline/fedhpo_vfl.yaml +++ b/federatedscope/autotune/baseline/fedhpo_vfl.yaml @@ -14,7 +14,6 @@ model: train: optimizer: lr: 0.5 - bin_num: 100 # learning rate for xgb model eta: 0.5 data: @@ -36,7 +35,6 @@ vertical: key_size: 256 dims: [7, 14] algo: 'xgb' - xgb_use_bin: False eval: freq: 5 best_res_update_round_wise_key: test_loss diff --git a/federatedscope/core/auxiliaries/trainer_builder.py b/federatedscope/core/auxiliaries/trainer_builder.py index f2dc2b53c..247c2ba8a 100644 --- a/federatedscope/core/auxiliaries/trainer_builder.py +++ b/federatedscope/core/auxiliaries/trainer_builder.py @@ -28,7 +28,6 @@ "mftrainer": "MFTrainer", "cltrainer": "CLTrainer", "lptrainer": "LPTrainer", - "verticaltrainer": "VerticalTrainer", "atc_trainer": "ATCTrainer", } @@ -135,8 +134,6 @@ def get_trainer(model=None, dict_path = "federatedscope.cv.trainer.trainer" elif config.trainer.type.lower() in ['nlptrainer']: dict_path = "federatedscope.nlp.trainer.trainer" - elif config.trainer.type.lower() in ['verticaltrainer']: - dict_path = "federatedscope.vertical_fl.trainer.trainer" elif config.trainer.type.lower() in ['cltrainer', 'lptrainer']: dict_path = "federatedscope.cl.trainer.trainer" elif config.trainer.type.lower() in [ @@ -171,6 +168,14 @@ def get_trainer(model=None, config=config, only_for_eval=only_for_eval, monitor=monitor) + elif config.trainer.type.lower() in ['verticaltrainer']: + from federatedscope.vertical_fl.trainer.utils import \ + get_vertical_trainer + trainer = get_vertical_trainer(config=config, + model=model, + data=data, + device=device, + monitor=monitor) else: # try to find user registered trainer trainer = None diff --git a/federatedscope/core/configs/cfg_fl_setting.py b/federatedscope/core/configs/cfg_fl_setting.py index 08d1f735d..52aa26117 100644 --- a/federatedscope/core/configs/cfg_fl_setting.py +++ b/federatedscope/core/configs/cfg_fl_setting.py @@ -77,7 +77,10 @@ def extend_fl_setting_cfg(cfg): cfg.vertical.encryption = 'paillier' cfg.vertical.key_size = 3072 cfg.vertical.algo = 'lr' # ['lr', 'xgb'] - cfg.vertical.xgb_use_bin = False + cfg.vertical.protect_object = '' # feature_order, TODO: add more + cfg.vertical.protect_method = '' # dp + cfg.vertical.protect_args = [] + # Default values for 'dp': {'bucket_num':100, 'epsilon':None} # --------------- register corresponding check function ---------- cfg.register_cfg_check_fun(assert_fl_setting_cfg) diff --git a/federatedscope/vertical_fl/dataset/blog.py b/federatedscope/vertical_fl/dataset/blog.py index 328d9957d..601f12488 100644 --- a/federatedscope/vertical_fl/dataset/blog.py +++ b/federatedscope/vertical_fl/dataset/blog.py @@ -103,6 +103,7 @@ def _get_data(self): def _read_raw(self, file_path): data = pd.read_csv(file_path, header=None, usecols=list(range(281))) + data = data.fillna(method='ffill') data = data.values return data diff --git a/federatedscope/vertical_fl/dataset/credit.py b/federatedscope/vertical_fl/dataset/credit.py index 6bf4b9803..76d65fb9b 100644 --- a/federatedscope/vertical_fl/dataset/credit.py +++ b/federatedscope/vertical_fl/dataset/credit.py @@ -97,6 +97,7 @@ def balance_sample(sample_size, y): def _read_raw(self, file_path): data = pd.read_csv(file_path) + data = data.fillna(method='ffill') data = data.values return data diff --git a/federatedscope/vertical_fl/trainer/__init__.py 
b/federatedscope/vertical_fl/trainer/__init__.py index e69de29bb..377c7c173 100644 --- a/federatedscope/vertical_fl/trainer/__init__.py +++ b/federatedscope/vertical_fl/trainer/__init__.py @@ -0,0 +1,3 @@ +from federatedscope.vertical_fl.trainer.trainer import VerticalTrainer +from federatedscope.vertical_fl.trainer.feature_order_protected_trainer \ + import FeatureOrderProtectedTrainer diff --git a/federatedscope/vertical_fl/trainer/feature_order_protected_trainer.py b/federatedscope/vertical_fl/trainer/feature_order_protected_trainer.py new file mode 100644 index 000000000..9d19d2112 --- /dev/null +++ b/federatedscope/vertical_fl/trainer/feature_order_protected_trainer.py @@ -0,0 +1,236 @@ +import numpy as np +from federatedscope.vertical_fl.trainer.trainer import VerticalTrainer + + +class FeatureOrderProtectedTrainer(VerticalTrainer): + def __init__(self, model, data, device, config, monitor): + super(FeatureOrderProtectedTrainer, + self).__init__(model, data, device, config, monitor) + + assert config.vertical.protect_method != '', \ + "Please specify the adopted method for protecting feature order" + args = config.vertical.protect_args[0] if len( + config.vertical.protect_args) > 0 else {} + + if config.vertical.protect_method == 'dp': + self.bucket_num = args.get('bucket_num', 100) + self.epsilon = args.get('epsilon', None) + self.protect_funcs = self._protect_via_dp + self.split_value = None + elif config.vertical.protect_method == 'op_boost': + self.algo = args.get('algo', 'global') + self.protect_funcs = self._protect_via_op_boost + self.lower_bound = args.get('lower_bound', 1) + self.upper_bound = args.get('upper_bound', 100) + if self.algo == 'global': + self.epsilon = args.get('epsilon', 2) + elif self.algo == 'adjusting': + self.epsilon_prt = args.get('epsilon_prt', 2) + self.epsilon_ner = args.get('epsilon_ner', 2) + self.partition_num = args.get('partition_num', 10) + else: + raise ValueError + else: + raise ValueError(f"The method {args['method']} is not provided") + + def get_feature_value(self, feature_idx, value_idx): + if not hasattr(self, 'split_value') or self.split_value is None: + return super().get_feature_value(feature_idx=feature_idx, + value_idx=value_idx) + + return self.split_value[feature_idx][value_idx] + + def _bucketize(self, feature_order, bucket_size, bucket_num): + bucketized_feature_order = list() + for bucket_idx in range(bucket_num): + start = bucket_idx * bucket_size + end = min((bucket_idx + 1) * bucket_size, len(feature_order)) + bucketized_feature_order.append(feature_order[start:end]) + return bucketized_feature_order + + def _processed_data(self, data): + min_value = np.min(data, axis=0) + max_value = np.max(data, axis=0) + # To avoid data_max[i] == data_min[i], + for i in range(data.shape[1]): + if max_value[i] == min_value[i]: + max_value[i] += 1 + return np.round(self.lower_bound + (data - min_value) / + (max_value - min_value) * + (self.upper_bound - self.lower_bound)) + + def _global_mapping_fun(self, x, epsilon, lower_bound, upper_bound): + probs = list() + denominator = np.sum( + np.exp(-np.abs(x - np.array(range(lower_bound, upper_bound + 1))) * + epsilon / 2)) + for k in range(lower_bound, upper_bound + 1): + probs.append(np.exp(-np.abs(x - k) * epsilon / 2) / denominator) + res = np.random.choice(list(range(lower_bound, upper_bound + 1)), + p=probs) + + return res + + def _adjusting_mapping_fun(self, x, partition_edges): + for part_idx in range(self.partition_num): + if partition_edges[part_idx] < x and partition_edges[part_idx + + 
1] >= x: + selected_part = self._global_mapping_fun( + part_idx, + epsilon=self.epsilon_prt, + lower_bound=0, + upper_bound=self.partition_num - 1) + res = self._global_mapping_fun( + x, + epsilon=self.epsilon_ner, + lower_bound=partition_edges[selected_part] + 1, + upper_bound=partition_edges[selected_part + 1]) + + return res + + def _op_boost_global(self, data): + + processed_data = self._processed_data(data=data) + mapped_data = np.vectorize(self._global_mapping_fun)( + processed_data, + epsilon=self.epsilon, + lower_bound=self.lower_bound, + upper_bound=self.upper_bound) + + return mapped_data + + def _op_boost_adjusting(self, data): + + processed_data = self._processed_data(data=data) + quantiles = np.linspace(0, 100, self.partition_num + 1) + partition_edges = np.round( + np.asarray( + np.percentile( + list(range(self.lower_bound - 1, self.upper_bound + 1)), + quantiles))) + partition_edges = [int(x) for x in partition_edges] + mapped_data = np.vectorize(self._adjusting_mapping_fun, + signature='(),(n)->()')( + processed_data, + partition_edges=partition_edges) + + return mapped_data + + def _protect_via_op_boost(self, raw_feature_order, data): + """ + Add random noises to feature order for privacy protection. + For more details, please see + OpBoost: A Vertical Federated Tree Boosting Framework Based on + Order-Preserving Desensitization.pdf + (https://arxiv.org/pdf/2210.01318.pdf) + """ + if self.algo == 'global': + mapped_data = self._op_boost_global(data) + elif self.algo == 'adjusting': + mapped_data = self._op_boost_adjusting(data) + else: + mapped_data = None + assert mapped_data is not None + + # Get feature order based on mapped data + num_of_feature = mapped_data.shape[1] + protected_feature_order = [0] * num_of_feature + for i in range(num_of_feature): + protected_feature_order[i] = mapped_data[:, i].argsort() + + return { + 'raw_feature_order': raw_feature_order, + 'feature_order': protected_feature_order, + } + + def _protect_via_dp(self, raw_feature_order, data): + """ + Bucketize and add dp noise to feature order for privacy protection. 
+ For more details, please refer to + FederBoost: Private Federated Learning for GBDT + (https://arxiv.org/pdf/2011.02796.pdf) + """ + protected_feature_order = list() + bucket_size = int( + np.ceil(self.cfg.dataloader.batch_size / self.bucket_num)) + if self.epsilon is None: + prob_for_preserving = 1.0 + else: + _tmp = np.power(np.e, self.epsilon) + prob_for_preserving = _tmp / (_tmp + self.bucket_num - 1) + prob_for_moving = (1.0 - prob_for_preserving) / (self.bucket_num - 1) + split_position = [] + self.split_value = [] + + for feature_idx in range(len(raw_feature_order)): + bucketized_feature_order = self._bucketize( + raw_feature_order[feature_idx], bucket_size, self.bucket_num) + noisy_bucketizd_feature_order = [[] + for _ in range(self.bucket_num)] + + # Add noise to bucketized feature order + for bucket_idx in range(self.bucket_num): + probs = np.ones(self.bucket_num) * prob_for_moving + probs[bucket_idx] = prob_for_preserving + for each in bucketized_feature_order[bucket_idx]: + selected_bucket_idx = np.random.choice(list( + range(self.bucket_num)), + p=probs) + noisy_bucketizd_feature_order[selected_bucket_idx].append( + each) + + # Save split positions (instance number within buckets) + # We exclude the endpoints to avoid empty sub-trees + _split_position = list() + _split_value = dict() + accumu_num = 0 + for bucket_idx, each_bucket in enumerate( + noisy_bucketizd_feature_order): + instance_num = len(each_bucket) + # Skip the empty bucket + if instance_num != 0: + # Skip the endpoints + if bucket_idx != self.bucket_num - 1: + _split_position.append(accumu_num + instance_num) + + # Save split values: average of min value of (j-1)-th + # bucket and max value of j-th bucket + max_value = data[bucketized_feature_order[bucket_idx] + [-1]][feature_idx] + min_value = data[bucketized_feature_order[bucket_idx] + [0]][feature_idx] + if accumu_num == 0: + _split_value[accumu_num + + instance_num] = max_value / 2.0 + elif bucket_idx == self.bucket_num - 1: + _split_value[accumu_num] += min_value / 2.0 + else: + _split_value[accumu_num] += min_value / 2.0 + _split_value[accumu_num + + instance_num] = max_value / 2.0 + + accumu_num += instance_num + + split_position.append(_split_position) + self.split_value.append(_split_value) + + [np.random.shuffle(x) for x in noisy_bucketizd_feature_order] + noisy_bucketizd_feature_order = np.concatenate( + noisy_bucketizd_feature_order) + protected_feature_order.append(noisy_bucketizd_feature_order) + + extra_info = {'split_position': split_position} + + return { + 'raw_feature_order': raw_feature_order, + 'feature_order': protected_feature_order, + 'extra_info': extra_info + } + + # TODO: more flexible for client to choose whether to protect or not + def _get_feature_order_info(self, data): + num_of_feature = data.shape[1] + feature_order = [0] * num_of_feature + for i in range(num_of_feature): + feature_order[i] = data[:, i].argsort() + return self.protect_funcs(feature_order, data) diff --git a/federatedscope/vertical_fl/trainer/trainer.py b/federatedscope/vertical_fl/trainer/trainer.py index de8b1127f..0949c2ffd 100644 --- a/federatedscope/vertical_fl/trainer/trainer.py +++ b/federatedscope/vertical_fl/trainer/trainer.py @@ -9,17 +9,18 @@ class VerticalTrainer(object): - def __init__(self, model, data, device, config, monitor, only_for_eval): + def __init__(self, model, data, device, config, monitor): self.model = model self.data = data self.device = device self.cfg = config self.monitor = monitor - self.only_for_eval = only_for_eval - self.bin_num 
= config.train.optimizer.bin_num self.eta = config.train.optimizer.eta + self.merged_feature_order = None + self.client_feature_order = None + self.extra_info = None self.batch_x = None self.batch_y = None self.batch_y_hat = None @@ -31,24 +32,38 @@ def prepare_for_train(self, index=None): shuffled=True) self.criterion = get_vertical_loss(self.cfg.criterion.type) batch_index, self.batch_x, self.batch_y = self._fetch_train_data(index) - feature_order = self._get_feature_order(self.batch_x) + feature_order_info = self._get_feature_order_info(self.batch_x) + if 'raw_feature_order' in feature_order_info: + # When applying protect method, the raw (real) feature order might + # be different from the shared feature order + self.client_feature_order = feature_order_info['raw_feature_order'] + feature_order_info.pop('raw_feature_order') + else: + self.client_feature_order = feature_order_info['feature_order'] if index is None: self.batch_y_hat = np.random.uniform(low=0.0, high=1.0, size=len(self.batch_y)) self.batch_z = 0 - return batch_index, feature_order + return batch_index, feature_order_info - def train(self, feature_order=None, tree_num=0, node_num=None): + def train(self, feature_order_info=None, tree_num=0, node_num=None): # Start to build a tree if node_num is None: - if tree_num == 0 and feature_order is not None: - self.feature_order = feature_order + if tree_num == 0 and feature_order_info is not None: + self.merged_feature_order, self.extra_info = \ + self._parse_feature_order(feature_order_info) return self._compute_for_root(tree_num=tree_num) # Continue training else: return self._compute_for_node(tree_num, node_num) + def get_feature_value(self, feature_idx, value_idx): + assert self.batch_x is not None + + instance_idx = self.client_feature_order[feature_idx][value_idx] + return self.batch_x[instance_idx, feature_idx] + def _predict(self, tree_num): self._compute_weight(tree_num, node_num=0) @@ -58,15 +73,35 @@ def _fetch_train_data(self, index=None): else: return index, self.data['train']['x'][index], None - def _get_feature_order(self, data): + def _parse_feature_order(self, feature_order_info): + client_ids = list(feature_order_info.keys()) + client_ids = sorted(client_ids) + merged_feature_order = np.concatenate( + [feature_order_info[idx]['feature_order'] for idx in client_ids]) + + # TODO: different extra_info for different clients + extra_info = feature_order_info[client_ids[0]].get('extra_info', None) + if extra_info is not None: + merged_extra_info = dict() + for each_key in extra_info.keys(): + merged_extra_info[each_key] = np.concatenate([ + feature_order_info[idx]['extra_info'][each_key] + for idx in client_ids + ]) + else: + merged_extra_info = None + + return merged_feature_order, merged_extra_info + + def _get_feature_order_info(self, data): num_of_feature = data.shape[1] feature_order = [0] * num_of_feature for i in range(num_of_feature): feature_order[i] = data[:, i].argsort() - return feature_order + return {'feature_order': feature_order} def _get_ordered_gh(self, tree_num, node_num, feature_idx): - order = self.feature_order[feature_idx] + order = self.merged_feature_order[feature_idx] ordered_g = self.model[tree_num][node_num].grad[order] ordered_h = self.model[tree_num][node_num].hess[order] return ordered_g, ordered_h @@ -76,11 +111,20 @@ def _get_best_gain(self, tree_num, node_num): split_ref = {'feature_idx': None, 'value_idx': None} instance_num = self.batch_x.shape[0] - feature_num = len(self.feature_order) + feature_num = len(self.merged_feature_order) 
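# --- Illustrative sketch, not part of the patch ------------------------------
# A minimal standalone rendering of the FederBoost-style mechanism that
# FeatureOrderProtectedTrainer._protect_via_dp (added above) applies before a
# client shares its feature order: bucketize the sorted instance indices,
# re-assign each instance to a bucket via randomized response with
# p_keep = e^eps / (e^eps + bucket_num - 1), shuffle inside each bucket, and
# expose only the bucket boundaries as candidate split positions (these feed
# the 'split_position' extra_info consumed in _get_best_gain below).
# Names here (dp_protect_feature_order, eps, rng) are illustrative, not
# identifiers from the codebase, and the split-value bookkeeping kept by the
# trainer is omitted.

import numpy as np


def dp_protect_feature_order(feature_values, bucket_num=10, eps=10.0, rng=None):
    rng = rng or np.random.default_rng(0)
    n = len(feature_values)
    raw_order = np.argsort(feature_values)      # true (private) feature order
    bucket_size = int(np.ceil(n / bucket_num))

    # Randomized response over bucket assignments
    p_keep = np.e ** eps / (np.e ** eps + bucket_num - 1)
    p_move = (1.0 - p_keep) / (bucket_num - 1)
    buckets = [[] for _ in range(bucket_num)]
    for bucket_idx in range(bucket_num):
        probs = np.full(bucket_num, p_move)
        probs[bucket_idx] = p_keep
        start, end = bucket_idx * bucket_size, (bucket_idx + 1) * bucket_size
        for instance in raw_order[start:end]:
            buckets[rng.choice(bucket_num, p=probs)].append(instance)

    # Shuffle within buckets so only bucket membership (not the exact rank)
    # is revealed, and keep bucket boundaries as the candidate split positions
    for bucket in buckets:
        rng.shuffle(bucket)
    noisy_order = np.concatenate(
        [np.asarray(b, dtype=int) for b in buckets if b])
    split_positions = np.cumsum([len(b) for b in buckets if b])[:-1]
    return noisy_order, split_positions


if __name__ == '__main__':
    values = np.random.default_rng(1).normal(size=100)
    order, splits = dp_protect_feature_order(values, bucket_num=10, eps=5.0)
    print(order[:10], splits)
# --- End of illustrative sketch -----------------------------------------------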
+ if self.extra_info is not None: + split_position = self.extra_info.get( + 'split_position', + [range(instance_num) for _ in range(feature_num)]) + else: + # The left/right sub-tree cannot be empty + split_position = [ + range(1, instance_num) for _ in range(feature_num) + ] for feature_idx in range(feature_num): ordered_g, ordered_h = self._get_ordered_gh( tree_num, node_num, feature_idx) - for value_idx in range(instance_num): + for value_idx in split_position[feature_idx]: gain = self.model[tree_num].cal_gain(ordered_g, ordered_h, value_idx) @@ -116,7 +160,8 @@ def _compute_for_node(self, tree_num, node_num): else: best_gain, split_ref = self._get_best_gain(tree_num, node_num) if best_gain > 0: - split_feature = self.feature_order[split_ref['feature_idx']] + split_feature = self.merged_feature_order[ + split_ref['feature_idx']] left_child = np.zeros(self.batch_x.shape[0]) for x in range(split_ref['value_idx']): left_child[split_feature[x]] = 1 diff --git a/federatedscope/vertical_fl/trainer/utils.py b/federatedscope/vertical_fl/trainer/utils.py new file mode 100644 index 000000000..c7c75939f --- /dev/null +++ b/federatedscope/vertical_fl/trainer/utils.py @@ -0,0 +1,21 @@ +from federatedscope.vertical_fl.trainer import VerticalTrainer, \ + FeatureOrderProtectedTrainer + + +def get_vertical_trainer(config, model, data, device, monitor): + + protect_object = config.vertical.protect_object + if not protect_object or protect_object == '': + return VerticalTrainer(model=model, + data=data, + device=device, + config=config, + monitor=monitor) + elif protect_object == 'feature_order': + return FeatureOrderProtectedTrainer(model=model, + data=data, + device=device, + config=config, + monitor=monitor) + else: + raise ValueError diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml index cefa5c590..479dae7d7 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml @@ -18,19 +18,17 @@ dataloader: type: raw batch_size: 4000 criterion: - type: Regression + type: RegressionMSELoss trainer: type: verticaltrainer train: optimizer: - bin_num: 1000 # learning rate for xgb model eta: 0.5 vertical: use: True dims: [4, 8] algo: 'xgb' - xgb_use_bin: True eval: freq: 5 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml index 1dc7db36e..023679d20 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml @@ -23,14 +23,12 @@ trainer: type: verticaltrainer train: optimizer: - bin_num: 100 # learning rate for xgb model eta: 0.5 vertical: use: True dims: [7, 14] algo: 'xgb' - xgb_use_bin: True eval: freq: 3 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml index b62b4cdfa..1c44aeda5 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml @@ -18,19 +18,17 @@ dataloader: type: raw batch_size: 8000 criterion: - type: Regression + type: RegressionMSELoss trainer: 
type: verticaltrainer train: optimizer: - bin_num: 1000 # learning rate for xgb model eta: 1 vertical: use: True dims: [10, 20] algo: 'xgb' - xgb_use_bin: True eval: freq: 3 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml index 6f6e735ce..7e50ee12f 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml @@ -23,14 +23,12 @@ trainer: type: verticaltrainer train: optimizer: - bin_num: 100 # learning rate for xgb model eta: 0.5 vertical: use: True dims: [5, 10] algo: 'xgb' - xgb_use_bin: True eval: freq: 3 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_dp_on_adult.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_dp_on_adult.yaml new file mode 100644 index 000000000..9843c56b0 --- /dev/null +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_dp_on_adult.yaml @@ -0,0 +1,37 @@ +use_gpu: False +device: 0 +backend: torch +federate: + mode: standalone + client_num: 2 +model: + type: xgb_tree + lambda_: 0.1 + gamma: 0 + num_of_trees: 10 + max_tree_depth: 3 +data: + root: data/ + type: adult + splits: [1.0, 0.0] +dataloader: + type: raw + batch_size: 2000 +criterion: + type: CrossEntropyLoss +trainer: + type: verticaltrainer +train: + optimizer: + # learning rate for xgb model + eta: 0.5 +vertical: + use: True + dims: [7, 14] + algo: 'xgb' + protect_object: 'feature_order' + protect_method: 'dp' + protect_args: [{'bucket_num': 100, 'epsilon':10}] +eval: + freq: 3 + best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_op_boost_on_adult.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_op_boost_on_adult.yaml new file mode 100644 index 000000000..80b357165 --- /dev/null +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_feature_order_op_boost_on_adult.yaml @@ -0,0 +1,37 @@ +use_gpu: False +device: 0 +backend: torch +federate: + mode: standalone + client_num: 2 +model: + type: xgb_tree + lambda_: 0.1 + gamma: 0 + num_of_trees: 10 + max_tree_depth: 3 +data: + root: data/ + type: adult + splits: [1.0, 0.0] +dataloader: + type: raw + batch_size: 2000 +criterion: + type: CrossEntropyLoss +trainer: + type: verticaltrainer +train: + optimizer: + # learning rate for xgb model + eta: 0.5 +vertical: + use: True + dims: [7, 14] + algo: 'xgb' + protect_object: 'feature_order' + protect_method: 'op_boost' + protect_args: [{'algo': 'global'}] +eval: + freq: 3 + best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py b/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py index 271a8b8a3..a4d64d5df 100644 --- a/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py +++ b/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py @@ -53,7 +53,7 @@ def __init__(self, self.callback_func_for_feature_order) self.register_handlers('finish', self.callback_func_for_finish) - def train(self, tree_num, node_num=None): + def train(self, tree_num, node_num=None, feature_order_info=None): raise NotImplementedError def eval(self, tree_num): @@ -68,14 +68,13 @@ def 
_init_data_related_var(self): # each contains self.num_of_trees trees # label-owner initials y_hat # label-owner sends "sample data" to others - # label-owner calls self.preparation() - def callback_func_for_model_para(self, message: Message): self.state = message.state if self.own_label: - batch_index, self.feature_order = self.trainer.prepare_for_train() - self.msg_buffer[self.ID] = self.feature_order + batch_index, feature_order_info = self.trainer.prepare_for_train() + self.feature_order = feature_order_info['feature_order'] + self.msg_buffer[self.ID] = feature_order_info receiver = [ each for each in list(self.comm_manager.neighbors.keys()) if each not in [self.ID, self.server_id] @@ -88,21 +87,21 @@ def callback_func_for_model_para(self, message: Message): content=batch_index)) # other clients receive the data-sample information - # other clients also call self.preparation() def callback_func_for_data_sample(self, message: Message): batch_index, sender = message.content, message.sender - _, self.feature_order = self.trainer.prepare_for_train( + _, feature_order_info = self.trainer.prepare_for_train( index=batch_index) + self.feature_order = feature_order_info['feature_order'] self.comm_manager.send( Message(msg_type='feature_order', sender=self.ID, state=self.state, receiver=[sender], - content=self.feature_order)) + content=feature_order_info)) def callback_func_for_feature_order(self, message: Message): - feature_order, sender = message.content, message.sender - self.msg_buffer[sender] = feature_order + feature_order_info, sender = message.content, message.sender + self.msg_buffer[sender] = feature_order_info self.check_and_move_on() def callback_func_for_finish(self, message: Message): @@ -113,7 +112,6 @@ def callback_func_for_finish(self, message: Message): def check_and_move_on(self): if len(self.msg_buffer) == self.client_num: - self.merged_feature_order = np.concatenate([ - self.msg_buffer[idx] for idx in range(1, self.client_num + 1) - ]) - self.train(tree_num=self.state) + received_feature_order_infos = self.msg_buffer + self.train(tree_num=self.state, + feature_order_info=received_feature_order_infos) diff --git a/federatedscope/vertical_fl/xgb_base/worker/evaluation_wrapper.py b/federatedscope/vertical_fl/xgb_base/worker/evaluation_wrapper.py index b19b8e2e1..cfce7a5db 100644 --- a/federatedscope/vertical_fl/xgb_base/worker/evaluation_wrapper.py +++ b/federatedscope/vertical_fl/xgb_base/worker/evaluation_wrapper.py @@ -56,7 +56,7 @@ def _feedback_eval_metrics(self): receiver=[ each for each in list(self.comm_manager.neighbors.keys()) - if each != self.server_id + if each not in [self.server_id, self.ID] ], content='None')) diff --git a/federatedscope/vertical_fl/xgb_base/worker/train_wrapper.py b/federatedscope/vertical_fl/xgb_base/worker/train_wrapper.py index 0640e5e03..05f811ecb 100644 --- a/federatedscope/vertical_fl/xgb_base/worker/train_wrapper.py +++ b/federatedscope/vertical_fl/xgb_base/worker/train_wrapper.py @@ -7,13 +7,13 @@ def wrap_client_for_train(client): - def train(self, tree_num, node_num=None): + def train(self, tree_num, node_num=None, feature_order_info=None): if node_num is None: logger.info(f'----------- Building a new tree (Tree ' f'#{tree_num}) -------------') self.state = tree_num finish_flag, results = self.trainer.train( - feature_order=self.merged_feature_order, tree_num=tree_num) + feature_order_info=feature_order_info, tree_num=tree_num) else: assert node_num is not None finish_flag, results = self.trainer.train(tree_num=tree_num, @@ -59,8 
+59,7 @@ def callback_func_for_split(self, message: Message): self.state = message.state self.feature_importance[feature_idx] += 1 - instance_idx = self.feature_order[feature_idx][value_idx] - feature_value = self.trainer.batch_x[instance_idx, feature_idx] + feature_value = self.trainer.get_feature_value(feature_idx, value_idx) self.model[tree_num][node_num].feature_idx = feature_idx self.model[tree_num][node_num].feature_value = feature_value diff --git a/scripts/distributed_scripts/distributed_configs/distributed_xgb_client_1.yaml b/scripts/distributed_scripts/distributed_configs/distributed_xgb_client_1.yaml index 9542c5986..48665a0ac 100644 --- a/scripts/distributed_scripts/distributed_configs/distributed_xgb_client_1.yaml +++ b/scripts/distributed_scripts/distributed_configs/distributed_xgb_client_1.yaml @@ -28,7 +28,6 @@ model: type: lr train: optimizer: - bin_num: 100 lambda_: 0.1 gamma: 0 num_of_trees: 10 diff --git a/scripts/distributed_scripts/distributed_configs/distributed_xgb_client_2.yaml b/scripts/distributed_scripts/distributed_configs/distributed_xgb_client_2.yaml index b2d83caa8..7637e89f2 100644 --- a/scripts/distributed_scripts/distributed_configs/distributed_xgb_client_2.yaml +++ b/scripts/distributed_scripts/distributed_configs/distributed_xgb_client_2.yaml @@ -28,7 +28,6 @@ model: type: lr train: optimizer: - bin_num: 100 lambda_: 0.1 gamma: 0 num_of_trees: 10 diff --git a/scripts/distributed_scripts/distributed_configs/distributed_xgb_server.yaml b/scripts/distributed_scripts/distributed_configs/distributed_xgb_server.yaml index 9e73b9e86..3f14fee4f 100644 --- a/scripts/distributed_scripts/distributed_configs/distributed_xgb_server.yaml +++ b/scripts/distributed_scripts/distributed_configs/distributed_xgb_server.yaml @@ -26,7 +26,6 @@ model: type: lr train: optimizer: - bin_num: 100 lambda_: 0.1 gamma: 0 num_of_trees: 10 diff --git a/tests/test_xgb.py b/tests/test_xgb.py index c3ba1a589..d622355b5 100644 --- a/tests/test_xgb.py +++ b/tests/test_xgb.py @@ -13,7 +13,7 @@ class XGBTest(unittest.TestCase): def setUp(self): print(('Testing %s.%s' % (type(self).__name__, self._testMethodName))) - def set_config(self, cfg): + def set_config_for_xgb_base(self, cfg): backup_cfg = cfg.clone() import torch @@ -23,12 +23,11 @@ def set_config(self, cfg): cfg.federate.client_num = 2 cfg.model.type = 'xgb_tree' - cfg.model.lambda_ = 1 + cfg.model.lambda_ = 0.1 cfg.model.gamma = 0 cfg.model.num_of_trees = 5 cfg.model.max_tree_depth = 3 - cfg.train.optimizer.bin_num = 1000 cfg.train.optimizer.eta = 0.5 cfg.data.root = 'test_data/' @@ -36,12 +35,12 @@ def set_config(self, cfg): cfg.data.size = 2000 cfg.dataloader.type = 'raw' + cfg.dataloader.batch_size = 2000 cfg.criterion.type = 'CrossEntropyLoss' cfg.vertical.use = True - cfg.vertical.xgb_use_bin = False - cfg.vertical.dims = [5, 10] + cfg.vertical.dims = [7, 14] cfg.vertical.algo = 'xgb' cfg.trainer.type = 'verticaltrainer' @@ -50,9 +49,307 @@ def set_config(self, cfg): return backup_cfg - def test_XGBFL(self): + def set_config_for_xgb_dp(self, cfg): + backup_cfg = cfg.clone() + + import torch + cfg.use_gpu = torch.cuda.is_available() + + cfg.federate.mode = 'standalone' + cfg.federate.client_num = 2 + + cfg.model.type = 'xgb_tree' + cfg.model.lambda_ = 0.1 + cfg.model.gamma = 0 + cfg.model.num_of_trees = 5 + cfg.model.max_tree_depth = 3 + + cfg.train.optimizer.eta = 0.5 + + cfg.data.root = 'test_data/' + cfg.data.type = 'adult' + cfg.data.size = 2000 + + cfg.dataloader.type = 'raw' + cfg.dataloader.batch_size = 2000 + + 
cfg.criterion.type = 'CrossEntropyLoss' + + cfg.vertical.use = True + cfg.vertical.dims = [7, 14] + cfg.vertical.algo = 'xgb' + cfg.vertical.protect_object = 'feature_order' + cfg.vertical.protect_method = 'dp' + cfg.vertical.protect_args = [{'bucket_num': 100, 'epsilon': 10}] + + cfg.trainer.type = 'verticaltrainer' + cfg.eval.freq = 5 + cfg.eval.best_res_update_round_wise_key = "test_loss" + + return backup_cfg + + def set_config_for_xgb_dp_too_large_noise(self, cfg): + backup_cfg = cfg.clone() + + import torch + cfg.use_gpu = torch.cuda.is_available() + + cfg.federate.mode = 'standalone' + cfg.federate.client_num = 2 + + cfg.model.type = 'xgb_tree' + cfg.model.lambda_ = 0.1 + cfg.model.gamma = 0 + cfg.model.num_of_trees = 5 + cfg.model.max_tree_depth = 3 + + cfg.train.optimizer.eta = 0.5 + + cfg.data.root = 'test_data/' + cfg.data.type = 'adult' + cfg.data.size = 2000 + + cfg.dataloader.type = 'raw' + cfg.dataloader.batch_size = 2000 + + cfg.criterion.type = 'CrossEntropyLoss' + + cfg.vertical.use = True + cfg.vertical.dims = [7, 14] + cfg.vertical.algo = 'xgb' + cfg.vertical.protect_object = 'feature_order' + cfg.vertical.protect_method = 'dp' + cfg.vertical.protect_args = [{'bucket_num': 100, 'epsilon': 1}] + + cfg.trainer.type = 'verticaltrainer' + cfg.eval.freq = 5 + cfg.eval.best_res_update_round_wise_key = "test_loss" + + return backup_cfg + + def set_config_for_xgb_bucket(self, cfg): + backup_cfg = cfg.clone() + + import torch + cfg.use_gpu = torch.cuda.is_available() + + cfg.federate.mode = 'standalone' + cfg.federate.client_num = 2 + + cfg.model.type = 'xgb_tree' + cfg.model.lambda_ = 0.1 + cfg.model.gamma = 0 + cfg.model.num_of_trees = 5 + cfg.model.max_tree_depth = 3 + + cfg.train.optimizer.eta = 0.5 + + cfg.data.root = 'test_data/' + cfg.data.type = 'adult' + cfg.data.size = 2000 + + cfg.dataloader.type = 'raw' + cfg.dataloader.batch_size = 2000 + + cfg.criterion.type = 'CrossEntropyLoss' + + cfg.vertical.use = True + cfg.vertical.dims = [7, 14] + cfg.vertical.algo = 'xgb' + cfg.vertical.protect_object = 'feature_order' + cfg.vertical.protect_method = 'dp' + cfg.vertical.protect_args = [{'bucket_num': 100}] + + cfg.trainer.type = 'verticaltrainer' + cfg.eval.freq = 5 + cfg.eval.best_res_update_round_wise_key = "test_loss" + + return backup_cfg + + def set_config_for_xgb_op_boost_global(self, cfg): + backup_cfg = cfg.clone() + + import torch + cfg.use_gpu = torch.cuda.is_available() + + cfg.federate.mode = 'standalone' + cfg.federate.client_num = 2 + + cfg.model.type = 'xgb_tree' + cfg.model.lambda_ = 0.1 + cfg.model.gamma = 0 + cfg.model.num_of_trees = 5 + cfg.model.max_tree_depth = 3 + + cfg.train.optimizer.eta = 0.5 + + cfg.data.root = 'test_data/' + cfg.data.type = 'adult' + cfg.data.size = 2000 + + cfg.dataloader.type = 'raw' + cfg.dataloader.batch_size = 2000 + + cfg.criterion.type = 'CrossEntropyLoss' + + cfg.vertical.use = True + cfg.vertical.dims = [7, 14] + cfg.vertical.algo = 'xgb' + cfg.vertical.protect_object = 'feature_order' + cfg.vertical.protect_method = 'op_boost' + cfg.vertical.protect_args = [{'algo': 'global'}] + + cfg.trainer.type = 'verticaltrainer' + cfg.eval.freq = 5 + cfg.eval.best_res_update_round_wise_key = "test_loss" + + return backup_cfg + + def set_config_for_xgb_op_boost_local(self, cfg): + backup_cfg = cfg.clone() + + import torch + cfg.use_gpu = torch.cuda.is_available() + + cfg.federate.mode = 'standalone' + cfg.federate.client_num = 2 + + cfg.model.type = 'xgb_tree' + cfg.model.lambda_ = 0.1 + cfg.model.gamma = 0 + cfg.model.num_of_trees = 
5 + cfg.model.max_tree_depth = 3 + + cfg.train.optimizer.eta = 0.5 + + cfg.data.root = 'test_data/' + cfg.data.type = 'adult' + cfg.data.size = 2000 + + cfg.dataloader.type = 'raw' + cfg.dataloader.batch_size = 2000 + + cfg.criterion.type = 'CrossEntropyLoss' + + cfg.vertical.use = True + cfg.vertical.dims = [7, 14] + cfg.vertical.algo = 'xgb' + cfg.vertical.protect_object = 'feature_order' + cfg.vertical.protect_method = 'op_boost' + cfg.vertical.protect_args = [{'algo': 'adjusting'}] + + cfg.trainer.type = 'verticaltrainer' + cfg.eval.freq = 5 + cfg.eval.best_res_update_round_wise_key = "test_loss" + + return backup_cfg + + def test_XGB_Base(self): + init_cfg = global_cfg.clone() + backup_cfg = self.set_config_for_xgb_base(init_cfg) + setup_seed(init_cfg.seed) + update_logger(init_cfg, True) + + data, modified_config = get_data(init_cfg.clone()) + init_cfg.merge_from_other_cfg(modified_config) + self.assertIsNotNone(data) + + Fed_runner = get_runner(data=data, + server_class=get_server_cls(init_cfg), + client_class=get_client_cls(init_cfg), + config=init_cfg.clone()) + self.assertIsNotNone(Fed_runner) + test_results = Fed_runner.run() + init_cfg.merge_from_other_cfg(backup_cfg) + print(test_results) + self.assertGreater(test_results['server_global_eval']['test_acc'], + 0.79) + + def test_XGB_use_dp(self): + init_cfg = global_cfg.clone() + backup_cfg = self.set_config_for_xgb_dp(init_cfg) + setup_seed(init_cfg.seed) + update_logger(init_cfg, True) + + data, modified_config = get_data(init_cfg.clone()) + init_cfg.merge_from_other_cfg(modified_config) + self.assertIsNotNone(data) + + Fed_runner = get_runner(data=data, + server_class=get_server_cls(init_cfg), + client_class=get_client_cls(init_cfg), + config=init_cfg.clone()) + self.assertIsNotNone(Fed_runner) + test_results = Fed_runner.run() + init_cfg.merge_from_other_cfg(backup_cfg) + print(test_results) + self.assertGreater(test_results['server_global_eval']['test_acc'], + 0.79) + + def test_XGB_use_dp_too_large_noise(self): + init_cfg = global_cfg.clone() + backup_cfg = self.set_config_for_xgb_dp_too_large_noise(init_cfg) + setup_seed(init_cfg.seed) + update_logger(init_cfg, True) + + data, modified_config = get_data(init_cfg.clone()) + init_cfg.merge_from_other_cfg(modified_config) + self.assertIsNotNone(data) + + Fed_runner = get_runner(data=data, + server_class=get_server_cls(init_cfg), + client_class=get_client_cls(init_cfg), + config=init_cfg.clone()) + self.assertIsNotNone(Fed_runner) + test_results = Fed_runner.run() + init_cfg.merge_from_other_cfg(backup_cfg) + print(test_results) + self.assertLess(test_results['server_global_eval']['test_acc'], 0.6) + + def test_XGB_use_bucket(self): + init_cfg = global_cfg.clone() + backup_cfg = self.set_config_for_xgb_bucket(init_cfg) + setup_seed(init_cfg.seed) + update_logger(init_cfg, True) + + data, modified_config = get_data(init_cfg.clone()) + init_cfg.merge_from_other_cfg(modified_config) + self.assertIsNotNone(data) + + Fed_runner = get_runner(data=data, + server_class=get_server_cls(init_cfg), + client_class=get_client_cls(init_cfg), + config=init_cfg.clone()) + self.assertIsNotNone(Fed_runner) + test_results = Fed_runner.run() + init_cfg.merge_from_other_cfg(backup_cfg) + print(test_results) + self.assertGreater(test_results['server_global_eval']['test_acc'], + 0.79) + + def test_XGB_use_op_boost_global(self): + init_cfg = global_cfg.clone() + backup_cfg = self.set_config_for_xgb_op_boost_global(init_cfg) + setup_seed(init_cfg.seed) + update_logger(init_cfg, True) + + data, 
modified_config = get_data(init_cfg.clone()) + init_cfg.merge_from_other_cfg(modified_config) + self.assertIsNotNone(data) + + Fed_runner = get_runner(data=data, + server_class=get_server_cls(init_cfg), + client_class=get_client_cls(init_cfg), + config=init_cfg.clone()) + self.assertIsNotNone(Fed_runner) + test_results = Fed_runner.run() + init_cfg.merge_from_other_cfg(backup_cfg) + print(test_results) + self.assertGreater(test_results['server_global_eval']['test_acc'], 0.7) + + def test_XGB_use_op_boost_local(self): init_cfg = global_cfg.clone() - backup_cfg = self.set_config(init_cfg) + backup_cfg = self.set_config_for_xgb_op_boost_local(init_cfg) setup_seed(init_cfg.seed) update_logger(init_cfg, True)
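Note on the 'op_boost' path exercised by the tests above: the 'global' variant maps every discretized feature value through the exponential mechanism before the feature order is recomputed, so only a desensitized order ever leaves the client. The standalone sketch below mirrors the logic of _processed_data, _global_mapping_fun and _op_boost_global in FeatureOrderProtectedTrainer; the function and parameter names (op_boost_global, lower, upper, rng) are illustrative rather than identifiers from the patch, and a plain per-element loop stands in for the np.vectorize call used there.

import numpy as np


def op_boost_global(data, epsilon=2.0, lower=1, upper=100, rng=None):
    """OpBoost 'global' order-preserving desensitization, sketched.

    Each feature is rescaled to the integer range [lower, upper] and every
    value x is resampled from the exponential mechanism
    Pr[k] proportional to exp(-|x - k| * epsilon / 2) for k in {lower, ..., upper};
    the (noisy) feature order is then recomputed on the mapped data.
    """
    rng = rng or np.random.default_rng(0)
    data = np.asarray(data, dtype=float)

    # Rescale each feature to integers in [lower, upper], avoiding zero range
    lo, hi = data.min(axis=0), data.max(axis=0)
    hi = np.where(hi == lo, lo + 1, hi)
    scaled = np.round(lower + (data - lo) / (hi - lo) * (upper - lower))

    # Exponential-mechanism resampling of every discretized value
    domain = np.arange(lower, upper + 1)
    mapped = np.empty_like(scaled)
    for i in range(scaled.shape[0]):
        for j in range(scaled.shape[1]):
            weights = np.exp(-np.abs(scaled[i, j] - domain) * epsilon / 2)
            mapped[i, j] = rng.choice(domain, p=weights / weights.sum())

    # The feature order shared with the label owner is computed on noisy data
    feature_order = [mapped[:, j].argsort() for j in range(mapped.shape[1])]
    return mapped, feature_order


if __name__ == '__main__':
    x = np.random.default_rng(1).normal(size=(50, 3))
    _, order = op_boost_global(x, epsilon=2.0)
    print([o[:5] for o in order])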