Bulk update #53

Merged: 5 commits, Feb 19, 2021
39 changes: 36 additions & 3 deletions README.md
@@ -27,9 +27,9 @@ pip install git+https://github.com/RedisGraph/redisgraph-bulk-loader.git@master
```

## Usage
-Pip installation exposes `redisgraph-bulk-insert` as a command to invoke this tool:
+Pip installation exposes `redisgraph-bulk-loader` as a command to invoke this tool:
```
-redisgraph-bulk-insert GRAPHNAME [OPTIONS]
+redisgraph-bulk-loader GRAPHNAME [OPTIONS]
```

Installation by cloning the repository allows the script to be invoked via Python like so:
@@ -63,7 +63,7 @@ The only required arguments are the name to give the newly-created graph (which
The nodes and relationship flags should be specified once per input file.

```
-redisgraph-bulk-insert GRAPH_DEMO -n example/Person.csv -n example/Country.csv -r example/KNOWS.csv -r example/VISITED.csv
+redisgraph-bulk-loader GRAPH_DEMO -n example/Person.csv -n example/Country.csv -r example/KNOWS.csv -r example/VISITED.csv
```
The label (for nodes) or relationship type (for relationships) is derived from the base name of the input CSV file. In this example, we'll construct two sets of nodes, labeled `Person` and `Country`, and two types of relationships - `KNOWS` and `VISITED`.
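For instance, a node file like `example/Person.csv` might look like this (hypothetical contents; the header row supplies the property names for each `Person` node):

```
name,age
Jim,39
Pam,31
```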

@@ -172,3 +172,36 @@ Inserting these CSVs with the command:

Will produce a graph named SocialGraph with 2 users, Jeffrey and Filipe. Jeffrey follows Filipe, and that relationship has a reaction_count of 25. Filipe also follows Jeffrey, with a reaction_count of 10.

## Performing bulk updates
Pip installation also exposes the command `redisgraph-bulk-update`:
```
redisgraph-bulk-update GRAPHNAME [OPTIONS]
```

Installation by cloning the repository allows the bulk updater to be invoked via Python like so:
```
python3 redisgraph_bulk_loader/bulk_update.py GRAPHNAME [OPTIONS]
```

| Flags | Extended flags | Parameter |
|:-----:|--------------------------|:----------------------------------------------------------:|
| -h | --host TEXT | Redis server host (default: 127.0.0.1) |
| -p | --port INTEGER | Redis server port (default: 6379) |
| -a | --password TEXT | Redis server password (default: none) |
| -u | --unix-socket-path TEXT | Redis unix socket path (default: none) |
| -q | --query TEXT | Query to run on server |
| -v | --variable-name TEXT | Variable name for row array in queries (default: row) |
| -c | --csv TEXT | Path to CSV input file |
| -o | --separator TEXT | Field token separator in CSV file |
| -n | --no-header | If set, the CSV file has no header |
| -t | --max-token-size INTEGER | Max size of each token in megabytes (default 500, max 512) |

The bulk updater allows a CSV file to be read in batches and committed to RedisGraph according to the provided query.

For example, given the CSV files described in [Input Schema CSV examples](#input-schema-csv-examples), the bulk updater could create the same nodes and relationships with the commands:
```
redisgraph-bulk-update SocialGraph --csv User.csv --query "MERGE (:User {id: row[0], name: row[1], rank: row[2]})"
redisgraph-bulk-update SocialGraph --csv FOLLOWS.csv --query "MATCH (start {id: row[0]}), (end {id: row[1]}) MERGE (start)-[f:FOLLOWS]->(end) SET f.reaction_count = row[2]"
```
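Under the hood, each batch is emitted as a single Cypher call: the tool prefixes the user query with `UNWIND $rows AS row` and inlines the batched rows as a `CYPHER rows=[...]` parameter set (see `bulk_update.py` in this PR). A minimal sketch of that composition — `build_batch_query` is a hypothetical helper, not part of the tool:

```python
def build_batch_query(user_query, rows, variable_name="row"):
    # Wrap the user query so each CSV row is bound to `variable_name`,
    # mirroring BulkUpdate.__init__ in bulk_update.py.
    unwind = " ".join(["UNWIND $rows AS", variable_name, user_query])
    # Inline the accumulated row strings as a Cypher parameter set,
    # mirroring the buffer concatenation in process_update_csv.
    params = "".join(["CYPHER rows=[", ",".join(rows), "]"])
    return " ".join([params, unwind])

print(build_batch_query(
    'MERGE (:User {id: row[0], name: row[1], rank: row[2]})',
    ['[0,"Jeffrey",5]', '[1,"Filipe",8]']))
```

Because the rows travel as one query parameter, the whole batch executes as a single `GRAPH.QUERY` round trip rather than one call per CSV line.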

When using the bulk updater, it is essential to sanitize CSV inputs beforehand, as RedisGraph *will* commit changes to the graph incrementally. As such, malformed inputs may leave the graph in a partially-updated state.
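One lightweight pre-check is to verify that every row has the same column count as the header before running the updater, so a malformed line is caught before any batch is committed. This is a hypothetical sketch, not part of the tool:

```python
import csv

def find_malformed_rows(path, separator=","):
    """Return (line_number, row) pairs whose column count differs from
    the header's, so they can be fixed before running the bulk updater."""
    bad_rows = []
    with open(path, newline="") as f:
        reader = csv.reader(f, delimiter=separator)
        header = next(reader)
        # Data rows start on line 2, immediately after the header.
        for lineno, row in enumerate(reader, start=2):
            if len(row) != len(header):
                bad_rows.append((lineno, row))
    return bad_rows
```

Running the updater only when `find_malformed_rows` returns an empty list avoids leaving the graph partially updated by a truncated or mis-delimited line.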
158 changes: 158 additions & 0 deletions redisgraph_bulk_loader/bulk_update.py
@@ -0,0 +1,158 @@
import sys
import csv
import redis
import click
from redisgraph import Graph
from timeit import default_timer as timer


def utf8len(s):
return len(s.encode('utf-8'))


# Count number of rows in file.
def count_entities(filename):
entities_count = 0
with open(filename, 'rt') as f:
entities_count = sum(1 for line in f)
return entities_count


class BulkUpdate:
"""Handler class for emitting bulk update commands"""
def __init__(self, graph_name, max_token_size, separator, no_header, filename, query, variable_name, client):
self.separator = separator
self.no_header = no_header
self.query = " ".join(["UNWIND $rows AS", variable_name, query])
self.buffer_size = 0
self.max_token_size = max_token_size * 1024 * 1024 - utf8len(self.query)
self.filename = filename
self.graph_name = graph_name
self.graph = Graph(graph_name, client)
self.statistics = {}

def update_statistics(self, result):
for key, new_val in result.statistics.items():
try:
val = self.statistics[key]
except KeyError:
val = 0
val += new_val
self.statistics[key] = val

def emit_buffer(self, rows):
command = " ".join([rows, self.query])
result = self.graph.query(command)
self.update_statistics(result)

def quote_string(self, cell):
cell = cell.strip()
# Quote-interpolate cell if it is an unquoted string.
try:
float(cell) # Check for numeric
except ValueError:
            if (cell != "" and  # Guard against empty cells
                    (cell.lower() != 'false' and cell.lower() != 'true') and  # Check for boolean
                    (cell[0] != '[' and cell[-1] != ']') and  # Check for array
                    (cell[0] != "\"" and cell[-1] != "\"") and  # Check for double-quoted string
                    (cell[0] != "\'" and cell[-1] != "\'")):  # Check for single-quoted string
cell = "".join(["\"", cell, "\""])
return cell

# Raise an exception if the query triggers a compile-time error
def validate_query(self):
command = " ".join(["CYPHER rows=[]", self.query])
# The plan call will raise an error if the query is malformed or invalid.
self.graph.execution_plan(command)

def process_update_csv(self):
entity_count = count_entities(self.filename)

with open(self.filename, 'rt') as f:
if self.no_header is False:
next(f) # skip header

reader = csv.reader(f, delimiter=self.separator, skipinitialspace=True, quoting=csv.QUOTE_NONE, escapechar='\\')

rows_strs = []
with click.progressbar(reader, length=entity_count, label=self.graph_name) as reader:
for row in reader:
# Prepare the string representation of the current row.
row = ",".join([self.quote_string(cell) for cell in row])
next_line = "".join(["[", row.strip(), "]"])

# Emit buffer now if the max token size would be exceeded by this addition.
added_size = utf8len(next_line) + 1 # Add one to compensate for the added comma.
if self.buffer_size + added_size > self.max_token_size:
# Concatenate all rows into a valid parameter set
buf = "".join(["CYPHER rows=[", ",".join(rows_strs), "]"])
self.emit_buffer(buf)
rows_strs = []
self.buffer_size = 0

# Concatenate the string into the rows string representation.
rows_strs.append(next_line)
self.buffer_size += added_size
# Concatenate all rows into a valid parameter set
buf = "".join(["CYPHER rows=[", ",".join(rows_strs), "]"])
self.emit_buffer(buf)


################################################################################
# Bulk updater
################################################################################
# Command-line arguments
@click.command()
@click.argument('graph')
# Redis server connection settings
@click.option('--host', '-h', default='127.0.0.1', help='Redis server host')
@click.option('--port', '-p', default=6379, help='Redis server port')
@click.option('--password', '-a', default=None, help='Redis server password')
@click.option('--unix-socket-path', '-u', default=None, help='Redis server unix socket path')
# Cypher query options
@click.option('--query', '-q', help='Query to run on server')
@click.option('--variable-name', '-v', default='row', help='Variable name for row array in queries (default: row)')
# CSV file options
@click.option('--csv', '-c', help='Path to CSV input file')
@click.option('--separator', '-o', default=',', help='Field token separator in CSV file')
@click.option('--no-header', '-n', default=False, is_flag=True, help='If set, the CSV file has no header')
# Buffer size restrictions
@click.option('--max-token-size', '-t', default=500, help='Max size of each token in megabytes (default 500, max 512)')
def bulk_update(graph, host, port, password, unix_socket_path, query, variable_name, csv, separator, no_header, max_token_size):
if sys.version_info[0] < 3:
raise Exception("Python 3 is required for the RedisGraph bulk updater.")

start_time = timer()

# Attempt to connect to Redis server
try:
if unix_socket_path is not None:
client = redis.StrictRedis(unix_socket_path=unix_socket_path, password=password, decode_responses=True)
else:
client = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)
except redis.exceptions.ConnectionError as e:
print("Could not connect to Redis server.")
raise e

# Attempt to verify that RedisGraph module is loaded
try:
module_list = client.execute_command("MODULE LIST")
if not any('graph' in module_description for module_description in module_list):
print("RedisGraph module not loaded on connected server.")
sys.exit(1)
except redis.exceptions.ResponseError:
# Ignore check if the connected server does not support the "MODULE LIST" command
pass

updater = BulkUpdate(graph, max_token_size, separator, no_header, csv, query, variable_name, client)
updater.validate_query()
updater.process_update_csv()

end_time = timer()

for key, value in updater.statistics.items():
print(key + ": " + repr(value))
print("Update of graph '%s' complete in %f seconds" % (graph, end_time - start_time))


if __name__ == '__main__':
bulk_update()
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -34,5 +34,6 @@ def read_all(f):
entry_points='''
[console_scripts]
redisgraph-bulk-loader=redisgraph_bulk_loader.bulk_insert:bulk_insert
redisgraph-bulk-update=redisgraph_bulk_loader.bulk_update:bulk_update
'''
)
4 changes: 1 addition & 3 deletions test/test_bulk_loader.py
@@ -682,7 +682,6 @@ def test17_ensure_index_is_created(self):
self.assertIn('2 nodes created', res.output)
self.assertIn('Indices created: 1', res.output)

-graph = Graph(graphname, self.redis_con)
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
res = r.execute_command("GRAPH.EXPLAIN", graphname, 'MATCH (p:Person) WHERE p.age > 16 RETURN p')
self.assertIn(' Index Scan | (p:Person)', res)
@@ -710,12 +709,11 @@ def test18_ensure_full_text_index_is_created(self):

graph = Graph(graphname, self.redis_con)
query_result = graph.query("CALL db.idx.fulltext.queryNodes('Monkeys', 'tamarin') YIELD node RETURN node.name")
-expected_result = [ ['Emperor Tamarin'],['Golden Lion Tamarin'], ['Cotton-top Tamarin'] ]
+expected_result = [['Emperor Tamarin'], ['Golden Lion Tamarin'], ['Cotton-top Tamarin']]

# We should find only the tamarins
self.assertEqual(query_result.result_set, expected_result)



if __name__ == '__main__':
unittest.main()