diff --git a/README.md b/README.md
index 0cb4feb..458b06c 100644
--- a/README.md
+++ b/README.md
@@ -119,8 +119,8 @@ Install qsv::
 
 [Download the appropriate precompiled binaries](https://github.com/jqnatividad/qsv/releases/latest) for your platform and copy it to the appropriate directory, e.g. for Linux:
 
-    wget https://github.com/jqnatividad/qsv/releases/download/0.44.0/qsv-0.44.0-x86_64-unknown-linux-gnu.zip
-    unzip qsv-0.44.0-x86_64-unknown-linux-gnu.zip
+    wget https://github.com/jqnatividad/qsv/releases/download/0.45.2/qsv-0.45.2-x86_64-unknown-linux-gnu.zip
+    unzip qsv-0.45.2-x86_64-unknown-linux-gnu.zip
     sudo mv qsv /usr/local/bin
     sudo mv qsvlite /usr/local/bin
     sudo mv qsvdp /usr/local/bin
diff --git a/datapusher/jobs.py b/datapusher/jobs.py
index 821d38d..1bca4ef 100644
--- a/datapusher/jobs.py
+++ b/datapusher/jobs.py
@@ -83,6 +83,16 @@
 TYPE_MAPPING = web.app.config.get('TYPE_MAPPING', _TYPE_MAPPING)
 TYPES = web.app.config.get('TYPES', _TYPES)
 
+# if a field has any of these anywhere in its name, date inferencing
+# is turned on when scanning for data types, which is a relatively expensive op
+# if DATELIKE_FIELDNAMES is empty, date inferencing will always be on
+_DATELIKE_FIELDNAMES = ['date', 'time', 'open', 'close', 'due']
+
+DATELIKE_FIELDNAMES = web.app.config.get(
+    'DATELIKE_FIELDNAMES', _DATELIKE_FIELDNAMES)
+
+DATELIKE_FIELDNAMES = [field.lower() for field in DATELIKE_FIELDNAMES]
+
 DATASTORE_URLS = {
     'datastore_delete': '{ckan_url}/api/action/datastore_delete',
     'resource_update': '{ckan_url}/api/action/resource_update'
@@ -124,7 +134,7 @@ def as_dict(self):
 
         """
         if self.response and len(self.response) > 200:
-            response = str.encode(self.response[:200] + '...')
+            response = self.response[:200]
         else:
             response = self.response
         return {
@@ -502,7 +512,8 @@ def push_to_datastore(task_id, input, dry_run=False):
         )
         dupe_count = int(str(qsv_dedup.stderr).strip())
         if dupe_count > 0:
-            logger.info('{:,} duplicates found and removed...'.format(dupe_count))
+            logger.info(
+                '{:,} duplicates found and removed...'.format(dupe_count))
         else:
             logger.info('No duplicates found...')
 
@@ -529,13 +540,33 @@ def push_to_datastore(task_id, input, dry_run=False):
     record_count = int(str(qsv_count.stdout).strip())
     logger.info('{:,} records detected...'.format(record_count))
 
+    # if DATELIKE_FIELDNAMES is not empty, scan the CSV headers for date-like fields;
+    # otherwise, always --infer-dates when scanning for types
+    inferdates_flag = True
+    if DATELIKE_FIELDNAMES:
+        try:
+            qsv_headers = subprocess.run(
+                [QSV_BIN, 'headers', tmp.name], capture_output=True, text=True, check=True)
+        except subprocess.CalledProcessError as e:
+            tmp.close()
+            raise util.JobError(
+                'Cannot scan CSV headers: {}'.format(e)
+            )
+        header_fields = str(qsv_headers.stdout).strip().lower()
+        if not any(datelike_fieldname in header_fields for datelike_fieldname in DATELIKE_FIELDNAMES):
+            inferdates_flag = False
+
     # run qsv stats to get data types and descriptive statistics
     headers = []
     types = []
     qsv_stats_csv = tempfile.NamedTemporaryFile(suffix='.csv')
+    qsv_stats_cmd = [QSV_BIN, 'stats', tmp.name,
+                     '--output', qsv_stats_csv.name]
+    if inferdates_flag:
+        qsv_stats_cmd.append('--infer-dates')
+        logger.info('Date-like fields detected. Date inferencing enabled...')
     try:
-        qsv_stats = subprocess.run(
-            [QSV_BIN, 'stats', tmp.name, '--infer-dates', '--output', qsv_stats_csv.name], check=True)
+        qsv_stats = subprocess.run(qsv_stats_cmd, check=True)
     except subprocess.CalledProcessError as e:
         tmp.close()
         qsv_stats_csv.close()
@@ -692,6 +723,9 @@ def push_to_datastore(task_id, input, dry_run=False):
     else:
         alias = None
 
+    # check if the alias exists; if it does,
+    # add a sequence suffix until it can be created
+
     # tell CKAN to calculate_record_count and set alias if set
     send_resource_to_datastore(resource, headers_dicts, api_key, ckan_url,
                                records=None, aliases=alias, calculate_record_count=True)
diff --git a/datapusher/settings.py b/datapusher/settings.py
index 2739a6a..dd021a3 100644
--- a/datapusher/settings.py
+++ b/datapusher/settings.py
@@ -41,3 +41,9 @@
 # logging
 LOG_FILE = os.environ.get('DATAPUSHER_LOG_FILE', '/tmp/ckan_service.log')
 STDERR = bool(int(os.environ.get('DATAPUSHER_STDERR', '1')))
+
+# other config values that can be overridden here if you don't want to modify jobs.py
+# see jobs.py for their current default values
+# DATELIKE_FIELDNAMES
+# TYPE_MAPPING
+# TYPES
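
A note on the header scan introduced in `push_to_datastore()`: the `qsv headers` output is matched as a single lowercased string, so a substring hit in any column name is enough to keep date inferencing on. A minimal sketch of the heuristic, using a hypothetical header listing in place of real `qsv headers` output:

    # A sketch of the substring heuristic above; the header text is a
    # hypothetical stand-in for `qsv headers` output.
    DATELIKE_FIELDNAMES = ['date', 'time', 'open', 'close', 'due']

    header_fields = '1   id\n2   vendor_name\n3   invoice_due_date'.lower()

    # 'invoice_due_date' contains both 'due' and 'date', so inferencing stays on
    inferdates_flag = any(name in header_fields for name in DATELIKE_FIELDNAMES)
    print(inferdates_flag)  # True

Because this is a plain substring test over the whole header blob, a column such as `opening_balance` would also enable inferencing (via 'open'); that trade-off keeps the scan cheap.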
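
A detail worth noting about the header scan's error handling: `subprocess.run()` only raises `CalledProcessError` when invoked with `check=True`, which is why the `qsv headers` call passes it; without the flag, a failing invocation would return silently and the `except` branch would never fire. A small self-contained demonstration (uses the POSIX `false` utility, which always exits non-zero):

    import subprocess

    # Without check=True, a non-zero exit status is returned, not raised.
    result = subprocess.run(['false'], capture_output=True, text=True)
    print(result.returncode)  # 1, and no exception

    # With check=True, the same failure raises CalledProcessError.
    try:
        subprocess.run(['false'], capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        print('raised, exit code:', e.returncode)  # raised, exit code: 1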
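
Since DATELIKE_FIELDNAMES is read through `web.app.config.get()`, deployments can override it from datapusher/settings.py, as the new comment block there points out. A hedged example; the extra names are illustrative, not shipped defaults:

    # Hypothetical override in datapusher/settings.py: extend the scan to
    # catch audit-style column names ('created'/'modified' are illustrative).
    DATELIKE_FIELDNAMES = ['date', 'time', 'open', 'close', 'due',
                           'created', 'modified']

    # An empty list keeps --infer-dates on for every upload, per the
    # comment in jobs.py.
    # DATELIKE_FIELDNAMES = []

Matching is case-insensitive either way, since jobs.py lowercases both the configured names and the header output.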