Skip to content

Commit

Permalink
Finalize Glue catalog support for iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
dacort committed Jul 11, 2024
1 parent 9d84599 commit e0d31b8
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 369 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,8 @@ And, of course, Iceberg tables!

Currently supported are writing to a Glue or generic SQL catalog.


```bash
fake -n 10 pyint,user_name,date_this_year -f deltalake -o glue://default.iceberg_sample
fake -n 10 pyint,user_name,date_this_year -f iceberg -C glue://default.iceberg_sample -o s3://YOUR_BUCKET/iceberg-data/
```

## Templates
Expand Down
5 changes: 3 additions & 2 deletions faker_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def main(num_rows, format, output, columns, template, catalog, column_types):

# Parquet output requires a filename
if format in ["parquet", "deltalake", "iceberg"] and output is None:
raise click.BadArgumentUsage("parquet | deltalake formats requires --output/-o filename parameter.")
raise click.BadArgumentUsage(f"{format} format requires --output/-o filename parameter.")
if output is not None and format not in ["parquet", "deltalake", "iceberg"]:
raise click.BadArgumentUsage("output files not supported for csv/json yet.")
if catalog and format not in ['iceberg']:
Expand Down Expand Up @@ -117,7 +117,8 @@ def main(num_rows, format, output, columns, template, catalog, column_types):
format_klas = KLAS_MAPPER.get(format)
if format_klas is None:
raise click.ClickException(f"Format {format} not supported.")
writer = format_klas(sys.stdout, headers, output)
# Fix in a better way - maybe passing **kwargs?
writer = format_klas(sys.stdout, headers, output, catalog)
for i in range(num_rows):
writer.write(generate_row(fake, col_types))
writer.close()
Expand Down
4 changes: 2 additions & 2 deletions faker_cli/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def close(self):


class CSVWriter(Writer):
def __init__(self, output, headers, filename):
def __init__(self, output, headers, filename, catalog_uri):
super().__init__(output, headers)
self.writer = csv.writer(self.output)
self.write(headers)
Expand All @@ -27,7 +27,7 @@ def write(self, row):


class JSONWriter(Writer):
def __init__(self, output, headers, filename):
def __init__(self, output, headers, filename, catalog_uri):
super().__init__(output, headers)
self.writer = self.output

Expand Down
67 changes: 35 additions & 32 deletions faker_cli/writers/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@
import pyarrow as pa
from pyiceberg.catalog import Catalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchNamespaceError
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError

from faker_cli.writer import Writer
from faker_cli.writers.parquet import ParquetWriter


class CatalogManager:
def __init__(self, uri: str, location: str) -> None:
[self.database, self.table] = urlparse(uri).netloc.split(".")
self.catalog = self._from_uri(uri, location)

def _from_uri(self, uri: str, location:str) -> Catalog:
def _from_uri(self, uri: str, location: str) -> Catalog:
u = urlparse(uri)
namespace = u.netloc.split(".")[0]
if u.scheme == "glue":
try:
from pyiceberg.catalog.glue import GlueCatalog
Expand All @@ -26,54 +25,58 @@ def _from_uri(self, uri: str, location:str) -> Catalog:
"Using Iceberg writer with Glue catalog, but the 'boto3' package is not installed. "
"Make sure to install faker-cli using `pip install faker-cli[iceberg,glue]`."
)
glue = GlueCatalog(namespace)
glue = GlueCatalog(self.database)
try:
glue.load_namespace_properties(namespace)
glue.load_namespace_properties(self.database)
glue.load_table(u.netloc)
raise Exception("Table already exists, please delete or choose another name.")
except NoSuchNamespaceError:
glue.create_namespace(namespace)
glue.create_namespace(self.database)
except NoSuchTableError:
pass

return glue

elif u.scheme == "sqlite":
self.temp_path = TemporaryDirectory()
sql = SqlCatalog(
namespace, uri=f"sqlite:////{self.temp_path.name}/pyiceberg_catalog.db", warehouse=location
self.database, uri=f"sqlite:////{self.temp_path.name}/pyiceberg_catalog.db", warehouse=location
)
sql.create_namespace(namespace)
sql.create_namespace(self.database)
return sql
else:
raise Exception("Unknown catalog type")
raise Exception("Unsupported catalog type, only glue or sqllite are supported.")

def create_table(self, schema, warehouse_path) -> pa.Table:
if self.catalog is SqlCatalog:
table = self.catalog.create_table(
f"{self.database}.{self.table}",
schema=schema,
)
else:
# location required for GlueCatalog
table = self.catalog.create_table(
f"{self.database}.{self.table}",
schema=schema,
location=warehouse_path.rstrip("/"),
)
return table


class IcebergWriter(ParquetWriter):
def __init__(self, output, headers, filename, catalog_uri):
super().__init__(output, headers, filename)
super().__init__(output, headers, filename, catalog_uri)
self.warehouse_path = filename
self.temp_path = TemporaryDirectory()
self.table: pa.Table = None
self.catalog: CatalogManager = CatalogManager(catalog_uri, filename)

# def write(self, row):
# df = pa.Table.from_pylist([dict(zip(self.headers, row))])
# if self.table is None:
# self.table = self._init_table("test", df.schema)

# # Kind of works, but writes a file per row
# self.table.append(df)

def _init_table(self, name, schema):
# self.catalog.create_namespace("default")

table = self.catalog.catalog.create_table(
f"default.{name}",
schema=schema,
)

return table

def close(self):
iceberg_table = self._init_table("test", self.table.schema)
iceberg_table = self.catalog.create_table(self.table.schema, self.warehouse_path)
iceberg_table.overwrite(self.table)

pa.fs.copy_files(f"{self.temp_path.name}/pyiceberg_catalog.db", f"{self.warehouse_path}pyiceberg_catalog.db")
return super().close()
if self.catalog is SqlCatalog:
pa.fs.copy_files(
f"{self.temp_path.name}/pyiceberg_catalog.db", f"{self.warehouse_path}pyiceberg_catalog.db"
)
return super().close()
2 changes: 1 addition & 1 deletion faker_cli/writers/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from faker_cli.writer import Writer

class ParquetWriter(Writer):
def __init__(self, output, headers, filename):
def __init__(self, output, headers, filename, catalog_uri):
super().__init__(output, headers)
self.filename = filename
self.table: pa.Table = None
Expand Down
Loading

0 comments on commit e0d31b8

Please # to comment.