Commit: Updating miscellaneous provider DAGs to use TaskFlow API where applicable (#18278)
Showing 12 changed files with 160 additions and 227 deletions.
example_twitter_dag.py
@@ -31,92 +31,61 @@
 from datetime import date, timedelta
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator
 from airflow.providers.apache.hive.operators.hive import HiveOperator
 from airflow.utils.dates import days_ago
 
 # --------------------------------------------------------------------------------
 # Create a few placeholder scripts. In practice these would be different python
 # script files, which are imported in this section with absolute or relative imports
 # --------------------------------------------------------------------------------
 
 
-def fetchtweets():
+@task
+def fetch_tweets():
     """
-    This is a placeholder for fetchtweets.
+    This task should call Twitter API and retrieve tweets from yesterday from and to for the four twitter
+    users (Twitter_A,..,Twitter_D) There should be eight csv output files generated by this task and naming
+    convention is direction(from or to)_twitterHandle_date.csv
     """
 
 
-def cleantweets():
+@task
+def clean_tweets():
     """
-    This is a placeholder for cleantweets.
+    This is a placeholder to clean the eight files. In this step you can get rid of or cherry pick columns
+    and different parts of the text.
     """
 
 
-def analyzetweets():
+@task
+def analyze_tweets():
     """
-    This is a placeholder for analyzetweets.
+    This is a placeholder to analyze the twitter data. Could simply be a sentiment analysis through algorithms
+    like bag of words or something more complicated. You can also take a look at Web Services to do such
+    tasks.
     """
 
 
-def transfertodb():
+@task
+def transfer_to_db():
     """
-    This is a placeholder for transfertodb.
+    This is a placeholder to extract summary from Hive data and store it to MySQL.
     """
 
 
 with DAG(
     dag_id='example_twitter_dag',
     default_args={
         'owner': 'Ekhtiar',
         'depends_on_past': False,
         'email': ['airflow@example.com'],
         'email_on_failure': False,
         'email_on_retry': False,
         'retries': 1,
         'retry_delay': timedelta(minutes=5),
     },
     schedule_interval="@daily",
     start_date=days_ago(5),
     tags=['example'],
 ) as dag:
+    fetch_tweets = fetch_tweets()
+    clean_tweets = clean_tweets()
+    analyze_tweets = analyze_tweets()
+    hive_to_mysql = transfer_to_db()
 
-    # --------------------------------------------------------------------------------
-    # This task should call Twitter API and retrieve tweets from yesterday from and to
-    # for the four twitter users (Twitter_A,..,Twitter_D) There should be eight csv
-    # output files generated by this task and naming convention
-    # is direction(from or to)_twitterHandle_date.csv
-    # --------------------------------------------------------------------------------
-
-    fetch_tweets = PythonOperator(task_id='fetch_tweets', python_callable=fetchtweets)
-
-    # --------------------------------------------------------------------------------
-    # Clean the eight files. In this step you can get rid of or cherry pick columns
-    # and different parts of the text
-    # --------------------------------------------------------------------------------
-
-    clean_tweets = PythonOperator(task_id='clean_tweets', python_callable=cleantweets)
-
-    clean_tweets << fetch_tweets
-
-    # --------------------------------------------------------------------------------
-    # In this section you can use a script to analyze the twitter data. Could simply
-    # be a sentiment analysis through algorithms like bag of words or something more
-    # complicated. You can also take a look at Web Services to do such tasks
-    # --------------------------------------------------------------------------------
-
-    analyze_tweets = PythonOperator(task_id='analyze_tweets', python_callable=analyzetweets)
-
-    analyze_tweets << clean_tweets
-
-    # --------------------------------------------------------------------------------
-    # Although this is the last task, we need to declare it before the next tasks as we
-    # will use set_downstream This task will extract summary from Hive data and store
-    # it to MySQL
-    # --------------------------------------------------------------------------------
-
-    hive_to_mysql = PythonOperator(task_id='hive_to_mysql', python_callable=transfertodb)
+    fetch_tweets >> clean_tweets >> analyze_tweets
 
     # --------------------------------------------------------------------------------
     # The following tasks are generated using for loop. The first task puts the eight
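Note how the first hunk ends: the pairwise << declarations (and the comment about set_downstream) give way to a single left-to-right chain, fetch_tweets >> clean_tweets >> analyze_tweets, which reads in execution order. The second hunk below also leans on names defined in the part of the file that neither hunk shows: to_channels, from_channels, yesterday, dt, local_dir, and hdfs_dir. A plausible reconstruction, in which every value is an assumption:

    # Assumed definitions only -- the real ones sit in the portion of
    # example_twitter_dag.py that this commit view does not display.
    from datetime import date, timedelta

    to_channels = ["Twitter_A", "Twitter_B", "Twitter_C", "Twitter_D"]    # hypothetical handles
    from_channels = ["Twitter_A", "Twitter_B", "Twitter_C", "Twitter_D"]  # hypothetical handles
    yesterday = date.today() - timedelta(days=1)
    # dt must equal yesterday.strftime("%Y-%m-%d"): the new f-strings put
    # {dt} exactly where the old code called strftime with that format.
    dt = yesterday.strftime("%Y-%m-%d")
    local_dir = "/tmp/"  # trailing slash assumed, since the f-strings concatenate directly
    hdfs_dir = " /tmp/"  # leading space assumed, so {file_name}{hdfs_dir} keeps the
                         # hadoop fs -put source and destination apart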
@@ -137,49 +106,42 @@ def transfertodb():
 
     for channel in to_channels:
 
-        file_name = "to_" + channel + "_" + yesterday.strftime("%Y-%m-%d") + ".csv"
+        file_name = f"to_{channel}_{dt}.csv"
 
         load_to_hdfs = BashOperator(
-            task_id="put_" + channel + "_to_hdfs",
-            bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f "
-            + local_dir
-            + file_name
-            + hdfs_dir
-            + channel
-            + "/",
+            task_id=f"put_{channel}_to_hdfs",
+            bash_command=(
+                f"HADOOP_USER_NAME=hdfs hadoop fs -put -f {local_dir}{file_name}{hdfs_dir}{channel}/"
+            ),
         )
 
-        load_to_hdfs << analyze_tweets
-
         load_to_hive = HiveOperator(
-            task_id="load_" + channel + "_to_hive",
-            hql="LOAD DATA INPATH '" + hdfs_dir + channel + "/" + file_name + "' "
-            "INTO TABLE " + channel + " "
-            "PARTITION(dt='" + dt + "')",
+            task_id=f"load_{channel}_to_hive",
+            hql=(
+                f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}' "
+                f"INTO TABLE {channel} "
+                f"PARTITION(dt='{dt}')"
+            ),
         )
-        load_to_hive << load_to_hdfs
-        load_to_hive >> hive_to_mysql
+
+        analyze_tweets >> load_to_hdfs >> load_to_hive >> hive_to_mysql
 
     for channel in from_channels:
-        file_name = "from_" + channel + "_" + yesterday.strftime("%Y-%m-%d") + ".csv"
+        file_name = f"from_{channel}_{dt}.csv"
         load_to_hdfs = BashOperator(
-            task_id="put_" + channel + "_to_hdfs",
-            bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f "
-            + local_dir
-            + file_name
-            + hdfs_dir
-            + channel
-            + "/",
+            task_id=f"put_{channel}_to_hdfs",
+            bash_command=(
+                f"HADOOP_USER_NAME=hdfs hadoop fs -put -f {local_dir}{file_name}{hdfs_dir}{channel}/"
+            ),
         )
 
-        load_to_hdfs << analyze_tweets
-
         load_to_hive = HiveOperator(
-            task_id="load_" + channel + "_to_hive",
-            hql="LOAD DATA INPATH '" + hdfs_dir + channel + "/" + file_name + "' "
-            "INTO TABLE " + channel + " "
-            "PARTITION(dt='" + dt + "')",
+            task_id=f"load_{channel}_to_hive",
+            hql=(
+                f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}' "
+                f"INTO TABLE {channel} "
+                f"PARTITION(dt='{dt}')"
+            ),
         )
 
-        load_to_hive << load_to_hdfs
-        load_to_hive >> hive_to_mysql
+        analyze_tweets >> load_to_hdfs >> load_to_hive >> hive_to_mysql
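One detail in the rewritten hql strings is easy to miss: Python joins adjacent string literals with no separator, so the trailing space inside each f-string fragment is load-bearing; drop it and the statement collapses into ...csv'INTO TABLE Twitter_APARTITION(...). A standalone check with hypothetical values:

    channel = "Twitter_A"  # hypothetical
    dt = "2021-09-14"      # hypothetical
    hdfs_dir = "/tmp/"     # hypothetical
    file_name = f"to_{channel}_{dt}.csv"

    # Adjacent literals are concatenated at compile time; the trailing
    # spaces are the only separators between the SQL clauses.
    hql = (
        f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}' "
        f"INTO TABLE {channel} "
        f"PARTITION(dt='{dt}')"
    )
    print(hql)
    # LOAD DATA INPATH '/tmp/Twitter_A/to_Twitter_A_2021-09-14.csv' INTO TABLE Twitter_A PARTITION(dt='2021-09-14')

The same reasoning applies to the bash_command: {local_dir}{file_name}{hdfs_dir}{channel}/ forms a valid hadoop fs -put SRC DST pair only if one of the interpolated variables supplies the separating space, which is why the reconstruction earlier gives hdfs_dir a leading space.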