Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

feat: support TOP operator #756

Merged
merged 4 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions data_diff/databases/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ def render_select(self, parent_c: Compiler, elem: Select) -> str:

if elem.limit_expr is not None:
has_order_by = bool(elem.order_by_exprs)
select += " " + self.offset_limit(0, elem.limit_expr, has_order_by=has_order_by)
select = self.limit_select(select_query=select, offset=0, limit=elem.limit_expr, has_order_by=has_order_by)

if parent_c.in_select:
select = f"({select}) {c.new_unique_name()}"
Expand Down Expand Up @@ -605,14 +605,17 @@ def render_inserttotable(self, c: Compiler, elem: InsertToTable) -> str:

return f"INSERT INTO {self.compile(c, elem.path)}{columns} {expr}"

def offset_limit(
self, offset: Optional[int] = None, limit: Optional[int] = None, has_order_by: Optional[bool] = None
def limit_select(
self,
select_query: str,
offset: Optional[int] = None,
limit: Optional[int] = None,
has_order_by: Optional[bool] = None,
) -> str:
"Provide SQL fragment for limit and offset inside a select"
if offset:
raise NotImplementedError("No support for OFFSET in query")

return f"LIMIT {limit}"
return f"SELECT * FROM ({select_query}) AS LIMITED_SELECT LIMIT {limit}"

def concat(self, items: List[str]) -> str:
"Provide SQL for concatenating a bunch of columns into a string"
Expand Down Expand Up @@ -1103,7 +1106,7 @@ def _query_cursor(self, c, sql_code: str) -> QueryResult:
return result
except Exception as _e:
# logger.exception(e)
# logger.error(f'Caused by SQL: {sql_code}')
# logger.error(f"Caused by SQL: {sql_code}")
raise

def _query_conn(self, conn, sql_code: Union[str, ThreadLocalInterpreter]) -> QueryResult:
Expand Down
10 changes: 7 additions & 3 deletions data_diff/databases/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ def is_distinct_from(self, a: str, b: str) -> str:
# See: https://stackoverflow.com/a/18684859/857383
return f"(({a}<>{b} OR {a} IS NULL OR {b} IS NULL) AND NOT({a} IS NULL AND {b} IS NULL))"

def offset_limit(
self, offset: Optional[int] = None, limit: Optional[int] = None, has_order_by: Optional[bool] = None
def limit_select(
self,
select_query: str,
offset: Optional[int] = None,
limit: Optional[int] = None,
has_order_by: Optional[bool] = None,
) -> str:
if offset:
raise NotImplementedError("No support for OFFSET in query")
Expand All @@ -121,7 +125,7 @@ def offset_limit(
result += "ORDER BY 1"

result += f" OFFSET 0 ROWS FETCH NEXT {limit} ROWS ONLY"
return result
return f"SELECT * FROM ({select_query}) AS LIMITED_SELECT {result}"

def constant_values(self, rows) -> str:
values = ", ".join("(%s)" % ", ".join(self._constant_value(v) for v in row) for row in rows)
Expand Down
10 changes: 7 additions & 3 deletions data_diff/databases/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ def quote(self, s: str):
def to_string(self, s: str):
return f"cast({s} as varchar(1024))"

def offset_limit(
self, offset: Optional[int] = None, limit: Optional[int] = None, has_order_by: Optional[bool] = None
def limit_select(
self,
select_query: str,
offset: Optional[int] = None,
limit: Optional[int] = None,
has_order_by: Optional[bool] = None,
) -> str:
if offset:
raise NotImplementedError("No support for OFFSET in query")

return f"FETCH NEXT {limit} ROWS ONLY"
return f"SELECT * FROM ({select_query}) FETCH NEXT {limit} ROWS ONLY"

def concat(self, items: List[str]) -> str:
joined_exprs = " || ".join(items)
Expand Down
19 changes: 13 additions & 6 deletions tests/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,16 @@ def current_database(self) -> str:
def current_schema(self) -> str:
return "current_schema()"

def offset_limit(
self, offset: Optional[int] = None, limit: Optional[int] = None, has_order_by: Optional[bool] = None
def limit_select(
self,
select_query: str,
offset: Optional[int] = None,
limit: Optional[int] = None,
has_order_by: Optional[bool] = None,
) -> str:
x = offset and f"OFFSET {offset}", limit and f"LIMIT {limit}"
return " ".join(filter(None, x))
result = " ".join(filter(None, x))
return f"SELECT * FROM ({select_query}) AS LIMITED_SELECT {result}"

def explain_as_text(self, query: str) -> str:
return f"explain {query}"
Expand Down Expand Up @@ -192,7 +197,7 @@ def test_funcs(self):
t = table("a")

q = c.compile(t.order_by(Random()).limit(10))
self.assertEqual(q, "SELECT * FROM a ORDER BY random() LIMIT 10")
self.assertEqual(q, "SELECT * FROM (SELECT * FROM a ORDER BY random()) AS LIMITED_SELECT LIMIT 10")

q = c.compile(t.select(coalesce(this.a, this.b)))
self.assertEqual(q, "SELECT COALESCE(a, b) FROM a")
Expand All @@ -210,7 +215,7 @@ def test_select_distinct(self):

# selects stay apart
q = c.compile(t.limit(10).select(this.b, distinct=True))
self.assertEqual(q, "SELECT DISTINCT b FROM (SELECT * FROM a LIMIT 10) tmp1")
self.assertEqual(q, "SELECT DISTINCT b FROM (SELECT * FROM (SELECT * FROM a) AS LIMITED_SELECT LIMIT 10) tmp1")

q = c.compile(t.select(this.b, distinct=True).select(distinct=False))
self.assertEqual(q, "SELECT * FROM (SELECT DISTINCT b FROM a) tmp2")
Expand All @@ -226,7 +231,9 @@ def test_select_with_optimizer_hints(self):
self.assertEqual(q, "SELECT /*+ PARALLEL(a 16) */ b FROM a WHERE (b > 10)")

q = c.compile(t.limit(10).select(this.b, optimizer_hints="PARALLEL(a 16)"))
self.assertEqual(q, "SELECT /*+ PARALLEL(a 16) */ b FROM (SELECT * FROM a LIMIT 10) tmp1")
self.assertEqual(
q, "SELECT /*+ PARALLEL(a 16) */ b FROM (SELECT * FROM (SELECT * FROM a) AS LIMITED_SELECT LIMIT 10) tmp1"
)

q = c.compile(t.select(this.a).group_by(this.b).agg(this.c).select(optimizer_hints="PARALLEL(a 16)"))
self.assertEqual(
Expand Down