Skip to content

Commit

Permalink
feat: debug for workflow module
Browse files Browse the repository at this point in the history
  • Loading branch information
0hsn committed Oct 21, 2024
1 parent c0519d4 commit 7bae674
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion chk/modules/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from chk.infrastructure.file_loader import ExecuteContext, FileContext
from chk.infrastructure.helper import data_get, slugify
from chk.infrastructure.logging import debug, error, with_catch_log
from chk.infrastructure.symbol_table import (
ExecResponse,
ExposeManager,
Expand Down Expand Up @@ -122,14 +123,20 @@ def process_task_template(

is_success = True
for task in document.tasks:
debug(task)

if not is_success:
debug(f"is_success: {is_success}")
break

if not isinstance(task, dict):
error("`tasks.*.item` should be map.")
raise RuntimeError("`tasks.*.item` should be map.")

# replace values in tasks
task_d_: dict = replace_value(task, variables.data)
debug(task_d_)

task_o_ = ChkwareTaskSupport.make_task(
task_d_, **dict(base_file_path=base_fpath)
)
Expand All @@ -140,6 +147,7 @@ def process_task_template(
exctx_args["arguments"] = task_o_.arguments.model_dump_json()

execution_ctx = ExecuteContext({"dump": True, "format": True}, exctx_args)
debug(execution_ctx)

task_fn = None

Expand All @@ -151,7 +159,11 @@ def process_task_template(

if task_fn:
te_param = TaskExecParam(task=task_o_, exec_ctx=execution_ctx)
debug(execution_ctx)

task_resp: ExecResponse = cls.execute_task(task_fn, te_param, variables)
debug(execution_ctx)

is_success = task_resp.report.pop("is_success")

exec_report.append(
Expand All @@ -178,29 +190,40 @@ def execute_task(
return _task_res


@with_catch_log
def call(file_ctx: FileContext, exec_ctx: ExecuteContext) -> ExecResponse:
"""Call a workflow document"""

debug(file_ctx)
debug(exec_ctx)

wflow_doc = WorkflowDocument.from_file_context(file_ctx)
debug(wflow_doc.model_dump_json())

variable_doc = Variables()
VariableTableManager.handle(variable_doc, wflow_doc, exec_ctx)
debug(variable_doc.data)

service = WorkflowDocumentSupport()
# @TODO make sure the document do not call self making it repeating
service.set_step_template(variable_doc)

r_exception: Exception | None = None

try:
exec_report = service.process_task_template(wflow_doc, variable_doc)
with with_catch_log():
exec_report = service.process_task_template(wflow_doc, variable_doc)
except Exception as ex:
r_exception = ex
error(ex)

output_data = Variables({"_steps": variable_doc[WorkflowConfigNode.NODE.value]})
debug(output_data.data)

exposed_data: dict = ExposeManager.get_exposed_replaced_data(
wflow_doc, variable_doc.data
)
debug(exposed_data)

# TODO also send failed_details (fail code, message, stacktrace, etc)
return ExecResponse(
Expand All @@ -217,6 +240,7 @@ def call(file_ctx: FileContext, exec_ctx: ExecuteContext) -> ExecResponse:
)


@with_catch_log
def execute(
ctx: FileContext, exec_ctx: ExecuteContext, cb: abc.Callable = lambda *args: ...
) -> None:
Expand Down

0 comments on commit 7bae674

Please # to comment.