diff --git a/docs/_toc.yml b/docs/_toc.yml index f4f5d46af4..5ba89a87f5 100644 --- a/docs/_toc.yml +++ b/docs/_toc.yml @@ -69,6 +69,7 @@ parts: - file: source/reference/databases/sqlite - file: source/reference/databases/mysql - file: source/reference/databases/mariadb + - file: source/reference/databases/clickhouse - file: source/reference/databases/github - file: source/reference/vector_databases/index diff --git a/docs/source/reference/databases/clickhouse.rst b/docs/source/reference/databases/clickhouse.rst new file mode 100644 index 0000000000..4d306f6ea7 --- /dev/null +++ b/docs/source/reference/databases/clickhouse.rst @@ -0,0 +1,37 @@ +ClickHouse +========== + +The connection to ClickHouse is based on the `clickhouse-sqlalchemy <https://pypi.org/project/clickhouse-sqlalchemy/>`_ library. + +Dependency +---------- + +* clickhouse-sqlalchemy + + + +Parameters +---------- + +Required: + +* `user` is the database user. +* `password` is the database password. +* `host` is the host name or IP address. +* `port` is the port used to make the TCP/IP connection to the ClickHouse server. +* `database` is the database name. +* `protocol` (optional) is the connection protocol. Defaults to `native`; the other supported values are `http` and `https`. + + +Create Connection +----------------- + +.. code-block:: text + + CREATE DATABASE clickhouse_data WITH ENGINE = 'clickhouse', PARAMETERS = { + "user": "eva", + "password": "password", + "host": "localhost", + "port": "9000", + "database": "evadb" + }; diff --git a/evadb/third_party/databases/clickhouse/__init__.py b/evadb/third_party/databases/clickhouse/__init__.py new file mode 100644 index 0000000000..e9f1e2861b --- /dev/null +++ b/evadb/third_party/databases/clickhouse/__init__.py @@ -0,0 +1,15 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
class ClickHouseHandler(DBHandler):
    """Database handler that reaches ClickHouse through a SQLAlchemy engine."""

    # User-facing ``protocol`` value -> SQLAlchemy dialect name.
    # clickhouse-sqlalchemy ships a ``native`` and an ``http`` driver only;
    # HTTPS is the ``http`` driver plus a ``protocol=https`` query argument,
    # so "https" must not be turned into a (non-existent) "clickhouse+https".
    _DIALECTS = {
        "native": "clickhouse+native",
        "http": "clickhouse+http",
        "https": "clickhouse+http",
    }

    # Lower-cased ClickHouse type name -> Python type. The generic SQL names
    # from the first version of this handler are kept for compatibility.
    _PYTHON_TYPES = {
        "char": str,
        "varchar": str,
        "text": str,
        "string": str,
        "fixedstring": str,
        "boolean": bool,
        "bool": bool,
        "integer": int,
        "int": int,
        "int8": int,
        "int16": int,
        "int32": int,
        "int64": int,
        "int128": int,
        "int256": int,
        "uint8": int,
        "uint16": int,
        "uint32": int,
        "uint64": int,
        "uint128": int,
        "uint256": int,
        "float": float,
        "double": float,
        "float32": float,
        "float64": float,
        # Add more mappings as needed
    }

    # Type wrappers that do not change the Python-side value type.
    _TYPE_WRAPPERS = ("Nullable(", "LowCardinality(")

    def __init__(self, name: str, **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of the DB handler instance
            **kwargs: arbitrary keyword arguments for establishing the connection.
                Recognized keys: host, port, user, password, database and
                protocol ("native" when omitted, or "http" / "https").
        """
        super().__init__(name)
        self.host = kwargs.get("host")
        self.port = kwargs.get("port")
        self.user = kwargs.get("user")
        self.password = kwargs.get("password")
        self.database = kwargs.get("database")

        # Fall back to the native protocol instead of building a "None://" URL.
        requested_protocol = kwargs.get("protocol") or "native"
        self._use_https = requested_protocol == "https"
        # Unknown values are passed through unchanged, as before.
        self.protocol = self._DIALECTS.get(requested_protocol, requested_protocol)

        # Defined up front so the "is connected" checks below work even when
        # connect() has not been called yet.
        self.connection = None

    def connect(self):
        """
        Set up the connection required by the handler.
        Returns:
            DBHandlerStatus
        """
        try:
            # NOTE: credentials are interpolated as-is; values containing
            # characters such as "@" or "/" must already be URL-encoded.
            url = (
                f"{self.protocol}://{self.user}:{self.password}"
                f"@{self.host}:{self.port}/{self.database}"
            )
            if self._use_https:
                url = url + "?protocol=https"

            engine = create_engine(url)
            self.connection = engine.raw_connection()
            return DBHandlerStatus(status=True)
        except Exception as e:
            return DBHandlerStatus(status=False, error=str(e))

    def disconnect(self):
        """
        Close any existing connections.
        """
        if self.connection:
            # Close the underlying connection; calling self.disconnect() here
            # would recurse forever.
            self.connection.close()
            self.connection = None

    def check_connection(self) -> DBHandlerStatus:
        """
        Check connection to the handler.
        Returns:
            DBHandlerStatus
        """
        if self.connection:
            return DBHandlerStatus(status=True)
        return DBHandlerStatus(status=False, error="Not connected to the database.")

    def get_tables(self) -> DBHandlerResponse:
        """
        Return the list of tables in the database.
        Returns:
            DBHandlerResponse
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            # The database name lives on self.database; this class never sets
            # a ``connection_data`` attribute.
            query = f"SHOW TABLES FROM {self.database}"
            tables_df = pd.read_sql_query(query, self.connection)
            return DBHandlerResponse(data=tables_df)
        except Exception as e:
            return DBHandlerResponse(data=None, error=str(e))

    def get_columns(self, table_name: str) -> DBHandlerResponse:
        """
        Returns the list of columns for the given table.
        Args:
            table_name (str): name of the table whose columns are to be retrieved.
        Returns:
            DBHandlerResponse
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            query = f"DESCRIBE {table_name}"
            columns_df = pd.read_sql_query(query, self.connection)
            # ClickHouse reports the column type under "type"; expose it as
            # "dtype" before converting it to Python types.
            columns_df = columns_df.rename(columns={"type": "dtype"})
            columns_df["dtype"] = columns_df["dtype"].apply(
                self._clickhouse_to_python_types
            )
            return DBHandlerResponse(data=columns_df)
        except Exception as e:
            return DBHandlerResponse(data=None, error=str(e))

    def _fetch_results_as_df(self, cursor):
        """
        Convert the rows pending on the cursor into a DataFrame.
        Args:
            cursor: DB-API cursor on which a statement has been executed.
        Returns:
            DataFrame of the fetched rows, or a one-row ``status`` frame when
            the statement produced no rows.
        """
        try:
            res = cursor.fetchall()
            if not res:
                return pd.DataFrame({"status": ["success"]})
            return pd.DataFrame(res, columns=[desc[0] for desc in cursor.description])
        except Exception as e:
            if str(e) == "no results to fetch":
                return pd.DataFrame({"status": ["success"]})
            raise

    def execute_native_query(self, query_string: str) -> DBHandlerResponse:
        """
        Executes the native query on the database.
        Args:
            query_string (str): query in native format
        Returns:
            DBHandlerResponse
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute(query_string)
                return DBHandlerResponse(data=self._fetch_results_as_df(cursor))
            finally:
                # Release the cursor whether or not the query succeeded.
                cursor.close()
        except Exception as e:
            return DBHandlerResponse(data=None, error=str(e))

    def _clickhouse_to_python_types(self, clickhouse_type: str):
        """
        Map a ClickHouse column type to the matching Python type.
        Args:
            clickhouse_type (str): type name as reported by the server, e.g.
                "String", "Nullable(Int32)" or "FixedString(16)".
        Returns:
            The Python type (str, bool, int or float).
        Raises:
            Exception: if the type has no known Python equivalent.
        """
        type_name = str(clickhouse_type).strip()

        # Peel Nullable(...) / LowCardinality(...) off, in whatever order.
        peeled = True
        while peeled:
            peeled = False
            for wrapper in self._TYPE_WRAPPERS:
                if type_name.startswith(wrapper) and type_name.endswith(")"):
                    type_name = type_name[len(wrapper):-1].strip()
                    peeled = True

        # Drop type parameters such as the length in FixedString(16).
        base_name = type_name.split("(", 1)[0].strip().lower()

        if base_name in self._PYTHON_TYPES:
            return self._PYTHON_TYPES[base_name]
        raise Exception(
            f"Unsupported column {clickhouse_type} encountered in the clickhouse table. Please raise a feature request!"
        )
def test_should_run_query_in_clickhouse(self):
    """Register a ClickHouse data source, then run the shared query checks."""
    # Connection settings for the ClickHouse server used by the test.
    connection_params = dict(
        user="eva",
        password="password",
        host="localhost",
        port="9000",
        database="evadb",
    )

    # Create database.
    create_database_query = f"""CREATE DATABASE test_data_source
                    WITH ENGINE = "clickhouse",
                    PARAMETERS = {connection_params};"""
    execute_query_fetch_all(self.evadb, create_database_query)

    # Test executions.
    self._execute_native_query()
    self._execute_evadb_query()
class NativeQueryResponse:
    """Minimal stand-in for a handler response: no error and no data."""

    def __init__(self):
        self.error = None
        self.data = None


@pytest.mark.notparallel
class ClickHouseNativeStorageEngineTest(unittest.TestCase):
    """Checks that ``USE <db> { ... }`` queries are routed to the ClickHouse handler.

    The catalog lookup and the handler's connect / execute_native_query /
    disconnect methods are all mocked, so no ClickHouse server is contacted.
    """

    _HANDLER_PATH = (
        "evadb.third_party.databases.clickhouse.clickhouse_handler.ClickHouseHandler"
    )

    def get_clickhouse_params(self):
        """Return the connection parameters stored in the mocked catalog entry."""
        return {
            "user": "eva",
            "password": "password",
            "host": "localhost",
            "port": "9000",
            "database": "evadb",
        }

    def _start_patch(self, target: str):
        """Start a patch on *target* and return its mock.

        The patch is stopped through addCleanup, so it is undone even when
        setUp fails part-way through (tearDown is not run in that case).
        """
        patcher = patch(target)
        mock = patcher.start()
        self.addCleanup(patcher.stop)
        return mock

    def setUp(self):
        connection_params = self.get_clickhouse_params()
        self.evadb = get_evadb_for_testing()

        # Create all class level patches
        self.get_database_catalog_entry_mock = self._start_patch(
            "evadb.catalog.catalog_manager.CatalogManager.get_database_catalog_entry"
        )
        self.execute_native_query_mock = self._start_patch(
            f"{self._HANDLER_PATH}.execute_native_query"
        )
        self.connect_mock = self._start_patch(f"{self._HANDLER_PATH}.connect")
        self.disconnect_mock = self._start_patch(f"{self._HANDLER_PATH}.disconnect")

        # set return values
        self.execute_native_query_mock.return_value = NativeQueryResponse()
        self.get_database_catalog_entry_mock.return_value = DatabaseCatalogEntry(
            name="test_data_source",
            engine="clickhouse",
            params=connection_params,
            row_id=1,
        )

    def _assert_single_round_trip(self):
        """Assert one catalog lookup and one connect / execute / disconnect."""
        self.connect_mock.assert_called_once()
        self.execute_native_query_mock.assert_called_once()
        self.get_database_catalog_entry_mock.assert_called_once()
        self.disconnect_mock.assert_called_once()

    def test_execute_clickhouse_select_query(self):
        execute_query_fetch_all(
            self.evadb,
            """USE test_data_source {
                SELECT * FROM test_table
            }""",
        )
        self._assert_single_round_trip()

    def test_execute_clickhouse_insert_query(self):
        execute_query_fetch_all(
            self.evadb,
            """USE test_data_source {
                INSERT INTO test_table (
                    name, age, comment
                ) VALUES (
                    'val', 5, 'testing'
                )
            }""",
        )
        self._assert_single_round_trip()

    def test_execute_clickhouse_update_query(self):
        execute_query_fetch_all(
            self.evadb,
            """USE test_data_source {
                UPDATE test_table
                SET comment = 'update'
                WHERE age > 5
            }""",
        )
        self._assert_single_round_trip()

    def test_execute_clickhouse_delete_query(self):
        execute_query_fetch_all(
            self.evadb,
            """USE test_data_source {
                DELETE FROM test_table
                WHERE age < 5
            }""",
        )
        self._assert_single_round_trip()