Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

renamed macros, schema creation fix, incremental fix, view fix, seed fix #3

Merged
merged 6 commits into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions dbt/include/exasol/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ ALTER_COLUMN_TYPE_MACRO_NAME = 'alter_column_type'
*/


{% macro exasol__list_relations_without_caching(information_schema, schema) %}
{% macro exasol__list_relations_without_caching(schema) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
select
'db' as [database],
Expand All @@ -37,12 +37,10 @@ ALTER_COLUMN_TYPE_MACRO_NAME = 'alter_column_type'
{{ return(load_result('list_schemas').table) }}
{% endmacro %}

{% macro exasol__create_schema(database_name, schema_name) -%}
{%if not adapter.check_schema_exists(database_name,schema_name) %}
{% call statement('create_schema', fetch_result=True, auto_begin=False) -%}
CREATE SCHEMA IF NOT EXISTS {{ schema_name | replace('"', "") }}
{% endcall %}
{%- endif -%}
{% macro exasol__create_schema(relation) -%}
{%- call statement('create_schema') -%}
create schema if not exists {{ relation.without_identifier() }}
{% endcall %}
{% endmacro %}

{% macro exasol__drop_schema(database_name, schema_name) -%}
Expand Down Expand Up @@ -100,7 +98,7 @@ ALTER_COLUMN_TYPE_MACRO_NAME = 'alter_column_type'
{% macro exasol__get_columns_in_relation(relation) -%}
{% call statement('get_columns_in_relation', fetch_result=True) %}
select
lower(column_name) as column_name,
CONCAT('"',lower(column_name), '"') as column_name,
column_type,
column_maxsize,
column_num_prec,
Expand Down
6 changes: 3 additions & 3 deletions dbt/include/exasol/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
{% if unique_key is not none %}
delete
from {{ target_relation }}
where ({{ unique_key }}) in (
select ({{ unique_key }})
where ("{{ unique_key }}") in (
select ("{{ unique_key }}")
from {{ tmp_relation.schema }}.{{tmp_relation.identifier}}
);
{%endif%}
Expand All @@ -29,7 +29,7 @@
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set identifier = model['alias'] -%}
{%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='table') -%}
{% set existing_relation = adapter.get_relation(database, schema, identifier) %}
{% set existing_relation = adapter.get_relation(database=database, schema=schema, identifier = identifier) %}
{% set tmp_relation = make_temp_relation(target_relation) %}

-- setup
Expand Down
6 changes: 5 additions & 1 deletion dbt/include/exasol/macros/materializations/seed.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro basic_load_csv_rows(model, batch_size, agate_table) %}
{% macro exasol__basic_load_csv_rows(model, batch_size, agate_table) %}
{% set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) %}
{% set bindings = [] %}

Expand Down Expand Up @@ -35,3 +35,7 @@
{# Return SQL so we can render it out into the compiled files #}
{{ return(statements[0]) }}
{% endmacro %}

{% macro exasol__load_csv_rows(model, agate_table) %}
{{ return(exasol__basic_load_csv_rows(model, 10000, agate_table) )}}
{% endmacro %}
109 changes: 102 additions & 7 deletions dbt/include/exasol/macros/materializations/snapshot.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro build_snapshot_table(strategy, sql) %}
{% macro exasol__build_snapshot_table(strategy, sql) %}

select sbq.*,
{{ strategy.scd_id }} as dbt_scd_id,
Expand All @@ -11,7 +11,7 @@

{% endmacro %}

{% macro snapshot_staging_table_inserts(strategy, source_sql, target_relation) -%}
{% macro exasol__snapshot_staging_table_inserts(strategy, source_sql, target_relation) -%}

with snapshot_query as (

Expand Down Expand Up @@ -63,7 +63,7 @@

{%- endmacro %}

{% macro snapshot_staging_table_updates(strategy, source_sql, target_relation) -%}
{% macro exasol__snapshot_staging_table_updates(strategy, source_sql, target_relation) -%}

with snapshot_query as (

Expand Down Expand Up @@ -116,10 +116,10 @@
{% do adapter.drop_relation(staging_relation) %}
{% endmacro %}

{% macro build_snapshot_staging_table(strategy, sql, target_relation) %}
{% macro exasol__build_snapshot_staging_table(strategy, sql, target_relation) %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% set inserts_select = snapshot_staging_table_inserts(strategy, sql, target_relation) %}
{% set updates_select = snapshot_staging_table_updates(strategy, sql, target_relation) %}
{% set inserts_select = exasol__snapshot_staging_table_inserts(strategy, sql, target_relation) %}
{% set updates_select = exasol__snapshot_staging_table_updates(strategy, sql, target_relation) %}

{% call statement('build_snapshot_staging_relation_inserts') %}
{{ create_table_as(True, tmp_relation, inserts_select) }}
Expand All @@ -133,4 +133,99 @@
{% endcall %}

{% do return(tmp_relation) %}
{% endmacro %}
{% endmacro %}

{% materialization snapshot, adapter='exasol' %}
{%- set config = model['config'] -%}

{%- set target_table = model.get('alias', model.get('name')) -%}

{%- set strategy_name = config.get('strategy') -%}
{%- set unique_key = config.get('unique_key') %}

{% if not adapter.check_schema_exists(model.database, model.schema) %}
{% do create_schema(model.database, model.schema) %}
{% endif %}

{% set target_relation_exists, target_relation = get_or_create_relation(
database=model.database,
schema=model.schema,
identifier=target_table,
type='table') -%}

{%- if not target_relation.is_table -%}
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}


{{ run_hooks(pre_hooks, inside_transaction=False) }}

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}

{% if not target_relation_exists %}

{% set build_sql = exasol__build_snapshot_table(strategy, model['injected_sql']) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}

{% set staging_table = exasol__build_snapshot_staging_table(strategy, sql, target_relation) %}

-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}

{% do create_columns(target_relation, missing_columns) %}

{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}

{% set quoted_source_columns = [] %}
{% for column in source_columns %}
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
{% endfor %}

{% set final_sql = exasol__snapshot_merge_sql(
target = target_relation,
source = staging_table,
insert_cols = quoted_source_columns
)
%}

{% endif %}

{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro default__snapshot_merge_sql(target, source, insert_cols) -%}
{% macro exasol__snapshot_merge_sql(target, source, insert_cols) -%}
{%- set insert_cols_csv = insert_cols | join(', ') -%}

merge into {{ target | upper }} as DBT_INTERNAL_DEST
Expand Down
51 changes: 50 additions & 1 deletion dbt/include/exasol/macros/materializations/strategies.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{% endfor %})
{% endmacro %}

{% macro snapshot_check_all_get_existing_columns(node, target_exists) -%}
{% macro exasol__snapshot_check_all_get_existing_columns(node, target_exists) -%}
{%- set query_columns = get_columns_in_query(node['injected_sql']) -%}
{%- if not target_exists -%}
{# no table yet -> return whatever the query does #}
Expand All @@ -27,3 +27,52 @@
{%- endfor -%}
{{ return([ns.column_added, intersection]) }}
{%- endmacro %}

{% macro exasol__snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set select_current_time -%}
select {{ snapshot_get_time() }} as snapshot_start
{%- endset %}

{#-- don't access the column by name, to avoid dealing with casing issues on snowflake #}
{%- set now = run_query(select_current_time)[0][0] -%}
{% if now is none or now is undefined -%}
{%- do exceptions.raise_compiler_error('Could not get a snapshot start time from the database') -%}
{%- endif %}
{% set updated_at = snapshot_string_as_time(now) %}

{% set column_added = false %}

{% if check_cols_config == 'all' %}
{% set column_added, check_cols = exasol__snapshot_check_all_get_existing_columns(node, target_exists) %}
{% elif check_cols_config is iterable and (check_cols_config | length) > 0 %}
{% set check_cols = check_cols_config %}
{% else %}
{% do exceptions.raise_compiler_error("Invalid value for 'check_cols': " ~ check_cols_config) %}
{% endif %}

{%- set row_changed_expr -%}
(
{%- if column_added -%}
TRUE
{%- else -%}
{%- for col in check_cols -%}
{{ snapshotted_rel }}.{{ col }} != {{ current_rel }}.{{ col }}
or
({{ snapshotted_rel }}.{{ col }} is null) != ({{ current_rel }}.{{ col }} is null)
{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
{%- endif -%}
)
{%- endset %}

{% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}

{% do return({
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
}) %}
{% endmacro %}
3 changes: 2 additions & 1 deletion dbt/include/exasol/macros/materializations/view.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{% materialization view, adapter='exasol' -%}
{{ create_or_replace_view() }}
{{ return(create_or_replace_view()) }}

{%- endmaterialization %}
8 changes: 4 additions & 4 deletions dbt/include/exasol/macros/schema_tests.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro test_unique(model) %}
{% macro exasol__test_unique(model) %}

{% set column_name = kwargs.get('column_name', kwargs.get('arg')) %}

Expand All @@ -18,7 +18,7 @@ from (
{% endmacro %}


{% macro test_not_null(model) %}
{% macro exasol__test_not_null(model) %}

{% set column_name = kwargs.get('column_name', kwargs.get('arg')) %}

Expand All @@ -29,7 +29,7 @@ where {{ column_name }} is null
{% endmacro %}


{% macro test_accepted_values(model, values) %}
{% macro exasol__test_accepted_values(model, values) %}

{% set column_name = kwargs.get('column_name', kwargs.get('field')) %}

Expand Down Expand Up @@ -63,7 +63,7 @@ from validation_errors
{% endmacro %}


{% macro test_relationships(model, to, field) %}
{% macro exasol__test_relationships(model, to, field) %}

{% set column_name = kwargs.get('column_name', kwargs.get('from')) %}

Expand Down