NeMo and XGBoost integration #103

Merged · 17 commits · Oct 12, 2020
Changes from all commits
47 changes: 45 additions & 2 deletions docker/build.sh
@@ -104,13 +104,38 @@ gquant_ver=$(grep version gQuant/setup.py | sed "s/^.*version='\([^;]*\)'.*/\1/"
CONTAINER="nvidia/cuda:${CUDA_STR}-runtime-${OS_STR}"
D_CONT=${D_CONT:="gquant/gquant:${gquant_ver}-${CUDA_STR}_${OS_STR}_${RAPIDS_VERSION}_${MODE_STR}"}


cat << 'EOF' > sacrebleu.patch
--- nemo/collections/nlp/metrics/sacrebleu.py
+++ sacrebleu_fix.py
@@ -61,13 +61,16 @@
VERSION = '1.3.5'

try:
+ import threading
# SIGPIPE is not available on Windows machines, throwing an exception.
from signal import SIGPIPE

# If SIGPIPE is available, change behaviour to default instead of ignore.
from signal import signal, SIG_DFL

- signal(SIGPIPE, SIG_DFL)
+
+ if threading.current_thread() == threading.main_thread():
+ signal(SIGPIPE, SIG_DFL)

except ImportError:
logging.warning('Could not import signal.SIGPIPE (this is expected on Windows machines)')
EOF
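
Note: the main-thread guard in the patch above is needed because CPython only allows signal handlers to be installed from the main thread. NeMo's sacrebleu module installs its SIGPIPE handler at import time, so importing it from a worker thread (as can happen under Dask or Jupyter) would raise ValueError without the guard. A minimal sketch of the behavior, assuming a POSIX system (the function name is illustrative):

    import threading
    from signal import SIGPIPE, SIG_DFL, signal

    def set_sigpipe_default():
        # Only the main thread may install signal handlers in CPython.
        if threading.current_thread() is threading.main_thread():
            signal(SIGPIPE, SIG_DFL)
        # On any other thread, calling signal() would raise
        # "ValueError: signal only works in main thread".

    t = threading.Thread(target=set_sigpipe_default)
    t.start()
    t.join()  # completes quietly instead of raising ValueError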


cat > $D_FILE <<EOF
FROM $CONTAINER
EXPOSE 8888
EXPOSE 8787
EXPOSE 8786
RUN apt-get update
RUN apt-get install -y curl git net-tools iproute2 vim wget locales-all build-essential libfontconfig1 libxrender1 \
RUN apt-get install -y curl git net-tools iproute2 vim wget locales-all build-essential libfontconfig1 libxrender1 rsync libsndfile1 ffmpeg \
&& rm -rf /var/lib/apt/lists/*

RUN mkdir /.local /.jupyter /.config /.cupy \
@@ -140,7 +165,7 @@ RUN wget \
&& conda init

RUN conda install -y -c rapidsai -c nvidia -c conda-forge \
-c defaults rapids=$RAPIDS_VERSION python=3.7 cudatoolkit=$CUDA_STR
-c defaults rapids=$RAPIDS_VERSION cudatoolkit=$CUDA_STR python=3.7

RUN conda install -y -c conda-forge jupyterlab

@@ -161,8 +186,26 @@ RUN jupyter labextension install jupyterlab-nvdashboard
RUN pip install dask_labextension
RUN jupyter labextension install dask-labextension
RUN jupyter serverextension enable dask_labextension

## install the jsonpath lib
RUN pip install jsonpath-ng ray[tune] Cython

## install NeMo
WORKDIR /home/quant/
RUN git clone -b v0.11.1 https://github.com/NVIDIA/NeMo.git
WORKDIR /home/quant/NeMo
RUN sed -i 's/numba<=0.48/numba==0.49.1/g' requirements/requirements_asr.txt
COPY sacrebleu.patch /home/quant/NeMo/
RUN patch -u nemo/collections/nlp/metrics/sacrebleu.py -i sacrebleu.patch && bash reinstall.sh

RUN conda install -y ruamel.yaml

RUN mkdir -p /home/quant/gQuant
WORKDIR /home/quant/gQuant
$INSTALL_GQUANT
EOF
docker build -f $D_FILE -t $D_CONT .

if [ -f "sacrebleu.patch" ] ; then
rm sacrebleu.patch
fi
140 changes: 98 additions & 42 deletions gquant/dataframe_flow/_node_flow.py
@@ -1,10 +1,14 @@
from collections.abc import Iterable
import warnings
import numpy as np
import pandas as pd
import dask
from dask.dataframe import DataFrame as DaskDataFrame
import cudf
import dask_cudf
import copy
from dask.base import is_dask_collection
from dask.distributed import Future

from .taskSpecSchema import TaskSpecSchema
from .portsSpecSchema import PortsSpecSchema
@@ -55,6 +59,16 @@ class NodeTaskGraphMixin(object):
_get_output_ports
'''

def __getstate__(self):
state = self.__dict__.copy()
if 'input_df' in state:
del state['input_df']
# print('state', state)
return state

def __setstate__(self, state):
self.__dict__.update(state)

def __init__(self):
self.inputs = []
self.outputs = []
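
Note: the __getstate__/__setstate__ pair above implements a common pattern for objects that Dask ships to workers: drop the cached input_df dataframes from the pickled state (they can be large or hold unpicklable handles) and tolerate their absence after unpickling. A small sketch of the pattern with an intentionally unpicklable attribute (class and attribute names are illustrative):

    import pickle

    class Node:
        def __init__(self):
            self.uid = 'node0'
            self.input_df = lambda: None  # stand-in: lambdas do not pickle

        def __getstate__(self):
            state = self.__dict__.copy()
            state.pop('input_df', None)  # do not ship cached inputs
            return state

        def __setstate__(self, state):
            self.__dict__.update(state)

    clone = pickle.loads(pickle.dumps(Node()))
    assert clone.uid == 'node0'
    assert not hasattr(clone, 'input_df')  # dropped, recomputed on demand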
@@ -277,9 +291,15 @@ def __valide(self, node_output, ref_cols):
if self.delayed_process and \
cudf.DataFrame in expected_type and \
dask_cudf.DataFrame not in expected_type:
expected_type.extend([dask_cudf.DataFrame])
expected_type.append(dask_cudf.DataFrame)

if out_type not in expected_type:
match = False
for expected in expected_type:
if issubclass(out_type, expected):
match = True
break

if not match:
raise Exception(
'Node "{}" output port "{}" produced wrong type '
'"{}". Expected type "{}"'
@@ -395,44 +415,35 @@ def __make_copy(self, df_obj):
return df_obj

def __check_dly_processing_prereq(self, inputs):
'''All inputs must be dask_cudf.DataFrame types. Output types must
'''At least one input must be a dask DataFrame type. Output types must
be specified as cudf.DataFrame or dask_cudf.DataFrame. (Functionality
could also be extended to support dask.dataframe.DataFrame, but
could also be extended to support dask dataframe of pandas, but
currently only cudf/dask_cudf dataframes are supported.)
'''
# check if dask future or delayed
ivals = inputs.values()
if not any((is_dask_collection(iv) for iv in ivals)) and \
not any((isinstance(iv, Future) for iv in ivals)):
# None of the inputs are Delayed or Futures, so there is no
# intention of using delayed processing. Return False and avoid
# printing a non-applicable warning.
return False

use_delayed = False
in_types = {}
for iport, ival in inputs.items():
itype = type(ival)
in_types[iport] = itype
if itype in (dask_cudf.DataFrame,):
for ival in ivals:
if isinstance(ival, DaskDataFrame):
use_delayed = True
break

if use_delayed:
warn_msg = \
'Node "{}" iport "{}" is of type "{}" and it '\
'should be dask_cudf.DataFrame. Ignoring '\
'"delayed_process" setting.'
for iport, itype in in_types.items():
if itype not in (dask_cudf.DataFrame,):
warnings.warn(warn_msg.format(self.uid, iport, itype))
use_delayed = False

if use_delayed:
# NOTE: Delayed processing is currently only supported when one of
# the inputs is a dask_cudf.DataFrame. In the future this might be
# generalized to support dask processing of other delayed/future
# type inputs.
if not use_delayed:
warn_msg = \
'Node "{}" oport "{}" is of type "{}" and it '\
'should be cudf.DataFrame or dask_cudf.DataFrame. Ignoring '\
'"delayed_process" setting.'
for oport, oport_spec in \
self._get_output_ports(full_port_spec=True).items():
otype = oport_spec.get('type', [])
if not isinstance(otype, list):
otype = [otype]
if dask_cudf.DataFrame not in otype and \
cudf.DataFrame not in otype:
warnings.warn(warn_msg.format(self.uid, oport, otype))
use_delayed = False
'None of the Node "{}" inputs '\
'is a dask_cudf.DataFrame. Ignoring '\
'"delayed_process" setting.'.format(self.uid)
warnings.warn(warn_msg)

return use_delayed
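
Note: the rewritten prerequisite check first short-circuits with is_dask_collection (which also covers delayed objects) and dask.distributed.Future, then requires at least one input to be a dask DataFrame before enabling delayed processing. A hedged sketch of the detection step (input names are illustrative):

    import dask.dataframe as dd
    import pandas as pd
    from dask.base import is_dask_collection

    ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2, 3]}), npartitions=2)
    inputs = {'in1': pd.DataFrame({'b': [4]}), 'in2': ddf}

    # Mirrors the early exit above: bail out when nothing is dask-backed.
    any_dask = any(is_dask_collection(v) for v in inputs.values())
    assert any_dask  # 'in2' keeps the delayed-processing path alive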

@@ -445,6 +456,15 @@ def __delayed_call(self, inputs):
the same i.e. equal number of partitions.
'''

def df_copy(df_in):
'''Used for delayed unpacking.'''
# Needed for the same reason as __make_copy: to prevent column
# additions to the input dataframes. In Python everything is
# passed by reference and dataframes are mutable.
# Handle the case when dask_cudf.DataFrames are source frames
# which appear as cudf.DataFrame in a dask-delayed function.
return df_in.copy(deep=False)
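
Note: df_copy relies on shallow-copy semantics: copy(deep=False) shares the underlying column data but gives the copy its own column index, so a downstream node adding columns cannot mutate the upstream frame. A quick sketch with pandas standing in for cudf (assuming cudf.DataFrame.copy(deep=False) behaves analogously):

    import pandas as pd

    src = pd.DataFrame({'a': [1, 2]})
    view = src.copy(deep=False)  # shares column data with src
    view['b'] = [3, 4]           # new column lands only on the copy

    assert list(src.columns) == ['a']        # upstream frame unchanged
    assert list(view.columns) == ['a', 'b']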

def get_pout(out_dict, port):
'''Get the output in out_dict at key port. Used for delayed
unpacking.'''
@@ -459,16 +479,29 @@ def get_pout(out_dict, port):

df_out = out_dict.get(port, cudf.DataFrame())

if isinstance(df_out, cudf.DataFrame):
if isinstance(df_out, cudf.DataFrame) or \
isinstance(df_out, pd.DataFrame):
# Needed for the same reason as __make_copy: to prevent column
# additions to the input dataframes. In Python everything is
# passed by reference and dataframes are mutable.
# Handle the case when dask_cudf.DataFrames are source frames
# which appear as cudf.DataFrame in a dask-delayed function.

# TODO: This copy might not be needed given df_copy fix.
return df_out.copy(deep=False)

return df_out

inputs_not_dly = {}
for iport, inarg in inputs.items():
# inarg is not necessarily a dask cudf frame
if not isinstance(inarg, DaskDataFrame):
# TODO: There could be cases where these non-delayed args are
# mutable. In that case USER BEWARE. Could copy here to
# deal with that. A shallow copy would be preferred but is
# not 100% reliable.
inputs_not_dly[iport] = inarg

inputs_dly = {}
# A dask_cudf object returns a list of dask delayed objects via the
# to_delayed() API. The logic below assumes (and otherwise errors) that
@@ -490,7 +523,11 @@ def get_pout(out_dict, port):
# p_x - partition index

npartitions = None
for iport, dcudf in inputs.items():
for iport, inarg in inputs.items():
# inarg is not necessarily a dask cudf frame
if not isinstance(inarg, DaskDataFrame):
continue
dcudf = inarg
ddf_dly_list = dcudf.to_delayed()
npartitions_ = len(ddf_dly_list)
if npartitions is None:
@@ -501,9 +538,10 @@ def get_pout(out_dict, port):
' has {} npartitions and other inputs have {} partitions'
.format(self.uid, iport, npartitions_, npartitions))
for idly, dly in enumerate(ddf_dly_list):
inputs_dly.setdefault(idly, {}).update({
# very important to use a shallow copy of inputs_not_dly
inputs_dly.setdefault(idly, inputs_not_dly.copy()).update({
# iport: dly.persist() # DON'T PERSIST HERE
iport: dly
iport: dask.delayed(df_copy)(dly)
})

# DEBUGGING
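
Note: the loop above fans the computation out per partition: each dask input contributes one delayed object per partition via to_delayed(), non-delayed inputs are shallow-copied into every partition's input dict, and the node's process function runs once per partition. A condensed, hedged sketch of the same flow with dask.dataframe standing in for dask_cudf (names are illustrative):

    import dask
    import dask.dataframe as dd
    import pandas as pd

    ddf = dd.from_pandas(pd.DataFrame({'a': range(4)}), npartitions=2)
    scalar = 10  # non-delayed input shared by every partition

    def process(inputs):
        df = inputs['in']
        return {'out': df.assign(b=df['a'] + inputs['k'])}

    outs = []
    for dly in ddf.to_delayed():            # one delayed frame per partition
        inputs_ = {'in': dly, 'k': scalar}  # fresh dict per partition
        out_dict = dask.delayed(process)(inputs_)
        outs.append(dask.delayed(lambda d: d['out'])(out_dict))

    result = dd.from_delayed(outs)          # reassemble a partitioned frame
    assert result.compute().shape == (4, 2)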
@@ -528,8 +566,7 @@ def get_pout(out_dict, port):
output_df_dly = dask.delayed(self.decorate_process())(inputs_)
output_df_dly_per = output_df_dly.persist()
for oport in self._get_output_ports():
oport_out = dask.delayed(get_pout)(
output_df_dly_per, oport)
oport_out = dask.delayed(get_pout)(output_df_dly_per, oport)
outputs_dly.setdefault(oport, []).append(oport_out.persist())

# DEBUGGING
@@ -538,8 +575,24 @@ def get_pout(out_dict, port):
output_df = {}
# A dask_cudf object is synthesized from a list of delayed objects.
# Per outputs_dly above, use the dask_cudf.from_delayed API.
for oport in self._get_output_ports():
output_df[oport] = dask_cudf.from_delayed(outputs_dly[oport])
for oport, port_spec in \
self._get_output_ports(full_port_spec=True).items():
port_type = port_spec.get(PortsSpecSchema.port_type, type(None))
if not isinstance(port_type, Iterable):
port_type = [port_type]
# DEBUGGING
# print('__DELAYED_CALL node "{}" port "{}" port type "{}"'.format(
# self.uid, oport, port_type))
if dask_cudf.DataFrame in port_type or DaskDataFrame in port_type:
output_df[oport] = dask_cudf.from_delayed(outputs_dly[oport])
else:
# outputs_dly[oport] is currently a list. Run compute on each
# partition, and keep the first one.
# This is not very generalizable
# TODO: Check for equivalency and output a warning in case
# outputs don't match from different partitions.
output_df[oport] = \
[iout.compute() for iout in outputs_dly[oport]][0]

return output_df
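
Note: the output-synthesis branch above dispatches on the declared port type: dask-frame ports are reassembled with dask_cudf.from_delayed, while any other port is computed per partition and only partition zero is kept (the TODO acknowledges that outputs from other partitions are silently dropped). A small sketch of that dispatch, with dask.dataframe standing in for dask_cudf and illustrative names:

    from collections.abc import Iterable

    import dask
    import dask.dataframe as dd
    from dask.dataframe import DataFrame as DaskDataFrame

    def synthesize(port_type, delayed_parts):
        if not isinstance(port_type, Iterable):
            port_type = [port_type]
        if DaskDataFrame in port_type:
            return dd.from_delayed(delayed_parts)  # stitch partitions back
        # Non-frame port: compute every partition, keep only the first.
        return [p.compute() for p in delayed_parts][0]

    parts = [dask.delayed(sum)([i]) for i in (1, 2)]
    assert synthesize(int, parts) == 1  # partition 0 wins, 2 is discarded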

@@ -580,8 +633,11 @@ def get_type(type_def):
ports = from_node.ports_setup()
from_port_name = node_input['from_port']
to_port_name = node_input['to_port']
types = get_type(ports.outports[from_port_name]['type'])
output[to_port_name] = types
if from_port_name in ports.outports:
types = get_type(ports.outports[from_port_name]['type'])
output[to_port_name] = types
else:
continue
return output
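
Note: the added membership guard keeps the port-type lookup from raising a KeyError when an upstream node does not advertise the referenced output port (the else/continue branch is effectively a no-op, since the loop would advance anyway). A tiny sketch of the guard in isolation, with illustrative data:

    ports_outports = {'df_out': {'type': list}}  # upstream's advertised ports
    node_input = {'from_port': 'model_out', 'to_port': 'model_in'}

    output = {}
    from_port = node_input['from_port']
    if from_port in ports_outports:  # guard: skip unknown source ports
        output[node_input['to_port']] = ports_outports[from_port]['type']
    assert output == {}  # 'model_out' is not advertised, so nothing is added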

def decorate_process(self):