Commit 2380bd1

Code cleaning
1 parent ce2b352 commit 2380bd1

File tree

12 files changed: +244 -97 lines

README.md
+2 -2

@@ -1,6 +1,6 @@
-# Named Entity Recognition
+# Named Entity Recognition Plugin
 
-This Dataiku DSS plugin provides a recipe, macro and webapp to recognize Named Entities (people, dates, places, etc.) in text data.
+This Dataiku DSS plugin provides a recipe, macro and webapp to recognize Named Entities (people, dates, places, etc.) in text data.
 
 Documentation: https://www.dataiku.com/product/plugins/named-entity-recognition/
 

custom-recipes/named-entity-recognition-extract/recipe.json
+50 -22

@@ -10,15 +10,15 @@
         {
             "name": "input_dataset",
             "label": "Input dataset",
-            "description": "The dataset that contains your texts.",
+            "description": "Dataset containing the text data to analyze",
             "arity": "UNARY",
             "required": true,
             "acceptsDataset": true
         },
         {
             "name": "model_folder",
-            "label": "Model folder",
-            "description": "A managed folder for saving the NER model (only required if using Flair)",
+            "label": "Flair model (optional)",
+            "description": "Folder containing Flair model weights",
             "arity": "UNARY",
             "required": false,
             "acceptsManagedFolder": true,
@@ -30,44 +30,72 @@
         {
             "name": "output_dataset",
             "label": "Output dataset",
-            "description": "A dataset with the input texts and their corresponding entities",
+            "description": "Dataset with the input text and the corresponding entities",
             "arity": "UNARY",
             "required": true,
             "acceptsDataset": true
         }
     ],
     "params": [
+        {
+            "name": "separator_input",
+            "label": "Input parameters",
+            "type": "SEPARATOR"
+        },
         {
             "name": "text_column_name",
             "label": "Text column",
             "type": "COLUMN",
             "mandatory": true,
-            "columnRole": "input_dataset"
+            "columnRole": "input_dataset",
+            "allowedColumnTypes": [
+                "string"
+            ]
         },
         {
             "visibilityCondition": "model.ner_model=='spacy'",
             "name": "text_language_spacy",
-            "label": "Text language",
+            "label": "Language",
+            "description": "List of supported languages",
             "type": "SELECT",
-            "description": "Select the language of your texts.",
             "selectChoices": [
+                {
+                    "value": "zh",
+                    "label": "Chinese"
+                },
                 {
                     "value": "en",
                    "label": "English"
                 },
                 {
                     "value": "fr",
                     "label": "French"
+                },
+                {
+                    "value": "de",
+                    "label": "German"
+                },
+                {
+                    "value": "nb",
+                    "label": "Norwegian Bokmål"
+                },
+                {
+                    "value": "pl",
+                    "label": "Polish"
+                },
+                {
+                    "value": "es",
+                    "label": "Spanish"
                 }
             ],
             "defaultValue": "en"
         },
         {
             "visibilityCondition": "model.ner_model=='flair'",
             "name": "text_language_flair",
-            "label": "Text language",
+            "label": "Language",
             "type": "SELECT",
-            "description": "Select the language of your texts.",
+            "description": "Only supported language",
             "selectChoices": [
                 {
                     "value": "en",
@@ -77,41 +105,41 @@
             "defaultValue": "en"
         },
         {
-            "label": "Advanced",
+            "name": "separator_configuration",
+            "label": "Configuration",
             "type": "SEPARATOR"
         },
         {
             "name": "advanced_settings",
-            "label": "Show advanced Settings",
+            "label": "Expert mode",
             "type": "BOOLEAN",
-            "description": "",
-            "defaultValue": false
-        },
-        {
-            "visibilityCondition": "model.advanced_settings",
-            "name": "output_single_json",
-            "label": "Output single column",
-            "type": "BOOLEAN",
-            "description": "Output a single JSON column rather than one column per entity type",
             "defaultValue": false
         },
         {
             "visibilityCondition": "model.advanced_settings",
             "name": "ner_model",
             "label": "Model",
             "type": "SELECT",
-            "description": "SpaCy (multi-language, faster, less accurate) of Flair (Enlgish only, slower, more accurate).",
+            "description": "spaCy (multi-lingual, faster) or Flair (English only, slower)",
             "selectChoices": [
                 {
                     "value": "spacy",
-                    "label": "SpaCy"
+                    "label": "spaCy"
                 },
                 {
                     "value": "flair",
                     "label": "Flair"
                 }
             ],
             "defaultValue": "spacy"
+        },
+        {
+            "visibilityCondition": "model.advanced_settings",
+            "name": "output_single_json",
+            "label": "JSON output",
+            "type": "BOOLEAN",
+            "description": "Output a single JSON column rather than one column per entity type",
+            "defaultValue": false
         }
     ]
 }

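These parameters are consumed in the recipe's Python code through get_recipe_config(). A minimal sketch of those reads, with invented example values (the keys mirror the "name" fields declared above; this only runs inside a DSS recipe):

    from dataiku.customrecipe import get_recipe_config

    # Hypothetical runtime reads; keys match the "name" fields of the params above
    recipe_config = get_recipe_config()
    text_column_name = recipe_config.get("text_column_name")          # e.g. "review_text" (invented)
    advanced_settings = recipe_config.get("advanced_settings", False)
    ner_model = recipe_config.get("ner_model", "spacy")               # "spacy" or "flair"
    output_single_json = recipe_config.get("output_single_json", False)
    if ner_model == "spacy":
        language = recipe_config.get("text_language_spacy", "en")     # one of the SELECT choices above
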
@@ -1,11 +1,9 @@
 # -*- coding: utf-8 -*-
-import logging
-
-from tqdm import tqdm
-
 import dataiku
 from dataiku.customrecipe import get_input_names_for_role, get_output_names_for_role, get_recipe_config
 
+from dku_io_utils import process_dataset_chunks
+
 #############################
 # Input & Output datasets
 #############################
@@ -16,8 +14,6 @@
 output_dataset_name = get_output_names_for_role("output_dataset")[0]
 output_dataset = dataiku.Dataset(output_dataset_name)
 
-input_df = input_dataset.get_dataframe()
-
 #############################
 # Recipe Parameters
 #############################
@@ -26,7 +22,7 @@
 
 text_column_name = recipe_config.get("text_column_name", None)
 if not text_column_name:
-    raise ValueError("You did not choose a text column.")
+    raise ValueError("Please choose a text column")
 
 advanced_settings = recipe_config.get("advanced_settings", False)
 if advanced_settings:
@@ -38,8 +34,10 @@
 
 if ner_model == "spacy":
     from ner_utils_spacy import extract_entities
+
+    language = recipe_config.get("text_language_spacy", "en")
 else:
-    from ner_utils_flair import extract_entities
+    from ner_utils_flair import extract_entities, CustomSequenceTagger
 
     try:
         model_folder = get_input_names_for_role("model_folder")[0]
@@ -48,28 +46,24 @@
             "To use Flair, download the model using the macro and add the resulting folder as input to the recipe."
         )
     folder_path = dataiku.Folder(model_folder).get_path()
+    tagger = CustomSequenceTagger.load("ner-ontonotes-fast", folder_path)
 
 #############################
 # Main Loop
 #############################
 
-CHUNK_SIZE = 100
-n_lines = 0
-logging.info("Started chunk-processing of input Dataset.")
-for chunk_idx, df in enumerate(tqdm(input_dataset.iter_dataframes(chunksize=CHUNK_SIZE))):
-    # Process chunk
-    out_df = extract_entities(df[text_column_name].fillna(" "), format=output_single_json)
+
+def compute_entities_df(df):
+    if ner_model == "spacy":
+        out_df = extract_entities(df[text_column_name].fillna(" "), format=output_single_json, language=language)
+    else:
+        out_df = extract_entities(df[text_column_name].fillna(" "), format=output_single_json, tagger=tagger)
     df = df.reset_index(drop=True)
     out_df = out_df.reset_index(drop=True)
     out_df = df.merge(out_df, left_index=True, right_index=True)
+    return out_df
 
-    # Append dataframe to output Dataset
-    if chunk_idx == 0:
-        output_dataset.write_schema_from_dataframe(out_df)
-        writer = output_dataset.get_writer()
-        writer.write_dataframe(out_df)
-    else:
-        writer.write_dataframe(out_df)
-    n_lines += len(df)
-logging.info("Finished processing {} lines".format(n_lines))
-writer.close()
+
+process_dataset_chunks(
+    input_dataset=input_dataset, output_dataset=output_dataset, func=compute_entities_df, chunksize=100
+)
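
The refactor above relies on a simple per-chunk contract: the function passed to process_dataset_chunks takes one pandas DataFrame and returns a DataFrame with the same rows plus the new columns. A self-contained sketch of that contract with plain pandas (the function and column names are invented for illustration and are not part of the plugin):

    import pandas as pd

    def add_text_length(df: pd.DataFrame) -> pd.DataFrame:
        # Any transformation works, as long as one DataFrame goes in and one comes out
        out_df = df.copy()
        out_df["text_length"] = out_df["text"].fillna("").str.len()
        return out_df

    chunk = pd.DataFrame({"text": ["Alice went to Paris.", None]})
    print(add_text_length(chunk))
    # In the recipe, this role is played by compute_entities_df, passed as `func`
    # to process_dataset_chunks together with chunksize=100.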

plugin.json
+8 -5

@@ -1,14 +1,17 @@
 {
     "id": "named-entity-recognition",
-    "version": "1.2.1",
+    "version": "1.3.0",
     "meta": {
         "label": "Named Entity Recognition",
         "category": "Natural Language Processing",
-        "description": "Identify “real-world objects” (people names, dates, places, etc.) in a text",
-        "author": "Dataiku (Alex COMBESSIE and Hicham EL BOUKKOURI)",
+        "description": "Recognize Named Entities in text data using pre-trained models",
+        "author": "Dataiku (Alex COMBESSIE, Du PHAN, Hicham EL BOUKKOURI)",
         "icon": "icon-tags",
         "licenseInfo": "Apache Software License",
         "url": "https://www.dataiku.com/product/plugins/named-entity-recognition/",
-        "tags": ["NLP"]
+        "tags": [
+            "NLP"
+        ],
+        "supportLevel": "NOT_SUPPORTED"
     }
-}
+}

python-lib/dku_io_utils.py
+119

@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+"""Module with read/write utility functions based on the Dataiku API"""
+
+import logging
+import math
+from time import time
+from typing import Callable, Dict
+
+from tqdm import tqdm
+import dataiku
+
+
+def count_records(dataset: dataiku.Dataset) -> int:
+    """Count the number of records of a dataset using the Dataiku dataset metrics API
+
+    Args:
+        dataset: dataiku.Dataset instance
+
+    Returns:
+        Number of records
+    """
+    metric_id = "records:COUNT_RECORDS"
+    partitions = dataset.read_partitions
+    client = dataiku.api_client()
+    project = client.get_project(dataset.project_key)
+    record_count = 0
+    logging.info("Counting records of dataset: {}...".format(dataset.name))
+    if partitions is None or len(partitions) == 0:
+        project.get_dataset(dataset.short_name).compute_metrics(metric_ids=[metric_id])
+        metric = dataset.get_last_metric_values()
+        record_count = dataiku.ComputedMetrics.get_value_from_data(metric.get_global_data(metric_id=metric_id))
+        logging.info("Dataset {} contains {:d} records and is not partitioned".format(dataset.name, record_count))
+    else:
+        for partition in partitions:
+            project.get_dataset(dataset.name).compute_metrics(partition=partition, metric_ids=[metric_id])
+            metric = dataset.get_last_metric_values()
+            record_count += dataiku.ComputedMetrics.get_value_from_data(
+                metric.get_partition_data(partition=partition, metric_id=metric_id)
+            )
+        logging.info(
+            "Dataset {} contains {:d} records in partition(s) {}".format(dataset.name, record_count, partitions)
+        )
+    return record_count
+
+
+def process_dataset_chunks(
+    input_dataset: dataiku.Dataset, output_dataset: dataiku.Dataset, func: Callable, chunksize: float = 1000, **kwargs
+) -> None:
+    """Read a dataset by chunks, process each dataframe chunk with a function and write back to another dataset.
+
+    Passes keyword arguments to the function, adds a tqdm progress bar and generic logging.
+    Directly writes chunks to the output_dataset, so that only one chunk needs to be processed in-memory at a time.
+
+    Args:
+        input_dataset: Input dataiku.Dataset instance
+        output_dataset: Output dataiku.Dataset instance
+        func: The function to apply to the `input_dataset` by chunks of pandas.DataFrame
+            This function must take a pandas.DataFrame as first input argument,
+            and output another pandas.DataFrame
+        chunksize: Number of rows of each chunk of pandas.DataFrame fed to `func`
+        **kwargs: Optional keyword arguments fed to `func`
+    """
+    input_count_records = count_records(input_dataset)
+    if input_count_records == 0:
+        raise ValueError("Input dataset has no records")
+    logging.info(
+        "Processing dataset {} of {:d} rows by chunks of {:d}...".format(
+            input_dataset.name, input_count_records, chunksize
+        )
+    )
+    start = time()
+    with output_dataset.get_writer() as writer:
+        df_iterator = input_dataset.iter_dataframes(chunksize=chunksize, infer_with_pandas=False)
+        len_iterator = math.ceil(input_count_records / chunksize)
+        for i, df in tqdm(enumerate(df_iterator), total=len_iterator):
+            output_df = func(df=df, **kwargs)
+            if i == 0:
+                output_dataset.write_schema_from_dataframe(
+                    output_df, dropAndCreate=bool(not output_dataset.writePartition)
+                )
+            writer.write_dataframe(output_df)
+    logging.info(
+        "Processing dataset {} of {:d} rows: Done in {:.2f} seconds.".format(
+            input_dataset.name, input_count_records, time() - start
+        )
+    )
+
+
+def set_column_description(
+    output_dataset: dataiku.Dataset, column_description_dict: Dict, input_dataset: dataiku.Dataset = None
+) -> None:
+    """Set column descriptions of the output dataset based on a dictionary of column descriptions
+
+    Retains the column descriptions from the input dataset if the column name matches.
+
+    Args:
+        output_dataset: Output dataiku.Dataset instance
+        column_description_dict: Dictionary holding column descriptions (value) by column name (key)
+        input_dataset: Optional input dataiku.Dataset instance
+            in case you want to retain input column descriptions
+    """
+    output_dataset_schema = output_dataset.read_schema()
+    input_dataset_schema = []
+    input_columns_names = []
+    if input_dataset is not None:
+        input_dataset_schema = input_dataset.read_schema()
+        input_columns_names = [col["name"] for col in input_dataset_schema]
+    for output_col_info in output_dataset_schema:
+        output_col_name = output_col_info.get("name", "")
+        output_col_info["comment"] = column_description_dict.get(output_col_name)
+        if output_col_name in input_columns_names:
+            matched_comment = [
+                input_col_info.get("comment", "")
+                for input_col_info in input_dataset_schema
+                if input_col_info.get("name") == output_col_name
+            ]
+            if len(matched_comment) != 0:
+                output_col_info["comment"] = matched_comment[0]
+    output_dataset.write_schema(output_dataset_schema)
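
For context, a hedged usage sketch of these helpers from a Python recipe or notebook inside DSS (the dataset names and the column description mapping are invented; only count_records, process_dataset_chunks and set_column_description come from the module above):

    import dataiku
    from dku_io_utils import count_records, process_dataset_chunks, set_column_description

    input_dataset = dataiku.Dataset("reviews")         # hypothetical dataset names
    output_dataset = dataiku.Dataset("reviews_scored")

    print(count_records(input_dataset))  # record count computed via the metrics API

    def passthrough(df):
        # Per-chunk function: takes a pandas DataFrame, returns a pandas DataFrame
        return df

    process_dataset_chunks(
        input_dataset=input_dataset, output_dataset=output_dataset, func=passthrough, chunksize=1000
    )

    # Optionally document the output columns, keeping descriptions of columns copied from the input
    set_column_description(
        output_dataset=output_dataset,
        column_description_dict={"sentence": "Raw input text"},  # invented mapping
        input_dataset=input_dataset,
    )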
