-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
monitoring: data integrity and DB connections
* Adds monitoring for DB connections and activities. * Adds monitoring for data integrity between database and elasticsearch. * Manually installs xpdf in GitHub Actions, to avoid an error during CI checks. * Closes #448. Co-Authored-by: Peter Weber <peter.weber@rero.ch> Co-Authored-by: Sébastien Délèze <sebastien.deleze@rero.ch>
- Loading branch information
Showing
8 changed files
with
698 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Swiss Open Access Repository | ||
# Copyright (C) 2019 RERO | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU Affero General Public License as published by | ||
# the Free Software Foundation, version 3 of the License. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU Affero General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU Affero General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
"""Monitoring features.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Swiss Open Access Repository | ||
# Copyright (C) 2019 RERO | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU Affero General Public License as published by | ||
# the Free Software Foundation, version 3 of the License. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU Affero General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU Affero General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
"""Data integrity monitoring.""" | ||
|
||
from elasticsearch.exceptions import NotFoundError | ||
from flask import current_app | ||
from invenio_pidstore.models import PersistentIdentifier, PIDStatus | ||
from invenio_search import RecordsSearch | ||
|
||
|
||
class DataIntegrityMonitoring():
    """Monitor data integrity between the database and Elasticsearch.

    For each resource type declared in ``RECORDS_REST_ENDPOINTS``, the
    persistent identifiers stored in the database are compared with the
    ``pid`` field of the documents found in the resource's search index.
    """

    @staticmethod
    def _endpoints():
        """Return the configured REST endpoints.

        :returns: The ``RECORDS_REST_ENDPOINTS`` dictionary, or an empty
            dictionary when the configuration key is missing or empty.
        """
        return current_app.config.get('RECORDS_REST_ENDPOINTS') or {}

    @staticmethod
    def _pid_query(doc_type, with_deleted):
        """Build the persistent identifier query for a resource type.

        :param doc_type: Resource type (used as ``pid_type``).
        :param with_deleted: When false, keep only PIDs having the
            ``REGISTERED`` status.
        :returns: Query on ``PersistentIdentifier``.
        """
        query = PersistentIdentifier.query.filter_by(pid_type=doc_type)
        if not with_deleted:
            query = query.filter_by(status=PIDStatus.REGISTERED)
        return query

    def get_db_count(self, doc_type, with_deleted=False):
        """Get database count.

        Get count of items in the database for the given document type.

        :param doc_type: Resource type.
        :param with_deleted: Count also deleted items.
        :returns: Items count.
        :raises Exception: If no endpoint is configured for ``doc_type``.
        """
        if not self._endpoints().get(doc_type):
            raise Exception(
                'No endpoint configured for "{type}"'.format(type=doc_type))

        return self._pid_query(doc_type, with_deleted).count()

    def get_es_count(self, index):
        """Get elasticsearch count.

        Get count of items in elasticsearch for the given index.

        :param index: Elasticsearch index.
        :returns: Items count.
        :raises Exception: If the index does not exist.
        """
        try:
            return RecordsSearch(index=index).query().count()
        except NotFoundError as err:
            # Chain the original error so the root cause stays visible.
            raise Exception(
                'No index found for "{type}"'.format(type=index)) from err

    def missing_pids(self, doc_type, with_deleted=False):
        """Get the PIDs which differ between Elasticsearch and the database.

        :param doc_type: Resource type.
        :param with_deleted: Check also deleted items in database.
        :returns: Dictionary with three lists of PIDs: ``es`` (found in
            Elasticsearch only), ``es_double`` (found more than once in
            Elasticsearch) and ``db`` (found in the database only).
        :raises Exception: If no ``search_index`` is configured for
            ``doc_type``.
        """
        index = self._endpoints().get(doc_type, {}).get('search_index')
        if not index:
            raise Exception(
                'No "search_index" configured for resource "{type}"'.format(
                    type=doc_type))

        result = {'es': [], 'es_double': [], 'db': []}

        # Elasticsearch PIDs. A dictionary is used as an ordered set, so the
        # remaining PIDs are reported in the order they were scanned.
        es_pids = {}
        for hit in RecordsSearch(index=index).source('pid').scan():
            if hit.pid in es_pids:
                result['es_double'].append(hit.pid)
            es_pids[hit.pid] = 1

        # Database PIDs: every PID also seen in Elasticsearch is removed
        # from `es_pids`, the other ones are missing from the index.
        for identifier in self._pid_query(doc_type, with_deleted):
            if identifier.pid_value in es_pids:
                del es_pids[identifier.pid_value]
            else:
                result['db'].append(identifier.pid_value)

        # What is left has no counterpart in the database.
        result['es'] = list(es_pids)

        return result

    def info(self, with_deleted=False):
        """Get missing PIDs details for all resources.

        :param with_deleted: Check also deleted items in database.
        :returns: Dictionary with differences for each resource.
        """
        return {
            doc_type: self.missing_pids(doc_type, with_deleted)
            for doc_type in self._endpoints()
        }

    def hasError(self, with_deleted=False):
        """Check if any endpoint has an integrity error.

        :param with_deleted: Check also deleted items in database.
        :returns: True if an error is found.
        """
        return any(
            item[key]
            for item in self.info(with_deleted).values()
            for key in ('es', 'es_double', 'db'))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Swiss Open Access Repository | ||
# Copyright (C) 2019 RERO | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU Affero General Public License as published by | ||
# the Free Software Foundation, version 3 of the License. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU Affero General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU Affero General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
"""DB monitoring.""" | ||
|
||
from invenio_db import db | ||
|
||
|
||
class DatabaseMonitoring():
    """Expose connection and activity information read from PostgreSQL.

    Both methods query the ``pg_stat_activity`` view through the current
    database session.
    """

    def count_connections(self):
        """Count current DB connections.

        :returns: Dict with information about current connections: ``max``,
            ``used``, ``reserved_for_super`` and ``free``.
        """
        query = """
            select
                max_conn, used, res_for_super,
                max_conn-used-res_for_super free
            from
                (
                    select count(*) used
                    from pg_stat_activity
                ) t1,
                (
                    select setting::int res_for_super
                    from pg_settings
                    where name=$$superuser_reserved_connections$$
                ) t2,
                (
                    select setting::int max_conn
                    from pg_settings
                    where name=$$max_connections$$
                ) t3
        """
        # Textual SQL is wrapped explicitly with `text()` and columns are
        # read as attributes: implicit string coercion and string-keyed row
        # access are no longer supported by recent SQLAlchemy versions.
        result = db.session.execute(db.text(query)).first()

        return {
            'max': result.max_conn,
            'used': result.used,
            'reserved_for_super': result.res_for_super,
            'free': result.free
        }

    def activity(self):
        """Get current activity.

        :returns: A list of the current activities, most recently started
            query first. The query text is truncated to 64 characters.
        """
        # The truncated query is aliased explicitly instead of relying on
        # the column name generated by PostgreSQL for `left(...)`.
        query = """
            SELECT
                pid, application_name, client_addr, client_port,
                backend_start, xact_start, query_start, wait_event, state,
                left(query, 64) AS query_excerpt
            FROM
                pg_stat_activity
            ORDER BY query_start DESC
        """

        def format_row(row):
            """Format returned row from DB."""
            return {
                'application_name': row.application_name,
                'client_address': row.client_addr,
                'client_port': row.client_port,
                'query': row.query_excerpt,
                'query_start': row.query_start,
                'state': row.state,
                'wait_event': row.wait_event,
                'transaction_start': row.xact_start
            }

        rows = db.session.execute(db.text(query)).fetchall()
        return [format_row(row) for row in rows]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Swiss Open Access Repository | ||
# Copyright (C) 2019 RERO | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU Affero General Public License as published by | ||
# the Free Software Foundation, version 3 of the License. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU Affero General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU Affero General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
"""Monitoring views.""" | ||
|
||
from functools import wraps | ||
|
||
from flask import Blueprint, abort, jsonify | ||
from flask_security import current_user | ||
|
||
from sonar.modules.permissions import superuser_access_permission | ||
from sonar.monitoring.api.data_integrity import DataIntegrityMonitoring | ||
from sonar.monitoring.api.database import DatabaseMonitoring | ||
|
||
# Blueprint grouping the monitoring views under the `/monitoring` prefix.
blueprint = Blueprint(
    'monitoring_api',
    __name__,
    url_prefix='/monitoring',
)
|
||
|
||
def is_superuser(func):
    """Restrict a view to logged-in users having superuser access.

    :param func: View function to protect.
    :returns: Wrapped view which answers with an empty JSON body and status
        401 when the user is not authenticated, with a JSON error and status
        403 when the superuser permission is denied, and otherwise with the
        result of the original view.
    """

    @wraps(func)
    def guarded_view(*args, **kwargs):
        # Authentication is checked first, so anonymous users get a 401.
        if not current_user.is_authenticated:
            return jsonify(), 401

        if superuser_access_permission.can():
            return func(*args, **kwargs)

        return jsonify({'status': 'error: Forbidden'}), 403

    return guarded_view
|
||
|
||
@blueprint.route('/db/connections/count')
@is_superuser
def db_connection_count():
    """Information about current database connections.

    :returns: JSON response with the connection counters. Any failure is
        logged and answered with an HTTP 500 error.
    """
    # Local import: only this handler needs the application logger.
    from flask import current_app

    try:
        db_monitoring = DatabaseMonitoring()
        return jsonify(db_monitoring.count_connections())
    except Exception:
        # Keep the traceback in the logs instead of silently dropping it.
        current_app.logger.exception(
            'Unable to count database connections.')
        abort(500)
|
||
|
||
@blueprint.route('/db/activity')
@is_superuser
def db_activity():
    """Current database activity.

    :returns: JSON response with the list of current activities. Any failure
        is logged and answered with an HTTP 500 error.
    """
    # Local import: only this handler needs the application logger.
    from flask import current_app

    try:
        db_monitoring = DatabaseMonitoring()
        return jsonify(db_monitoring.activity())
    except Exception:
        # Keep the traceback in the logs instead of silently dropping it.
        current_app.logger.exception(
            'Unable to retrieve database activity.')
        abort(500)
|
||
|
||
@blueprint.route('/data/status')
def data_status():
    """Status of data integrity.

    This view is not wrapped by `is_superuser`.

    :returns: JSON response whose `status` is `red` when an integrity error
        is detected and `green` otherwise.
    """
    has_error = DataIntegrityMonitoring().hasError()
    status = 'red' if has_error else 'green'
    return jsonify({'status': status})
|
||
|
||
@blueprint.route('/data/info')
@is_superuser
def data_info():
    """Info of data integrity.

    :returns: JSON response with the integrity details of each resource.
    """
    report = DataIntegrityMonitoring().info()
    return jsonify(report)
Oops, something went wrong.