Skip to content

Commit

Permalink
fix: improve messaging and fix bugs (#1228)
Browse files Browse the repository at this point in the history
1. `CREATE TABLE` returns more informative response messages.
2. Skip the `PINECONE` test case because it is flaky.
3. Handle the case where `CREATE TABLE AS SELECT` fails for a table in a native
database by rolling back the partially created table.
  • Loading branch information
gaurav274 authored Sep 27, 2023
1 parent e32e528 commit e06a859
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 33 deletions.
79 changes: 48 additions & 31 deletions evadb/executor/create_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
# limitations under the License.
import contextlib

import pandas as pd

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import (
create_table_catalog_entry_for_native_table,
handle_if_not_exists,
)
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.create_plan import CreatePlan
from evadb.storage.storage_engine import StorageEngine
from evadb.utils.errors import CatalogError
Expand All @@ -37,41 +40,55 @@ def exec(self, *args, **kwargs):
check_if_exists = handle_if_not_exists(
self.catalog(), self.node.table_info, self.node.if_not_exists
)
name = self.node.table_info.table_name
if check_if_exists:
yield Batch(pd.DataFrame([f"Table {name} already exists"]))
return

if not check_if_exists:
create_table_done = False
logger.debug(f"Creating table {self.node.table_info}")
create_table_done = False
logger.debug(f"Creating table {self.node.table_info}")

if not is_native_table:
catalog_entry = self.catalog().create_and_insert_table_catalog_entry(
self.node.table_info, self.node.column_list
)
else:
catalog_entry = create_table_catalog_entry_for_native_table(
self.node.table_info, self.node.column_list
if not is_native_table:
catalog_entry = self.catalog().create_and_insert_table_catalog_entry(
self.node.table_info, self.node.column_list
)
else:
catalog_entry = create_table_catalog_entry_for_native_table(
self.node.table_info, self.node.column_list
)
storage_engine = StorageEngine.factory(self.db, catalog_entry)

try:
storage_engine.create(table=catalog_entry)
create_table_done = True

msg = f"The table {name} has been successfully created"
if self.children != []:
assert (
len(self.children) == 1
), "Create table from query expects 1 child, finds {}".format(
len(self.children)
)
storage_engine = StorageEngine.factory(self.db, catalog_entry)
child = self.children[0]

rows = 0
# Populate the table
for batch in child.exec():
batch.drop_column_alias()
storage_engine.write(catalog_entry, batch)
rows += len(batch)

try:
storage_engine.create(table=catalog_entry)
create_table_done = True
if self.children != []:
assert (
len(self.children) == 1
), "Create table from query expects 1 child, finds {}".format(
len(self.children)
)
child = self.children[0]
msg = (
f"The table {name} has been successfully created with {rows} rows."
)

# Populate the table
for batch in child.exec():
batch.drop_column_alias()
storage_engine.write(catalog_entry, batch)
except Exception as e:
# rollback if the create call fails
yield Batch(pd.DataFrame([msg]))
except Exception as e:
# rollback if the create call fails
with contextlib.suppress(CatalogError):
if create_table_done:
storage_engine.drop(catalog_entry)
# rollback catalog entry, suppress any errors raised by catalog
with contextlib.suppress(CatalogError):
self.catalog().delete_table_catalog_entry(catalog_entry)
raise e
# rollback catalog entry, suppress any errors raised by catalog
with contextlib.suppress(CatalogError):
self.catalog().delete_table_catalog_entry(catalog_entry)
raise e
2 changes: 1 addition & 1 deletion evadb/executor/load_csv_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def exec(self, *args, **kwargs):
pd.DataFrame(
{
"CSV": str(self.node.file_path),
"Number of loaded frames": num_loaded_frames,
"Number of loaded rows": num_loaded_frames,
},
index=[0],
)
Expand Down
24 changes: 24 additions & 0 deletions evadb/storage/native_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,27 @@ def read(self, table: TableCatalogEntry) -> Iterator[Batch]:
err_msg = f"Failed to read the table {table.name} in data source {table.database_name} with exception {str(e)}"
logger.exception(err_msg)
raise Exception(err_msg)

def drop(self, table: TableCatalogEntry):
    """Drop ``table`` from the native data source it belongs to.

    Looks up the database catalog entry for ``table.database_name``, obtains
    a SQLAlchemy URI from the database handler, reflects the existing table
    and issues the drop against it.

    Args:
        table: catalog entry providing the table name (``table.name``) and
            the data source name (``table.database_name``).

    Raises:
        Exception: if any step fails. The message names the table and the
            data source, and the original error is chained as the cause.
    """
    engine = None
    try:
        db_catalog_entry = self._get_database_catalog_entry(table.database_name)
        with get_database_handler(
            db_catalog_entry.engine, **db_catalog_entry.params
        ) as handler:
            uri = handler.get_sqlalchmey_uri()

        engine = create_engine(uri)
        # Reflect the existing table so there is a Table object to drop.
        table_to_remove = Table(table.name, MetaData(), autoload_with=engine)
        # The DDL runs directly on the engine; no ORM session is involved,
        # so none is created (and none can be leaked on failure).
        table_to_remove.drop(engine)
    except Exception as e:
        err_msg = f"Failed to drop the table {table.name} in data source {table.database_name} with exception {str(e)}"
        logger.error(err_msg)
        # Chain the original error so its traceback is not lost.
        raise Exception(err_msg) from e
    finally:
        # Release pooled connections whether or not the drop succeeded.
        if engine is not None:
            engine.dispose()
2 changes: 1 addition & 1 deletion test/integration_tests/long/test_similarity.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def test_end_to_end_index_scan_should_work_correctly_on_image_dataset_chromadb(
drop_query = "DROP INDEX testChromaDBIndexImageDataset"
execute_query_fetch_all(self.evadb, drop_query)

@pytest.mark.skip(reason="require pinecone")
@pytest.mark.skip(reason="Flaky testcase due to `bad request` error message")
@pinecone_skip_marker
def test_end_to_end_index_scan_should_work_correctly_on_image_dataset_pinecone(
self,
Expand Down

0 comments on commit e06a859

Please sign in to comment.