Skip to content

Assets belonging to AssetAlias do not get created and do not show up on Airflow UI #52438

Open
@rohitghatol

Description

@rohitghatol

Apache Airflow version

3.0.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I ran the following code in airflow 3.0.2. I was hoping to see an asset in the airflow ui, none was shown. Also I checked the code for consumer the asset alias doesn't have the necessary assets in them.

Example DAG for demonstrating the behavior of the AssetAlias feature in Airflow, including conditional and
asset expression-based scheduling.

Notes on usage:

Turn on all the DAGs.

Once the "asset_s3_bucket_producer" DAG is triggered, the "asset_s3_bucket_consumer" DAG should be triggered upon completion.
This is because the asset alias "example-alias" is used to add an asset event to the asset "s3://bucket/my-task"
during the "produce_asset_events_through_asset_alias" task.
As the DAG "asset-alias-consumer" relies on asset alias "example-alias" which was previously unresolved,
the DAG "asset-alias-consumer" (along with all the DAGs in the same file) will be re-parsed and
thus update its schedule to the asset "s3://bucket/my-task" and will also be triggered.

`
from future import annotations

import pendulum

from airflow.sdk import DAG, Asset, AssetAlias, task

with DAG(
dag_id="asset_alias_example_alias_producer",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=None,
catchup=False,
tags=["producer", "asset-alias"],
):

@task(outlets=[AssetAlias("example-alias")])
def produce_asset_events_through_asset_alias(*, outlet_events=None):
    bucket_name = "bucket-99"
    object_path = "my-task"
    outlet_events[AssetAlias("example-alias")].add(Asset(f"s3://{bucket_name}/{object_path}"))


produce_asset_events_through_asset_alias()

with DAG(
dag_id="asset_alias_example_alias_consumer",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=[AssetAlias("example-alias")],
catchup=False,
tags=["consumer", "asset-alias"],
):

@task(inlets=[AssetAlias("example-alias")])
def consume_asset_event_from_asset_alias(*, inlet_events=None):
     print(f"inlet_events received: {inlet_events}") # Print the entire object
     //output shown is inlet_events received: InletEventsAccessors(_inlets=[AssetAlias(name='example-alias', group='asset')], _assets={}, _asset_aliases={AssetAliasUniqueKey(name='example-alias'): AssetAlias(name='example-alias', group='asset')}): chan="stdout": source="task"
    for event in inlet_events[AssetAlias("example-alias")]:
        // doesn't go here
        print(event)


consume_asset_event_from_asset_alias()

`

What you think should happen instead?

No response

How to reproduce

Run the above dag in airflow 3.0.2

Operating System

Macbook

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    area:UIRelated to UI/UX. For Frontend Developers.area:corekind:bugThis is a clearly a bugneeds-triagelabel for new issues that we didn't triage yetpriority:mediumBug that should be fixed before next release but would not block a release

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions