Skip to content

[v2-11-test] Ensuring XCom return value can be mapped for dynamically-mapped @task_group's #51668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 26, 2025

Conversation

jroachgolf84
Copy link
Collaborator

Fix implemented for #51109 in v2-11. Closely mirrors #51556.

#51109 outlined an issue where trying to dynamically map over an XCom return value using a custom key resulted in unexpected behavior. Rather than leveraging the list that was stored at that key, it would dynamically-map TaskGroups using the key-value pairs in the return value itself.

This PR addresses this by applying the same logic present in the _TaskDecorator class to _TaskGroupFactory. When attempting to map over a list stored via a key b in the dict return value of a Task t`, the following exception will be raised:

ValueError: cannot map over XCom with custom key 'b' from <Task(_PythonDecoratedOperator): t>`

Rather than the faulty logic outlined in #51109, the DAG will fail to parse. This matches the same behavior as @task. This logic was implemented for both .expand() and .expand_kwargs().

To test this, the following DAG was written. The result is now a DAG Import Error, with the stack trace below.

import pendulum
from airflow.decorators import dag, task, task_group


@dag(schedule=None, start_date=pendulum.datetime(2022, 1, 1))
def pipeline():
    @task
    def t():
        return {"b": [2, 3]}

    @task_group()
    def tg(a, b):
        pass

    tg.partial(a=1).expand(b=t()["b"])

pipeline()
Traceback (most recent call last):
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/mappedoperator.py", line 132, in ensure_xcomarg_return_value
    ensure_xcomarg_return_value(v)
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/mappedoperator.py", line 127, in ensure_xcomarg_return_value
    raise ValueError(f"cannot map over XCom with custom key {key!r} from {operator}")
ValueError: cannot map over XCom with custom key 'b' from <Task(_PythonDecoratedOperator): t>

@jroachgolf84 jroachgolf84 requested review from rawwar and uranusjr June 12, 2025 18:05
@jroachgolf84 jroachgolf84 changed the title [v2-11] Ensuring XCom return value can be mapped for dynamically-mapped @task_group's [v2-11-test] Ensuring XCom return value can be mapped for dynamically-mapped @task_group's Jun 12, 2025
@jroachgolf84
Copy link
Collaborator Author

@rawwar and @uranusjr, do you mind taking a look at this PR? Thanks!

@RNHTTR RNHTTR merged commit 9ef3dab into apache:v2-11-test Jun 26, 2025
42 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants