Updating miscellaneous provider DAGs to use TaskFlow API where applicable (#18278)
josh-fell committed Sep 27, 2021
1 parent 391da64 commit a458fcc
Showing 12 changed files with 160 additions and 227 deletions.
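
Every file touched here follows the same migration: a module-level function that was wrapped in a PythonOperator becomes a @task-decorated function that is simply called inside the with DAG(...) block. Below is a minimal sketch of the before/after, loosely based on the example_s3_bucket change that follows; the dag_id and task_ids are illustrative, not taken verbatim from the diff.

import os

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def upload_keys_callable():
    """Placeholder callable for the classic PythonOperator style."""


@task(task_id="add_keys_to_bucket")
def upload_keys():
    """The same placeholder expressed as a TaskFlow task."""


with DAG(dag_id='taskflow_migration_sketch', schedule_interval=None, start_date=days_ago(2)) as dag:
    # Before: wrap the callable in a PythonOperator explicitly.
    add_keys_classic = PythonOperator(
        task_id='add_keys_to_bucket_classic', python_callable=upload_keys_callable
    )

    # After: calling the task-decorated function creates the task.
    add_keys_to_bucket = upload_keys()
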
11 changes: 5 additions & 6 deletions airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
@@ -16,15 +16,16 @@
# under the License.
import os

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.utils.dates import days_ago

BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345')


@task(task_id="s3_bucket_dag_add_keys_to_bucket")
def upload_keys():
"""This is a python callback to add keys into the s3 bucket"""
# add keys to bucket
@@ -41,24 +42,22 @@ def upload_keys():
dag_id='s3_bucket_dag',
schedule_interval=None,
start_date=days_ago(2),
default_args={"bucket_name": BUCKET_NAME},
max_active_runs=1,
tags=['example'],
) as dag:

# [START howto_operator_s3_bucket]
create_bucket = S3CreateBucketOperator(
task_id='s3_bucket_dag_create',
bucket_name=BUCKET_NAME,
region_name='us-east-1',
)

add_keys_to_bucket = PythonOperator(
task_id="s3_bucket_dag_add_keys_to_bucket", python_callable=upload_keys
)
# Using a task-decorated function to add keys
add_keys_to_bucket = upload_keys()

delete_bucket = S3DeleteBucketOperator(
task_id='s3_bucket_dag_delete',
bucket_name=BUCKET_NAME,
force_delete=True,
)
# [END howto_operator_s3_bucket]
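
Besides the switch to a task-decorated function, this file also moves bucket_name into the DAG-level default_args, so the S3 operators no longer pass it explicitly. A sketch of how the resulting DAG body reads after the change (task_ids are shortened and the final dependency wiring is assumed, since it falls outside the hunks shown above):

import os

from airflow import DAG
from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.utils.dates import days_ago

BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345')

with DAG(
    dag_id='s3_bucket_dag_sketch',
    schedule_interval=None,
    start_date=days_ago(2),
    default_args={"bucket_name": BUCKET_NAME},
    max_active_runs=1,
    tags=['example'],
) as dag:
    # Both operators pick up bucket_name from default_args.
    create_bucket = S3CreateBucketOperator(task_id='create_bucket', region_name='us-east-1')
    delete_bucket = S3DeleteBucketOperator(task_id='delete_bucket', force_delete=True)

    create_bucket >> delete_bucket
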
31 changes: 15 additions & 16 deletions airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
@@ -22,7 +22,8 @@
from os import getenv

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
@@ -35,12 +36,14 @@
# [END howto_operator_s3_to_redshift_env_variables]


def _add_sample_data_to_s3():
@task(task_id='setup__add_sample_data_to_s3')
def add_sample_data_to_s3():
s3_hook = S3Hook()
s3_hook.load_string("0,Airflow", f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET, replace=True)


def _remove_sample_data_from_s3():
@task(task_id='teardown__remove_sample_data_from_s3')
def remove_sample_data_from_s3():
s3_hook = S3Hook()
if s3_hook.check_for_key(f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET):
s3_hook.delete_objects(S3_BUCKET, f'{S3_KEY}/{REDSHIFT_TABLE}')
@@ -49,9 +52,8 @@ def _remove_sample_data_from_s3():
with DAG(
dag_id="example_s3_to_redshift", start_date=days_ago(1), schedule_interval=None, tags=['example']
) as dag:
setup__task_add_sample_data_to_s3 = PythonOperator(
python_callable=_add_sample_data_to_s3, task_id='setup__add_sample_data_to_s3'
)
add_sample_data_to_s3 = add_sample_data_to_s3()

setup__task_create_table = PostgresOperator(
sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name varchar)',
postgres_conn_id='redshift_default',
@@ -72,14 +74,11 @@ def _remove_sample_data_from_s3():
postgres_conn_id='redshift_default',
task_id='teardown__drop_table',
)
teardown__task_remove_sample_data_from_s3 = PythonOperator(
python_callable=_remove_sample_data_from_s3, task_id='teardown__remove_sample_data_from_s3'
)
(
[setup__task_add_sample_data_to_s3, setup__task_create_table]
>> task_transfer_s3_to_redshift
>> [
teardown__task_drop_table,
teardown__task_remove_sample_data_from_s3,
]

remove_sample_data_from_s3 = remove_sample_data_from_s3()

chain(
[add_sample_data_to_s3, setup__task_create_table],
task_transfer_s3_to_redshift,
[teardown__task_drop_table, remove_sample_data_from_s3],
)
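
The chain() helper imported from airflow.models.baseoperator replaces the earlier list-based bitshift expression. For comparison only (this is not part of the diff), the same dependencies could be declared with bitshift operators, using the task names from this file:

# Inside the same with DAG(...) block as above.
[add_sample_data_to_s3, setup__task_create_table] >> task_transfer_s3_to_redshift
task_transfer_s3_to_redshift >> [teardown__task_drop_table, remove_sample_data_from_s3]
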
@@ -35,7 +35,7 @@
***Screenshot:***
<img src="http://i.imgur.com/rRpSO12.png" width="99%"/>

***Example Structure:*** In this example DAG, we are collecting tweets for four user accounts, or Twitter handles. Each Twitter handle has two channels: incoming tweets and outgoing tweets. Hence, by running the fetch_tweet task, we should have eight output files. For better management, each of the eight output files should be saved with yesterday's date (we are collecting tweets from yesterday), e.g. toTwitter_A_2016-03-21.csv. We are using three kinds of operators: PythonOperator, BashOperator, and HiveOperator. However, for this example only the Python scripts are stored externally. Hence this example DAG only has the following directory structure:
***Example Structure:*** In this example DAG, we are collecting tweets for four user accounts, or Twitter handles. Each Twitter handle has two channels: incoming tweets and outgoing tweets. Hence, by running the fetch_tweet task, we should have eight output files. For better management, each of the eight output files should be saved with yesterday's date (we are collecting tweets from yesterday), e.g. toTwitter_A_2016-03-21.csv. We are using two kinds of operators (BashOperator and HiveOperator) along with task-decorated functions. However, for this example only the Python scripts are stored externally. Hence this example DAG only has the following directory structure:

The Python functions here are just placeholders. If you are interested in actually making this DAG fully functional, start by filling out the scripts as separate files and importing them into the DAG with absolute or relative imports. My approach was to store the retrieved data in memory using a Pandas DataFrame first, and then use the built-in method to save the CSV file to disk.
The eight CSV files are then put into eight different folders within HDFS. Each of the newly inserted files is then loaded into one of eight Hive tables. Hive tables can be external or internal; in this case, we are inserting the data right into the table, so we are making our tables internal. Each file is inserted into the respective Hive table named after the Twitter channel, i.e. toTwitter_A or fromTwitter_A. It is also important to note that when we created the tables, we enabled partitioning by date using the variable dt and declared comma as the row delimiter. The partitioning is very handy and ensures our query execution time remains constant even with a growing volume of data.
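
As a purely illustrative reading of the approach described above, the fetch step could dump the eight CSV files with Pandas along these lines; the column names, handle letters, and the /tmp/ directory are assumptions, not part of the example.

from datetime import date, timedelta

import pandas as pd

yesterday = date.today() - timedelta(days=1)
dt = yesterday.strftime("%Y-%m-%d")

for handle in ("A", "B", "C", "D"):
    for direction in ("toTwitter", "fromTwitter"):
        # In a real implementation this DataFrame would hold the retrieved tweets.
        df = pd.DataFrame(columns=["tweet_id", "text", "created_at"])
        df.to_csv(f"/tmp/{direction}_{handle}_{dt}.csv", index=False)
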
134 changes: 48 additions & 86 deletions airflow/providers/apache/hive/example_dags/example_twitter_dag.py
@@ -31,92 +31,61 @@
from datetime import date, timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.utils.dates import days_ago

# --------------------------------------------------------------------------------
# Create a few placeholder scripts. In practice these would be different python
# script files, which are imported in this section with absolute or relative imports
# --------------------------------------------------------------------------------


def fetchtweets():
@task
def fetch_tweets():
"""
This is a placeholder for fetchtweets.
This task should call Twitter API and retrieve tweets from yesterday from and to for the four twitter
users (Twitter_A,..,Twitter_D) There should be eight csv output files generated by this task and naming
convention is direction(from or to)_twitterHandle_date.csv
"""


def cleantweets():
@task
def clean_tweets():
"""
This is a placeholder for cleantweets.
This is a placeholder to clean the eight files. In this step you can get rid of or cherry pick columns
and different parts of the text.
"""


def analyzetweets():
@task
def analyze_tweets():
"""
This is a placeholder for analyzetweets.
This is a placeholder to analyze the twitter data. Could simply be a sentiment analysis through algorithms
like bag of words or something more complicated. You can also take a look at Web Services to do such
tasks.
"""


def transfertodb():
@task
def transfer_to_db():
"""
This is a placeholder for transfertodb.
This is a placeholder to extract summary from Hive data and store it to MySQL.
"""


with DAG(
dag_id='example_twitter_dag',
default_args={
'owner': 'Ekhtiar',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
},
schedule_interval="@daily",
start_date=days_ago(5),
tags=['example'],
) as dag:
fetch_tweets = fetch_tweets()
clean_tweets = clean_tweets()
analyze_tweets = analyze_tweets()
hive_to_mysql = transfer_to_db()

# --------------------------------------------------------------------------------
# This task should call Twitter API and retrieve tweets from yesterday from and to
# for the four twitter users (Twitter_A,..,Twitter_D) There should be eight csv
# output files generated by this task and naming convention
# is direction(from or to)_twitterHandle_date.csv
# --------------------------------------------------------------------------------

fetch_tweets = PythonOperator(task_id='fetch_tweets', python_callable=fetchtweets)

# --------------------------------------------------------------------------------
# Clean the eight files. In this step you can get rid of or cherry pick columns
# and different parts of the text
# --------------------------------------------------------------------------------

clean_tweets = PythonOperator(task_id='clean_tweets', python_callable=cleantweets)

clean_tweets << fetch_tweets

# --------------------------------------------------------------------------------
# In this section you can use a script to analyze the twitter data. Could simply
# be a sentiment analysis through algorithms like bag of words or something more
# complicated. You can also take a look at Web Services to do such tasks
# --------------------------------------------------------------------------------

analyze_tweets = PythonOperator(task_id='analyze_tweets', python_callable=analyzetweets)

analyze_tweets << clean_tweets

# --------------------------------------------------------------------------------
# Although this is the last task, we need to declare it before the next tasks as we
# will use set_downstream This task will extract summary from Hive data and store
# it to MySQL
# --------------------------------------------------------------------------------

hive_to_mysql = PythonOperator(task_id='hive_to_mysql', python_callable=transfertodb)
fetch_tweets >> clean_tweets >> analyze_tweets

# --------------------------------------------------------------------------------
# The following tasks are generated using for loop. The first task puts the eight
@@ -137,49 +106,42 @@ def transfertodb():

for channel in to_channels:

file_name = "to_" + channel + "_" + yesterday.strftime("%Y-%m-%d") + ".csv"
file_name = f"to_{channel}_{dt}.csv"

load_to_hdfs = BashOperator(
task_id="put_" + channel + "_to_hdfs",
bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f "
+ local_dir
+ file_name
+ hdfs_dir
+ channel
+ "/",
task_id=f"put_{channel}_to_hdfs",
bash_command=(
f"HADOOP_USER_NAME=hdfs hadoop fs -put -f {local_dir}{file_name}{hdfs_dir}{channel}/"
),
)

load_to_hdfs << analyze_tweets

load_to_hive = HiveOperator(
task_id="load_" + channel + "_to_hive",
hql="LOAD DATA INPATH '" + hdfs_dir + channel + "/" + file_name + "' "
"INTO TABLE " + channel + " "
"PARTITION(dt='" + dt + "')",
task_id=f"load_{channel}_to_hive",
hql=(
f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}'"
f"INTO TABLE {channel}"
f"PARTITION(dt='{dt}')"
),
)
load_to_hive << load_to_hdfs
load_to_hive >> hive_to_mysql

analyze_tweets >> load_to_hdfs >> load_to_hive >> hive_to_mysql

for channel in from_channels:
file_name = "from_" + channel + "_" + yesterday.strftime("%Y-%m-%d") + ".csv"
file_name = f"from_{channel}_{dt}.csv"
load_to_hdfs = BashOperator(
task_id="put_" + channel + "_to_hdfs",
bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f "
+ local_dir
+ file_name
+ hdfs_dir
+ channel
+ "/",
task_id=f"put_{channel}_to_hdfs",
bash_command=(
f"HADOOP_USER_NAME=hdfs hadoop fs -put -f {local_dir}{file_name}{hdfs_dir}{channel}/"
),
)

load_to_hdfs << analyze_tweets

load_to_hive = HiveOperator(
task_id="load_" + channel + "_to_hive",
hql="LOAD DATA INPATH '" + hdfs_dir + channel + "/" + file_name + "' "
"INTO TABLE " + channel + " "
"PARTITION(dt='" + dt + "')",
task_id=f"load_{channel}_to_hive",
hql=(
f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}' "
f"INTO TABLE {channel} "
f"PARTITION(dt='{dt}')"
),
)

load_to_hive << load_to_hdfs
load_to_hive >> hive_to_mysql
analyze_tweets >> load_to_hdfs >> load_to_hive >> hive_to_mysql
