Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat: Built-in handling of default-target-schema for SQL Targets #1157

Merged
8 changes: 8 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ObjectType,
PropertiesList,
Property,
StringType,
)

_EnumMemberT = TypeVar("_EnumMemberT")
Expand Down Expand Up @@ -47,6 +48,13 @@
description="The max depth to flatten schemas.",
),
).to_dict()
TARGET_SCHEMA_CONFIG = PropertiesList(
Property(
"default_target_schema",
StringType(),
description="The Default schema to place all streams",
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved
),
).to_dict()


class DeprecatedEnum(Enum):
Expand Down
22 changes: 21 additions & 1 deletion singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,31 @@ def schema_name(self) -> Optional[str]:
Returns:
The target schema name.
"""
# Get the current SQL Dialect being used
target_sqla_dialect = self.connection.engine.dialect.name
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved
# Look for a default_target_scheme in the configuraion fle
default_target_schema = self.config.get("default_target_schema", None)
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
parts = self.stream_name.split("-")

# 1) When default_target_scheme is in the configuration use it
# 2) if the streams are in <schema>-<table> format use the
# stream <schema>
# 3) Return None if you don't find anything
if default_target_schema:
return default_target_schema

if len(parts) in {2, 3}:
# Stream name is a two-part or three-part identifier.
# Use the second-to-last part as the schema name.
return self.conform_name(parts[-2], "schema")
stream_schema = self.conform_name(parts[-2], "schema")

# MS SQL Server has a public database role so the name is reserved
# and it can not be created as a schema. To avoid this common error
# we convert "public" to "dbo" if the target dialet is mssql
if target_sqla_dialect == "mssql" and stream_schema == "public":
return "dbo"
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved
else:
return stream_schema

# Schema name not detected.
return None
Expand Down
48 changes: 47 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
from singer_sdk.helpers._batch import BaseBatchFileEncoding
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._compat import final
from singer_sdk.helpers.capabilities import CapabilitiesEnum, PluginCapabilities
from singer_sdk.helpers.capabilities import (
TARGET_SCHEMA_CONFIG,
CapabilitiesEnum,
PluginCapabilities,
TargetCapabilities,
)
from singer_sdk.io_base import SingerMessageType, SingerReader
from singer_sdk.mapper import PluginMapper
from singer_sdk.plugin_base import PluginBase
Expand Down Expand Up @@ -569,4 +574,45 @@ def cli(
class SQLTarget(Target):
"""Target implementation for SQL destinations."""

@classproperty
def capabilities(self) -> List[CapabilitiesEnum]:
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
"""Get target capabilities.

Returns:
A list of capabilities supported by this target.
"""
sql_target_capabilities: List[CapabilitiesEnum] = super().capabilities
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
sql_target_capabilities.extend([TargetCapabilities.TARGET_SCHEMA])

return sql_target_capabilities

def append_builtin_config(self, config_jsonschema: dict) -> None:
"""Appends built-in config to `config_jsonschema` if not already set.

To customize or disable this behavior, developers may either override this class
method or override the `capabilities` property to disabled any unwanted
built-in capabilities.

For all except very advanced use cases, we recommend leaving these
implementations "as-is", since this provides the most choice to users and is
the most "future proof" in terms of taking advantage of built-in capabilities
which may be added in the future.

Args:
config_jsonschema: [description]
"""

def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
# Append any missing properties in the target with those from source.
for k, v in source_jsonschema["properties"].items():
if k not in target_jsonschema["properties"]:
target_jsonschema["properties"][k] = v

capabilities = self.capabilities

if TargetCapabilities.TARGET_SCHEMA in capabilities:
_merge_missing(TARGET_SCHEMA_CONFIG, config_jsonschema)

super().append_builtin_config(config_jsonschema)

pass