Bulk update #53

Merged: 5 commits, Feb 19, 2021
39 changes: 36 additions & 3 deletions README.md
@@ -27,9 +27,9 @@ pip install git+https://github.com/RedisGraph/redisgraph-bulk-loader.git@master
```

## Usage
-Pip installation exposes `redisgraph-bulk-insert` as a command to invoke this tool:
+Pip installation exposes `redisgraph-bulk-loader` as a command to invoke this tool:
```
-redisgraph-bulk-insert GRAPHNAME [OPTIONS]
+redisgraph-bulk-loader GRAPHNAME [OPTIONS]
```

Installation by cloning the repository allows the script to be invoked via Python like so:
@@ -63,7 +63,7 @@ The only required arguments are the name to give the newly-created graph (which
The nodes and relationship flags should be specified once per input file.

```
-redisgraph-bulk-insert GRAPH_DEMO -n example/Person.csv -n example/Country.csv -r example/KNOWS.csv -r example/VISITED.csv
+redisgraph-bulk-loader GRAPH_DEMO -n example/Person.csv -n example/Country.csv -r example/KNOWS.csv -r example/VISITED.csv
```
The label (for nodes) or relationship type (for relationships) is derived from the base name of the input CSV file. In this example, we'll construct two sets of nodes, labeled `Person` and `Country`, and two types of relationships - `KNOWS` and `VISITED`.
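The label-from-filename rule described above can be sketched in a few lines of Python (a minimal illustration; the helper name `label_from_csv` is ours, not the loader's):

```python
import os

def label_from_csv(path):
    # "example/Person.csv" -> "Person": strip directories, then the extension.
    return os.path.splitext(os.path.basename(path))[0]

print(label_from_csv("example/Person.csv"))   # Person
print(label_from_csv("example/KNOWS.csv"))    # KNOWS
```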

@@ -172,3 +172,36 @@ Inserting these CSVs with the command:

Will produce a graph named SocialGraph with 2 users, Jeffrey and Filipe. Jeffrey follows Filipe, and that relation has a reaction_count of 25. Filipe also follows Jeffrey, with a reaction_count of 10.

## Performing bulk updates
Pip installation also exposes the command `redisgraph-bulk-update`:
```
redisgraph-bulk-update GRAPHNAME [OPTIONS]
```

Installation by cloning the repository allows the bulk updater to be invoked via Python like so:
```
python3 redisgraph_bulk_loader/bulk_update.py GRAPHNAME [OPTIONS]
```

| Flags | Extended flags | Parameter |
|:-----:|--------------------------|:----------------------------------------------------------:|
| -h | --host TEXT | Redis server host (default: 127.0.0.1) |
| -p | --port INTEGER | Redis server port (default: 6379) |
| -a | --password TEXT | Redis server password (default: none) |
| -u | --unix-socket-path TEXT | Redis unix socket path (default: none) |
| -e | --query TEXT | Query to run on server |

> **Contributor:** is -q taken?

> **Author:** 👍 Good catch!

| -v | --variable-name TEXT | Variable name for row array in queries (default: row) |
| -c | --csv TEXT | Path to CSV input file |
| -o | --separator TEXT | Field token separator in CSV file |
| -n | --no-header | If set, the CSV file has no header |
| -t | --max-token-size INTEGER | Max size of each token in megabytes (default 500, max 512) |

The bulk updater allows a CSV file to be read in batches and committed to RedisGraph according to the provided query.

For example, given the CSV files described in [Input Schema CSV examples](#input-schema-csv-examples), the bulk loader could create the same nodes and relationships with the commands:
```
redisgraph-bulk-update SocialGraph --csv User.csv --query "MERGE (:User {id: row[0], name: row[1], rank: row[2]})"
redisgraph-bulk-update SocialGraph --csv FOLLOWS.csv --query "MATCH (start {id: row[0]}), (end {id: row[1]}) MERGE (start)-[f:FOLLOWS]->(end) SET f.reaction_count = row[2]"
```

When using the bulk updater, it is essential to sanitize CSV inputs beforehand, as RedisGraph *will* commit changes to the graph incrementally. As such, malformed inputs may leave the graph in a partially-updated state.
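To make the batching behaviour concrete: each batch is shipped as a single GRAPH.QUERY call whose `CYPHER rows=[...]` parameter header inlines the quoted rows, followed by an `UNWIND` prologue and the user's query. A standalone sketch of the string being assembled (the helper names `quote` and `build_batch` are illustrative, and `quote` only roughly mirrors the updater's quoting heuristic):

```python
def quote(cell):
    # Leave numerics, booleans, arrays, and pre-quoted strings alone;
    # wrap everything else in double quotes (rough sketch of the real rule).
    cell = cell.strip()
    try:
        float(cell)
        return cell
    except ValueError:
        if cell.lower() in ('true', 'false') or cell[:1] in ('[', '"', "'"):
            return cell
        return '"' + cell + '"'

def build_batch(rows, query, variable_name='row'):
    # Each row becomes a Cypher array literal inside the parameter header.
    rows_str = ",".join("[" + ",".join(quote(c) for c in row) + "]" for row in rows)
    return "CYPHER rows=[" + rows_str + "] UNWIND $rows AS " + variable_name + " " + query

cmd = build_batch([["0", "Jeffrey", "7.5"]],
                  "MERGE (:User {id: row[0], name: row[1], rank: row[2]})")
print(cmd)
# CYPHER rows=[[0,"Jeffrey",7.5]] UNWIND $rows AS row MERGE (:User {id: row[0], name: row[1], rank: row[2]})
```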
163 changes: 163 additions & 0 deletions redisgraph_bulk_loader/bulk_update.py
@@ -0,0 +1,163 @@
import io
import sys
import csv
import redis
import click
from redis import ResponseError
from timeit import default_timer as timer


def utf8len(s):
    return len(s.encode('utf-8'))


class BulkUpdate:
    """Handler class for emitting bulk update commands"""
    def __init__(self, graph, max_token_size, separator, no_header, filename, query, variable_name, client):
        self.separator = separator
        self.no_header = no_header
        self.query = " UNWIND $rows AS " + variable_name + " " + query

> **Contributor:** consider adding the white space when introducing the query parameter

        self.max_token_size = max_token_size * 1024 * 1024 - utf8len(self.query)
        self.infile = io.open(filename, 'rt')
        self.graph = graph
        self.client = client
        self.statistics = {}

    # Count number of rows in file.
    def count_entities(self):
        entities_count = sum(1 for line in self.infile)
        # Seek back to the start of the file.
        self.infile.seek(0)
        return entities_count

    def update_statistics(self, result):
        for raw_stat in result[0]:
            stat = raw_stat.split(": ")
            key = stat[0]
            try:
                val = self.statistics[key]
            except KeyError:
                val = 0
            val += float(stat[1].split(" ")[0])
            self.statistics[key] = val

    def emit_buffer(self, rows):
        command = rows + self.query
        result = self.client.execute_command("GRAPH.QUERY", self.graph, command)
        # If we encountered a run-time error, the last response element will be an exception.
        if isinstance(result[-1], ResponseError):
            raise result[-1]

> **Contributor:** If you want to consider using RedisGraph-py you'll get error detection for free.

        self.update_statistics(result)

    def quote_string(self, cell):
        cell = cell.strip()
        # Quote-interpolate cell if it is an unquoted string.
        try:
            float(cell)  # Check for numeric
        except ValueError:
            if ((cell.lower() != 'false' and cell.lower() != 'true') and  # Check for boolean
                    (cell[0] != '[' and cell[-1] != ']') and              # Check for array
                    (cell[0] != "\"" and cell[-1] != "\"") and            # Check for double-quoted string
                    (cell[0] != "\'" and cell[-1] != "\'")):              # Check for single-quoted string
                cell = "\"" + cell + "\""
        return cell

    def process_update_csv(self):
        entity_count = self.count_entities()

        if self.no_header is False:
            next(self.infile)  # skip header

        reader = csv.reader(self.infile, delimiter=self.separator, skipinitialspace=True, quoting=csv.QUOTE_NONE, escapechar='\\')

        rows_str = "CYPHER rows=["
        first = True
        with click.progressbar(reader, length=entity_count, label=self.graph) as reader:
            for row in reader:
                # Prepare the string representation of the current row.
                row = ",".join([self.quote_string(cell) for cell in row])
                next_line = "[" + row.strip() + "]"

                # Emit buffer now if the max token size would be exceeded by this addition.
                if utf8len(rows_str) + utf8len(next_line) > self.max_token_size:

> **Contributor:** computing utf8len(rows_str) with each iteration is expensive, compute it by maintaining the current length and adding utf8len(next_line) to it

                    # Add a closing bracket
                    rows_str += "]"
                    self.emit_buffer(rows_str)
                    rows_str = "CYPHER rows=["
                    first = True

                # Add a comma separator if this is not the first row in the query.
                if not first:
                    rows_str += ","
                first = False

                # Concatenate the string into the rows string representation.
                rows_str += next_line

> **Contributor:** Expensive, see here

> **Author:** Oof, good to know! Thanks!

        # Add a closing bracket
        rows_str += "]"
        self.emit_buffer(rows_str)
        self.infile.close()

> **Contributor:** Consider using with open to scope access to the file, you can open/close the file multiple times (whenever needed)



################################################################################
# Bulk updater
################################################################################
# Command-line arguments
@click.command()
@click.argument('graph')
# Redis server connection settings
@click.option('--host', '-h', default='127.0.0.1', help='Redis server host')
@click.option('--port', '-p', default=6379, help='Redis server port')
@click.option('--password', '-a', default=None, help='Redis server password')
@click.option('--unix-socket-path', '-u', default=None, help='Redis server unix socket path')
# Cypher query options
@click.option('--query', '-e', help='Query to run on server')

> **Contributor:** We can execute GRAPH.EXPLAIN with the specified query to quickly detect malformed queries

> **Author:** 👍 Good idea!

@click.option('--variable-name', '-v', default='row', help='Variable name for row array in queries (default: row)')
# CSV file options
@click.option('--csv', '-c', help='Path to CSV input file')
@click.option('--separator', '-o', default=',', help='Field token separator in CSV file')
@click.option('--no-header', '-n', default=False, is_flag=True, help='If set, the CSV file has no header')
# Buffer size restrictions
@click.option('--max-token-size', '-t', default=500, help='Max size of each token in megabytes (default 500, max 512)')
def bulk_update(graph, host, port, password, unix_socket_path, query, variable_name, csv, separator, no_header, max_token_size):
    if sys.version_info[0] < 3:
        raise Exception("Python 3 is required for the RedisGraph bulk updater.")

    start_time = timer()

    # Attempt to connect to Redis server
    try:
        if unix_socket_path is not None:
            client = redis.StrictRedis(unix_socket_path=unix_socket_path, password=password, decode_responses=True)
        else:
            client = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)
    except redis.exceptions.ConnectionError as e:
        print("Could not connect to Redis server.")
        raise e

    # Attempt to verify that RedisGraph module is loaded
    try:
        module_list = client.execute_command("MODULE LIST")
        if not any('graph' in module_description for module_description in module_list):
            print("RedisGraph module not loaded on connected server.")
            sys.exit(1)
    except redis.exceptions.ResponseError:
        # Ignore check if the connected server does not support the "MODULE LIST" command
        pass

    updater = BulkUpdate(graph, max_token_size, separator, no_header, csv, query, variable_name, client)
    updater.process_update_csv()

    end_time = timer()

    for key, value in updater.statistics.items():
        print(key + ": " + repr(value))
    print("Update of graph '%s' complete in %f seconds" % (graph, end_time - start_time))


if __name__ == '__main__':
    bulk_update()
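A review comment above points out that recomputing `utf8len(rows_str)` on every iteration is expensive, and suggests maintaining a running byte count instead. One way that suggestion could look, as a standalone sketch with illustrative names rather than the merged code:

```python
def batch_by_size(lines, max_bytes):
    """Group row strings into batches whose combined UTF-8 size stays under
    max_bytes, tracking a running total instead of re-encoding the whole
    accumulated buffer on every iteration."""
    batches, current, current_len = [], [], 0
    for line in lines:
        line_len = len(line.encode('utf-8'))
        # Flush the current batch before this line would push it over the limit.
        if current and current_len + line_len > max_bytes:
            batches.append(current)
            current, current_len = [], 0
        current.append(line)
        current_len += line_len + 1  # +1 accounts for the joining comma
    if current:
        batches.append(current)
    return batches

print(batch_by_size(["[1]", "[2]", "[3]"], 8))
# [['[1]', '[2]'], ['[3]']]
```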
1 change: 1 addition & 0 deletions setup.py
@@ -34,5 +34,6 @@ def read_all(f):
     entry_points='''
     [console_scripts]
     redisgraph-bulk-loader=redisgraph_bulk_loader.bulk_insert:bulk_insert
+    redisgraph-bulk-update=redisgraph_bulk_loader.bulk_update:bulk_update
     '''
 )
4 changes: 1 addition & 3 deletions test/test_bulk_loader.py
@@ -682,7 +682,6 @@ def test17_ensure_index_is_created(self):
         self.assertIn('2 nodes created', res.output)
         self.assertIn('Indices created: 1', res.output)

-        graph = Graph(graphname, self.redis_con)
         r = redis.Redis(host='localhost', port=6379, decode_responses=True)
         res = r.execute_command("GRAPH.EXPLAIN", graphname, 'MATCH (p:Person) WHERE p.age > 16 RETURN p')
         self.assertIn(' Index Scan | (p:Person)', res)
@@ -710,12 +709,11 @@ def test18_ensure_full_text_index_is_created(self):

         graph = Graph(graphname, self.redis_con)
         query_result = graph.query("CALL db.idx.fulltext.queryNodes('Monkeys', 'tamarin') YIELD node RETURN node.name")
-        expected_result = [ ['Emperor Tamarin'],['Golden Lion Tamarin'], ['Cotton-top Tamarin'] ]
+        expected_result = [['Emperor Tamarin'], ['Golden Lion Tamarin'], ['Cotton-top Tamarin']]

         # We should find only the tamarins
         self.assertEqual(query_result.result_set, expected_result)
-

 if __name__ == '__main__':
     unittest.main()