Skip to content

Commit

Permalink
Initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Oct 30, 2024
1 parent 856db17 commit 4bacd90
Showing 1 changed file with 51 additions and 35 deletions.
86 changes: 51 additions & 35 deletions target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import simplejson
import sqlalchemy as sa
from singer_sdk import SQLConnector
from singer_sdk import typing as th
from singer_sdk.connectors.sql import JSONSchemaToSQL
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, BYTEA, JSONB, UUID
from sqlalchemy.engine import URL
from sqlalchemy.engine.url import make_url
Expand Down Expand Up @@ -214,6 +214,43 @@ def clone_table(
new_table.create(bind=connection)
return new_table

def _handle_array_type(self, jsonschema: dict) -> ARRAY | JSONB:
    """Pick the SQL column type for a JSON Schema ``array`` definition.

    The decision is driven entirely by the schema's ``items`` entry:

    * a list (tuple-style items) -> ``ARRAY(JSONB)``
    * a bare string              -> ``ARRAY`` of the type resolved for it
    * a dict without ``"type"``  -> ``ARRAY(JSONB)`` (variant items)
    * a dict whose ``"type"`` is a string or a list of strings
                                 -> ``ARRAY`` of the type resolved for it
    * anything else (including a missing ``items``) -> plain ``JSONB``

    Args:
        jsonschema: The JSON Schema fragment describing the array.

    Returns:
        An ``ARRAY`` wrapping the resolved element type, or ``JSONB`` when
        no element type can be determined.
    """
    element_spec = jsonschema.get("items")

    # Tuple-style items: elements may differ per position.
    if isinstance(element_spec, list):
        return ARRAY(JSONB())

    # Shorthand form: ``items`` is just a type name.
    if isinstance(element_spec, str):
        return ARRAY(self.to_sql_type({"type": element_spec}))

    if isinstance(element_spec, dict):
        # No declared type means the elements are variants.
        if "type" not in element_spec:
            return ARRAY(JSONB())

        declared = element_spec["type"]
        # Only the ``type`` key is forwarded; a single type name and a
        # list of type names are resolved the same way.
        if isinstance(declared, (str, list)):
            return ARRAY(self.to_sql_type({"type": declared}))

    # Missing or unrecognised ``items``: store the whole value as JSONB.
    return JSONB()

@cached_property
def jsonschema_to_sql(self) -> JSONSchemaToSQL:
    """Build the JSON Schema -> SQL type converter used by this connector.

    A fresh ``JSONSchemaToSQL`` is created and then customised with
    handlers for the ``integer``, ``object`` and ``array`` JSON types and
    for the ``date-time`` and ``uuid`` string formats. Being a
    ``cached_property``, the converter is built once per instance.

    Returns:
        The configured ``JSONSchemaToSQL`` instance.
    """
    converter = JSONSchemaToSQL()

    # NOTE(review): released singer-sdk versions appear to name this hook
    # ``register_type_handler`` -- confirm ``register`` exists in the SDK
    # version this project pins.
    type_overrides = (
        ("integer", BIGINT),
        ("object", JSONB),
        ("array", self._handle_array_type),
    )
    for json_type, handler in type_overrides:
        converter.register(json_type, handler)

    format_overrides = (
        ("date-time", TIMESTAMP),
        ("uuid", UUID),
    )
    for format_name, handler in format_overrides:
        converter.register_format_handler(format_name, handler)

    return converter

def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine:
"""Return a JSON Schema representation of the provided type.
Expand Down Expand Up @@ -270,7 +307,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine:

return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array)

def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911
def pick_individual_type(self, jsonschema_type: dict):
"""Select the correct sql type assuming jsonschema_type has only a single type.
Args:
Expand All @@ -281,46 +318,25 @@ def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911
"""
if "null" in jsonschema_type["type"]:
return None
if "integer" in jsonschema_type["type"]:
return BIGINT()
if "object" in jsonschema_type["type"]:
return JSONB()
if "array" in jsonschema_type["type"]:
items = jsonschema_type.get("items")
# Case 1: items is a string
if isinstance(items, str):
return ARRAY(self.to_sql_type({"type": items}))

# Case 2: items are more complex
if isinstance(items, dict):
# Case 2.1: items are variants
if "type" not in items:
return ARRAY(JSONB())

items_type = items["type"]

# Case 2.2: items are a single type
if isinstance(items_type, str):
return ARRAY(self.to_sql_type({"type": items_type}))

# Case 2.3: items are a list of types
if isinstance(items_type, list):
return ARRAY(self.to_sql_type({"type": items_type}))

# Case 3: tuples
return ARRAY(JSONB()) if isinstance(items, list) else JSONB()
# if "integer" in jsonschema_type["type"]:
# return BIGINT()
# if "object" in jsonschema_type["type"]:
# return JSONB()
# if "array" in jsonschema_type["type"]:
# return self._handle_array_type(jsonschema_type)

# string formats
if jsonschema_type.get("format") == "date-time":
return TIMESTAMP()
if jsonschema_type.get("format") == "uuid":
return UUID()
# if jsonschema_type.get("format") == "date-time":
# return TIMESTAMP()
# if jsonschema_type.get("format") == "uuid":
# return UUID()
if (
self.interpret_content_encoding
and jsonschema_type.get("contentEncoding") == "base16"
):
return HexByteString()
individual_type = th.to_sql_type(jsonschema_type)

individual_type = self.jsonschema_to_sql.to_sql_type(jsonschema_type)
return TEXT() if isinstance(individual_type, VARCHAR) else individual_type

@staticmethod
Expand Down

0 comments on commit 4bacd90

Please sign in to comment.