Commit 996951e
* bf16 initial commit

* Update engine.py

* update split_half_float_double_csr dtypes

* update to bf16 communication (make flag optional)

* Update requirements-sparse_attn.txt

* add compressed bf16 allreduce

* add compressed bf16 allreduce

* Update __init__.py

* Update engine.py

* Update __init__.py

* Update engine.py

* zero1 + bf16

* zero 2 + bf16

* pipe parallel + bf16

* pipe parallel + bf16

* partition activations + bf16
sdtblck authored May 14, 2021
1 parent 3389e4f commit 996951e
Showing 10 changed files with 268 additions and 106 deletions.
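Taken together, these changes let a DeepSpeed config opt into bfloat16 through a new "type" key inside the existing "fp16" block. A minimal sketch of such a config, written as the Python dict that DeepSpeedConfig consumes (key names come from the diff below; the batch-size setting is illustrative and unrelated to bf16):

ds_config = {
    "train_micro_batch_size_per_gpu": 1,   # illustrative
    "fp16": {
        "enabled": True,        # reuses the existing fp16 switch
        "type": "bfloat16"      # new key added by this commit (default: "fp16")
    }
}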
14 changes: 12 additions & 2 deletions deepspeed/runtime/activation_checkpointing/checkpointing.py
@@ -253,7 +253,7 @@ def get_partition_size(item):
    return int(partition_size)


-def get_full_inputs(tensors, device=None):
+def get_full_inputs(tensors, device=None, fp32_comm=False):
    inputs = []
    num_args = int(len(tensors) / 2)
    for i in range(num_args - 1):
@@ -274,9 +274,14 @@ def get_full_inputs(tensors, device=None):
            part_i = flat_tensor.narrow(0, partition_size * i, partition_size)
            if i == mp_rank:
                part_i.copy_(item)
+            if fp32_comm:
+                part_i = part_i.float()
            partitions.append(part_i)
        if mp_group is not None:
            dist.all_gather(partitions, partitions[mp_rank], group=mp_group)
+        if fp32_comm:
+            for i in range(mp_size):  # copy gathered fp32 data back into flat_tensor, downcasting to the original dtype
+                flat_tensor.narrow(0, partition_size * i, partition_size).copy_(partitions[i])
        input_tensor = flat_tensor.view(list(size.numpy()))
        item.data = input_tensor.data

@@ -599,9 +604,14 @@ def backward(ctx, *grads):
        global cuda_device, transport_stream, PARTITION_ACTIVATIONS

        if PARTITION_ACTIVATIONS:
+            if ctx.saved_tensors and ctx.saved_tensors[0].dtype == torch.bfloat16:
+                FP32_COMM = True
+            else:
+                FP32_COMM = False
            # with torch.cuda.stream(transport_stream):
            inputs = get_full_inputs(ctx.saved_tensors,
-                                     device=cuda_device if PA_TO_CPU else None)
+                                     device=cuda_device if PA_TO_CPU else None,
+                                     fp32_comm=FP32_COMM)
            detached_inputs = detach_variable(inputs)
        else:
            inputs = ctx.saved_tensors
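The round-trip above — upcast each bf16 partition to fp32, all_gather, downcast back — works around NCCL builds of the era that could not communicate bf16 tensors. A self-contained sketch of the same pattern (all_gather_bf16_via_fp32 is a hypothetical helper, not part of the diff; assumes torch.distributed is already initialized):

import torch
import torch.distributed as dist

def all_gather_bf16_via_fp32(part, world_size, group=None):
    # NCCL cannot move bf16 here, so communicate an fp32 copy of the local partition
    send = part.float()
    gathered = [torch.empty_like(send) for _ in range(world_size)]
    dist.all_gather(gathered, send, group=group)
    # downcast each gathered partition back to the original dtype
    return torch.cat([g.to(part.dtype) for g in gathered])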
1 change: 1 addition & 0 deletions deepspeed/runtime/comm/__init__.py
@@ -0,0 +1 @@
from .compressed_ar import compressed_all_reduce
54 changes: 54 additions & 0 deletions deepspeed/runtime/comm/compressed_ar.py
@@ -0,0 +1,54 @@
# python -m torch.distributed.launch --nproc_per_node=1 24_bit_allreduce.py

import torch
import os
import cupy
from torch.utils.dlpack import to_dlpack
from torch.utils.dlpack import from_dlpack

version = torch.__version__.split('.')
TORCH_VERSION_MAJOR = int(version[0])
TORCH_VERSION_MINOR = int(version[1])


def torch2cupy(tensor):
    return cupy.fromDlpack(to_dlpack(tensor))


def cupy2torch(cupy_tensor):
    return from_dlpack(cupy_tensor.toDlpack())


def decompose_cupy(tensor):
    mantissa, exponent = cupy.frexp(torch2cupy(tensor.float()))
    return cupy2torch(mantissa).half(), cupy2torch(exponent).to(torch.int8)


def decompose(t):
    if (TORCH_VERSION_MAJOR, TORCH_VERSION_MINOR) < (1, 9):
        raise Exception('Torch version >= 1.9.0 needed for 24_bit_allreduce.decompose')
    mantissa, exponent = torch.frexp(t.float())
    return mantissa.half(), exponent.to(torch.int8)


def reconstruct(mantissa, exponent, original_dtype=torch.bfloat16):
    return torch.ldexp(mantissa, exponent).to(original_dtype)


def compressed_all_reduce_torch(tensor, op=torch.distributed.ReduceOp.SUM, group=None, async_op=False):
    original_dtype = tensor.dtype
    m, e = decompose(tensor)
    torch.distributed.all_reduce(m, op=op, group=group, async_op=async_op)
    torch.distributed.all_reduce(e, op=op, group=group, async_op=async_op)
    return reconstruct(m, e, original_dtype)


def compressed_all_reduce_cupy(tensor, op=torch.distributed.ReduceOp.SUM, group=None, async_op=False):
    original_dtype = tensor.dtype
    m, e = decompose_cupy(tensor)
    torch.distributed.all_reduce(m, op=op, group=group, async_op=async_op)
    torch.distributed.all_reduce(e, op=op, group=group, async_op=async_op)
    return reconstruct(m, e, original_dtype)


# select the implementation only after both functions exist (referencing them
# earlier would raise NameError); torch.frexp is only available from torch 1.9
if (TORCH_VERSION_MAJOR, TORCH_VERSION_MINOR) < (1, 9):
    compressed_all_reduce = compressed_all_reduce_cupy
else:
    compressed_all_reduce = compressed_all_reduce_torch
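frexp splits each value v into a mantissa m in [0.5, 1) and an integer exponent e with v = m · 2^e — e.g. frexp(6.0) = (0.75, 3), since 0.75 · 2³ = 6.0 — so a tensor travels as an fp16 mantissa plus an int8 exponent, 24 bits per value (hence the filename in the comment at the top). A hedged usage sketch (assumes the process group is normally set up by the launcher, as in that comment):

import torch
import torch.distributed as dist
from deepspeed.runtime.comm import compressed_all_reduce

dist.init_process_group(backend="nccl")  # normally done by the launcher

grads = torch.randn(1024, dtype=torch.bfloat16, device="cuda")
reduced = compressed_all_reduce(grads)  # returns a bf16 tensor of the same shape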
96 changes: 57 additions & 39 deletions deepspeed/runtime/config.py
@@ -94,8 +94,18 @@ def get_fp16_enabled(param_dict):
        return False


+def get_fp16_type(param_dict):
+    if get_fp16_enabled(param_dict):
+        return get_scalar_param(param_dict[FP16], FP16_TYPE, FP16_TYPE_DEFAULT)
+    else:
+        return "fp32"
+
+
def get_loss_scale(param_dict):
    if get_fp16_enabled(param_dict):
+        if get_fp16_type(param_dict) == "bfloat16":
+            # default loss scale to 1.0 if dtype == bf16, as loss scaling isn't needed
+            return 1.0
        return get_scalar_param(param_dict[FP16],
                                FP16_LOSS_SCALE,
                                FP16_LOSS_SCALE_DEFAULT)
@@ -111,7 +121,7 @@ def get_initial_dynamic_scale(param_dict):
    else:
        initial_scale_power = FP16_INITIAL_SCALE_POWER_DEFAULT

-    return 2**initial_scale_power
+    return 2 ** initial_scale_power


def get_dynamic_loss_scale_args(param_dict):
@@ -138,7 +148,7 @@ def get_dynamic_loss_scale_args(param_dict):
                                         FP16_MIN_LOSS_SCALE,
                                         FP16_MIN_LOSS_SCALE_DEFAULT)
        loss_scale_args = {
-            INITIAL_LOSS_SCALE: 2**init_scale,
+            INITIAL_LOSS_SCALE: 2 ** init_scale,
            SCALE_WINDOW: scale_window,
            DELAYED_SHIFT: delayed_shift,
            MIN_LOSS_SCALE: min_loss_scale
@@ -168,6 +178,9 @@ def get_zero_reduce_scatter(param_dict):


def get_allreduce_always_fp32(param_dict):
+    if get_fp16_type(param_dict) == "bfloat16":
+        # default allreduce_always_fp32 to True if dtype == bf16, as nccl can't communicate bf16 tensors
+        return get_scalar_param(param_dict, FP32_ALLREDUCE, FP32_ALLREDUCE_DEFAULT_BF16)
    return get_scalar_param(param_dict, FP32_ALLREDUCE, FP32_ALLREDUCE_DEFAULT)
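A quick sketch of how these helpers resolve for a bf16 config (run inside this module's namespace; the param_dict shape is the one the functions above consume):

cfg = {"fp16": {"enabled": True, "type": "bfloat16"}}
assert get_fp16_type(cfg) == "bfloat16"
assert get_loss_scale(cfg) == 1.0              # bf16 bypasses loss scaling
assert get_allreduce_always_fp32(cfg) is True  # FP32_ALLREDUCE_DEFAULT_BF16 kicks in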


@@ -409,7 +422,7 @@ def get_optimizer_gradient_clipping(param_dict):

def get_optimizer_legacy_fusion(param_dict):
    if OPTIMIZER in param_dict.keys() and \
-        LEGACY_FUSION in param_dict[OPTIMIZER].keys():
+            LEGACY_FUSION in param_dict[OPTIMIZER].keys():
        return param_dict[OPTIMIZER][LEGACY_FUSION]
    else:
        return LEGACY_FUSION_DEFAULT
@@ -496,7 +509,7 @@ def get_checkpoint_tag_validation_mode(checkpoint_params):
        return tag_validation_mode
    else:
        raise DeepSpeedConfigError("Checkpoint config contains invalid tag_validation " \
-            f"value of {tag_validation_mode}, expecting one of {CHECKPOINT_TAG_VALIDATION_MODES}")
+                                   f"value of {tag_validation_mode}, expecting one of {CHECKPOINT_TAG_VALIDATION_MODES}")


'''Write deepspeed config files by modifying basic templates.
@@ -568,11 +581,11 @@ def __init__(self, json_file, mpu=None, param_dict=None):
        ]
        if any(map(lambda t: t in self._param_dict, batch_params)):
            raise ElasticityConfigError("One or more batch related parameters were found in your " \
-                f"ds_config ({TRAIN_BATCH_SIZE}, {TRAIN_MICRO_BATCH_SIZE_PER_GPU}, and/or " \
-                f"{GRADIENT_ACCUMULATION_STEPS}). These parameters *will not be used* since " \
-                "elastic training is enabled, which takes control of these parameters. " \
-                "If you want to suppress this error (the parameters will be silently ignored) " \
-                f"please set {IGNORE_NON_ELASTIC_BATCH_INFO}':true in your elasticity config.")
+                                        f"ds_config ({TRAIN_BATCH_SIZE}, {TRAIN_MICRO_BATCH_SIZE_PER_GPU}, and/or " \
+                                        f"{GRADIENT_ACCUMULATION_STEPS}). These parameters *will not be used* since " \
+                                        "elastic training is enabled, which takes control of these parameters. " \
+                                        "If you want to suppress this error (the parameters will be silently ignored) " \
+                                        f"please set {IGNORE_NON_ELASTIC_BATCH_INFO}':true in your elasticity config.")

        # micro_bsz * world_size * gas = total_batch_size
        # gas = total_batch_size // (micro_bsz * world_size)
@@ -581,13 +594,13 @@ def __init__(self, json_file, mpu=None, param_dict=None):

        if TRAIN_BATCH_SIZE in self._param_dict:
            logger.warning("[Elasticity] overriding training_batch_size: " \
-                f"{self._param_dict[TRAIN_BATCH_SIZE]} -> {final_batch_size}")
+                           f"{self._param_dict[TRAIN_BATCH_SIZE]} -> {final_batch_size}")
        if TRAIN_MICRO_BATCH_SIZE_PER_GPU in self._param_dict:
            logger.warning("[Elasticity] overriding train_micro_batch_size_per_gpu: " \
-                f"{self._param_dict[TRAIN_MICRO_BATCH_SIZE_PER_GPU]} -> {micro_batch_size}")
+                           f"{self._param_dict[TRAIN_MICRO_BATCH_SIZE_PER_GPU]} -> {micro_batch_size}")
        if GRADIENT_ACCUMULATION_STEPS in self._param_dict:
-            logger.warning("[Elasticity] overriding gradient_accumulation_steps: "\
-                f"{self._param_dict[GRADIENT_ACCUMULATION_STEPS]} -> {gradient_accu_steps}")
+            logger.warning("[Elasticity] overriding gradient_accumulation_steps: " \
+                           f"{self._param_dict[GRADIENT_ACCUMULATION_STEPS]} -> {gradient_accu_steps}")

        logger.info(f"[Elasticity] valid GPU counts: {valid_gpus}")

@@ -622,6 +635,9 @@ def _initialize_params(self, param_dict):

        self.gradient_clipping = get_gradient_clipping(param_dict)
        self.fp16_enabled = get_fp16_enabled(param_dict)
+        self.fp16_type = get_fp16_type(param_dict)
+        self.precision = PRECISION_TYPES[self.fp16_type]
+
        self.amp_enabled = get_amp_enabled(param_dict)
        self.amp_params = get_amp_params(param_dict)
        self.loss_scale = get_loss_scale(param_dict)
@@ -630,7 +646,7 @@ def _initialize_params(self, param_dict):

        self.optimizer_name = get_optimizer_name(param_dict)
        if self.optimizer_name is not None and \
-            self.optimizer_name.lower() in DEEPSPEED_OPTIMIZERS:
+                self.optimizer_name.lower() in DEEPSPEED_OPTIMIZERS:
            self.optimizer_name = self.optimizer_name.lower()

        self.optimizer_params = get_optimizer_params(param_dict)
@@ -678,54 +694,54 @@ def _batch_assertion(self):
            f'Gradient accumulation steps: {grad_acc} has to be greater than 0'

        assert train_batch == micro_batch * grad_acc * self.world_size, \
-            (f'Check batch related parameters. train_batch_size is not equal'
-             ' to micro_batch_per_gpu * gradient_acc_step * world_size'
-             f'{train_batch} != {micro_batch} * {grad_acc} * {self.world_size}')
+                (f'Check batch related parameters. train_batch_size is not equal'
+                 ' to micro_batch_per_gpu * gradient_acc_step * world_size'
+                 f'{train_batch} != {micro_batch} * {grad_acc} * {self.world_size}')

    def _set_batch_related_parameters(self):

        train_batch = self.train_batch_size
        micro_batch = self.train_micro_batch_size_per_gpu
        grad_acc = self.gradient_accumulation_steps

-        #all values are provided nothing needs to be set
+        # all values are provided nothing needs to be set
        if train_batch is not None and \
-            micro_batch is not None and \
-            grad_acc is not None:
+                micro_batch is not None and \
+                grad_acc is not None:
            return

-        #global_accumulation_steps needs to be set
+        # global_accumulation_steps needs to be set
        elif train_batch is not None and \
-            micro_batch is not None:
+                micro_batch is not None:
            grad_acc = train_batch // micro_batch
            grad_acc //= self.world_size
            self.gradient_accumulation_steps = grad_acc

-        #micro_batch_per_gpu needs to be set
+        # micro_batch_per_gpu needs to be set
        elif train_batch is not None and \
-            grad_acc is not None:
+                grad_acc is not None:
            micro_batch = train_batch // self.world_size
            micro_batch //= grad_acc
            self.train_micro_batch_size_per_gpu = micro_batch

-        #train_batch_size needs to be set
+        # train_batch_size needs to be set
        elif micro_batch is not None and \
-            grad_acc is not None:
+                grad_acc is not None:
            train_batch_size = micro_batch * grad_acc
            train_batch_size *= self.world_size
            self.train_batch_size = train_batch_size

-        #gradient_accumulation_steps and micro_batch_per_gpus is set
+        # gradient_accumulation_steps and micro_batch_per_gpus is set
        elif train_batch is not None:
            self.gradient_accumulation_steps = 1
            self.train_micro_batch_size_per_gpu = train_batch // self.world_size

-        #train_batch_size and gradient_accumulation_step is set
+        # train_batch_size and gradient_accumulation_step is set
        elif micro_batch is not None:
            self.train_batch_size = micro_batch * self.world_size
            self.gradient_accumulation_steps = 1

-        #either none of the three parameters are provided or just gradient_accumulation_step is provided
+        # either none of the three parameters are provided or just gradient_accumulation_step is provided
        else:
            assert False, \
                'Either train_batch_size or micro_batch_per_gpu needs to be provided'
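The branches above all solve the same identity, train_batch = micro_batch * grad_acc * world_size, for whichever value is missing. A worked example with hypothetical sizes:

train_batch, micro_batch, world_size = 32, 4, 4
grad_acc = train_batch // micro_batch // world_size  # 32 // 4 // 4 = 2
assert train_batch == micro_batch * grad_acc * world_size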
@@ -755,17 +771,19 @@ def print(self, name):
                                           ':'))))

    def _do_error_check(self):
-        assert self.train_micro_batch_size_per_gpu, "DeepSpeedConfig: {} is not defined".format(TRAIN_MICRO_BATCH_SIZE_PER_GPU)
+        assert self.train_micro_batch_size_per_gpu, "DeepSpeedConfig: {} is not defined".format(
+            TRAIN_MICRO_BATCH_SIZE_PER_GPU)

        assert self.gradient_accumulation_steps, "DeepSpeedConfig: {} is not defined".format(
            GRADIENT_ACCUMULATION_STEPS)

        if self.zero_enabled:
            assert self.fp16_enabled, "DeepSpeedConfig: ZeRO is only supported if fp16 is enabled"
-            assert self.zero_optimization_stage <= MAX_STAGE_ZERO_OPTIMIZATION, "DeepSpeedConfig: Maximum supported ZeRO stage is {}".format(MAX_STAGE_ZERO_OPTIMIZATION)
-            #if self.zero_config.cpu_offload is True:
+            assert self.zero_optimization_stage <= MAX_STAGE_ZERO_OPTIMIZATION, "DeepSpeedConfig: Maximum supported ZeRO stage is {}".format(
+                MAX_STAGE_ZERO_OPTIMIZATION)
+            # if self.zero_config.cpu_offload is True:
            #    assert self.zero_optimization_stage == ZERO_OPTIMIZATION_GRADIENTS, "DeepSpeedConfig: cpu-offload supported ZeRO stage is {}".format(ZERO_OPTIMIZATION_GRADIENTS)
-            #assert self.gradient_accumulation_steps == 1, "DeepSpeedConfig: {}is not supported for {}".format(GRADIENT_ACCUMULATION_STEPS, ZERO_OPTIMIZATION_CPU_OFFLOAD)
+            # assert self.gradient_accumulation_steps == 1, "DeepSpeedConfig: {}is not supported for {}".format(GRADIENT_ACCUMULATION_STEPS, ZERO_OPTIMIZATION_CPU_OFFLOAD)

    def _do_warning_check(self):
        fp16_enabled = self.fp16_enabled or self.zero_enabled
@@ -774,21 +792,21 @@ def _do_warning_check(self):
        if vocabulary_size and vocabulary_size % TENSOR_CORE_ALIGN_SIZE != 0:
            logger.warning(
                "DeepSpeedConfig: vocabulary size {} is not aligned to {}, may impact tensor core utilization."
-                .format(vocabulary_size,
-                        TENSOR_CORE_ALIGN_SIZE))
+                    .format(vocabulary_size,
+                            TENSOR_CORE_ALIGN_SIZE))

        if self.optimizer_params is not None and \
-            MAX_GRAD_NORM in self.optimizer_params.keys() and \
+                MAX_GRAD_NORM in self.optimizer_params.keys() and \
                self.optimizer_params[MAX_GRAD_NORM] > 0:
            if fp16_enabled:
                if self.global_rank == 0:
                    logger.warning(
                        'DeepSpeedConfig: In FP16 mode, DeepSpeed will pass {}:{} to FP16 wrapper'
-                        .format(MAX_GRAD_NORM,
-                                self.optimizer_params[MAX_GRAD_NORM]))
+                            .format(MAX_GRAD_NORM,
+                                    self.optimizer_params[MAX_GRAD_NORM]))
            else:
                if self.global_rank == 0:
                    logger.warning(
                        'DeepSpeedConfig: In FP32 mode, DeepSpeed does not permit MAX_GRAD_NORM ({}) > 0, setting to zero'
-                        .format(self.optimizer_params[MAX_GRAD_NORM]))
+                            .format(self.optimizer_params[MAX_GRAD_NORM]))
                self.optimizer_params[MAX_GRAD_NORM] = 0.0
14 changes: 14 additions & 0 deletions deepspeed/runtime/constants.py
@@ -2,6 +2,7 @@
Copyright (c) Microsoft Corporation
Licensed under the MIT license.
"""
+import torch

#############################################
# Routes
@@ -128,6 +129,18 @@
FP16_ENABLED = "enabled"
FP16_ENABLED_DEFAULT = False

+FP16_TYPE = "type"
+FP16_TYPE_DEFAULT = "fp16"
+PRECISION_TYPES = {
+    "fp32": torch.float32,
+    "float32": torch.float32,
+    "float": torch.float32,
+    "fp16": torch.half,
+    "float16": torch.half,
+    "half": torch.half,
+    "bfloat16": torch.bfloat16
+}

# FP16 loss scale, zero means using dynamic scaling
FP16_LOSS_SCALE = "loss_scale"
FP16_LOSS_SCALE_DEFAULT = 0
@@ -189,6 +202,7 @@
'''
FP32_ALLREDUCE = "fp32_allreduce"
FP32_ALLREDUCE_DEFAULT = False
+FP32_ALLREDUCE_DEFAULT_BF16 = True  # if dtype is bf16 - default to fp32 communication
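PRECISION_TYPES is the map behind self.precision = PRECISION_TYPES[self.fp16_type] in config.py above. A tiny sketch of the lookup:

import torch
from deepspeed.runtime.constants import PRECISION_TYPES

assert PRECISION_TYPES["bfloat16"] is torch.bfloat16
assert PRECISION_TYPES["half"] is torch.float16  # torch.half == torch.float16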

#########################################
# Scale/predivide gradients before allreduce
(Diffs for the remaining 6 changed files — per the commit message these include engine.py and requirements-sparse_attn.txt — were not loaded in this view.)