Skip to content

Ensuring XCom return value can be mapped for dynamically-mapped @task_group's #51556

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 1, 2025

Conversation

jroachgolf84
Copy link
Collaborator

@jroachgolf84 jroachgolf84 commented Jun 9, 2025

#51109 outlined an issue where trying to dynamically map over an XCom return value using a custom key resulted in unexpected behavior. Rather than leveraging the list that was stored at that key, it would dynamically-map TaskGroups using the key-value pairs in the return value itself.

This PR addresses this by applying the same logic present in the _TaskDecorator class to _TaskGroupFactory. When attempting to map over a list stored via a key b in the dict return value of a Task t`, the following exception will be raised:

ValueError: cannot map over XCom with custom key 'b' from <Task(_PythonDecoratedOperator): t>`

Rather than the faulty logic outlined in #51109, the DAG will fail to parse. This matches the same behavior as @task. This logic was implemented for both .expand() and .expand_kwargs().

To test this, the following DAG was written. The result is now a DAG Import Error, with the stack trace below.

import pendulum
from airflow.decorators import dag, task, task_group


@dag(schedule=None, start_date=pendulum.datetime(2022, 1, 1))
def pipeline():
    @task
    def t():
        return {"b": [2, 3]}

    @task_group()
    def tg(a, b):
        pass

    tg.partial(a=1).expand(b=t()["b"])

pipeline()
Traceback (most recent call last):
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/mappedoperator.py", line 132, in ensure_xcomarg_return_value
    ensure_xcomarg_return_value(v)
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/mappedoperator.py", line 127, in ensure_xcomarg_return_value
    raise ValueError(f"cannot map over XCom with custom key {key!r} from {operator}")
ValueError: cannot map over XCom with custom key 'b' from <Task(_PythonDecoratedOperator): t>

@jroachgolf84
Copy link
Collaborator Author

cc: @rawwar, do you mind taking a peek at this one?

@Dev-iL
Copy link
Contributor

Dev-iL commented Jun 12, 2025

Why is the expected behavior that mapping over a custom key is forbidden?

Would it possible to add a fix suggestion (or a doc link with fix suggestions) to the error message? It could be one of the following:

  • Encouraging the user to split the task such that mapped-on output is the only output. (Workaround suggested by @jroachgolf84 )
  • Add a passthrough task for unpacking the specific output to map on. (Workaround mentioned in the issue)
  • Mapping over a task group docs (after adding a note on this topic).

@uranusjr
Copy link
Member

Custom keys were forbidden to reduce the workload (both design and implementation) when the feature was initially rolled out. Feel free to open an issue to discuss how it can happen and propose an implementation.

@jroachgolf84
Copy link
Collaborator Author

From @rawwar, moving this here.

@jroachgolf84 , the issue seems to be reported for 2.x. While the PR is raised for 3.x. I'm going through the issue and will also review the PR. But, can you confirm if you verified the bug in 3.x as well? And, will you be raising a separate PR for 2.x branch?(If yes, you need to raise it to v2-11-test)

I verified this in 3.x - I will make sure to create a PR to v2-11-test.

@jroachgolf84
Copy link
Collaborator Author

@rawwar, a PR was opened for v2-11. Please see here: #51668

@rawwar rawwar self-requested a review June 17, 2025 18:13
Copy link
Member

@uranusjr uranusjr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably be done in a separate PR, but the pattern I noticed is we’re always creating an ExpandInput object, and call this ensure function immediately afterwards. Maybe we should call the function in the class’s init instead.

@amoghrajesh amoghrajesh merged commit 5af60f5 into apache:main Jul 1, 2025
71 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants