From 44b6ae9a045b7c1ca64ffe266fc18e79608d3a97 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" <18150651+aaronsteers@users.noreply.github.com> Date: Wed, 24 May 2023 08:51:11 -0700 Subject: [PATCH] feat: add `batch_config` handling in `append_builtin_config()` (#1572) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add `batch_config` handling in `append_builtin_config()` * Fix types * Add capability --------- Co-authored-by: Edgar Ramírez Mondragón Co-authored-by: Ken Payne --- singer_sdk/configuration/_dict_config.py | 15 +++++++++++++ singer_sdk/helpers/capabilities.py | 28 ++++++++++++++++++++++++ singer_sdk/plugin_base.py | 17 ++++++-------- singer_sdk/tap_base.py | 25 +++++++++++++++++++++ 4 files changed, 75 insertions(+), 10 deletions(-) diff --git a/singer_sdk/configuration/_dict_config.py b/singer_sdk/configuration/_dict_config.py index 106f46d9e..021099adb 100644 --- a/singer_sdk/configuration/_dict_config.py +++ b/singer_sdk/configuration/_dict_config.py @@ -101,3 +101,18 @@ def merge_config_sources( config.update(read_json_file(config_path)) return config + + +def merge_missing_config_jsonschema( + source_jsonschema: dict, + target_jsonschema: dict, +) -> None: + """Append any missing properties in the target with those from source. + + Args: + source_jsonschema: The source json schema from which to import. + target_jsonschema: The json schema to update. 
+ """ + for k, v in source_jsonschema["properties"].items(): + if k not in target_jsonschema["properties"]: + target_jsonschema["properties"][k] = v diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index 63ed135d8..00a420daf 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -50,6 +50,34 @@ description="The max depth to flatten schemas.", ), ).to_dict() +BATCH_CONFIG = PropertiesList( + Property( + "batch_config", + description="", + wrapped=ObjectType( + Property( + "encoding", + description="Specifies the format and compression of the batch files.", + wrapped=ObjectType( + Property("format", StringType, allowed_values=["jsonl"]), + Property( + "compression", + StringType, + allowed_values=["gzip", "none"], + ), + ), + ), + Property( + "storage", + description="Defines the storage layer to use when writing batch files", + wrapped=ObjectType( + Property("root", StringType), + Property("prefix", StringType), + ), + ), + ), + ), +).to_dict() TARGET_SCHEMA_CONFIG = PropertiesList( Property( "default_target_schema", diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index 799b8e9d0..042ee5b54 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -13,7 +13,10 @@ from jsonschema import Draft7Validator from singer_sdk import about, metrics -from singer_sdk.configuration._dict_config import parse_environment_config +from singer_sdk.configuration._dict_config import ( + merge_missing_config_jsonschema, + parse_environment_config, +) from singer_sdk.exceptions import ConfigValidationError from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers._compat import metadata @@ -129,6 +132,7 @@ def capabilities(self) -> list[CapabilitiesEnum]: return [ PluginCapabilities.STREAM_MAPS, PluginCapabilities.FLATTENING, + PluginCapabilities.BATCH, ] @classproperty @@ -339,19 +343,12 @@ def append_builtin_config(cls: type[PluginBase], 
config_jsonschema: dict) -> Non Args: config_jsonschema: [description] """ - - def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: - # Append any missing properties in the target with those from source. - for k, v in source_jsonschema["properties"].items(): - if k not in target_jsonschema["properties"]: - target_jsonschema["properties"][k] = v - capabilities = cls.capabilities if PluginCapabilities.STREAM_MAPS in capabilities: - _merge_missing(STREAM_MAPS_CONFIG, config_jsonschema) + merge_missing_config_jsonschema(STREAM_MAPS_CONFIG, config_jsonschema) if PluginCapabilities.FLATTENING in capabilities: - _merge_missing(FLATTENING_CONFIG, config_jsonschema) + merge_missing_config_jsonschema(FLATTENING_CONFIG, config_jsonschema) @classmethod def print_about( diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 791ff1a0c..fb6443a60 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -13,6 +13,7 @@ from singer_sdk._singerlib import Catalog from singer_sdk.cli import common_options +from singer_sdk.configuration._dict_config import merge_missing_config_jsonschema from singer_sdk.exceptions import AbortedSyncFailedException, AbortedSyncPausedException from singer_sdk.helpers import _state from singer_sdk.helpers._classproperty import classproperty @@ -20,6 +21,7 @@ from singer_sdk.helpers._state import write_stream_state from singer_sdk.helpers._util import read_json_file from singer_sdk.helpers.capabilities import ( + BATCH_CONFIG, CapabilitiesEnum, PluginCapabilities, TapCapabilities, @@ -181,8 +183,31 @@ def capabilities(self) -> list[CapabilitiesEnum]: PluginCapabilities.ABOUT, PluginCapabilities.STREAM_MAPS, PluginCapabilities.FLATTENING, + PluginCapabilities.BATCH, ] + @classmethod + def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> None: + """Appends built-in config to `config_jsonschema` if not already set. 
+ + To customize or disable this behavior, developers may either override this class + method or override the `capabilities` property to disable any unwanted + built-in capabilities. + + For all except very advanced use cases, we recommend leaving these + implementations "as-is", since this provides the most choice to users and is + the most "future proof" in terms of taking advantage of built-in capabilities + which may be added in the future. + + Args: + config_jsonschema: [description] + """ + PluginBase.append_builtin_config(config_jsonschema) + + capabilities = cls.capabilities + if PluginCapabilities.BATCH in capabilities: + merge_missing_config_jsonschema(BATCH_CONFIG, config_jsonschema) + # Connection and sync tests: @final