Skip to content

Commit

Permalink
Fix async usage in app startup
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored and ogabrielluiz committed Oct 25, 2024
1 parent ee101e6 commit acce253
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 25 deletions.
8 changes: 1 addition & 7 deletions src/backend/base/langflow/initial_setup/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import shutil
import time
from collections import defaultdict
from collections.abc import Awaitable
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
Expand Down Expand Up @@ -600,12 +599,7 @@ def find_existing_flow(session, flow_id, flow_endpoint_name):
return None


async def create_or_update_starter_projects(get_all_components_coro: Awaitable[dict]) -> None:
try:
all_types_dict = await get_all_components_coro
except Exception:
logger.exception("Error loading components")
raise
def create_or_update_starter_projects(all_types_dict: dict) -> None:
with session_scope() as session:
new_folder = create_starter_folder(session)
starter_projects = load_starter_projects()
Expand Down
24 changes: 10 additions & 14 deletions src/backend/base/langflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from pathlib import Path
from urllib.parse import urlencode

import nest_asyncio
from fastapi import FastAPI, HTTPException, Request, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
Expand Down Expand Up @@ -87,28 +86,25 @@ async def dispatch(self, request: Request, call_next):
return response


telemetry_service_tasks = set()


def get_lifespan(*, fix_migration=False, version=None):
def _initialize():
initialize_services(fix_migration=fix_migration)
setup_llm_caching()
initialize_super_user_if_needed()

@asynccontextmanager
async def lifespan(_app: FastAPI):
nest_asyncio.apply()
# Startup message
if version:
rprint(f"[bold green]Starting Langflow v{version}...[/bold green]")
else:
rprint("[bold green]Starting Langflow...[/bold green]")
try:
initialize_services(fix_migration=fix_migration)
setup_llm_caching()
initialize_super_user_if_needed()
task = asyncio.create_task(get_and_cache_all_types_dict(get_settings_service()))
await create_or_update_starter_projects(task)
telemetry_service_task = asyncio.create_task(get_telemetry_service().start())
telemetry_service_tasks.add(telemetry_service_task)
telemetry_service_task.add_done_callback(telemetry_service_tasks.discard)
load_flows_from_directory()
await asyncio.to_thread(_initialize)
all_types_dict = await get_and_cache_all_types_dict(get_settings_service())
await asyncio.to_thread(create_or_update_starter_projects, all_types_dict)
get_telemetry_service().start()
await asyncio.to_thread(load_flows_from_directory)
yield
except Exception as exc:
if "langflow migration --fix" not in str(exc):
Expand Down
17 changes: 13 additions & 4 deletions src/backend/base/langflow/services/telemetry/service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import asyncio
import contextlib
import os
import platform
from datetime import datetime, timezone
Expand Down Expand Up @@ -112,7 +111,7 @@ async def log_package_playground(self, payload: PlaygroundPayload) -> None:
async def log_package_component(self, payload: ComponentPayload) -> None:
await self._queue_event((self.send_telemetry_data, payload, "component"))

async def start(self) -> None:
def start(self) -> None:
if self.running or self.do_not_track:
return
try:
Expand Down Expand Up @@ -140,9 +139,19 @@ async def stop(self) -> None:
await self.flush()
self.running = False
if self.worker_task:
self.worker_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
self.worker_task.cancel("Cancel telemetry worker task")
try:
await self.worker_task
except asyncio.CancelledError:
if asyncio.current_task().cancelling() > 0:
raise
if self.log_package_version_task:
self.log_package_version_task.cancel("Cancel telemetry log package version task")
try:
await self.log_package_version_task
except asyncio.CancelledError:
if asyncio.current_task().cancelling() > 0:
raise
await self.client.aclose()
except Exception: # noqa: BLE001
logger.exception("Error stopping tracing service")
Expand Down

0 comments on commit acce253

Please # to comment.