From 372f0d73893806abe1413cd6e484f3aae8dfcf49 Mon Sep 17 00:00:00 2001 From: Paul Hobson Date: Wed, 23 Nov 2022 12:18:31 -0800 Subject: [PATCH 1/9] parametrize test for a small and a big dataframe --- dask_snowflake/tests/test_core.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/dask_snowflake/tests/test_core.py b/dask_snowflake/tests/test_core.py index e55bc2c..87b16a0 100644 --- a/dask_snowflake/tests/test_core.py +++ b/dask_snowflake/tests/test_core.py @@ -237,9 +237,16 @@ def test_execute_params(table, connection_kwargs, client): ) -def test_result_batching(table, connection_kwargs, client): +@pytest.mark.parametrize( + "end_date", + [ + pytest.param("2000-01-31", id="one month"), + pytest.param("2000-12-31", id="twelve months"), + ], +) +def test_result_batching(table, connection_kwargs, end_date, client): ddf = ( - dask.datasets.timeseries(freq="10s", seed=1) + dask.datasets.timeseries(start="2000-01-01", end=end_date, freq="10s", seed=1) .reset_index(drop=True) .rename(columns=lambda c: c.upper()) ) From fac877c394f190aa66594c2c1554aa8e31500c4d Mon Sep 17 00:00:00 2001 From: Paul Hobson Date: Wed, 23 Nov 2022 13:07:04 -0800 Subject: [PATCH 2/9] break up batching test --- dask_snowflake/tests/test_core.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/dask_snowflake/tests/test_core.py b/dask_snowflake/tests/test_core.py index 87b16a0..eaf62bb 100644 --- a/dask_snowflake/tests/test_core.py +++ b/dask_snowflake/tests/test_core.py @@ -241,10 +241,14 @@ def test_execute_params(table, connection_kwargs, client): "end_date", [ pytest.param("2000-01-31", id="one month"), - pytest.param("2000-12-31", id="twelve months"), + pytest.param( + "2000-12-31", + id="twelve months", + marks=pytest.mark.xfail(reason="inconsistent partition sizing"), + ), ], ) -def test_result_batching(table, connection_kwargs, end_date, client): +def test_result_batching_partition_sizes(table, connection_kwargs, 
end_date, client): ddf = ( dask.datasets.timeseries(start="2000-01-01", end=end_date, freq="10s", seed=1) .reset_index(drop=True) @@ -263,6 +267,24 @@ def test_result_batching(table, connection_kwargs, end_date, client): partition_sizes = ddf_out.memory_usage_per_partition().compute() assert (partition_sizes < 2 * parse_bytes("2 MiB")).all() + +@pytest.mark.parametrize( + "end_date", + [ + pytest.param("2000-01-31", id="one month"), + pytest.param("2000-12-31", id="twelve months"), + ], +) +def test_result_batching_nparitions_and_equality( + table, connection_kwargs, end_date, client +): + ddf = ( + dask.datasets.timeseries(start="2000-01-01", end=end_date, freq="10s", seed=1) + .reset_index(drop=True) + .rename(columns=lambda c: c.upper()) + ) + to_snowflake(ddf, name=table, connection_kwargs=connection_kwargs) + # Test partition_size logic ddf_out = read_snowflake( f"SELECT * FROM {table}", From 399da1465a208d5ec4459059461ecd4ed0aa104e Mon Sep 17 00:00:00 2001 From: Paul Hobson Date: Wed, 23 Nov 2022 14:08:05 -0800 Subject: [PATCH 3/9] remove serializable lock --- dask_snowflake/core.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/dask_snowflake/core.py b/dask_snowflake/core.py index b2eaaeb..8f8c0c8 100644 --- a/dask_snowflake/core.py +++ b/dask_snowflake/core.py @@ -18,7 +18,7 @@ from dask.delayed import delayed from dask.highlevelgraph import HighLevelGraph from dask.layers import DataFrameIOLayer -from dask.utils import SerializableLock, parse_bytes +from dask.utils import parse_bytes @delayed @@ -34,16 +34,15 @@ def write_snowflake( with snowflake.connector.connect(**connection_kwargs) as conn: # NOTE: Use a process-wide lock to avoid a `boto` multithreading issue # https://github.com/snowflakedb/snowflake-connector-python/issues/156 - with SerializableLock(token="write_snowflake"): - write_pandas( - conn=conn, - df=df, - schema=connection_kwargs.get("schema", None), - # NOTE: since ensure_db_exists uses uppercase 
for the table name - table_name=name.upper(), - parallel=1, - quote_identifiers=False, - ) + write_pandas( + conn=conn, + df=df, + schema=connection_kwargs.get("schema", None), + # NOTE: since ensure_db_exists uses uppercase for the table name + table_name=name.upper(), + parallel=1, + quote_identifiers=False, + ) From 059914f0d51d138416b5eb89e13b3df8c5b503f8 Mon Sep 17 00:00:00 2001 From: Paul Hobson Date: Wed, 23 Nov 2022 14:48:41 -0800 Subject: [PATCH 4/9] (bonus) markdown linting [ci skip] --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index eea7b17..b3f6c12 100644 --- a/README.md +++ b/README.md @@ -12,13 +12,13 @@ it out! `dask-snowflake` can be installed with `pip`: -``` +```shell pip install dask-snowflake ``` or with `conda`: -``` +```shell conda install -c conda-forge dask-snowflake ``` From 4d967598704292af039ed53e6c955f3fc3220d81 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 17 May 2023 11:59:18 -0500 Subject: [PATCH 5/9] Remove parallel=1 --- dask_snowflake/core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_snowflake/core.py b/dask_snowflake/core.py index 8f8c0c8..7c1ecc8 100644 --- a/dask_snowflake/core.py +++ b/dask_snowflake/core.py @@ -40,7 +40,6 @@ def write_snowflake( schema=connection_kwargs.get("schema", None), # NOTE: since ensure_db_exists uses uppercase for the table name table_name=name.upper(), - parallel=1, quote_identifiers=False, ) From 28650b129c999a17be9f928757397520c84874ae Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 17 May 2023 12:00:46 -0500 Subject: [PATCH 6/9] Remove stale comment --- dask_snowflake/core.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dask_snowflake/core.py b/dask_snowflake/core.py index 7c1ecc8..43c11fd 100644 --- a/dask_snowflake/core.py +++ b/dask_snowflake/core.py @@ -32,8 +32,6 @@ def write_snowflake( **connection_kwargs, } with snowflake.connector.connect(**connection_kwargs) as conn: - # NOTE: Use a
process-wide lock to avoid a `boto` multithreading issue - # https://github.com/snowflakedb/snowflake-connector-python/issues/156 write_pandas( conn=conn, df=df, From 5177082d356536b18d63d9dea58285a53bf63e7a Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 26 May 2023 16:16:52 -0500 Subject: [PATCH 7/9] Remove unrelated tests --- dask_snowflake/tests/test_core.py | 33 ++----------------------------- 1 file changed, 2 insertions(+), 31 deletions(-) diff --git a/dask_snowflake/tests/test_core.py b/dask_snowflake/tests/test_core.py index 9e31033..d8dfbfd 100644 --- a/dask_snowflake/tests/test_core.py +++ b/dask_snowflake/tests/test_core.py @@ -260,20 +260,9 @@ def test_execute_params(table, connection_kwargs, client): ) -@pytest.mark.parametrize( - "end_date", - [ - pytest.param("2000-01-31", id="one month"), - pytest.param( - "2000-12-31", - id="twelve months", - marks=pytest.mark.xfail(reason="inconsistent partition sizing"), - ), - ], -) -def test_result_batching_partition_sizes(table, connection_kwargs, end_date, client): +def test_result_batching(table, connection_kwargs, client): ddf = ( - dask.datasets.timeseries(start="2000-01-01", end=end_date, freq="10s", seed=1) + dask.datasets.timeseries(freq="10s", seed=1) .reset_index(drop=True) .rename(columns=lambda c: c.upper()) ) @@ -290,24 +279,6 @@ def test_result_batching_partition_sizes(table, connection_kwargs, end_date, cli partition_sizes = ddf_out.memory_usage_per_partition().compute() assert (partition_sizes < 2 * parse_bytes("2 MiB")).all() - -@pytest.mark.parametrize( - "end_date", - [ - pytest.param("2000-01-31", id="one month"), - pytest.param("2000-12-31", id="twelve months"), - ], -) -def test_result_batching_nparitions_and_equality( - table, connection_kwargs, end_date, client -): - ddf = ( - dask.datasets.timeseries(start="2000-01-01", end=end_date, freq="10s", seed=1) - .reset_index(drop=True) - .rename(columns=lambda c: c.upper()) - ) - to_snowflake(ddf, name=table, 
connection_kwargs=connection_kwargs) - # Test partition_size logic ddf_out = read_snowflake( f"SELECT * FROM {table}", From a163a895a9a35ac39de77f41b547c350c2b670d9 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 17 Jul 2023 16:17:48 -0500 Subject: [PATCH 8/9] Rerun CI From a58b685f590c3fbe87bad658da0e6076e65e91ca Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 17 Jul 2023 16:41:35 -0500 Subject: [PATCH 9/9] Test fixup --- dask_snowflake/tests/test_core.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/dask_snowflake/tests/test_core.py b/dask_snowflake/tests/test_core.py index d8dfbfd..2993599 100644 --- a/dask_snowflake/tests/test_core.py +++ b/dask_snowflake/tests/test_core.py @@ -103,22 +103,20 @@ def test_to_snowflake_compute_false(table, connection_kwargs, client): def test_arrow_options(table, connection_kwargs, client): - # We use a single partition Dask DataFrame to ensure the - # categories used below are always in the same order. - to_snowflake(ddf.repartition(1), name=table, connection_kwargs=connection_kwargs) + to_snowflake(ddf, name=table, connection_kwargs=connection_kwargs) query = f"SELECT * FROM {table}" df_out = read_snowflake( query, connection_kwargs=connection_kwargs, - arrow_options={"categories": ["A"]}, + arrow_options={"types_mapper": lambda x: pd.Float32Dtype()}, npartitions=2, ) # FIXME: Why does read_snowflake return lower-case columns names? df_out.columns = df_out.columns.str.upper() # FIXME: We need to sort the DataFrame because paritions are written # in a non-sequential order. - expected = df.astype({"A": "category"}) + expected = df.astype(pd.Float32Dtype()) dd.utils.assert_eq( expected, df_out.sort_values(by="A").reset_index(drop=True), check_dtype=False )