Skip to content

Commit

Permalink
Initial implementation of iceberg support
Browse files Browse the repository at this point in the history
  • Loading branch information
dacort committed Feb 28, 2024
1 parent ba35f27 commit 5cb78b2
Show file tree
Hide file tree
Showing 4 changed files with 807 additions and 6 deletions.
19 changes: 15 additions & 4 deletions faker_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@click.option(
"--format",
"-f",
type=click.Choice(["csv", "json", "parquet", "deltalake"]),
type=click.Choice(["csv", "json", "parquet", "deltalake", "iceberg"]),
default="csv",
help="Format of the output",
)
Expand All @@ -45,7 +45,7 @@ def main(num_rows, format, output, columns, template, column_types):
Generate fake data, easily.
COLUMN_TYPES is a comma-seperated list of Faker property names, like
pyint,username,date_this_year
pyint,user_name,date_this_year
You can also use --template for real-world synthetic data.
"""
Expand All @@ -57,9 +57,9 @@ def main(num_rows, format, output, columns, template, column_types):
raise click.BadArgumentUsage("either --template or a list of Faker property names must be provided.")

# Parquet output requires a filename
if format in ["parquet", "deltalake"] and output is None:
if format in ["parquet", "deltalake", "iceberg"] and output is None:
raise click.BadArgumentUsage("parquet | deltalake formats requires --output/-o filename parameter.")
if output is not None and format not in ["parquet", "deltalake"]:
if output is not None and format not in ["parquet", "deltalake", "iceberg"]:
raise click.BadArgumentUsage("output files not supported for csv/json yet.")

# Optionally load additional features
Expand All @@ -85,6 +85,17 @@ def main(num_rows, format, output, columns, template, column_types):
"Make sure to install faker-cli using `pip install faker-cli[delta]`."
)

if format == "iceberg":
try:
from faker_cli.writers.iceberg import IcebergWriter

KLAS_MAPPER["iceberg"] = IcebergWriter
except ImportError:
raise click.ClickException(
"Using Iceberg writer, but the 'iceberg' package is not installed. "
"Make sure to install faker-cli using `pip install faker-cli[iceberg]`."
)

# If the user provides a template, we use that provider and writer and exit.
# We assume a template has a custom writer that may be different than CSV or JSON
if template:
Expand Down
47 changes: 47 additions & 0 deletions faker_cli/writers/iceberg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from tempfile import TemporaryDirectory

import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog

from faker_cli.writer import Writer
from faker_cli.writers.parquet import ParquetWriter


class IcebergWriter(ParquetWriter):
    """Write the generated rows out as an Apache Iceberg table.

    A pyiceberg ``SqlCatalog`` backed by a SQLite database is created in a
    temporary directory, with ``filename`` used as the Iceberg warehouse
    location. On ``close()`` the accumulated ``self.table`` is written to the
    Iceberg table ``default.test`` and the catalog database is copied into
    the warehouse so that the table can be located again later.

    Args:
        output: Passed through unchanged to ``ParquetWriter.__init__``.
        headers: Passed through unchanged to ``ParquetWriter.__init__``.
        filename: Warehouse location for the Iceberg table; also passed
            through to ``ParquetWriter.__init__``.
    """

    # File name of the SQLite database that backs the SQL catalog.
    _CATALOG_DB_NAME = "pyiceberg_catalog.db"

    def __init__(self, output, headers, filename):
        super().__init__(output, headers, filename)
        self.warehouse_path = filename
        # The catalog database lives in a scratch directory while writing and
        # is copied next to the data in close().
        self.temp_path = TemporaryDirectory()
        self.table: pa.Table = None
        self.catalog: SqlCatalog = SqlCatalog(
            "default",
            **{
                # SQLAlchemy SQLite URLs are "sqlite:///" + path. The temp
                # directory name is already absolute, so this expands to
                # "sqlite:////tmp/..." on POSIX and "sqlite:///C:\\..." on
                # Windows; a fourth hard-coded slash would double the leading
                # separator (and break the Windows form).
                "uri": f"sqlite:///{self.temp_path.name}/{self._CATALOG_DB_NAME}",
                "warehouse": self.warehouse_path,
            },
        )

    # NOTE: write() is intentionally not overridden. Appending each row to the
    # Iceberg table as it arrives kind of works, but writes a file per row, so
    # the whole table is written once in close() instead.
    # NOTE(review): this presumes the inherited write() accumulates rows into
    # self.table -- confirm against ParquetWriter.

    def _init_table(self, name, schema):
        """Create the ``default`` namespace and the table ``default.<name>``.

        Args:
            name: Table name (without the namespace prefix).
            schema: Schema handed to ``catalog.create_table``.

        Returns:
            The table object returned by ``catalog.create_table``.
        """
        self.catalog.create_namespace("default")

        table = self.catalog.create_table(
            f"default.{name}",
            schema=schema,
        )

        return table

    def close(self):
        """Write ``self.table`` to Iceberg and persist the catalog database.

        Returns:
            Whatever ``ParquetWriter.close()`` returns.
        """
        # Import the submodule explicitly: a bare `import pyarrow` does not
        # guarantee that the `pa.fs` attribute has been loaded.
        from pyarrow import fs as pa_fs

        iceberg_table = self._init_table("test", self.table.schema)
        iceberg_table.overwrite(self.table)

        # Join with an explicit separator so the database lands *inside* the
        # warehouse whether or not the user supplied a trailing slash.
        warehouse_root = str(self.warehouse_path).rstrip("/")
        catalog_src = f"{self.temp_path.name}/{self._CATALOG_DB_NAME}"
        catalog_dst = f"{warehouse_root}/{self._CATALOG_DB_NAME}"
        pa_fs.copy_files(catalog_src, catalog_dst)
        return super().close()
Loading

0 comments on commit 5cb78b2

Please sign in to comment.