Skip to content

Commit

Permalink
feat: add batch_config handling in append_builtin_config() (#1572)
Browse files Browse the repository at this point in the history
* feat: add `batch_config` handling in `append_builtin_config()`

* Fix types

* Add capability

---------

Co-authored-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Co-authored-by: Ken Payne <ken@meltano.com>
  • Loading branch information
3 people authored May 24, 2023
1 parent 2c879d6 commit 44b6ae9
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 10 deletions.
15 changes: 15 additions & 0 deletions singer_sdk/configuration/_dict_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,18 @@ def merge_config_sources(
config.update(read_json_file(config_path))

return config


def merge_missing_config_jsonschema(
source_jsonschema: dict,
target_jsonschema: dict,
) -> None:
"""Append any missing properties in the target with those from source.
Args:
source_jsonschema: The source json schema from which to import.
target_jsonschema: The json schema to update.
"""
for k, v in source_jsonschema["properties"].items():
if k not in target_jsonschema["properties"]:
target_jsonschema["properties"][k] = v
28 changes: 28 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,34 @@
description="The max depth to flatten schemas.",
),
).to_dict()
BATCH_CONFIG = PropertiesList(
Property(
"batch_config",
description="",
wrapped=ObjectType(
Property(
"encoding",
description="Specifies the format and compression of the batch files.",
wrapped=ObjectType(
Property("format", StringType, allowed_values=["jsonl"]),
Property(
"compression",
StringType,
allowed_values=["gzip", "none"],
),
),
),
Property(
"storage",
description="Defines the storage layer to use when writing batch files",
wrapped=ObjectType(
Property("root", StringType),
Property("prefix", StringType),
),
),
),
),
).to_dict()
TARGET_SCHEMA_CONFIG = PropertiesList(
Property(
"default_target_schema",
Expand Down
17 changes: 7 additions & 10 deletions singer_sdk/plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
from jsonschema import Draft7Validator

from singer_sdk import about, metrics
from singer_sdk.configuration._dict_config import parse_environment_config
from singer_sdk.configuration._dict_config import (
merge_missing_config_jsonschema,
parse_environment_config,
)
from singer_sdk.exceptions import ConfigValidationError
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._compat import metadata
Expand Down Expand Up @@ -129,6 +132,7 @@ def capabilities(self) -> list[CapabilitiesEnum]:
return [
PluginCapabilities.STREAM_MAPS,
PluginCapabilities.FLATTENING,
PluginCapabilities.BATCH,
]

@classproperty
Expand Down Expand Up @@ -339,19 +343,12 @@ def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> Non
Args:
config_jsonschema: [description]
"""

def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
# Append any missing properties in the target with those from source.
for k, v in source_jsonschema["properties"].items():
if k not in target_jsonschema["properties"]:
target_jsonschema["properties"][k] = v

capabilities = cls.capabilities
if PluginCapabilities.STREAM_MAPS in capabilities:
_merge_missing(STREAM_MAPS_CONFIG, config_jsonschema)
merge_missing_config_jsonschema(STREAM_MAPS_CONFIG, config_jsonschema)

if PluginCapabilities.FLATTENING in capabilities:
_merge_missing(FLATTENING_CONFIG, config_jsonschema)
merge_missing_config_jsonschema(FLATTENING_CONFIG, config_jsonschema)

@classmethod
def print_about(
Expand Down
25 changes: 25 additions & 0 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@

from singer_sdk._singerlib import Catalog
from singer_sdk.cli import common_options
from singer_sdk.configuration._dict_config import merge_missing_config_jsonschema
from singer_sdk.exceptions import AbortedSyncFailedException, AbortedSyncPausedException
from singer_sdk.helpers import _state
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._compat import final
from singer_sdk.helpers._state import write_stream_state
from singer_sdk.helpers._util import read_json_file
from singer_sdk.helpers.capabilities import (
BATCH_CONFIG,
CapabilitiesEnum,
PluginCapabilities,
TapCapabilities,
Expand Down Expand Up @@ -181,8 +183,31 @@ def capabilities(self) -> list[CapabilitiesEnum]:
PluginCapabilities.ABOUT,
PluginCapabilities.STREAM_MAPS,
PluginCapabilities.FLATTENING,
PluginCapabilities.BATCH,
]

@classmethod
def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> None:
"""Appends built-in config to `config_jsonschema` if not already set.
To customize or disable this behavior, developers may either override this class
method or override the `capabilities` property to disabled any unwanted
built-in capabilities.
For all except very advanced use cases, we recommend leaving these
implementations "as-is", since this provides the most choice to users and is
the most "future proof" in terms of taking advantage of built-in capabilities
which may be added in the future.
Args:
config_jsonschema: [description]
"""
PluginBase.append_builtin_config(config_jsonschema)

capabilities = cls.capabilities
if PluginCapabilities.BATCH in capabilities:
merge_missing_config_jsonschema(BATCH_CONFIG, config_jsonschema)

# Connection and sync tests:

@final
Expand Down

0 comments on commit 44b6ae9

Please # to comment.