From 6152b335903cade028d39e851f35303295d45695 Mon Sep 17 00:00:00 2001 From: pall-j Date: Thu, 9 Jan 2025 15:14:05 +0100 Subject: [PATCH 1/2] docs: Add example and improve readme --- .github/workflows/dynamic_workflow.yaml | 2 +- README.md | 31 ++++++++-------- example/myjobpackage/__init__.py | 0 example/myjobpackage/entrypoint.py | 23 ++++++++++++ {tests => example/myjobpackage}/processing.py | 0 example/tests/__init__.py | 0 .../tests/data/tables/example_input.ndjson | 2 ++ .../tests/data/tables/expected_output.ndjson | 2 ++ .../data/tables/schema/example_input.json | 30 ++++++++++++++++ .../data/tables/schema/expected_output.json | 30 ++++++++++++++++ {tests => example/tests}/test_processing.py | 3 +- tests/test_example.py | 35 +++++++++++++++++++ 12 files changed, 141 insertions(+), 17 deletions(-) create mode 100644 example/myjobpackage/__init__.py create mode 100644 example/myjobpackage/entrypoint.py rename {tests => example/myjobpackage}/processing.py (100%) create mode 100644 example/tests/__init__.py create mode 100644 example/tests/data/tables/example_input.ndjson create mode 100644 example/tests/data/tables/expected_output.ndjson create mode 100644 example/tests/data/tables/schema/example_input.json create mode 100644 example/tests/data/tables/schema/expected_output.json rename {tests => example/tests}/test_processing.py (94%) create mode 100644 tests/test_example.py diff --git a/.github/workflows/dynamic_workflow.yaml b/.github/workflows/dynamic_workflow.yaml index 331103d..15a372f 100644 --- a/.github/workflows/dynamic_workflow.yaml +++ b/.github/workflows/dynamic_workflow.yaml @@ -55,7 +55,7 @@ jobs: run: | poetry run ruff check . poetry run ruff format --check . - poetry run pytest . 
+ poetry run pytest tests publish: runs-on: ubuntu-22.04 diff --git a/README.md b/README.md index 5b27d38..ffd865c 100644 --- a/README.md +++ b/README.md @@ -121,17 +121,17 @@ process_data( myjobpackage ├── __init__.py ├── entrypoint.py # Databricks Notebook -├── processing.py -└── tests - ├── __init__.py - ├── test_processing.py - └── data - ├── tables - ├── example_input.ndjson - └── expected_output.ndjson - └── schema - ├── example_input.json - └── expected_output.json +└── processing.py +tests +├── __init__.py +├── test_processing.py +└── data + └── tables + ├── example_input.ndjson + ├── expected_output.ndjson + └── schema + ├── example_input.json + └── expected_output.json ``` **Data Format** @@ -265,6 +265,8 @@ def test_process_data( `assertDataFrameEqual` (this can be adjusted using the `checkRowOrder` parameter). +**ℹ️ For a complete example, please look at [example](https://github.com/datamole-ai/pysparkdt/blob/main/example).** + **⚠️ Note on running tests in parallel** With the setup above, the metastore is shared on the module scope. @@ -296,11 +298,12 @@ the processing function. ```python def process_data( + spark: SparkSession, input_table: str, output_table: str, checkpoint_location: str, -) -> StreamingQuery - load_query = session.readStream.format('delta').table(input_table) +) -> StreamingQuery: + load_query = spark.readStream.format('delta').table(input_table) def process_batch(df: pyspark.sql.DataFrame, _) -> None: ... process df ... @@ -323,7 +326,7 @@ def process_data( def test_process_data(spark: SparkSession): ... 
spark_processing = process_data( - session=spark, + spark=spark, input_table_name='example_input', output_table='output', checkpoint_location=f'{TMP_DIR}/_checkpoint/output', diff --git a/example/myjobpackage/__init__.py b/example/myjobpackage/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/example/myjobpackage/entrypoint.py b/example/myjobpackage/entrypoint.py new file mode 100644 index 0000000..08908bb --- /dev/null +++ b/example/myjobpackage/entrypoint.py @@ -0,0 +1,23 @@ +# Databricks notebook source +import sys +from pathlib import Path + +MODULE_DIR = Path.cwd().parent +sys.path.append(MODULE_DIR.as_posix()) + +# COMMAND ---------- + +from myjobpackage.processing import process_data + +# COMMAND ---------- + +input_table = dbutils.widgets.get('input_table') # noqa: F821 +output_table = dbutils.widgets.get('output_table') # noqa: F821 + +# COMMAND ---------- + +process_data( + session=spark, # noqa: F821 + input_table=input_table, + output_table=output_table, +) diff --git a/tests/processing.py b/example/myjobpackage/processing.py similarity index 100% rename from tests/processing.py rename to example/myjobpackage/processing.py diff --git a/example/tests/__init__.py b/example/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/example/tests/data/tables/example_input.ndjson b/example/tests/data/tables/example_input.ndjson new file mode 100644 index 0000000..cb274f4 --- /dev/null +++ b/example/tests/data/tables/example_input.ndjson @@ -0,0 +1,2 @@ +{"id": 0, "time_utc": "2024-01-08T11:00:00", "name": "Jorge", "feature": 0.5876} +{"id": 1, "time_utc": "2024-01-11T14:28:00", "name": "Ricardo", "feature": 0.42} diff --git a/example/tests/data/tables/expected_output.ndjson b/example/tests/data/tables/expected_output.ndjson new file mode 100644 index 0000000..8970947 --- /dev/null +++ b/example/tests/data/tables/expected_output.ndjson @@ -0,0 +1,2 @@ +{"id": 0, "time_utc": "2024-01-08T11:00:00", "name": "Jorge", "result": 
58.76} +{"id": 1, "time_utc": "2024-01-11T14:28:00", "name": "Ricardo", "result": 42} diff --git a/example/tests/data/tables/schema/example_input.json b/example/tests/data/tables/schema/example_input.json new file mode 100644 index 0000000..c65c393 --- /dev/null +++ b/example/tests/data/tables/schema/example_input.json @@ -0,0 +1,30 @@ +{ + "type": "struct", + "fields": + [ + { + "name": "id", + "type": "long", + "nullable": false, + "metadata": {} + }, + { + "name": "time_utc", + "type": "timestamp", + "nullable": false, + "metadata": {} + }, + { + "name": "name", + "type": "string", + "nullable": true, + "metadata": {} + }, + { + "name": "feature", + "type": "double", + "nullable": true, + "metadata": {} + } + ] +} diff --git a/example/tests/data/tables/schema/expected_output.json b/example/tests/data/tables/schema/expected_output.json new file mode 100644 index 0000000..66ae323 --- /dev/null +++ b/example/tests/data/tables/schema/expected_output.json @@ -0,0 +1,30 @@ +{ + "type": "struct", + "fields": + [ + { + "name": "id", + "type": "long", + "nullable": false, + "metadata": {} + }, + { + "name": "time_utc", + "type": "timestamp", + "nullable": false, + "metadata": {} + }, + { + "name": "name", + "type": "string", + "nullable": true, + "metadata": {} + }, + { + "name": "result", + "type": "double", + "nullable": true, + "metadata": {} + } + ] +} diff --git a/tests/test_processing.py b/example/tests/test_processing.py similarity index 94% rename from tests/test_processing.py rename to example/tests/test_processing.py index ad6a0c7..eb50abd 100644 --- a/tests/test_processing.py +++ b/example/tests/test_processing.py @@ -1,13 +1,12 @@ import os +from myjobpackage.processing import process_data from pyspark.sql import SparkSession from pyspark.testing import assertDataFrameEqual from pytest import fixture from pysparkdt import reinit_local_metastore, spark_base -from .processing import process_data - DATA_DIR = f'{os.path.dirname(__file__)}/data' JSON_TABLES_DIR = 
f'{DATA_DIR}/tables' TMP_DIR = f'{DATA_DIR}/tmp' diff --git a/tests/test_example.py b/tests/test_example.py new file mode 100644 index 0000000..9e0f596 --- /dev/null +++ b/tests/test_example.py @@ -0,0 +1,35 @@ +import os + +from pyspark.sql import SparkSession +from pyspark.testing import assertDataFrameEqual +from pytest import fixture + +from example.myjobpackage.processing import process_data +from pysparkdt import reinit_local_metastore, spark_base + +DATA_DIR = f'{os.path.dirname(__file__)}/data' +JSON_TABLES_DIR = f'{DATA_DIR}/tables' +TMP_DIR = f'{DATA_DIR}/tmp' +METASTORE_DIR = f'{TMP_DIR}/metastore' + + +@fixture(scope='module') +def spark(): + yield from spark_base(METASTORE_DIR) + + +def test_process_data( + spark: SparkSession, +): + reinit_local_metastore(spark, JSON_TABLES_DIR) + process_data( + session=spark, + input_table='example_input', + output_table='output', + ) + output = spark.read.format('delta').table('output') + expected = spark.read.format('delta').table('expected_output') + assertDataFrameEqual( + actual=output.select(sorted(output.columns)), + expected=expected.select(sorted(expected.columns)), + ) From fba408042b17a56fe32b1616723d3bac15cf3d33 Mon Sep 17 00:00:00 2001 From: pall-j Date: Thu, 9 Jan 2025 16:55:57 +0100 Subject: [PATCH 2/2] Reflect review --- README.md | 4 ++-- example/myjobpackage/entrypoint.py | 2 +- example/myjobpackage/processing.py | 4 ++-- example/tests/test_processing.py | 2 +- tests/test_example.py | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index ffd865c..7bbec14 100644 --- a/README.md +++ b/README.md @@ -106,7 +106,7 @@ output_table = dbutils.widgets.get('output_table') # COMMAND ---------- process_data( - session=spark, + spark=spark, input_table=input_table, output_table=output_table, ) @@ -245,7 +245,7 @@ def test_process_data( reinit_local_metastore(spark, JSON_TABLES_DIR) process_data( - session=spark, + spark=spark, input_table='example_input', 
output_table='output', ) diff --git a/example/myjobpackage/entrypoint.py b/example/myjobpackage/entrypoint.py index 08908bb..3f0894e 100644 --- a/example/myjobpackage/entrypoint.py +++ b/example/myjobpackage/entrypoint.py @@ -17,7 +17,7 @@ # COMMAND ---------- process_data( - session=spark, # noqa: F821 + spark=spark, # noqa: F821 input_table=input_table, output_table=output_table, ) diff --git a/example/myjobpackage/processing.py b/example/myjobpackage/processing.py index d0848be..8a78e79 100644 --- a/example/myjobpackage/processing.py +++ b/example/myjobpackage/processing.py @@ -3,9 +3,9 @@ def process_data( - input_table: str, output_table: str, session: SparkSession + input_table: str, output_table: str, spark: SparkSession ) -> None: - df = session.read.table(input_table) + df = spark.read.table(input_table) df = df.withColumn('result', col('feature') * 100) df = df.drop('feature') df.write.format('delta').mode('overwrite').saveAsTable(output_table) diff --git a/example/tests/test_processing.py b/example/tests/test_processing.py index eb50abd..48327d5 100644 --- a/example/tests/test_processing.py +++ b/example/tests/test_processing.py @@ -23,7 +23,7 @@ def test_process_data( ): reinit_local_metastore(spark, JSON_TABLES_DIR) process_data( - session=spark, + spark=spark, input_table='example_input', output_table='output', ) diff --git a/tests/test_example.py b/tests/test_example.py index 9e0f596..9a990f1 100644 --- a/tests/test_example.py +++ b/tests/test_example.py @@ -23,7 +23,7 @@ def test_process_data( ): reinit_local_metastore(spark, JSON_TABLES_DIR) process_data( - session=spark, + spark=spark, input_table='example_input', output_table='output', )