From 1f7de24bad1660ab8d0a49e2a1a27fc434a6f9ee Mon Sep 17 00:00:00 2001 From: Tom Emeko Date: Fri, 21 Feb 2020 13:37:29 -0500 Subject: [PATCH 1/3] Add original PR code Credit: kannibalox https://github.com/pyroscope/pyrocore/pull/60/files --- src/pyrocore/daemon/webapp.py | 21 ++++++- src/pyrocore/data/config/torque.ini | 10 +-- src/pyrocore/torrent/jobs.py | 98 +++++++++++------------------ src/pyrocore/util/stats.py | 61 ++++++++++++------ 4 files changed, 98 insertions(+), 92 deletions(-) diff --git a/src/pyrocore/daemon/webapp.py b/src/pyrocore/daemon/webapp.py index 83bcca8f..83d94603 100644 --- a/src/pyrocore/daemon/webapp.py +++ b/src/pyrocore/daemon/webapp.py @@ -125,7 +125,7 @@ def json_engine(self, req): # pylint: disable=R0201,W0613 """ Return torrent engine data. """ try: - return stats.engine_data(config.engine) + return wrap_engine_data(config.engine) except (error.LoggableError, xmlrpc.ERRORS) as torrent_exc: raise exc.HTTPInternalServerError(str(torrent_exc)) @@ -252,6 +252,23 @@ def make_app(httpd_config): .add_route("/json/{action}", controller=JsonController(**httpd_config.json)) ) +def wrap_engine_data(engine): + result = stats.engine_data(engine) + + # Build result object + return dict( + now = time.time(), + engine_id = engine.engine_id, + versions = engine.versions, + uptime = engine.uptime, + upload = [result['throttle.global_up.rate'], result['throttle.global_up.max_rate']], + download = [result['throttle.global_down.rate'], result['throttle.global_down.max_rate']], + views = dict([(name, values['size']) + for name, values in result['views'].items() + ]), + ) + + def module_test(): """ Quick test using… @@ -264,7 +281,7 @@ def module_test(): try: engine = connect() print("%s - %s" % (engine.engine_id, engine.open())) - pprint.pprint(stats.engine_data(engine)) + pprint.pprint(wrap_engine_data(engine)) print("%s - %s" % (engine.engine_id, engine.open())) except (error.LoggableError, xmlrpc.ERRORS) as torrent_exc: print("ERROR: %s" % torrent_exc) diff --git a/src/pyrocore/data/config/torque.ini b/src/pyrocore/data/config/torque.ini index 91f8d73e..647409df 100644 --- a/src/pyrocore/data/config/torque.ini +++ b/src/pyrocore/data/config/torque.ini @@ -97,14 +97,8 @@ job.fluxstats.schedule = second=*/15 job.fluxstats.active = False ;job.fluxstats.log_level = DEBUG -; Database name -job.fluxstats.dbname = torque -; Series name for rTorrent data (set empty to disable) -job.fluxstats.series = rtorrent -; Series name for host data (set empty to disable) -job.fluxstats.series_host = host -;job.fluxstats. -;job.fluxstats. +; Series prefix +job.fluxstats.series_prefix = rtorrent_ # Tree watch job.treewatch.handler = pyrocore.torrent.watch:TreeWatch diff --git a/src/pyrocore/torrent/jobs.py b/src/pyrocore/torrent/jobs.py index 3a76faef..b4067115 100644 --- a/src/pyrocore/torrent/jobs.py +++ b/src/pyrocore/torrent/jobs.py @@ -18,13 +18,8 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -try: - import json -except ImportError: - import simplejson as json # pylint: disable=F0401 - import requests -from requests.exceptions import RequestException +from requests.exceptions import RequestException, HTTPError from pyrobase.parts import Bunch from pyrocore import error @@ -32,27 +27,6 @@ from pyrocore.util import fmt, xmlrpc, pymagic, stats -def _flux_engine_data(engine): - """ Return rTorrent data set for pushing to InfluxDB. - """ - data = stats.engine_data(engine) - - # Make it flat - data["up_rate"] = data["upload"][0] - data["up_limit"] = data["upload"][1] - data["down_rate"] = data["download"][0] - data["down_limit"] = data["download"][1] - data["version"] = data["versions"][0] - views = data["views"] - - del data["upload"] - del data["download"] - del data["versions"] - del data["views"] - - return data, views - - class EngineStats(object): """ rTorrent connection statistics logger. """ @@ -99,63 +73,61 @@ def __init__(self, config=None): def _influxdb_url(self): """ Return REST API URL to access time series. """ - url = "{0}/db/{1}/series".format(self.influxdb.url.rstrip('/'), self.config.dbname) + url = "{0}/write?db={1}".format(self.influxdb.url.rstrip('/'), self.config.dbname) if self.influxdb.user and self.influxdb.password: url += "?u={0}&p={1}".format(self.influxdb.user, self.influxdb.password) return url + def _influxdb_data(self): + """ Return statitics data formatted according to InfluxDB's line protocol + """ + datastr = '' + + try: + proxy = config_ini.engine.open() + hostname = proxy.system.hostname() + pid = proxy.system.pid() + data = stats.engine_data(config_ini.engine) + views = data['views'] + del data['views'] + datastr = u"{0}stat,hostname={1},pid={2} ".format( + self.config.series_prefix, hostname, pid) + datastr += ','.join(['='.join([k, str(v)]) for k, v in data.items()]) + '\n' + for view_name, values in views.items(): + vstr = u"{0}view,hostname={1},pid={2},name={3} ".format( + self.config.series_prefix, hostname, pid, view_name) + vstr += ','.join(['='.join([k, str(v)]) for k, v in values.items()]) + datastr += vstr + "\n" + except (error.LoggableError, xmlrpc.ERRORS), exc: + self.LOG.warn("InfluxDB stats: {0}".format(exc)) + return datastr def _push_data(self): """ Push stats data to InfluxDB. """ - if not (self.config.series or self.config.series_host): - self.LOG.info("Misconfigured InfluxDB job, neither 'series' nor 'series_host' is set!") - return - # Assemble data - fluxdata = [] - - if self.config.series: - try: - config_ini.engine.open() - data, views = _flux_engine_data(config_ini.engine) - fluxdata.append(dict( - name=self.config.series, - columns=data.keys(), - points=[data.values()] - )) - fluxdata.append(dict( - name=self.config.series + '_views', - columns=views.keys(), - points=[views.values()] - )) - except (error.LoggableError, xmlrpc.ERRORS), exc: - self.LOG.warn("InfluxDB stats: {0}".format(exc)) - -# if self.config.series_host: -# fluxdata.append(dict( -# name = self.config.series_host, -# columns = .keys(), -# points = [.values()] -# )) - - if not fluxdata: + datastr = self._influxdb_data() + + if not datastr: self.LOG.debug("InfluxDB stats: no data (previous errors?)") return # Encode into InfluxDB data packet fluxurl = self._influxdb_url() - fluxjson = json.dumps(fluxdata) - self.LOG.debug("POST to {0} with {1}".format(fluxurl.split('?')[0], fluxjson)) + self.LOG.debug("POST to {0} with {1}".format(fluxurl.split('?')[0], datastr)) # Push it! try: # TODO: Use a session - requests.post(fluxurl, data=fluxjson, timeout=self.influxdb.timeout) + r = requests.post(fluxurl, data=datastr, timeout=self.influxdb.timeout) + r.raise_for_status() except RequestException, exc: - self.LOG.info("InfluxDB POST error: {0}".format(exc)) + self.LOG.warn("InfluxDB POST error: {0}".format(exc)) + except HTTPError, exc: + self.LOG.warn("InfluxDB POST HTTP error {0}: Response: {1}".format( + str(r.status_code), r.content)) def run(self): diff --git a/src/pyrocore/util/stats.py b/src/pyrocore/util/stats.py index 57291cef..c41b6f13 100644 --- a/src/pyrocore/util/stats.py +++ b/src/pyrocore/util/stats.py @@ -19,36 +19,59 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from __future__ import absolute_import -import time - +from pyrocore import error +from pyrocore.util import xmlrpc def engine_data(engine): """ Get important performance data and metadata from rTorrent. """ - views = ("default", "main", "started", "stopped", "complete", - "incomplete", "seeding", "leeching", "active", "messages") + proxy = engine.open() + views = proxy.view.list() + methods = [ "throttle.global_up.rate", "throttle.global_up.max_rate", "throttle.global_down.rate", "throttle.global_down.max_rate", + "pieces.stats_not_preloaded", "pieces.stats_preloaded", + "system.files.opened_counter", "system.files.failed_counter", "system.files.closed_counter", + "pieces.memory.block_count", "pieces.memory.current", + "network.open_sockets" ] # Get data via multicall - proxy = engine.open() calls = [dict(methodName=method, params=[]) for method in methods] \ + [dict(methodName="view.size", params=['', view]) for view in views] result = proxy.system.multicall(calls, flatten=True) + result_dict = {} + for m in methods: + result_dict[m] = result[0] + del result[0] + result_dict['views'] = {} + for v in views: + result_dict['views'][v] = {} + result_dict['views'][v]['size'] = result[0] + del result[0] + return result_dict + +def module_test(): + """ Quick test using… + + python -m pyrocore.util.stats + """ + import pprint + from pyrocore import connect + + try: + engine = connect() + print("%s - %s" % (engine.engine_id, engine.open())) + + result = engine_data(engine) + print "result = ", + pprint.pprint(result) + + print("%s - %s" % (engine.engine_id, engine.open())) + except (error.LoggableError, xmlrpc.ERRORS), torrent_exc: + print("ERROR: %s" % torrent_exc) + - # Build result object - data = dict( - now = time.time(), - engine_id = engine.engine_id, - versions = engine.versions, - uptime = engine.uptime, - upload = [result[0], result[1]], - download = [result[2], result[3]], - views = dict([(name, result[4+i]) - for i, name in enumerate(views) - ]), - ) - - return data +if __name__ == "__main__": + module_test() From ef7413b3b559464fdc5159004e8a8a84f93d3494 Mon Sep 17 00:00:00 2001 From: Tom Emeko Date: Fri, 21 Feb 2020 13:41:46 -0500 Subject: [PATCH 2/3] Modernize except statements in jobs.py --- src/pyrocore/torrent/jobs.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/pyrocore/torrent/jobs.py b/src/pyrocore/torrent/jobs.py index b4067115..7f4f9ebe 100644 --- a/src/pyrocore/torrent/jobs.py +++ b/src/pyrocore/torrent/jobs.py @@ -49,7 +49,7 @@ def run(self): fmt.human_duration(proxy.system.time() - config_ini.engine.startup, 0, 2, True).strip(), proxy )) - except (error.LoggableError, xmlrpc.ERRORS), exc: + except (error.LoggableError, xmlrpc.ERRORS) as exc: self.LOG.warn(str(exc)) @@ -100,7 +100,7 @@ def _influxdb_data(self): self.config.series_prefix, hostname, pid, view_name) vstr += ','.join(['='.join([k, str(v)]) for k, v in values.items()]) datastr += vstr + "\n" - except (error.LoggableError, xmlrpc.ERRORS), exc: + except (error.LoggableError, xmlrpc.ERRORS) as exc: self.LOG.warn("InfluxDB stats: {0}".format(exc)) return datastr @@ -123,9 +123,9 @@ def _push_data(self): # TODO: Use a session r = requests.post(fluxurl, data=datastr, timeout=self.influxdb.timeout) r.raise_for_status() - except RequestException, exc: + except RequestException as exc: self.LOG.warn("InfluxDB POST error: {0}".format(exc)) - except HTTPError, exc: + except HTTPError as exc: self.LOG.warn("InfluxDB POST HTTP error {0}: Response: {1}".format( str(r.status_code), r.content)) @@ -155,7 +155,7 @@ def module_test(): pprint.pprint(views) print("%s - %s" % (engine.engine_id, engine.open())) - except (error.LoggableError, xmlrpc.ERRORS), torrent_exc: + except (error.LoggableError, xmlrpc.ERRORS) as torrent_exc: print("ERROR: %s" % torrent_exc) From ba9ae001b99fafc8b12d54207f8c24e4d50b41f0 Mon Sep 17 00:00:00 2001 From: Tom Emeko Date: Fri, 21 Feb 2020 13:46:46 -0500 Subject: [PATCH 3/3] Other PR feedback from original PR --- src/pyrocore/torrent/jobs.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pyrocore/torrent/jobs.py b/src/pyrocore/torrent/jobs.py index 7f4f9ebe..bd2a17eb 100644 --- a/src/pyrocore/torrent/jobs.py +++ b/src/pyrocore/torrent/jobs.py @@ -76,7 +76,7 @@ def _influxdb_url(self): url = "{0}/write?db={1}".format(self.influxdb.url.rstrip('/'), self.config.dbname) if self.influxdb.user and self.influxdb.password: - url += "?u={0}&p={1}".format(self.influxdb.user, self.influxdb.password) + url += "&u={0}&p={1}".format(self.influxdb.user, self.influxdb.password) return url @@ -121,13 +121,13 @@ def _push_data(self): # Push it! try: # TODO: Use a session - r = requests.post(fluxurl, data=datastr, timeout=self.influxdb.timeout) - r.raise_for_status() + response = requests.post(fluxurl, data=datastr, timeout=self.influxdb.timeout) + response.raise_for_status() except RequestException as exc: self.LOG.warn("InfluxDB POST error: {0}".format(exc)) except HTTPError as exc: self.LOG.warn("InfluxDB POST HTTP error {0}: Response: {1}".format( - str(r.status_code), r.content)) + str(response.status_code), response.content)) def run(self):