etl pipeline #172

Draft: wants to merge 12 commits into base: dev
128 changes: 128 additions & 0 deletions alembic/versions/7498717d42ef_adds_markdown_dataset.py
@@ -0,0 +1,128 @@
"""adds markdown dataset

Revision ID: 7498717d42ef
Revises: 9ec726915d3c
Create Date: 2023-11-12 12:07:55.382820

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '7498717d42ef'
down_revision = '9ec726915d3c'
branch_labels = None
depends_on = None

def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("markdown_file", sa.Column("dataset_id", postgresql.UUID(as_uuid=True), nullable=True), schema="cognition")
op.add_column("markdown_file", sa.Column("started_at", sa.DateTime(), nullable=True), schema="cognition")
op.add_column("markdown_llm_logs", sa.Column("model_used", sa.String(), nullable=True), schema="cognition")

op.create_table(
"markdown_dataset",
sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False),
sa.Column("organization_id", postgresql.UUID(as_uuid=True), nullable=True),
sa.Column("refinery_project_id", postgresql.UUID(as_uuid=True), nullable=True),
sa.Column("created_by", postgresql.UUID(as_uuid=True), nullable=True),
sa.Column("created_at",sa.DateTime(), nullable=True),
sa.Column("name", sa.String(), nullable=True),
sa.Column("tokenizer", sa.String(), nullable=True),
sa.Column("description", sa.String(), nullable=True),
sa.Column("category_origin", sa.String(), nullable=True),
sa.ForeignKeyConstraint(
["organization_id"], ["organization.id"], ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id"),
schema="cognition",
)

    op.create_foreign_key(
        # named explicitly so the downgrade can drop the constraint by name
        "markdown_file_dataset_id_fkey",
"markdown_file",
"markdown_dataset",
["dataset_id"],
["id"],
ondelete="CASCADE",
source_schema="cognition",
referent_schema="cognition"
)

op.create_index(
op.f("ix_cognition_markdown_file_dataset_id"),
"markdown_file",
["dataset_id"],
unique=False,
schema="cognition",
)

op.create_index(
op.f("ix_cognition_markdown_dataset_created_by"),
"markdown_dataset",
["created_by"],
unique=False,
schema="cognition",
)

op.create_index(
op.f("ix_cognition_markdown_dataset_organization_id"),
"markdown_dataset",
["organization_id"],
unique=False,
schema="cognition",
)

op.create_index(
op.f("ix_cognition_markdown_dataset_refinery_project_id"),
"markdown_dataset",
["refinery_project_id"],
unique=False,
schema="cognition",
)

# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###

op.drop_index(
op.f("ix_cognition_markdown_file_dataset_id"),
table_name="markdown_file",
schema="cognition",
)

    op.drop_constraint(
        "markdown_file_dataset_id_fkey",
"markdown_file",
type_="foreignkey",
schema="cognition",
)

op.drop_column("markdown_file", "dataset_id", schema="cognition")
op.drop_column("markdown_file", "started_at", schema="cognition")
op.drop_column("markdown_llm_logs", "model_used", schema="cognition")

op.drop_index(
op.f("ix_cognition_markdown_dataset_created_by"),
table_name="markdown_dataset",
schema="cognition",
)

op.drop_index(
op.f("ix_cognition_markdown_dataset_organization_id"),
table_name="markdown_dataset",
schema="cognition",
)

op.drop_index(
op.f("ix_cognition_markdown_dataset_refinery_project_id"),
table_name="markdown_dataset",
schema="cognition",
)

op.drop_table("markdown_dataset", schema="cognition")

# ### end Alembic commands ###
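For reference, this revision can be applied and rolled back through Alembic's Python API as well as the CLI; a minimal sketch, assuming an alembic.ini at the repository root (the config path and database setup are assumptions, not part of this PR):

from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # assumed config location

# apply migrations up to and including this revision
command.upgrade(cfg, "7498717d42ef")

# revert to the previous revision if the upgrade needs to be undone
command.downgrade(cfg, "9ec726915d3c")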
45 changes: 45 additions & 0 deletions alembic/versions/9ec726915d3c_adds_markdown_file_llm_transformation_logs.py
@@ -0,0 +1,45 @@
"""adds markdown file llm transformation logs

Revision ID: 9ec726915d3c
Revises: 491ea68a7baf
Create Date: 2023-11-03 21:20:45.305683

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '9ec726915d3c'
down_revision = '491ea68a7baf'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('markdown_llm_logs',
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('markdown_file_id', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('finished_at', sa.DateTime(), nullable=True),
sa.Column('input', sa.String(), nullable=True),
sa.Column('output', sa.String(), nullable=True),
sa.Column('error', sa.String(), nullable=True),
sa.ForeignKeyConstraint(['markdown_file_id'], ['cognition.markdown_file.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
schema='cognition'
)
op.create_index(op.f('ix_cognition_markdown_llm_logs_markdown_file_id'), 'markdown_llm_logs', ['markdown_file_id'], unique=False, schema='cognition')
op.add_column('markdown_file', sa.Column('state', sa.String(), nullable=True), schema='cognition')
op.add_column('markdown_file', sa.Column('finished_at', sa.DateTime(), nullable=True), schema='cognition')
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('markdown_file', 'finished_at', schema='cognition')
op.drop_column('markdown_file', 'state', schema='cognition')
op.drop_index(op.f('ix_cognition_markdown_llm_logs_markdown_file_id'), table_name='markdown_llm_logs', schema='cognition')
op.drop_table('markdown_llm_logs', schema='cognition')
# ### end Alembic commands ###
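To show what the new log table captures, a hedged sketch that reflects cognition.markdown_llm_logs and lists failed transformations, newest first; the connection string is a placeholder and this query is not part of the PR:

import sqlalchemy as sa

engine = sa.create_engine("postgresql://user:pass@localhost:5432/refinery")  # placeholder DSN
meta = sa.MetaData(schema="cognition")
logs = sa.Table("markdown_llm_logs", meta, autoload_with=engine)

with engine.connect() as conn:
    failed = conn.execute(
        sa.select(logs)
        .where(logs.c.error.is_not(None))
        .order_by(logs.c.created_at.desc())
    ).fetchall()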
28 changes: 26 additions & 2 deletions api/transfer.py
@@ -19,6 +19,7 @@
general,
organization,
tokenization,
project as refinery_project,
)

from submodules.model.cognition_objects import project as cognition_project
@@ -38,8 +39,8 @@
from util import daemon, notification

from controller.task_queue import manager as task_queue_manager
from submodules.model.enums import TaskType, RecordTokenizationScope, TaskQueueAction
import requests

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
@@ -231,6 +232,29 @@ def put(self, request) -> PlainTextResponse:
)

return PlainTextResponse("OK")


class CognitionParseMarkdownFile(HTTPEndpoint):
def post(self, request) -> PlainTextResponse:
refinery_project_id = request.path_params["project_id"]
refinery_project_item = refinery_project.get(refinery_project_id)
if not refinery_project_item:
return PlainTextResponse("Bad project id", status_code=400)

dataset_id = request.path_params["dataset_id"]
file_id = request.path_params["file_id"]

task_queue_manager.add_task(
refinery_project_id,
TaskType.PARSE_MARKDOWN_FILE,
refinery_project_item.created_by,
{
"dataset_id": dataset_id,
"file_id": file_id,
},
)

return PlainTextResponse("OK")


class AssociationsImport(HTTPEndpoint):
5 changes: 5 additions & 0 deletions app.py
@@ -13,6 +13,7 @@
UploadTaskInfo,
CognitionImport,
CognitionPrepareProject,
CognitionParseMarkdownFile,
)
from middleware.database_session import DatabaseSessionHandler
from starlette.applications import Starlette
@@ -55,6 +56,10 @@
"/project/{cognition_project_id:str}/cognition/continue/{task_id:str}/finalize",
CognitionPrepareProject,
),
Route(
"/project/{project_id:str}/cognition/datasets/{dataset_id:str}/files/{file_id:str}/queue",
CognitionParseMarkdownFile,
),
Route("/project/{project_id:str}/import/task/{task_id:str}", UploadTaskInfo),
Route("/project", ProjectCreationFromWorkflow),
Route("/is_managed", IsManagedRest),
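With this route registered, a client could enqueue a parse task like the sketch below; the gateway address and the UUIDs are placeholders, not values from this PR:

import requests

GATEWAY = "http://localhost:7051"  # placeholder; depends on deployment
project_id = "<refinery-project-uuid>"
dataset_id = "<dataset-uuid>"
file_id = "<file-uuid>"

resp = requests.post(
    f"{GATEWAY}/project/{project_id}/cognition/datasets/{dataset_id}/files/{file_id}/queue"
)
# transfer.py answers 200 "OK" on success and 400 "Bad project id" otherwise
print(resp.status_code, resp.text)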
44 changes: 44 additions & 0 deletions controller/task_queue/handler/markdown_file.py
@@ -0,0 +1,44 @@
from typing import Any, Dict, Tuple, Callable

import requests
from submodules.model.business_objects import (
task_queue as task_queue_db_bo,
general,
)
from submodules.model.cognition_objects import (
markdown_file as markdown_file_db_bo,
)
from submodules.model import enums


def get_task_functions() -> Tuple[Callable, Callable, int]:
return __start_task, __check_finished, 1


def __start_task(task: Dict[str, Any]) -> bool:
# check task still relevant
task_db_obj = task_queue_db_bo.get(task["id"])
if task_db_obj is None or task_db_obj.is_active:
return False

print("Starting markdown file task", flush=True)
action = task["task_info"]
dataset_id = action["dataset_id"]
file_id = action["file_id"]

task_db_obj.is_active = True
general.commit()
requests.post(f"http://cognition-gateway:80/converters-noop/datasets/{dataset_id}/files/{file_id}/parse")
return True


def __check_finished(task: Dict[str, Any]) -> bool:
    action = task["task_info"]
    file_id = action["file_id"]
    markdown_file_entity = markdown_file_db_bo.get(file_id)

    if markdown_file_entity is None:
        # file was removed while the task was queued; stop polling
        return True
    if markdown_file_entity.state in (
        enums.CognitionMarkdownFileState.FINISHED.value,
        enums.CognitionMarkdownFileState.FAILED.value,
    ):
        print("Markdown file finished", flush=True)
        return True
    return False
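The returned triple (start function, check function, poll interval) follows the task-queue handler contract; roughly, a consumer might drive it like this sketch (the real scheduler in controller/task_queue differs in detail):

import time

def run_task(task):
    start_task, check_finished, interval = get_task_functions()
    if not start_task(task):  # task vanished or is already active
        return
    while not check_finished(task):  # poll until FINISHED or FAILED
        time.sleep(interval)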
3 changes: 3 additions & 0 deletions controller/task_queue/manager.py
@@ -16,6 +16,7 @@
tokenization as tokenization_handler,
attribute_calculation as attribute_calculation_handler,
task_queue as task_queue_handler,
markdown_file as markdown_file_handler,
)
from .util import if_task_queue_send_websocket

@@ -87,6 +88,8 @@ def get_task_function_by_type(task_type: str) -> Tuple[Callable, Callable, int]:
return attribute_calculation_handler.get_task_functions()
if task_type == enums.TaskType.TASK_QUEUE.value:
return task_queue_handler.get_task_functions()
if task_type == enums.TaskType.PARSE_MARKDOWN_FILE.value:
return markdown_file_handler.get_task_functions()
raise ValueError(f"Task type {task_type} not supported yet")


2 changes: 1 addition & 1 deletion submodules/model