docs: Add example and improve readme #12

Merged · 2 commits · Jan 9, 2025
Changes from 1 commit
2 changes: 1 addition & 1 deletion .github/workflows/dynamic_workflow.yaml
@@ -55,7 +55,7 @@ jobs:
       run: |
         poetry run ruff check .
         poetry run ruff format --check .
-        poetry run pytest .
+        poetry run pytest tests

   publish:
     runs-on: ubuntu-22.04
31 changes: 17 additions & 14 deletions README.md
@@ -121,17 +121,17 @@ process_data(
 myjobpackage
 ├── __init__.py
 ├── entrypoint.py  # Databricks Notebook
-├── processing.py
-└── tests
-    ├── __init__.py
-    ├── test_processing.py
-    └── data
-        └── tables
-            ├── example_input.ndjson
-            ├── expected_output.ndjson
-            └── schema
-                ├── example_input.json
-                └── expected_output.json
+└── processing.py
+tests
+├── __init__.py
+├── test_processing.py
+└── data
+    └── tables
+        ├── example_input.ndjson
+        ├── expected_output.ndjson
+        └── schema
+            ├── example_input.json
+            └── expected_output.json
 ```

**Data Format**
@@ -265,6 +265,8 @@ def test_process_data(
`assertDataFrameEqual` (this can be adjusted using the `checkRowOrder`
parameter).
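
By default `assertDataFrameEqual` compares rows without regard to order; a minimal sketch of a strict, order-sensitive comparison (the DataFrame names here are illustrative, not from the PR):

```python
from pyspark.testing import assertDataFrameEqual

# Hypothetical DataFrames output_df and expected_df.
# checkRowOrder defaults to False (order-insensitive); set it to True
# to require identical row order as well as identical content.
assertDataFrameEqual(
    actual=output_df,
    expected=expected_df,
    checkRowOrder=True,
)
```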

**ℹ️ For a complete example, please look at the [example](https://github.com/datamole-ai/pysparkdt/blob/main/example).**

**⚠️ Note on running tests in parallel**

With the setup above, the metastore is shared on the module scope.
@@ -296,11 +298,12 @@ the processing function.

```python
 def process_data(
+    spark: SparkSession,
     input_table: str,
     output_table: str,
     checkpoint_location: str,
-) -> StreamingQuery
-    load_query = session.readStream.format('delta').table(input_table)
+) -> StreamingQuery:
+    load_query = spark.readStream.format('delta').table(input_table)

     def process_batch(df: pyspark.sql.DataFrame, _) -> None:
         ... process df ...
```
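
For orientation, here is one way the truncated streaming function could continue. The `writeStream`/`foreachBatch` wiring and the batch body below are assumptions inferred from the signature (checkpoint location, `StreamingQuery` return type), not code shown in this diff:

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.streaming import StreamingQuery


def process_data(
    spark: SparkSession,
    input_table: str,
    output_table: str,
    checkpoint_location: str,
) -> StreamingQuery:
    # Stream rows from the input Delta table.
    load_query = spark.readStream.format('delta').table(input_table)

    def process_batch(df: DataFrame, _: int) -> None:
        # Hypothetical per-batch logic; the real transformation is
        # elided in the README ("... process df ...").
        df.write.format('delta').mode('append').saveAsTable(output_table)

    # Process all available data once, checkpointing progress so the
    # stream can resume where it left off.
    return (
        load_query.writeStream
        .foreachBatch(process_batch)
        .option('checkpointLocation', checkpoint_location)
        .trigger(availableNow=True)
        .start()
    )
```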
@@ -323,7 +326,7 @@ def process_data(
 def test_process_data(spark: SparkSession):
     ...
     spark_processing = process_data(
-        session=spark,
+        spark=spark,
         input_table_name='example_input',
         output_table='output',
         checkpoint_location=f'{TMP_DIR}/_checkpoint/output',
23 changes: 23 additions & 0 deletions example/myjobpackage/entrypoint.py
@@ -0,0 +1,23 @@
# Databricks notebook source
import sys
from pathlib import Path

MODULE_DIR = Path.cwd().parent
sys.path.append(MODULE_DIR.as_posix())

# COMMAND ----------

from myjobpackage.processing import process_data

# COMMAND ----------

input_table = dbutils.widgets.get('input_table') # noqa: F821
output_table = dbutils.widgets.get('output_table') # noqa: F821

# COMMAND ----------

process_data(
    spark=spark,  # noqa: F821
    input_table=input_table,
    output_table=output_table,
)
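
If the notebook is run interactively rather than as a scheduled job, the widgets it reads must already exist. A hypothetical setup cell (not part of this PR, using the standard `dbutils.widgets.text` API with assumed default values) could create them:

```python
# Hypothetical: define the widgets with defaults so the notebook can be
# run interactively; in a job, the values come from job/task parameters.
dbutils.widgets.text('input_table', 'example_input')  # noqa: F821
dbutils.widgets.text('output_table', 'output')  # noqa: F821
```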
File renamed without changes.
Empty file added example/tests/__init__.py
2 changes: 2 additions & 0 deletions example/tests/data/tables/example_input.ndjson
@@ -0,0 +1,2 @@
{"id": 0, "time_utc": "2024-01-08T11:00:00", "name": "Jorge", "feature": 0.5876}
{"id": 1, "time_utc": "2024-01-11T14:28:00", "name": "Ricardo", "feature": 0.42}
2 changes: 2 additions & 0 deletions example/tests/data/tables/expected_output.ndjson
@@ -0,0 +1,2 @@
{"id": 0, "time_utc": "2024-01-08T11:00:00", "name": "Jorge", "result": 58.76}
{"id": 1, "time_utc": "2024-01-11T14:28:00", "name": "Ricardo", "result": 42}
30 changes: 30 additions & 0 deletions example/tests/data/tables/schema/example_input.json
@@ -0,0 +1,30 @@
{
    "type": "struct",
    "fields":
    [
        {
            "name": "id",
            "type": "long",
            "nullable": false,
            "metadata": {}
        },
        {
            "name": "time_utc",
            "type": "timestamp",
            "nullable": false,
            "metadata": {}
        },
        {
            "name": "name",
            "type": "string",
            "nullable": true,
            "metadata": {}
        },
        {
            "name": "feature",
            "type": "double",
            "nullable": true,
            "metadata": {}
        }
    ]
}
30 changes: 30 additions & 0 deletions example/tests/data/tables/schema/expected_output.json
@@ -0,0 +1,30 @@
{
    "type": "struct",
    "fields":
    [
        {
            "name": "id",
            "type": "long",
            "nullable": false,
            "metadata": {}
        },
        {
            "name": "time_utc",
            "type": "timestamp",
            "nullable": false,
            "metadata": {}
        },
        {
            "name": "name",
            "type": "string",
            "nullable": true,
            "metadata": {}
        },
        {
            "name": "result",
            "type": "double",
            "nullable": true,
            "metadata": {}
        }
    ]
}
@@ -1,13 +1,12 @@
 import os

+from myjobpackage.processing import process_data
 from pyspark.sql import SparkSession
 from pyspark.testing import assertDataFrameEqual
 from pytest import fixture

 from pysparkdt import reinit_local_metastore, spark_base

-from .processing import process_data
-
 DATA_DIR = f'{os.path.dirname(__file__)}/data'
 JSON_TABLES_DIR = f'{DATA_DIR}/tables'
 TMP_DIR = f'{DATA_DIR}/tmp'
35 changes: 35 additions & 0 deletions tests/test_example.py
@@ -0,0 +1,35 @@
import os

from pyspark.sql import SparkSession
from pyspark.testing import assertDataFrameEqual
from pytest import fixture

from example.myjobpackage.processing import process_data
from pysparkdt import reinit_local_metastore, spark_base

DATA_DIR = f'{os.path.dirname(__file__)}/data'
JSON_TABLES_DIR = f'{DATA_DIR}/tables'
TMP_DIR = f'{DATA_DIR}/tmp'
METASTORE_DIR = f'{TMP_DIR}/metastore'


@fixture(scope='module')
def spark():
    yield from spark_base(METASTORE_DIR)


def test_process_data(
    spark: SparkSession,
):
    reinit_local_metastore(spark, JSON_TABLES_DIR)
    process_data(
        spark=spark,
        input_table='example_input',
        output_table='output',
    )
    output = spark.read.format('delta').table('output')
    expected = spark.read.format('delta').table('expected_output')
    assertDataFrameEqual(
        actual=output.select(sorted(output.columns)),
        expected=expected.select(sorted(expected.columns)),
    )
)