From 41f5649658149a46361e047c55bbc7f83dd87557 Mon Sep 17 00:00:00 2001
From: Peter Weber
Date: Thu, 16 Jun 2022 21:38:35 +0200
Subject: [PATCH] indexer: better code

* Avoids exceptions during bulk indexing by falling back to default
  indexer, record and search classes.
* Improves the code following Sourcery suggestions.

Co-Authored-by: Peter Weber
---
 rero_mef/api.py   |  12 +++--
 rero_mef/utils.py | 126 ++++++++++++++++++++--------------------------
 2 files changed, 63 insertions(+), 75 deletions(-)

diff --git a/rero_mef/api.py b/rero_mef/api.py
index 0b173be9..001d80dc 100644
--- a/rero_mef/api.py
+++ b/rero_mef/api.py
@@ -407,9 +407,15 @@ def _get_indexer_class(self, payload):
         """Get the indexer class from payload."""
         # take the first defined doc type for finding the class
         pid_type = payload.get('doc_type', 'rec')
-        endpoints = current_app.config.get('RECORDS_REST_ENDPOINTS')
-        return obj_or_import_string(
-            endpoints.get(pid_type).get('indexer_class', RecordIndexer))
+        try:
+            indexer = obj_or_import_string(
+                current_app.config[
+                    'RECORDS_REST_ENDPOINTS'][pid_type]['indexer_class']
+            )
+        except Exception:
+            # Provide the default indexer if none is defined in the config.
+            indexer = ReroIndexer
+        return indexer

     def process_bulk_queue(self, es_bulk_kwargs=None, stats_only=True):
         """Process bulk indexing queue.
diff --git a/rero_mef/utils.py b/rero_mef/utils.py
index 63f92158..cd195a43 100644
--- a/rero_mef/utils.py
+++ b/rero_mef/utils.py
@@ -23,7 +23,6 @@
 # under the terms of the MIT License; see LICENSE file for more details.

 """Utilities."""
-
 import datetime
 import gc
 import hashlib
@@ -53,7 +52,9 @@
 from invenio_oaiharvester.utils import get_oaiharvest_object
 from invenio_pidstore.errors import PIDDoesNotExistError
 from invenio_pidstore.models import PersistentIdentifier
+from invenio_records.api import Record
 from invenio_records_rest.utils import obj_or_import_string
+from invenio_search import RecordsSearch
 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 from pymarc.marcxml import parse_xml_to_array
 from sickle import Sickle, oaiexceptions
@@ -142,7 +143,7 @@ def next_resumption_token_and_items(self):
         """Get next resumption token and items."""
         self.resumption_token = self._get_resumption_token()
         self._items = self.oai_response.xml.iterfind(
-            './/' + self.sickle.oai_namespace + self.element)
+            f'.//{self.sickle.oai_namespace}{self.element}')

     def _next_response(self):
         """Get the next response from the OAI server."""
@@ -167,7 +168,7 @@
                 current_app.logger.error(f'Sickle harvest {count} {err}')
                 sleep(60)
         error = self.oai_response.xml.find(
-            './/' + self.sickle.oai_namespace + 'error')
+            f'.//{self.sickle.oai_namespace}error')
         if error is not None:
             code = error.attrib.get('code', 'UNKNOWN')
             description = error.text or ''
@@ -179,7 +180,7 @@
         if self.resumption_token:
             # Test we got a complete response ('resumptionToken' in xml)
             resumption_token_element = self.oai_response.xml.find(
-                './/' + self.sickle.oai_namespace + 'resumptionToken')
+                f'.//{self.sickle.oai_namespace}resumptionToken')

             if resumption_token_element is None:
                 current_app.logger.error(
@@ -197,7 +198,7 @@
 def oai_process_records_from_dates(name, sickle, oai_item_iterator,
                                    transformation, record_cls, max_retries=0,
-                                   access_token=None, days_spann=30,
+                                   access_token=None, days_span=30,
                                    from_date=None, until_date=None,
                                    ignore_deleted=False, dbcommit=True,
                                    reindex=True, test_md5=True,
@@ -253,7 +254,7 @@ def oai_process_records_from_dates(name, sickle, oai_item_iterator,
     if dates['until']:
         my_until_date = parser.parse(dates['until'])
     while my_from_date <= my_until_date:
-        until_date = my_from_date + timedelta(days=days_spann)
+        until_date = my_from_date + timedelta(days=days_span)
         if until_date > my_until_date:
             until_date = my_until_date
         dates = {
@@ -320,18 +321,18 @@ def oai_process_records_from_dates(name, sickle, oai_item_iterator,
                     if debug:
                         traceback.print_exc()
         except NoRecordsMatch:
-            my_from_date = my_from_date + timedelta(days=days_spann + 1)
+            my_from_date = my_from_date + timedelta(days=days_span + 1)
             continue
         except Exception as err:
             current_app.logger.error(err)
             if debug:
                 traceback.print_exc()
             count = -1
-        my_from_date = my_from_date + timedelta(days=days_spann + 1)
+        my_from_date = my_from_date + timedelta(days=days_span + 1)
         if verbose:
             from_date = my_from_date.strftime("%Y-%m-%d")
             click.echo(
-                f'OAI {name} {spec}: {from_date} .. +{days_spann}'
+                f'OAI {name} {spec}: {from_date} .. +{days_span}'
             )

     if update_last_run:
@@ -346,7 +347,7 @@
 def oai_save_records_from_dates(name, file_name, sickle, oai_item_iterator,
                                 max_retries=0,
-                                access_token=None, days_spann=30,
+                                access_token=None, days_span=30,
                                 from_date=None, until_date=None,
                                 verbose=False, **kwargs):
     """Harvest and save multiple records from an OAI repo.
@@ -396,7 +397,7 @@
     if dates['until']:
         my_until_date = parser.parse(dates['until'])
     while my_from_date <= my_until_date:
-        until_date = my_from_date + timedelta(days=days_spann)
+        until_date = my_from_date + timedelta(days=days_span)
         if until_date > my_until_date:
             until_date = my_until_date
         dates = {
@@ -416,21 +417,21 @@
                             f'count:{count:>10} = {id}'
                         )
                     rec = records[0]
-                    rec.leader = rec.leader[0:9] + 'a' + rec.leader[10:]
+                    rec.leader = f'{rec.leader[:9]}a{rec.leader[10:]}'
                     output_file.write(rec.as_marc())
         except NoRecordsMatch:
             my_from_date = my_from_date + timedelta(
-                days=days_spann + 1)
+                days=days_span + 1)
             continue
         except Exception as err:
             current_app.logger.error(err)
-            my_from_date = my_from_date + timedelta(days=days_spann + 1)
+            my_from_date = my_from_date + timedelta(days=days_span + 1)
         if verbose:
             from_date = my_from_date.strftime("%Y-%m-%d")
             click.echo(
                 f'OAI {name} spec({spec}): '
-                f'{from_date} .. +{days_spann}'
+                f'{from_date} .. +{days_span}'
             )
     if verbose:
         click.echo(f'OAI {name}: {count}')
@@ -542,15 +543,12 @@ def export_json_records(pids, pid_type, output_file_name, indent=2,
             click.echo(f'ERROR: Can not export pid:{pid}')


-def number_records_in_file(json_file, type):
+def number_records_in_file(json_file, file_type):
     """Get number of records per file."""
     count = 0
     with open(json_file, 'r', buffering=1) as file:
         for line in file:
-            if type == 'json':
-                if '"pid"' in line:
-                    count += 1
-            elif type == 'csv':
+            if (file_type == 'json' and '"pid"' in line) or file_type == 'csv':
                 count += 1
     return count
@@ -559,13 +557,11 @@
 def progressbar(items, length=0, verbose=False):
     """Verbose progress bar."""
     if verbose:
         with click.progressbar(
-            items, label=str(length), length=length
-        ) as progressbar_items:
-            for item in progressbar_items:
-                yield item
+                items, label=str(length), length=length
+        ) as progressbar_items:
+            yield from progressbar_items
     else:
-        for item in items:
-            yield item
+        yield from items


 def get_host():
@@ -699,9 +695,7 @@ def bulk_load_agent(agent, data, table, columns, bulk_count=0, verbose=False,
     count = 0
     buffer = StringIO()
     buffer_uuid = []
-    index = -1
-    if 'id' in columns:
-        index = columns.index('id')
+    index = columns.index('id') if 'id' in columns else -1
     start_time = datetime.now()
     with open(data, 'r', encoding='utf-8', buffering=1) as input_file:
         for line in input_file:
@@ -728,9 +722,8 @@ def bulk_load_agent(agent, data, table, columns, bulk_count=0, verbose=False,
                         bulk_index(agent=agent, uuids=buffer_uuid,
                                    verbose=verbose)
                         buffer_uuid.clear()
-                else:
-                    if verbose:
-                        click.echo()
+                elif verbose:
+                    click.echo()
         # force the Garbage Collector to release unreferenced memory
         # gc.collect()
@@ -751,9 +744,8 @@ def bulk_load_agent(agent, data, table, columns, bulk_count=0, verbose=False,
         if index >= 0 and reindex:
             bulk_index(agent=agent, uuids=buffer_uuid, verbose=verbose)
             buffer_uuid.clear()
-        else:
-            if verbose:
-                click.echo()
+        elif verbose:
+            click.echo()
     # force the Garbage Collector to release unreferenced memory
     gc.collect()
@@ -867,7 +859,7 @@ def bulk_save_pids(agent, file_name, verbose=False):
         'object_type',
         'object_uuid',
     )
-    tmp_file_name = file_name + '_tmp'
+    tmp_file_name = f'{file_name}_tmp'
     bulk_save_agent(
         agent=agent,
         file_name=tmp_file_name,
@@ -900,17 +892,14 @@ def bulk_save_ids(agent, file_name, verbose=False):
 def create_md5(record):
     """Create md5 for record."""
-    data_md5 = hashlib.md5(
+    return hashlib.md5(
         json.dumps(record, sort_keys=True).encode('utf-8')
     ).hexdigest()
-    return data_md5


 def add_md5(record):
     """Add md5 to json."""
-    schema = None
-    if record.get('$schema'):
-        schema = record.pop('$schema')
+    schema = record.pop('$schema') if record.get('$schema') else None
     if record.get('md5'):
         record.pop('md5')
     record['md5'] = create_md5(record)
@@ -968,10 +957,8 @@ def get_entity_classes(without_mef_viaf=True):
     endpoints.pop('mef', None)
     endpoints.pop('viaf', None)
     for agent in endpoints:
-        record_class = obj_or_import_string(
-            endpoints[agent].get('record_class')
-        )
-        if record_class:
+        if record_class := obj_or_import_string(
+                endpoints[agent].get('record_class')):
            agents[agent] = record_class
     return agents
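
Note on the md5 helpers changed above: the sketch below reproduces them standalone
(imports included, no rero_mef dependencies) to illustrate the invariant the patch
preserves: '$schema' and any previous 'md5' are excluded from the checksum, so
recomputing the fingerprint of an already fingerprinted record is stable. The
'$schema' URL and 'pid' value are illustrative only.

    import hashlib
    import json

    def create_md5(record):
        """Create md5 for record (as in the patch above)."""
        return hashlib.md5(
            json.dumps(record, sort_keys=True).encode('utf-8')
        ).hexdigest()

    def add_md5(record):
        """Add md5 to json (as in the patch above)."""
        schema = record.pop('$schema') if record.get('$schema') else None
        if record.get('md5'):
            record.pop('md5')
        record['md5'] = create_md5(record)
        if schema:
            record['$schema'] = schema
        return record

    # Re-running add_md5 on its own output yields the same fingerprint.
    rec = add_md5({'pid': '1', '$schema': 'https://example.org/schema.json'})
    assert add_md5(dict(rec))['md5'] == rec['md5']
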
@@ -979,24 +966,30 @@
 def get_endpoint_class(entity, class_name):
     """Get entity class from config."""
     endpoints = current_app.config.get('RECORDS_REST_ENDPOINTS', {})
-    endpoint = endpoints.get(entity, {})
-    endpoint_class = obj_or_import_string(endpoint.get(class_name))
-    return endpoint_class
+    if endpoint := endpoints.get(entity, {}):
+        return obj_or_import_string(endpoint.get(class_name))


 def get_entity_class(entity):
     """Get entity record class from config."""
-    return get_endpoint_class(entity=entity, class_name='record_class')
+    if record_class := get_endpoint_class(entity=entity, class_name='record_class'):
+        return record_class
+    return Record


 def get_entity_search_class(entity):
     """Get entity search class from config."""
-    return get_endpoint_class(entity=entity, class_name='search_class')
+    if search := get_endpoint_class(entity=entity, class_name='search_class'):
+        return search
+    return RecordsSearch


 def get_entity_indexer_class(entity):
     """Get entity indexer class from config."""
-    return get_endpoint_class(entity=entity, class_name='indexer_class')
+    from .api import ReroIndexer
+    if indexer := get_endpoint_class(entity=entity, class_name='indexer_class'):
+        return indexer
+    return ReroIndexer


 def write_link_json(
@@ -1009,13 +1002,12 @@ def write_link_json(
     agent,
     mef_file,
     viaf_pid,
     corresponding_data,
     agent_pid,
     verbose=False
 ):
     """Write a JSON record into file."""
-    json_data = {}
     key_per_catalog_id = {
         'DNB': 'gnd_pid',
         'RERO': 'rero_pid',
         'SUDOC': 'idref_pid'
     }
-    json_data['viaf_pid'] = viaf_pid
+    json_data = {'viaf_pid': viaf_pid}
     write_to_file_viaf = False
     for key, value in corresponding_data.items():
         if key in key_per_catalog_id:
@@ -1029,11 +1021,7 @@
             json_dump['pid'] = agent_pid
             del(json_dump['viaf_pid'])
     # only save VIAF data with used pids
-    if agent == 'viaf':
-        write_to_file = write_to_file_viaf
-    else:
-        write_to_file = True
-
+    write_to_file = write_to_file_viaf if agent == 'viaf' else True
     if write_to_file:
         record_uuid = str(uuid4())
         date = str(datetime.utcnow())
@@ -1061,7 +1049,6 @@ def append_fixtures_new_identifiers(identifier, pids, pid_type):
 def get_diff_db_es_pids(agent, verbose=False):
     """Get differences between DB and ES pids."""
-    pids_db = {}
     pids_es = {}
     pids_es_double = []
     record_class = get_entity_class(agent)
@@ -1073,8 +1060,7 @@ def get_diff_db_es_pids(agent, verbose=False):
         length=count,
         verbose=verbose
     )
-    for pid in progress:
-        pids_db[pid] = 1
+    pids_db = {pid: 1 for pid in progress}
     search_class = get_entity_search_class(agent)
     count = search_class().source('pid').count()
     if verbose:
@@ -1092,8 +1078,8 @@ def get_diff_db_es_pids(agent, verbose=False):
             pids_db.pop(pid)
         else:
             pids_es[pid] = 1
-    pids_db = [v for v in pids_db]
-    pids_es = [v for v in pids_es]
+    pids_db = list(pids_db)
+    pids_es = list(pids_es)
     if verbose:
         click.echo(f'Counts DB: {len(pids_db)} ES: {len(pids_es)} '
                    f'ES+: {len(pids_es_double)}')
@@ -1194,7 +1180,6 @@ def mef_get_all_missing_entity_pids(mef_class, entity, verbose=False):
     :returns: Missing pids, unexisting pids and pids without entity.
""" record_class = get_entity_class(entity) - missing_pids = {} unexisting_pids = {} no_pids = [] if verbose: @@ -1204,8 +1189,7 @@ def mef_get_all_missing_entity_pids(mef_class, entity, verbose=False): length=record_class.count(), verbose=verbose ) - for pid in progress: - missing_pids[pid] = 1 + missing_pids = {pid: 1 for pid in progress} name = record_class.name if verbose: click.echo(f'Get pids for {name} from MEF and calculate missing ...') @@ -1217,14 +1201,13 @@ def mef_get_all_missing_entity_pids(mef_class, entity, verbose=False): ) for hit in progress: data = hit.to_dict() - agent_pid = data.get(name, {}).get('pid') - if agent_pid: + if agent_pid := data.get(name, {}).get('pid'): res = missing_pids.pop(agent_pid, False) if not res: unexisting_pids[hit.pid] = agent_pid else: no_pids.append(hit.pid) - return [v for v in missing_pids], unexisting_pids, no_pids + return list(missing_pids), unexisting_pids, no_pids def get_mefs_endpoints(): @@ -1234,11 +1217,10 @@ def get_mefs_endpoints(): from .concepts.mef.api import ConceptMefRecord from .concepts.utils import get_concepts_endpoints - mefs = [] - mefs.append({ + mefs = [{ 'mef_class': AgentMefRecord, 'endpoints': get_agents_endpoints() - }) + }] mefs.append({ 'mef_class': ConceptMefRecord, 'endpoints': get_concepts_endpoints()