Upgrade pyarrow to fix S3 segfaults (#7)
* Bump arrow all the way to latest
* Add extra module support - closes #4
* Version bump
dacort authored Jan 17, 2024
1 parent d6cafb7 commit 530a1f4
Showing 6 changed files with 145 additions and 74 deletions.
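The "extra module support" bullet refers to deferring the heavy writer imports: pyarrow and deltalake are now pulled in only when their output format is requested, and a missing optional dependency becomes a readable CLI error instead of a failure at import time. A minimal sketch of that pattern, assuming the module layout introduced in this commit (the helper name is illustrative, not part of the repo):

import click


def load_parquet_writer():
    # Deferred import: pyarrow is only needed when the parquet format
    # is actually requested.
    try:
        from faker_cli.writers.parquet import ParquetWriter
        return ParquetWriter
    except ImportError:
        raise click.ClickException(
            "The 'pyarrow' package is not installed. "
            "Install it with `pip install faker-cli[parquet]`."
        )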
64 changes: 49 additions & 15 deletions faker_cli/cli.py
@@ -1,26 +1,32 @@
from faker import Faker
import click
import sys
from faker_cli.templates import CloudFrontWriter, S3AccessLogs, S3AccessWriter, CloudTrailLogs, CloudFrontLogs

from faker_cli.writer import CSVWriter, JSONWriter, ParquetWriter, DeltaLakeWriter
from typing import List

import click
from faker import Faker

from faker_cli.templates import (
    CloudFrontLogs,
    CloudFrontWriter,
    S3AccessLogs,
    S3AccessWriter,
)
from faker_cli.writer import CSVWriter, JSONWriter


def infer_column_names(col_names, col_types: str) -> List[str]:
"""
Infer column names from column types
"""
# For now, nothing special - but eventually we need to parse things out
if col_names:
return col_names.split(",")

return col_types.split(",")


KLAS_MAPPER = {
"csv": CSVWriter,
"json": JSONWriter,
"parquet": ParquetWriter,
"deltalake": DeltaLakeWriter
}

TEMPLATE_MAPPER = {
@@ -32,9 +38,16 @@ def infer_column_names(col_names, col_types: str) -> List[str]:
fake.add_provider(S3AccessLogs)
fake.add_provider(CloudFrontLogs)


@click.command()
@click.option("--num-rows", "-n", default=1, help="Number of rows")
@click.option("--format", "-f", type=click.Choice(["csv", "json", "parquet", "deltalake"]), default="csv", help="Format of the output")
@click.option(
"--format",
"-f",
type=click.Choice(["csv", "json", "parquet", "deltalake"]),
default="csv",
help="Format of the output",
)
@click.option("--output", "-o", type=click.Path(writable=True))
@click.option("--columns", "-c", help="Column names", default=None, required=False)
@click.option("--template", "-t", help="Template to use", type=click.Choice(["s3access", "cloudfront"]), default=None)
@@ -53,16 +66,37 @@ def main(num_rows, format, output, columns, template, column_types):
        ctx = click.get_current_context()
        click.echo(ctx.get_help())
        ctx.exit()
        raise click.BadArgumentUsage(
            "either --template or a list of Faker property names must be provided."
        )
        raise click.BadArgumentUsage("either --template or a list of Faker property names must be provided.")

    # Parquet output requires a filename
    if format in ["parquet", "deltalake"] and output is None:
        raise click.BadArgumentUsage("parquet | deltalake formats requires --output/-o filename parameter.")
    if output is not None and format not in ["parquet", "deltalake"]:
        raise click.BadArgumentUsage("output files not supported for csv/json yet.")


    # Optionally load additional features
    if format == "parquet":
        try:
            from faker_cli.writers.parquet import ParquetWriter

            KLAS_MAPPER["parquet"] = ParquetWriter
        except ImportError:
            raise click.ClickException(
                "Using Parquet writer, but the 'pyarrow' package is not installed. "
                "Make sure to install faker-cli using `pip install faker-cli[parquet]`."
            )

    if format == "deltalake":
        try:
            from faker_cli.writers.delta import DeltaLakeWriter

            KLAS_MAPPER["deltalake"] = DeltaLakeWriter
        except ImportError:
            raise click.ClickException(
                "Using Delta writer, but the 'deltalake' package is not installed. "
                "Make sure to install faker-cli using `pip install faker-cli[delta]`."
            )

    # If the user provides a template, we use that provider and writer and exit.
    # We assume a template has a custom writer that may be different than CSV or JSON
    if template:
@@ -72,13 +106,13 @@ def main(num_rows, format, output, columns, template, column_types):
            row = fake.format(log_entry)
            writer.write(row)
        return

    # Now, if a template hasn't been provided, generate some fake data!
    col_types = column_types.split(",")
    headers = infer_column_names(columns, column_types)
    writer = KLAS_MAPPER.get(format)(sys.stdout, headers, output)
    for i in range(num_rows):
        # TODO: Handle args
        row = [ fake.format(ctype) for ctype in col_types ]
        row = [fake.format(ctype) for ctype in col_types]
        writer.write(row)
    writer.close()
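For a quick end-to-end check of the lazy loading above, click's test runner can drive the command directly. This is a sketch, not repo test code: it assumes `column_types` is the trailing positional argument implied by `main`'s signature (its decorator sits outside the visible hunk), and that `pyint` and `name` are standard Faker providers.

from click.testing import CliRunner

from faker_cli.cli import main

runner = CliRunner()

# Default CSV path: writes to stdout and never imports pyarrow or deltalake.
result = runner.invoke(main, ["-n", "2", "pyint,name"])
print(result.exit_code, result.output)

# Parquet path: triggers the deferred import; without the [parquet] extra
# installed, this should fail with the ClickException message shown above.
with runner.isolated_filesystem():
    result = runner.invoke(main, ["-n", "2", "-f", "parquet", "-o", "rows.parquet", "pyint,name"])
    print(result.exit_code, result.output)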
27 changes: 0 additions & 27 deletions faker_cli/writer.py
@@ -1,9 +1,6 @@
import csv
import json
from typing import Optional
import pyarrow as pa
import pyarrow.parquet as pq
import deltalake


class Writer:
@@ -38,27 +35,3 @@ def write(self, row):
        jsonl = json.dumps(dict(zip(self.headers, row)), default=str)
        self.writer.write(jsonl)
        self.writer.write("\n")


class ParquetWriter(Writer):
    def __init__(self, output, headers, filename):
        super().__init__(output, headers)
        self.filename = filename
        self.table: pa.Table = None

    def write(self, row):
        ini_dict = [{k: [v]} for k, v in list(zip(self.headers, row))]
        tbl = {k: v for d in ini_dict for k, v in d.items()}
        table = pa.table(tbl)
        if self.table is None:
            self.table = table
        else:
            self.table = pa.concat_tables([self.table, table])

    def close(self):
        pq.write_table(self.table, self.filename)


class DeltaLakeWriter(ParquetWriter):
    def close(self):
        deltalake.write_deltalake(table_or_uri=self.filename, data=self.table)
8 changes: 8 additions & 0 deletions faker_cli/writers/delta.py
@@ -0,0 +1,8 @@
import deltalake

from faker_cli.writers.parquet import ParquetWriter


class DeltaLakeWriter(ParquetWriter):
    def close(self):
        deltalake.write_deltalake(table_or_uri=self.filename, data=self.table)
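DeltaLakeWriter only overrides close(), so it reuses ParquetWriter's in-memory buffering and hands the accumulated pyarrow table to deltalake in one shot. A hedged usage sketch, with a made-up path and column names, assuming the deltalake extra is installed:

import sys

from deltalake import DeltaTable

from faker_cli.writers.delta import DeltaLakeWriter

writer = DeltaLakeWriter(sys.stdout, ["name", "age"], "/tmp/people_delta")
writer.write(["Alice", 37])
writer.write(["Bob", 41])
writer.close()  # write_deltalake() persists the buffered table as a Delta table

# Read the table back to confirm both rows landed.
print(DeltaTable("/tmp/people_delta").to_pyarrow_table().num_rows)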
23 changes: 23 additions & 0 deletions faker_cli/writers/parquet.py
@@ -0,0 +1,23 @@

import pyarrow as pa
import pyarrow.parquet as pq

from faker_cli.writer import Writer

class ParquetWriter(Writer):
    def __init__(self, output, headers, filename):
        super().__init__(output, headers)
        self.filename = filename
        self.table: pa.Table = None

    def write(self, row):
        ini_dict = [{k: [v]} for k, v in list(zip(self.headers, row))]
        tbl = {k: v for d in ini_dict for k, v in d.items()}
        table = pa.table(tbl)
        if self.table is None:
            self.table = table
        else:
            self.table = pa.concat_tables([self.table, table])

    def close(self):
        pq.write_table(self.table, self.filename)
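ParquetWriter buffers each row as a one-row pyarrow table and concatenates as it goes, so nothing touches disk until close() makes a single pq.write_table call. A small usage sketch under the same assumptions (made-up path and column names):

import sys

import pyarrow.parquet as pq

from faker_cli.writers.parquet import ParquetWriter

writer = ParquetWriter(sys.stdout, ["name", "age"], "/tmp/people.parquet")
writer.write(["Alice", 37])
writer.write(["Bob", 41])
writer.close()  # one pq.write_table call with the concatenated table

table = pq.read_table("/tmp/people.parquet")
print(table.num_rows, table.column_names)  # 2 ['name', 'age']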
87 changes: 58 additions & 29 deletions poetry.lock
