From 2982951583a91e7b8c77ceafcc1a1208e733d885 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 11:16:18 +0200 Subject: [PATCH 01/17] Add files via upload --- custom-recipes/api-connect/recipe.json | 603 +++++++++++++------------ 1 file changed, 321 insertions(+), 282 deletions(-) diff --git a/custom-recipes/api-connect/recipe.json b/custom-recipes/api-connect/recipe.json index cecae54..bc81b7e 100644 --- a/custom-recipes/api-connect/recipe.json +++ b/custom-recipes/api-connect/recipe.json @@ -1,282 +1,321 @@ -{ - "meta": { - "label": "API Connect Recipe", - "description": "Retrieve data from any Rest API based on your existing dataset", - "icon": "icon-rocket" - }, - "kind": "PYTHON", - "selectableFromDataset": "input_A_role", - "inputRoles": [ - { - "name": "input_A_role", - "label": "Dataset containing variables", - "description": "", - "arity": "UNARY", - "required": true, - "acceptsDataset": true - } - ], - - "outputRoles": [ - { - "name": "api_output", - "label": "Main output displayed name", - "description": "", - "arity": "UNARY", - "required": true, - "acceptsDataset": true - } - ], - "params": [ - { - "type": "SEPARATOR", - "label": "Authentication" - }, - { - "name": "credential", - "label": "Credential preset", - "type": "PRESET", - "parameterSetId": "credential" - }, - { - "type": "SEPARATOR", - "label": "API call parameters" - }, - - { - "name": "endpoint_url", - "label": "URL template", - "description": "https://{{my_variable}}.example.com/user/{{username}}/details", - "type": "TEXTAREA" - }, - { - "name": "http_method", - "label": "HTTP method", - "description": "", - "type": "SELECT", - "defaultValue": "GET", - "selectChoices":[ - {"value": "GET", "label": "GET"}, - {"value": "POST", "label": "POST"}, - {"value": "PUT", "label": "PUT"}, - {"value": "PATCH", "label": "PATCH"}, - {"value": "DELETE", "label": "DELETE"} - ] - }, - { - "name": "endpoint_query_string", - "label": "Query Params", - "description": 
"Will add ?key1=val1&key2=val2 to the URL", - "type": "KEY_VALUE_LIST" - }, - { - "name": "endpoint_body", - "label": "Body", - "description": "", - "type": "KEY_VALUE_LIST", - "visibilityCondition": false - }, - { - "name": "endpoint_headers", - "label": "Headers", - "description": "", - "type": "KEY_VALUE_LIST", - "defaultValue": [ - { - "from": "Content-Type", - "to": "application/json" - }, - { - "from": "Accept", - "to": "application/json" - } - ] - }, - { - "name": "body_format", - "label": "Body", - "description": "", - "type": "SELECT", - "defaultValue": null, - "selectChoices":[ - {"value": null, "label": "None"}, - {"value": "FORM_DATA", "label": "Form-data"}, - {"value": "RAW", "label": "Raw"} - ] - }, - { - "name": "text_body", - "label": "Request's body", - "description": "", - "type": "TEXTAREA", - "visibilityCondition": "model.body_format=='RAW'" - }, - { - "name": "key_value_body", - "label": "Request's body", - "description": "", - "type": "KEY_VALUE_LIST", - "visibilityCondition": "(['FORM_DATA'].indexOf(model.body_format)>-1)" - }, - - { - "type": "SEPARATOR", - "label": "Template variables", - "description": "URL, headers and parameters can be templated using {{variables}}" - }, - { - "name": "parameter_columns", - "label": "Columns to use as variables", - "description": "Unless renamed, these columns can be used as {{ColumnName}} variables in templates", - "type": "COLUMNS", - "columnRole": "input_A_role" - }, - { - "name": "parameter_renamings", - "label": "Variables columns renaming", - "description": "Rename 'My variable column' to 'my_variable', to use as {{my_variable}}, and avoid name colisions (optional)", - "type": "KEY_VALUE_LIST" - }, - { - "name": "custom_key_values", - "label": "Custom keys / values", - "description": "Replace the variable {{key}} by its value in templates (optional)", - "type": "KEY_VALUE_LIST", - "visibilityCondition": false - }, - - { - "type": "SEPARATOR", - "label": "Data extraction" - }, - { - "name": 
"extraction_key", - "label": "Key to data array (optional)", - "description": "", - "defaultValue": null, - "type": "STRING" - }, - { - "name": "raw_output", - "label": "Raw JSON output", - "description": "", - "defaultValue": true, - "type": "BOOLEAN" - }, - { - "type": "SEPARATOR", - "label": "Pagination" - }, - { - "name": "pagination_type", - "label": "Pagination mechanism", - "description": "Refer to the API's documentation", - "type": "SELECT", - "defaultValue": "na", - "selectChoices":[ - {"value": "na", "label": "No pagination"}, - {"value": "next_page", "label": "Next page URL provided"}, - {"value": "offset", "label": "Offset pagination"}, - {"value": "page", "label": "Per page"} - ] - }, - { - "type": "SEPARATOR", - "description": "⚠'Per page' mode requires the 'Key to data array' to be defined", - "visibilityCondition": "model.pagination_type=='page' && !model.extraction_key" - }, - { - "name": "next_page_url_key", - "label": "Key to next request URL", - "description": "Dot separated key path to next request URL", - "type": "STRING", - "defaultValue": null, - "visibilityCondition": "model.pagination_type=='next_page'" - }, - { - "name": "is_next_page_url_relative", - "label": " ", - "description": "Next page URL is relative", - "type": "BOOLEAN", - "defaultValue": false, - "visibilityCondition": "model.pagination_type=='next_page'" - }, - { - "name": "next_page_url_base", - "label": "Base URL to next page", - "description": "https://mysite.com/path/", - "type": "STRING", - "defaultValue": null, - "visibilityCondition": "model.pagination_type=='next_page' && (model.is_next_page_url_relative==true)" - }, - { - "name": "top_key", - "label": "Key limiting elements per page", - "description": "", - "type": "STRING", - "defaultValue": null, - "visibilityCondition": "model.pagination_type == 'offset'" - }, - { - "name": "skip_key", - "label": "Key for element offset", - "description": "", - "type": "STRING", - "defaultValue": null, - "visibilityCondition": 
"model.pagination_type=='offset'" - }, - { - "name": "skip_key", - "label": "Key for page offset", - "description": "", - "type": "STRING", - "defaultValue": null, - "visibilityCondition": "model.pagination_type=='page'" - }, - { - "type": "SEPARATOR", - "label": "Advanced" - }, - { - "name": "ignore_ssl_check", - "label": "Ignore SSL check", - "type": "BOOLEAN", - "defaultValue": false - }, - { - "name": "redirect_auth_header", - "label": "Redirect authorization header", - "type": "BOOLEAN", - "defaultValue": false - }, - { - "name": "display_metadata", - "label": "Display metadata", - "description": "Status code, request time...", - "type": "BOOLEAN", - "defaultValue": false - }, - { - "name": "timeout", - "label": "Timeout (s)", - "description": "-1 for no limit", - "type": "INT", - "defaultValue": 3600 - }, - { - "name": "requests_per_minute", - "label": "Rate limit (requests/m)", - "description": "-1 for no limit", - "type": "INT", - "defaultValue": -1 - }, - { - "name": "maximum_number_rows", - "label": "Maximum number of rows", - "description": "-1 for no limit", - "type": "INT", - "defaultValue": -1 - } - ], - "resourceKeys": [] -} +{ + "meta": { + "label": "API Connect Recipe", + "description": "Retrieve data from any Rest API based on your existing dataset", + "icon": "icon-rocket" + }, + "kind": "PYTHON", + "selectableFromDataset": "input_A_role", + "inputRoles": [ + { + "name": "input_A_role", + "label": "Dataset containing variables", + "description": "", + "arity": "UNARY", + "required": true, + "acceptsDataset": true + } + ], + "outputRoles": [ + { + "name": "api_output", + "label": "Main output displayed name", + "description": "", + "arity": "UNARY", + "required": true, + "acceptsDataset": true + } + ], + "params": [ + { + "type": "SEPARATOR", + "label": "Authentication" + }, + { + "name": "credential", + "label": "Credential preset", + "type": "PRESET", + "parameterSetId": "credential" + }, + { + "name": "no_proxy", + "label": "Value to be passed 
to the no-proxy parameter (optional)", + "description": "", + "defaultValue": null, + "type": "STRING" + }, + { + "type": "SEPARATOR", + "label": "API call parameters" + }, + { + "name": "endpoint_url", + "label": "URL template", + "description": "https://{{my_variable}}.example.com/user/{{username}}/details", + "type": "TEXTAREA" + }, + { + "name": "http_method", + "label": "HTTP method", + "description": "", + "type": "SELECT", + "defaultValue": "GET", + "selectChoices": [ + { + "value": "GET", + "label": "GET" + }, + { + "value": "POST", + "label": "POST" + }, + { + "value": "PUT", + "label": "PUT" + }, + { + "value": "PATCH", + "label": "PATCH" + }, + { + "value": "DELETE", + "label": "DELETE" + } + ] + }, + { + "name": "endpoint_query_string", + "label": "Query Params", + "description": "Will add ?key1=val1&key2=val2 to the URL", + "type": "KEY_VALUE_LIST" + }, + { + "name": "endpoint_body", + "label": "Body", + "description": "", + "type": "KEY_VALUE_LIST", + "visibilityCondition": false + }, + { + "name": "endpoint_headers", + "label": "Headers", + "description": "", + "type": "KEY_VALUE_LIST", + "defaultValue": [ + { + "from": "Content-Type", + "to": "application/json" + }, + { + "from": "Accept", + "to": "application/json" + } + ] + }, + { + "name": "body_format", + "label": "Body", + "description": "", + "type": "SELECT", + "defaultValue": null, + "selectChoices": [ + { + "value": null, + "label": "None" + }, + { + "value": "FORM_DATA", + "label": "Form-data" + }, + { + "value": "RAW", + "label": "Raw" + } + ] + }, + { + "name": "text_body", + "label": "Request's body", + "description": "", + "type": "TEXTAREA", + "visibilityCondition": "model.body_format=='RAW'" + }, + { + "name": "key_value_body", + "label": "Request's body", + "description": "", + "type": "KEY_VALUE_LIST", + "visibilityCondition": "(['FORM_DATA'].indexOf(model.body_format)>-1)" + }, + { + "type": "SEPARATOR", + "label": "Template variables", + "description": "URL, headers and 
parameters can be templated using {{variables}}" + }, + { + "name": "parameter_columns", + "label": "Columns to use as variables", + "description": "Unless renamed, these columns can be used as {{ColumnName}} variables in templates", + "type": "COLUMNS", + "columnRole": "input_A_role" + }, + { + "name": "parameter_renamings", + "label": "Variables columns renaming", + "description": "Rename 'My variable column' to 'my_variable', to use as {{my_variable}}, and avoid name colisions (optional)", + "type": "KEY_VALUE_LIST" + }, + { + "name": "custom_key_values", + "label": "Custom keys / values", + "description": "Replace the variable {{key}} by its value in templates (optional)", + "type": "KEY_VALUE_LIST", + "visibilityCondition": false + }, + { + "type": "SEPARATOR", + "label": "Data extraction" + }, + { + "name": "extraction_key", + "label": "Key to data array (optional)", + "description": "", + "defaultValue": null, + "type": "STRING" + }, + { + "name": "raw_output", + "label": "Raw JSON output", + "description": "", + "defaultValue": true, + "type": "BOOLEAN" + }, + { + "type": "SEPARATOR", + "label": "Pagination" + }, + { + "name": "pagination_type", + "label": "Pagination mechanism", + "description": "Refer to the API's documentation", + "type": "SELECT", + "defaultValue": "na", + "selectChoices": [ + { + "value": "na", + "label": "No pagination" + }, + { + "value": "next_page", + "label": "Next page URL provided" + }, + { + "value": "offset", + "label": "Offset pagination" + }, + { + "value": "page", + "label": "Per page" + } + ] + }, + { + "type": "SEPARATOR", + "description": "⚠'Per page' mode requires the 'Key to data array' to be defined", + "visibilityCondition": "model.pagination_type=='page' && !model.extraction_key" + }, + { + "name": "next_page_url_key", + "label": "Key to next request URL", + "description": "Dot separated key path to next request URL", + "type": "STRING", + "defaultValue": null, + "visibilityCondition": 
"model.pagination_type=='next_page'" + }, + { + "name": "is_next_page_url_relative", + "label": " ", + "description": "Next page URL is relative", + "type": "BOOLEAN", + "defaultValue": false, + "visibilityCondition": "model.pagination_type=='next_page'" + }, + { + "name": "next_page_url_base", + "label": "Base URL to next page", + "description": "https://mysite.com/path/", + "type": "STRING", + "defaultValue": null, + "visibilityCondition": "model.pagination_type=='next_page' && (model.is_next_page_url_relative==true)" + }, + { + "name": "top_key", + "label": "Key limiting elements per page", + "description": "", + "type": "STRING", + "defaultValue": null, + "visibilityCondition": "model.pagination_type == 'offset'" + }, + { + "name": "skip_key", + "label": "Key for element offset", + "description": "", + "type": "STRING", + "defaultValue": null, + "visibilityCondition": "model.pagination_type=='offset'" + }, + { + "name": "skip_key", + "label": "Key for page offset", + "description": "", + "type": "STRING", + "defaultValue": null, + "visibilityCondition": "model.pagination_type=='page'" + }, + { + "type": "SEPARATOR", + "label": "Advanced" + }, + { + "name": "ignore_ssl_check", + "label": "Ignore SSL check", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "name": "redirect_auth_header", + "label": "Redirect authorization header", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "name": "display_metadata", + "label": "Display metadata", + "description": "Status code, request time...", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "name": "timeout", + "label": "Timeout (s)", + "description": "-1 for no limit", + "type": "INT", + "defaultValue": 3600 + }, + { + "name": "requests_per_minute", + "label": "Rate limit (requests/m)", + "description": "-1 for no limit", + "type": "INT", + "defaultValue": -1 + }, + { + "name": "maximum_number_rows", + "label": "Maximum number of rows", + "description": "-1 for no limit", + "type": "INT", + 
"defaultValue": -1 + } + ], + "resourceKeys": [] +} \ No newline at end of file From 99b301414709a97160eb9293858beba0d1676b8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 11:16:27 +0200 Subject: [PATCH 02/17] Add files via upload --- custom-recipes/api-connect/recipe.py | 145 ++++++++++++++------------- 1 file changed, 76 insertions(+), 69 deletions(-) diff --git a/custom-recipes/api-connect/recipe.py b/custom-recipes/api-connect/recipe.py index 3ab98d5..9b8ee10 100644 --- a/custom-recipes/api-connect/recipe.py +++ b/custom-recipes/api-connect/recipe.py @@ -1,69 +1,76 @@ -# -*- coding: utf-8 -*- -import dataiku -from dataiku.customrecipe import get_input_names_for_role, get_recipe_config, get_output_names_for_role -import pandas as pd -from safe_logger import SafeLogger -from dku_utils import get_dku_key_values, get_endpoint_parameters -from rest_api_recipe_session import RestApiRecipeSession - - -logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) - - -def get_partitioning_keys(id_list, dku_flow_variables): - partitioning_keys = {} - partitioning = id_list.get_config().get("partitioning") - if partitioning: - dimensions_types = partitioning.get("dimensions", []) - dimensions = [] - for dimension_type in dimensions_types: - dimensions.append(dimension_type.get("name")) - for dimension in dimensions: - dimension_src = "DKU_DST_{}".format(dimension) - if dimension_src in dku_flow_variables: - partitioning_keys[dimension] = dku_flow_variables.get(dimension_src) - return partitioning_keys - - -logger.info('API-Connect plugin recipe v1.1.3') - -input_A_names = get_input_names_for_role('input_A_role') -config = get_recipe_config() -dku_flow_variables = dataiku.get_flow_variables() - -logger.info("config={}".format(logger.filter_secrets(config))) - -credential_parameters = config.get("credential", {}) -endpoint_parameters = get_endpoint_parameters(config) -extraction_key = 
endpoint_parameters.get("extraction_key", "") -is_raw_output = endpoint_parameters.get("raw_output", True) -parameter_columns = [column for column in config.get("parameter_columns", []) if column] -if len(parameter_columns) == 0: - raise ValueError("There is no parameter column selected.") -parameter_renamings = get_dku_key_values(config.get("parameter_renamings", {})) -custom_key_values = get_dku_key_values(config.get("custom_key_values", {})) -display_metadata = config.get("display_metadata", False) -maximum_number_rows = config.get("maximum_number_rows", -1) -input_parameters_dataset = dataiku.Dataset(input_A_names[0]) -partitioning_keys = get_partitioning_keys(input_parameters_dataset, dku_flow_variables) -custom_key_values.update(partitioning_keys) -input_parameters_dataframe = input_parameters_dataset.get_dataframe() - -recipe_session = RestApiRecipeSession( - custom_key_values, - credential_parameters, - endpoint_parameters, - extraction_key, - parameter_columns, - parameter_renamings, - display_metadata, - maximum_number_rows=maximum_number_rows -) -results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output) - -output_names_stats = get_output_names_for_role('api_output') -odf = pd.DataFrame(results) - -if odf.size > 0: - api_output = dataiku.Dataset(output_names_stats[0]) - api_output.write_with_schema(odf) +# -*- coding: utf-8 -*- +import dataiku +from dataiku.customrecipe import ( + get_input_names_for_role, + get_recipe_config, + get_output_names_for_role, +) +import pandas as pd +from safe_logger import SafeLogger +from dku_utils import get_dku_key_values, get_endpoint_parameters +from rest_api_recipe_session import RestApiRecipeSession + +import os + +logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) + + +def get_partitioning_keys(id_list, dku_flow_variables): + partitioning_keys = {} + partitioning = id_list.get_config().get("partitioning") + if partitioning: + dimensions_types = 
partitioning.get("dimensions", []) + dimensions = [] + for dimension_type in dimensions_types: + dimensions.append(dimension_type.get("name")) + for dimension in dimensions: + dimension_src = "DKU_DST_{}".format(dimension) + if dimension_src in dku_flow_variables: + partitioning_keys[dimension] = dku_flow_variables.get(dimension_src) + return partitioning_keys + + +logger.info("API-Connect plugin recipe v1.1.3") + +input_A_names = get_input_names_for_role("input_A_role") +config = get_recipe_config() +dku_flow_variables = dataiku.get_flow_variables() + +logger.info("config={}".format(logger.filter_secrets(config))) + +credential_parameters = config.get("credential", {}) +noproxy_parameters = config.get("no_proxy", {}) +endpoint_parameters = get_endpoint_parameters(config) +extraction_key = endpoint_parameters.get("extraction_key", "") +is_raw_output = endpoint_parameters.get("raw_output", True) +parameter_columns = [column for column in config.get("parameter_columns", []) if column] +if len(parameter_columns) == 0: + raise ValueError("There is no parameter column selected.") +parameter_renamings = get_dku_key_values(config.get("parameter_renamings", {})) +custom_key_values = get_dku_key_values(config.get("custom_key_values", {})) +display_metadata = config.get("display_metadata", False) +maximum_number_rows = config.get("maximum_number_rows", -1) +input_parameters_dataset = dataiku.Dataset(input_A_names[0]) +partitioning_keys = get_partitioning_keys(input_parameters_dataset, dku_flow_variables) +custom_key_values.update(partitioning_keys) +input_parameters_dataframe = input_parameters_dataset.get_dataframe() + +recipe_session = RestApiRecipeSession( + custom_key_values, + credential_parameters, + noproxy_parameters, + endpoint_parameters, + extraction_key, + parameter_columns, + parameter_renamings, + display_metadata, + maximum_number_rows=maximum_number_rows, +) +results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output) + 
+output_names_stats = get_output_names_for_role("api_output") +odf = pd.DataFrame(results) + +if odf.size > 0: + api_output = dataiku.Dataset(output_names_stats[0]) + api_output.write_with_schema(odf) From b74bb46da3621a93227d91346529ceb9e99fa3b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 11:17:24 +0200 Subject: [PATCH 03/17] Add files via upload --- .../api-connect_dataset/connector.py | 253 +++++++++--------- 1 file changed, 133 insertions(+), 120 deletions(-) diff --git a/python-connectors/api-connect_dataset/connector.py b/python-connectors/api-connect_dataset/connector.py index 49c7730..75cbf12 100644 --- a/python-connectors/api-connect_dataset/connector.py +++ b/python-connectors/api-connect_dataset/connector.py @@ -1,120 +1,133 @@ -from dataiku.connector import Connector -from dataikuapi.utils import DataikuException -from safe_logger import SafeLogger -from rest_api_client import RestAPIClient -from dku_utils import get_dku_key_values, get_endpoint_parameters, parse_keys_for_json, get_value_from_path -from dku_constants import DKUConstants -import json - - -logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) - - -class RestAPIConnector(Connector): - - def __init__(self, config, plugin_config): - Connector.__init__(self, config, plugin_config) # pass the parameters to the base class - logger.info('API-Connect plugin connector v1.1.3') - logger.info("config={}".format(logger.filter_secrets(config))) - endpoint_parameters = get_endpoint_parameters(config) - credential = config.get("credential", {}) - custom_key_values = get_dku_key_values(config.get("custom_key_values", {})) - self.client = RestAPIClient(credential, endpoint_parameters, custom_key_values) - extraction_key = endpoint_parameters.get("extraction_key", None) - self.extraction_key = extraction_key or '' - self.extraction_path = self.extraction_key.split('.') - self.raw_output = endpoint_parameters.get("raw_output", None) - 
self.maximum_number_rows = config.get("maximum_number_rows", -1) - self.display_metadata = config.get("display_metadata", False) - - def get_read_schema(self): - # In this example, we don't specify a schema here, so DSS will infer the schema - # from the columns actually returned by the generate_rows method - return None - - def generate_rows(self, dataset_schema=None, dataset_partitioning=None, - partition_id=None, records_limit=-1): - is_records_limit = (records_limit > 0) or (self.maximum_number_rows > 0) - if self.maximum_number_rows > 0: - records_limit = self.maximum_number_rows - record_count = 0 - while self.client.has_more_data(): - json_response = self.client.paginated_api_call() - metadata = self.client.get_metadata() if self.display_metadata else None - if self.extraction_key: - data = get_value_from_path(json_response, self.extraction_path) - else: - data = json_response - if isinstance(data, list): - record_count += len(data) - for row in data: - yield self.format_output(row, metadata) - else: - record_count += 1 - yield self.format_output(data, metadata) - if is_records_limit and record_count >= records_limit: - break - - def format_output(self, item, metadata=None): - output = metadata or {} - if self.raw_output: - output.update({ - DKUConstants.API_RESPONSE_KEY: json.dumps(item) - }) - else: - output.update(parse_keys_for_json(item)) - return output - - def get_writer(self, dataset_schema=None, dataset_partitioning=None, - partition_id=None): - """ - Returns a writer object to write in the dataset (or in a partition). - - The dataset_schema given here will match the the rows given to the writer below. - - Note: the writer is responsible for clearing the partition, if relevant. - """ - raise DataikuException("Unimplemented") - - def get_partitioning(self): - """ - Return the partitioning schema that the connector defines. 
- """ - raise DataikuException("Unimplemented") - - def list_partitions(self, partitioning): - """Return the list of partitions for the partitioning scheme - passed as parameter""" - return [] - - def partition_exists(self, partitioning, partition_id): - """Return whether the partition passed as parameter exists - - Implementation is only required if the corresponding flag is set to True - in the connector definition - """ - raise DataikuException("unimplemented") - - def get_records_count(self, partitioning=None, partition_id=None): - """ - Returns the count of records for the dataset (or a partition). - - Implementation is only required if the corresponding flag is set to True - in the connector definition - """ - raise DataikuException("unimplemented") - - -class CustomDatasetWriter(object): - def __init__(self): - return - - def write_row(self, row): - """ - Row is a tuple with N + 1 elements matching the schema passed to get_writer. - The last element is a dict of columns not found in the schema - """ - raise DataikuException("unimplemented") - - def close(self): - return +from dataiku.connector import Connector +from dataikuapi.utils import DataikuException +from safe_logger import SafeLogger +from rest_api_client import RestAPIClient +from dku_utils import ( + get_dku_key_values, + get_endpoint_parameters, + parse_keys_for_json, + get_value_from_path, +) +from dku_constants import DKUConstants +import json + + +logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) + + +class RestAPIConnector(Connector): + def __init__(self, config, plugin_config): + Connector.__init__( + self, config, plugin_config + ) # pass the parameters to the base class + logger.info("API-Connect plugin connector v1.1.3") + logger.info("config={}".format(logger.filter_secrets(config))) + endpoint_parameters = get_endpoint_parameters(config) + credential = config.get("credential", {}) + no_proxy = config.get("no_proxy", {}) + custom_key_values = 
get_dku_key_values(config.get("custom_key_values", {})) + self.client = RestAPIClient( + credential, no_proxy, endpoint_parameters, custom_key_values + ) + extraction_key = endpoint_parameters.get("extraction_key", None) + self.extraction_key = extraction_key or "" + self.extraction_path = self.extraction_key.split(".") + self.raw_output = endpoint_parameters.get("raw_output", None) + self.maximum_number_rows = config.get("maximum_number_rows", -1) + self.display_metadata = config.get("display_metadata", False) + + def get_read_schema(self): + # In this example, we don't specify a schema here, so DSS will infer the schema + # from the columns actually returned by the generate_rows method + return None + + def generate_rows( + self, + dataset_schema=None, + dataset_partitioning=None, + partition_id=None, + records_limit=-1, + ): + is_records_limit = (records_limit > 0) or (self.maximum_number_rows > 0) + if self.maximum_number_rows > 0: + records_limit = self.maximum_number_rows + record_count = 0 + while self.client.has_more_data(): + json_response = self.client.paginated_api_call() + metadata = self.client.get_metadata() if self.display_metadata else None + if self.extraction_key: + data = get_value_from_path(json_response, self.extraction_path) + else: + data = json_response + if isinstance(data, list): + record_count += len(data) + for row in data: + yield self.format_output(row, metadata) + else: + record_count += 1 + yield self.format_output(data, metadata) + if is_records_limit and record_count >= records_limit: + break + + def format_output(self, item, metadata=None): + output = metadata or {} + if self.raw_output: + output.update({DKUConstants.API_RESPONSE_KEY: json.dumps(item)}) + else: + output.update(parse_keys_for_json(item)) + return output + + def get_writer( + self, dataset_schema=None, dataset_partitioning=None, partition_id=None + ): + """ + Returns a writer object to write in the dataset (or in a partition). 
+ + The dataset_schema given here will match the the rows given to the writer below. + + Note: the writer is responsible for clearing the partition, if relevant. + """ + raise DataikuException("Unimplemented") + + def get_partitioning(self): + """ + Return the partitioning schema that the connector defines. + """ + raise DataikuException("Unimplemented") + + def list_partitions(self, partitioning): + """Return the list of partitions for the partitioning scheme + passed as parameter""" + return [] + + def partition_exists(self, partitioning, partition_id): + """Return whether the partition passed as parameter exists + + Implementation is only required if the corresponding flag is set to True + in the connector definition + """ + raise DataikuException("unimplemented") + + def get_records_count(self, partitioning=None, partition_id=None): + """ + Returns the count of records for the dataset (or a partition). + + Implementation is only required if the corresponding flag is set to True + in the connector definition + """ + raise DataikuException("unimplemented") + + +class CustomDatasetWriter(object): + def __init__(self): + return + + def write_row(self, row): + """ + Row is a tuple with N + 1 elements matching the schema passed to get_writer. 
+ The last element is a dict of columns not found in the schema + """ + raise DataikuException("unimplemented") + + def close(self): + return From eba06a792270f623806e2f16607443e9c562ab52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 11:17:40 +0200 Subject: [PATCH 04/17] Add files via upload --- .../api-connect_dataset/connector.json | 522 ++++++++++-------- 1 file changed, 282 insertions(+), 240 deletions(-) diff --git a/python-connectors/api-connect_dataset/connector.json b/python-connectors/api-connect_dataset/connector.json index f9c8f03..53aad3c 100644 --- a/python-connectors/api-connect_dataset/connector.json +++ b/python-connectors/api-connect_dataset/connector.json @@ -1,240 +1,282 @@ -{ - "meta" : { - "label": "Custom Rest API dataset", - "description": "Retrieve data from external API as dataset", - "icon": "icon-rocket" - }, - "readable": true, - "writable": false, - "params": [ - { - "type": "SEPARATOR", - "label": "Authentication" - }, - { - "name": "credential", - "label": "Credential preset", - "type": "PRESET", - "parameterSetId": "credential" - }, - { - "type": "SEPARATOR", - "label": "API call parameters" - }, - { - "name": "custom_key_values", - "label": "Custom keys / values", - "description": "Replace {{key}} by value in presets (optional)", - "type": "KEY_VALUE_LIST", - "visibilityCondition": false - }, - - { - "name": "endpoint_url", - "label": "URL template", - "description": "https://{{variable}}.exmpl.com/usr/{{username}}/details", - "type": "TEXTAREA" - }, - { - "name": "http_method", - "label": "HTTP method", - "description": "", - "type": "SELECT", - "defaultValue": "GET", - "selectChoices":[ - {"value": "GET", "label": "GET"}, - {"value": "POST", "label": "POST"}, - {"value": "PUT", "label": "PUT"}, - {"value": "PATCH", "label": "PATCH"}, - {"value": "DELETE", "label": "DELETE"} - ] - }, - { - "name": "endpoint_query_string", - "label": "Query Params", - "description": "Will add 
?key1=val1&key2=val2 to the URL", - "type": "KEY_VALUE_LIST" - }, - { - "name": "endpoint_body", - "label": "Body", - "description": "", - "type": "KEY_VALUE_LIST", - "visibilityCondition": false - }, - { - "name": "endpoint_headers", - "label": "Headers", - "description": "", - "type": "KEY_VALUE_LIST", - "defaultValue": [ - { - "from": "Content-Type", - "to": "application/json" - }, - { - "from": "Accept", - "to": "application/json" - } - ] - }, - { - "name": "body_format", - "label": "Body", - "description": "", - "type": "SELECT", - "defaultValue": null, - "selectChoices":[ - {"value": null, "label": "None"}, - {"value": "FORM_DATA", "label": "Form-data"}, - {"value": "RAW", "label": "Raw"} - ] - }, - { - "name": "text_body", - "label": "Request's body", - "description": "", - "type": "TEXTAREA", - "visibilityCondition": "model.body_format=='RAW'" - }, - { - "name": "key_value_body", - "label": "Request's body", - "description": "", - "type": "KEY_VALUE_LIST", - "visibilityCondition": "(['FORM_DATA'].indexOf(model.body_format)>-1)" - }, - { - "type": "SEPARATOR", - "label": "Data extraction" - }, - { - "name": "extraction_key", - "label": "Path to data array (optional)", - "description": "Dot separated key path", - "defaultValue": null, - "type": "STRING" - }, - { - "name": "raw_output", - "label": "Raw JSON output", - "description": "", - "defaultValue": true, - "type": "BOOLEAN" - }, - { - "type": "SEPARATOR", - "label": "Pagination" - }, - { - "name": "pagination_type", - "label": "Pagination mechanism", - "description": "Refer to the API's documentation", - "type": "SELECT", - "defaultValue": "na", - "selectChoices":[ - {"value": "na", "label": "No pagination"}, - {"value": "next_page", "label": "Next page URL provided"}, - {"value": "offset", "label": "Offset pagination"}, - {"value": "page", "label": "Per page"} - ] - }, - { - "type": "SEPARATOR", - "description": "⚠ Requires a key to data array", - "visibilityCondition": "model.pagination_type=='page' && 
!model.extraction_key" - }, - { - "name": "next_page_url_key", - "label": "Key to next request URL", - "description": "Dot separated key path to next request URL", - "type": "STRING", - "defaultValue": null, - "visibilityCondition": "model.pagination_type=='next_page'" - }, - { - "name": "is_next_page_url_relative", - "label": " ", - "description": "Next page URL is relative", - "type": "BOOLEAN", - "defaultValue": false, - "visibilityCondition": "model.pagination_type=='next_page'" - }, - { - "name": "next_page_url_base", - "label": "Base URL to next page", - "description": "https://mysite.com/path/", - "type": "STRING", - "defaultValue": null, - "visibilityCondition": "model.pagination_type=='next_page' && (model.is_next_page_url_relative==true)" - }, - { - "name": "top_key", - "label": "Key limiting elements per page", - "description": "", - "type": "STRING", - "defaultValue": null, - "visibilityCondition": "model.pagination_type == 'offset'" - }, - { - "name": "skip_key", - "label": "Key for element offset", - "description": "", - "type": "STRING", - "defaultValue": null, - "visibilityCondition": "model.pagination_type=='offset'" - }, - { - "name": "skip_key", - "label": "Key for page offset", - "description": "", - "type": "STRING", - "defaultValue": null, - "visibilityCondition": "model.pagination_type=='page'" - }, - { - "type": "SEPARATOR", - "label": "Advanced" - }, - { - "name": "ignore_ssl_check", - "label": "Ignore SSL check", - "type": "BOOLEAN", - "defaultValue": false - }, - { - "name": "redirect_auth_header", - "label": "Redirect authorization header", - "type": "BOOLEAN", - "defaultValue": false - }, - { - "name": "display_metadata", - "label": "Display metadata", - "description": "Status code, request time...", - "type": "BOOLEAN", - "defaultValue": false - }, - { - "name": "timeout", - "label": "Timeout (s)", - "description": "-1 for no limit", - "type": "INT", - "defaultValue": 3600 - }, - { - "name": "requests_per_minute", - "label": "Rate 
limit (requests/m)", - "description": "-1 for no limit", - "type": "INT", - "defaultValue": -1 - }, - { - "name": "maximum_number_rows", - "label": "Maximum number of rows", - "description": "-1 for no limit", - "type": "INT", - "defaultValue": -1 - } - ] -} +{ + "meta": { + "label": "Custom Rest API dataset", + "description": "Retrieve data from external API as dataset", + "icon": "icon-rocket" + }, + "readable": true, + "writable": false, + "params": [ + { + "type": "SEPARATOR", + "label": "Authentication" + }, + { + "name": "credential", + "label": "Credential preset", + "type": "PRESET", + "parameterSetId": "credential" + }, + { + "name": "no_proxy", + "label": "Value to be passed to the no-proxy parameter (optional)", + "description": "", + "defaultValue": null, + "type": "STRING" + }, + { + "type": "SEPARATOR", + "label": "API call parameters" + }, + { + "name": "custom_key_values", + "label": "Custom keys / values", + "description": "Replace {{key}} by value in presets (optional)", + "type": "KEY_VALUE_LIST", + "visibilityCondition": false + }, + { + "name": "endpoint_url", + "label": "URL template", + "description": "https://{{variable}}.exmpl.com/usr/{{username}}/details", + "type": "TEXTAREA" + }, + { + "name": "http_method", + "label": "HTTP method", + "description": "", + "type": "SELECT", + "defaultValue": "GET", + "selectChoices": [ + { + "value": "GET", + "label": "GET" + }, + { + "value": "POST", + "label": "POST" + }, + { + "value": "PUT", + "label": "PUT" + }, + { + "value": "PATCH", + "label": "PATCH" + }, + { + "value": "DELETE", + "label": "DELETE" + } + ] + }, + { + "name": "endpoint_query_string", + "label": "Query Params", + "description": "Will add ?key1=val1&key2=val2 to the URL", + "type": "KEY_VALUE_LIST" + }, + { + "name": "endpoint_body", + "label": "Body", + "description": "", + "type": "KEY_VALUE_LIST", + "visibilityCondition": false + }, + { + "name": "endpoint_headers", + "label": "Headers", + "description": "", + "type": 
"KEY_VALUE_LIST", + "defaultValue": [ + { + "from": "Content-Type", + "to": "application/json" + }, + { + "from": "Accept", + "to": "application/json" + } + ] + }, + { + "name": "body_format", + "label": "Body", + "description": "", + "type": "SELECT", + "defaultValue": null, + "selectChoices": [ + { + "value": null, + "label": "None" + }, + { + "value": "FORM_DATA", + "label": "Form-data" + }, + { + "value": "RAW", + "label": "Raw" + } + ] + }, + { + "name": "text_body", + "label": "Request's body", + "description": "", + "type": "TEXTAREA", + "visibilityCondition": "model.body_format=='RAW'" + }, + { + "name": "key_value_body", + "label": "Request's body", + "description": "", + "type": "KEY_VALUE_LIST", + "visibilityCondition": "(['FORM_DATA'].indexOf(model.body_format)>-1)" + }, + { + "type": "SEPARATOR", + "label": "Data extraction" + }, + { + "name": "extraction_key", + "label": "Path to data array (optional)", + "description": "Dot separated key path", + "defaultValue": null, + "type": "STRING" + }, + { + "name": "raw_output", + "label": "Raw JSON output", + "description": "", + "defaultValue": true, + "type": "BOOLEAN" + }, + { + "type": "SEPARATOR", + "label": "Pagination" + }, + { + "name": "pagination_type", + "label": "Pagination mechanism", + "description": "Refer to the API's documentation", + "type": "SELECT", + "defaultValue": "na", + "selectChoices": [ + { + "value": "na", + "label": "No pagination" + }, + { + "value": "next_page", + "label": "Next page URL provided" + }, + { + "value": "offset", + "label": "Offset pagination" + }, + { + "value": "page", + "label": "Per page" + } + ] + }, + { + "type": "SEPARATOR", + "description": "⚠ Requires a key to data array", + "visibilityCondition": "model.pagination_type=='page' && !model.extraction_key" + }, + { + "name": "next_page_url_key", + "label": "Key to next request URL", + "description": "Dot separated key path to next request URL", + "type": "STRING", + "defaultValue": null, + 
"visibilityCondition": "model.pagination_type=='next_page'" + }, + { + "name": "is_next_page_url_relative", + "label": " ", + "description": "Next page URL is relative", + "type": "BOOLEAN", + "defaultValue": false, + "visibilityCondition": "model.pagination_type=='next_page'" + }, + { + "name": "next_page_url_base", + "label": "Base URL to next page", + "description": "https://mysite.com/path/", + "type": "STRING", + "defaultValue": null, + "visibilityCondition": "model.pagination_type=='next_page' && (model.is_next_page_url_relative==true)" + }, + { + "name": "top_key", + "label": "Key limiting elements per page", + "description": "", + "type": "STRING", + "defaultValue": null, + "visibilityCondition": "model.pagination_type == 'offset'" + }, + { + "name": "skip_key", + "label": "Key for element offset", + "description": "", + "type": "STRING", + "defaultValue": null, + "visibilityCondition": "model.pagination_type=='offset'" + }, + { + "name": "skip_key", + "label": "Key for page offset", + "description": "", + "type": "STRING", + "defaultValue": null, + "visibilityCondition": "model.pagination_type=='page'" + }, + { + "type": "SEPARATOR", + "label": "Advanced" + }, + { + "name": "ignore_ssl_check", + "label": "Ignore SSL check", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "name": "redirect_auth_header", + "label": "Redirect authorization header", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "name": "display_metadata", + "label": "Display metadata", + "description": "Status code, request time...", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "name": "timeout", + "label": "Timeout (s)", + "description": "-1 for no limit", + "type": "INT", + "defaultValue": 3600 + }, + { + "name": "requests_per_minute", + "label": "Rate limit (requests/m)", + "description": "-1 for no limit", + "type": "INT", + "defaultValue": -1 + }, + { + "name": "maximum_number_rows", + "label": "Maximum number of rows", + "description": "-1 for no limit", + 
"type": "INT", + "defaultValue": -1 + } + ] +} \ No newline at end of file From 7b5bde92361e446661edb79161f0351ab2bd322f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 11:18:08 +0200 Subject: [PATCH 05/17] Add files via upload --- python-lib/rest_api_client.py | 516 +++++++++++++++----------- python-lib/rest_api_recipe_session.py | 291 ++++++++------- 2 files changed, 449 insertions(+), 358 deletions(-) diff --git a/python-lib/rest_api_client.py b/python-lib/rest_api_client.py index bd173d6..751bf4e 100644 --- a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -1,226 +1,290 @@ -import requests -import time -import copy -from pagination import Pagination -from safe_logger import SafeLogger -from loop_detector import LoopDetector -from dku_utils import get_dku_key_values, template_dict, format_template -from dku_constants import DKUConstants - - -logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) - - -class RestAPIClientError(ValueError): - pass - - -class RestAPIClient(object): - - def __init__(self, credential, endpoint, custom_key_values={}): - logger.info("Initialising RestAPIClient, credential={}, endpoint={}".format(logger.filter_secrets(credential), endpoint)) - - # presets_variables contains all variables available in templates using the {{variable_name}} notation - self.presets_variables = {} - self.presets_variables.update(endpoint) - self.presets_variables.update(credential) - self.presets_variables.update(custom_key_values) - - # requests_kwargs contains **kwargs used for requests - self.requests_kwargs = {} - - self.endpoint_query_string = endpoint.get("endpoint_query_string", []) - user_defined_keys = credential.get("user_defined_keys", []) - self.user_defined_keys = self.get_params(user_defined_keys, self.presets_variables) - self.presets_variables.update(self.user_defined_keys) - - endpoint_url = endpoint.get("endpoint_url", "") - self.endpoint_url = 
format_template(endpoint_url, **self.presets_variables) - self.http_method = endpoint.get("http_method", "GET") - - endpoint_headers = endpoint.get("endpoint_headers", "") - self.endpoint_headers = self.get_params(endpoint_headers, self.presets_variables) - - self.params = self.get_params(self.endpoint_query_string, self.presets_variables) - - self.extraction_key = endpoint.get("extraction_key", None) - - self.set_login(credential) - - self.requests_kwargs.update({"headers": self.endpoint_headers}) - self.ignore_ssl_check = endpoint.get("ignore_ssl_check", False) - if self.ignore_ssl_check: - self.requests_kwargs.update({"verify": False}) - else: - self.requests_kwargs.update({"verify": True}) - self.redirect_auth_header = endpoint.get("redirect_auth_header", False) - self.timeout = endpoint.get("timeout", -1) - if self.timeout > 0: - self.requests_kwargs.update({"timeout": self.timeout}) - - self.requests_kwargs.update({"params": self.params}) - self.pagination = Pagination() - next_page_url_key = endpoint.get("next_page_url_key", "") - is_next_page_url_relative = endpoint.get("is_next_page_url_relative", False) - next_page_url_base = endpoint.get("next_page_url_base", None) if is_next_page_url_relative else None - next_page_url_base = format_template(next_page_url_base, **self.presets_variables) - skip_key = endpoint.get("skip_key") - pagination_type = endpoint.get("pagination_type", "na") - if pagination_type == "next_page" and is_next_page_url_relative and not next_page_url_base: - raise RestAPIClientError("Pagination's 'Next page URL' is relative but no 'Base URL to next page' has been set") - self.pagination.configure_paging( - skip_key=skip_key, - next_page_key=next_page_url_key, - next_page_url_base=next_page_url_base, - pagination_type=pagination_type - ) - self.last_interaction = None - self.requests_per_minute = endpoint.get("requests_per_minute", -1) - if self.requests_per_minute > 0: - self.time_between_requests = 60 / self.requests_per_minute - else: 
- self.time_between_requests = None - self.time_last_request = None - self.loop_detector = LoopDetector() - body_format = endpoint.get("body_format", None) - if body_format == DKUConstants.RAW_BODY_FORMAT: - text_body = endpoint.get("text_body", "") - self.requests_kwargs.update({"data": text_body}) - elif body_format in [DKUConstants.FORM_DATA_BODY_FORMAT]: - key_value_body = endpoint.get("key_value_body", {}) - self.requests_kwargs.update({"json": get_dku_key_values(key_value_body)}) - self.metadata = {} - self.call_number = 0 - - def set_login(self, credential): - login_type = credential.get("login_type", "no_auth") - if login_type == "basic_login": - username = credential.get("username", "") - password = credential.get("password", "") - auth = (username, password) - self.requests_kwargs.update({"auth": auth}) - if login_type == "ntlm": - from requests_ntlm import HttpNtlmAuth - username = credential.get("username", "") - password = credential.get("password", "") - auth = HttpNtlmAuth(username, password) - self.requests_kwargs.update({"auth": auth}) - if login_type == "bearer_token": - token = credential.get("token", "") - bearer_template = credential.get("bearer_template", "Bearer {{token}}") - bearer_template = bearer_template.replace("{{token}}", token) - self.endpoint_headers.update({"Authorization": bearer_template}) - if login_type == "api_key": - self.api_key_name = credential.get("api_key_name", "") - self.api_key_value = credential.get("api_key_value", "") - self.api_key_destination = credential.get("api_key_destination", "header") - if self.api_key_destination == "header": - self.endpoint_headers.update({self.api_key_name: self.api_key_value}) - else: - self.params.update({self.api_key_name: self.api_key_value}) - - def get(self, url, can_raise_exeption=True, **kwargs): - json_response = self.request("GET", url, can_raise_exeption=can_raise_exeption, **kwargs) - return json_response - - def request(self, method, url, can_raise_exeption=True, **kwargs): 
- logger.info(u"Accessing endpoint {} with params={}".format(url, kwargs.get("params"))) - self.enforce_throttling() - kwargs = template_dict(kwargs, **self.presets_variables) - if self.loop_detector.is_stuck_in_loop(url, kwargs.get("params", {}), kwargs.get("headers", {})): - raise RestAPIClientError("The api-connect plugin is stuck in a loop. Please check the pagination parameters.") - request_start_time = time.time() - self.time_last_request = request_start_time - try: - response = self.request_with_redirect_retry(method, url, **kwargs) - request_finish_time = time.time() - except Exception as err: - self.pagination.is_last_batch_empty = True - error_message = "Error: {}".format(err) - if can_raise_exeption: - raise RestAPIClientError(error_message) - else: - return {"error": error_message} - self.set_metadata("request_duration", request_finish_time - request_start_time) - self.set_metadata("status_code", response.status_code) - self.set_metadata("response_headers", "{}".format(response.headers)) - if response.status_code >= 400: - error_message = "Error {}: {}".format(response.status_code, response.content) - self.pagination.is_last_batch_empty = True - if can_raise_exeption: - raise RestAPIClientError(error_message) - else: - return {"error": error_message} - if response.status_code in [204]: - self.pagination.update_next_page({}, response.links) - return self.empty_json_response() - try: - json_response = response.json() - except Exception as err: - self.pagination.update_next_page({}, None) - error_message = "Error '{}' when decoding JSON".format(str(err)[:100]) - logger.error(error_message) - logger.error("response.content={}".format(response.content)) - if can_raise_exeption: - raise RestAPIClientError("The API did not return JSON as expected. 
{}".format(error_message)) - return {"error": error_message} - - self.pagination.update_next_page(json_response, response.links) - return json_response - - def request_with_redirect_retry(self, method, url, **kwargs): - # In case of redirection to another domain, the authorization header is not kept - # If redirect_auth_header is true, another attempt is made with initial headers to the redirected url - response = requests.request(method, url, **kwargs) - if self.redirect_auth_header and not response.url.startswith(url): - redirection_kwargs = copy.deepcopy(kwargs) - redirection_kwargs.pop("params", None) # params are contained in the redirected url - logger.warning("Redirection ! Accessing endpoint {} with initial authorization headers".format(response.url)) - response = requests.request(method, response.url, **redirection_kwargs) - return response - - def paginated_api_call(self, can_raise_exeption=True): - if self.pagination.params_must_be_blanked: - self.requests_kwargs["params"] = {} - else: - pagination_params = self.pagination.get_params() - params = self.requests_kwargs.get("params") - params.update(pagination_params) - self.requests_kwargs.update({"params": params}) - self.call_number = self.call_number + 1 - logger.info("API call number #{}".format(self.call_number)) - return self.request(self.http_method, self.pagination.get_next_page_url(), can_raise_exeption, **self.requests_kwargs) - - def empty_json_response(self): - return {self.extraction_key: {}} if self.extraction_key else {} - - def set_metadata(self, metadata_name, value): - self.metadata["dku_{}".format(metadata_name)] = value - - @staticmethod - def get_params(endpoint_query_string, keywords): - templated_query_string = get_dku_key_values(endpoint_query_string) - ret = {} - for key in templated_query_string: - ret.update({key: format_template(templated_query_string.get(key, ""), **keywords) or ""}) - return ret - - def has_more_data(self): - if not self.pagination.is_paging_started: - 
self.start_paging() - return self.pagination.has_next_page() - - def start_paging(self): - logger.info("Start paging with counting key '{}'".format(self.extraction_key)) - self.pagination.reset_paging(counting_key=self.extraction_key, url=self.endpoint_url) - - def enforce_throttling(self): - if self.time_between_requests and self.time_last_request: - current_time = time.time() - time_since_last_resquests = current_time - self.time_last_request - if time_since_last_resquests < self.time_between_requests: - logger.info("Enforcing {}s throttling".format(self.time_between_requests - time_since_last_resquests)) - time.sleep(self.time_between_requests - time_since_last_resquests) - - def get_metadata(self): - return self.metadata +import requests +import time +import copy +from pagination import Pagination +from safe_logger import SafeLogger +from loop_detector import LoopDetector +from dku_utils import get_dku_key_values, template_dict, format_template +from dku_constants import DKUConstants +import os + +logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) + + +class RestAPIClientError(ValueError): + pass + + +class RestAPIClient(object): + def __init__(self, credential, noproxy, endpoint, custom_key_values={}): + logger.info( + "Initialising RestAPIClient, credential={}, endpoint={}".format( + logger.filter_secrets(credential), endpoint + ) + ) + + # presets_variables contains all variables available in templates using the {{variable_name}} notation + self.presets_variables = {} + self.presets_variables.update(endpoint) + self.presets_variables.update(credential) + self.presets_variables.update(noproxy) + self.presets_variables.update(custom_key_values) + + # Update no_proxy parameters + if noproxy != "": + if noproxy not in os.environ["no_proxy"]: + os.environ["no_proxy"] += f",{noproxy}" + + # requests_kwargs contains **kwargs used for requests + self.requests_kwargs = {} + + self.endpoint_query_string = 
endpoint.get("endpoint_query_string", []) + user_defined_keys = credential.get("user_defined_keys", []) + self.user_defined_keys = self.get_params( + user_defined_keys, self.presets_variables + ) + self.presets_variables.update(self.user_defined_keys) + + endpoint_url = endpoint.get("endpoint_url", "") + self.endpoint_url = format_template(endpoint_url, **self.presets_variables) + self.http_method = endpoint.get("http_method", "GET") + + endpoint_headers = endpoint.get("endpoint_headers", "") + self.endpoint_headers = self.get_params( + endpoint_headers, self.presets_variables + ) + + self.params = self.get_params( + self.endpoint_query_string, self.presets_variables + ) + + self.extraction_key = endpoint.get("extraction_key", None) + + self.set_login(credential) + + self.requests_kwargs.update({"headers": self.endpoint_headers}) + self.ignore_ssl_check = endpoint.get("ignore_ssl_check", False) + if self.ignore_ssl_check: + self.requests_kwargs.update({"verify": False}) + else: + self.requests_kwargs.update({"verify": True}) + self.redirect_auth_header = endpoint.get("redirect_auth_header", False) + self.timeout = endpoint.get("timeout", -1) + if self.timeout > 0: + self.requests_kwargs.update({"timeout": self.timeout}) + + self.requests_kwargs.update({"params": self.params}) + self.pagination = Pagination() + next_page_url_key = endpoint.get("next_page_url_key", "") + is_next_page_url_relative = endpoint.get("is_next_page_url_relative", False) + next_page_url_base = ( + endpoint.get("next_page_url_base", None) + if is_next_page_url_relative + else None + ) + next_page_url_base = format_template( + next_page_url_base, **self.presets_variables + ) + skip_key = endpoint.get("skip_key") + pagination_type = endpoint.get("pagination_type", "na") + if ( + pagination_type == "next_page" + and is_next_page_url_relative + and not next_page_url_base + ): + raise RestAPIClientError( + "Pagination's 'Next page URL' is relative but no 'Base URL to next page' has been set" + ) + 
self.pagination.configure_paging( + skip_key=skip_key, + next_page_key=next_page_url_key, + next_page_url_base=next_page_url_base, + pagination_type=pagination_type, + ) + self.last_interaction = None + self.requests_per_minute = endpoint.get("requests_per_minute", -1) + if self.requests_per_minute > 0: + self.time_between_requests = 60 / self.requests_per_minute + else: + self.time_between_requests = None + self.time_last_request = None + self.loop_detector = LoopDetector() + body_format = endpoint.get("body_format", None) + if body_format == DKUConstants.RAW_BODY_FORMAT: + text_body = endpoint.get("text_body", "") + self.requests_kwargs.update({"data": text_body}) + elif body_format in [DKUConstants.FORM_DATA_BODY_FORMAT]: + key_value_body = endpoint.get("key_value_body", {}) + self.requests_kwargs.update({"json": get_dku_key_values(key_value_body)}) + self.metadata = {} + self.call_number = 0 + + def set_login(self, credential): + login_type = credential.get("login_type", "no_auth") + if login_type == "basic_login": + username = credential.get("username", "") + password = credential.get("password", "") + auth = (username, password) + self.requests_kwargs.update({"auth": auth}) + if login_type == "ntlm": + from requests_ntlm import HttpNtlmAuth + + username = credential.get("username", "") + password = credential.get("password", "") + auth = HttpNtlmAuth(username, password) + self.requests_kwargs.update({"auth": auth}) + if login_type == "bearer_token": + token = credential.get("token", "") + bearer_template = credential.get("bearer_template", "Bearer {{token}}") + bearer_template = bearer_template.replace("{{token}}", token) + self.endpoint_headers.update({"Authorization": bearer_template}) + if login_type == "api_key": + self.api_key_name = credential.get("api_key_name", "") + self.api_key_value = credential.get("api_key_value", "") + self.api_key_destination = credential.get("api_key_destination", "header") + if self.api_key_destination == "header": + 
self.endpoint_headers.update({self.api_key_name: self.api_key_value}) + else: + self.params.update({self.api_key_name: self.api_key_value}) + + def get(self, url, can_raise_exeption=True, **kwargs): + json_response = self.request( + "GET", url, can_raise_exeption=can_raise_exeption, **kwargs + ) + return json_response + + def request(self, method, url, can_raise_exeption=True, **kwargs): + logger.info( + "Accessing endpoint {} with params={}".format(url, kwargs.get("params")) + ) + self.enforce_throttling() + kwargs = template_dict(kwargs, **self.presets_variables) + if self.loop_detector.is_stuck_in_loop( + url, kwargs.get("params", {}), kwargs.get("headers", {}) + ): + raise RestAPIClientError( + "The api-connect plugin is stuck in a loop. Please check the pagination parameters." + ) + request_start_time = time.time() + self.time_last_request = request_start_time + try: + response = self.request_with_redirect_retry(method, url, **kwargs) + request_finish_time = time.time() + except Exception as err: + self.pagination.is_last_batch_empty = True + error_message = "Error: {}".format(err) + if can_raise_exeption: + raise RestAPIClientError(error_message) + else: + return {"error": error_message} + self.set_metadata("request_duration", request_finish_time - request_start_time) + self.set_metadata("status_code", response.status_code) + self.set_metadata("response_headers", "{}".format(response.headers)) + if response.status_code >= 400: + error_message = "Error {}: {}".format( + response.status_code, response.content + ) + self.pagination.is_last_batch_empty = True + if can_raise_exeption: + raise RestAPIClientError(error_message) + else: + return {"error": error_message} + if response.status_code in [204]: + self.pagination.update_next_page({}, response.links) + return self.empty_json_response() + try: + json_response = response.json() + except Exception as err: + self.pagination.update_next_page({}, None) + error_message = "Error '{}' when decoding 
JSON".format(str(err)[:100]) + logger.error(error_message) + logger.error("response.content={}".format(response.content)) + if can_raise_exeption: + raise RestAPIClientError( + "The API did not return JSON as expected. {}".format(error_message) + ) + return {"error": error_message} + + self.pagination.update_next_page(json_response, response.links) + return json_response + + def request_with_redirect_retry(self, method, url, **kwargs): + # In case of redirection to another domain, the authorization header is not kept + # If redirect_auth_header is true, another attempt is made with initial headers to the redirected url + response = requests.request(method, url, **kwargs) + if self.redirect_auth_header and not response.url.startswith(url): + redirection_kwargs = copy.deepcopy(kwargs) + redirection_kwargs.pop( + "params", None + ) # params are contained in the redirected url + logger.warning( + "Redirection ! Accessing endpoint {} with initial authorization headers".format( + response.url + ) + ) + response = requests.request(method, response.url, **redirection_kwargs) + return response + + def paginated_api_call(self, can_raise_exeption=True): + if self.pagination.params_must_be_blanked: + self.requests_kwargs["params"] = {} + else: + pagination_params = self.pagination.get_params() + params = self.requests_kwargs.get("params") + params.update(pagination_params) + self.requests_kwargs.update({"params": params}) + self.call_number = self.call_number + 1 + logger.info("API call number #{}".format(self.call_number)) + return self.request( + self.http_method, + self.pagination.get_next_page_url(), + can_raise_exeption, + **self.requests_kwargs, + ) + + def empty_json_response(self): + return {self.extraction_key: {}} if self.extraction_key else {} + + def set_metadata(self, metadata_name, value): + self.metadata["dku_{}".format(metadata_name)] = value + + @staticmethod + def get_params(endpoint_query_string, keywords): + templated_query_string = 
get_dku_key_values(endpoint_query_string) + ret = {} + for key in templated_query_string: + ret.update( + { + key: format_template( + templated_query_string.get(key, ""), **keywords + ) + or "" + } + ) + return ret + + def has_more_data(self): + if not self.pagination.is_paging_started: + self.start_paging() + return self.pagination.has_next_page() + + def start_paging(self): + logger.info("Start paging with counting key '{}'".format(self.extraction_key)) + self.pagination.reset_paging( + counting_key=self.extraction_key, url=self.endpoint_url + ) + + def enforce_throttling(self): + if self.time_between_requests and self.time_last_request: + current_time = time.time() + time_since_last_resquests = current_time - self.time_last_request + if time_since_last_resquests < self.time_between_requests: + logger.info( + "Enforcing {}s throttling".format( + self.time_between_requests - time_since_last_resquests + ) + ) + time.sleep(self.time_between_requests - time_since_last_resquests) + + def get_metadata(self): + return self.metadata diff --git a/python-lib/rest_api_recipe_session.py b/python-lib/rest_api_recipe_session.py index c3340ec..90da196 100644 --- a/python-lib/rest_api_recipe_session.py +++ b/python-lib/rest_api_recipe_session.py @@ -1,132 +1,159 @@ -from dataikuapi.utils import DataikuException -from rest_api_client import RestAPIClient -from safe_logger import SafeLogger -from dku_utils import parse_keys_for_json, get_value_from_path -from dku_constants import DKUConstants -import copy -import json - -logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) - - -class RestApiRecipeSession: - def __init__(self, custom_key_values, credential_parameters, endpoint_parameters, extraction_key, parameter_columns, parameter_renamings, - display_metadata=False, - maximum_number_rows=-1): - self.custom_key_values = custom_key_values - self.credential_parameters = credential_parameters - self.endpoint_parameters = endpoint_parameters - 
self.extraction_key = extraction_key - self.client = None - self.initial_parameter_columns = None - self.column_to_parameter_dict = self.get_column_to_parameter_dict(parameter_columns, parameter_renamings) - self.display_metadata = display_metadata - self.maximum_number_rows = maximum_number_rows - self.is_row_limit = (self.maximum_number_rows > 0) - self.can_raise = False - - @staticmethod - def get_column_to_parameter_dict(parameter_columns, parameter_renamings): - column_to_parameter_dict = {} - for parameter_column in parameter_columns: - if parameter_column in parameter_renamings: - column_to_parameter_dict[parameter_column] = parameter_renamings[parameter_column] - else: - column_to_parameter_dict[parameter_column] = parameter_column - return column_to_parameter_dict - - def process_dataframe(self, input_parameters_dataframe, is_raw_output): - results = [] - time_last_request = None - for index, input_parameters_row in input_parameters_dataframe.iterrows(): - rows_count = 0 - self.initial_parameter_columns = {} - for column_name in self.column_to_parameter_dict: - parameter_name = self.column_to_parameter_dict[column_name] - self.initial_parameter_columns.update({parameter_name: input_parameters_row.get(column_name)}) - updated_endpoint_parameters = copy.deepcopy(self.endpoint_parameters) - updated_endpoint_parameters.update(self.initial_parameter_columns) - logger.info("Processing row #{}, creating client with credential={}, updated_endpoint={}, custom_key_values={}".format( - index + 1, - logger.filter_secrets(self.credential_parameters), - updated_endpoint_parameters, - self.custom_key_values - )) - self.client = RestAPIClient(self.credential_parameters, updated_endpoint_parameters, custom_key_values=self.custom_key_values) - self.client.time_last_request = time_last_request - while self.client.has_more_data(): - page_results = self.retrieve_next_page(is_raw_output) - results.extend(page_results) - rows_count += len(page_results) - if self.is_row_limit and 
rows_count >= self.maximum_number_rows: - break - time_last_request = self.client.time_last_request - return results - - def retrieve_next_page(self, is_raw_output): - page_rows = [] - logger.info("retrieve_next_page: Calling next page") - json_response = self.client.paginated_api_call(can_raise_exeption=False) - metadata = self.client.get_metadata() if self.display_metadata else {} - is_api_returning_dict = True - if self.extraction_key: - data_rows = get_value_from_path(json_response, self.extraction_key.split("."), can_raise=False) - if data_rows is None: - error_message = "Extraction key '{}' was not found in the incoming data".format(self.extraction_key) - if self.can_raise: - raise DataikuException(error_message) - else: - return [{"error": error_message}] - page_rows.extend(self.format_page_rows(data_rows, is_raw_output, metadata)) - else: - # Todo: check api_response key is free and add something overwise - base_row = copy.deepcopy(metadata) - if is_raw_output: - if is_error_message(json_response): - base_row.update(parse_keys_for_json(json_response)) - else: - base_row.update({ - DKUConstants.API_RESPONSE_KEY: json.dumps(json_response) - }) - else: - if isinstance(json_response, dict): - base_row.update(parse_keys_for_json(json_response)) - elif isinstance(json_response, list): - is_api_returning_dict = False - for row in json_response: - base_row = copy.deepcopy(metadata) - base_row.update(parse_keys_for_json(row)) - base_row.update(self.initial_parameter_columns) - page_rows.append(base_row) - - if is_api_returning_dict: - base_row.update(self.initial_parameter_columns) - page_rows.append(base_row) - return page_rows - - def format_page_rows(self, data_rows, is_raw_output, metadata=None): - page_rows = [] - metadata = metadata or {} - for data_row in data_rows: - base_row = copy.deepcopy(self.initial_parameter_columns) - base_row.update(metadata) - if is_raw_output: - if is_error_message(data_row): - base_row.update(parse_keys_for_json(data_row)) - else: 
- base_row.update({ - DKUConstants.API_RESPONSE_KEY: json.dumps(data_row) - }) - else: - base_row.update(parse_keys_for_json(data_row)) - page_rows.append(base_row) - return page_rows - - -def is_error_message(jsons_response): - if type(jsons_response) not in [dict, list]: - return False - if "error" in jsons_response and len(jsons_response) == 1: - return True - else: - return False +from dataikuapi.utils import DataikuException +from rest_api_client import RestAPIClient +from safe_logger import SafeLogger +from dku_utils import parse_keys_for_json, get_value_from_path +from dku_constants import DKUConstants +import copy +import json + +logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) + + +class RestApiRecipeSession: + def __init__( + self, + custom_key_values, + credential_parameters, + noproxy_parameters, + endpoint_parameters, + extraction_key, + parameter_columns, + parameter_renamings, + display_metadata=False, + maximum_number_rows=-1, + ): + self.custom_key_values = custom_key_values + self.credential_parameters = credential_parameters + self.noproxy_parameters = noproxy_parameters + self.endpoint_parameters = endpoint_parameters + self.extraction_key = extraction_key + self.client = None + self.initial_parameter_columns = None + self.column_to_parameter_dict = self.get_column_to_parameter_dict( + parameter_columns, parameter_renamings + ) + self.display_metadata = display_metadata + self.maximum_number_rows = maximum_number_rows + self.is_row_limit = self.maximum_number_rows > 0 + self.can_raise = False + + @staticmethod + def get_column_to_parameter_dict(parameter_columns, parameter_renamings): + column_to_parameter_dict = {} + for parameter_column in parameter_columns: + if parameter_column in parameter_renamings: + column_to_parameter_dict[parameter_column] = parameter_renamings[ + parameter_column + ] + else: + column_to_parameter_dict[parameter_column] = parameter_column + return column_to_parameter_dict + + def 
process_dataframe(self, input_parameters_dataframe, is_raw_output): + results = [] + time_last_request = None + for index, input_parameters_row in input_parameters_dataframe.iterrows(): + rows_count = 0 + self.initial_parameter_columns = {} + for column_name in self.column_to_parameter_dict: + parameter_name = self.column_to_parameter_dict[column_name] + self.initial_parameter_columns.update( + {parameter_name: input_parameters_row.get(column_name)} + ) + updated_endpoint_parameters = copy.deepcopy(self.endpoint_parameters) + updated_endpoint_parameters.update(self.initial_parameter_columns) + logger.info( + "Processing row #{}, creating client with credential={}, updated_endpoint={}, custom_key_values={}".format( + index + 1, + logger.filter_secrets(self.credential_parameters), + updated_endpoint_parameters, + self.custom_key_values, + ) + ) + self.client = RestAPIClient( + self.credential_parameters, + self.noproxy_parameters, + updated_endpoint_parameters, + custom_key_values=self.custom_key_values, + ) + self.client.time_last_request = time_last_request + while self.client.has_more_data(): + page_results = self.retrieve_next_page(is_raw_output) + results.extend(page_results) + rows_count += len(page_results) + if self.is_row_limit and rows_count >= self.maximum_number_rows: + break + time_last_request = self.client.time_last_request + return results + + def retrieve_next_page(self, is_raw_output): + page_rows = [] + logger.info("retrieve_next_page: Calling next page") + json_response = self.client.paginated_api_call(can_raise_exeption=False) + metadata = self.client.get_metadata() if self.display_metadata else {} + is_api_returning_dict = True + if self.extraction_key: + data_rows = get_value_from_path( + json_response, self.extraction_key.split("."), can_raise=False + ) + if data_rows is None: + error_message = "Extraction key '{}' was not found in the incoming data".format( + self.extraction_key + ) + if self.can_raise: + raise DataikuException(error_message) 
+ else: + return [{"error": error_message}] + page_rows.extend(self.format_page_rows(data_rows, is_raw_output, metadata)) + else: + # Todo: check api_response key is free and add something overwise + base_row = copy.deepcopy(metadata) + if is_raw_output: + if is_error_message(json_response): + base_row.update(parse_keys_for_json(json_response)) + else: + base_row.update( + {DKUConstants.API_RESPONSE_KEY: json.dumps(json_response)} + ) + else: + if isinstance(json_response, dict): + base_row.update(parse_keys_for_json(json_response)) + elif isinstance(json_response, list): + is_api_returning_dict = False + for row in json_response: + base_row = copy.deepcopy(metadata) + base_row.update(parse_keys_for_json(row)) + base_row.update(self.initial_parameter_columns) + page_rows.append(base_row) + + if is_api_returning_dict: + base_row.update(self.initial_parameter_columns) + page_rows.append(base_row) + return page_rows + + def format_page_rows(self, data_rows, is_raw_output, metadata=None): + page_rows = [] + metadata = metadata or {} + for data_row in data_rows: + base_row = copy.deepcopy(self.initial_parameter_columns) + base_row.update(metadata) + if is_raw_output: + if is_error_message(data_row): + base_row.update(parse_keys_for_json(data_row)) + else: + base_row.update( + {DKUConstants.API_RESPONSE_KEY: json.dumps(data_row)} + ) + else: + base_row.update(parse_keys_for_json(data_row)) + page_rows.append(base_row) + return page_rows + + +def is_error_message(jsons_response): + if type(jsons_response) not in [dict, list]: + return False + if "error" in jsons_response and len(jsons_response) == 1: + return True + else: + return False From 24a72bc66f4085072abbe0a111bbd8f4a2f6ef6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 11:23:56 +0200 Subject: [PATCH 06/17] Update rest_api_client.py --- python-lib/rest_api_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python-lib/rest_api_client.py 
b/python-lib/rest_api_client.py index 751bf4e..b12b9cf 100644 --- a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -27,7 +27,7 @@ def __init__(self, credential, noproxy, endpoint, custom_key_values={}): self.presets_variables = {} self.presets_variables.update(endpoint) self.presets_variables.update(credential) - self.presets_variables.update(noproxy) + #self.presets_variables.update(noproxy) self.presets_variables.update(custom_key_values) # Update no_proxy parameters From 4da89c41c60a7ea0eb990adb78594357d47859f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 13:33:50 +0200 Subject: [PATCH 07/17] Update connector.json --- .../api-connect_dataset/connector.json | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/python-connectors/api-connect_dataset/connector.json b/python-connectors/api-connect_dataset/connector.json index 53aad3c..40153a3 100644 --- a/python-connectors/api-connect_dataset/connector.json +++ b/python-connectors/api-connect_dataset/connector.json @@ -17,6 +17,24 @@ "type": "PRESET", "parameterSetId": "credential" }, + { + "type": "SEPARATOR", + "label": "Proxy" + }, + { + "name": "http_proxy", + "label": "Overload the http_proxy environment variable (optional)", + "description": "", + "defaultValue": null, + "type": "STRING" + }, + { + "name": "https_proxy", + "label": "Overload the https_proxy environment variable (optional)", + "description": "", + "defaultValue": null, + "type": "STRING" + }, { "name": "no_proxy", "label": "Value to be passed to the no-proxy parameter (optional)", @@ -279,4 +297,4 @@ "defaultValue": -1 } ] -} \ No newline at end of file +} From 14dac1006c5eb5364cbc34b47f50ef3edc087142 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 13:36:40 +0200 Subject: [PATCH 08/17] Update connector.py --- python-connectors/api-connect_dataset/connector.py | 4 +++- 1 file changed, 3 insertions(+), 
1 deletion(-) diff --git a/python-connectors/api-connect_dataset/connector.py b/python-connectors/api-connect_dataset/connector.py index 75cbf12..292d988 100644 --- a/python-connectors/api-connect_dataset/connector.py +++ b/python-connectors/api-connect_dataset/connector.py @@ -24,10 +24,12 @@ def __init__(self, config, plugin_config): logger.info("config={}".format(logger.filter_secrets(config))) endpoint_parameters = get_endpoint_parameters(config) credential = config.get("credential", {}) + http_proxy = config.get("http_proxy", {}) + https_proxy = config.get("https_proxy", {}) no_proxy = config.get("no_proxy", {}) custom_key_values = get_dku_key_values(config.get("custom_key_values", {})) self.client = RestAPIClient( - credential, no_proxy, endpoint_parameters, custom_key_values + credential, http_proxy, https_proxy, no_proxy, endpoint_parameters, custom_key_values ) extraction_key = endpoint_parameters.get("extraction_key", None) self.extraction_key = extraction_key or "" From 0691d705c1f70ac5f86aeb85bda9dd329e1f90ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 13:37:50 +0200 Subject: [PATCH 09/17] Update recipe.py --- custom-recipes/api-connect/recipe.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/custom-recipes/api-connect/recipe.py b/custom-recipes/api-connect/recipe.py index 9b8ee10..8ae5e26 100644 --- a/custom-recipes/api-connect/recipe.py +++ b/custom-recipes/api-connect/recipe.py @@ -39,6 +39,8 @@ def get_partitioning_keys(id_list, dku_flow_variables): logger.info("config={}".format(logger.filter_secrets(config))) credential_parameters = config.get("credential", {}) +httpproxy_parameters = config.get("http_proxy", {}) +httpsproxy_parameters = config.get("https_proxy", {}) noproxy_parameters = config.get("no_proxy", {}) endpoint_parameters = get_endpoint_parameters(config) extraction_key = endpoint_parameters.get("extraction_key", "") @@ -58,6 +60,8 @@ def get_partitioning_keys(id_list, 
dku_flow_variables): recipe_session = RestApiRecipeSession( custom_key_values, credential_parameters, + httpproxy_parameters, + httpsproxy_parameters, noproxy_parameters, endpoint_parameters, extraction_key, From bc8fbca876ea6a0950ca3559f002266f670460dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 13:39:56 +0200 Subject: [PATCH 10/17] Update rest_api_recipe_session.py --- python-lib/rest_api_recipe_session.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python-lib/rest_api_recipe_session.py b/python-lib/rest_api_recipe_session.py index 90da196..e85ae8a 100644 --- a/python-lib/rest_api_recipe_session.py +++ b/python-lib/rest_api_recipe_session.py @@ -14,6 +14,8 @@ def __init__( self, custom_key_values, credential_parameters, + httpproxy_parameters, + httpsproxy_parameters, noproxy_parameters, endpoint_parameters, extraction_key, @@ -24,6 +26,8 @@ def __init__( ): self.custom_key_values = custom_key_values self.credential_parameters = credential_parameters + self.httpproxy_parameters = httpproxy_parameters + self.httpsproxy_parameters = httpsproxy_parameters self.noproxy_parameters = noproxy_parameters self.endpoint_parameters = endpoint_parameters self.extraction_key = extraction_key @@ -72,6 +76,8 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output): ) self.client = RestAPIClient( self.credential_parameters, + self.httpproxy_parameters, + self.httpsproxy_parameters, self.noproxy_parameters, updated_endpoint_parameters, custom_key_values=self.custom_key_values, From 19f9e19f3dad051738d804babba1ad7ffe21aba0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 13:42:14 +0200 Subject: [PATCH 11/17] Update rest_api_client.py --- python-lib/rest_api_client.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/python-lib/rest_api_client.py b/python-lib/rest_api_client.py index b12b9cf..62c01a9 100644 --- 
a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -16,7 +16,7 @@ class RestAPIClientError(ValueError): class RestAPIClient(object): - def __init__(self, credential, noproxy, endpoint, custom_key_values={}): + def __init__(self, credential, httpproxy, httpsproxy, noproxy, endpoint, custom_key_values={}): logger.info( "Initialising RestAPIClient, credential={}, endpoint={}".format( logger.filter_secrets(credential), endpoint @@ -30,6 +30,16 @@ def __init__(self, credential, noproxy, endpoint, custom_key_values={}): #self.presets_variables.update(noproxy) self.presets_variables.update(custom_key_values) + # Update http_proxy parameters + if httpproxy != "": + if httpproxy not in os.environ["HTTP_PROXY"]: + os.environ["HTTP_PROXY"] += f",{httpproxy}" + + # Update https_proxy parameters + if httpsproxy != "": + if httpsproxy not in os.environ["HTTPS_PROXY"]: + os.environ["HTTPS_PROXY"] += f",{httpsproxy}" + # Update no_proxy parameters if noproxy != "": if noproxy not in os.environ["no_proxy"]: From 9cd5e4dd8c5f594ef81ffa20b8d0011c77c5cee5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 13:43:23 +0200 Subject: [PATCH 12/17] Update recipe.json --- custom-recipes/api-connect/recipe.json | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/custom-recipes/api-connect/recipe.json b/custom-recipes/api-connect/recipe.json index bc81b7e..c5fc615 100644 --- a/custom-recipes/api-connect/recipe.json +++ b/custom-recipes/api-connect/recipe.json @@ -37,6 +37,24 @@ "type": "PRESET", "parameterSetId": "credential" }, + { + "type": "SEPARATOR", + "label": "Proxy" + }, + { + "name": "http_proxy", + "label": "Overload the http_proxy environment variable (optional)", + "description": "", + "defaultValue": null, + "type": "STRING" + }, + { + "name": "https_proxy", + "label": "Overload the https_proxy environment variable (optional)", + "description": "", + "defaultValue": null, + "type": 
"STRING" + }, { "name": "no_proxy", "label": "Value to be passed to the no-proxy parameter (optional)", @@ -318,4 +336,4 @@ } ], "resourceKeys": [] -} \ No newline at end of file +} From 17f40bad8b1fb56a754838181c0183ee44e93649 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 13:50:50 +0200 Subject: [PATCH 13/17] Update rest_api_client.py --- python-lib/rest_api_client.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/python-lib/rest_api_client.py b/python-lib/rest_api_client.py index 62c01a9..5f5263e 100644 --- a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -32,17 +32,25 @@ def __init__(self, credential, httpproxy, httpsproxy, noproxy, endpoint, custom_ # Update http_proxy parameters if httpproxy != "": - if httpproxy not in os.environ["HTTP_PROXY"]: + try: + if httpproxy not in os.environ["HTTP_PROXY"]: + os.environ["HTTP_PROXY"] += f",{httpproxy}" + except KeyError: os.environ["HTTP_PROXY"] += f",{httpproxy}" # Update https_proxy parameters if httpsproxy != "": - if httpsproxy not in os.environ["HTTPS_PROXY"]: + try: + if httpsproxy not in os.environ["HTTPS_PROXY"]: + os.environ["HTTPS_PROXY"] += f",{httpsproxy}" + except KeyError: os.environ["HTTPS_PROXY"] += f",{httpsproxy}" - # Update no_proxy parameters if noproxy != "": - if noproxy not in os.environ["no_proxy"]: + try: + if noproxy not in os.environ["no_proxy"]: + os.environ["no_proxy"] += f",{noproxy}" + except KeyError: os.environ["no_proxy"] += f",{noproxy}" # requests_kwargs contains **kwargs used for requests From d8ac5926d610d6cebeca7d0ef904b897ae3a42b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 13:59:31 +0200 Subject: [PATCH 14/17] Update rest_api_client.py --- python-lib/rest_api_client.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python-lib/rest_api_client.py b/python-lib/rest_api_client.py index 
5f5263e..4d853fd 100644 --- a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -33,18 +33,18 @@ def __init__(self, credential, httpproxy, httpsproxy, noproxy, endpoint, custom_ # Update http_proxy parameters if httpproxy != "": try: - if httpproxy not in os.environ["HTTP_PROXY"]: - os.environ["HTTP_PROXY"] += f",{httpproxy}" + if httpproxy not in os.environ["http_proxy"]: + os.environ["http_proxy"] += f",{httpproxy}" except KeyError: - os.environ["HTTP_PROXY"] += f",{httpproxy}" + os.environ["http_proxy"] += f",{httpproxy}" # Update https_proxy parameters if httpsproxy != "": try: - if httpsproxy not in os.environ["HTTPS_PROXY"]: - os.environ["HTTPS_PROXY"] += f",{httpsproxy}" + if httpsproxy not in os.environ["https_proxy"]: + os.environ["https_proxy"] += f",{httpsproxy}" except KeyError: - os.environ["HTTPS_PROXY"] += f",{httpsproxy}" + os.environ["https_proxy"] += f",{httpsproxy}" # Update no_proxy parameters if noproxy != "": try: From 22b7822faf21c9800de3ff52b3e7f8ad305397ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 14:04:15 +0200 Subject: [PATCH 15/17] Update rest_api_client.py --- python-lib/rest_api_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python-lib/rest_api_client.py b/python-lib/rest_api_client.py index 4d853fd..97a4307 100644 --- a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -36,7 +36,7 @@ def __init__(self, credential, httpproxy, httpsproxy, noproxy, endpoint, custom_ if httpproxy not in os.environ["http_proxy"]: os.environ["http_proxy"] += f",{httpproxy}" except KeyError: - os.environ["http_proxy"] += f",{httpproxy}" + os.environ["http_proxy"] = httpproxy # Update https_proxy parameters if httpsproxy != "": @@ -44,14 +44,14 @@ def __init__(self, credential, httpproxy, httpsproxy, noproxy, endpoint, custom_ if httpsproxy not in os.environ["https_proxy"]: os.environ["https_proxy"] += f",{httpsproxy}" except KeyError: - 
os.environ["https_proxy"] += f",{httpsproxy}" + os.environ["https_proxy"] = httpsproxy # Update no_proxy parameters if noproxy != "": try: if noproxy not in os.environ["no_proxy"]: os.environ["no_proxy"] += f",{noproxy}" except KeyError: - os.environ["no_proxy"] += f",{noproxy}" + os.environ["no_proxy"] = noproxy # requests_kwargs contains **kwargs used for requests self.requests_kwargs = {} From 2a7d5cbee3e1f782053db756c4f07afd2c0f9177 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Fri, 22 Sep 2023 14:21:51 +0200 Subject: [PATCH 16/17] Update rest_api_client.py --- python-lib/rest_api_client.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python-lib/rest_api_client.py b/python-lib/rest_api_client.py index 97a4307..2e83da9 100644 --- a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -37,6 +37,8 @@ def __init__(self, credential, httpproxy, httpsproxy, noproxy, endpoint, custom_ os.environ["http_proxy"] += f",{httpproxy}" except KeyError: os.environ["http_proxy"] = httpproxy + except TypeError: + pass # Update https_proxy parameters if httpsproxy != "": @@ -45,6 +47,8 @@ def __init__(self, credential, httpproxy, httpsproxy, noproxy, endpoint, custom_ os.environ["https_proxy"] += f",{httpsproxy}" except KeyError: os.environ["https_proxy"] = httpsproxy + except TypeError: + pass # Update no_proxy parameters if noproxy != "": try: @@ -52,6 +56,8 @@ def __init__(self, credential, httpproxy, httpsproxy, noproxy, endpoint, custom_ os.environ["no_proxy"] += f",{noproxy}" except KeyError: os.environ["no_proxy"] = noproxy + except TypeError: + pass # requests_kwargs contains **kwargs used for requests self.requests_kwargs = {} From 7b7293cf78c1d895ffd79becb26aef8140970b57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Redoul=C3=A8s?= Date: Thu, 9 Nov 2023 09:18:30 +0100 Subject: [PATCH 17/17] Update plugin.json --- plugin.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin.json 
b/plugin.json index 777d6f5..7351e92 100644 --- a/plugin.json +++ b/plugin.json @@ -4,7 +4,7 @@ "meta": { "label": "API Connect", "description": "Retrieve data from any REST API", - "author": "Dataiku (Alex Bourret)", + "author": "Dataiku (Alex Bourret), Safran (Guillaume Redoulès)", "icon": "icon-rocket", "tags": ["Connector", "Cloud"], "url": "https://www.dataiku.com/product/plugins/api-connect/",