Skip to content

Commit

Permalink
Added Facebook Ads Operator #7887 (#8008)
Browse files Browse the repository at this point in the history
  • Loading branch information
randr97 committed Apr 14, 2020
1 parent 8cae07e commit eee4eba
Show file tree
Hide file tree
Showing 31 changed files with 761 additions and 24 deletions.
12 changes: 6 additions & 6 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,11 @@ This is the full list of those extras:
.. START EXTRAS HERE
all, all_dbs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, dask, databricks,
datadog, devel, devel_ci, devel_hadoop, doc, docker, druid, elasticsearch, exasol, gcp, gcp_api,
github_enterprise, google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap,
mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, password, pinot, postgres, presto, qds,
rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, singularity, slack, snowflake, ssh,
statsd, tableau, vertica, virtualenv, webhdfs, winrm, yandexcloud
datadog, devel, devel_ci, devel_hadoop, doc, docker, druid, elasticsearch, exasol, facebook, gcp,
gcp_api, github_enterprise, google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos,
kubernetes, ldap, mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, password, pinot,
postgres, presto, qds, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, singularity,
slack, snowflake, ssh, statsd, tableau, vertica, virtualenv, webhdfs, winrm, yandexcloud

.. END EXTRAS HERE
Expand Down Expand Up @@ -457,7 +457,7 @@ apache.hive amazon,microsoft.mssql,mysql,presto,samba,vertica
apache.livy http
dingding http
discord http
google amazon,apache.cassandra,cncf.kubernetes,microsoft.azure,microsoft.mssql,mysql,postgres,presto,sftp
google amazon,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,postgres,presto,sftp
hashicorp google
microsoft.azure oracle
microsoft.mssql odbc
Expand Down
10 changes: 5 additions & 5 deletions INSTALL
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ pip install . --constraint requirements/requirements-python3.7.txt
# START EXTRAS HERE

all, all_dbs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, dask, databricks,
datadog, devel, devel_ci, devel_hadoop, doc, docker, druid, elasticsearch, exasol, gcp, gcp_api,
github_enterprise, google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap,
mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, password, pinot, postgres, presto, qds,
rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, singularity, slack, snowflake, ssh,
statsd, tableau, vertica, virtualenv, webhdfs, winrm, yandexcloud
datadog, devel, devel_ci, devel_hadoop, doc, docker, druid, elasticsearch, exasol, facebook, gcp,
gcp_api, github_enterprise, google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos,
kubernetes, ldap, mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, password, pinot,
postgres, presto, qds, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, singularity,
slack, snowflake, ssh, statsd, tableau, vertica, virtualenv, webhdfs, winrm, yandexcloud

# END EXTRAS HERE

Expand Down
1 change: 1 addition & 0 deletions airflow/models/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class Connection(Base, LoggingMixin):
('docker', 'Docker Registry'),
('elasticsearch', 'Elasticsearch'),
('exasol', 'Exasol'),
('facebook_social', 'Facebook Social'),
('fs', 'File (path)'),
('ftp', 'FTP'),
('google_cloud_platform', 'Google Cloud Platform'),
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"amazon",
"apache.cassandra",
"cncf.kubernetes",
"facebook",
"microsoft.azure",
"microsoft.mssql",
"mysql",
Expand Down
16 changes: 16 additions & 0 deletions airflow/providers/facebook/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions airflow/providers/facebook/ads/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions airflow/providers/facebook/ads/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
139 changes: 139 additions & 0 deletions airflow/providers/facebook/ads/hooks/ads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains Facebook Ads Reporting hooks
"""
import time
from enum import Enum
from typing import Any, Dict, List

from cached_property import cached_property
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.adreportrun import AdReportRun
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.api import FacebookAdsApi

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


class JobStatus(Enum):
    """
    Status strings that a Facebook asynchronous job can be in.

    Each member's ``value`` is the literal status text; compare against
    ``JobStatus.<MEMBER>.value`` when checking a status string.
    """
    # Job finished.
    COMPLETED = 'Job Completed'
    # Job has been started.
    STARTED = 'Job Started'
    # Job is running.
    RUNNING = 'Job Running'
    # Job failed.
    FAILED = 'Job Failed'
    # Job was skipped.
    SKIPPED = 'Job Skipped'


class FacebookAdsReportingHook(BaseHook):
    """
    Hook for the Facebook Ads API

    .. seealso::
        For more information on the Facebook Ads API, take a look at the API docs:
        https://developers.facebook.com/docs/marketing-apis/

    :param facebook_conn_id: Airflow Facebook Ads connection ID
    :type facebook_conn_id: str
    :param api_version: The version of Facebook API. Default to v6.0
    :type api_version: str
    """

    def __init__(
        self,
        facebook_conn_id: str = "facebook_default",
        api_version: str = "v6.0",
    ) -> None:
        super().__init__()
        self.facebook_conn_id = facebook_conn_id
        self.api_version = api_version
        # Keys that must be present in the ``facebook_ads_client`` config
        # before a client can be built (checked in ``_get_service``).
        self.client_required_fields = ["app_id",
                                       "app_secret",
                                       "access_token",
                                       "account_id"]

    def _get_service(self) -> FacebookAdsApi:
        """
        Returns a Facebook Ads API client initialised from the connection config.

        :raises AirflowException: if any of ``client_required_fields`` is
            missing from the config
        """
        config = self.facebook_ads_config
        missings = [_each for _each in self.client_required_fields if _each not in config]
        if missings:
            message = "{missings} fields are missing".format(missings=missings)
            raise AirflowException(message)
        return FacebookAdsApi.init(app_id=config["app_id"],
                                   app_secret=config["app_secret"],
                                   access_token=config["access_token"],
                                   account_id=config["account_id"],
                                   api_version=self.api_version)

    @cached_property
    def facebook_ads_config(self) -> Dict[str, Any]:
        """
        Gets the Facebook ads connection from the meta db and returns the
        ``facebook_ads_client`` entry stored in the connection extras.

        The value is cached on the instance after the first access.

        :raises AirflowException: if the connection extras have no
            ``facebook_ads_client`` key
        """
        self.log.info("Fetching fb connection: %s", self.facebook_conn_id)
        conn = self.get_connection(self.facebook_conn_id)
        # Read the extras once so the JSON payload is only decoded a single time.
        extras = conn.extra_dejson
        if "facebook_ads_client" not in extras:
            raise AirflowException("facebook_ads_client not found")
        return extras["facebook_ads_client"]

    def bulk_facebook_report(
        self,
        params: Dict[str, Any],
        fields: List[str],
        sleep_time: int = 5,
    ) -> List[AdsInsights]:
        """
        Pulls data from the Facebook Ads API

        Starts an asynchronous insights job, polls it every ``sleep_time``
        seconds until it completes, then downloads the resulting rows.

        :param params: Parameters that determine the query for Facebook
            https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0
        :type params: Dict[str, Any]
        :param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class.
            https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0
        :type fields: List[str]
        :param sleep_time: Time to sleep when async call is happening
        :type sleep_time: int

        :return: Facebook Ads API response, converted to Facebook Ads Row objects
        :rtype: List[AdsInsights]
        :raises AirflowException: if the async job reports a failed or skipped status
        """
        api = self._get_service()
        ad_account = AdAccount(api.get_default_account_id(), api=api)
        _async = ad_account.get_insights(params=params, fields=fields, is_async=True)
        while True:
            request = _async.api_get()
            async_status = request[AdReportRun.Field.async_status]
            percent = request[AdReportRun.Field.async_percent_completion]
            self.log.info("%s %% completed, async_status: %s", percent, async_status)
            if async_status == JobStatus.COMPLETED.value:
                self.log.info("Job run completed")
                break
            if async_status in [JobStatus.SKIPPED.value, JobStatus.FAILED.value]:
                message = "{async_status}. Please retry.".format(async_status=async_status)
                raise AirflowException(message)
            time.sleep(sleep_time)
        # The last poll response already carries the run id, so no extra
        # API round-trip is needed to read it.
        report_run_id = request["report_run_id"]
        report_object = AdReportRun(report_run_id, api=api)
        insights = report_object.get_insights()
        self.log.info("Extracting data from returned Facebook Ads Iterators")
        return list(insights)
16 changes: 16 additions & 0 deletions airflow/providers/google/facebook_ads_to_gcs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that shows how to use FacebookAdsReportToGcsOperator.
"""
import os

from facebook_business.adobjects.adsinsights import AdsInsights

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryExecuteQueryOperator,
)
from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.facebook_ads_to_gcs.operators.ads import FacebookAdsReportToGcsOperator
from airflow.utils.dates import days_ago

# [START howto_GCS_env_variables]
# Every setting can be overridden through an environment variable of the
# given name; the second argument is the fallback used when it is unset.
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID", "free-tier-1997")
GCS_BUCKET = os.getenv("GCS_BUCKET", "airflow_bucket_fb")
GCS_OBJ_PATH = os.getenv("GCS_OBJ_PATH", "Temp/this_is_my_report_csv.csv")
GCS_CONN_ID = os.getenv("GCS_CONN_ID", "google_cloud_default")
DATASET_NAME = os.getenv("DATASET_NAME", "airflow_test_dataset")
TABLE_NAME = os.getenv("FB_TABLE_NAME", "airflow_test_datatable")
# [END howto_GCS_env_variables]

# [START howto_FB_ADS_variables]
# Insight fields requested from the Facebook Ads API.
FIELDS = [
    AdsInsights.Field.campaign_name,
    AdsInsights.Field.campaign_id,
    AdsInsights.Field.ad_id,
    AdsInsights.Field.clicks,
    AdsInsights.Field.impressions,
]
# Query parameters passed along with the insight request.
PARAMS = dict(
    level='ad',
    date_preset='yesterday',
)
# [END howto_FB_ADS_variables]

default_args = dict(start_date=days_ago(1))

with models.DAG(
    dag_id="example_fb_operator",
    default_args=default_args,
    schedule_interval=None,  # Override to match your needs
) as dag:

    # Destination dataset for the report rows.
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create-dataset",
        dataset_id=DATASET_NAME,
    )

    # Destination table: one nullable STRING column per requested field.
    create_table = BigQueryCreateEmptyTableOperator(
        task_id="create_table",
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME,
        schema_fields=[
            {'name': column, 'type': 'STRING', 'mode': 'NULLABLE'}
            for column in ('campaign_name', 'campaign_id', 'ad_id', 'clicks', 'impressions')
        ],
    )

    # [START howto_FB_ADS_to_gcs_operator]
    run_operator = FacebookAdsReportToGcsOperator(
        task_id='run_fetch_data',
        start_date=days_ago(2),
        owner='airflow',
        bucket_name=GCS_BUCKET,
        params=PARAMS,
        fields=FIELDS,
        gcp_conn_id=GCS_CONN_ID,
        object_name=GCS_OBJ_PATH,
    )
    # [END howto_FB_ADS_to_gcs_operator]

    # [START howto_operator_gcs_to_bq]
    load_csv = GCSToBigQueryOperator(
        task_id='gcs_to_bq_example',
        bucket=GCS_BUCKET,
        source_objects=[GCS_OBJ_PATH],
        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
        write_disposition='WRITE_TRUNCATE',
    )
    # [END howto_operator_gcs_to_bq]

    # [START howto_operator_read_data_from_gcs]
    read_data_from_gcs_many_chunks = BigQueryExecuteQueryOperator(
        task_id="read_data_from_gcs_many_chunks",
        sql=f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}`",
        use_legacy_sql=False,
    )
    # [END howto_operator_read_data_from_gcs]

    # Linear pipeline: each task runs only after the one before it.
    create_dataset >> create_table
    create_table >> run_operator
    run_operator >> load_csv
    load_csv >> read_data_from_gcs_many_chunks

0 comments on commit eee4eba

Please sign in to comment.