"""adds markdown dataset

Revision ID: 7498717d42ef
Revises: 9ec726915d3c
Create Date: 2023-11-12 12:07:55.382820

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "7498717d42ef"
down_revision = "9ec726915d3c"
branch_labels = None
depends_on = None

# Explicit constraint name so downgrade() can drop the FK reliably.
# The original passed None to create_foreign_key (dialect auto-names it) but
# also passed None to drop_constraint, which cannot resolve a constraint and
# makes the migration irreversible.
FK_MARKDOWN_FILE_DATASET = "markdown_file_dataset_id_fkey"


def upgrade():
    # New columns on existing cognition tables.
    op.add_column(
        "markdown_file",
        sa.Column("dataset_id", postgresql.UUID(as_uuid=True), nullable=True),
        schema="cognition",
    )
    op.add_column(
        "markdown_file",
        sa.Column("started_at", sa.DateTime(), nullable=True),
        schema="cognition",
    )
    op.add_column(
        "markdown_llm_logs",
        sa.Column("model_used", sa.String(), nullable=True),
        schema="cognition",
    )

    # Dataset container for markdown files, owned by an organization.
    op.create_table(
        "markdown_dataset",
        sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column("organization_id", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column("refinery_project_id", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column("created_by", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=True),
        sa.Column("name", sa.String(), nullable=True),
        sa.Column("tokenizer", sa.String(), nullable=True),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("category_origin", sa.String(), nullable=True),
        sa.ForeignKeyConstraint(
            ["organization_id"], ["organization.id"], ondelete="CASCADE"
        ),
        sa.PrimaryKeyConstraint("id"),
        schema="cognition",
    )

    op.create_foreign_key(
        FK_MARKDOWN_FILE_DATASET,
        "markdown_file",
        "markdown_dataset",
        ["dataset_id"],
        ["id"],
        ondelete="CASCADE",
        source_schema="cognition",
        referent_schema="cognition",
    )

    op.create_index(
        op.f("ix_cognition_markdown_file_dataset_id"),
        "markdown_file",
        ["dataset_id"],
        unique=False,
        schema="cognition",
    )

    op.create_index(
        op.f("ix_cognition_markdown_dataset_created_by"),
        "markdown_dataset",
        ["created_by"],
        unique=False,
        schema="cognition",
    )

    op.create_index(
        op.f("ix_cognition_markdown_dataset_organization_id"),
        "markdown_dataset",
        ["organization_id"],
        unique=False,
        schema="cognition",
    )

    op.create_index(
        op.f("ix_cognition_markdown_dataset_refinery_project_id"),
        "markdown_dataset",
        ["refinery_project_id"],
        unique=False,
        schema="cognition",
    )


def downgrade():
    op.drop_index(
        op.f("ix_cognition_markdown_file_dataset_id"),
        table_name="markdown_file",
        schema="cognition",
    )

    # Drop the FK by the explicit name created in upgrade(); passing None
    # here (as the autogenerated version did) raises because the constraint
    # cannot be identified.
    op.drop_constraint(
        FK_MARKDOWN_FILE_DATASET,
        "markdown_file",
        type_="foreignkey",
        schema="cognition",
    )

    op.drop_column("markdown_file", "dataset_id", schema="cognition")
    op.drop_column("markdown_file", "started_at", schema="cognition")
    op.drop_column("markdown_llm_logs", "model_used", schema="cognition")

    op.drop_index(
        op.f("ix_cognition_markdown_dataset_created_by"),
        table_name="markdown_dataset",
        schema="cognition",
    )

    op.drop_index(
        op.f("ix_cognition_markdown_dataset_organization_id"),
        table_name="markdown_dataset",
        schema="cognition",
    )

    op.drop_index(
        op.f("ix_cognition_markdown_dataset_refinery_project_id"),
        table_name="markdown_dataset",
        schema="cognition",
    )

    op.drop_table("markdown_dataset", schema="cognition")
"""adds markdown file llm transformation logs

Revision ID: 9ec726915d3c
Revises: 491ea68a7baf
Create Date: 2023-11-03 21:20:45.305683

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "9ec726915d3c"
down_revision = "491ea68a7baf"
branch_labels = None
depends_on = None


def upgrade():
    # Log table: one row per LLM transformation attempt on a markdown file.
    op.create_table(
        "markdown_llm_logs",
        sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column("markdown_file_id", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=True),
        sa.Column("finished_at", sa.DateTime(), nullable=True),
        sa.Column("input", sa.String(), nullable=True),
        sa.Column("output", sa.String(), nullable=True),
        sa.Column("error", sa.String(), nullable=True),
        sa.ForeignKeyConstraint(
            ["markdown_file_id"],
            ["cognition.markdown_file.id"],
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id"),
        schema="cognition",
    )
    op.create_index(
        op.f("ix_cognition_markdown_llm_logs_markdown_file_id"),
        "markdown_llm_logs",
        ["markdown_file_id"],
        unique=False,
        schema="cognition",
    )
    # Processing-state tracking on the markdown file itself.
    op.add_column(
        "markdown_file",
        sa.Column("state", sa.String(), nullable=True),
        schema="cognition",
    )
    op.add_column(
        "markdown_file",
        sa.Column("finished_at", sa.DateTime(), nullable=True),
        schema="cognition",
    )


def downgrade():
    # Mirror of upgrade(), same operation order as the autogenerated version.
    op.drop_column("markdown_file", "finished_at", schema="cognition")
    op.drop_column("markdown_file", "state", schema="cognition")
    op.drop_index(
        op.f("ix_cognition_markdown_llm_logs_markdown_file_id"),
        table_name="markdown_llm_logs",
        schema="cognition",
    )
    op.drop_table("markdown_llm_logs", schema="cognition")
from typing import Any, Dict, Tuple, Callable

import requests
from submodules.model.business_objects import (
    task_queue as task_queue_db_bo,
    general,
)
from submodules.model.cognition_objects import (
    markdown_file as markdown_file_db_bo,
)
from submodules.model import enums


def get_task_functions() -> Tuple[Callable, Callable, int]:
    """Return the (start, check_finished, slot count) triple the task queue
    manager expects for PARSE_MARKDOWN_FILE tasks."""
    return __start_task, __check_finished, 1


def __start_task(task: Dict[str, Any]) -> bool:
    """Trigger markdown parsing for a queued task.

    Returns True when the task was started, False when it is no longer
    relevant (row deleted from the queue or already marked active).
    """
    task_db_obj = task_queue_db_bo.get(task["id"])
    if task_db_obj is None or task_db_obj.is_active:
        return False

    print("Starting markdown file task", flush=True)
    task_info = task["task_info"]
    dataset_id = task_info["dataset_id"]
    file_id = task_info["file_id"]

    task_db_obj.is_active = True
    general.commit()
    # Fire-and-forget trigger; completion is observed by polling the file
    # state in __check_finished, not via this response.
    # NOTE(review): no timeout is set, so an unresponsive gateway blocks this
    # call indefinitely — confirm whether the queue worker tolerates that.
    requests.post(
        f"http://cognition-gateway:80/converters-noop/datasets/{dataset_id}"
        f"/files/{file_id}/parse"
    )
    return True


def __check_finished(task: Dict[str, Any]) -> bool:
    """Report whether the markdown file reached a terminal state."""
    file_id = task["task_info"]["file_id"]
    markdown_file_entity = markdown_file_db_bo.get(file_id)

    # The file can be deleted while the task sits in the queue; the original
    # code dereferenced .state unconditionally and raised AttributeError,
    # leaving the queue entry stuck. Treat a missing row as finished.
    if markdown_file_entity is None:
        return True

    if markdown_file_entity.state in (
        enums.CognitionMarkdownFileState.FINISHED.value,
        enums.CognitionMarkdownFileState.FAILED.value,
    ):
        print("Markdown file finished", flush=True)
        return True
    return False