diff --git a/delta/data/feat/speech_ops.py b/delta/data/feat/speech_ops.py index 12d90054..d9f6ccda 100644 --- a/delta/data/feat/speech_ops.py +++ b/delta/data/feat/speech_ops.py @@ -304,8 +304,8 @@ def _loop_body(time, inputs, output_tas): loop_vars = (time, waveforms, output_tas) parallel_iterations = 10 - shape_invariants = tf.nest.map_structure( - lambda t: tf.TensorShape(None), loop_vars) + shape_invariants = tf.nest.map_structure(lambda t: tf.TensorShape(None), + loop_vars) (time, inputs, output_tas) = tf.while_loop( _loop_continue, @@ -362,8 +362,8 @@ def _loop_body(time, end_time, context, left_context, right_context, loop_vars = (time, T, context, left_context, right_context, output_tas) parallel_iterations = 10 - shape_invariants = tf.nest.map_structure( - lambda t: tf.TensorShape(None), loop_vars) + shape_invariants = tf.nest.map_structure(lambda t: tf.TensorShape(None), + loop_vars) (time, end_time, context, left_context, right_context, output_tas) = tf.while_loop( diff --git a/delta/data/feat/tf_speech_feature.py b/delta/data/feat/tf_speech_feature.py index d16b8aa9..bf536fb7 100644 --- a/delta/data/feat/tf_speech_feature.py +++ b/delta/data/feat/tf_speech_feature.py @@ -61,8 +61,7 @@ def compute_mel_filterbank_features(waveforms, frame_step=10, fft_length=None, window_fn=functools.partial( - tf.signal.hann_window, - periodic=True), + tf.signal.hann_window, periodic=True), lower_edge_hertz=80.0, upper_edge_hertz=7600.0, num_mel_bins=80, @@ -130,11 +129,9 @@ def compute_mel_filterbank_features(waveforms, # Warp the linear-scale, magnitude spectrograms into the mel-scale. num_spectrogram_bins = magnitude_spectrograms.shape[-1].value linear_to_mel_weight_matrix = ( - tf.signal.linear_to_mel_weight_matrix(num_mel_bins, - num_spectrogram_bins, - sample_rate, - lower_edge_hertz, - upper_edge_hertz)) + tf.signal.linear_to_mel_weight_matrix(num_mel_bins, num_spectrogram_bins, + sample_rate, lower_edge_hertz, + upper_edge_hertz)) mel_spectrograms = tf.tensordot(magnitude_spectrograms, linear_to_mel_weight_matrix, 1) # Note: Shape inference for tensordot does not currently handle this case. diff --git a/delta/data/frontend/plp_test.py b/delta/data/frontend/plp_test.py index 103702c0..0ea08964 100644 --- a/delta/data/frontend/plp_test.py +++ b/delta/data/frontend/plp_test.py @@ -48,7 +48,8 @@ def test_plp(self): [0.052763, -0.271487, 0.011329, 0.025320, 0.012851]]) self.assertEqual(tf.rank(plp_test).eval(), 2) - self.assertAllClose(plp_test.eval()[50:55, 5:10], output_true, rtol=1e-05, atol=1e-05) + self.assertAllClose( + plp_test.eval()[50:55, 5:10], output_true, rtol=1e-05, atol=1e-05) if __name__ == '__main__': diff --git a/delta/data/preprocess/base_preparer.py b/delta/data/preprocess/base_preparer.py index 93aa89f0..dc3f43f3 100644 --- a/delta/data/preprocess/base_preparer.py +++ b/delta/data/preprocess/base_preparer.py @@ -121,18 +121,17 @@ def prepare_one_raw_data(self, one_path, one_path_after, mode, batch_num = int(math.ceil(data_size / float(self.batch_size))) if self.multi_text: one_text_after = [] - for i, one_text in enumerate(text): #to be confirmed + for i, one_text in enumerate(text): #to be confirmed one_text_iterator = get_pre_process_text_ds_iter( one_text, pre_process_pipeline, self.num_parallel_calls, self.batch_size) - text_after_arr = self.run_dataset(one_text_iterator,batch_num) + text_after_arr = self.run_dataset(one_text_iterator, batch_num) text_after = [one_line.decode("utf-8") for one_line in text_after_arr] all_texts += text_after one_text_after.append(text_after) else: text = text[0] - text_iterator = get_pre_process_text_ds_iter(text, - pre_process_pipeline, + text_iterator = get_pre_process_text_ds_iter(text, pre_process_pipeline, self.num_parallel_calls, self.batch_size) text_after_arr = self.run_dataset(text_iterator, batch_num) @@ -147,7 +146,9 @@ def prepare_one_raw_data(self, one_path, one_path_after, mode, label_ds = label[i].batch(self.batch_size) label_iterator = label_ds.make_initializable_iterator() label_after_arr = self.run_dataset(label_iterator, batch_num) - label_after_one = [one_line.decode("utf-8") for one_line in label_after_arr] + label_after_one = [ + one_line.decode("utf-8") for one_line in label_after_arr + ] one_label_after.append(label_after_one) all_labels[i] += label_after_one else: @@ -155,14 +156,16 @@ def prepare_one_raw_data(self, one_path, one_path_after, mode, label_ds = label.batch(self.batch_size) label_iterator = label_ds.make_initializable_iterator() label_after_arr = self.run_dataset(label_iterator, batch_num) - one_label_after = [one_line.decode("utf-8") for one_line in label_after_arr] + one_label_after = [ + one_line.decode("utf-8") for one_line in label_after_arr + ] all_labels += one_label_after logging.debug(f"one_text_after: {len(one_text_after)}") self.save_a_raw_file(one_label_after, one_text_after, one_path_after, infer_without_label) - def run_dataset(self, data_iterator,batch_num): + def run_dataset(self, data_iterator, batch_num): """Run the text pre-process pipeline, fetch data in numpy array format.""" data_after = [] data_t = data_iterator.get_next() @@ -176,7 +179,6 @@ def run_dataset(self, data_iterator,batch_num): data_after_arr = np.concatenate(data_after, axis=0) return data_after_arr - def load_a_raw_file(self, one_path, infer_without_label): """ Load a raw file. Return text and label. diff --git a/delta/data/preprocess/text_cls_preparer.py b/delta/data/preprocess/text_cls_preparer.py index 0d158349..5c821e09 100644 --- a/delta/data/preprocess/text_cls_preparer.py +++ b/delta/data/preprocess/text_cls_preparer.py @@ -20,7 +20,6 @@ from delta.data import utils as data_utils from delta.utils.register import registers - # pylint: disable=too-many-instance-attributes @@ -46,13 +45,12 @@ def load_a_raw_file(self, one_path, infer_without_label): ds_list = load_textline_dataset(one_path, column_num) if infer_without_label: text = ds_list - label = [] #to modifiy + label = [] #to modifiy else: text = ds_list[1:] label = ds_list[:1] return (text, label) - def save_a_raw_file(self, label, text_after, one_path_after, infer_without_label): """Save a raw file.""" diff --git a/delta/data/preprocess/text_match_preparer.py b/delta/data/preprocess/text_match_preparer.py index ebe33870..62c74635 100644 --- a/delta/data/preprocess/text_match_preparer.py +++ b/delta/data/preprocess/text_match_preparer.py @@ -40,9 +40,9 @@ def load_a_raw_file(self, one_path, infer_without_label): """ if infer_without_label: - column_num=2 + column_num = 2 else: - column_num=3 + column_num = 3 ds_list = load_textline_dataset([one_path], column_num) if infer_without_label: @@ -52,7 +52,7 @@ def load_a_raw_file(self, one_path, infer_without_label): text = ds_list[1:] label = ds_list[:1] - return (text,label) + return (text, label) def save_a_raw_file(self, label, text_after, one_path_after, infer_without_label): diff --git a/delta/data/preprocess/text_ops.py b/delta/data/preprocess/text_ops.py index 48bbefe4..c0b253bc 100644 --- a/delta/data/preprocess/text_ops.py +++ b/delta/data/preprocess/text_ops.py @@ -55,26 +55,23 @@ def tokenize_sentence(texts, max_seq_len, vocab_path): def chinese_word_cut_tf(input_str, use_file=False): """""" main_root = os.environ["MAIN_ROOT"] - dict_path = os.path.join(main_root, - "tools/cppjieba/dict/jieba.dict.utf8") - hmm_path = os.path.join(main_root, - "tools/cppjieba/dict/hmm_model.utf8") - user_dict_path = os.path.join(main_root, - "tools/cppjieba/dict/user.dict.utf8") + dict_path = os.path.join(main_root, "tools/cppjieba/dict/jieba.dict.utf8") + hmm_path = os.path.join(main_root, "tools/cppjieba/dict/hmm_model.utf8") + user_dict_path = os.path.join(main_root, "tools/cppjieba/dict/user.dict.utf8") idf_path = os.path.join(main_root, "tools/cppjieba/dict/idf.utf8") stop_word_path = os.path.join(main_root, "tools/cppjieba/dict/stop_words.utf8") if use_file: output_str = py_x_ops.jieba_cut( - input_str, - use_file=True, - hmm=True, - dict_path=dict_path, - hmm_path=hmm_path, - user_dict_path=user_dict_path, - idf_path=idf_path, - stop_word_path=stop_word_path) + input_str, + use_file=True, + hmm=True, + dict_path=dict_path, + hmm_path=hmm_path, + user_dict_path=user_dict_path, + idf_path=idf_path, + stop_word_path=stop_word_path) else: dict_lines = read_lines_from_text_file(dict_path) model_lines = read_lines_from_text_file(hmm_path) @@ -83,14 +80,14 @@ def chinese_word_cut_tf(input_str, use_file=False): stop_word_lines = read_lines_from_text_file(stop_word_path) output_str = py_x_ops.jieba_cut( - input_str, - use_file=False, - hmm=True, - dict_lines=dict_lines, - model_lines=model_lines, - user_dict_lines=user_dict_lines, - idf_lines=idf_lines, - stop_word_lines=stop_word_lines) + input_str, + use_file=False, + hmm=True, + dict_lines=dict_lines, + model_lines=model_lines, + user_dict_lines=user_dict_lines, + idf_lines=idf_lines, + stop_word_lines=stop_word_lines) return output_str @@ -136,7 +133,8 @@ def char_cut_tf(input_str): def load_textline_dataset(paths, column_num): """Load raw data for text task.""" ds = tf.data.TextLineDataset(paths) - ds = ds.map(lambda x: tf.strings.split(x, sep="\t", result_type="RaggedTensor")) + ds = ds.map( + lambda x: tf.strings.split(x, sep="\t", result_type="RaggedTensor")) ds = ds.filter(lambda line: tf.equal(tf.size(line), column_num)) ds_list = [] for i in range(column_num): @@ -162,13 +160,13 @@ def process_one_label_dataset(label_ds, config, output_index=None): label_vocab_file_path = config["data"]["task"]["label_vocab"] label_ds = label_ds.map( - lambda x: tokenize_label( - x, maxlen=1, label_vocab_file_path=label_vocab_file_path, pad_id=0), - num_parallel_calls=num_parallel_calls) + lambda x: tokenize_label( + x, maxlen=1, label_vocab_file_path=label_vocab_file_path, pad_id=0), + num_parallel_calls=num_parallel_calls) label_ds = label_ds.map( - lambda l: tf.one_hot(l, num_classes, dtype=tf.int32), - num_parallel_calls=num_parallel_calls) + lambda l: tf.one_hot(l, num_classes, dtype=tf.int32), + num_parallel_calls=num_parallel_calls) label_ds = label_ds.map(tf.squeeze, num_parallel_calls=num_parallel_calls) @@ -185,7 +183,7 @@ def process_multi_label_dataset(label_ds, config, output_index=None): label_vocab_file_path = config["data"]["task"]["label_vocab"] if isinstance(label_vocab_file_path, list): if output_index is None or output_index not in range( - len(label_vocab_file_path)): + len(label_vocab_file_path)): raise IndexError("output_index:{} not in the range of classes length: " "{}!".format(output_index, len(label_vocab_file_path))) label_vocab_file_path = label_vocab_file_path[output_index] @@ -194,12 +192,12 @@ def process_multi_label_dataset(label_ds, config, output_index=None): label_vocab_file_path = label_vocab_file_path label_ds = label_ds.map( - lambda x: tokenize_label( - x, - maxlen=max_seq_len, - label_vocab_file_path=label_vocab_file_path, - pad_id=0), - num_parallel_calls=num_parallel_calls) + lambda x: tokenize_label( + x, + maxlen=max_seq_len, + label_vocab_file_path=label_vocab_file_path, + pad_id=0), + num_parallel_calls=num_parallel_calls) label_ds = label_ds.map(tf.squeeze, num_parallel_calls=num_parallel_calls) return label_ds diff --git a/delta/data/preprocess/text_seq2seq_preparer.py b/delta/data/preprocess/text_seq2seq_preparer.py index 026267fc..e6b3ea4b 100644 --- a/delta/data/preprocess/text_seq2seq_preparer.py +++ b/delta/data/preprocess/text_seq2seq_preparer.py @@ -65,7 +65,8 @@ def prepare_raw_data(self, pre_process_pipeline): self.prepare_one_raw_data((one_path_text, one_path_target), (one_path_text_after, one_path_target_after), mode, infer_without_label, - pre_process_pipeline, all_texts, all_labels,data_size) + pre_process_pipeline, all_texts, all_labels, + data_size) return all_texts, all_labels def load_a_raw_file(self, one_path, infer_without_label): @@ -79,10 +80,10 @@ def load_a_raw_file(self, one_path, infer_without_label): column_num = 1 text_path, target_path = one_path texts = load_textline_dataset([text_path], column_num) - # texts = data_utils.load_seq2seq_raw_data([text_path]) + # texts = data_utils.load_seq2seq_raw_data([text_path]) if not infer_without_label: - target = load_textline_dataset([target_path],column_num) - return texts+target, target + target = load_textline_dataset([target_path], column_num) + return texts + target, target return texts, [] def save_a_raw_file(self, label, text_after, one_path_after, diff --git a/delta/data/task/base_text_task.py b/delta/data/task/base_text_task.py index 3171ed25..62e79fd6 100644 --- a/delta/data/task/base_text_task.py +++ b/delta/data/task/base_text_task.py @@ -57,7 +57,6 @@ def __init__(self, config, mode): self.shuffle_buffer_size = self.task_config['shuffle_buffer_size'] self.need_shuffle = self.task_config['need_shuffle'] - def input_fn(self): def _input_fn(): diff --git a/delta/data/task/speaker_cls_task.py b/delta/data/task/speaker_cls_task.py index 91b9aa4c..dec90768 100644 --- a/delta/data/task/speaker_cls_task.py +++ b/delta/data/task/speaker_cls_task.py @@ -583,6 +583,7 @@ def make_example(inputs, labels, filenames, clip_ids, soft_labels): batch(batch_size, drop_remainder=False).\ prefetch(tf.data.experimental.AUTOTUNE) + class KaldiDir: def __init__(self, kaldi_dir): diff --git a/delta/data/task/text_cls_task.py b/delta/data/task/text_cls_task.py index 126954d0..db3fe541 100644 --- a/delta/data/task/text_cls_task.py +++ b/delta/data/task/text_cls_task.py @@ -41,7 +41,7 @@ class TextClsTask(TextTask): def __init__(self, config, mode): super().__init__(config, mode) self.infer_no_label = self.config["data"][utils.INFER].get( - 'infer_no_label', False) + 'infer_no_label', False) self.vocab_min_frequency = self.task_config['vocab_min_frequency'] self.text_vocab_file_path = self.task_config['text_vocab'] self.label_vocab_file_path = self.task_config['label_vocab'] @@ -68,7 +68,8 @@ def generate_data(self): text_ds = load_textline_dataset(self.paths_after_pre_process, column_num) else: column_num = 2 - label_ds, text_ds = load_textline_dataset(self.paths_after_pre_process, column_num) + label_ds, text_ds = load_textline_dataset(self.paths_after_pre_process, + column_num) input_pipeline_func = self.get_input_pipeline(for_export=False) @@ -106,8 +107,8 @@ def generate_data(self): self.split_token)) self.config['data']['split_token'] = int(vocab_dict[self.split_token]) self.config['data']['vocab_size'] = vocab_size - self.config['data']['{}_data_size'.format(self.mode)] = get_file_len(self.paths_after_pre_process) - + self.config['data']['{}_data_size'.format(self.mode)] = get_file_len( + self.paths_after_pre_process) return data_set diff --git a/delta/data/task/text_cls_task_test.py b/delta/data/task/text_cls_task_test.py index 31ef5ae2..2d65220c 100644 --- a/delta/data/task/text_cls_task_test.py +++ b/delta/data/task/text_cls_task_test.py @@ -246,7 +246,8 @@ def test_chinese_word(self): shape_op = tf.shape(input_x) with self.cached_session(use_gpu=False, force_gpu=False) as sess: - res, shape_res = sess.run([input_x, shape_op], feed_dict={input_sentence: ["我很愤怒"]}) + res, shape_res = sess.run([input_x, shape_op], + feed_dict={input_sentence: ["我很愤怒"]}) logging.debug(res[0]) logging.debug(np.shape(res[0])) logging.debug(f"shape: {shape_res}") diff --git a/delta/data/task/text_match_task.py b/delta/data/task/text_match_task.py index 85cb5167..070a03cd 100644 --- a/delta/data/task/text_match_task.py +++ b/delta/data/task/text_match_task.py @@ -48,7 +48,7 @@ def __init__(self, config, mode): one_path + ".after" for one_path in self.paths ] self.infer_no_label = self.config["data"][utils.INFER].get( - 'infer_no_label', False) + 'infer_no_label', False) self.infer_without_label = bool(mode == utils.INFER and self.infer_no_label) self.prepare() @@ -57,11 +57,13 @@ def __init__(self, config, mode): def generate_data(self): """Generate data for offline training.""" if self.infer_without_label: - column_num=2 - text_ds_left, text_ds_right = load_textline_dataset(self.paths_after_pre_process, column_num) + column_num = 2 + text_ds_left, text_ds_right = load_textline_dataset( + self.paths_after_pre_process, column_num) else: - column_num=3 - label,text_ds_left, text_ds_right=load_textline_dataset(self.paths_after_pre_process, column_num) + column_num = 3 + label, text_ds_left, text_ds_right = load_textline_dataset( + self.paths_after_pre_process, column_num) input_pipeline_func = self.get_input_pipeline(for_export=False) text_ds_left = text_ds_left.map( @@ -86,7 +88,8 @@ def generate_data(self): vocab_size = len(vocab_dict) self.config['data']['vocab_size'] = vocab_size - self.config['data']['{}_data_size'.format(self.mode)] = get_file_len(self.paths_after_pre_process) + self.config['data']['{}_data_size'.format(self.mode)] = get_file_len( + self.paths_after_pre_process) return data_set_left_right, text_len_left_right diff --git a/delta/data/task/text_nlu_joint_task.py b/delta/data/task/text_nlu_joint_task.py index bc02410c..1c57f6ed 100644 --- a/delta/data/task/text_nlu_joint_task.py +++ b/delta/data/task/text_nlu_joint_task.py @@ -50,7 +50,7 @@ def __init__(self, config, mode): one_path + ".after" for one_path in self.paths ] self.infer_no_label = self.config["data"][utils.INFER].get( - 'infer_no_label', False) + 'infer_no_label', False) self.infer_without_label = bool(mode == utils.INFER and self.infer_no_label) self.prepare() @@ -75,15 +75,16 @@ def generate_data(self): text_ds = load_textline_dataset(self.paths_after_pre_process, column_num) else: column_num = 3 - intent_label_ds, slots_label_ds, text_ds = load_textline_dataset(self.paths_after_pre_process, column_num) + intent_label_ds, slots_label_ds, text_ds = load_textline_dataset( + self.paths_after_pre_process, column_num) logging.info("Loading text dataset...") input_pipeline_func = self.get_input_pipeline(for_export=False) text_ds = text_ds.map( - input_pipeline_func, num_parallel_calls=self.num_parallel_calls) + input_pipeline_func, num_parallel_calls=self.num_parallel_calls) text_size_ds = text_ds.map( - lambda x: compute_sen_lens(x, padding_token=0), - num_parallel_calls=self.num_parallel_calls) + lambda x: compute_sen_lens(x, padding_token=0), + num_parallel_calls=self.num_parallel_calls) text_ds = tf.data.Dataset.zip((text_ds, text_size_ds)) if self.infer_without_label: @@ -97,7 +98,8 @@ def generate_data(self): self.config['data']['vocab_size'] = get_vocab_size( self.text_vocab_file_path) - self.config['data']['{}_data_size'.format(self.mode)] = get_file_len(self.paths_after_pre_process) + self.config['data']['{}_data_size'.format(self.mode)] = get_file_len( + self.paths_after_pre_process) return data_set diff --git a/delta/data/task/text_seq2seq_task.py b/delta/data/task/text_seq2seq_task.py index 87d02ace..a3cc4f9e 100644 --- a/delta/data/task/text_seq2seq_task.py +++ b/delta/data/task/text_seq2seq_task.py @@ -62,7 +62,7 @@ def __init__(self, config, mode): one_path + ".after" for one_path in self.tgt_paths ] self.infer_no_label = self.config["data"][utils.INFER].get( - 'infer_no_label', False) + 'infer_no_label', False) self.infer_without_label = bool(mode == utils.INFER and self.infer_no_label) self.prepare() @@ -103,14 +103,14 @@ def generate_data(self): input_pipeline_func = self.get_input_pipeline(for_export=False) src_ds = src_ds.map( - input_pipeline_func, num_parallel_calls=self.num_parallel_calls) + input_pipeline_func, num_parallel_calls=self.num_parallel_calls) src_size_ds = src_ds.map( - lambda x: compute_sen_lens(x, padding_token=utils.PAD_IDX), - num_parallel_calls=self.num_parallel_calls) + lambda x: compute_sen_lens(x, padding_token=utils.PAD_IDX), + num_parallel_calls=self.num_parallel_calls) src_ds = src_ds.map( - self.exclude_padding, num_parallel_calls=self.num_parallel_calls) + self.exclude_padding, num_parallel_calls=self.num_parallel_calls) if self.infer_without_label: data_set = tf.data.Dataset.zip((src_ds, src_size_ds)) @@ -122,19 +122,19 @@ def generate_data(self): tgt_in_ds = tgt.map(lambda x: self.START_TOKEN + ' ' + x) tgt_in_ds = tgt_in_ds.map( - lambda batch: self.text_pipeline_func(batch, self.max_dec_len, self. - text_vocab_file_path), - num_parallel_calls=self.num_parallel_calls) + lambda batch: self.text_pipeline_func(batch, self.max_dec_len, self. + text_vocab_file_path), + num_parallel_calls=self.num_parallel_calls) tgt_in_size_ds = tgt_in_ds.map( - lambda x: compute_sen_lens(x, padding_token=utils.PAD_IDX), - num_parallel_calls=self.num_parallel_calls) + lambda x: compute_sen_lens(x, padding_token=utils.PAD_IDX), + num_parallel_calls=self.num_parallel_calls) tgt_in_ds = tgt_in_ds.map( - self.exclude_padding, num_parallel_calls=self.num_parallel_calls) + self.exclude_padding, num_parallel_calls=self.num_parallel_calls) inp_ds = tf.data.Dataset.zip( - (src_ds, src_size_ds, tgt_in_ds, tgt_in_size_ds)) + (src_ds, src_size_ds, tgt_in_ds, tgt_in_size_ds)) if self.use_label_vocab: target_vocab_file_path = self.label_vocab_file_paths[0] @@ -163,12 +163,14 @@ def generate_data(self): def feature_spec(self): """Get shapes for feature.""" if not self.infer_without_label: - feature_shapes = [(tf.TensorShape([tf.Dimension(None)]), tf.TensorShape([]), - tf.TensorShape([tf.Dimension(None)]), tf.TensorShape([]))] + feature_shapes = [ + (tf.TensorShape([tf.Dimension(None)]), tf.TensorShape([]), + tf.TensorShape([tf.Dimension(None)]), tf.TensorShape([])) + ] feature_shapes.append(tf.TensorShape([tf.Dimension(None)])) else: - feature_shapes = [(tf.TensorShape([tf.Dimension(None)]), tf.TensorShape([])) - ] + feature_shapes = [(tf.TensorShape([tf.Dimension(None)]), + tf.TensorShape([]))] if len(feature_shapes) == 1: return feature_shapes[0] return tuple(feature_shapes) diff --git a/delta/data/task/text_seq_label_task.py b/delta/data/task/text_seq_label_task.py index 168b5b4f..3218e6dd 100644 --- a/delta/data/task/text_seq_label_task.py +++ b/delta/data/task/text_seq_label_task.py @@ -63,10 +63,10 @@ def generate_data(self): logging.info("process text ds...") input_pipeline_func = self.get_input_pipeline(for_export=False) text_ds = text_ds.map( - input_pipeline_func, num_parallel_calls=self.num_parallel_calls) + input_pipeline_func, num_parallel_calls=self.num_parallel_calls) text_size_ds = text_ds.map( - lambda x: compute_sen_lens(x, padding_token=0), - num_parallel_calls=self.num_parallel_calls) + lambda x: compute_sen_lens(x, padding_token=0), + num_parallel_calls=self.num_parallel_calls) text_ds = tf.data.Dataset.zip((text_ds, text_size_ds)) logging.info("process label ds...") @@ -78,7 +78,8 @@ def generate_data(self): self.config['data']['vocab_size'] = get_vocab_size( self.text_vocab_file_path) - self.config['data']['{}_data_size'.format(self.mode)] = get_file_len(self.paths) + self.config['data']['{}_data_size'.format(self.mode)] = get_file_len( + self.paths) return data_set diff --git a/delta/data/utils/common_utils.py b/delta/data/utils/common_utils.py index 5bcb23a3..fb2a998c 100644 --- a/delta/data/utils/common_utils.py +++ b/delta/data/utils/common_utils.py @@ -26,7 +26,6 @@ from delta import utils - def input_fn(dataset, mode, batch_size, num_epoch=None): ''' params: dataset, tf.data.Dataset @@ -153,7 +152,8 @@ def load_npy(npy_path, dtype=np.float32): def get_file_len(fname_paths): len_res = [] for fname in fname_paths: - p = subprocess.Popen(['wc', '-l', fname], stdout=subprocess.PIPE, + p = subprocess.Popen(['wc', '-l', fname], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) result, err = p.communicate() if p.returncode != 0: @@ -162,6 +162,7 @@ def get_file_len(fname_paths): return sum(len_res) + def read_lines_from_text_file(file_path): """Read lines from a text file.""" with open(file_path) as f: diff --git a/delta/layers/common_layers.py b/delta/layers/common_layers.py index 4818842b..8e9d5329 100644 --- a/delta/layers/common_layers.py +++ b/delta/layers/common_layers.py @@ -22,6 +22,7 @@ #pylint: disable=invalid-name + def splice_layer(x, name, context): ''' Splice a tensor along the last dimension with context. diff --git a/delta/layers/ops/kernels/jieba_op.cc b/delta/layers/ops/kernels/jieba_op.cc index c2906ad9..b466bb8f 100644 --- a/delta/layers/ops/kernels/jieba_op.cc +++ b/delta/layers/ops/kernels/jieba_op.cc @@ -37,9 +37,8 @@ class JiebaCutOp : public OpKernel { public: explicit JiebaCutOp(OpKernelConstruction* ctx) : OpKernel(ctx) { bool use_file; - OP_REQUIRES_OK(ctx, ctx->GetAttr("use_file", - &use_file)); - if (use_file){ + OP_REQUIRES_OK(ctx, ctx->GetAttr("use_file", &use_file)); + if (use_file) { std::string dict_path; OP_REQUIRES_OK(ctx, ctx->GetAttr("dict_path", &dict_path)); std::string hmm_path; @@ -51,10 +50,9 @@ class JiebaCutOp : public OpKernel { std::string stop_word_path; OP_REQUIRES_OK(ctx, ctx->GetAttr("stop_word_path", &stop_word_path)); OP_REQUIRES_OK(ctx, ctx->GetAttr("hmm", &hmm)); - jieba_ = new cppjieba::Jieba(dict_path, hmm_path, user_dict_path, idf_path, - stop_word_path); - } - else{ + jieba_ = new cppjieba::Jieba(dict_path, hmm_path, user_dict_path, + idf_path, stop_word_path); + } else { std::vector dict_lines; OP_REQUIRES_OK(ctx, ctx->GetAttr("dict_lines", &dict_lines)); std::vector model_lines; @@ -66,8 +64,8 @@ class JiebaCutOp : public OpKernel { std::vector stop_word_lines; OP_REQUIRES_OK(ctx, ctx->GetAttr("stop_word_lines", &stop_word_lines)); OP_REQUIRES_OK(ctx, ctx->GetAttr("hmm", &hmm)); - jieba_ = new cppjieba::Jieba(dict_lines, model_lines, user_dict_lines, idf_lines, - stop_word_lines); + jieba_ = new cppjieba::Jieba(dict_lines, model_lines, user_dict_lines, + idf_lines, stop_word_lines); } } diff --git a/delta/layers/ops/kernels/jieba_op_test.py b/delta/layers/ops/kernels/jieba_op_test.py index 13cdb121..e74d9f76 100644 --- a/delta/layers/ops/kernels/jieba_op_test.py +++ b/delta/layers/ops/kernels/jieba_op_test.py @@ -23,9 +23,9 @@ from delta.data.utils import read_lines_from_text_file from delta.layers.ops import py_x_ops - # pylint: disable=not-context-manager, invalid-name + def test_one(sess, ops, inputs): ''' elapse time of op ''' t1 = time.time() @@ -135,8 +135,9 @@ def test_jieba_cut_op_no_file(self): sentence_out_res = test_one(sess, sentence_out, {sentence_in: ["吉林省长春药店"]}) self.assertEqual("吉林省 长春 药店", sentence_out_res[0].decode("utf-8")) - sentence_out_res, shape_res = test_one(sess, [sentence_out, shape_op], - {sentence_in: ["吉林省长春药店", "南京市长江大桥"]}) + sentence_out_res, shape_res = test_one( + sess, [sentence_out, shape_op], + {sentence_in: ["吉林省长春药店", "南京市长江大桥"]}) self.assertEqual( "吉林省 长春 药店\n南京市 长江大桥", "\n".join([one_sen.decode("utf-8") for one_sen in sentence_out_res diff --git a/delta/layers/ops/kernels/plp_op_test.py b/delta/layers/ops/kernels/plp_op_test.py index 340a89e8..3791a879 100644 --- a/delta/layers/ops/kernels/plp_op_test.py +++ b/delta/layers/ops/kernels/plp_op_test.py @@ -55,7 +55,8 @@ def test_plp(self): self.assertEqual(tf.rank(output).eval(), 2) logging.info('Shape of PLP: {}'.format(output.shape)) - self.assertAllClose(output.eval()[50:55, 5:10], output_true, rtol=1e-05, atol=1e-05) + self.assertAllClose( + output.eval()[50:55, 5:10], output_true, rtol=1e-05, atol=1e-05) if __name__ == '__main__': diff --git a/delta/layers/ops/kernels/tokenizer_ops.cc b/delta/layers/ops/kernels/tokenizer_ops.cc index e43c64b9..4375ada4 100644 --- a/delta/layers/ops/kernels/tokenizer_ops.cc +++ b/delta/layers/ops/kernels/tokenizer_ops.cc @@ -38,22 +38,21 @@ class SentenceToIdsOp : public OpKernel { &load_token_ids_from_vocab)); bool use_vocab_file; - OP_REQUIRES_OK(ctx, ctx->GetAttr("use_vocab_file", - &use_vocab_file)); + OP_REQUIRES_OK(ctx, ctx->GetAttr("use_vocab_file", &use_vocab_file)); bool check_tokens; OP_REQUIRES_OK(ctx, ctx->GetAttr("check_tokens", &check_tokens)); OP_REQUIRES_OK(ctx, ctx->GetAttr("delimiter", &delimiter_)); CHECK_GT(maxlen_, 0); - if (use_vocab_file){ + if (use_vocab_file) { OP_REQUIRES_OK(ctx, ctx->GetAttr("vocab_filepath", &vocab_filepath_)); - OP_REQUIRES_OK(ctx, vocab_.Load(vocab_filepath_, load_token_ids_from_vocab, - check_tokens)); - } - else{ + OP_REQUIRES_OK(ctx, vocab_.Load(vocab_filepath_, + load_token_ids_from_vocab, check_tokens)); + } else { std::vector vocab; OP_REQUIRES_OK(ctx, ctx->GetAttr("vocab", &vocab)); - OP_REQUIRES_OK(ctx, vocab_.Load(vocab, load_token_ids_from_vocab, check_tokens)); + OP_REQUIRES_OK( + ctx, vocab_.Load(vocab, load_token_ids_from_vocab, check_tokens)); } } diff --git a/delta/layers/ops/kernels/tokenizer_ops_test.py b/delta/layers/ops/kernels/tokenizer_ops_test.py index e3ab0fc3..f92c8ce7 100644 --- a/delta/layers/ops/kernels/tokenizer_ops_test.py +++ b/delta/layers/ops/kernels/tokenizer_ops_test.py @@ -113,7 +113,8 @@ def test_text_to_tokenid(self): load_token_ids_from_vocab=False, pad_id=-1) batch_shape_op = tf.shape(batch_op) - shape_res, token_ids, paddings = sess.run([batch_shape_op, batch_op, batch_padding_op]) + shape_res, token_ids, paddings = sess.run( + [batch_shape_op, batch_op, batch_padding_op]) elapsed = time.time() - start logging.info("Time cost: {:.4f}s".format(elapsed)) logging.info(token_ids) @@ -136,7 +137,8 @@ def test_text_to_tokenid(self): load_token_ids_from_vocab=False, pad_id=-1) single_shape_op = tf.shape(single_op) - single_shape_res, token_ids, paddings = sess.run([single_shape_op, single_op, single_padding_op]) + single_shape_res, token_ids, paddings = sess.run( + [single_shape_op, single_op, single_padding_op]) logging.info("single_op: {}".format(single_op)) logging.info(f"single_shape: {single_shape_res}") self.assertAllEqual(single_shape_res, [10]) diff --git a/delta/layers/recurrent.py b/delta/layers/recurrent.py index 42e89b4b..7808ecda 100644 --- a/delta/layers/recurrent.py +++ b/delta/layers/recurrent.py @@ -62,7 +62,7 @@ def build(self, input_shape): def compute_output_shape(self, input_shape): return tf.TensorShape([input_shape[0], self.cell_dim * 2]) - + def compute_mask(self, inputs, mask=None): return None diff --git a/delta/serving/eval_text_cls_pb.py b/delta/serving/eval_text_cls_pb.py index 6a6c79e0..374a8a6c 100644 --- a/delta/serving/eval_text_cls_pb.py +++ b/delta/serving/eval_text_cls_pb.py @@ -42,13 +42,13 @@ def __init__(self, config, gpu_str=None, mode=utils.INFER): self.inspect_ops() self.input_sentence = self.graph.get_tensor_by_name( - config['solver']['service']['input_sentence']) + config['solver']['service']['input_sentence']) self.input_x = self.graph.get_tensor_by_name( - config['solver']['service']['input_x']) + config['solver']['service']['input_x']) self.score = self.graph.get_tensor_by_name( - config['solver']['service']['score']) + config['solver']['service']['score']) self.preds = self.graph.get_tensor_by_name( - config['solver']['service']['preds']) + config['solver']['service']['preds']) @property def config(self): @@ -61,7 +61,8 @@ def get_test_feed_dict(self): def infer_one(self): feed_dict = self.get_test_feed_dict() - input_x, score, preds = self.sess.run([self.input_x, self.score, self.preds], feed_dict=feed_dict) + input_x, score, preds = self.sess.run( + [self.input_x, self.score, self.preds], feed_dict=feed_dict) logging.info(f"input_x: {input_x}") logging.info(f"preds: {preds}") logging.info(f"score: {score}") @@ -69,6 +70,7 @@ def infer_one(self): def eval_or_infer(self): pass + def main(_): ''' main func ''' FLAGS = app.flags.FLAGS #pylint: disable=invalid-name @@ -110,7 +112,10 @@ def main(_): def define_flags(): ''' define flags for evaluator''' - app.flags.DEFINE_string('config', 'egs/mock_text_cls_data/text_cls/v1/config/han-cls.yml', help='config path') + app.flags.DEFINE_string( + 'config', + 'egs/mock_text_cls_data/text_cls/v1/config/han-cls.yml', + help='config path') app.flags.DEFINE_string('mode', 'eval', 'eval, infer, debug, infer_one') # The GPU devices which are visible for current process app.flags.DEFINE_string('gpu', '', 'gpu number') diff --git a/delta/utils/ctc_utils.py b/delta/utils/ctc_utils.py index 36bf47dd..9618b635 100644 --- a/delta/utils/ctc_utils.py +++ b/delta/utils/ctc_utils.py @@ -18,6 +18,7 @@ import delta.compat as tf + def transform_preprocess(labels=None, blank_index=None, num_class=None): ''' Ensure that the value of blank_index is in a reasonable range, and transform the DenseTensor labels to a SparseTensor ''' diff --git a/delta/utils/solver/raw_solver.py b/delta/utils/solver/raw_solver.py index 0584b830..bf2fbb08 100644 --- a/delta/utils/solver/raw_solver.py +++ b/delta/utils/solver/raw_solver.py @@ -368,7 +368,8 @@ def train(self): # pylint: disable=too-many-locals checkpoint_dir = get_checkpoint_dir(self.config) # scaffold - scaffold = self.get_scaffold(mode, global_step, train_model.iterator.initializer) + scaffold = self.get_scaffold(mode, global_step, + train_model.iterator.initializer) with tf.train.MonitoredTrainingSession( checkpoint_dir=checkpoint_dir, @@ -419,7 +420,8 @@ def train_and_eval(self): # pylint: disable=too-many-locals checkpoint_dir = get_checkpoint_dir(self.config) # scaffold - scaffold = self.get_scaffold(utils.TRAIN, global_step, train_model.iterator.initializer) + scaffold = self.get_scaffold(utils.TRAIN, global_step, + train_model.iterator.initializer) with tf.train.MonitoredTrainingSession( checkpoint_dir=checkpoint_dir, diff --git a/delta/utils/solver/utils/solver_utils.py b/delta/utils/solver/utils/solver_utils.py index 1a3e812a..5ad2b098 100644 --- a/delta/utils/solver/utils/solver_utils.py +++ b/delta/utils/solver/utils/solver_utils.py @@ -62,9 +62,15 @@ def to_saved_model(config, sess, inputs: dict, outputs: dict): export_path = os.path.abspath(export_path) logging.info('Exporting model to: {}'.format(export_path)) if os.path.exists(export_path): - files = [one.decode() for one in os.listdir(export_path) if isinstance(one, bytes)] + files = [ + one.decode() + for one in os.listdir(export_path) + if isinstance(one, bytes) + ] if "variables" in files: - cmd = input(f"Export directory already exists, and isn't empty. Overwrite? [y/n]").strip().lower() + cmd = input( + f"Export directory already exists, and isn't empty. Overwrite? [y/n]" + ).strip().lower() if cmd == "" or cmd == "y": shutil.rmtree(export_path) builder = tf.saved_model.builder.SavedModelBuilder(export_path) diff --git a/deltann/Makefile b/deltann/Makefile index d78086fe..99569725 100644 --- a/deltann/Makefile +++ b/deltann/Makefile @@ -15,7 +15,7 @@ # ============================================================================== # Find where we're running from, so we can store generated files here. -.PHONY: all example clean +.PHONY: all examples clean SHELL=/bin/bash @@ -125,7 +125,7 @@ $(OBJDIR)%.o:%.cc -@mkdir -p $(dir $@) $(CXX) $(CXXFLAGS) $(INCLUDES) -c $< -o $@ -example: +examples: $(MAKE) -C $(EXAMPLE_DIR) clean: diff --git a/deltann/core/runtime.cc b/deltann/core/runtime.cc new file mode 100644 index 00000000..65a63e16 --- /dev/null +++ b/deltann/core/runtime.cc @@ -0,0 +1,330 @@ +/* Copyright (C) 2017 Beijing Didi Infinity Technology and Development Co.,Ltd. +All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ +#include "core/runtime.h" + +namespace delta { + +namespace core { + +namespace { + +// load ops lib +DeltaStatus load_custom_ops_lib(const std::string& lib) { + if (!lib.size()) { + LOG_WARN << "no custom ops to load"; + return DeltaStatus::STATUS_OK; + } + +#ifdef USE_TF + LOG_INFO << "custom op lib is: " << lib; + TF_Status* status = TF_NewStatus(); + TF_Library* custom_op_lib = TF_LoadLibrary(lib.c_str(), status); + + TF_Code code = TF_GetCode(status); + string status_msg(TF_Message(status)); + TF_DeleteStatus(status); + if (TF_OK != code) { + LOG_FATAL << status_msg; + } + LOG_INFO << "custom op lib load succesfully" << lib; + + TF_Buffer op_list_buf = TF_GetOpList(custom_op_lib); + tensorflow::OpList op_list; + DELTA_CHECK(op_list.ParseFromArray(op_list_buf.data, op_list_buf.length)); + for (int i = 0; i != op_list.op_size(); ++i) { + LOG_INFO << "cutsom op: " << op_list.op(i).name(); + } + + TF_DeleteLibraryHandle(custom_op_lib); +#endif // USE_TF + return DeltaStatus::STATUS_OK; +} + +// load all models +DeltaStatus load_models(const RuntimeConfig& rt_cfg, + std::unordered_map* graphs) { + const int num_threads = rt_cfg.num_threads(); + + if (!graphs->size()) { + LOG_WARN << "graphs size is empty, " << graphs->size(); + return DeltaStatus::STATUS_ERROR; + } + + for (auto& iter : *graphs) { + LOG_INFO << "Load model for graph " << iter.first; + + Graph& graph = iter.second; + std::string engine_type = graph.engine_type(); + ModelMeta model_meta = graph.model_meta(); + auto search = _global_engines.find(engine_type); + + BaseModel* model = nullptr; + + if (search != _global_engines.end()) { +#ifdef USE_TF + if (EngineType::DELTA_EIGINE_TF == _global_engines[engine_type]) { + LOG_INFO << "User engine tf"; + model = new TFModel(model_meta, num_threads); +#endif + + } else if (EngineType::DELTA_EIGINE_TFTRT == + _global_engines[engine_type]) { + LOG_INFO << "User engine tftrt"; + +#ifdef USE_TFLITE + } else if (EngineType::DELTA_EIGINE_TFLITE == + _global_engines[engine_type]) { + LOG_INFO << "User engine tf lite"; + model = new TFLiteModel(model_meta, num_threads); +#endif + +#ifdef USE_TF_SERVING + } else if (EngineType::DELTA_EIGINE_TFSERVING == + _global_engines[engine_type]) { + LOG_INFO << "User engine TFSERVING"; + model = new TFServingModel(model_meta, num_threads); +#endif + + } else if (EngineType::DELTA_EIGINE_SERVING == + _global_engines[engine_type]) { + LOG_FATAL << "Not support now SERVING"; + } else { + LOG_FATAL << "Not support engine type: " << engine_type; + } + } else { + LOG_FATAL << "Not support engine"; + return DeltaStatus::STATUS_ERROR; + } + + graph.set_model(model); + model = nullptr; + } // for + + return DeltaStatus::STATUS_OK; +} + +} // namespace + +// load custom ops +DeltaStatus Runtime::load_custom_ops() { + return load_custom_ops_lib(_rt_cfg.custom_ops_path()); +} + +// load model +DeltaStatus Runtime::load_model() { + if (DeltaStatus::STATUS_OK != load_models(_rt_cfg, &_graphs)) { + LOG_FATAL << "load model failed"; + return DeltaStatus::STATUS_ERROR; + } + return DeltaStatus::STATUS_OK; +} + +DeltaStatus Runtime::set_inputs(const std::vector& inputs) { + _inputs_data.clear(); + for (auto& in : inputs) { + LOG_INFO << "Graph name " << in._graph_name; + auto search = _graphs.find(in._graph_name); + if (search != _graphs.end()) { + Graph& graph = search->second; + // std::unordered_map& inputs = graph.get_inputs(); + LOG_INFO << "input name " << in._input_name; + Input& input = graph.get_inputs().at(in._input_name); + DELTA_CHECK_GE(in._size, input.size()); + InputData input_data(input); + input_data.copy_from(in._ptr, in._size); + _inputs_data.push_back(input_data); + } else { + LOG_FATAL << "Error, Graph " << in._graph_name << " not exist!"; + } + } + return DeltaStatus::STATUS_OK; +} + +// get all out result +DeltaStatus Runtime::get_outputs(std::vector* results) { + // get resulst + results->clear(); + for (auto& output_data : _outputs_data) { + LOG_INFO << "out shape is " << output_data.shape(); + DataType dtype = output_data.dtype(); + int size = output_data.size(); + LOG_INFO << "out size is " << size; + std::stringstream ss; + switch (dtype) { + case DataType::DELTA_FLOAT32: { + float* ptr = static_cast(output_data.ptr()); + for (int i = 0; i < size; ++i) { + ss << ptr[i] << ","; + } + } break; + case DataType::DELTA_INT32: { + int* ptr = static_cast(output_data.ptr()); + for (int i = 0; i < size; ++i) { + ss << ptr[i] << ","; + } + } break; + case DataType::DELTA_CHAR: { + LOG_FATAL << "DataType::CHAR do nothing"; + } + default: + LOG_FATAL << "Error, not support dtype "; + } + results->push_back(ss.str()); + LOG_INFO << "results are " << *results; + } + + return DeltaStatus::STATUS_OK; +} + +// run +// only support one Graph now!!!! how to support multi-graphs ??? +DeltaStatus Runtime::run() { + _outputs_data.clear(); + + Graph& graph = _graphs.begin()->second; + + // out + std::unordered_map& outputs = graph.get_outputs(); + for (auto& output : outputs) { + _outputs_data.push_back(OutputData(output.second)); + } + + // run + if (DeltaStatus::STATUS_OK != + this->run(&(graph), _inputs_data, &_outputs_data)) { + LOG_FATAL << "run failed"; + return DeltaStatus::STATUS_ERROR; + } +} + +#if 0 +DeltaStatus Runtime::run(const string name, float* buffer, const int size, + vector* results) { + LOG_WARN << "graph name is " << name; + auto search = _graphs.find(name); + + if (search != _graphs.end()) { + Graph& graph = search->second; + std::vector inputs_data; + std::vector outputs_data; + // feed data + std::unordered_map& inputs = graph.get_inputs(); + + int in_size = 0; + for (auto& input : inputs) { + in_size += input.second.size(); + } + + DELTA_CHECK_GE(size, in_size); + + // in + float* src = buffer; + for (auto& input : inputs) { + InputData in(input.second); + in.copy_from(src); + inputs_data.push_back(in); + src += input.second.size() * sizeof(float); + } + + // out + std::unordered_map& outputs = graph.get_outputs(); + for (auto& output : outputs) { + outputs_data.push_back(OutputData(output.second)); + } + + // run + if (DeltaStatus::STATUS_OK != + this->run(&(graph), inputs_data, &outputs_data)) { + LOG_FATAL << "run failed"; + return DeltaStatus::STATUS_ERROR; + } + + // get resulst + results->clear(); + for (auto& output_data : outputs_data) { + LOG_INFO << "out shape is " << output_data.shape(); + DataType dtype = output_data.dtype(); + int size = output_data.size(); + LOG_INFO << "out size is " << size; + std::stringstream ss; + switch (dtype) { + case DataType::DELTA_FLOAT32: { + float* ptr = static_cast(output_data.ptr()); + for (int i = 0; i < size; ++i) { + ss << ptr[i] << ","; + } + } break; + case DataType::DELTA_INT32: { + int* ptr = static_cast(output_data.ptr()); + for (int i = 0; i < size; ++i) { + ss << ptr[i] << ","; + } + } break; + default: + LOG_FATAL << "Error, not support dtype "; + } + results->push_back(ss.str()); + LOG_INFO << "results are " << *results; + } + } else { + LOG_FATAL << "Error, run failed, graph " << name << " not exist."; + } + return DeltaStatus::STATUS_OK; +} +#endif + +// load model if not loaded, then warmup model; +DeltaStatus Runtime::warmup() { + if (DeltaStatus::STATUS_OK != load_custom_ops()) { + LOG_FATAL << "load custom ops failed"; + return DeltaStatus::STATUS_ERROR; + } + + if (DeltaStatus::STATUS_OK != load_model()) { + LOG_FATAL << "load model failed"; + return DeltaStatus::STATUS_ERROR; + } + + for (auto& graph : _graphs) { + std::vector inputs_data; + std::vector outputs_data; + + std::unordered_map& inputs = graph.second.get_inputs(); + for (auto& input : inputs) { + InputData in(input.second); + in.feed_random_data(); + inputs_data.push_back(in); + } + + std::unordered_map& outputs = + graph.second.get_outputs(); + for (auto& output : outputs) { + outputs_data.push_back(OutputData(output.second)); + } + + if (DeltaStatus::STATUS_OK != + this->run(&(graph.second), inputs_data, &outputs_data)) { + LOG_FATAL << "run failed"; + return DeltaStatus::STATUS_ERROR; + } + } + + return DeltaStatus::STATUS_OK; +} + +} // namespace core + +} // namespace delta diff --git a/deltann/core/runtime.h b/deltann/core/runtime.h index f5930ead..ceaf6cef 100644 --- a/deltann/core/runtime.h +++ b/deltann/core/runtime.h @@ -53,105 +53,6 @@ struct In { std::size_t _size; }; -namespace { - -// load ops lib -DeltaStatus load_custom_ops_lib(const std::string& lib) { - if (!lib.size()) { - LOG_WARN << "no custom ops to load"; - return DeltaStatus::STATUS_OK; - } - -#ifdef USE_TF - LOG_INFO << "custom op lib is: " << lib; - TF_Status* status = TF_NewStatus(); - TF_Library* custom_op_lib = TF_LoadLibrary(lib.c_str(), status); - - TF_Code code = TF_GetCode(status); - string status_msg(TF_Message(status)); - TF_DeleteStatus(status); - if (TF_OK != code) { - LOG_FATAL << status_msg; - } - LOG_INFO << "custom op lib load succesfully" << lib; - - TF_Buffer op_list_buf = TF_GetOpList(custom_op_lib); - tensorflow::OpList op_list; - DELTA_CHECK(op_list.ParseFromArray(op_list_buf.data, op_list_buf.length)); - for (int i = 0; i != op_list.op_size(); ++i) { - LOG_INFO << "cutsom op: " << op_list.op(i).name(); - } - - TF_DeleteLibraryHandle(custom_op_lib); -#endif // USE_TF - return DeltaStatus::STATUS_OK; -} - -// load all models -DeltaStatus load_models(const RuntimeConfig& rt_cfg, - std::unordered_map* graphs) { - const int num_threads = rt_cfg.num_threads(); - - if (!graphs->size()) { - LOG_WARN << "graphs size is empty, " << graphs->size(); - return DeltaStatus::STATUS_ERROR; - } - - for (auto& iter : *graphs) { - LOG_INFO << "Load model for graph " << iter.first; - - Graph& graph = iter.second; - std::string engine_type = graph.engine_type(); - ModelMeta model_meta = graph.model_meta(); - auto search = _global_engines.find(engine_type); - - BaseModel* model = nullptr; - - if (search != _global_engines.end()) { -#ifdef USE_TF - if (EngineType::DELTA_EIGINE_TF == _global_engines[engine_type]) { - LOG_INFO << "User engine tf"; - model = new TFModel(model_meta, num_threads); -#endif - - } else if (EngineType::DELTA_EIGINE_TFTRT == - _global_engines[engine_type]) { - LOG_INFO << "User engine tftrt"; - -#ifdef USE_TFLITE - } else if (EngineType::DELTA_EIGINE_TFLITE == - _global_engines[engine_type]) { - LOG_INFO << "User engine tf lite"; - model = new TFLiteModel(model_meta, num_threads); -#endif - -#ifdef USE_TF_SERVING - } else if (EngineType::DELTA_EIGINE_TFSERVING == - _global_engines[engine_type]) { - LOG_INFO << "User engine TFSERVING"; - model = new TFServingModel(model_meta, num_threads); -#endif - - } else if (EngineType::DELTA_EIGINE_SERVING == - _global_engines[engine_type]) { - LOG_FATAL << "Not support now SERVING"; - } else { - LOG_FATAL << "Not support engine type: " << engine_type; - } - } else { - LOG_FATAL << "Not support engine"; - return DeltaStatus::STATUS_ERROR; - } - - graph.set_model(model); - model = nullptr; - } // for - - return DeltaStatus::STATUS_OK; -} - -} // namespace - // run graph class Runtime { public: @@ -171,75 +72,16 @@ class Runtime { return names; } - DeltaStatus load_custom_ops() { - return load_custom_ops_lib(_rt_cfg.custom_ops_path()); - } + // load custom ops + DeltaStatus load_custom_ops(); // load model - DeltaStatus load_model() { - if (DeltaStatus::STATUS_OK != load_models(_rt_cfg, &_graphs)) { - LOG_FATAL << "load model failed"; - return DeltaStatus::STATUS_ERROR; - } - return DeltaStatus::STATUS_OK; - } + DeltaStatus load_model(); - DeltaStatus set_inputs(const std::vector& inputs) { - _inputs_data.clear(); - for (auto& in : inputs) { - LOG_INFO << "Graph name " << in._graph_name; - auto search = _graphs.find(in._graph_name); - if (search != _graphs.end()) { - Graph& graph = search->second; - // std::unordered_map& inputs = graph.get_inputs(); - LOG_INFO << "input name " << in._input_name; - Input& input = graph.get_inputs().at(in._input_name); - DELTA_CHECK_GE(in._size, input.size()); - InputData input_data(input); - input_data.copy_from(in._ptr, in._size); - _inputs_data.push_back(input_data); - } else { - LOG_FATAL << "Error, Graph " << in._graph_name << " not exist!"; - } - } - return DeltaStatus::STATUS_OK; - } + DeltaStatus set_inputs(const std::vector& inputs); // get all out result - DeltaStatus get_outputs(std::vector* results) { - // get resulst - results->clear(); - for (auto& output_data : _outputs_data) { - LOG_INFO << "out shape is " << output_data.shape(); - DataType dtype = output_data.dtype(); - int size = output_data.size(); - LOG_INFO << "out size is " << size; - std::stringstream ss; - switch (dtype) { - case DataType::DELTA_FLOAT32: { - float* ptr = static_cast(output_data.ptr()); - for (int i = 0; i < size; ++i) { - ss << ptr[i] << ","; - } - } break; - case DataType::DELTA_INT32: { - int* ptr = static_cast(output_data.ptr()); - for (int i = 0; i < size; ++i) { - ss << ptr[i] << ","; - } - } break; - case DataType::DELTA_CHAR: { - LOG_FATAL << "DataType::CHAR do nothing"; - } - default: - LOG_FATAL << "Error, not support dtype "; - } - results->push_back(ss.str()); - LOG_INFO << "results are " << *results; - } - - return DeltaStatus::STATUS_OK; - } + DeltaStatus get_outputs(std::vector* results); // get one out result DeltaStatus get_output(const std::string& out_name, string* output) { @@ -247,101 +89,7 @@ class Runtime { } // run - // only support one Graph now!!!! how to support multi-graphs ??? - DeltaStatus run() { - _outputs_data.clear(); - - Graph& graph = _graphs.begin()->second; - - // out - std::unordered_map& outputs = graph.get_outputs(); - for (auto& output : outputs) { - _outputs_data.push_back(OutputData(output.second)); - } - - // run - if (DeltaStatus::STATUS_OK != - this->run(&(graph), _inputs_data, &_outputs_data)) { - LOG_FATAL << "run failed"; - return DeltaStatus::STATUS_ERROR; - } - } - -#if 0 - DeltaStatus run(const string name, float* buffer, const int size, - vector* results) { - LOG_WARN << "graph name is " << name; - auto search = _graphs.find(name); - - if (search != _graphs.end()) { - Graph& graph = search->second; - std::vector inputs_data; - std::vector outputs_data; - // feed data - std::unordered_map& inputs = graph.get_inputs(); - - int in_size = 0; - for (auto& input : inputs) { - in_size += input.second.size(); - } - - DELTA_CHECK_GE(size, in_size); - - // in - float* src = buffer; - for (auto& input : inputs) { - InputData in(input.second); - in.copy_from(src); - inputs_data.push_back(in); - src += input.second.size() * sizeof(float); - } - - // out - std::unordered_map& outputs = graph.get_outputs(); - for (auto& output : outputs) { - outputs_data.push_back(OutputData(output.second)); - } - - // run - if (DeltaStatus::STATUS_OK != - this->run(&(graph), inputs_data, &outputs_data)) { - LOG_FATAL << "run failed"; - return DeltaStatus::STATUS_ERROR; - } - - // get resulst - results->clear(); - for (auto& output_data : outputs_data) { - LOG_INFO << "out shape is " << output_data.shape(); - DataType dtype = output_data.dtype(); - int size = output_data.size(); - LOG_INFO << "out size is " << size; - std::stringstream ss; - switch (dtype) { - case DataType::DELTA_FLOAT32: { - float* ptr = static_cast(output_data.ptr()); - for (int i = 0; i < size; ++i) { - ss << ptr[i] << ","; - } - } break; - case DataType::DELTA_INT32: { - int* ptr = static_cast(output_data.ptr()); - for (int i = 0; i < size; ++i) { - ss << ptr[i] << ","; - } - } break; - default: - LOG_FATAL << "Error, not support dtype "; - } - results->push_back(ss.str()); - LOG_INFO << "results are " << *results; - } - } else { - LOG_FATAL << "Error, run failed, graph " << name << " not exist."; - } - return DeltaStatus::STATUS_OK; - } -#endif + DeltaStatus run(); // run one graph, with inputs to get outputs; // DeltaStatus run(const Graph& graph, @@ -351,47 +99,12 @@ class Runtime { } // load model if not loaded, then warmup model; - DeltaStatus warmup() { - if (DeltaStatus::STATUS_OK != load_custom_ops()) { - LOG_FATAL << "load custom ops failed"; - return DeltaStatus::STATUS_ERROR; - } - - if (DeltaStatus::STATUS_OK != load_model()) { - LOG_FATAL << "load model failed"; - return DeltaStatus::STATUS_ERROR; - } - - for (auto& graph : _graphs) { - std::vector inputs_data; - std::vector outputs_data; - - std::unordered_map& inputs = - graph.second.get_inputs(); - for (auto& input : inputs) { - InputData in(input.second); - in.feed_random_data(); - inputs_data.push_back(in); - } - - std::unordered_map& outputs = - graph.second.get_outputs(); - for (auto& output : outputs) { - outputs_data.push_back(OutputData(output.second)); - } - if (DeltaStatus::STATUS_OK != - this->run(&(graph.second), inputs_data, &outputs_data)) { - LOG_FATAL << "run failed"; - return DeltaStatus::STATUS_ERROR; - } - } - - return DeltaStatus::STATUS_OK; - } + DeltaStatus warmup(); // reload model with new config, then warmup model; DeltaStatus warmup(bool reset) {} + // number of output nodes int get_output_num() { return _outputs_data.size(); } #define CHECK_OUTPUT_INDEX(index) \ @@ -399,11 +112,13 @@ class Runtime { DELTA_CHECK_LT(index, num); \ DELTA_CHECK_GE(index, 0); + // the rank of the `output_index` int get_output_ndim(int output_index) { CHECK_OUTPUT_INDEX(output_index); return _outputs_data[output_index].ndim(); } + // the `dim_index` number of the `output_index` int get_output_dim(int output_index, int dim_index) { CHECK_OUTPUT_INDEX(output_index); diff --git a/deltann/core/utils/misc.cc b/deltann/core/utils/misc.cc index 1da16a3b..f9ffd8d8 100644 --- a/deltann/core/utils/misc.cc +++ b/deltann/core/utils/misc.cc @@ -77,5 +77,4 @@ char gen_random_char() { return c; } - } // namespace delta diff --git a/deltann/examples/text_cls/test.cc b/deltann/examples/text_cls/test.cc index a027421e..7f8bb1f5 100644 --- a/deltann/examples/text_cls/test.cc +++ b/deltann/examples/text_cls/test.cc @@ -43,7 +43,7 @@ int main(int argc, char** argv) { fprintf(stderr, "The output num is %d\n", out_num); for (int i = 0; i < out_num; ++i) { int byte_size = DeltaGetOutputByteSize(inf, i); - fprintf(stderr, "The %d output byte size is %d\n", i, byte_size); + fprintf(stderr, "The %d output size is %d (bytes).\n", i, byte_size); float* data = reinterpret_cast(malloc(byte_size)); DeltaCopyToBuffer(inf, i, reinterpret_cast(data), byte_size); diff --git a/deltann/test.sh b/deltann/test.sh index 248ec316..763812c9 100755 --- a/deltann/test.sh +++ b/deltann/test.sh @@ -1,4 +1,6 @@ #!/bin/bash +make examples + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$PWD/../dpl/lib/tensorflow:$PWD/../dpl/lib/deltann/ ./examples/text_cls/test.bin examples/text_cls/model.yaml