Skip to content

Commit

Permalink
mef: reindex during entities reindexing
Browse files Browse the repository at this point in the history
* Reindex Mef record after entity indexing.
* Corrects function `get_all_pids_without_agents_and_viaf`.
* Deletes old constants `AGENTS` and `CONCEPTS`.
* Updates the dependencies lock file.

Co-Authored-by: Peter Weber <peter.weber@rero.ch>
  • Loading branch information
rerowep committed Aug 17, 2022
1 parent 3b8648b commit 4373288
Show file tree
Hide file tree
Showing 26 changed files with 238 additions and 211 deletions.
22 changes: 11 additions & 11 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 89 additions & 39 deletions rero_mef/agents/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,54 @@
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_search import current_search

from .mef.api import AgentMefRecord, build_ref_string
from ..api import Action, ReroIndexer, ReroMefRecord


def get_viaf_by_agent(agent, online=False):
    """Get VIAF record by agent.

    :param agent: Agent to get the corresponding VIAF record for. May be a
        MEF record (looked up through its ``viaf_pid``), a VIAF record
        (looked up through its own ``pid``) or any other agent record
        exposing ``viaf_pid_name`` and ``viaf_source_code``.
    :param online: Try to get VIAF record online if not exist.
    :returns: Tuple ``(viaf_record, got_online)``. ``viaf_record`` is
        ``None`` when nothing could be found; ``got_online`` is ``True``
        only when a new VIAF record was created from online data.
    """
    from .viaf.api import AgentViafRecord, AgentViafSearch

    # This is a module-level function, so there is no ``cls`` in scope:
    # the VIAF record class has to be named explicitly.
    if isinstance(agent, AgentMefRecord):
        viaf_pid = agent.get('viaf_pid')
        return AgentViafRecord.get_record_by_pid(viaf_pid), False
    if isinstance(agent, AgentViafRecord):
        viaf_pid = agent.get('pid')
        return AgentViafRecord.get_record_by_pid(viaf_pid), False

    pid = agent.get('pid')
    query = AgentViafSearch() \
        .filter({'term': {agent.viaf_pid_name: pid}})
    # Only the first hit is needed; ``None`` marks "no VIAF indexed yet".
    hit = next(query.source(['pid']).scan(), None)
    if hit is not None:
        return AgentViafRecord.get_record_by_pid(hit.pid), False

    if not online:
        return None, False

    viaf_data = AgentViafRecord.get_online_viaf_record(
        viaf_source_code=agent.viaf_source_code,
        pid=pid
    )
    if not viaf_data:
        return None, False

    viaf_record = AgentViafRecord.get_record_by_pid(viaf_data.get('pid'))
    if viaf_record:
        # The record exists in the DB but was not found by the search
        # above: reindex it so that later searches can see it.
        viaf_record.reindex()
        AgentViafRecord.flush_indexes()
        return viaf_record, False

    viaf_record = AgentViafRecord.create(
        data=viaf_data,
        dbcommit=True,
        reindex=True
    )
    AgentViafRecord.flush_indexes()
    return viaf_record, True


class AgentRecord(ReroMefRecord):
"""Agent Record class."""

Expand Down Expand Up @@ -54,15 +99,6 @@ def create(cls, data, id_=None, delete_pid=False, dbcommit=False,
)
return record

@classmethod
def update_indexes(cls):
    """Flush and refresh the ``agents_<cls.name>`` search index.

    Best effort: any exception is written to the application error log
    and is not propagated to the caller.
    """
    try:
        current_search.flush_and_refresh(index=f'agents_{cls.name}')
    except Exception as err:
        message = f'ERROR flush and refresh: {err}'
        current_app.logger.error(message)

def create_or_update_mef_viaf_record(self, dbcommit=False, reindex=False,
online=False):
"""Create or update MEF and VIAF record.
Expand All @@ -72,48 +108,61 @@ def create_or_update_mef_viaf_record(self, dbcommit=False, reindex=False,
:param online: Try to get VIAF record online.
:returns: MEF record, MEF action, VIAF record, VIAF
"""
from .mef.api import AgentMefRecord
from .viaf.api import AgentViafRecord
AgentViafRecord.update_indexes()
viaf_record, got_online = AgentViafRecord.get_viaf_by_agent(
AgentViafRecord.flush_indexes()
viaf_record, got_online = get_viaf_by_agent(
agent=self,
online=online
)
from .mef.api import AgentMefRecord
ref_string = AgentMefRecord.build_ref_string(

ref_string = build_ref_string(
agent=self.agent,
agent_pid=self.pid
)
agent_pid=self.pid)

mef_data = {self.agent: {'$ref': ref_string}}
mef_record = AgentMefRecord.get_mef_by_entity_pid(self.pid, self.name)
mef_record = AgentMefRecord.get_mef_by_entity_pid(
entity_pid=self.pid,
entity_name=self.name
)
if viaf_record:
mef_data['viaf_pid'] = viaf_record.pid
if not mef_record:
mef_record = AgentMefRecord.get_mef_by_viaf_pid(
viaf_record.pid)
viaf_pid=viaf_record.pid
)
if self.deleted:
mef_record, mef_action = self.delete_from_mef(
dbcommit=dbcommit,
reindex=reindex
)
elif mef_record:
mef_action = Action.UPDATE
mef_record = mef_record.update(
data=mef_data,
dbcommit=dbcommit,
reindex=reindex
)
else:
if mef_record:
mef_action = Action.UPDATE
mef_record = mef_record.update(
data=mef_data,
dbcommit=dbcommit,
reindex=reindex
)
else:
mef_action = Action.CREATE
mef_record = AgentMefRecord.create(
data=mef_data,
dbcommit=dbcommit,
reindex=reindex,
)
mef_action = Action.CREATE
mef_record = AgentMefRecord.create(
data=mef_data,
dbcommit=dbcommit,
reindex=reindex
)
if reindex:
AgentMefRecord.update_indexes()
AgentMefRecord.flush_indexes()
return mef_record, mef_action, viaf_record, got_online

@classmethod
def flush_indexes(cls):
    """Flush and refresh the ``agents_<cls.name>`` search index.

    Best effort: any exception is written to the application error log
    and is not propagated to the caller.
    """
    try:
        current_search.flush_and_refresh(index=f'agents_{cls.name}')
    except Exception as err:
        message = f'ERROR flush and refresh: {err}'
        current_app.logger.error(message)

def delete_from_mef(self, dbcommit=False, reindex=False, verbose=False):
"""Delete agent from MEF record."""
from .mef.api import AgentMefRecord
Expand Down Expand Up @@ -146,7 +195,7 @@ def delete_from_mef(self, dbcommit=False, reindex=False, verbose=False):
)
mef_action = Action.CREATE
if reindex:
AgentMefRecord.update_indexes()
AgentMefRecord.flush_indexes()
if verbose:
click.echo(
f'Delete {self.agent}: {self.pid} '
Expand All @@ -162,7 +211,6 @@ def create_or_update_agent_mef_viaf(cls, data, id_=None, delete_pid=True,
verbose=False):
"""Create or update agent, MEF and VIAF record."""
from rero_mef.agents.mef.api import AgentMefRecord
from rero_mef.agents.viaf.api import AgentViafRecord

with contextlib.suppress(Exception):
persistent_id = PersistentIdentifier.query.filter_by(
Expand Down Expand Up @@ -195,7 +243,7 @@ def create_or_update_agent_mef_viaf(cls, data, id_=None, delete_pid=True,
mef_record = AgentMefRecord.get_mef_by_entity_pid(
record.pid, record.name)
mef_action = Action.UPTODATE
viaf_record, online = AgentViafRecord.get_viaf_by_agent(
viaf_record, online = get_viaf_by_agent(
record)
else:
mef_record, mef_action, viaf_record, online = \
Expand All @@ -214,10 +262,12 @@ def get_online_record(cls, id, verbose=False):
"""
raise NotImplementedError()

@property
def deleted(self):
    """Return the value stored under the record's ``deleted`` key.

    :returns: Whatever ``self.get('deleted')`` yields (``None`` for a
        plain mapping without that key).
    """
    deleted_value = self.get('deleted')
    return deleted_value
def reindex(self, forceindex=False):
    """Reindex this record, then the MEF record found for it.

    :param forceindex: Passed unchanged to both ``reindex`` calls.
    :returns: Result of the parent class ``reindex`` for this record.
    """
    outcome = super().reindex(forceindex=forceindex)
    # Look up the MEF record by this entity's pid and name; it is only
    # reindexed when the lookup returns something truthy.
    linked_mef = AgentMefRecord.get_mef_by_entity_pid(self.pid, self.name)
    if linked_mef:
        linked_mef.reindex(forceindex=forceindex)
    return outcome


class AgentIndexer(ReroIndexer):
Expand Down
4 changes: 2 additions & 2 deletions rero_mef/agents/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ def create_from_viaf(test_md5, enqueue, online, verbose, progress, wait,
@with_appcontext
def create_mef(pid_type, enqueue, online, verbose, progress, wait, missing):
"""Create MEF from agents."""
AGENTS = current_app.config.get('AGENTS', [])
agents = current_app.config.get('RERO_AGENTS', [])
if missing:
missing_pids, to_much_pids = \
AgentMefRecord.get_all_missing_pids(pid_type, verbose=progress)
for agent in pid_type:
if agent not in AGENTS:
if agent not in agents:
click.secho(
f'Error create MEF from {agent}. Wrong agent!',
fg='red'
Expand Down
41 changes: 26 additions & 15 deletions rero_mef/agents/mef/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,23 @@
from .minters import mef_id_minter
from .models import AgentMefMetadata
from .providers import MefProvider
from ...api import ReroIndexer
from ...api import Action, ReroIndexer
from ...api_mef import EntityMefRecord
from ...utils import mef_get_all_missing_entity_pids, progressbar


def build_ref_string(agent_pid, agent):
    """Build the API URL referencing an agent record.

    :param agent_pid: Agent pid, used as the last path segment.
    :param agent: Agent type, used as the segment after ``/api/agents``.
    :returns: URL made of the ``RERO_MEF_APP_BASE_URL`` config value
        followed by ``/api/agents/<agent>/<agent_pid>``.
    """
    with current_app.app_context():
        base_url = current_app.config.get("RERO_MEF_APP_BASE_URL")
        return f'{base_url}/api/agents/{agent}/{agent_pid}'


class AgentMefSearch(RecordsSearch):
"""RecordsSearch."""

Expand All @@ -55,22 +67,10 @@ class AgentMefRecord(EntityMefRecord):
model_cls = AgentMefMetadata
search = AgentMefSearch
mef_type = 'AGENTS'
entities = ['idref', 'gnd', 'rero']

@classmethod
def build_ref_string(cls, agent_pid, agent):
    """Build the API URL referencing an agent record.

    :param agent_pid: Agent pid, used as the last path segment.
    :param agent: Agent type, used as the segment after ``/api/agents``.
    :returns: URL made of the ``RERO_MEF_APP_BASE_URL`` config value
        followed by ``/api/agents/<agent>/<agent_pid>``.
    """
    with current_app.app_context():
        base_url = current_app.config.get("RERO_MEF_APP_BASE_URL")
        return f'{base_url}/api/agents/{agent}/{agent_pid}'

@classmethod
def update_indexes(cls):
def flush_indexes(cls):
"""Update indexes."""
try:
current_search.flush_and_refresh(index='mef')
Expand Down Expand Up @@ -155,6 +155,17 @@ def get_all_pids_without_agents_viaf(cls):
for hit in query:
yield hit.pid

def create_or_update_mef_viaf_record(self, dbcommit=False, reindex=False,
                                     online=False):
    """Report that this operation is not carried out on a MEF record.

    The arguments are accepted but not used by this implementation.

    :param dbcommit: Commit changes to DB (unused here).
    :param reindex: Reindex record (unused here).
    :param online: Try to get VIAF record online (unused here).
    :returns: Tuple ``(self, Action.ERROR, None, False)``: the MEF record
        itself, the error action, no VIAF record and no online flag.
    """
    mef_record, mef_action = self, Action.ERROR
    viaf_record, got_online = None, False
    return mef_record, mef_action, viaf_record, got_online


class AgentMefIndexer(ReroIndexer):
"""AgentMefIndexer."""
Expand Down
2 changes: 1 addition & 1 deletion rero_mef/agents/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def create_viaf_files(
def get_agents_endpoints():
"""Get all agents from config."""
agents_endpoints = {}
agents = current_app.config.get('AGENTS', [])
agents = current_app.config.get('RERO_AGENTS', [])
endpoints = current_app.config.get('RECORDS_REST_ENDPOINTS', {})
for endpoint, data in endpoints.items():
if endpoint in agents:
Expand Down
Loading

0 comments on commit 4373288

Please sign in to comment.