Skip to content

Commit

Permalink
Add system test and docs for Facebook Ads operators (#8503)
Browse files Browse the repository at this point in the history
  • Loading branch information
randr97 committed May 3, 2020
1 parent ac59735 commit bc45fa6
Show file tree
Hide file tree
Showing 15 changed files with 194 additions and 128 deletions.
15 changes: 7 additions & 8 deletions airflow/providers/facebook/ads/hooks/ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,27 +74,26 @@ def __init__(
def _get_service(self) -> FacebookAdsApi:
"""Returns a Facebook Ads API client initialized with the credentials from the connection config."""
config = self.facebook_ads_config
missings = [_each for _each in self.client_required_fields if _each not in config]
if missings:
message = "{missings} fields are missing".format(missings=missings)
raise AirflowException(message)
return FacebookAdsApi.init(app_id=config["app_id"],
app_secret=config["app_secret"],
access_token=config["access_token"],
account_id=config["account_id"],
api_version=self.api_version)

@cached_property
def facebook_ads_config(self) -> None:
def facebook_ads_config(self) -> Dict:
"""
Gets the Facebook Ads connection from the metadata database and returns
the client configuration stored in the connection's extra field.
"""
self.log.info("Fetching fb connection: %s", self.facebook_conn_id)
conn = self.get_connection(self.facebook_conn_id)
if "facebook_ads_client" not in conn.extra_dejson:
raise AirflowException("facebook_ads_client not found")
return conn.extra_dejson["facebook_ads_client"]
config = conn.extra_dejson
missings_keys = self.client_required_fields - config.keys()
if missings_keys:
message = "{missings_keys} fields are missing".format(missings_keys=missings_keys)
raise AirflowException(message)
return config

def bulk_facebook_report(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryExecuteQueryOperator,
BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator,
BigQueryExecuteQueryOperator,
)
from airflow.providers.google.cloud.operators.facebook_ads_to_gcs import FacebookAdsReportToGcsOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.facebook_ads_to_gcs.operators.ads import FacebookAdsReportToGcsOperator
from airflow.utils.dates import days_ago

# [START howto_GCS_env_variables]
Expand Down Expand Up @@ -56,13 +58,21 @@
default_args = {"start_date": days_ago(1)}

with models.DAG(
"example_fb_operator",
"example_facebook_ads_to_gcs",
default_args=default_args,
schedule_interval=None, # Override to match your needs
) as dag:

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset",
dataset_id=DATASET_NAME)
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket",
bucket_name=GCS_BUCKET,
project_id=GCP_PROJECT_ID,
)

create_dataset = BigQueryCreateEmptyDatasetOperator(
task_id="create_dataset",
dataset_id=DATASET_NAME,
)

create_table = BigQueryCreateEmptyTableOperator(
task_id="create_table",
Expand All @@ -77,7 +87,7 @@
],
)

# [START howto_FB_ADS_to_gcs_operator]
# [START howto_operator_facebook_ads_to_gcs]
run_operator = FacebookAdsReportToGcsOperator(
task_id='run_fetch_data',
start_date=days_ago(2),
Expand All @@ -88,23 +98,33 @@
gcp_conn_id=GCS_CONN_ID,
object_name=GCS_OBJ_PATH,
)
# [END howto_FB_ADS_to_gcs_operator]
# [END howto_operator_facebook_ads_to_gcs]

# [START howto_operator_gcs_to_bq]
load_csv = GCSToBigQueryOperator(
task_id='gcs_to_bq_example',
bucket=GCS_BUCKET,
source_objects=[GCS_OBJ_PATH],
destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
write_disposition='WRITE_TRUNCATE')
# [END howto_operator_gcs_to_bq]
write_disposition='WRITE_TRUNCATE'
)

# [START howto_operator_read_data_from_gcs]
read_data_from_gcs_many_chunks = BigQueryExecuteQueryOperator(
task_id="read_data_from_gcs_many_chunks",
sql=f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}`",
use_legacy_sql=False,
)
# [END howto_operator_read_data_from_gcs]

create_dataset >> create_table >> run_operator >> load_csv >> read_data_from_gcs_many_chunks
delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket",
bucket_name=GCS_BUCKET,
)

delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset",
project_id=GCP_PROJECT_ID,
dataset_id=DATASET_NAME,
delete_contents=True,
)

create_bucket >> create_dataset >> create_table >> run_operator >> load_csv
load_csv >> read_data_from_gcs_many_chunks >> delete_bucket >> delete_dataset
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
For more information on the Facebook Ads Python SDK, take a look at the docs:
https://github.com/facebook/facebook-python-business-sdk
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:FacebookAdsReportToGcsOperator`
:param bucket: The GCS bucket to upload to
:type bucket: str
:param obj: GCS path to save the object. Must be the full file path (ex. `path/to/file.txt`)
Expand Down
16 changes: 0 additions & 16 deletions airflow/providers/google/facebook_ads_to_gcs/__init__.py

This file was deleted.

This file was deleted.

16 changes: 0 additions & 16 deletions airflow/providers/google/facebook_ads_to_gcs/operators/__init__.py

This file was deleted.

13 changes: 5 additions & 8 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,12 @@ def create_default_connections(session=None):
Connection(
conn_id="facebook_default",
conn_type="facebook_social",
schema="""
{
"facebook_ads_client": {
"account_id": "act_123456789",
"app_id": "1234567890",
"app_secret": "1f45tghxxxx12345",
"access_token": "ABcdEfghiJKlmnoxxyz"
extra="""
{ "account_id": "<AD_ACCOUNNT_ID>",
"app_id": "<FACEBOOK_APP_ID>",
"app_secret": "<FACEBOOK_APP_SECRET>",
"access_token": "<FACEBOOK_AD_ACCESS_TOKEN>"
}
}
""",
),
session
Expand Down
2 changes: 0 additions & 2 deletions docs/autoapi_templates/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ All operators are in the following packages:

airflow/providers/exasol/operators/index

airflow/providers/google/facebook_ads_to_gcs/operators/index

airflow/providers/ftp/sensors/index

airflow/providers/google/ads/operators/index
Expand Down
52 changes: 52 additions & 0 deletions docs/howto/operator/gcp/facebook_ads_to_gcs.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Facebook Ads To GCS Operators
==============================

.. contents::
:depth: 1
:local:

Prerequisite Tasks
^^^^^^^^^^^^^^^^^^

.. include:: _partials/prerequisite_tasks.rst

.. _howto/operator:FacebookAdsReportToGcsOperator:

FacebookAdsReportToGcsOperator
------------------------------

Use the
:class:`~airflow.providers.google.cloud.operators.facebook_ads_to_gcs.FacebookAdsReportToGcsOperator`
to fetch a Facebook Ads report and upload it to GCS.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py
:language: python
:start-after: [START howto_operator_facebook_ads_to_gcs]
:end-before: [END howto_operator_facebook_ads_to_gcs]

Reference
^^^^^^^^^

For further information, look at:

* `Client Library Documentation <https://github.com/facebook/facebook-python-business-sdk>`__
* `Product Documentation <https://developers.facebook.com/docs/business-manager-api/>`__
4 changes: 2 additions & 2 deletions docs/operators-and-hooks-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -806,8 +806,8 @@ These integrations allow you to copy data from/to Google Cloud Platform.

* - `Facebook Ads <http://business.facebook.com>`__
- `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
-
- :mod:`airflow.providers.google.facebook_ads_to_gcs.operators.ads`
- :doc:`How to use <howto/operator/gcp/facebook_ads_to_gcs>`
- :mod:`airflow.providers.google.cloud.operators.facebook_ads_to_gcs`

* - `Google BigQuery <https://cloud.google.com/bigquery/>`__
- `MySQL <https://www.mysql.com/>`__
Expand Down
19 changes: 8 additions & 11 deletions tests/providers/facebook/ads/hooks/test_ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@

API_VERSION = "api_version"
EXTRAS = {
"facebook_ads_client": {
"account_id": "act_12345",
"app_id": "12345",
"app_secret": "1fg444",
"access_token": "Ab35gf7E"
}
"account_id": "act_12345",
"app_id": "12345",
"app_secret": "1fg444",
"access_token": "Ab35gf7E"
}
FIELDS = [
"campaign_name",
Expand Down Expand Up @@ -55,11 +53,10 @@ class TestFacebookAdsReportingHook:
def test_get_service(self, mock_api, mock_hook):
mock_hook._get_service()
api = mock_api.init
creds = EXTRAS["facebook_ads_client"]
api.assert_called_once_with(app_id=creds["app_id"],
app_secret=creds["app_secret"],
access_token=creds["access_token"],
account_id=creds["account_id"],
api.assert_called_once_with(app_id=EXTRAS["app_id"],
app_secret=EXTRAS["app_secret"],
access_token=EXTRAS["access_token"],
account_id=EXTRAS["account_id"],
api_version=API_VERSION)

@mock.patch("airflow.providers.facebook.ads.hooks.ads.AdAccount")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from unittest import mock

from airflow.providers.google.facebook_ads_to_gcs.operators.ads import FacebookAdsReportToGcsOperator
from airflow.providers.google.cloud.operators.facebook_ads_to_gcs import FacebookAdsReportToGcsOperator

GCS_BUCKET = "airflow_bucket_fb"
GCS_OBJ_PATH = "Temp/this_is_my_report_json.json"
Expand Down Expand Up @@ -47,8 +47,8 @@

class TestFacebookAdsReportToGcsOperator:

@mock.patch("airflow.providers.google.facebook_ads_to_gcs.operators.ads.FacebookAdsReportingHook")
@mock.patch("airflow.providers.google.facebook_ads_to_gcs.operators.ads.GCSHook")
@mock.patch("airflow.providers.google.cloud.operators.facebook_ads_to_gcs.FacebookAdsReportingHook")
@mock.patch("airflow.providers.google.cloud.operators.facebook_ads_to_gcs.GCSHook")
def test_execute(self, mock_gcs_hook, mock_ads_hook):
mock_ads_hook.return_value.bulk_facebook_report.return_value = FACEBOOK_RETURN_VALUE
op = FacebookAdsReportToGcsOperator(facebook_conn_id=FACEBOOK_ADS_CONN_ID,
Expand Down

0 comments on commit bc45fa6

Please sign in to comment.