Skip to content

Commit

Permalink
Neilj/aggregate (#13)
Browse files Browse the repository at this point in the history
Aggregate panopticon stats into daily buckets to improve performance.
  • Loading branch information
neilisfragile authored Jul 1, 2019
1 parent 3f71deb commit fd820ca
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ _testmain.go
*.exe
*.test
*.prof

venv/*

12 changes: 12 additions & 0 deletions Dockerfile-aggregate
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Image for the stats-aggregation job (scripts/aggregate.py): reads the raw
# `stats` table and rolls it up into daily buckets in `aggregate_stats`.
FROM python:3-alpine

# All application files live in a dedicated working directory.
WORKDIR /usr/src/app

# Copy and install dependencies before the source so this layer is cached
# independently of code-only changes.
COPY requirements.txt ./

RUN pip install --no-cache-dir -r requirements.txt

COPY scripts/aggregate.py .

# Configuration is supplied via PANOPTICON_DB_* environment variables at run time.
CMD [ "python", "./aggregate.py" ]

10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ To add new tests, crib existing files in the `tests` directory.

# Deployment using docker image

Set the environment variables
Set the environment variables for the go image
* `PANOPTICON_DB_DRIVER` (eg, mysql or sqlite)
* `PANOPTICON_DB` (go mysql connection string or filename for sqlite)
* `PANOPTICON_PORT` (http port to expose panopticon on)

Set the environment variables for the python image
* `PANOPTICON_DB_NAME`
* `PANOPTICON_DB_USER`
* `PANOPTICON_DB_PASSWORD`
* `PANOPTICON_DB_HOST`
* `PANOPTICON_DB_PORT`


6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type StatsReport struct {
PythonVersion string `json:"python_version"`
DatabaseEngine string `json:"database_engine"`
DatabaseServerVersion string `json:"database_server_version"`
ServerContext string `json:"server_context"`
RemoteAddr string
XForwardedFor string
UserAgent string
Expand Down Expand Up @@ -151,6 +152,8 @@ func (r *Recorder) Save(sr StatsReport) error {
cols, vals = appendIfNonEmpty(cols, vals, "database_engine", sr.DatabaseEngine)
cols, vals = appendIfNonEmpty(cols, vals, "database_server_version", sr.DatabaseServerVersion)

cols, vals = appendIfNonEmpty(cols, vals, "server_context", sr.ServerContext)

var valuePlaceholders []string
for i := range vals {
if *dbDriver == "mysql" {
Expand Down Expand Up @@ -237,7 +240,8 @@ func createTable(db *sql.DB) error {
daily_user_type_guest BIGINT,
python_version TEXT,
database_engine TEXT,
database_server_version TEXT
database_server_version TEXT,
server_context TEXT
)`)

return err
Expand Down
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Runtime dependencies for scripts/aggregate.py (see Dockerfile-aggregate).
PyMySQL==0.9.3
python-dateutil==2.8.0
PyYAML==5.1
six==1.12.0
8 changes: 8 additions & 0 deletions run-panopticon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env python
"""Launch the panopticon Go binary against the MySQL database described by
the PANOPTICON_DB_* environment variables (read via scripts.aggregate.Config).
"""
import os
import subprocess

from scripts.aggregate import Config

c = Config()
# Go MySQL DSN: user:password@tcp/dbname (host/port default to localhost:3306).
db = "%s:%s@tcp/%s" % (c.DB_USER, c.DB_PASSWORD, c.DB_NAME)
# Pass an argv list instead of a shell string (the old os.system call): a
# password containing shell metacharacters can no longer break, or inject
# into, the command line.
subprocess.call(
    ["./panopticon", "--db-driver=mysql", "--db", db, "--port", "34124"]
)
159 changes: 159 additions & 0 deletions scripts/aggregate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#!/usr/bin/env python
# Script to read the stats table and aggregate results down to the sums per day
# The goal of the aggregate datastore is to improve analytics performance.

import pymysql.cursors
import yaml
import os
import time
from os.path import expanduser
from dateutil import tz
from datetime import datetime


class Config:
    """Panopticon database connection settings.

    Every value is read from a required ``PANOPTICON_DB_*`` environment
    variable; a missing variable raises ``KeyError``. ``DB_PORT`` is
    converted to an ``int``.
    """

    def __init__(self):
        # Pull each setting from its PANOPTICON_DB_<suffix> variable.
        for suffix in ("NAME", "USER", "PASSWORD", "HOST", "PORT"):
            setattr(self, "DB_" + suffix, os.environ["PANOPTICON_DB_" + suffix])
        # The port is numeric; everything else stays a string.
        self.DB_PORT = int(self.DB_PORT)

def main():
    """Aggregate panopticon stats into daily buckets, forever.

    Each pass: ensure the ``aggregate_stats`` table exists, find the most
    recently aggregated day, then for every complete UTC day since then sum
    the latest report per homeserver from ``stats`` and insert one row per
    day into ``aggregate_stats``. Sleeps 24 hours between passes.
    """
    CONFIG = Config()

    db = pymysql.connect(
        host=CONFIG.DB_HOST,
        user=CONFIG.DB_USER,
        passwd=CONFIG.DB_PASSWORD,
        db=CONFIG.DB_NAME,
        port=CONFIG.DB_PORT,
        ssl={'ssl': {}},
    )

    ONE_DAY = 24 * 60 * 60  # seconds

    # Set up aggregate_stats schema
    SCHEMA = """
    CREATE TABLE IF NOT EXISTS `aggregate_stats` (
        `day` bigint(20) NOT NULL,
        `total_users` bigint(20) DEFAULT NULL,
        `total_nonbridged_users` bigint(20) DEFAULT NULL,
        `total_room_count` bigint(20) DEFAULT NULL,
        `daily_active_users` bigint(20) DEFAULT NULL,
        `daily_active_rooms` bigint(20) DEFAULT NULL,
        `daily_messages` bigint(20) DEFAULT NULL,
        `daily_sent_messages` bigint(20) DEFAULT NULL,
        `r30_users_all` bigint(20) DEFAULT NULL,
        `r30_users_android` bigint(20) DEFAULT NULL,
        `r30_users_ios` bigint(20) DEFAULT NULL,
        `r30_users_electron` bigint(20) DEFAULT NULL,
        `r30_users_web` bigint(20) DEFAULT NULL,
        `daily_user_type_native` bigint(20) DEFAULT NULL,
        `daily_user_type_bridged` bigint(20) DEFAULT NULL,
        `daily_user_type_guest` bigint(20) DEFAULT NULL,
        `daily_active_homeservers` bigint(20) DEFAULT NULL,
        `server_context` text,
        PRIMARY KEY (`day`),
        UNIQUE KEY `day` (`day`)
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1
    """

    while True:

        create_table(db, SCHEMA)
        with db.cursor() as cursor:
            start_date_query = """
                SELECT day from aggregate_stats
                ORDER BY day DESC
                LIMIT 1
            """
            cursor.execute(start_date_query)
            row = cursor.fetchone()
            if row is None:
                # Nothing aggregated yet: start from 2015-10-01 (UTC), which
                # is when the stats table is populated from. (A bare except
                # here previously hid genuine DB errors as well.)
                last_day_in_db = 1443657600
            else:
                last_day_in_db = row[0]

        now = datetime.utcnow().date()
        # Midnight UTC today as a unix timestamp. datetime.timestamp() on an
        # aware datetime is portable; strftime('%s') is a glibc extension
        # that consults the local timezone and is undefined elsewhere.
        today = int(
            datetime(now.year, now.month, now.day, tzinfo=tz.tzutc()).timestamp()
        )
        processing_day = last_day_in_db + ONE_DAY

        # Only aggregate complete days (processing_day strictly before today).
        while processing_day < today:
            with db.cursor() as cursor:
                # Take the latest report per homeserver within the day
                # (inner GROUP BY), then sum across homeservers.
                query = """
                    SELECT
                        SUM(total_users) as 'total_users',
                        SUM(total_nonbridged_users) as 'total_nonbridged_users',
                        SUM(total_room_count) as 'total_room_count',
                        SUM(daily_active_users) as 'daily_active_users',
                        SUM(daily_active_rooms) as 'daily_active_rooms',
                        SUM(daily_messages) as 'daily_messages',
                        SUM(daily_sent_messages) as 'daily_sent_messages',
                        SUM(r30_users_all) as 'r30_users_all',
                        SUM(r30_users_android) as 'r30_users_android',
                        SUM(r30_users_ios) as 'r30_users_ios',
                        SUM(r30_users_electron) as 'r30_users_electron',
                        SUM(r30_users_web) as 'r30_users_web',
                        SUM(daily_user_type_native) as 'daily_user_type_native',
                        SUM(daily_user_type_bridged) as 'daily_user_type_bridged',
                        SUM(daily_user_type_guest) as 'daily_user_type_guest',
                        COUNT(homeserver) as 'homeserver'
                    FROM (
                        SELECT *, MAX(local_timestamp)
                        FROM stats
                        WHERE local_timestamp >= %s and local_timestamp < %s
                        GROUP BY homeserver
                    ) as s;
                """

                date_range = (processing_day, processing_day + ONE_DAY)
                cursor.execute(query, date_range)
                result = cursor.fetchone()

                insert_query = """
                    INSERT into aggregate_stats
                    (
                        day,
                        total_users,
                        total_nonbridged_users,
                        total_room_count,
                        daily_active_users,
                        daily_active_rooms,
                        daily_messages,
                        daily_sent_messages,
                        r30_users_all,
                        r30_users_android,
                        r30_users_ios,
                        r30_users_electron,
                        r30_users_web,
                        daily_user_type_native,
                        daily_user_type_bridged,
                        daily_user_type_guest,
                        daily_active_homeservers,
                        server_context
                    ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, %s, %s, %s)
                """
                # SUM() comes back as Decimal; normalise to int, keeping NULLs.
                insert_data = [x if x is None else int(x) for x in result]
                # insert day at the front
                insert_data.insert(0, processing_day)
                # append context at the end (not aggregated per-day)
                insert_data.append(None)
                cursor.execute(insert_query, insert_data)
            db.commit()
            processing_day = processing_day + ONE_DAY

        time.sleep(ONE_DAY)

def create_table(db, schema):
    """Execute a ``CREATE TABLE IF NOT EXISTS`` statement without generating
    a MySQL warning if the table already exists.

    Note-level messages are suppressed (``SET sql_notes = 0``) around the DDL
    and restored afterwards, then the transaction is committed.

    :param db: an open DB-API connection (e.g. from ``pymysql.connect``)
    :param schema: the CREATE TABLE statement to execute
    """
    cursor = db.cursor()
    try:
        cursor.execute('SET sql_notes = 0;')
        cursor.execute(schema)
        cursor.execute('SET sql_notes = 1;')
        db.commit()
    finally:
        # The original leaked the cursor; always release it.
        cursor.close()


# Entry point: run the aggregation loop (never returns) when executed directly.
if __name__ == "__main__":
    main()
6 changes: 3 additions & 3 deletions tests/test_push_good.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
. $(dirname $0)/setup.sh
log "Testing /push with beyond 0.99.2 pushes"

assert_eq "{}" "$(curl -k -d '{"daily_active_users": 10, "timestamp": 20, "total_users": 123, "total_room_count": 17, "daily_messages": 9, "uptime_seconds": 19, "r30_users_all": 5, "r30_users_android": 4, "r30_users_ios": 3, "r30_users_electron": 2, "r30_users_web": 1, "daily_user_type_native": 21, "daily_user_type_guest": 22, "daily_user_type_bridged": 23, "homeserver": "many.turtles", "memory_rss": 12, "cpu_average": 125, "cache_factor": 5.501, "event_cache_size": 10000, "python_version":"3.6.1", "database_engine":"PostgreSql", "database_server_version":"9.5.0"}' http://localhost:${port}/push 2>/dev/null)"
assert_eq "{}" "$(curl -k -d '{"daily_active_users": 10, "timestamp": 20, "total_users": 123, "total_room_count": 17, "daily_messages": 9, "uptime_seconds": 19, "r30_users_all": 5, "r30_users_android": 4, "r30_users_ios": 3, "r30_users_electron": 2, "r30_users_web": 1, "daily_user_type_native": 21, "daily_user_type_guest": 22, "daily_user_type_bridged": 23, "homeserver": "many.turtles", "memory_rss": 12, "cpu_average": 125, "cache_factor": 5.501, "event_cache_size": 10000, "python_version":"3.6.1", "database_engine":"PostgreSql", "database_server_version":"9.5.0", "server_context":"my_context"}' http://localhost:${port}/push 2>/dev/null)"

assert_eq "10|123|17|9|20|19|5|4|3|2|1|21|22|23|125|12|5.501|10000|3.6.1|PostgreSql|9.5.0" "$(sqlite3 ${dir}/stats.db 'SELECT daily_active_users, total_users, total_room_count, daily_messages, remote_timestamp, uptime_seconds, r30_users_all, r30_users_android, r30_users_ios, r30_users_electron, r30_users_web, daily_user_type_native, daily_user_type_guest, daily_user_type_bridged, cpu_average, memory_rss, cache_factor, event_cache_size, python_version, database_engine, database_server_version FROM stats WHERE homeserver == "many.turtles"')"
assert_eq "10|123|17|9|20|19|5|4|3|2|1|21|22|23|125|12|5.501|10000|3.6.1|PostgreSql|9.5.0|my_context" "$(sqlite3 ${dir}/stats.db 'SELECT daily_active_users, total_users, total_room_count, daily_messages, remote_timestamp, uptime_seconds, r30_users_all, r30_users_android, r30_users_ios, r30_users_electron, r30_users_web, daily_user_type_native, daily_user_type_guest, daily_user_type_bridged, cpu_average, memory_rss, cache_factor, event_cache_size, python_version, database_engine, database_server_version, server_context FROM stats WHERE homeserver == "many.turtles"')"


assert_eq "1" "$(sqlite3 ${dir}/stats.db 'SELECT COUNT(*) AS count FROM stats WHERE homeserver == "many.turtles" AND (remote_addr LIKE "127.0.0.1%" OR remote_addr LIKE "[::1]%")')"
Expand All @@ -21,7 +21,7 @@ assert_eq "10
456" "$(sqlite3 ${dir}/stats.db 'SELECT daily_active_users FROM stats WHERE homeserver == "many.turtles" ORDER BY local_timestamp ASC')"

assert_eq "{}" "$(curl -k -d '{"homeserver": "few.turtles"}' http://localhost:${port}/push 2>/dev/null)"
assert_eq "|||||" "$(sqlite3 ${dir}/stats.db 'SELECT daily_active_users, total_users, total_room_count, daily_messages, remote_timestamp, uptime_seconds FROM stats WHERE homeserver == "few.turtles"')"
assert_eq "|||||||||||||||||||||" "$(sqlite3 ${dir}/stats.db 'SELECT daily_active_users, total_users, total_room_count, daily_messages, remote_timestamp, uptime_seconds, r30_users_all, r30_users_android, r30_users_ios, r30_users_electron, r30_users_web, daily_user_type_native, daily_user_type_guest, daily_user_type_bridged, cpu_average, memory_rss, cache_factor, event_cache_size, python_version, database_engine, database_server_version, server_context FROM stats WHERE homeserver == "few.turtles"')"

assert_eq "{}" "$(curl -k -H "X-Forwarded-For: faraway.turtles" -d '{"homeserver": "proxied.turtles"}' http://localhost:${port}/push 2>/dev/null)"
assert_eq "faraway.turtles" "$(sqlite3 ${dir}/stats.db 'SELECT forwarded_for FROM stats WHERE homeserver == "proxied.turtles"')"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_push_good_before_0.99.2.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash -eu
# You almost certainly do not want to add more tests to this file
. $(dirname $0)/setup.sh
log "Testing /push with 0.27.2 - 0.99.1 pushes"
log "Testing /push with 0.33.6 - 0.99.1 pushes"

assert_eq "{}" "$(curl -k -d '{"daily_active_users": 10, "timestamp": 20, "total_users": 123, "total_room_count": 17, "daily_messages": 9, "uptime_seconds": 19, "r30_users_all": 5, "r30_users_android": 4, "r30_users_ios": 3, "r30_users_electron": 2, "r30_users_web": 1, "daily_user_type_native": 21, "daily_user_type_guest": 22, "daily_user_type_bridged": 23, "homeserver": "many.turtles", "memory_rss": 12, "cpu_average": 125, "cache_factor": 5.501, "event_cache_size": 10000, "python_version":"3.6.1"}' http://localhost:${port}/push 2>/dev/null)"

Expand Down
7 changes: 7 additions & 0 deletions tests/test_push_good_before_0.99.4.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash -eu
# You almost certainly do not want to add more tests to this file
# Frozen regression test: pushes in the 0.99.1 - 0.99.3 wire format, which
# predates the server_context field, must still be stored correctly.
. $(dirname $0)/setup.sh
log "Testing /push with 0.99.1 - 0.99.3 pushes"

# Push a report without server_context, then verify every stored column.
assert_eq "{}" "$(curl -k -d '{"daily_active_users": 10, "timestamp": 20, "total_users": 123, "total_room_count": 17, "daily_messages": 9, "uptime_seconds": 19, "r30_users_all": 5, "r30_users_android": 4, "r30_users_ios": 3, "r30_users_electron": 2, "r30_users_web": 1, "daily_user_type_native": 21, "daily_user_type_guest": 22, "daily_user_type_bridged": 23, "homeserver": "many.turtles", "memory_rss": 12, "cpu_average": 125, "cache_factor": 5.501, "event_cache_size": 10000, "python_version":"3.6.1", "database_engine":"PostgreSql", "database_server_version":"9.5.0"}' http://localhost:${port}/push 2>/dev/null)"
assert_eq "10|123|17|9|20|19|5|4|3|2|1|21|22|23|125|12|5.501|10000|3.6.1|PostgreSql|9.5.0" "$(sqlite3 ${dir}/stats.db 'SELECT daily_active_users, total_users, total_room_count, daily_messages, remote_timestamp, uptime_seconds, r30_users_all, r30_users_android, r30_users_ios, r30_users_electron, r30_users_web, daily_user_type_native, daily_user_type_guest, daily_user_type_bridged, cpu_average, memory_rss, cache_factor, event_cache_size, python_version, database_engine, database_server_version FROM stats WHERE homeserver == "many.turtles"')"

0 comments on commit fd820ca

Please sign in to comment.