From e441d14f3f78454fc44b7323d43ccd8b4f2f39a1 Mon Sep 17 00:00:00 2001 From: Alex Streed Date: Wed, 29 Jan 2025 06:35:47 -0600 Subject: [PATCH] Remove `@sync_compatible` from `prefect.artifacts` (#16877) --- src/prefect/artifacts.py | 513 ++++++++++++++++++++++++++++++-------- tests/fixtures/api.py | 5 + tests/test_artifacts.py | 517 +++++++++++++++++++++++++++++++++------ 3 files changed, 859 insertions(+), 176 deletions(-) diff --git a/src/prefect/artifacts.py b/src/prefect/artifacts.py index 559093abb862..d4028fd08c6a 100644 --- a/src/prefect/artifacts.py +++ b/src/prefect/artifacts.py @@ -2,22 +2,27 @@ Interface for creating and reading artifacts. """ -import asyncio -import json # noqa: I001 +from __future__ import annotations + +import json import math import warnings -from typing import TYPE_CHECKING, Any, Optional, Union +from contextlib import nullcontext +from typing import TYPE_CHECKING, Any, Optional, Union, cast from uuid import UUID from typing_extensions import Self +from prefect._internal.compatibility.async_dispatch import async_dispatch +from prefect.client.orchestration import PrefectClient, get_client from prefect.client.schemas.actions import ArtifactCreate as ArtifactRequest from prefect.client.schemas.actions import ArtifactUpdate from prefect.client.schemas.filters import ArtifactFilter, ArtifactFilterKey +from prefect.client.schemas.objects import Artifact as ArtifactResponse from prefect.client.schemas.sorting import ArtifactSort -from prefect.client.utilities import get_or_create_client +from prefect.context import MissingContextError, get_run_context from prefect.logging.loggers import get_logger -from prefect.utilities.asyncutils import sync_compatible +from prefect.utilities.asyncutils import asyncnullcontext from prefect.utilities.context import get_task_and_flow_run_ids if TYPE_CHECKING: @@ -25,10 +30,6 @@ logger: "logging.Logger" = get_logger("artifacts") -if TYPE_CHECKING: - from prefect.client.orchestration import PrefectClient - from prefect.client.schemas.objects import Artifact as ArtifactResponse - class Artifact(ArtifactRequest): """ @@ -43,10 +44,47 @@ class Artifact(ArtifactRequest): data: A JSON payload that allows for a result to be retrieved. """ - @sync_compatible - async def create( + async def acreate( + self, + client: "PrefectClient | None" = None, + ) -> "ArtifactResponse": + """ + An async method to create an artifact. + + Arguments: + client: The PrefectClient + + Returns: + - The created artifact. + """ + + local_client_context = asyncnullcontext(client) if client else get_client() + async with local_client_context as client: + task_run_id, flow_run_id = get_task_and_flow_run_ids() + + try: + get_run_context() + except MissingContextError: + warnings.warn( + "Artifact creation outside of a flow or task run is deprecated and will be removed in a later version.", + FutureWarning, + ) + + return await client.create_artifact( + artifact=ArtifactRequest( + type=self.type, + key=self.key, + description=self.description, + task_run_id=self.task_run_id or task_run_id, + flow_run_id=self.flow_run_id or flow_run_id, + data=await self.aformat(), + ) + ) + + @async_dispatch(acreate) + def create( self: Self, - client: Optional["PrefectClient"] = None, + client: "PrefectClient | None" = None, ) -> "ArtifactResponse": """ A method to create an artifact. @@ -57,9 +95,9 @@ async def create( Returns: - The created artifact. """ - from prefect.context import MissingContextError, get_run_context - client, _ = get_or_create_client(client) + # Create sync client since this is a sync method. + sync_client = get_client(sync_client=True) task_run_id, flow_run_id = get_task_and_flow_run_ids() try: @@ -70,35 +108,67 @@ async def create( FutureWarning, ) - return await client.create_artifact( + return sync_client.create_artifact( artifact=ArtifactRequest( type=self.type, key=self.key, description=self.description, task_run_id=self.task_run_id or task_run_id, flow_run_id=self.flow_run_id or flow_run_id, - data=await self.format(), + data=cast(str, self.format(_sync=True)), # pyright: ignore[reportCallIssue] _sync is valid because .format is wrapped in async_dispatch ) ) @classmethod - @sync_compatible - async def get( - cls, key: Optional[str] = None, client: Optional["PrefectClient"] = None - ) -> Optional["ArtifactResponse"]: + async def aget( + cls, + key: str | None = None, + client: "PrefectClient | None" = None, + ) -> "ArtifactResponse | None": + """ + A async method to get an artifact. + + Arguments: + key: The key of the artifact to get. + client: A client to use when calling the Prefect API. + + Returns: + The artifact (if found). + """ + + local_client_context = asyncnullcontext(client) if client else get_client() + async with local_client_context as client: + filter_key_value = None if key is None else [key] + artifacts = await client.read_artifacts( + limit=1, + sort=ArtifactSort.UPDATED_DESC, + artifact_filter=ArtifactFilter( + key=ArtifactFilterKey(any_=filter_key_value) + ), + ) + return None if not artifacts else artifacts[0] + + @classmethod + @async_dispatch(aget) + def get( + cls, key: str | None = None, client: "PrefectClient | None" = None + ) -> "ArtifactResponse | None": """ A method to get an artifact. Arguments: - key (str, optional): The key of the artifact to get. - client (PrefectClient, optional): The PrefectClient + key: The key of the artifact to get. + client: A client to use when calling the Prefect API. Returns: - (ArtifactResponse, optional): The artifact (if found). + The artifact (if found). """ - client, _ = get_or_create_client(client) + + # Create sync client since this is a sync method. + sync_client = get_client(sync_client=True) + filter_key_value = None if key is None else [key] - artifacts = await client.read_artifacts( + artifacts = sync_client.read_artifacts( limit=1, sort=ArtifactSort.UPDATED_DESC, artifact_filter=ArtifactFilter( @@ -108,41 +178,75 @@ async def get( return None if not artifacts else artifacts[0] @classmethod - @sync_compatible - async def get_or_create( + async def aget_or_create( cls, - key: Optional[str] = None, - description: Optional[str] = None, - data: Optional[Union[dict[str, Any], Any]] = None, - client: Optional["PrefectClient"] = None, + key: str | None = None, + description: str | None = None, + data: dict[str, Any] | Any | None = None, + client: "PrefectClient | None" = None, + **kwargs: Any, + ) -> tuple["ArtifactResponse", bool]: + """ + A async method to get or create an artifact. + + Arguments: + key: The key of the artifact to get or create. + description: The description of the artifact to create. + data: The data of the artifact to create. + client: The PrefectClient + **kwargs: Additional keyword arguments to use when creating the artifact. + + Returns: + The artifact, either retrieved or created. + """ + artifact = await cls.aget(key, client) + if artifact: + return artifact, False + + new_artifact = cls(key=key, description=description, data=data, **kwargs) + created_artifact = await new_artifact.acreate(client) + return created_artifact, True + + @classmethod + @async_dispatch(aget_or_create) + def get_or_create( + cls, + key: str | None = None, + description: str | None = None, + data: dict[str, Any] | Any | None = None, + client: "PrefectClient | None" = None, **kwargs: Any, ) -> tuple["ArtifactResponse", bool]: """ A method to get or create an artifact. Arguments: - key (str, optional): The key of the artifact to get or create. - description (str, optional): The description of the artifact to create. - data (Union[Dict[str, Any], Any], optional): The data of the artifact to create. - client (PrefectClient, optional): The PrefectClient + key: The key of the artifact to get or create. + description: The description of the artifact to create. + data: The data of the artifact to create. + client: The PrefectClient + **kwargs: Additional keyword arguments to use when creating the artifact. Returns: - (ArtifactResponse): The artifact, either retrieved or created. + The artifact, either retrieved or created. """ - artifact_coro = cls.get(key, client) - if TYPE_CHECKING: - assert asyncio.iscoroutine(artifact_coro) - artifact = await artifact_coro + artifact = cast(ArtifactResponse, cls.get(key, _sync=True)) # pyright: ignore[reportCallIssue] _sync is valid because .get is wrapped in async_dispatch if artifact: return artifact, False new_artifact = cls(key=key, description=description, data=data, **kwargs) - create_coro = new_artifact.create(client) - if TYPE_CHECKING: - assert asyncio.iscoroutine(create_coro) - return await create_coro, True + created_artifact = cast( + ArtifactResponse, + new_artifact.create(_sync=True), # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch + ) + return created_artifact, True + + # TODO: Remove this when we remove async_dispatch because it doesn't need to be async + async def aformat(self) -> str | float | int | dict[str, Any]: + return json.dumps(self.data) - async def format(self) -> Optional[Union[dict[str, Any], Any]]: + @async_dispatch(aformat) + def format(self) -> str | float | int | dict[str, Any]: return json.dumps(self.data) @@ -151,19 +255,30 @@ class LinkArtifact(Artifact): link_text: Optional[str] = None type: Optional[str] = "markdown" - async def format(self) -> str: + def _format(self) -> str: return ( f"[{self.link_text}]({self.link})" if self.link_text else f"[{self.link}]({self.link})" ) + async def aformat(self) -> str: + return self._format() + + @async_dispatch(aformat) + def format(self) -> str: + return self._format() + class MarkdownArtifact(Artifact): markdown: str type: Optional[str] = "markdown" - async def format(self) -> str: + async def aformat(self) -> str: + return self.markdown + + @async_dispatch(aformat) + def format(self) -> str: return self.markdown @@ -173,8 +288,8 @@ class TableArtifact(Artifact): @classmethod def _sanitize( - cls, item: Union[dict[str, Any], list[Any], float] - ) -> Union[dict[str, Any], list[Any], int, float, None]: + cls, item: dict[str, Any] | list[Any] | float + ) -> dict[str, Any] | list[Any] | int | float | None: """ Sanitize NaN values in a given item. The item can be a dict, list or float. @@ -188,7 +303,11 @@ def _sanitize( else: return item - async def format(self) -> str: + async def aformat(self) -> str: + return json.dumps(self._sanitize(self.table)) + + @async_dispatch(aformat) + def format(self) -> str: return json.dumps(self._sanitize(self.table)) @@ -196,7 +315,7 @@ class ProgressArtifact(Artifact): progress: float type: Optional[str] = "progress" - async def format(self) -> float: + def _format(self) -> float: # Ensure progress is between 0 and 100 min_progress = 0.0 max_progress = 100.0 @@ -209,6 +328,13 @@ async def format(self) -> float: return self.progress + async def aformat(self) -> float: + return self._format() + + @async_dispatch(aformat) + def format(self) -> float: + return self._format() + class ImageArtifact(Artifact): """ @@ -221,11 +347,14 @@ class ImageArtifact(Artifact): image_url: str type: Optional[str] = "image" - async def format(self) -> str: + async def aformat(self) -> str: + return self.image_url + + @async_dispatch(aformat) + def format(self) -> str: """ This method is used to format the artifact data so it can be properly sent - to the API when the .create() method is called. It is async because the - method is awaited in the parent class. + to the API when the .create() method is called. Returns: str: The image URL. @@ -233,13 +362,46 @@ async def format(self) -> str: return self.image_url -@sync_compatible -async def create_link_artifact( +async def acreate_link_artifact( + link: str, + link_text: str | None = None, + key: str | None = None, + description: str | None = None, + client: "PrefectClient | None" = None, +) -> UUID: + """ + Create a link artifact. + + Arguments: + link: The link to create. + link_text: The link text. + key: A user-provided string identifier. + Required for the artifact to show in the Artifacts page in the UI. + The key must only contain lowercase letters, numbers, and dashes. + description: A user-specified description of the artifact. + + + Returns: + The table artifact ID. + """ + new_artifact = LinkArtifact( + key=key, + description=description, + link=link, + link_text=link_text, + ) + artifact = await new_artifact.acreate(client) + + return artifact.id + + +@async_dispatch(acreate_link_artifact) +def create_link_artifact( link: str, - link_text: Optional[str] = None, - key: Optional[str] = None, - description: Optional[str] = None, - client: Optional["PrefectClient"] = None, + link_text: str | None = None, + key: str | None = None, + description: str | None = None, + client: "PrefectClient | None" = None, ) -> UUID: """ Create a link artifact. @@ -262,19 +424,44 @@ async def create_link_artifact( link=link, link_text=link_text, ) - create_coro = new_artifact.create(client) - if TYPE_CHECKING: - assert asyncio.iscoroutine(create_coro) - artifact = await create_coro + artifact = cast(ArtifactResponse, new_artifact.create(_sync=True)) # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch + + return artifact.id + + +async def acreate_markdown_artifact( + markdown: str, + key: str | None = None, + description: str | None = None, +) -> UUID: + """ + Create a markdown artifact. + + Arguments: + markdown: The markdown to create. + key: A user-provided string identifier. + Required for the artifact to show in the Artifacts page in the UI. + The key must only contain lowercase letters, numbers, and dashes. + description: A user-specified description of the artifact. + + Returns: + The table artifact ID. + """ + new_artifact = MarkdownArtifact( + key=key, + description=description, + markdown=markdown, + ) + artifact = await new_artifact.acreate() return artifact.id -@sync_compatible -async def create_markdown_artifact( +@async_dispatch(acreate_markdown_artifact) +def create_markdown_artifact( markdown: str, - key: Optional[str] = None, - description: Optional[str] = None, + key: str | None = None, + description: str | None = None, ) -> UUID: """ Create a markdown artifact. @@ -294,19 +481,45 @@ async def create_markdown_artifact( description=description, markdown=markdown, ) - create_coro = new_artifact.create() - if TYPE_CHECKING: - assert asyncio.iscoroutine(create_coro) - artifact = await create_coro + artifact = cast(ArtifactResponse, new_artifact.create(_sync=True)) # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch return artifact.id -@sync_compatible -async def create_table_artifact( - table: Union[dict[str, list[Any]], list[dict[str, Any]], list[list[Any]]], - key: Optional[str] = None, - description: Optional[str] = None, +async def acreate_table_artifact( + table: dict[str, list[Any]] | list[dict[str, Any]] | list[list[Any]], + key: str | None = None, + description: str | None = None, +) -> UUID: + """ + Create a table artifact asynchronously. + + Arguments: + table: The table to create. + key: A user-provided string identifier. + Required for the artifact to show in the Artifacts page in the UI. + The key must only contain lowercase letters, numbers, and dashes. + description: A user-specified description of the artifact. + + Returns: + The table artifact ID. + """ + + new_artifact = TableArtifact( + key=key, + description=description, + table=table, + ) + artifact = await new_artifact.acreate() + + return artifact.id + + +@async_dispatch(acreate_table_artifact) +def create_table_artifact( + table: dict[str, list[Any]] | list[dict[str, Any]] | list[list[Any]], + key: str | None = None, + description: str | None = None, ) -> UUID: """ Create a table artifact. @@ -327,19 +540,45 @@ async def create_table_artifact( description=description, table=table, ) - create_coro = new_artifact.create() - if TYPE_CHECKING: - assert asyncio.iscoroutine(create_coro) - artifact = await create_coro + artifact = cast(ArtifactResponse, new_artifact.create(_sync=True)) # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch return artifact.id -@sync_compatible -async def create_progress_artifact( +async def acreate_progress_artifact( progress: float, - key: Optional[str] = None, - description: Optional[str] = None, + key: str | None = None, + description: str | None = None, +) -> UUID: + """ + Create a progress artifact asynchronously. + + Arguments: + progress: The percentage of progress represented by a float between 0 and 100. + key: A user-provided string identifier. + Required for the artifact to show in the Artifacts page in the UI. + The key must only contain lowercase letters, numbers, and dashes. + description: A user-specified description of the artifact. + + Returns: + The progress artifact ID. + """ + + new_artifact = ProgressArtifact( + key=key, + description=description, + progress=progress, + ) + artifact = await new_artifact.acreate() + + return artifact.id + + +@async_dispatch(acreate_progress_artifact) +def create_progress_artifact( + progress: float, + key: str | None = None, + description: str | None = None, ) -> UUID: """ Create a progress artifact. @@ -360,20 +599,58 @@ async def create_progress_artifact( description=description, progress=progress, ) - create_coro = new_artifact.create() - if TYPE_CHECKING: - assert asyncio.iscoroutine(create_coro) - artifact = await create_coro + artifact = cast(ArtifactResponse, new_artifact.create(_sync=True)) # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch return artifact.id -@sync_compatible -async def update_progress_artifact( +async def aupdate_progress_artifact( + artifact_id: UUID, + progress: float, + description: str | None = None, + client: "PrefectClient | None" = None, +) -> UUID: + """ + Update a progress artifact asynchronously. + + Arguments: + artifact_id: The ID of the artifact to update. + progress: The percentage of progress represented by a float between 0 and 100. + description: A user-specified description of the artifact. + + Returns: + The progress artifact ID. + """ + + local_client_context = nullcontext(client) if client else get_client() + async with local_client_context as client: + artifact = ProgressArtifact( + description=description, + progress=progress, + ) + update = ( + ArtifactUpdate( + description=artifact.description, + data=await artifact.aformat(), + ) + if description + else ArtifactUpdate(data=await artifact.aformat()) + ) + + await client.update_artifact( + artifact_id=artifact_id, + artifact=update, + ) + + return artifact_id + + +@async_dispatch(aupdate_progress_artifact) +def update_progress_artifact( artifact_id: UUID, progress: float, - description: Optional[str] = None, - client: Optional["PrefectClient"] = None, + description: str | None = None, + client: "PrefectClient | None" = None, ) -> UUID: """ Update a progress artifact. @@ -387,7 +664,7 @@ async def update_progress_artifact( The progress artifact ID. """ - client, _ = get_or_create_client(client) + sync_client = get_client(sync_client=True) artifact = ProgressArtifact( description=description, @@ -396,13 +673,13 @@ async def update_progress_artifact( update = ( ArtifactUpdate( description=artifact.description, - data=await artifact.format(), + data=cast(float, artifact.format(_sync=True)), # pyright: ignore[reportCallIssue] _sync is valid because .format is wrapped in async_dispatch ) if description - else ArtifactUpdate(data=await artifact.format()) + else ArtifactUpdate(data=cast(float, artifact.format(_sync=True))) # pyright: ignore[reportCallIssue] _sync is valid because .format is wrapped in async_dispatch ) - await client.update_artifact( + sync_client.update_artifact( artifact_id=artifact_id, artifact=update, ) @@ -410,11 +687,40 @@ async def update_progress_artifact( return artifact_id -@sync_compatible -async def create_image_artifact( +async def acreate_image_artifact( + image_url: str, + key: str | None = None, + description: str | None = None, +) -> UUID: + """ + Create an image artifact asynchronously. + + Arguments: + image_url: The URL of the image to display. + key: A user-provided string identifier. + Required for the artifact to show in the Artifacts page in the UI. + The key must only contain lowercase letters, numbers, and dashes. + description: A user-specified description of the artifact. + + Returns: + The image artifact ID. + """ + + new_artifact = ImageArtifact( + key=key, + description=description, + image_url=image_url, + ) + artifact = await new_artifact.acreate() + + return artifact.id + + +@async_dispatch(acreate_image_artifact) +def create_image_artifact( image_url: str, - key: Optional[str] = None, - description: Optional[str] = None, + key: str | None = None, + description: str | None = None, ) -> UUID: """ Create an image artifact. @@ -435,9 +741,6 @@ async def create_image_artifact( description=description, image_url=image_url, ) - create_coro = new_artifact.create() - if TYPE_CHECKING: - assert asyncio.iscoroutine(create_coro) - artifact = await create_coro + artifact = cast(ArtifactResponse, new_artifact.create(_sync=True)) # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch return artifact.id diff --git a/tests/fixtures/api.py b/tests/fixtures/api.py index 26adaf9e65d8..8dae3c81a4cb 100644 --- a/tests/fixtures/api.py +++ b/tests/fixtures/api.py @@ -35,6 +35,11 @@ async def client(app) -> AsyncGenerator[AsyncClient, Any]: yield async_client +@pytest.fixture +def sync_client(app: FastAPI) -> TestClient: + return TestClient(app, base_url="https://test/api") + + @pytest.fixture async def hosted_api_client(use_hosted_api_server) -> AsyncGenerator[AsyncClient, Any]: async with httpx.AsyncClient(base_url=use_hosted_api_server) as async_client: diff --git a/tests/test_artifacts.py b/tests/test_artifacts.py index d5d38083a6c7..dcbc2194f063 100644 --- a/tests/test_artifacts.py +++ b/tests/test_artifacts.py @@ -1,10 +1,19 @@ +import asyncio import json -from typing import List +from typing import cast +from uuid import UUID +import httpx import pytest +from fastapi.testclient import TestClient from prefect import flow, task from prefect.artifacts import ( + Artifact, + acreate_link_artifact, + acreate_progress_artifact, + acreate_table_artifact, + aupdate_progress_artifact, create_image_artifact, create_link_artifact, create_markdown_artifact, @@ -12,14 +21,16 @@ create_table_artifact, update_progress_artifact, ) -from prefect.context import get_run_context +from prefect.client.orchestration import PrefectClient +from prefect.client.schemas.objects import Artifact as ArtifactResponse +from prefect.context import FlowRunContext, TaskRunContext, get_run_context from prefect.server import schemas from prefect.server.schemas.actions import ArtifactCreate class TestCreateArtifacts: @pytest.fixture - async def artifact(self): + def artifact(self): yield ArtifactCreate( key="voltaic", data=1, @@ -27,14 +38,14 @@ async def artifact(self): ) async def test_create_and_read_link_artifact_with_linktext_succeeds( - self, artifact, client + self, artifact: ArtifactCreate, client: httpx.AsyncClient ): my_link = "prefect.io" link_text = "Prefect" @flow async def my_flow(): - return await create_link_artifact( + return await acreate_link_artifact( key=artifact.key, link=my_link, link_text=link_text, @@ -47,10 +58,14 @@ async def my_flow(): result = schemas.core.Artifact.model_validate(response.json()) assert result.data == f"[{link_text}]({my_link})" - async def test_create_link_artifact_in_task_succeeds(self, client): + async def test_create_link_artifact_in_task_succeeds( + self, client: httpx.AsyncClient + ): @task def my_special_task(): - task_run_id = get_run_context().task_run.id + run_context = get_run_context() + assert isinstance(run_context, TaskRunContext) + task_run_id = run_context.task_run.id artifact_id = create_link_artifact( key="task-link-artifact-3", link="google.com", @@ -60,7 +75,10 @@ def my_special_task(): @flow def my_flow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id, task_run_id = my_special_task() return artifact_id, flow_run_id, task_run_id @@ -74,10 +92,15 @@ def my_flow(): assert my_link_artifact.flow_run_id == flow_run_id assert my_link_artifact.task_run_id == task_run_id - async def test_create_link_artifact_in_flow_succeeds(self, client): + async def test_create_link_artifact_in_flow_succeeds( + self, client: httpx.AsyncClient + ): @flow def my_flow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id = create_link_artifact( key="task-link-artifact-4", @@ -96,10 +119,15 @@ def my_flow(): assert my_link_artifact.flow_run_id == flow_run_id assert my_link_artifact.task_run_id is None - async def test_create_link_artifact_in_subflow_succeeds(self, client): + async def test_create_link_artifact_in_subflow_succeeds( + self, client: httpx.AsyncClient + ): @flow def my_subflow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id = create_link_artifact( key="task-link-artifact-5", link="google.com", @@ -130,7 +158,7 @@ async def test_create_link_artifact_using_map_succeeds(self): # An ode to prefect issue #5309. @task - def add_ten(x): + def add_ten(x: int) -> int: create_link_artifact( # TODO: uncomment this out once unique constraint is dropped on artifact key # key="new-markdown-artifact", @@ -140,17 +168,21 @@ def add_ten(x): return x + 10 @flow - def simple_map(nums: List[int]): + def simple_map(nums: list[int]): big_nums = add_ten.map(nums) return [big_num.result() for big_num in big_nums] my_big_nums = simple_map([1, 2, 3]) assert my_big_nums == [11, 12, 13] - async def test_create_markdown_artifact_in_task_succeeds(self, client): + async def test_create_markdown_artifact_in_task_succeeds( + self, client: httpx.AsyncClient + ): @task def my_special_task(): - task_run_id = get_run_context().task_run.id + run_context = get_run_context() + assert isinstance(run_context, TaskRunContext) + task_run_id = run_context.task_run.id artifact_id = create_markdown_artifact( key="task-link-artifact-3", markdown="my markdown", @@ -160,7 +192,10 @@ def my_special_task(): @flow def my_flow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id, task_run_id = my_special_task() return artifact_id, flow_run_id, task_run_id @@ -174,10 +209,15 @@ def my_flow(): assert my_markdown_artifact.flow_run_id == flow_run_id assert my_markdown_artifact.task_run_id == task_run_id - async def test_create_markdown_artifact_in_flow_succeeds(self, client): + async def test_create_markdown_artifact_in_flow_succeeds( + self, client: httpx.AsyncClient + ): @flow def my_flow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id = create_markdown_artifact( key="task-link-artifact-4", @@ -196,10 +236,15 @@ def my_flow(): assert my_markdown_artifact.flow_run_id == flow_run_id assert my_markdown_artifact.task_run_id is None - async def test_create_markdown_artifact_in_subflow_succeeds(self, client): + async def test_create_markdown_artifact_in_subflow_succeeds( + self, client: httpx.AsyncClient + ): @flow def my_subflow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id = create_markdown_artifact( key="task-link-artifact-3", markdown="my markdown", @@ -228,7 +273,7 @@ async def test_create_markdown_artifact_using_map_succeeds(self): """ @task - def add_ten(x): + def add_ten(x: int) -> int: create_markdown_artifact( key="new-markdown-artifact", markdown="my markdown", @@ -237,7 +282,7 @@ def add_ten(x): return x + 10 @flow - def simple_map(nums: List[int]): + def simple_map(nums: list[int]) -> list[int]: big_nums = add_ten.map(nums) return [big_num.result() for big_num in big_nums] @@ -245,13 +290,13 @@ def simple_map(nums: List[int]): assert my_big_nums == [11, 12, 13] async def test_create_and_read_dict_of_list_table_artifact_succeeds( - self, artifact, client + self, artifact: ArtifactCreate, client: httpx.AsyncClient ): my_table = {"a": [1, 3], "b": [2, 4]} @flow async def my_flow(): - return await create_table_artifact( + return await acreate_table_artifact( key=artifact.key, table=my_table, description=artifact.description, @@ -261,17 +306,18 @@ async def my_flow(): response = await client.get(f"/artifacts/{artifact_id}") assert response.status_code == 200 result = schemas.core.Artifact.model_validate(response.json()) + assert isinstance(result.data, str) result_data = json.loads(result.data) assert result_data == my_table async def test_create_and_read_list_of_dict_table_artifact_succeeds( - self, artifact, client + self, artifact: ArtifactCreate, client: httpx.AsyncClient ): my_table = [{"a": 1, "b": 2}, {"a": 3, "b": 4}] @flow async def my_flow(): - return await create_table_artifact( + return await acreate_table_artifact( key=artifact.key, table=my_table, description=artifact.description, @@ -282,17 +328,18 @@ async def my_flow(): assert response.status_code == 200 result = schemas.core.Artifact.model_validate(response.json()) + assert isinstance(result.data, str) result_data = json.loads(result.data) assert result_data == my_table async def test_create_and_read_list_of_list_table_artifact_succeeds( - self, artifact, client + self, artifact: ArtifactCreate, client: httpx.AsyncClient ): my_table = [[1, 2], [None, 4]] @flow async def my_flow(): - return await create_table_artifact( + return await acreate_table_artifact( key=artifact.key, table=my_table, description=artifact.description, @@ -302,14 +349,19 @@ async def my_flow(): response = await client.get(f"/artifacts/{artifact_id}") assert response.status_code == 200 result = schemas.core.Artifact.model_validate(response.json()) + assert isinstance(result.data, str) result_data = json.loads(result.data) assert result_data == my_table - async def test_create_table_artifact_in_task_succeeds(self, client): + async def test_create_table_artifact_in_task_succeeds( + self, client: httpx.AsyncClient + ): @task def my_special_task(): my_table = {"a": [1, 3], "b": [2, 4]} - task_run_id = get_run_context().task_run.id + run_context = get_run_context() + assert isinstance(run_context, TaskRunContext) + task_run_id = run_context.task_run.id artifact_id = create_table_artifact( key="task-link-artifact-3", table=my_table, @@ -319,7 +371,10 @@ def my_special_task(): @flow def my_flow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id, task_run_id = my_special_task() return artifact_id, flow_run_id, task_run_id @@ -331,14 +386,20 @@ def my_flow(): assert my_table_artifact.flow_run_id == flow_run_id assert my_table_artifact.task_run_id == task_run_id + assert isinstance(my_table_artifact.data, str) result_data = json.loads(my_table_artifact.data) assert result_data == {"a": [1, 3], "b": [2, 4]} - async def test_create_table_artifact_in_flow_succeeds(self, client): + async def test_create_table_artifact_in_flow_succeeds( + self, client: httpx.AsyncClient + ): @flow def my_flow(): my_table = {"a": [1, 3], "b": [2, 4]} - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id = create_table_artifact( key="task-link-artifact-4", @@ -356,14 +417,20 @@ def my_flow(): assert my_table_artifact.flow_run_id == flow_run_id assert my_table_artifact.task_run_id is None + assert isinstance(my_table_artifact.data, str) result_data = json.loads(my_table_artifact.data) assert result_data == {"a": [1, 3], "b": [2, 4]} - async def test_create_table_artifact_in_subflow_succeeds(self, client): + async def test_create_table_artifact_in_subflow_succeeds( + self, client: httpx.AsyncClient + ): @flow def my_subflow(): my_table = {"a": [1, 3], "b": [2, 4]} - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id = create_table_artifact( key="task-link-artifact-3", table=my_table, @@ -384,6 +451,7 @@ def my_flow(): my_table_artifact = schemas.core.Artifact.model_validate(response.json()) assert my_table_artifact.flow_run_id == flow_run_id + assert isinstance(my_table_artifact.data, str) result_data = json.loads(my_table_artifact.data) assert result_data == {"a": [1, 3], "b": [2, 4]} assert my_table_artifact.task_run_id is None @@ -395,7 +463,7 @@ async def test_create_table_artifact_using_map_succeeds(self): """ @task - def add_ten(x): + def add_ten(x: int) -> int: my_table = {"a": [1, 3], "b": [2, 4]} create_table_artifact( # TODO: uncomment this out once unique constraint is dropped on artifact key @@ -406,7 +474,7 @@ def add_ten(x): return x + 10 @flow - def simple_map(nums: List[int]): + def simple_map(nums: list[int]): big_nums = add_ten.map(nums) return [big_num.result() for big_num in big_nums] @@ -417,21 +485,23 @@ async def test_create_dict_table_artifact_with_none_succeeds(self): my_table = {"a": [1, 3], "b": [2, None]} @flow - async def my_flow(): - return await create_table_artifact( + def my_flow(): + return create_table_artifact( key="swiss-table", table=my_table, description="my-artifact-description", ) - await my_flow() + my_flow() - async def test_create_dict_table_artifact_with_nan_succeeds(self, client): + async def test_create_dict_table_artifact_with_nan_succeeds( + self, client: httpx.AsyncClient + ): my_table = {"a": [1, 3], "b": [2, float("nan")]} @flow async def my_flow(): - return await create_table_artifact( + return await acreate_table_artifact( key="swiss-table", table=my_table, description="my-artifact-description", @@ -441,6 +511,7 @@ async def my_flow(): response = await client.get(f"/artifacts/{artifact_id}") assert response.status_code == 200 my_artifact = schemas.core.Artifact.model_validate(response.json()) + assert isinstance(my_artifact.data, str) my_data = json.loads(my_artifact.data) assert my_data == {"a": [1, 3], "b": [2, None]} @@ -452,7 +523,7 @@ async def test_create_list_table_artifact_with_none_succeeds(self): @flow async def my_flow(): - await create_table_artifact( + await acreate_table_artifact( key="swiss-table", table=my_table, description="my-artifact-description", @@ -460,7 +531,9 @@ async def my_flow(): await my_flow() - async def test_create_list_table_artifact_with_nan_succeeds(self, client): + async def test_create_list_table_artifact_with_nan_succeeds( + self, client: httpx.AsyncClient + ): my_table = [ {"a": 1, "b": 2}, {"a": 3, "b": float("nan")}, @@ -468,7 +541,7 @@ async def test_create_list_table_artifact_with_nan_succeeds(self, client): @flow async def my_flow(): - return await create_table_artifact( + return await acreate_table_artifact( key="swiss-table", table=my_table, description="my-artifact-description", @@ -478,18 +551,21 @@ async def my_flow(): response = await client.get(f"/artifacts/{artifact_id}") assert response.status_code == 200 my_artifact = schemas.core.Artifact.model_validate(response.json()) + assert isinstance(my_artifact.data, str) my_data = json.loads(my_artifact.data) assert my_data == [ {"a": 1, "b": 2}, {"a": 3, "b": None}, ] - async def test_create_progress_artifact_without_key(self, client): + async def test_create_progress_artifact_without_key( + self, client: httpx.AsyncClient + ): progress = 0.0 @flow async def my_flow(): - return await create_progress_artifact( + return await acreate_progress_artifact( progress, description="my-description" ) @@ -501,16 +577,16 @@ async def my_flow(): assert my_artifact.type == "progress" assert my_artifact.description == "my-description" - async def test_create_progress_artifact_with_key(self, client): + async def test_create_progress_artifact_with_key(self, client: httpx.AsyncClient): progress = 0.0 @flow - async def my_flow(): - return await create_progress_artifact( + def my_flow(): + return create_progress_artifact( progress, key="progress-artifact", description="my-description" ) - artifact_id = await my_flow() + artifact_id = my_flow() response = await client.get(f"/artifacts/{artifact_id}") assert response.status_code == 200 my_artifact = schemas.core.Artifact.model_validate(response.json()) @@ -519,10 +595,15 @@ async def my_flow(): assert my_artifact.key == "progress-artifact" assert my_artifact.description == "my-description" - async def test_create_progress_artifact_in_task_succeeds(self, client): + async def test_create_progress_artifact_in_task_succeeds( + self, client: httpx.AsyncClient + ): @task def my_task(): - task_run_id = get_run_context().task_run.id + run_context = get_run_context() + assert isinstance(run_context, TaskRunContext) + assert run_context.task_run is not None + task_run_id = run_context.task_run.id artifact_id = create_progress_artifact( key="task-link-artifact-3", progress=0.0, @@ -532,7 +613,10 @@ def my_task(): @flow def my_flow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id, task_run_id = my_task() return artifact_id, flow_run_id, task_run_id @@ -549,10 +633,15 @@ def my_flow(): assert my_progress_artifact.type == "progress" assert my_progress_artifact.description == "my-artifact-description" - async def test_create_progess_artifact_in_flow_succeeds(self, client): + async def test_create_progess_artifact_in_flow_succeeds( + self, client: httpx.AsyncClient + ): @flow def my_flow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id = create_progress_artifact( key="task-link-artifact-4", @@ -574,10 +663,15 @@ def my_flow(): assert my_progress_artifact.type == "progress" assert my_progress_artifact.description == "my-artifact-description" - async def test_create_image_artifact_in_task_succeeds(self, client): + async def test_create_image_artifact_in_task_succeeds( + self, client: httpx.AsyncClient + ): @task def my_task(): - task_run_id = get_run_context().task_run.id + run_context = get_run_context() + assert isinstance(run_context, TaskRunContext) + assert run_context.task_run is not None + task_run_id = run_context.task_run.id artifact_id = create_image_artifact( image_url="https://www.google.com/images/branding/googlelogo/1x/googlelogo_color_272x92dp.png", key="task-link-artifact-3", @@ -587,7 +681,10 @@ def my_task(): @flow def my_flow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id, task_run_id = my_task() return artifact_id, flow_run_id, task_run_id @@ -607,10 +704,15 @@ def my_flow(): assert my_image_artifact.type == "image" assert my_image_artifact.description == "my-artifact-description" - async def test_create_image_artifact_in_flow_succeeds(self, client): + async def test_create_image_artifact_in_flow_succeeds( + self, client: httpx.AsyncClient + ): @flow def my_flow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id = create_image_artifact( image_url="https://www.google.com/images/branding/googlelogo/1x/googlelogo_color_272x92dp.png", @@ -637,16 +739,22 @@ def my_flow(): async def test_creating_artifact_outside_of_flow_run_context_warns(self): with pytest.warns(FutureWarning): - await create_link_artifact("https://www.google.com", "Google") + create_link_artifact("https://www.google.com", "Google", _sync=True) # pyright: ignore[reportCallIssue] + + with pytest.warns(FutureWarning): + await acreate_link_artifact("https://www.google.com", "Google") class TestUpdateArtifacts: - async def test_update_progress_artifact_updates_progress(self, client): + async def test_update_progress_artifact_updates_progress_async( + self, client: httpx.AsyncClient + ): progress = 0.0 @flow async def my_flow(): - artifact_id = await create_progress_artifact(progress) + artifact_id = await acreate_progress_artifact(progress) + assert isinstance(artifact_id, UUID) response = await client.get(f"/artifacts/{artifact_id}") my_artifact = schemas.core.Artifact.model_validate(response.json()) @@ -654,7 +762,7 @@ async def my_flow(): assert my_artifact.type == "progress" new_progress = 50.0 - await update_progress_artifact(artifact_id, new_progress) + await aupdate_progress_artifact(artifact_id, new_progress) response = await client.get(f"/artifacts/{artifact_id}") assert response.status_code == 200 my_artifact = schemas.core.Artifact.model_validate(response.json()) @@ -662,21 +770,54 @@ async def my_flow(): await my_flow() - async def test_update_progress_artifact_in_task(self, client): + def test_update_progress_artifact_updates_progress_sync( + self, sync_client: TestClient + ): + progress = 0.0 + + @flow + def my_flow(): + artifact_id = create_progress_artifact(progress) + assert isinstance(artifact_id, UUID) + + response = sync_client.get(f"/artifacts/{artifact_id}") + my_artifact = schemas.core.Artifact.model_validate(response.json()) + assert my_artifact.data == progress + assert my_artifact.type == "progress" + + new_progress = 50.0 + update_progress_artifact(artifact_id, new_progress) + response = sync_client.get(f"/artifacts/{artifact_id}") + assert response.status_code == 200 + my_artifact = schemas.core.Artifact.model_validate(response.json()) + assert my_artifact.data == new_progress + + my_flow() + + async def test_update_progress_artifact_in_task(self, client: httpx.AsyncClient): @task def my_task(): - task_run_id = get_run_context().task_run.id + run_context = get_run_context() + assert isinstance(run_context, TaskRunContext) + assert run_context.task_run is not None + task_run_id = run_context.task_run.id + artifact_id = create_progress_artifact( key="task-link-artifact-3", progress=0.0, description="my-artifact-description", ) + assert isinstance(artifact_id, UUID) update_progress_artifact(artifact_id, 50.0) return artifact_id, task_run_id @flow def my_flow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id + artifact_id, task_run_id = my_task() return artifact_id, flow_run_id, task_run_id @@ -693,16 +834,66 @@ def my_flow(): assert my_progress_artifact.type == "progress" assert my_progress_artifact.description == "my-artifact-description" - async def test_update_progress_artifact_in_flow(self, client): + async def test_update_progress_artifact_in_async_task( + self, client: httpx.AsyncClient + ): + @task + async def my_task(): + run_context = get_run_context() + assert isinstance(run_context, TaskRunContext) + assert run_context.task_run is not None + task_run_id = run_context.task_run.id + + artifact_id_coro = create_progress_artifact( + key="task-link-artifact-3", + progress=0.0, + description="my-artifact-description", + ) + assert asyncio.iscoroutine(artifact_id_coro) + artifact_id = await artifact_id_coro + assert isinstance(artifact_id, UUID) + update_coro = update_progress_artifact(artifact_id, 50.0) + assert asyncio.iscoroutine(update_coro) + await update_coro + return artifact_id, task_run_id + + @flow + async def my_flow(): + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id + + artifact_id, task_run_id = await my_task() + + return artifact_id, flow_run_id, task_run_id + + my_artifact_id, flow_run_id, task_run_id = await my_flow() + + response = await client.get(f"/artifacts/{my_artifact_id}") + assert response.status_code == 200 + my_progress_artifact = schemas.core.Artifact.model_validate(response.json()) + + assert my_progress_artifact.flow_run_id == flow_run_id + assert my_progress_artifact.task_run_id == task_run_id + assert my_progress_artifact.data == 50.0 + assert my_progress_artifact.type == "progress" + assert my_progress_artifact.description == "my-artifact-description" + + async def test_update_progress_artifact_in_flow(self, client: httpx.AsyncClient): @flow def my_flow(): - flow_run_id = get_run_context().flow_run.id + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id artifact_id = create_progress_artifact( key="task-link-artifact-4", progress=0.0, description="my-artifact-description", ) + assert isinstance(artifact_id, UUID) update_progress_artifact(artifact_id, 50.0) return artifact_id, flow_run_id @@ -718,3 +909,187 @@ def my_flow(): assert my_progress_artifact.data == 50.0 assert my_progress_artifact.type == "progress" assert my_progress_artifact.description == "my-artifact-description" + + async def test_update_progress_artifact_in_async_flow( + self, client: httpx.AsyncClient + ): + @flow + async def my_flow(): + run_context = get_run_context() + assert isinstance(run_context, FlowRunContext) + assert run_context.flow_run is not None + flow_run_id = run_context.flow_run.id + + artifact_id_coro = create_progress_artifact( + key="task-link-artifact-4", + progress=0.0, + description="my-artifact-description", + ) + assert asyncio.iscoroutine(artifact_id_coro) + artifact_id = await artifact_id_coro + assert isinstance(artifact_id, UUID) + update_coro = update_progress_artifact(artifact_id, 50.0) + assert asyncio.iscoroutine(update_coro) + await update_coro + + return artifact_id, flow_run_id + + my_artifact_id, flow_run_id = await my_flow() + + response = await client.get(f"/artifacts/{my_artifact_id}") + assert response.status_code == 200 + my_progress_artifact = schemas.core.Artifact.model_validate(response.json()) + + assert my_progress_artifact.flow_run_id == flow_run_id + assert my_progress_artifact.task_run_id is None + assert my_progress_artifact.data == 50.0 + assert my_progress_artifact.type == "progress" + assert my_progress_artifact.description == "my-artifact-description" + + +class TestArtifact: + async def test_artifact_format(self): + """Test creating an artifact and formatting its data""" + data = {"key": "value"} + artifact = Artifact( + type="test", key="test-artifact", description="Test artifact", data=data + ) + + formatted_data = await artifact.aformat() + assert formatted_data == '{"key": "value"}' + + sync_formatted = cast(str, artifact.format(_sync=True)) # pyright: ignore[reportCallIssue] + assert sync_formatted == '{"key": "value"}' + + async def test_artifact_create_methods(self, prefect_client: PrefectClient): + """Test both sync and async create methods""" + data = {"test": "data"} + artifact = Artifact( + type="test", key="test-artifact", description="Test artifact", data=data + ) + + # Test async create + @flow + async def my_async_flow(): + return await artifact.acreate(prefect_client) + + response = await my_async_flow() + assert response.key == "test-artifact" + assert response.type == "test" + assert response.description == "Test artifact" + + # Test sync create + @flow + def my_sync_flow(): + return artifact.create() + + response = my_sync_flow() + assert isinstance(response, ArtifactResponse) + assert response.key == "test-artifact" + assert response.type == "test" + assert response.description == "Test artifact" + + async def test_artifact_get_async(self, prefect_client: PrefectClient): + """Test both sync and async get methods""" + # Create an artifact first + artifact = Artifact( + type="test", key="get-test", description="Test get", data={"test": "get"} + ) + + @flow + async def my_flow(): + return await artifact.acreate(prefect_client) + + await my_flow() + + # Test async get + retrieved = await Artifact.aget("get-test", prefect_client) + assert retrieved is not None + assert retrieved.key == "get-test" + assert retrieved.type == "test" + + def test_artifact_get_sync(self, prefect_client: PrefectClient): + # Create an artifact first + artifact = Artifact( + type="test", key="get-test", description="Test get", data={"test": "get"} + ) + + @flow + def my_flow(): + artifact.create() + + my_flow() + + # Test sync get + retrieved = Artifact.get("get-test") + assert isinstance(retrieved, ArtifactResponse) + assert retrieved.key == "get-test" + assert retrieved.type == "test" + + def test_artifact_get_or_create_sync(self): + # Test sync get_or_create + + @flow + def my_flow(): + return Artifact.get_or_create( + key="get-or-create-test-sync", + type="test", + description="Test sync get or create", + data={"test": "sync"}, + ) + + get_or_create_result = my_flow() + assert isinstance(get_or_create_result, tuple) + artifact, created = get_or_create_result + assert created is True + assert artifact.key == "get-or-create-test-sync" + + get_or_create_result = my_flow() + assert isinstance(get_or_create_result, tuple) + artifact, created = get_or_create_result + assert created is False + assert artifact.key == "get-or-create-test-sync" + + async def test_artifact_get_or_create_async(self): + @flow + async def my_flow(): + return await Artifact.aget_or_create( + key="get-or-create-test", + type="test", + description="Test get or create", + data={"test": "get-or-create"}, + ) + + artifact, created = await my_flow() + assert created is True + assert artifact.key == "get-or-create-test" + + # Test getting existing - should not create new + artifact, created = await my_flow() + assert created is False + assert artifact.key == "get-or-create-test" + + async def test_artifact_creation_outside_run_context_warns(self): + """Test that creating artifacts outside run context raises warning""" + artifact = Artifact( + type="test", + key="warning-test", + description="Test warning", + data={"test": "warning"}, + ) + + with pytest.warns(FutureWarning): + await artifact.acreate() + + with pytest.warns(FutureWarning): + artifact.create(_sync=True) # pyright: ignore[reportCallIssue] + + def test_get_nonexistent_artifact_sync(self): + """Test getting an artifact that doesn't exist returns None""" + retrieved = Artifact.get("nonexistent-key") + assert retrieved is None + + async def test_get_nonexistent_artifact_async(self, prefect_client: PrefectClient): + """Test getting an artifact that doesn't exist returns None""" + retrieved = await Artifact.aget("nonexistent-key", prefect_client) + assert retrieved is None