From 85ab78b4d4ae629de124e7ed62f11aa3791d9b1c Mon Sep 17 00:00:00 2001 From: wrobstory Date: Fri, 7 Aug 2015 10:45:35 -0700 Subject: [PATCH 1/5] REF/ENH: Change recur_dict func sig to be more reduce-like, add stats_combiner --- malort/core.py | 13 ++--- malort/stats.py | 81 ++++++++++++++++++++++++++++--- malort/tests/test_malort_stats.py | 14 +++--- 3 files changed, 87 insertions(+), 21 deletions(-) diff --git a/malort/core.py b/malort/core.py index bff5cc0..a06e064 100644 --- a/malort/core.py +++ b/malort/core.py @@ -6,9 +6,7 @@ JSON -> Postgres Column types """ -from __future__ import absolute_import -from __future__ import print_function -from __future__ import division +from __future__ import absolute_import, print_function, division from collections import defaultdict import json @@ -18,6 +16,8 @@ import re import time +import dask.bag as db + from malort.stats import recur_dict, dict_generator from malort.type_mappers import TypeMappers @@ -35,15 +35,16 @@ def analyze(path, delimiter='\n', parse_timestamps=True, **kwargs): For flat text files, the JSON blob delimiter parse_timestamps: boolean, default True If True, will attempt to regex match ISO8601 formatted parse_timestamps - kwargs: - passed into json.loads. Here you can specify encoding, etc. + kwargs: + passed into json.loads. Here you can specify encoding, etc. """ stats = {} start_time = time.time() + bag = db.from_filenames(path).map(json.loads) for count, blob in enumerate(dict_generator(path, delimiter, **kwargs), start=1): - recur_dict(blob, stats, parse_timestamps=parse_timestamps) + recur_dict(stats, blob, parse_timestamps=parse_timestamps) elapsed = time.time() - start_time print('Malort run finished: {} JSON blobs analyzed in {} seconds.' diff --git a/malort/stats.py b/malort/stats.py index a6fc222..66bcf0c 100644 --- a/malort/stats.py +++ b/malort/stats.py @@ -6,9 +6,7 @@ Functions to generate Malort stats """ -from __future__ import absolute_import -from __future__ import print_function -from __future__ import division +from __future__ import absolute_import, print_function, division import decimal import json @@ -25,6 +23,7 @@ \d([\.,]\d+)?)?([zZ]|([\+-])([01]\d|2[0-3]):?([0-5]\d)?) ?)?)?$""", re.VERBOSE) + def delimited(file, delimiter='\n', bufsize=4096): buf = '' while True: @@ -38,6 +37,7 @@ def delimited(file, delimiter='\n', bufsize=4096): yield line buf = lines[-1] + def catch_json_error(blob, filepath, **kwargs): """Wrapper to provide better error message for JSON reads""" try: @@ -48,6 +48,7 @@ def catch_json_error(blob, filepath, **kwargs): return parsed + def dict_generator(path, delimiter='\n', **kwargs): """ Given a directory path, return a generator that will return a dict for each @@ -80,11 +81,77 @@ def dict_generator(path, delimiter='\n', **kwargs): else: yield catch_json_error(fread.read(), filepath, **kwargs) + def get_new_mean(value, current_mean, count): """Given a value, current mean, and count, return new mean""" summed = current_mean * count return (summed + value)/(count + 1) + +def combine_means(means, counts): + """Combine ordered iter of means and counts""" + numer = sum([mean * count for mean, count in zip(means, counts) + if mean is not None and count is not None]) + denom = sum([c for c in counts if c is not None]) + return numer / denom + + +def stats_combiner(accum, value): + """ + Combine two sets of stats into one. Used for final dask rollup of + multiple partitions of stats dicts into one unified stats dict. Best + thought of as a reduction over multiple stats object. 
+ + Parameters + ---------- + accum: dict + Accumlating tats dict + value: dict + New stats dict to merge with accumulator + + Returns + ------- + dict + """ + for value_type, type_stats in value.items(): + accum_entry = accum.get(value_type) + if accum_entry: + max_ = (accum_entry.get("max"), value.get("max")) + min_ = (accum_entry.get("min"), value.get("min")) + count = (accum_entry.get("count"), value.get("count")) + mean = (accum_entry.get("mean"), value.get("mean")) + + if any(max_): + accum_entry["max"] = max(max_) + if any(min_): + accum_entry["min"] = min(min_) + if any(count): + accum_entry["count"] = sum([c for c in count if x is not None]) + if any(mean): + accum_entry["mean"] = combine_means(mean, count) + + # Type specific entries + if value_type == "float": + fixed_length = (accum_entry.get("fixed_length"), + value.get("fixed_length")) + if not all(fixed_length): + accum_entry["fixed_length"] = False + + max_prec = (accum_entry.get("max_precision"), + value.get("max_precision")) + accum_entry["max_precision"] = max(max_prec) + + max_scale = (accum_entry.get("max_scale"), + value.get("max_scale")) + accum_entry["max_scale"] = max(max_scale) + elif value_type == "str": + samples = accum_entry.get("sample", []) +\ + value.get("sample", []) + accum_entry["sample"] = random.sample(samples, 3) + + return accum + + def updated_entry_stats(value, current_stats, parse_timestamps=True): """ Given a value and a dict of current statistics, return a dict of new @@ -156,7 +223,7 @@ def updated_entry_stats(value, current_stats, parse_timestamps=True): return value_type, new_stats -def recur_dict(value, stats, parent=None, **kwargs): +def recur_dict(stats, value, parent=None, **kwargs): """ Recurse through a dict `value` and update `stats` for each field. Can handle nested dicts, lists of dicts, and lists of values (must be @@ -170,9 +237,7 @@ def recur_dict(value, stats, parent=None, **kwargs): Parent key to get key nesting depth. 
kwargs: Options for update_entry_stats """ - parent = parent or '' - def update_stats(current_val, nested_path, base_key): "Updater function" if nested_path not in stats: @@ -187,14 +252,14 @@ def update_stats(current_val, nested_path, base_key): for k, v in value.items(): parent_path = '.'.join([parent, k]) if parent != '' else k if isinstance(v, (list, dict)): - recur_dict(v, stats, parent_path) + recur_dict(stats, v, parent_path) else: update_stats(v, parent_path, k) elif isinstance(value, list): for v in value: if isinstance(v, (list, dict)): - recur_dict(v, stats, parent) + recur_dict(stats, v, parent) else: base_key = parent.split(".")[-1] update_stats(json.dumps(value), parent, base_key) diff --git a/malort/tests/test_malort_stats.py b/malort/tests/test_malort_stats.py index 04c5e90..9ba839f 100644 --- a/malort/tests/test_malort_stats.py +++ b/malort/tests/test_malort_stats.py @@ -124,10 +124,10 @@ def test_recur_simple(self): 'base_key': 'key5'} } - stats = mt.stats.recur_dict(simple1, {}) + stats = mt.stats.recur_dict({}, simple1) self.assertDictEqual(stats, expected) - updated_stats = mt.stats.recur_dict({'key1': 2}, stats) + updated_stats = mt.stats.recur_dict(stats, {'key1': 2}) self.assertDictEqual(updated_stats['key1'], {'int': {'count': 2, 'max': 2, 'mean': 1.5, 'min': 1}, 'base_key': 'key1'}) @@ -177,7 +177,7 @@ def test_recur_depth_one(self): 'min': 8.0}}, 'key5.key4': {'base_key': 'key4', 'bool': {'count': 1}}} - stats = mt.stats.recur_dict(depth_one, {}) + stats = mt.stats.recur_dict({}, depth_one) self.assert_stats(stats, expected) @property @@ -252,7 +252,7 @@ def test_recur_depth_two(self): } } - stats = mt.stats.recur_dict(depth_two, {}) + stats = mt.stats.recur_dict({}, depth_two) self.assert_stats(stats, self.depth_two_expected) def test_recur_with_array(self): @@ -264,7 +264,7 @@ def test_recur_with_array(self): ] } - stats = mt.stats.recur_dict(with_list, {}) + stats = mt.stats.recur_dict({}, with_list) self.assert_stats(stats, self.depth_two_expected) def test_recur_with_val_array(self): @@ -274,7 +274,7 @@ def test_recur_with_val_array(self): "key3": [{"key2": ["foo", "bar"]}] } - stats = mt.stats.recur_dict(with_list, {}) + stats = mt.stats.recur_dict({}, with_list) expected = {'key1': {'base_key': 'key1', 'int': {'count': 1, 'max': 1, 'mean': 1.0, 'min': 1}}, @@ -299,4 +299,4 @@ def test_raises_with_list_of_unknown(self): } with pytest.raises(TypeError): - mt.stats.recur_dict(with_values, {}) + mt.stats.recur_dict({}, with_values) From e0ac4f53bc7b40341b5494a8d8461714e08da8b6 Mon Sep 17 00:00:00 2001 From: wrobstory Date: Mon, 10 Aug 2015 16:53:57 -0700 Subject: [PATCH 2/5] REF: Move file reading and stat gen to Dask, update tests --- malort/core.py | 16 +-- malort/stats.py | 98 ++++++++++------ malort/test_helpers.py | 5 +- malort/tests/test_files/test1.json | 6 +- malort/tests/test_files/test2.json | 6 +- malort/tests/test_files/test3.json | 1 + malort/tests/test_files/test4.json | 1 + .../test_pipe_delimited_1 | 1 - .../test_pipe_delimited_2 | 1 - .../test_mult_delimited_1 | 2 + .../test_mult_delimited_2 | 2 + .../test_nl_delimited_1} | 0 .../test_nl_delimited_2 | 2 + malort/tests/test_malort_core.py | 51 +++++---- malort/tests/test_malort_mappers.py | 4 +- malort/tests/test_malort_stats.py | 106 +++++++++++++++++- 16 files changed, 216 insertions(+), 86 deletions(-) create mode 100644 malort/tests/test_files/test3.json create mode 100644 malort/tests/test_files/test4.json delete mode 100644 malort/tests/test_files_delimited/test_pipe_delimited_1 delete 
mode 100644 malort/tests/test_files_delimited/test_pipe_delimited_2 create mode 100644 malort/tests/test_files_mult_type/test_mult_delimited_1 create mode 100644 malort/tests/test_files_mult_type/test_mult_delimited_2 rename malort/tests/{test_files/test3delimited => test_files_newline_delimited/test_nl_delimited_1} (100%) create mode 100644 malort/tests/test_files_newline_delimited/test_nl_delimited_2 diff --git a/malort/core.py b/malort/core.py index a06e064..672e0dc 100644 --- a/malort/core.py +++ b/malort/core.py @@ -9,6 +9,7 @@ from __future__ import absolute_import, print_function, division from collections import defaultdict +from functools import partial import json import os from os.path import isfile, join, splitext @@ -17,12 +18,13 @@ import time import dask.bag as db +from dask.async import get_sync -from malort.stats import recur_dict, dict_generator +from malort.stats import recur_dict, combine_stats, dict_generator from malort.type_mappers import TypeMappers -def analyze(path, delimiter='\n', parse_timestamps=True, **kwargs): +def analyze(path, parse_timestamps=True, **kwargs): """ Analyze a given directory of either .json or flat text files with delimited JSON to get relevant key statistics. @@ -31,8 +33,6 @@ def analyze(path, delimiter='\n', parse_timestamps=True, **kwargs): ---------- path: string Path to directory - delimiter: string, default newline - For flat text files, the JSON blob delimiter parse_timestamps: boolean, default True If True, will attempt to regex match ISO8601 formatted parse_timestamps kwargs: @@ -42,9 +42,11 @@ def analyze(path, delimiter='\n', parse_timestamps=True, **kwargs): stats = {} start_time = time.time() - bag = db.from_filenames(path).map(json.loads) - for count, blob in enumerate(dict_generator(path, delimiter, **kwargs), start=1): - recur_dict(stats, blob, parse_timestamps=parse_timestamps) + file_list = [os.path.join(path, f) for f in os.listdir(path)] + bag = db.from_filenames(file_list).map(json.loads) + recur_partial = partial(recur_dict, parse_timestamps=parse_timestamps) + stats = bag.fold(recur_partial, combine_stats, initial={}).compute() + count = bag.count().compute() elapsed = time.time() - start_time print('Malort run finished: {} JSON blobs analyzed in {} seconds.' diff --git a/malort/stats.py b/malort/stats.py index 66bcf0c..b719fff 100644 --- a/malort/stats.py +++ b/malort/stats.py @@ -93,10 +93,10 @@ def combine_means(means, counts): numer = sum([mean * count for mean, count in zip(means, counts) if mean is not None and count is not None]) denom = sum([c for c in counts if c is not None]) - return numer / denom + return round(numer / denom, 3) -def stats_combiner(accum, value): +def combine_stats(accum, value): """ Combine two sets of stats into one. Used for final dask rollup of multiple partitions of stats dicts into one unified stats dict. 
Best @@ -113,41 +113,65 @@ def stats_combiner(accum, value): ------- dict """ - for value_type, type_stats in value.items(): - accum_entry = accum.get(value_type) - if accum_entry: - max_ = (accum_entry.get("max"), value.get("max")) - min_ = (accum_entry.get("min"), value.get("min")) - count = (accum_entry.get("count"), value.get("count")) - mean = (accum_entry.get("mean"), value.get("mean")) - - if any(max_): - accum_entry["max"] = max(max_) - if any(min_): - accum_entry["min"] = min(min_) - if any(count): - accum_entry["count"] = sum([c for c in count if x is not None]) - if any(mean): - accum_entry["mean"] = combine_means(mean, count) - - # Type specific entries - if value_type == "float": - fixed_length = (accum_entry.get("fixed_length"), - value.get("fixed_length")) - if not all(fixed_length): - accum_entry["fixed_length"] = False - - max_prec = (accum_entry.get("max_precision"), - value.get("max_precision")) - accum_entry["max_precision"] = max(max_prec) - - max_scale = (accum_entry.get("max_scale"), - value.get("max_scale")) - accum_entry["max_scale"] = max(max_scale) - elif value_type == "str": - samples = accum_entry.get("sample", []) +\ - value.get("sample", []) - accum_entry["sample"] = random.sample(samples, 3) + for field_name, type_stats in value.items(): + if accum.get(field_name): + for value_type, val_stats in type_stats.items(): + accum_entry = accum[field_name].get(value_type) + if not accum_entry: + # If accum doesn't already have an entry, but the + # value does, continue + accum[field_name][value_type] = val_stats + continue + + # base_key is not a statistic to be updated + if value_type == "base_key": + accum_entry = None + + if accum_entry: + max_ = (accum_entry.get("max"), val_stats.get("max")) + min_ = (accum_entry.get("min"), val_stats.get("min")) + count = (accum_entry.get("count"), val_stats.get("count")) + mean = (accum_entry.get("mean"), val_stats.get("mean")) + + if any(max_): + accum_entry["max"] = max(max_) + if any(min_): + accum_entry["min"] = min(min_) + if any(count): + accum_entry["count"] = sum([c for c in count + if c is not None]) + if any(mean): + accum_entry["mean"] = combine_means(mean, count) + + # Type specific entries + if value_type == "float": + + fixed_length = (accum_entry.get("fixed_length"), + val_stats.get("fixed_length")) + + # Decimals not fixed length if prec/scale do not match + a_prec, a_scale = (accum_entry.get("max_precision"), + accum_entry.get("max_scale")) + v_prec, v_scale = (val_stats.get("max_precision"), + val_stats.get("max_scale")) + prec_scale_eq = (a_prec == v_prec, a_scale == v_scale) + + if not all(fixed_length) or not all(prec_scale_eq): + accum_entry["fixed_length"] = False + + max_prec = (accum_entry.get("max_precision"), + val_stats.get("max_precision")) + accum_entry["max_precision"] = max(max_prec) + + max_scale = (accum_entry.get("max_scale"), + val_stats.get("max_scale")) + accum_entry["max_scale"] = max(max_scale) + + elif value_type == "str": + samples = accum_entry.get("sample", []) +\ + val_stats.get("sample", []) + accum_entry["sample"] = random.sample( + samples, min(len(samples), 3)) return accum diff --git a/malort/test_helpers.py b/malort/test_helpers.py index a2169d8..b22fede 100644 --- a/malort/test_helpers.py +++ b/malort/test_helpers.py @@ -12,10 +12,11 @@ TEST_FILES_1 = os.path.normpath(os.path.join(os.path.abspath(__file__), '..', 'tests', 'test_files')) TEST_FILES_2 = os.path.normpath(os.path.join(os.path.abspath(__file__), - '..', 'tests', 'test_files_delimited')) + '..', 'tests', 
'test_files_newline_delimited')) TEST_FILES_3 = os.path.normpath(os.path.join(os.path.abspath(__file__), '..', 'tests', 'test_files_nested')) - +TEST_FILES_4 = os.path.normpath(os.path.join(os.path.abspath(__file__), + '..', 'tests', 'test_files_mult_type')) class TestHelpers(unittest.TestCase): diff --git a/malort/tests/test_files/test1.json b/malort/tests/test_files/test1.json index 4cce51e..dbed660 100644 --- a/malort/tests/test_files/test1.json +++ b/malort/tests/test_files/test1.json @@ -1,5 +1 @@ -{"intfield": 5, - "floatfield": 2.345, - "datefield": "2014-09-26 17:00:00", - "charfield": "fixedlength", - "varcharfield": "var"} \ No newline at end of file +{"intfield": 5,"floatfield": 2.345,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "var"} diff --git a/malort/tests/test_files/test2.json b/malort/tests/test_files/test2.json index a1acbac..96a6b2c 100644 --- a/malort/tests/test_files/test2.json +++ b/malort/tests/test_files/test2.json @@ -1,5 +1 @@ -{"intfield": 10, - "floatfield": 4.7891, - "datefield": "2014-09-26 17:00:00", - "charfield": "fixedlength", - "varcharfield": "varyin"} \ No newline at end of file +{"intfield": 10,"floatfield": 4.7891,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "varyin"} diff --git a/malort/tests/test_files/test3.json b/malort/tests/test_files/test3.json new file mode 100644 index 0000000..9bc437e --- /dev/null +++ b/malort/tests/test_files/test3.json @@ -0,0 +1 @@ +{"intfield": 15,"floatfield": 3.0012,"charfield": "fixedlength","varcharfield": "varyingle","datefield": "2014-09-26 17:00:00"} diff --git a/malort/tests/test_files/test4.json b/malort/tests/test_files/test4.json new file mode 100644 index 0000000..06d31d5 --- /dev/null +++ b/malort/tests/test_files/test4.json @@ -0,0 +1 @@ +{"intfield": 20,"floatfield": 10.8392,"charfield": "fixedlength","varcharfield": "varyinglengt","datefield": "2014-09-26 17:00:00"} diff --git a/malort/tests/test_files_delimited/test_pipe_delimited_1 b/malort/tests/test_files_delimited/test_pipe_delimited_1 deleted file mode 100644 index 95a7598..0000000 --- a/malort/tests/test_files_delimited/test_pipe_delimited_1 +++ /dev/null @@ -1 +0,0 @@ -{"foo": 10,"bar": 2.0,"baz": "fixed","qux": 10}|{"foo": "foo","bar": true,"baz": 2,"qux": "varyin"} \ No newline at end of file diff --git a/malort/tests/test_files_delimited/test_pipe_delimited_2 b/malort/tests/test_files_delimited/test_pipe_delimited_2 deleted file mode 100644 index 883191a..0000000 --- a/malort/tests/test_files_delimited/test_pipe_delimited_2 +++ /dev/null @@ -1 +0,0 @@ -{"foo": 1000,"bar": "bar","baz": "fixed","qux": "var"}|{"foo": "foo","bar": 4.0,"baz": 1,"qux": "varyingle"} \ No newline at end of file diff --git a/malort/tests/test_files_mult_type/test_mult_delimited_1 b/malort/tests/test_files_mult_type/test_mult_delimited_1 new file mode 100644 index 0000000..53f36d6 --- /dev/null +++ b/malort/tests/test_files_mult_type/test_mult_delimited_1 @@ -0,0 +1,2 @@ +{"foo": 10,"bar": 2.0,"baz": "fixed","qux": 10} +{"foo": "foo","bar": true,"baz": 2,"qux": "varyin"} \ No newline at end of file diff --git a/malort/tests/test_files_mult_type/test_mult_delimited_2 b/malort/tests/test_files_mult_type/test_mult_delimited_2 new file mode 100644 index 0000000..92e14a8 --- /dev/null +++ b/malort/tests/test_files_mult_type/test_mult_delimited_2 @@ -0,0 +1,2 @@ +{"foo": 1000,"bar": "bar","baz": "fixed","qux": "var"} +{"foo": "foo","bar": 4.0,"baz": 1,"qux": "varyingle"} \ No newline at end of file diff --git 
a/malort/tests/test_files/test3delimited b/malort/tests/test_files_newline_delimited/test_nl_delimited_1 similarity index 100% rename from malort/tests/test_files/test3delimited rename to malort/tests/test_files_newline_delimited/test_nl_delimited_1 diff --git a/malort/tests/test_files_newline_delimited/test_nl_delimited_2 b/malort/tests/test_files_newline_delimited/test_nl_delimited_2 new file mode 100644 index 0000000..24484a0 --- /dev/null +++ b/malort/tests/test_files_newline_delimited/test_nl_delimited_2 @@ -0,0 +1,2 @@ +{"intfield": 5,"floatfield": 2.345,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "var"} +{"intfield": 10,"floatfield": 4.7891,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "varyin"} \ No newline at end of file diff --git a/malort/tests/test_malort_core.py b/malort/tests/test_malort_core.py index 7443201..b7fe0d2 100644 --- a/malort/tests/test_malort_core.py +++ b/malort/tests/test_malort_core.py @@ -13,18 +13,16 @@ import malort as mt from malort.test_helpers import (TestHelpers, TEST_FILES_1, TEST_FILES_2, - TEST_FILES_3) + TEST_FILES_3, TEST_FILES_4) class TestCore(TestHelpers): - def test_files_1(self): - mtresult = mt.analyze(TEST_FILES_1) - expected = { + expected_1_and_2 = { 'charfield': {'str': {'count': 4, 'max': 11, 'mean': 11.0, 'min': 11, 'sample': ['fixedlength']}, 'base_key': 'charfield'}, - 'floatfield': {'float': {'count': 4, 'max': 10.8392, 'mean': 5.243, + 'floatfield': {'float': {'count': 4, 'max': 10.8392, 'mean': 5.244, 'min': 2.345, 'max_precision': 6, 'max_scale': 4, 'fixed_length': False}, 'base_key': 'floatfield'}, @@ -37,12 +35,35 @@ def test_files_1(self): 'varyinglengt']}, 'base_key': 'varcharfield'}, 'datefield': {'datetime': {'count': 4}, 'base_key': 'datefield'} - } - self.assert_stats(mtresult.stats, expected) + } + + def test_files_1(self): + mtresult = mt.analyze(TEST_FILES_1) + self.assert_stats(mtresult.stats, self.expected_1_and_2) self.assertDictEqual(mtresult.get_conflicting_types(), {}) def test_files_2(self): - mtresult = mt.analyze(TEST_FILES_2, '|') + mtresult = mt.analyze(TEST_FILES_2) + self.assert_stats(mtresult.stats, self.expected_1_and_2) + self.assertDictEqual(mtresult.get_conflicting_types(), {}) + + def test_files_3(self): + mtresult = mt.analyze(TEST_FILES_3) + expected = {'baz.qux': {'base_key': 'qux', + 'str': {'count': 3, + 'max': 5, + 'mean': 3.667, + 'min': 3, + 'sample': ['One', 'Two', 'Three']}}, + 'foo.bar': {'base_key': 'bar', + 'int': {'count': 3, 'max': 30, 'mean': 20.0, + 'min': 10}}, + 'qux': {'base_key': 'qux', 'bool': {'count': 1}}} + self.assert_stats(mtresult.stats, expected) + self.assert_stats(mtresult.get_conflicting_types(), expected) + + def test_files_4(self): + mtresult = mt.analyze(TEST_FILES_4) expected = { 'bar': {'bool': {'count': 1}, 'float': {'count': 2, 'max': 4.0, 'mean': 3.0, 'min': 2.0, @@ -64,21 +85,7 @@ def test_files_2(self): 'sample': ['var', 'varyin', 'varyingle']}, 'base_key': 'qux'} } - self.assert_stats(mtresult.stats, expected) - self.assert_stats(mtresult.get_conflicting_types(), expected) - def test_files_3(self): - mtresult = mt.analyze(TEST_FILES_3) - expected = {'baz.qux': {'base_key': 'qux', - 'str': {'count': 3, - 'max': 5, - 'mean': 3.667, - 'min': 3, - 'sample': ['One', 'Two', 'Three']}}, - 'foo.bar': {'base_key': 'bar', - 'int': {'count': 3, 'max': 30, 'mean': 20.0, - 'min': 10}}, - 'qux': {'base_key': 'qux', 'bool': {'count': 1}}} self.assert_stats(mtresult.stats, expected) 
self.assert_stats(mtresult.get_conflicting_types(), expected) diff --git a/malort/tests/test_malort_mappers.py b/malort/tests/test_malort_mappers.py index 108507d..c398677 100644 --- a/malort/tests/test_malort_mappers.py +++ b/malort/tests/test_malort_mappers.py @@ -9,7 +9,7 @@ import malort as mt from malort.test_helpers import (TestHelpers, TEST_FILES_1, TEST_FILES_2, - TEST_FILES_3) + TEST_FILES_3, TEST_FILES_4) class TestTypeMappers(unittest.TestCase): @@ -27,7 +27,7 @@ def test_files_1(self): self.assertDictEqual(types, expected) def test_files_2(self): - mtresult = mt.analyze(TEST_FILES_2, '|') + mtresult = mt.analyze(TEST_FILES_4) types = mtresult.get_redshift_types() for k, v in types.items(): self.assertEqual(v, 'Multiple types detected.') diff --git a/malort/tests/test_malort_stats.py b/malort/tests/test_malort_stats.py index 9ba839f..8e99f00 100644 --- a/malort/tests/test_malort_stats.py +++ b/malort/tests/test_malort_stats.py @@ -24,10 +24,6 @@ def test_json_files_newline(self): gen = mt.stats.dict_generator(TEST_FILES_1) self.assertEquals(len([d for d in gen]), 4) - def test_json_files_pipe(self): - gen = mt.stats.dict_generator(TEST_FILES_2, '|') - self.assertEquals(len([d for d in gen]), 4) - class TestUpdateEntryStats(TestHelpers): @@ -300,3 +296,105 @@ def test_raises_with_list_of_unknown(self): with pytest.raises(TypeError): mt.stats.recur_dict({}, with_values) + + +class TestStatsCombiner(TestHelpers): + + def test_simple_stat_agg(self): + accum = { + 'key1': {'int': {'count': 1, 'max': 1, 'mean': 1.0, 'min': 1}, + 'base_key': 'key1'}, + 'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, + 'sample': ['Foo']}, + 'base_key': 'key2'}, + } + value = { + 'key1': {'int': {'count': 1, 'max': 4, 'mean': 1.0, 'min': 4}, + 'base_key': 'key1'}, + 'key2': {'str': {'count': 9, 'max': 5, 'mean': 6.0, 'min': 0, + 'sample': ['Foo']}, + 'base_key': 'key2'}, + } + combined = mt.stats.combine_stats(accum, value) + expected = { + 'key1': {'int': {'count': 2, 'max': 4, 'mean': 1.0, 'min': 1}, + 'base_key': 'key1'}, + 'key2': {'str': {'count': 10, 'max': 5, 'mean': 5.7, 'min': 0, + 'sample': ['Foo', 'Foo']}, + 'base_key': 'key2'}, + } + self.assert_stats(combined, expected) + + def test_value_missing_key(self): + accum = { + 'key1': {'int': {'count': 1, 'max': 1, 'mean': 1.0, 'min': 1}, + 'base_key': 'key1', + 'float': {'count': 1, 'fixed_length': True, 'max': 4.0, + 'max_precision': 2, 'max_scale': 1, 'mean': 4.0, + 'min': 4.0}}, + 'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, + 'sample': ['Foo']}, + 'base_key': 'key2'}, + } + value = { + 'key1': {'int': {'count': 1, 'max': 4, 'mean': 1.0, 'min': 4}, + 'base_key': 'key1', + 'float': {'count': 12, 'fixed_length': False, 'max': 2.0, + 'max_precision': 10, 'max_scale': 0, + 'mean': 10.0, 'min': 1.0}} + } + combined = mt.stats.combine_stats(accum, value) + expected = { + 'key1': {'int': {'count': 2, 'max': 4, 'mean': 1.0, 'min': 1}, + 'base_key': 'key1', + 'float': {'count': 13, 'fixed_length': False, 'max': 4.0, + 'max_precision': 10, 'max_scale': 1, + 'mean': 9.538, 'min': 1.0}}, + 'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, + 'sample': ['Foo']}, + 'base_key': 'key2'} + } + self.assert_stats(combined, expected) + + def test_accum_missing_key(self): + accum = { + 'key1': {'int': {'count': 1, 'max': 1, 'mean': 1.0, 'min': 1}, + 'base_key': 'key1'} + } + + value = { + 'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, + 'sample': ['Foo']}, + 'base_key': 'key2'}, + 'key1': {'str': {'count': 
1, 'max': 2, 'mean': 2.0, 'min': 2, + 'sample': ['Foo']}} + } + combined = mt.stats.combine_stats(accum, value) + expected = { + 'key1': {'int': {'count': 1, 'max': 1, 'mean': 1.0, 'min': 1}, + 'str': {'count': 1, 'max': 2, 'mean': 2.0, 'min': 2, + 'sample': ['Foo']}, + 'base_key': 'key1'}, + 'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, + 'sample': ['Foo']}, + 'base_key': 'key2'}, + } + self.assert_stats(combined, expected) + + def test_mult_sample(self): + samples = ["foo", "bar", "baz", "qux", "Foo", "Bar"] + accum = { + 'key1': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, + 'sample': samples[0:4]}, + 'base_key': 'key2'}, + } + + value = { + 'key1': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, + 'sample': samples[4:]}, + 'base_key': 'key2'}, + } + + combined = mt.stats.combine_stats(accum, value) + sample_key = combined['key1']['str']['sample'] + assert len(set(sample_key).difference(set(samples))) == 0 From dbbf161fde03786d1a33529cd9a52b03c35b70d1 Mon Sep 17 00:00:00 2001 From: wrobstory Date: Tue, 11 Aug 2015 10:55:10 -0700 Subject: [PATCH 3/5] REF: Perform record counting as part of reduce --- malort/core.py | 3 ++- malort/stats.py | 15 ++++++++++++++- malort/test_helpers.py | 3 +++ malort/tests/test_malort_core.py | 6 +++++- malort/tests/test_malort_stats.py | 32 +++++++++++++++++++++++-------- 5 files changed, 48 insertions(+), 11 deletions(-) diff --git a/malort/core.py b/malort/core.py index 672e0dc..2dc9460 100644 --- a/malort/core.py +++ b/malort/core.py @@ -46,7 +46,8 @@ def analyze(path, parse_timestamps=True, **kwargs): bag = db.from_filenames(file_list).map(json.loads) recur_partial = partial(recur_dict, parse_timestamps=parse_timestamps) stats = bag.fold(recur_partial, combine_stats, initial={}).compute() - count = bag.count().compute() + count = stats["total_records"] + del stats["total_records"] elapsed = time.time() - start_time print('Malort run finished: {} JSON blobs analyzed in {} seconds.' 
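The change above is easier to follow with the fold written out by hand. Below is a minimal pure-Python sketch (not part of the patch) using simplified stand-ins for recur_dict and combine_stats and two toy partitions of already-parsed blobs; the real run goes through dask.bag.fold, but the shape is the same, which is why the record count can be read straight out of the reduced stats dict instead of a second bag.count() pass.

```python
from functools import reduce

def recur_dict(stats, blob):
    """Simplified stand-in for malort.stats.recur_dict: tally one record."""
    stats = dict(stats)
    stats["total_records"] = stats.get("total_records", 0) + 1
    return stats

def combine_stats(accum, value):
    """Simplified stand-in for malort.stats.combine_stats: merge two tallies."""
    merged = dict(accum)
    merged["total_records"] = (merged.get("total_records", 0)
                               + value.get("total_records", 0))
    return merged

# Two hypothetical partitions of already-parsed JSON blobs
partitions = [
    [{"intfield": 5}, {"intfield": 10}],
    [{"intfield": 15}, {"intfield": 20}],
]

# Same shape as bag.fold(recur_partial, combine_stats, initial={}):
# reduce within each partition, then combine across partitions.
partials = [reduce(recur_dict, part, {}) for part in partitions]
stats = reduce(combine_stats, partials, {})

count = stats["total_records"]  # 4; no separate bag.count() pass needed
```

Folding the counter into the same reduction means the input files are only read once.
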
diff --git a/malort/stats.py b/malort/stats.py index b719fff..bb98cac 100644 --- a/malort/stats.py +++ b/malort/stats.py @@ -113,7 +113,15 @@ def combine_stats(accum, value): ------- dict """ + for field_name, type_stats in value.items(): + + # Update total count + if field_name == "total_records": + accum["total_records"] = accum["total_records"] + type_stats + continue + + # Combine accum stats from different branches if accum.get(field_name): for value_type, val_stats in type_stats.items(): accum_entry = accum[field_name].get(value_type) @@ -125,7 +133,7 @@ def combine_stats(accum, value): # base_key is not a statistic to be updated if value_type == "base_key": - accum_entry = None + continue if accum_entry: max_ = (accum_entry.get("max"), val_stats.get("max")) @@ -262,6 +270,11 @@ def recur_dict(stats, value, parent=None, **kwargs): kwargs: Options for update_entry_stats """ parent = parent or '' + + if parent == '': + total_records = stats.get("total_records") + stats["total_records"] = (total_records + 1) if total_records else 1 + def update_stats(current_val, nested_path, base_key): "Updater function" if nested_path not in stats: diff --git a/malort/test_helpers.py b/malort/test_helpers.py index b22fede..100c75e 100644 --- a/malort/test_helpers.py +++ b/malort/test_helpers.py @@ -23,6 +23,9 @@ class TestHelpers(unittest.TestCase): def assert_stats(self, result, expected): """Test helper for testing stats results""" for key, value in result.items(): + if key == 'total_records': + self.assertEqual(expected['total_records'], value) + continue for typek, typev in value.items(): if typek == 'str': for k, v in typev.items(): diff --git a/malort/tests/test_malort_core.py b/malort/tests/test_malort_core.py index b7fe0d2..9303b29 100644 --- a/malort/tests/test_malort_core.py +++ b/malort/tests/test_malort_core.py @@ -34,16 +34,18 @@ class TestCore(TestHelpers): 'sample': ['var', 'varyin', 'varyingle', 'varyinglengt']}, 'base_key': 'varcharfield'}, - 'datefield': {'datetime': {'count': 4}, 'base_key': 'datefield'} + 'datefield': {'datetime': {'count': 4}, 'base_key': 'datefield'}, } def test_files_1(self): mtresult = mt.analyze(TEST_FILES_1) + self.assertEqual(mtresult.count, 4) self.assert_stats(mtresult.stats, self.expected_1_and_2) self.assertDictEqual(mtresult.get_conflicting_types(), {}) def test_files_2(self): mtresult = mt.analyze(TEST_FILES_2) + self.assertEqual(mtresult.count, 4) self.assert_stats(mtresult.stats, self.expected_1_and_2) self.assertDictEqual(mtresult.get_conflicting_types(), {}) @@ -60,6 +62,7 @@ def test_files_3(self): 'min': 10}}, 'qux': {'base_key': 'qux', 'bool': {'count': 1}}} self.assert_stats(mtresult.stats, expected) + self.assertEqual(mtresult.count, 3) self.assert_stats(mtresult.get_conflicting_types(), expected) def test_files_4(self): @@ -87,6 +90,7 @@ def test_files_4(self): } self.assert_stats(mtresult.stats, expected) + self.assertEqual(mtresult.count, 4) self.assert_stats(mtresult.get_conflicting_types(), expected) def test_gen_redshift_jsonpaths(self): diff --git a/malort/tests/test_malort_stats.py b/malort/tests/test_malort_stats.py index 8e99f00..b2fcd96 100644 --- a/malort/tests/test_malort_stats.py +++ b/malort/tests/test_malort_stats.py @@ -117,7 +117,8 @@ def test_recur_simple(self): 'key4': {'bool': {'count': 1}, 'base_key': 'key4'}, 'key5': {'str': {'count': 1, 'max': 23, 'mean': 23.0, 'min': 23, 'sample': ['["one", "two", "three"]']}, - 'base_key': 'key5'} + 'base_key': 'key5'}, + 'total_records': 1 } stats = mt.stats.recur_dict({}, simple1) 
@@ -171,7 +172,8 @@ def test_recur_depth_one(self): 'max_scale': 1, 'mean': 8.0, 'min': 8.0}}, - 'key5.key4': {'base_key': 'key4', 'bool': {'count': 1}}} + 'key5.key4': {'base_key': 'key4', 'bool': {'count': 1}}, + 'total_records': 1} stats = mt.stats.recur_dict({}, depth_one) self.assert_stats(stats, expected) @@ -235,7 +237,8 @@ def depth_two_expected(self): 'max_scale': 1, 'mean': 2.0, 'min': 2.0}}, - 'key5.key6.key4': {'base_key': 'key4', 'bool': {'count': 1}}} + 'key5.key6.key4': {'base_key': 'key4', 'bool': {'count': 1}}, + 'total_records': 1} def test_recur_depth_two(self): depth_two = { @@ -285,7 +288,8 @@ def test_recur_with_val_array(self): 'max': 14, 'mean': 14.0, 'min': 14, - 'sample': ['["foo", "bar"]']}}} + 'sample': ['["foo", "bar"]']}}, + 'total_records': 1} self.assert_stats(stats, expected) def test_raises_with_list_of_unknown(self): @@ -307,6 +311,7 @@ def test_simple_stat_agg(self): 'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, 'sample': ['Foo']}, 'base_key': 'key2'}, + 'total_records': 3 } value = { 'key1': {'int': {'count': 1, 'max': 4, 'mean': 1.0, 'min': 4}, @@ -314,6 +319,7 @@ def test_simple_stat_agg(self): 'key2': {'str': {'count': 9, 'max': 5, 'mean': 6.0, 'min': 0, 'sample': ['Foo']}, 'base_key': 'key2'}, + 'total_records': 1 } combined = mt.stats.combine_stats(accum, value) expected = { @@ -322,6 +328,7 @@ def test_simple_stat_agg(self): 'key2': {'str': {'count': 10, 'max': 5, 'mean': 5.7, 'min': 0, 'sample': ['Foo', 'Foo']}, 'base_key': 'key2'}, + 'total_records': 4 } self.assert_stats(combined, expected) @@ -335,13 +342,15 @@ def test_value_missing_key(self): 'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, 'sample': ['Foo']}, 'base_key': 'key2'}, + 'total_records': 1 } value = { 'key1': {'int': {'count': 1, 'max': 4, 'mean': 1.0, 'min': 4}, 'base_key': 'key1', 'float': {'count': 12, 'fixed_length': False, 'max': 2.0, 'max_precision': 10, 'max_scale': 0, - 'mean': 10.0, 'min': 1.0}} + 'mean': 10.0, 'min': 1.0}}, + 'total_records': 10 } combined = mt.stats.combine_stats(accum, value) expected = { @@ -352,14 +361,16 @@ def test_value_missing_key(self): 'mean': 9.538, 'min': 1.0}}, 'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, 'sample': ['Foo']}, - 'base_key': 'key2'} + 'base_key': 'key2'}, + 'total_records': 11 } self.assert_stats(combined, expected) def test_accum_missing_key(self): accum = { 'key1': {'int': {'count': 1, 'max': 1, 'mean': 1.0, 'min': 1}, - 'base_key': 'key1'} + 'base_key': 'key1'}, + 'total_records': 2 } value = { @@ -367,7 +378,8 @@ def test_accum_missing_key(self): 'sample': ['Foo']}, 'base_key': 'key2'}, 'key1': {'str': {'count': 1, 'max': 2, 'mean': 2.0, 'min': 2, - 'sample': ['Foo']}} + 'sample': ['Foo']}}, + 'total_records': 2 } combined = mt.stats.combine_stats(accum, value) expected = { @@ -378,6 +390,7 @@ def test_accum_missing_key(self): 'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, 'sample': ['Foo']}, 'base_key': 'key2'}, + 'total_records': 4 } self.assert_stats(combined, expected) @@ -387,14 +400,17 @@ def test_mult_sample(self): 'key1': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, 'sample': samples[0:4]}, 'base_key': 'key2'}, + 'total_records': 1 } value = { 'key1': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3, 'sample': samples[4:]}, 'base_key': 'key2'}, + 'total_records': 1 } combined = mt.stats.combine_stats(accum, value) sample_key = combined['key1']['str']['sample'] assert len(set(sample_key).difference(set(samples))) == 0 + assert 
combined['total_records'] == 2 From 274584dba63d1f6a116b56f0e79eb5512a81f69c Mon Sep 17 00:00:00 2001 From: wrobstory Date: Fri, 14 Aug 2015 12:48:58 -0700 Subject: [PATCH 4/5] ENH: Add requirements.txt, update README, remove unneeded import --- README.md | 12 +++++------- malort/core.py | 1 - requirements.txt | 9 +++++++++ 3 files changed, 14 insertions(+), 8 deletions(-) create mode 100644 requirements.txt diff --git a/README.md b/README.md index baf0947..343e1e0 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ Because for (mostly) structured documents where we're expecting the schema to ra How ------ -Malort will read through a directory of .json or flat text files with delimited JSON blobs and generate relevant statistics on each key. +Malort will read through a directory of .json or flat text files (optionally gzipped) with delimited JSON blobs and generate relevant statistics on each key. It uses the Dask libary to parallelize these computations. For example, let's look at a directory with two JSON files, and one text file with newline-delimited JSON: ```json @@ -109,18 +109,16 @@ Install API --- -* `result = malort.analyze(path, delimiter='\n', parse_timestamps=True)` +* `result = malort.analyze(path, parse_timestamps=True)` ```python -Analyze a given directory of either .json or flat text files -with delimited JSON to get relevant key statistics. +Analyze a given directory of either .json, flat text files +with newline-delimited JSON, or gzipped files with newline-delimted JSON to get relevant key statistics. Parameters ---------- path: string Path to directory -delimiter: string, default newline - For flat text files, the JSON blob delimiter parse_timestamps: boolean, default True If True, will attempt to regex match ISO8601 formatted parse_timestamps ``` @@ -158,7 +156,7 @@ Yes. How fast is it? --------------- -With timestamp parsing turned on, I used Malort to process 2.1 GB of files (1,326,794 nested JSON blobs) in 8 minutes. There are undoubtedly ways to do it faster. +With timestamp parsing turned on, I used Malort to process 2.1 GB of files (1,326,794 nested JSON blobs) in 8 minutes. There are undoubtedly ways to do it faster. Speed will depend on a number of factors, including nesting depth. Should I use the column type results verbatim? ---------------------------------------------- diff --git a/malort/core.py b/malort/core.py index 2dc9460..2a275c5 100644 --- a/malort/core.py +++ b/malort/core.py @@ -18,7 +18,6 @@ import time import dask.bag as db -from dask.async import get_sync from malort.stats import recur_dict, combine_stats, dict_generator from malort.type_mappers import TypeMappers diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..aed19c8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +dask==0.7.0 +dill==0.2.4 +numpy==1.9.2 +pandas==0.16.2 +python-dateutil==2.4.2 +pytz==2015.4 +six==1.9.0 +toolz==0.7.2 +wheel==0.24.0 From 363df727a0b7a1f15708b37198c6abd6f253667f Mon Sep 17 00:00:00 2001 From: wrobstory Date: Fri, 14 Aug 2015 12:56:47 -0700 Subject: [PATCH 5/5] VER: Bump version to 0.0.4 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 066d82d..88bfea2 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name='malort', - version='0.0.3', + version='0.0.4', description='JSON to Postgres Column Types', author='Rob Story', author_email='wrobstory@gmail.com',
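
Taken together, the series drops the delimiter argument, moves ingestion and stat generation onto dask.bag, and surfaces the record count on the result. Below is a short usage sketch of the post-series API with a hypothetical directory path; attribute and method names follow the README and tests above.

```python
import malort

# Analyze a directory of .json / newline-delimited JSON files (optionally gzipped)
result = malort.analyze("path/to/json_dir", parse_timestamps=True)

print(result.count)                    # number of JSON blobs folded into the stats
print(result.stats["intfield"])        # per-key stats, e.g. {'int': {...}, 'base_key': 'intfield'}
print(result.get_conflicting_types())  # keys observed with more than one JSON type
print(result.get_redshift_types())     # suggested Redshift column type per key
```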