Skip to content

Commit

Permalink
fix #44, #61 annotate parquet Database, #62 export
Browse files Browse the repository at this point in the history
  • Loading branch information
antonylebechec committed Jun 8, 2023
1 parent 9d77aac commit c0498d0
Show file tree
Hide file tree
Showing 11 changed files with 580 additions and 752 deletions.
151 changes: 148 additions & 3 deletions howard/objects/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,123 @@ def is_compressed(self, database:str = None) -> bool:
if not database:
database = self.get_database()

return get_file_compressed(database)
if type(database) == duckdb.DuckDBPyConnection:
return False
else:
return get_file_compressed(database)


def get_header_infos_list(self, database:str = None) -> list:
"""
This function returns a list of header information for a given database or the current database
if none is specified.
:param database: The `database` parameter is a string that represents the name of the database
from which the header information is to be retrieved. If no database name is provided, the
method will use the default database name obtained from the `get_database()` method
:type database: str
:return: A list of header information from a database, or an empty list if the database header
is not available.
"""

if not database:
database = self.get_database()

# Database header
database_header = self.get_header(database=database)

# Init
database_header_infos_list = []

if database_header:
database_header_infos_list = list(database_header.infos)

return database_header_infos_list


def find_column(self, database:str = None, table:str = None, column:str = "INFO", prefixes:list = ["INFO/"]) -> str:
"""
This function finds a specific column in a database table, with the option to search for a
column with a specific prefix or within the INFO column header.
:param database: The name of the database to search for the column in. If not provided, it will
use the current database that the code is connected to
:type database: str
:param table: The name of the table in the database where the column is located
:type table: str
:param column: The default value for the "column" parameter is "INFO", but it can be changed to
search for a specific column name, defaults to INFO
:type column: str (optional)
:param prefixes: The prefixes parameter is a list of strings that are used to search for a
column with a specific prefix in the database. For example, if the prefixes list contains "DP/",
the function will search for a column named "DP/INFO" in addition to the default "INFO" column
:type prefixes: list
:return: a string that represents the name of the column found in the database, based on the
input parameters. If the column is found, it returns the column name. If the column is not
found, it returns None.
"""

if not database:
database = self.get_database()

# Database columns
database_columns = self.get_columns(database=database, table=table)

# Init
column_found = None

# Column exists
if column in database_columns:
column_found = column

# Column with prefix
elif prefixes:
for prefix in prefixes:
if prefix + column in database_columns:
column_found = prefix + column
break

# Column in INFO column (test if in header)
if not column_found and "INFO" in database_columns :
database_header_infos = self.get_header_infos_list(database=database)
if column in database_header_infos:
column_found = "INFO"

return column_found


def map_columns(self, database:str = None, table:str = None, columns:list = [], prefixes:list = ["INFO/"]) -> dict:
"""
This function maps columns in a database table to their corresponding columns with specified
prefixes.
:param database: The name of the database to search for columns in. If no database is specified,
the method will use the default database set in the connection
:type database: str
:param table: The name of the table in the database that you want to map the columns for
:type table: str
:param columns: A list of column names that you want to map to their corresponding column names
in the database
:type columns: list
:param prefixes: The `prefixes` parameter is a list of strings that are used to filter the
columns that are searched for. Only columns that start with one of the prefixes in the list will
be considered. In the code above, the default value for `prefixes` is `["INFO/"]`, which
:type prefixes: list
:return: a dictionary that maps the input columns to their corresponding columns found in the
specified database and table, with the specified prefixes.
"""

if not database:
database = self.get_database()

# Init
columns_mapping = {}

for column in columns:
column_found = self.find_column(database=database, table=table, column=column, prefixes=prefixes)
columns_mapping[column] = column_found

return columns_mapping


def get_columns(self, database:str = None, table:str = None) -> list:
Expand Down Expand Up @@ -1252,7 +1368,7 @@ def get_conn(self):
return self.conn


def export(self, output_database:str, output_header:str = None, database:str = None) -> bool:
def export(self, output_database:str, output_header:str = None, database:str = None, table:str = "variants") -> bool:
"""
This function exports data from a database to a specified output format and compresses it if
necessary.
Expand Down Expand Up @@ -1335,6 +1451,31 @@ def export(self, output_database:str, output_header:str = None, database:str = N
query_export_format = f"FORMAT CSV, DELIMITER '{delimiter}', HEADER"
include_header = True

# duckDB
elif output_type in ["duckdb"]:

# Export database as Parquet
database_export_parquet_file = f"""{output_database}.{random_tmp}.database_export.parquet"""
self.export(database=database, output_database=database_export_parquet_file)

# Create database and connexion
output_database_conn = duckdb.connect(output_database)

# Create table in database connexion with Parquet file
query_copy = f"""
CREATE TABLE {table}
AS {self.get_sql_database_link(database=database_export_parquet_file)}
"""
output_database_conn.execute(query_copy)

# Close connexion
output_database_conn.close()

# remove tmp
remove_if_exists([database_export_parquet_file])

return os.path.exists(output_database)

# else:
# log.debug("Not available")

Expand Down Expand Up @@ -1366,7 +1507,7 @@ def export(self, output_database:str, output_header:str = None, database:str = N
query_output_database_tmp = output_database
else:
query_output_database_tmp = f"""{output_database}.{random_tmp}"""

query_copy = f"""
COPY (
SELECT {query_export_columns}
Expand All @@ -1390,10 +1531,14 @@ def export(self, output_database:str, output_header:str = None, database:str = N
concat_file(input_files=[query_output_header_tmp, query_output_database_tmp], output_file=query_output_database_header_tmp)
# move file
shutil.move(query_output_database_header_tmp, query_output_database_tmp)
# remove tmp
remove_if_exists([query_output_header_tmp])

# Compress
if compressed:
compress_file(input_file=query_output_database_tmp, output_file=output_database)
# remove tmp
remove_if_exists([query_output_database_tmp])
else:
shutil.move(query_output_database_tmp, output_database)

Expand Down
Loading

0 comments on commit c0498d0

Please # to comment.