Unable to get mapped task xcom value via REST API. Getting MultipleResultsFound error #32367

Closed
1 of 2 tasks
pvaling opened this issue Jul 5, 2023 · 2 comments · Fixed by #32453
Assignees
Labels
area:API Airflow's REST/HTTP API good first issue kind:bug This is a clearly a bug

Comments

pvaling commented Jul 5, 2023

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Airflow 2.3.4 (the current code appears to behave the same way).

I have a mapped task that returns an XCom value.

I want to get the XCom value of a particular mapped task instance, or the XCom values of all task instances.

I am using the standard REST API endpoint /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}

It throws this error:
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2850, in one_or_none
    return self._iter().one_or_none()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1510, in one_or_none
    return self._only_one_row(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 614, in _only_one_row
    raise exc.MultipleResultsFound(
sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when one or none was required

Is there any way to get the XCom values of mapped tasks via the API?

What you think should happen instead

No response

How to reproduce

Create a DAG with a mapped task. Return an XCom value from every mapped task instance. Try to get the XCom value via the API.
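The last step (calling the API) can be sketched as follows. This is only an illustration of how the request URL for the endpoint quoted above is put together. The base URL, DAG name, task name, and run ID are assumptions made up for the example, not values from this report; `return_value` is the key Airflow uses for a task's return value.

```python
from urllib.parse import quote

# Assumption: a local webserver exposing the stable REST API under /api/v1.
BASE_URL = "http://localhost:8080/api/v1"


def xcom_entry_url(dag_id, dag_run_id, task_id, xcom_key, base_url=BASE_URL):
    """Build the URL of the XCom entry endpoint quoted in this issue."""
    # Run IDs often contain ':' and '+', so each path segment is percent-encoded.
    parts = [quote(p, safe="") for p in (dag_id, dag_run_id, task_id, xcom_key)]
    return (
        f"{base_url}/dags/{parts[0]}/dagRuns/{parts[1]}"
        f"/taskInstances/{parts[2]}/xcomEntries/{parts[3]}"
    )


url = xcom_entry_url(
    dag_id="mapped_xcom_demo",  # hypothetical DAG name
    dag_run_id="manual__2023-07-05T00:00:00+00:00",  # hypothetical run ID
    task_id="add_one",  # hypothetical mapped task
    xcom_key="return_value",
)
print(url)
```

Sending a GET request to this URL for a task that was expanded into several mapped instances is the call that fails with the traceback shown above, because the path has no place to say which instance is meant.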

Operating System

ubuntu 20.04

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

Standard

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@pvaling pvaling added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Jul 5, 2023
@potiuk potiuk added good first issue and removed needs-triage label for new issues that we didn't triage yet labels Jul 5, 2023
@tirkarthi
Contributor

The endpoint doesn't accept map_index. Would the fix be to allow map_index as a query parameter, defaulting to -1 for backwards compatibility, so that users can fetch the relevant XCom? Another approach would be a new endpoint that has map_index in the URL path, but that wouldn't fix this issue.

def get_xcom_entry(
    *,
    dag_id: str,
    task_id: str,
    dag_run_id: str,
    xcom_key: str,
    deserialize: bool = False,
    session: Session = NEW_SESSION,
) -> APIResponse:
    """Get an XCom entry."""
    if deserialize:
        query = select(XCom, XCom.value)
    else:
        query = select(XCom)
    query = query.where(XCom.dag_id == dag_id, XCom.task_id == task_id, XCom.key == xcom_key)
    query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.run_id == DR.run_id))
    query = query.where(DR.run_id == dag_run_id)
    if deserialize:
        item = session.execute(query).one_or_none()
    else:
        item = session.scalars(query).one_or_none()

pvaling commented Jul 5, 2023

I think the first approach is better (map_index in the existing endpoint, with a fallback to -1).
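To make the discussion concrete, here is a minimal sketch of why the unfiltered lookup fails for a mapped task and how filtering on map_index resolves it. It is not Airflow code: it uses the standard library's sqlite3 with a simplified, made-up `xcom` table, and `one_or_none` and `MultipleResultsFound` here are small stand-ins that only mimic the contract of the SQLAlchemy names in the traceback. Passing `map_index=None` reproduces the current unfiltered query; the proposal above would instead default the parameter to -1, the value stored for unmapped tasks.

```python
import sqlite3


class MultipleResultsFound(Exception):
    """Stand-in for sqlalchemy.exc.MultipleResultsFound."""


def one_or_none(rows):
    # Mimics one_or_none(): no rows -> None, one row -> that row, more -> error.
    if len(rows) > 1:
        raise MultipleResultsFound("Multiple rows were found when one or none was required")
    return rows[0] if rows else None


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE xcom (dag_id TEXT, run_id TEXT, task_id TEXT,"
    " map_index INTEGER, key TEXT, value TEXT)"
)
# Three mapped instances of the same task each store a return value.
conn.executemany(
    "INSERT INTO xcom VALUES ('demo', 'run_1', 'add_one', ?, 'return_value', ?)",
    [(0, "2"), (1, "3"), (2, "4")],
)
# An unmapped task stores its XCom with map_index = -1.
conn.execute("INSERT INTO xcom VALUES ('demo', 'run_1', 'plain', -1, 'return_value', 'ok')")


def get_xcom_value(task_id, key="return_value", map_index=None):
    """Return the single matching value, or None when nothing matches."""
    sql = (
        "SELECT value FROM xcom"
        " WHERE dag_id = 'demo' AND run_id = 'run_1' AND task_id = ? AND key = ?"
    )
    params = [task_id, key]
    if map_index is not None:
        # The proposed fix: narrow the lookup to one mapped instance.
        sql += " AND map_index = ?"
        params.append(map_index)
    row = one_or_none(conn.execute(sql, params).fetchall())
    return row[0] if row else None


# Without a map_index filter, the mapped task matches three rows and fails.
try:
    get_xcom_value("add_one")
    unfiltered_error = None
except MultipleResultsFound as exc:
    unfiltered_error = str(exc)

mapped_value = get_xcom_value("add_one", map_index=1)       # one specific instance
unmapped_value = get_xcom_value("plain", map_index=-1)      # unmapped task, fallback value
mapped_with_default = get_xcom_value("add_one", map_index=-1)  # no such row
```

With a default of -1, existing calls for unmapped tasks keep working unchanged, while a call for a mapped task that omits map_index finds no row instead of raising an error.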
