Skip to content

Commit

Permalink
Merge pull request #464 from firewall413/feature/added_glue_struct_support_and_read_options
Browse files Browse the repository at this point in the history

Added basic AWS Glue struct datatype support and JSON/CSV/PARQUET read_options
  • Loading branch information
jwills authored Oct 17, 2024
2 parents 9b34a9d + 26253dd commit 02b3b86
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 14 deletions.
33 changes: 23 additions & 10 deletions dbt/adapters/duckdb/plugins/glue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from typing import Any
from typing import Dict
from typing import List
Expand Down Expand Up @@ -65,6 +66,16 @@ def _dbt2glue(dtype: str, ignore_null: bool = False) -> str: # pragma: no cover
return "date"
if data_type.lower() in ["blob", "bytea", "binary", "varbinary"]:
return "binary"
if data_type.lower() in ["struct"]:
struct_fields = re.findall(r"(\w+)\s+(\w+)", dtype[dtype.find("(") + 1 : dtype.rfind(")")])
glue_fields = []
for field_name, field_type in struct_fields:
glue_field_type = _dbt2glue(field_type)
glue_fields.append(f"{field_name}:{glue_field_type}")
struct_schema = f"struct<{','.join(glue_fields)}>"
if dtype.strip().endswith("[]"):
return f"array<{struct_schema}>"
return struct_schema
if data_type is None:
if ignore_null:
return ""
Expand Down Expand Up @@ -267,6 +278,7 @@ def _get_table_def(
def _get_glue_client(
settings: Dict[str, Any], secrets: Optional[List[Dict[str, Any]]]
) -> "GlueClient":
client = None
if secrets is not None:
for secret in secrets:
if isinstance(secret, Secret) and "config" == str(secret.provider).lower():
Expand All @@ -279,16 +291,17 @@ def _get_glue_client(
region_name=secret_kwargs.get("region"),
)
break
elif settings:
client = boto3.client(
"glue",
aws_access_key_id=settings.get("s3_access_key_id"),
aws_secret_access_key=settings.get("s3_secret_access_key"),
aws_session_token=settings.get("s3_session_token"),
region_name=settings.get("s3_region"),
)
else:
client = boto3.client("glue")
if client is None:
if settings:
client = boto3.client(
"glue",
aws_access_key_id=settings.get("s3_access_key_id"),
aws_secret_access_key=settings.get("s3_secret_access_key"),
aws_session_token=settings.get("s3_session_token"),
region_name=settings.get("s3_region"),
)
else:
client = boto3.client("glue")
return client


Expand Down
60 changes: 56 additions & 4 deletions dbt/include/duckdb/macros/materializations/external.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,23 @@

{%- set location = render(config.get('location', default=external_location(this, config))) -%})
{%- set rendered_options = render_write_options(config) -%}
{%- set format = config.get('format', 'parquet') -%}

{%- set format = config.get('format') -%}
{%- set allowed_formats = ['csv', 'parquet', 'json'] -%}
{%- if format -%}
{%- if format not in allowed_formats -%}
{{ exceptions.raise_compiler_error("Invalid format: " ~ format ~ ". Allowed formats are: " ~ allowed_formats | join(', ')) }}
{%- endif -%}
{%- else -%}
{%- set format = location.split('.')[-1].lower() if '.' in location else 'parquet' -%}
{%- set format = format if format in allowed_formats else 'parquet' -%}
{%- endif -%}

{%- set write_options = adapter.external_write_options(location, rendered_options) -%}
{%- set read_location = adapter.external_read_location(location, rendered_options) -%}
{%- set parquet_read_options = config.get('parquet_read_options', {'union_by_name': False}) -%}
{%- set json_read_options = config.get('json_read_options', {'auto_detect': True}) -%}
{%- set csv_read_options = config.get('csv_read_options', {'auto_detect': True}) -%}

-- set language - python or sql
{%- set language = model['language'] -%}
Expand Down Expand Up @@ -45,13 +59,51 @@
{{- create_table_as(False, temp_relation, compiled_code, language) }}
{%- endcall %}

-- write an temp relation into file
-- write a temp relation into file
{{ write_to_file(temp_relation, location, write_options) }}
-- create a view on top of the location

-- create a view on top of the location
{% call statement('main', language='sql') -%}
{% if format == 'json' %}
create or replace view {{ intermediate_relation }} as (
select * from read_json('{{ read_location }}'
{%- for key, value in json_read_options.items() -%}
, {{ key }}=
{%- if value is string -%}
'{{ value }}'
{%- else -%}
{{ value }}
{%- endif -%}
{%- endfor -%}
)
);
{% elif format == 'parquet' %}
create or replace view {{ intermediate_relation }} as (
select * from read_parquet('{{ read_location }}'
{%- for key, value in parquet_read_options.items() -%}
, {{ key }}=
{%- if value is string -%}
'{{ value }}'
{%- else -%}
{{ value }}
{%- endif -%}
{%- endfor -%}
)
);
{% elif format == 'csv' %}
create or replace view {{ intermediate_relation }} as (
select * from '{{ read_location }}'
select * from read_csv('{{ read_location }}'
{%- for key, value in csv_read_options.items() -%}
, {{ key }}=
{%- if value is string -%}
'{{ value }}'
{%- else -%}
{{ value }}
{%- endif -%}
{%- endfor -%}
)
);
{% endif %}
{%- endcall %}

-- cleanup
Expand Down

0 comments on commit 02b3b86

Please sign in to comment.