Skip to content

Commit

Permalink
Merge pull request #3 from wrobstory/PR_daskify
Browse files Browse the repository at this point in the history
PR Daskify
  • Loading branch information
wrobstory committed Aug 14, 2015
2 parents d03519b + 363df72 commit fc4a114
Show file tree
Hide file tree
Showing 19 changed files with 319 additions and 80 deletions.
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Because for (mostly) structured documents where we're expecting the schema to ra

How
------
Malort will read through a directory of .json or flat text files with delimited JSON blobs and generate relevant statistics on each key.
Malort will read through a directory of .json or flat text files (optionally gzipped) with delimited JSON blobs and generate relevant statistics on each key. It uses the Dask libary to parallelize these computations.

For example, let's look at a directory with two JSON files, and one text file with newline-delimited JSON:
```json
Expand Down Expand Up @@ -109,18 +109,16 @@ Install

API
---
* `result = malort.analyze(path, delimiter='\n', parse_timestamps=True)`
* `result = malort.analyze(path, parse_timestamps=True)`

```python
Analyze a given directory of either .json or flat text files
with delimited JSON to get relevant key statistics.
Analyze a given directory of either .json, flat text files
with newline-delimited JSON, or gzipped files with newline-delimted JSON to get relevant key statistics.

Parameters
----------
path: string
Path to directory
delimiter: string, default newline
For flat text files, the JSON blob delimiter
parse_timestamps: boolean, default True
If True, will attempt to regex match ISO8601 formatted parse_timestamps
```
Expand Down Expand Up @@ -158,7 +156,7 @@ Yes.

How fast is it?
---------------
With timestamp parsing turned on, I used Malort to process 2.1 GB of files (1,326,794 nested JSON blobs) in 8 minutes. There are undoubtedly ways to do it faster.
With timestamp parsing turned on, I used Malort to process 2.1 GB of files (1,326,794 nested JSON blobs) in 8 minutes. There are undoubtedly ways to do it faster. Speed will depend on a number of factors, including nesting depth.

Should I use the column type results verbatim?
----------------------------------------------
Expand Down
25 changes: 14 additions & 11 deletions malort/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,24 @@
JSON -> Postgres Column types
"""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import, print_function, division

from collections import defaultdict
from functools import partial
import json
import os
from os.path import isfile, join, splitext
import random
import re
import time

from malort.stats import recur_dict, dict_generator
import dask.bag as db

from malort.stats import recur_dict, combine_stats, dict_generator
from malort.type_mappers import TypeMappers


def analyze(path, delimiter='\n', parse_timestamps=True, **kwargs):
def analyze(path, parse_timestamps=True, **kwargs):
"""
Analyze a given directory of either .json or flat text files
with delimited JSON to get relevant key statistics.
Expand All @@ -31,19 +32,21 @@ def analyze(path, delimiter='\n', parse_timestamps=True, **kwargs):
----------
path: string
Path to directory
delimiter: string, default newline
For flat text files, the JSON blob delimiter
parse_timestamps: boolean, default True
If True, will attempt to regex match ISO8601 formatted parse_timestamps
kwargs:
passed into json.loads. Here you can specify encoding, etc.
kwargs:
passed into json.loads. Here you can specify encoding, etc.
"""

stats = {}

start_time = time.time()
for count, blob in enumerate(dict_generator(path, delimiter, **kwargs), start=1):
recur_dict(blob, stats, parse_timestamps=parse_timestamps)
file_list = [os.path.join(path, f) for f in os.listdir(path)]
bag = db.from_filenames(file_list).map(json.loads)
recur_partial = partial(recur_dict, parse_timestamps=parse_timestamps)
stats = bag.fold(recur_partial, combine_stats, initial={}).compute()
count = stats["total_records"]
del stats["total_records"]

elapsed = time.time() - start_time
print('Malort run finished: {} JSON blobs analyzed in {} seconds.'
Expand Down
116 changes: 109 additions & 7 deletions malort/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
Functions to generate Malort stats
"""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import, print_function, division

import decimal
import json
Expand All @@ -25,6 +23,7 @@
\d([\.,]\d+)?)?([zZ]|([\+-])([01]\d|2[0-3]):?([0-5]\d)?)
?)?)?$""", re.VERBOSE)


def delimited(file, delimiter='\n', bufsize=4096):
buf = ''
while True:
Expand All @@ -38,6 +37,7 @@ def delimited(file, delimiter='\n', bufsize=4096):
yield line
buf = lines[-1]


def catch_json_error(blob, filepath, **kwargs):
"""Wrapper to provide better error message for JSON reads"""
try:
Expand All @@ -48,6 +48,7 @@ def catch_json_error(blob, filepath, **kwargs):

return parsed


def dict_generator(path, delimiter='\n', **kwargs):
"""
Given a directory path, return a generator that will return a dict for each
Expand Down Expand Up @@ -80,11 +81,109 @@ def dict_generator(path, delimiter='\n', **kwargs):
else:
yield catch_json_error(fread.read(), filepath, **kwargs)


def get_new_mean(value, current_mean, count):
"""Given a value, current mean, and count, return new mean"""
summed = current_mean * count
return (summed + value)/(count + 1)


def combine_means(means, counts):
"""Combine ordered iter of means and counts"""
numer = sum([mean * count for mean, count in zip(means, counts)
if mean is not None and count is not None])
denom = sum([c for c in counts if c is not None])
return round(numer / denom, 3)


def combine_stats(accum, value):
"""
Combine two sets of stats into one. Used for final dask rollup of
multiple partitions of stats dicts into one unified stats dict. Best
thought of as a reduction over multiple stats object.
Parameters
----------
accum: dict
Accumlating tats dict
value: dict
New stats dict to merge with accumulator
Returns
-------
dict
"""

for field_name, type_stats in value.items():

# Update total count
if field_name == "total_records":
accum["total_records"] = accum["total_records"] + type_stats
continue

# Combine accum stats from different branches
if accum.get(field_name):
for value_type, val_stats in type_stats.items():
accum_entry = accum[field_name].get(value_type)
if not accum_entry:
# If accum doesn't already have an entry, but the
# value does, continue
accum[field_name][value_type] = val_stats
continue

# base_key is not a statistic to be updated
if value_type == "base_key":
continue

if accum_entry:
max_ = (accum_entry.get("max"), val_stats.get("max"))
min_ = (accum_entry.get("min"), val_stats.get("min"))
count = (accum_entry.get("count"), val_stats.get("count"))
mean = (accum_entry.get("mean"), val_stats.get("mean"))

if any(max_):
accum_entry["max"] = max(max_)
if any(min_):
accum_entry["min"] = min(min_)
if any(count):
accum_entry["count"] = sum([c for c in count
if c is not None])
if any(mean):
accum_entry["mean"] = combine_means(mean, count)

# Type specific entries
if value_type == "float":

fixed_length = (accum_entry.get("fixed_length"),
val_stats.get("fixed_length"))

# Decimals not fixed length if prec/scale do not match
a_prec, a_scale = (accum_entry.get("max_precision"),
accum_entry.get("max_scale"))
v_prec, v_scale = (val_stats.get("max_precision"),
val_stats.get("max_scale"))
prec_scale_eq = (a_prec == v_prec, a_scale == v_scale)

if not all(fixed_length) or not all(prec_scale_eq):
accum_entry["fixed_length"] = False

max_prec = (accum_entry.get("max_precision"),
val_stats.get("max_precision"))
accum_entry["max_precision"] = max(max_prec)

max_scale = (accum_entry.get("max_scale"),
val_stats.get("max_scale"))
accum_entry["max_scale"] = max(max_scale)

elif value_type == "str":
samples = accum_entry.get("sample", []) +\
val_stats.get("sample", [])
accum_entry["sample"] = random.sample(
samples, min(len(samples), 3))

return accum


def updated_entry_stats(value, current_stats, parse_timestamps=True):
"""
Given a value and a dict of current statistics, return a dict of new
Expand Down Expand Up @@ -156,7 +255,7 @@ def updated_entry_stats(value, current_stats, parse_timestamps=True):
return value_type, new_stats


def recur_dict(value, stats, parent=None, **kwargs):
def recur_dict(stats, value, parent=None, **kwargs):
"""
Recurse through a dict `value` and update `stats` for each field.
Can handle nested dicts, lists of dicts, and lists of values (must be
Expand All @@ -170,9 +269,12 @@ def recur_dict(value, stats, parent=None, **kwargs):
Parent key to get key nesting depth.
kwargs: Options for update_entry_stats
"""

parent = parent or ''

if parent == '':
total_records = stats.get("total_records")
stats["total_records"] = (total_records + 1) if total_records else 1

def update_stats(current_val, nested_path, base_key):
"Updater function"
if nested_path not in stats:
Expand All @@ -187,14 +289,14 @@ def update_stats(current_val, nested_path, base_key):
for k, v in value.items():
parent_path = '.'.join([parent, k]) if parent != '' else k
if isinstance(v, (list, dict)):
recur_dict(v, stats, parent_path)
recur_dict(stats, v, parent_path)
else:
update_stats(v, parent_path, k)

elif isinstance(value, list):
for v in value:
if isinstance(v, (list, dict)):
recur_dict(v, stats, parent)
recur_dict(stats, v, parent)
else:
base_key = parent.split(".")[-1]
update_stats(json.dumps(value), parent, base_key)
Expand Down
8 changes: 6 additions & 2 deletions malort/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@
TEST_FILES_1 = os.path.normpath(os.path.join(os.path.abspath(__file__),
'..', 'tests', 'test_files'))
TEST_FILES_2 = os.path.normpath(os.path.join(os.path.abspath(__file__),
'..', 'tests', 'test_files_delimited'))
'..', 'tests', 'test_files_newline_delimited'))
TEST_FILES_3 = os.path.normpath(os.path.join(os.path.abspath(__file__),
'..', 'tests', 'test_files_nested'))

TEST_FILES_4 = os.path.normpath(os.path.join(os.path.abspath(__file__),
'..', 'tests', 'test_files_mult_type'))

class TestHelpers(unittest.TestCase):

def assert_stats(self, result, expected):
"""Test helper for testing stats results"""
for key, value in result.items():
if key == 'total_records':
self.assertEqual(expected['total_records'], value)
continue
for typek, typev in value.items():
if typek == 'str':
for k, v in typev.items():
Expand Down
6 changes: 1 addition & 5 deletions malort/tests/test_files/test1.json
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
{"intfield": 5,
"floatfield": 2.345,
"datefield": "2014-09-26 17:00:00",
"charfield": "fixedlength",
"varcharfield": "var"}
{"intfield": 5,"floatfield": 2.345,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "var"}
6 changes: 1 addition & 5 deletions malort/tests/test_files/test2.json
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
{"intfield": 10,
"floatfield": 4.7891,
"datefield": "2014-09-26 17:00:00",
"charfield": "fixedlength",
"varcharfield": "varyin"}
{"intfield": 10,"floatfield": 4.7891,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "varyin"}
1 change: 1 addition & 0 deletions malort/tests/test_files/test3.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"intfield": 15,"floatfield": 3.0012,"charfield": "fixedlength","varcharfield": "varyingle","datefield": "2014-09-26 17:00:00"}
1 change: 1 addition & 0 deletions malort/tests/test_files/test4.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"intfield": 20,"floatfield": 10.8392,"charfield": "fixedlength","varcharfield": "varyinglengt","datefield": "2014-09-26 17:00:00"}
1 change: 0 additions & 1 deletion malort/tests/test_files_delimited/test_pipe_delimited_1

This file was deleted.

1 change: 0 additions & 1 deletion malort/tests/test_files_delimited/test_pipe_delimited_2

This file was deleted.

2 changes: 2 additions & 0 deletions malort/tests/test_files_mult_type/test_mult_delimited_1
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"foo": 10,"bar": 2.0,"baz": "fixed","qux": 10}
{"foo": "foo","bar": true,"baz": 2,"qux": "varyin"}
2 changes: 2 additions & 0 deletions malort/tests/test_files_mult_type/test_mult_delimited_2
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"foo": 1000,"bar": "bar","baz": "fixed","qux": "var"}
{"foo": "foo","bar": 4.0,"baz": 1,"qux": "varyingle"}
File renamed without changes.
2 changes: 2 additions & 0 deletions malort/tests/test_files_newline_delimited/test_nl_delimited_2
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"intfield": 5,"floatfield": 2.345,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "var"}
{"intfield": 10,"floatfield": 4.7891,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "varyin"}
Loading

0 comments on commit fc4a114

Please # to comment.