Description
Apache Airflow version
3.0.2
If "Other Airflow 2 version" selected, which one?
No response
What happened?
I ran the following code in airflow 3.0.2. I was hoping to see an asset in the airflow ui, none was shown. Also I checked the code for consumer the asset alias doesn't have the necessary assets in them.
Example DAG for demonstrating the behavior of the AssetAlias feature in Airflow, including conditional and
asset expression-based scheduling.
Notes on usage:
Turn on all the DAGs.
Once the "asset_s3_bucket_producer" DAG is triggered, the "asset_s3_bucket_consumer" DAG should be triggered upon completion.
This is because the asset alias "example-alias" is used to add an asset event to the asset "s3://bucket/my-task"
during the "produce_asset_events_through_asset_alias" task.
As the DAG "asset-alias-consumer" relies on asset alias "example-alias" which was previously unresolved,
the DAG "asset-alias-consumer" (along with all the DAGs in the same file) will be re-parsed and
thus update its schedule to the asset "s3://bucket/my-task" and will also be triggered.
`
from future import annotations
import pendulum
from airflow.sdk import DAG, Asset, AssetAlias, task
with DAG(
dag_id="asset_alias_example_alias_producer",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=None,
catchup=False,
tags=["producer", "asset-alias"],
):
@task(outlets=[AssetAlias("example-alias")])
def produce_asset_events_through_asset_alias(*, outlet_events=None):
bucket_name = "bucket-99"
object_path = "my-task"
outlet_events[AssetAlias("example-alias")].add(Asset(f"s3://{bucket_name}/{object_path}"))
produce_asset_events_through_asset_alias()
with DAG(
dag_id="asset_alias_example_alias_consumer",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=[AssetAlias("example-alias")],
catchup=False,
tags=["consumer", "asset-alias"],
):
@task(inlets=[AssetAlias("example-alias")])
def consume_asset_event_from_asset_alias(*, inlet_events=None):
print(f"inlet_events received: {inlet_events}") # Print the entire object
//output shown is inlet_events received: InletEventsAccessors(_inlets=[AssetAlias(name='example-alias', group='asset')], _assets={}, _asset_aliases={AssetAliasUniqueKey(name='example-alias'): AssetAlias(name='example-alias', group='asset')}): chan="stdout": source="task"
for event in inlet_events[AssetAlias("example-alias")]:
// doesn't go here
print(event)
consume_asset_event_from_asset_alias()
`
What you think should happen instead?
No response
How to reproduce
Run the above dag in airflow 3.0.2
Operating System
Macbook
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct