Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix: Make logger file sink non-blocking when used with asyncio #4301

Merged
merged 2 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/backend/base/langflow/load/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path

from dotenv import load_dotenv
from loguru import logger

from langflow.graph import Graph
from langflow.graph.schema import RunOutputs
Expand Down Expand Up @@ -43,7 +44,7 @@ def load_flow_from_json(
"""
# If input is a file path, load JSON from the file
log_file_path = Path(log_file) if log_file else None
configure(log_level=log_level, log_file=log_file_path, disable=disable_logs)
configure(log_level=log_level, log_file=log_file_path, disable=disable_logs, async_file=True)

# override env variables with .env file
if env_file:
Expand Down Expand Up @@ -119,7 +120,7 @@ async def arun_flow_from_json(
cache=cache,
disable_logs=disable_logs,
)
return await run_graph(
result = await run_graph(
graph=graph,
session_id=session_id,
input_value=input_value,
Expand All @@ -128,6 +129,8 @@ async def arun_flow_from_json(
output_component=output_component,
fallback_to_env_vars=fallback_to_env_vars,
)
await logger.complete()
return result


def run_flow_from_json(
Expand Down
28 changes: 24 additions & 4 deletions src/backend/base/langflow/logging/logger.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import logging
import os
Expand All @@ -8,7 +9,10 @@
from typing import TypedDict

import orjson
from loguru import logger
from loguru import _defaults, logger
from loguru._error_interceptor import ErrorInterceptor
from loguru._file_sink import FileSink
from loguru._simple_sinks import AsyncSink
from platformdirs import user_cache_dir
from rich.logging import RichHandler
from typing_extensions import NotRequired
Expand Down Expand Up @@ -136,12 +140,30 @@ class LogConfig(TypedDict):
log_env: NotRequired[str]


class AsyncFileSink(AsyncSink):
def __init__(self, file):
self._sink = FileSink(
path=file,
rotation="10 MB", # Log rotation based on file size
)
super().__init__(self.write_async, None, ErrorInterceptor(_defaults.LOGURU_CATCH, -1))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, we can't get the handle id here. But I don't think this is a big problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, unless we can define it in the logger.configure( call


async def complete(self):
await asyncio.to_thread(self._sink.stop)
for task in self._tasks:
await self._complete_task(task)

async def write_async(self, message):
await asyncio.to_thread(self._sink.write, message)


def configure(
*,
log_level: str | None = None,
log_file: Path | None = None,
disable: bool | None = False,
log_env: str | None = None,
async_file: bool = False,
) -> None:
if disable and log_level is None and log_file is None:
logger.disable("langflow")
Expand Down Expand Up @@ -187,14 +209,12 @@ def configure(
log_file = cache_dir / "langflow.log"
logger.debug(f"Log file: {log_file}")
try:
log_file = Path(log_file)
log_file.parent.mkdir(parents=True, exist_ok=True)

logger.add(
sink=str(log_file),
sink=AsyncFileSink(log_file) if async_file else log_file,
level=log_level.upper(),
format=log_format,
rotation="10 MB", # Log rotation based on file size
serialize=True,
)
except Exception: # noqa: BLE001
Expand Down
2 changes: 2 additions & 0 deletions src/backend/base/langflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def _initialize():

@asynccontextmanager
async def lifespan(_app: FastAPI):
configure(async_file=True)
# Startup message
if version:
rprint(f"[bold green]Starting Langflow v{version}...[/bold green]")
Expand All @@ -113,6 +114,7 @@ async def lifespan(_app: FastAPI):
# Shutdown message
rprint("[bold red]Shutting down Langflow...[/bold red]")
await teardown_services()
await logger.complete()

return lifespan

Expand Down
Loading