-
Notifications
You must be signed in to change notification settings - Fork 15.3k
Update apache Cassandra and Kylin examples #52497
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Update apache Cassandra and Kylin examples #52497
Conversation
Thats it apache providers example are fine :) |
@@ -27,6 +27,7 @@ | |||
|
|||
from airflow import DAG | |||
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator | |||
from airflow.sdk import chain |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated this according to Airflow 3, whats the plan if someone uses it for Airflow 2 it would fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we need to add fallback if its Airflow 2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah we should have compat until we get rid of the double paths imo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can try with try / except
table_sensor = CassandraTableSensor(task_id="cassandra_table_sensor", table="<table_name>") | ||
|
||
record_sensor = CassandraRecordSensor( | ||
task_id="cassandra_record_sensor", keys={"p1": "v1", "p2": "v2"}, table="<table_name>" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With addition of the table
field, will the dag be a functional one? Can i just run it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah table is mandatory filed, so updated that one. i think if you have correct conn_id then it should work? i have not ran it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting that it wasn't caught.
task_id="cassandra_record_sensor", keys={"p1": "v1", "p2": "v2"}, table="<table_name>" | ||
) | ||
|
||
# Replace <table_name> with your actual table name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets move this to line 47?
start_time="{{ task_instance.xcom_pull(task_ids='gen_build_time')['date_start'] }}", | ||
end_time="{{ task_instance.xcom_pull(task_ids='gen_build_time')['date_end'] }}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel the older way was more readable..? WDYT
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That wont work, its wrong way declared IMHO, i tried it with the dag was failing with xcom pulling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is probably because you are adding the task call later in chain so its delaying execution ig. Anyways, this is ok too.
@@ -27,6 +27,7 @@ | |||
|
|||
from airflow import DAG | |||
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator | |||
from airflow.sdk import chain |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah we should have compat until we get rid of the double paths imo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments @gopidesupavan.
table_sensor = CassandraTableSensor(task_id="cassandra_table_sensor", table="<table_name>") | ||
|
||
record_sensor = CassandraRecordSensor( | ||
task_id="cassandra_record_sensor", keys={"p1": "v1", "p2": "v2"}, table="<table_name>" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting that it wasn't caught.
@@ -27,6 +27,7 @@ | |||
|
|||
from airflow import DAG | |||
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator | |||
from airflow.sdk import chain |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can try with try / except
start_time="{{ task_instance.xcom_pull(task_ids='gen_build_time')['date_start'] }}", | ||
end_time="{{ task_instance.xcom_pull(task_ids='gen_build_time')['date_end'] }}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is probably because you are adding the task call later in chain so its delaying execution ig. Anyways, this is ok too.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in airflow-core/newsfragments.