diff --git a/evadb/executor/create_executor.py b/evadb/executor/create_executor.py index 905912adc0..21c788f000 100644 --- a/evadb/executor/create_executor.py +++ b/evadb/executor/create_executor.py @@ -14,12 +14,15 @@ # limitations under the License. import contextlib +import pandas as pd + from evadb.database import EvaDBDatabase from evadb.executor.abstract_executor import AbstractExecutor from evadb.executor.executor_utils import ( create_table_catalog_entry_for_native_table, handle_if_not_exists, ) +from evadb.models.storage.batch import Batch from evadb.plan_nodes.create_plan import CreatePlan from evadb.storage.storage_engine import StorageEngine from evadb.utils.errors import CatalogError @@ -37,41 +40,55 @@ def exec(self, *args, **kwargs): check_if_exists = handle_if_not_exists( self.catalog(), self.node.table_info, self.node.if_not_exists ) + name = self.node.table_info.table_name + if check_if_exists: + yield Batch(pd.DataFrame([f"Table {name} already exists"])) + return - if not check_if_exists: - create_table_done = False - logger.debug(f"Creating table {self.node.table_info}") + create_table_done = False + logger.debug(f"Creating table {self.node.table_info}") - if not is_native_table: - catalog_entry = self.catalog().create_and_insert_table_catalog_entry( - self.node.table_info, self.node.column_list - ) - else: - catalog_entry = create_table_catalog_entry_for_native_table( - self.node.table_info, self.node.column_list + if not is_native_table: + catalog_entry = self.catalog().create_and_insert_table_catalog_entry( + self.node.table_info, self.node.column_list + ) + else: + catalog_entry = create_table_catalog_entry_for_native_table( + self.node.table_info, self.node.column_list + ) + storage_engine = StorageEngine.factory(self.db, catalog_entry) + + try: + storage_engine.create(table=catalog_entry) + create_table_done = True + + msg = f"The table {name} has been successfully created" + if self.children != []: + assert ( + len(self.children) == 1 + ), "Create table from query expects 1 child, finds {}".format( + len(self.children) ) - storage_engine = StorageEngine.factory(self.db, catalog_entry) + child = self.children[0] + + rows = 0 + # Populate the table + for batch in child.exec(): + batch.drop_column_alias() + storage_engine.write(catalog_entry, batch) + rows += len(batch) - try: - storage_engine.create(table=catalog_entry) - create_table_done = True - if self.children != []: - assert ( - len(self.children) == 1 - ), "Create table from query expects 1 child, finds {}".format( - len(self.children) - ) - child = self.children[0] + msg = ( + f"The table {name} has been successfully created with {rows} rows." + ) - # Populate the table - for batch in child.exec(): - batch.drop_column_alias() - storage_engine.write(catalog_entry, batch) - except Exception as e: - # rollback if the create call fails + yield Batch(pd.DataFrame([msg])) + except Exception as e: + # rollback if the create call fails + with contextlib.suppress(CatalogError): if create_table_done: storage_engine.drop(catalog_entry) - # rollback catalog entry, suppress any errors raised by catalog - with contextlib.suppress(CatalogError): - self.catalog().delete_table_catalog_entry(catalog_entry) - raise e + # rollback catalog entry, suppress any errors raised by catalog + with contextlib.suppress(CatalogError): + self.catalog().delete_table_catalog_entry(catalog_entry) + raise e diff --git a/evadb/executor/load_csv_executor.py b/evadb/executor/load_csv_executor.py index 372286e56a..91449a54e9 100644 --- a/evadb/executor/load_csv_executor.py +++ b/evadb/executor/load_csv_executor.py @@ -80,7 +80,7 @@ def exec(self, *args, **kwargs): pd.DataFrame( { "CSV": str(self.node.file_path), - "Number of loaded frames": num_loaded_frames, + "Number of loaded rows": num_loaded_frames, }, index=[0], ) diff --git a/evadb/storage/native_storage_engine.py b/evadb/storage/native_storage_engine.py index 9b4d5a0dba..57b7b8624d 100644 --- a/evadb/storage/native_storage_engine.py +++ b/evadb/storage/native_storage_engine.py @@ -202,3 +202,27 @@ def read(self, table: TableCatalogEntry) -> Iterator[Batch]: err_msg = f"Failed to read the table {table.name} in data source {table.database_name} with exception {str(e)}" logger.exception(err_msg) raise Exception(err_msg) + + def drop(self, table: TableCatalogEntry): + try: + db_catalog_entry = self._get_database_catalog_entry(table.database_name) + with get_database_handler( + db_catalog_entry.engine, **db_catalog_entry.params + ) as handler: + uri = handler.get_sqlalchmey_uri() + + # Create a metadata object + engine = create_engine(uri) + metadata = MetaData() + Session = sessionmaker(bind=engine) + session = Session() + # Retrieve the SQLAlchemy table object for the existing table + table_to_remove = Table(table.name, metadata, autoload_with=engine) + + table_to_remove.drop(engine) + session.commit() + session.close() + except Exception as e: + err_msg = f"Failed to drop the table {table.name} in data source {table.database_name} with exception {str(e)}" + logger.error(err_msg) + raise Exception(err_msg) diff --git a/test/integration_tests/long/test_similarity.py b/test/integration_tests/long/test_similarity.py index 5e22cf6651..81bbdc0080 100644 --- a/test/integration_tests/long/test_similarity.py +++ b/test/integration_tests/long/test_similarity.py @@ -463,7 +463,7 @@ def test_end_to_end_index_scan_should_work_correctly_on_image_dataset_chromadb( drop_query = "DROP INDEX testChromaDBIndexImageDataset" execute_query_fetch_all(self.evadb, drop_query) - @pytest.mark.skip(reason="require pinecone") + @pytest.mark.skip(reason="Flaky testcase due to `bad request` error message") @pinecone_skip_marker def test_end_to_end_index_scan_should_work_correctly_on_image_dataset_pinecone( self,