diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 58c4574..51e7bc4 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -43,7 +43,7 @@ jobs: id: set-tag uses: docker/metadata-action@v3 with: - images: reivilibre/panopticon + images: matrixdotorg/panopticon tags: | type=ref,event=tag,prefix=release- diff --git a/.github/workflows/docker_aggregate.yml b/.github/workflows/docker_aggregate.yml index 4fed011..abe7b85 100644 --- a/.github/workflows/docker_aggregate.yml +++ b/.github/workflows/docker_aggregate.yml @@ -43,7 +43,7 @@ jobs: id: set-tag uses: docker/metadata-action@v3 with: - images: reivilibre/panopticon + images: matrixdotorg/panopticon tags: | type=ref,event=tag,prefix=aggregation-release- # we want all tags to be prefixed with aggregate- (to distinguish from diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 90ded53..3f79e4d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -5,7 +5,7 @@ on: pull_request: jobs: run-unit-tests: - name: Unit tests + name: Unit tests (Panopticon) runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -17,3 +17,30 @@ jobs: - run: go get github.com/go-sql-driver/mysql - run: go build - run: ./runtests.sh + + run-unit-tests-aggregate: + services: + mariadb: + image: mariadb:latest + ports: + - 3306 + env: + MARIADB_USER: maria + MARIADB_PASSWORD: passypass + MARIADB_DATABASE: teststats + MARIADB_ROOT_PASSWORD: rootyroot + + name: Unit tests (Aggregate) + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - run: sudo apt-get update && sudo apt-get install python3-pip python3 && sudo apt-get clean + - run: pip install green -r requirements.txt + - run: > + cd scripts && + PANOPTICON_DB_NAME=teststats + PANOPTICON_DB_USER=maria + PANOPTICON_DB_PASSWORD=passypass + PANOPTICON_DB_HOST=localhost + PANOPTICON_DB_PORT=${{ job.services.mariadb.ports[3306] }} + green test_aggregate.py -vv diff --git a/.gitignore 
b/.gitignore index f7efeff..709f6f9 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ _testmain.go venv/* .idea .python-version +/scripts/__pycache__/ diff --git a/RELEASING.md b/RELEASING.md new file mode 100644 index 0000000..792a424 --- /dev/null +++ b/RELEASING.md @@ -0,0 +1,13 @@ +# Releasing (and Deploying) Panopticon — some brief notes + +1. Ensure that you are on a recently-updated copy of the `master` branch and that CI is happy. +2. Based on the changes since the last release, add an entry at the top of the `CHANGELOG.md`, following the style of the prior entries. +3. Set a variable for the version you are releasing, for convenience: `ver=x.y.z`. +4. `git add CHANGELOG.md && git commit -m $ver && git push` +5. Give [the changelog](https://github.com/matrix-org/panopticon/blob/master/CHANGELOG.md) a quick check to ensure everything is in order. +6. When you're ready, tag the release with `git tag -s v$ver` and push it with `git push origin tag v$ver`. +7. Create a release on GitHub: `xdg-open https://github.com/matrix-org/panopticon/releases/edit/v$ver` +8. With any luck, *GitHub Actions* will spring into action, building the Docker images and pushing them to [Docker Hub](https://hub.docker.com/r/matrixdotorg/panopticon/tags?page=1&ordering=last_updated). + + +[Private, infrastructure-specific, instructions](https://gitlab.matrix.org/new-vector/internal/-/wikis/Panopticon) are available to internal members for deploying the 'official' deployment of Panopticon. 
diff --git a/run-panopticon.py b/run-panopticon.py index 85dff4c..7e9f32b 100644 --- a/run-panopticon.py +++ b/run-panopticon.py @@ -3,6 +3,6 @@ from scripts.aggregate import Config c = Config() -db = "%s:%s@tcp/%s" % (c.DB_USER, c.DB_PASSWORD, c.DB_NAME) +db = "%s:%s@tcp/%s" % (c.db_user, c.db_password, c.db_name) command = "./panopticon --db-driver=mysql --db %s --port 34124" % db os.system(command) diff --git a/scripts/aggregate.py b/scripts/aggregate.py index bfd8f21..195b3a3 100644 --- a/scripts/aggregate.py +++ b/scripts/aggregate.py @@ -3,183 +3,199 @@ # The goal of the aggregate datastore is to improve analytics performance. import pymysql.cursors -import yaml import os import time -from os.path import expanduser from dateutil import tz from datetime import datetime +from pymysql import Connection -class Config: - def __init__(self): - self.DB_NAME = os.environ["PANOPTICON_DB_NAME"] - self.DB_USER = os.environ["PANOPTICON_DB_USER"] - self.DB_PASSWORD = os.environ["PANOPTICON_DB_PASSWORD"] - self.DB_HOST = os.environ["PANOPTICON_DB_HOST"] - self.DB_PORT = int(os.environ["PANOPTICON_DB_PORT"]) +ONE_DAY = 24 * 60 * 60 +# if the aggregation table isn't populated, this (2015-10-01) is the date that +# we will start from. 
+INITIAL_DAY = 1443657600 -def main(): - CONFIG = Config() +METRIC_COLUMNS = ('total_users', 'total_nonbridged_users', 'total_room_count', 'daily_active_users', 'daily_active_rooms', 'daily_messages', 'daily_sent_messages', 'daily_active_e2ee_rooms', 'daily_e2ee_messages', 'daily_sent_e2ee_messages', 'monthly_active_users', 'r30_users_all', 'r30_users_android', 'r30_users_ios', 'r30_users_electron', 'r30_users_web', 'r30v2_users_all', 'r30v2_users_android', 'r30v2_users_ios', 'r30v2_users_electron', 'r30v2_users_web', 'daily_user_type_native', 'daily_user_type_bridged', 'daily_user_type_guest') - db = pymysql.connect( - host=CONFIG.DB_HOST, - user=CONFIG.DB_USER, - passwd=CONFIG.DB_PASSWORD, - db=CONFIG.DB_NAME, - port=CONFIG.DB_PORT, - ssl={'ssl': {}} - ) - - ONE_DAY = 24 * 60 * 60 +class Config: + def __init__(self): + self.db_name = os.environ["PANOPTICON_DB_NAME"] + self.db_user = os.environ["PANOPTICON_DB_USER"] + self.db_password = os.environ["PANOPTICON_DB_PASSWORD"] + self.db_host = os.environ["PANOPTICON_DB_HOST"] + self.db_port = int(os.environ["PANOPTICON_DB_PORT"]) + + def connect_db(self) -> Connection: + return pymysql.connect( + host=self.db_host, + user=self.db_user, + passwd=self.db_password, + db=self.db_name, + port=self.db_port, + ssl={'ssl': {}} + ) + +def set_up_aggregate_stats_table(db: Connection): # Set up aggregate_stats schema SCHEMA = """ - CREATE TABLE IF NOT EXISTS `aggregate_stats` ( - `day` bigint(20) NOT NULL, - `total_users` bigint(20) DEFAULT NULL, - `total_nonbridged_users` bigint(20) DEFAULT NULL, - `total_room_count` bigint(20) DEFAULT NULL, - `daily_active_users` bigint(20) DEFAULT NULL, - `daily_active_rooms` bigint(20) DEFAULT NULL, - `daily_messages` bigint(20) DEFAULT NULL, - `daily_sent_messages` bigint(20) DEFAULT NULL, - `daily_active_e2ee_rooms` bigint(20) DEFAULT NULL, - `daily_e2ee_messages` bigint(20) DEFAULT NULL, - `daily_sent_e2ee_messages` bigint(20) DEFAULT NULL, - `monthly_active_users` bigint(20) DEFAULT 
NULL, - `r30_users_all` bigint(20) DEFAULT NULL, - `r30_users_android` bigint(20) DEFAULT NULL, - `r30_users_ios` bigint(20) DEFAULT NULL, - `r30_users_electron` bigint(20) DEFAULT NULL, - `r30_users_web` bigint(20) DEFAULT NULL, - `r30v2_users_all` bigint(20) DEFAULT NULL, - `r30v2_users_android` bigint(20) DEFAULT NULL, - `r30v2_users_ios` bigint(20) DEFAULT NULL, - `r30v2_users_electron` bigint(20) DEFAULT NULL, - `r30v2_users_web` bigint(20) DEFAULT NULL, - `daily_user_type_native` bigint(20) DEFAULT NULL, - `daily_user_type_bridged` bigint(20) DEFAULT NULL, - `daily_user_type_guest` bigint(20) DEFAULT NULL, - `daily_active_homeservers` bigint(20) DEFAULT NULL, - `server_context` text, - PRIMARY KEY (`day`), - UNIQUE KEY `day` (`day`) - ) ENGINE=InnoDB DEFAULT CHARSET=latin1 + CREATE TABLE IF NOT EXISTS `aggregate_stats` ( + `day` bigint(20) NOT NULL, + `total_users` bigint(20) DEFAULT NULL, + `total_nonbridged_users` bigint(20) DEFAULT NULL, + `total_room_count` bigint(20) DEFAULT NULL, + `daily_active_users` bigint(20) DEFAULT NULL, + `daily_active_rooms` bigint(20) DEFAULT NULL, + `daily_messages` bigint(20) DEFAULT NULL, + `daily_sent_messages` bigint(20) DEFAULT NULL, + `daily_active_e2ee_rooms` bigint(20) DEFAULT NULL, + `daily_e2ee_messages` bigint(20) DEFAULT NULL, + `daily_sent_e2ee_messages` bigint(20) DEFAULT NULL, + `monthly_active_users` bigint(20) DEFAULT NULL, + `r30_users_all` bigint(20) DEFAULT NULL, + `r30_users_android` bigint(20) DEFAULT NULL, + `r30_users_ios` bigint(20) DEFAULT NULL, + `r30_users_electron` bigint(20) DEFAULT NULL, + `r30_users_web` bigint(20) DEFAULT NULL, + `r30v2_users_all` bigint(20) DEFAULT NULL, + `r30v2_users_android` bigint(20) DEFAULT NULL, + `r30v2_users_ios` bigint(20) DEFAULT NULL, + `r30v2_users_electron` bigint(20) DEFAULT NULL, + `r30v2_users_web` bigint(20) DEFAULT NULL, + `daily_user_type_native` bigint(20) DEFAULT NULL, + `daily_user_type_bridged` bigint(20) DEFAULT NULL, + `daily_user_type_guest` 
bigint(20) DEFAULT NULL, + `daily_active_homeservers` bigint(20) DEFAULT NULL, + `server_context` text, + PRIMARY KEY (`day`), + UNIQUE KEY `day` (`day`) + ) ENGINE=InnoDB DEFAULT CHARSET=latin1 """ - while True: + create_table(db, SCHEMA) - create_table(db, SCHEMA) - with db.cursor() as cursor: - start_date_query = """ - SELECT day from aggregate_stats - ORDER BY day DESC - LIMIT 1 - """ - cursor.execute(start_date_query) - try: - last_day_in_db = cursor.fetchone()[0] - except: - # If no data to read assume is empty revert to 2015-10-01 - # which is when the stats table is populated from. - last_day_in_db = 1443657600 +def main(): + configuration = Config() + + db = configuration.connect_db() + + set_up_aggregate_stats_table(db) + + while True: now = datetime.utcnow().date() today = int(datetime(now.year, now.month, now.day, tzinfo=tz.tzutc()).strftime('%s')) - processing_day = last_day_in_db + ONE_DAY - - while processing_day < today: - with db.cursor() as cursor: - # Need to filter on "AND total_users > 0" since some installs - # run with a standby unused server with an empty db. This means - # that picking a recent entry for a given server is likely to - # under report. Filtering on total_users removes the standbys. - # It also filters out genuinely unused servers, but the value of - # aggregating these servers is limited. 
- query = """ - SELECT - SUM(total_users) as 'total_users', - SUM(total_nonbridged_users) as 'total_nonbridged_users', - SUM(total_room_count) as 'total_room_count', - SUM(daily_active_users) as 'daily_active_users', - SUM(daily_active_rooms) as 'daily_active_rooms', - SUM(daily_messages) as 'daily_messages', - SUM(daily_sent_messages) as 'daily_sent_messages', - SUM(daily_active_e2ee_rooms) as 'daily_active_e2ee_rooms', - SUM(daily_e2ee_messages) as 'daily_e2ee_messages', - SUM(daily_sent_e2ee_messages) as 'daily_sent_e2ee_messages', - SUM(monthly_active_users) as 'monthly_active_users', - SUM(r30_users_all) as 'r30_users_all', - SUM(r30_users_android) as 'r30_users_android', - SUM(r30_users_ios) as 'r30_users_ios', - SUM(r30_users_electron) as 'r30_users_electron', - SUM(r30_users_web) as 'r30_users_web', - SUM(r30v2_users_all) as 'r30v2_users_all', - SUM(r30v2_users_android) as 'r30v2_users_android', - SUM(r30v2_users_ios) as 'r30v2_users_ios', - SUM(r30v2_users_electron) as 'r30v2_users_electron', - SUM(r30v2_users_web) as 'r30v2_users_web', - SUM(daily_user_type_native) as 'daily_user_type_native', - SUM(daily_user_type_bridged) as 'daily_user_type_bridged', - SUM(daily_user_type_guest) as 'daily_user_type_guest', - COUNT(homeserver) as 'homeserver' - FROM ( - SELECT *, MAX(local_timestamp) - FROM stats - WHERE local_timestamp >= %s and local_timestamp < %s - AND total_users > 0 - GROUP BY homeserver - ) as s; - """ - - date_range = (processing_day, processing_day + ONE_DAY) - cursor.execute(query, date_range) - result = cursor.fetchone() - - insert_query = """ - INSERT into aggregate_stats - ( - day, - total_users, - total_nonbridged_users, - total_room_count, - daily_active_users, - daily_active_rooms, - daily_messages, - daily_sent_messages, - daily_active_e2ee_rooms, - daily_e2ee_messages, - daily_sent_e2ee_messages, - monthly_active_users, - r30_users_all, - r30_users_android, - r30_users_ios, - r30_users_electron, - r30_users_web, - r30v2_users_all, - 
r30v2_users_android, - r30v2_users_ios, - r30v2_users_electron, - r30v2_users_web, - daily_user_type_native, - daily_user_type_bridged, - daily_user_type_guest, - daily_active_homeservers, - server_context - ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - """ - insert_data = [x if x is None else int(x) for x in result] - # insert day at the front - insert_data.insert(0, processing_day) - # append context at the end - insert_data.append(None) - cursor.execute(insert_query, insert_data) - db.commit() - processing_day = processing_day + ONE_DAY + aggregate_until_today(db, today) time.sleep(ONE_DAY) + +def aggregate_until_today(db: Connection, today: int): + with db.cursor() as cursor: + start_date_query = """ + SELECT day from aggregate_stats + ORDER BY day DESC + LIMIT 1 + """ + cursor.execute(start_date_query) + try: + last_day_in_db = cursor.fetchone()[0] + except TypeError: + # If no data to read assume is empty revert to 2015-10-01 + # which is when the stats table is populated from. + last_day_in_db = INITIAL_DAY + + processing_day = last_day_in_db + ONE_DAY + while processing_day < today: + with db.cursor() as cursor: + # Need to filter on "AND total_users > 0" since some installs + # run with a standby unused server with an empty db. This means + # that picking a recent entry for a given server is likely to + # under report. Filtering on total_users removes the standbys. + # It also filters out genuinely unused servers, but the value of + # aggregating these servers is limited. 
+ query = """ + SELECT + SUM(total_users) as 'total_users', + SUM(total_nonbridged_users) as 'total_nonbridged_users', + SUM(total_room_count) as 'total_room_count', + SUM(daily_active_users) as 'daily_active_users', + SUM(daily_active_rooms) as 'daily_active_rooms', + SUM(daily_messages) as 'daily_messages', + SUM(daily_sent_messages) as 'daily_sent_messages', + SUM(daily_active_e2ee_rooms) as 'daily_active_e2ee_rooms', + SUM(daily_e2ee_messages) as 'daily_e2ee_messages', + SUM(daily_sent_e2ee_messages) as 'daily_sent_e2ee_messages', + SUM(monthly_active_users) as 'monthly_active_users', + SUM(r30_users_all) as 'r30_users_all', + SUM(r30_users_android) as 'r30_users_android', + SUM(r30_users_ios) as 'r30_users_ios', + SUM(r30_users_electron) as 'r30_users_electron', + SUM(r30_users_web) as 'r30_users_web', + SUM(r30v2_users_all) as 'r30v2_users_all', + SUM(r30v2_users_android) as 'r30v2_users_android', + SUM(r30v2_users_ios) as 'r30v2_users_ios', + SUM(r30v2_users_electron) as 'r30v2_users_electron', + SUM(r30v2_users_web) as 'r30v2_users_web', + SUM(daily_user_type_native) as 'daily_user_type_native', + SUM(daily_user_type_bridged) as 'daily_user_type_bridged', + SUM(daily_user_type_guest) as 'daily_user_type_guest', + COUNT(homeserver) as 'homeserver' + FROM ( + SELECT *, MAX(local_timestamp) + FROM stats + WHERE local_timestamp >= %s and local_timestamp < %s + AND total_users > 0 + GROUP BY homeserver + ) as s; + """ + + date_range = (processing_day, processing_day + ONE_DAY) + cursor.execute(query, date_range) + result = cursor.fetchone() + + insert_query = """ + INSERT into aggregate_stats + ( + day, + total_users, + total_nonbridged_users, + total_room_count, + daily_active_users, + daily_active_rooms, + daily_messages, + daily_sent_messages, + daily_active_e2ee_rooms, + daily_e2ee_messages, + daily_sent_e2ee_messages, + monthly_active_users, + r30_users_all, + r30_users_android, + r30_users_ios, + r30_users_electron, + r30_users_web, + r30v2_users_all, + 
r30v2_users_android, + r30v2_users_ios, + r30v2_users_electron, + r30v2_users_web, + daily_user_type_native, + daily_user_type_bridged, + daily_user_type_guest, + daily_active_homeservers, + server_context + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s)""" + insert_data = [x if x is None else int(x) for x in result] + # insert day at the front + insert_data.insert(0, processing_day) + # append context at the end + insert_data.append(None) + cursor.execute(insert_query, insert_data) + db.commit() + processing_day = processing_day + ONE_DAY + + def create_table(db, schema): """This method executes a CREATE TABLE IF NOT EXISTS command _without_ generating a mysql warning if the table already exists.""" diff --git a/scripts/test_aggregate.py b/scripts/test_aggregate.py new file mode 100644 index 0000000..a27e690 --- /dev/null +++ b/scripts/test_aggregate.py @@ -0,0 +1,149 @@ +from typing import Dict, Optional +from unittest import TestCase + +from pymysql.cursors import Cursor + +from aggregate import Config +from aggregate import set_up_aggregate_stats_table +from aggregate import METRIC_COLUMNS +from aggregate import INITIAL_DAY, aggregate_until_today +from aggregate import ONE_DAY + + +def insert_recording( + cursor: Cursor, + homeserver: str, + timestamp: int, + metrics: Dict[str, int], + remote_addr: str = "192.42.42.42", +): + """ + Insert a row that emulates a row that Panopticon would update after a server + phones home. 
+ """ + metric_set_lines = ",\n".join(f"`{metric}` = %s" for metric in METRIC_COLUMNS) + cursor.execute( + f""" + INSERT INTO stats + SET + homeserver = %s, + local_timestamp = %s, + remote_timestamp = %s, + remote_addr = %s, + forwarded_for = %s, + user_agent = %s, + {metric_set_lines}; + """, + (homeserver, timestamp, timestamp, remote_addr, remote_addr, "FakeStats/42.x.y") + + tuple(metrics.values()), + ) + + +def select_aggregate(cursor: Cursor, day: int) -> Optional[Dict[str, int]]: + """ + Select the aggregated statistics for a given day. + """ + + extra_columns = ("daily_active_homeservers",) + all_columns = METRIC_COLUMNS + extra_columns + + metric_select_lines = "\n,".join(f"`{metric}`" for metric in all_columns) + cursor.execute( + f""" + SELECT + {metric_select_lines}, + daily_active_homeservers + FROM aggregate_stats + WHERE day = %s + """, + (day,), + ) + row = cursor.fetchone() + if row is None: + return None + else: + return dict(zip(all_columns, row)) + + +class AggregateTestCase(TestCase): + def setUp(self) -> None: + self.config = Config() + db = self.config.connect_db() + with db.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS aggregate_stats;") + cursor.execute("DROP TABLE IF EXISTS stats;") + metric_lines = ",\n".join(f"`{metric}` BIGINT" for metric in METRIC_COLUMNS) + cursor.execute( + f""" + CREATE TABLE stats ( + id INTEGER NOT NULL PRIMARY KEY AUTO_INCREMENT, + homeserver VARCHAR(256), + local_timestamp BIGINT, + remote_timestamp BIGINT, + remote_addr TEXT, + forwarded_for TEXT, + user_agent TEXT, + {metric_lines} + ); + """ + ) + set_up_aggregate_stats_table(db) + + def test_sum_of_metrics(self): + """ + Tests that the aggregator reports the sum of metrics. 
+ """ + + db = self.config.connect_db() + with db.cursor() as cursor: + insert_recording( + cursor, + "hs1", + INITIAL_DAY + ONE_DAY + 300, + {metric: 1 for metric in METRIC_COLUMNS}, + ) + insert_recording( + cursor, + "hs2", + INITIAL_DAY + ONE_DAY + 300, + {metric: 3 for metric in METRIC_COLUMNS}, + ) + + aggregate_until_today(db, today=INITIAL_DAY + 2 * ONE_DAY) + + with db.cursor() as cursor: + row = select_aggregate(cursor, INITIAL_DAY + ONE_DAY) + self.assertIsNot(row, None) + self.assertEqual(row["total_users"], 4) + + def test_empty_homeservers_not_counted(self): + """ + Tests that empty servers are not counted (because they are likely to be + standby backup servers). + """ + + db = self.config.connect_db() + with db.cursor() as cursor: + insert_recording( + cursor, + "hs1", + INITIAL_DAY + ONE_DAY + 300, + {metric: 1 for metric in METRIC_COLUMNS}, + ) + insert_recording( + cursor, + "hs2-standby-backup", + INITIAL_DAY + ONE_DAY + 300, + dict( + {metric: 3 for metric in METRIC_COLUMNS}, + total_users=0, + ), + ) + + aggregate_until_today(db, today=INITIAL_DAY + 2 * ONE_DAY) + + with db.cursor() as cursor: + row = select_aggregate(cursor, INITIAL_DAY + ONE_DAY) + self.assertIsNot(row, None) + self.assertEqual(row["total_users"], 1) + self.assertEqual(row["daily_active_homeservers"], 1)