Skip to content

Commit

Permalink
Merge pull request #3 from ilikutle/master
Browse files Browse the repository at this point in the history
renamed macros, schema creation fix, incremental fix, view fix, seed fix
  • Loading branch information
Torsten Glunde authored Jun 22, 2020
2 parents 27fe3a1 + 2c8e002 commit 262a539
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 26 deletions.
14 changes: 6 additions & 8 deletions dbt/include/exasol/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ ALTER_COLUMN_TYPE_MACRO_NAME = 'alter_column_type'
*/


{% macro exasol__list_relations_without_caching(information_schema, schema) %}
{% macro exasol__list_relations_without_caching(schema) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
select
'db' as [database],
Expand All @@ -37,12 +37,10 @@ ALTER_COLUMN_TYPE_MACRO_NAME = 'alter_column_type'
{{ return(load_result('list_schemas').table) }}
{% endmacro %}

{% macro exasol__create_schema(database_name, schema_name) -%}
{%if not adapter.check_schema_exists(database_name,schema_name) %}
{% call statement('create_schema', fetch_result=True, auto_begin=False) -%}
CREATE SCHEMA IF NOT EXISTS {{ schema_name | replace('"', "") }}
{% endcall %}
{%- endif -%}
{% macro exasol__create_schema(relation) -%}
{%- call statement('create_schema') -%}
create schema if not exists {{ relation.without_identifier() }}
{% endcall %}
{% endmacro %}

{% macro exasol__drop_schema(database_name, schema_name) -%}
Expand Down Expand Up @@ -100,7 +98,7 @@ ALTER_COLUMN_TYPE_MACRO_NAME = 'alter_column_type'
{% macro exasol__get_columns_in_relation(relation) -%}
{% call statement('get_columns_in_relation', fetch_result=True) %}
select
lower(column_name) as column_name,
CONCAT('"',lower(column_name), '"') as column_name,
column_type,
column_maxsize,
column_num_prec,
Expand Down
6 changes: 3 additions & 3 deletions dbt/include/exasol/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
{% if unique_key is not none %}
delete
from {{ target_relation }}
where ({{ unique_key }}) in (
select ({{ unique_key }})
where ("{{ unique_key }}") in (
select ("{{ unique_key }}")
from {{ tmp_relation.schema }}.{{tmp_relation.identifier}}
);
{%endif%}
Expand All @@ -29,7 +29,7 @@
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set identifier = model['alias'] -%}
{%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='table') -%}
{% set existing_relation = adapter.get_relation(database, schema, identifier) %}
{% set existing_relation = adapter.get_relation(database=database, schema=schema, identifier = identifier) %}
{% set tmp_relation = make_temp_relation(target_relation) %}

-- setup
Expand Down
6 changes: 5 additions & 1 deletion dbt/include/exasol/macros/materializations/seed.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro basic_load_csv_rows(model, batch_size, agate_table) %}
{% macro exasol__basic_load_csv_rows(model, batch_size, agate_table) %}
{% set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) %}
{% set bindings = [] %}

Expand Down Expand Up @@ -35,3 +35,7 @@
{# Return SQL so we can render it out into the compiled files #}
{{ return(statements[0]) }}
{% endmacro %}

{% macro exasol__load_csv_rows(model, agate_table) %}
{{ return(exasol__basic_load_csv_rows(model, 10000, agate_table) )}}
{% endmacro %}
109 changes: 102 additions & 7 deletions dbt/include/exasol/macros/materializations/snapshot.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro build_snapshot_table(strategy, sql) %}
{% macro exasol__build_snapshot_table(strategy, sql) %}

select sbq.*,
{{ strategy.scd_id }} as dbt_scd_id,
Expand All @@ -11,7 +11,7 @@

{% endmacro %}

{% macro snapshot_staging_table_inserts(strategy, source_sql, target_relation) -%}
{% macro exasol__snapshot_staging_table_inserts(strategy, source_sql, target_relation) -%}

with snapshot_query as (

Expand Down Expand Up @@ -63,7 +63,7 @@

{%- endmacro %}

{% macro snapshot_staging_table_updates(strategy, source_sql, target_relation) -%}
{% macro exasol__snapshot_staging_table_updates(strategy, source_sql, target_relation) -%}

with snapshot_query as (

Expand Down Expand Up @@ -116,10 +116,10 @@
{% do adapter.drop_relation(staging_relation) %}
{% endmacro %}

{% macro build_snapshot_staging_table(strategy, sql, target_relation) %}
{% macro exasol__build_snapshot_staging_table(strategy, sql, target_relation) %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% set inserts_select = snapshot_staging_table_inserts(strategy, sql, target_relation) %}
{% set updates_select = snapshot_staging_table_updates(strategy, sql, target_relation) %}
{% set inserts_select = exasol__snapshot_staging_table_inserts(strategy, sql, target_relation) %}
{% set updates_select = exasol__snapshot_staging_table_updates(strategy, sql, target_relation) %}

{% call statement('build_snapshot_staging_relation_inserts') %}
{{ create_table_as(True, tmp_relation, inserts_select) }}
Expand All @@ -133,4 +133,99 @@
{% endcall %}

{% do return(tmp_relation) %}
{% endmacro %}
{% endmacro %}

{% materialization snapshot, adapter='exasol' %}
{%- set config = model['config'] -%}

{%- set target_table = model.get('alias', model.get('name')) -%}

{%- set strategy_name = config.get('strategy') -%}
{%- set unique_key = config.get('unique_key') %}

{% if not adapter.check_schema_exists(model.database, model.schema) %}
{% do create_schema(model.database, model.schema) %}
{% endif %}

{% set target_relation_exists, target_relation = get_or_create_relation(
database=model.database,
schema=model.schema,
identifier=target_table,
type='table') -%}

{%- if not target_relation.is_table -%}
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}


{{ run_hooks(pre_hooks, inside_transaction=False) }}

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}

{% if not target_relation_exists %}

{% set build_sql = exasol__build_snapshot_table(strategy, model['injected_sql']) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}

{% set staging_table = exasol__build_snapshot_staging_table(strategy, sql, target_relation) %}

-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}

{% do create_columns(target_relation, missing_columns) %}

{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}

{% set quoted_source_columns = [] %}
{% for column in source_columns %}
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
{% endfor %}

{% set final_sql = exasol__snapshot_merge_sql(
target = target_relation,
source = staging_table,
insert_cols = quoted_source_columns
)
%}

{% endif %}

{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro default__snapshot_merge_sql(target, source, insert_cols) -%}
{% macro exasol__snapshot_merge_sql(target, source, insert_cols) -%}
{%- set insert_cols_csv = insert_cols | join(', ') -%}

merge into {{ target | upper }} as DBT_INTERNAL_DEST
Expand Down
51 changes: 50 additions & 1 deletion dbt/include/exasol/macros/materializations/strategies.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{% endfor %})
{% endmacro %}

{% macro snapshot_check_all_get_existing_columns(node, target_exists) -%}
{% macro exasol__snapshot_check_all_get_existing_columns(node, target_exists) -%}
{%- set query_columns = get_columns_in_query(node['injected_sql']) -%}
{%- if not target_exists -%}
{# no table yet -> return whatever the query does #}
Expand All @@ -27,3 +27,52 @@
{%- endfor -%}
{{ return([ns.column_added, intersection]) }}
{%- endmacro %}

{% macro exasol__snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set select_current_time -%}
select {{ snapshot_get_time() }} as snapshot_start
{%- endset %}

{#-- don't access the column by name, to avoid dealing with casing issues on snowflake #}
{%- set now = run_query(select_current_time)[0][0] -%}
{% if now is none or now is undefined -%}
{%- do exceptions.raise_compiler_error('Could not get a snapshot start time from the database') -%}
{%- endif %}
{% set updated_at = snapshot_string_as_time(now) %}

{% set column_added = false %}

{% if check_cols_config == 'all' %}
{% set column_added, check_cols = exasol__snapshot_check_all_get_existing_columns(node, target_exists) %}
{% elif check_cols_config is iterable and (check_cols_config | length) > 0 %}
{% set check_cols = check_cols_config %}
{% else %}
{% do exceptions.raise_compiler_error("Invalid value for 'check_cols': " ~ check_cols_config) %}
{% endif %}

{%- set row_changed_expr -%}
(
{%- if column_added -%}
TRUE
{%- else -%}
{%- for col in check_cols -%}
{{ snapshotted_rel }}.{{ col }} != {{ current_rel }}.{{ col }}
or
({{ snapshotted_rel }}.{{ col }} is null) != ({{ current_rel }}.{{ col }} is null)
{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
{%- endif -%}
)
{%- endset %}

{% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}

{% do return({
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
}) %}
{% endmacro %}
3 changes: 2 additions & 1 deletion dbt/include/exasol/macros/materializations/view.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{% materialization view, adapter='exasol' -%}
{{ create_or_replace_view() }}
{{ return(create_or_replace_view()) }}

{%- endmaterialization %}
8 changes: 4 additions & 4 deletions dbt/include/exasol/macros/schema_tests.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro test_unique(model) %}
{% macro exasol__test_unique(model) %}

{% set column_name = kwargs.get('column_name', kwargs.get('arg')) %}

Expand All @@ -18,7 +18,7 @@ from (
{% endmacro %}


{% macro test_not_null(model) %}
{% macro exasol__test_not_null(model) %}

{% set column_name = kwargs.get('column_name', kwargs.get('arg')) %}

Expand All @@ -29,7 +29,7 @@ where {{ column_name }} is null
{% endmacro %}


{% macro test_accepted_values(model, values) %}
{% macro exasol__test_accepted_values(model, values) %}

{% set column_name = kwargs.get('column_name', kwargs.get('field')) %}

Expand Down Expand Up @@ -63,7 +63,7 @@ from validation_errors
{% endmacro %}


{% macro test_relationships(model, to, field) %}
{% macro exasol__test_relationships(model, to, field) %}

{% set column_name = kwargs.get('column_name', kwargs.get('from')) %}

Expand Down

0 comments on commit 262a539

Please # to comment.