Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add xcom map_index as a filter to xcom endpoint #32453

Merged
merged 3 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def get_xcom_entries(
dag_id: str,
dag_run_id: str,
task_id: str,
map_index: int | None = None,
xcom_key: str | None = None,
limit: int | None,
offset: int | None = None,
session: Session = NEW_SESSION,
Expand All @@ -67,6 +69,10 @@ def get_xcom_entries(
query = query.where(XCom.task_id == task_id)
if dag_run_id != "~":
query = query.where(DR.run_id == dag_run_id)
if map_index is not None:
query = query.where(XCom.map_index == map_index)
if xcom_key is not None:
query = query.where(XCom.key == xcom_key)
query = query.order_by(DR.execution_date, XCom.task_id, XCom.dag_id, XCom.key)
total_entries = session.execute(select(func.count()).select_from(query)).scalar()
query = session.scalars(query.offset(offset).limit(limit))
Expand All @@ -88,6 +94,7 @@ def get_xcom_entry(
task_id: str,
dag_run_id: str,
xcom_key: str,
map_index: int = -1,
deserialize: bool = False,
session: Session = NEW_SESSION,
) -> APIResponse:
Expand All @@ -97,7 +104,9 @@ def get_xcom_entry(
else:
query = select(XCom)

query = query.where(XCom.dag_id == dag_id, XCom.task_id == task_id, XCom.key == xcom_key)
query = query.where(
XCom.dag_id == dag_id, XCom.task_id == task_id, XCom.key == xcom_key, XCom.map_index == map_index
)
query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.run_id == DR.run_id))
query = query.where(DR.run_id == dag_run_id)

Expand Down
13 changes: 13 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,8 @@ paths:
operationId: get_xcom_entries
tags: [XCom]
parameters:
- $ref: '#/components/parameters/FilterMapIndex'
- $ref: '#/components/parameters/FilterXcomKey'
- $ref: '#/components/parameters/PageLimit'
- $ref: '#/components/parameters/PageOffset'
responses:
Expand Down Expand Up @@ -1600,6 +1602,7 @@ paths:
operationId: get_xcom_entry
tags: [XCom]
parameters:
- $ref: '#/components/parameters/FilterMapIndex'
- in: query
name: deserialize
schema:
Expand Down Expand Up @@ -3489,6 +3492,8 @@ components:
execution_date:
type: string
format: datetime
map_index:
type: integer
task_id:
type: string
dag_id:
Expand Down Expand Up @@ -5095,6 +5100,14 @@ components:

*New in version 2.6.0*

FilterXcomKey:
in: query
name: xcom_key
schema:
type: string
required: false
description: Only filter the XCom records which have the provided key.

# Other parameters
FileToken:
in: path
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/xcom_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Meta:
key = auto_field()
timestamp = auto_field()
execution_date = auto_field()
map_index = auto_field()
task_id = auto_field()
dag_id = auto_field()

Expand Down
9 changes: 9 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1425,6 +1425,7 @@ export interface components {
timestamp?: string;
/** Format: datetime */
execution_date?: string;
map_index?: number;
task_id?: string;
dag_id?: string;
};
Expand Down Expand Up @@ -2390,6 +2391,8 @@ export interface components {
* *New in version 2.6.0*
*/
Paused: boolean;
/** @description Only filter the XCom records which have the provided key. */
FilterXcomKey: string;
/**
* @description The key containing the encrypted path to the file. Encryption and decryption take place only on
* the server. This prevents the client from reading an non-DAG file. This also ensures API
Expand Down Expand Up @@ -3839,6 +3842,10 @@ export interface operations {
task_id: components["parameters"]["TaskID"];
};
query: {
/** Filter on map index for mapped task. */
map_index?: components["parameters"]["FilterMapIndex"];
/** Only filter the XCom records which have the provided key. */
xcom_key?: components["parameters"]["FilterXcomKey"];
/** The numbers of items to return. */
limit?: components["parameters"]["PageLimit"];
/** The number of items to skip before starting to collect the result set. */
Expand Down Expand Up @@ -3869,6 +3876,8 @@ export interface operations {
xcom_key: components["parameters"]["XComKey"];
};
query: {
/** Filter on map index for mapped task. */
map_index?: components["parameters"]["FilterMapIndex"];
/**
* Whether to deserialize an XCom value when using a custom XCom backend.
*
Expand Down
Loading