Skip to content

Commit

Permalink
Use parameters instead of params (#18143)
Browse files Browse the repository at this point in the history
  • Loading branch information
msumit committed Sep 11, 2021
1 parent 2776e08 commit 9140ad8
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
AdsInsights.Field.clicks,
AdsInsights.Field.impressions,
]
PARAMS = {'level': 'ad', 'date_preset': 'yesterday'}
PARAMETERS = {'level': 'ad', 'date_preset': 'yesterday'}
# [END howto_FB_ADS_variables]

with models.DAG(
Expand Down Expand Up @@ -90,7 +90,7 @@
start_date=days_ago(2),
owner='airflow',
bucket_name=GCS_BUCKET,
params=PARAMS,
parameters=PARAMETERS,
fields=FIELDS,
gcp_conn_id=GCS_CONN_ID,
object_name=GCS_OBJ_PATH,
Expand Down
27 changes: 23 additions & 4 deletions airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
"""This module contains Facebook Ad Reporting to GCS operators."""
import csv
import tempfile
import warnings
from typing import Any, Dict, List, Optional, Sequence, Union

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
Expand Down Expand Up @@ -56,9 +58,13 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
:param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class.
https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0
:type fields: List[str]
:param params: Parameters that determine the query for Facebook
:param params: Parameters that determine the query for Facebook. This keyword is deprecated,
please use `parameters` keyword to pass the parameters.
https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0
:type params: Dict[str, Any]
:param parameters: Parameters that determine the query for Facebook
https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0
:type parameters: Dict[str, Any]
:param gzip: Option to compress local file or file data for upload
:type gzip: bool
:param impersonation_chain: Optional service account to impersonate using short-term
Expand All @@ -77,6 +83,7 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
"bucket_name",
"object_name",
"impersonation_chain",
"parameters",
)

def __init__(
Expand All @@ -85,7 +92,8 @@ def __init__(
bucket_name: str,
object_name: str,
fields: List[str],
params: Dict[str, Any],
params: Dict[str, Any] = None,
parameters: Dict[str, Any] = None,
gzip: bool = False,
api_version: str = "v6.0",
gcp_conn_id: str = "google_cloud_default",
Expand All @@ -100,15 +108,26 @@ def __init__(
self.facebook_conn_id = facebook_conn_id
self.api_version = api_version
self.fields = fields
self.params = params
self.parameters = parameters
self.gzip = gzip
self.impersonation_chain = impersonation_chain

if params is None and parameters is None:
raise AirflowException("Argument ['parameters'] is required")
if params and parameters is None:
# TODO: Remove in provider version 6.0
warnings.warn(
"Please use 'parameters' instead of 'params'",
DeprecationWarning,
stacklevel=2,
)
self.parameters = params

def execute(self, context: dict):
service = FacebookAdsReportingHook(
facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
)
rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
rows = service.bulk_facebook_report(params=self.parameters, fields=self.fields)

converted_rows = [dict(row) for row in rows]
self.log.info("Facebook Returned %s data points", len(converted_rows))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"schedule": {"frequency": "ONE_TIME"},
}

PARAMS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
PARAMETERS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}

CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST: Dict = {
"version": SDF_VERSION,
Expand All @@ -94,7 +94,7 @@

# [START howto_google_display_video_runquery_report_operator]
run_report = GoogleDisplayVideo360RunReportOperator(
report_id=report_id, params=PARAMS, task_id="run_report"
report_id=report_id, parameters=PARAMETERS, task_id="run_report"
)
# [END howto_google_display_video_runquery_report_operator]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import shutil
import tempfile
import urllib.request
import warnings
from typing import Any, Dict, List, Optional, Sequence, Union
from urllib.parse import urlparse

Expand Down Expand Up @@ -339,8 +340,12 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator):
:param report_id: Report ID to run.
:type report_id: str
:param params: Parameters for running a report as described here:
https://developers.google.com/bid-manager/v1/queries/runquery
https://developers.google.com/bid-manager/v1/queries/runquery. Please note that this
keyword is deprecated, please use `parameters` keyword to pass the parameters.
:type params: Dict[str, Any]
:param parameters: Parameters for running a report as described here:
https://developers.google.com/bid-manager/v1/queries/runquery
:type parameters: Dict[str, Any]
:param api_version: The version of the api that will be requested for example 'v3'.
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
Expand All @@ -362,15 +367,16 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator):

template_fields = (
"report_id",
"params",
"parameters",
"impersonation_chain",
)

def __init__(
self,
*,
report_id: str,
params: Dict[str, Any],
params: Dict[str, Any] = None,
parameters: Dict[str, Any] = None,
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
Expand All @@ -379,12 +385,23 @@ def __init__(
) -> None:
super().__init__(**kwargs)
self.report_id = report_id
self.params = params
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.parameters = parameters
self.impersonation_chain = impersonation_chain

if params is None and parameters is None:
raise AirflowException("Argument ['parameters'] is required")
if params and parameters is None:
# TODO: Remove in provider version 6.0
warnings.warn(
"Please use 'parameters' instead of 'params'",
DeprecationWarning,
stacklevel=2,
)
self.parameters = params

def execute(self, context: dict) -> None:
hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
Expand All @@ -395,9 +412,9 @@ def execute(self, context: dict) -> None:
self.log.info(
"Running report %s with the following params:\n %s",
self.report_id,
self.params,
self.parameters,
)
hook.run_query(query_id=self.report_id, params=self.params)
hook.run_query(query_id=self.report_id, params=self.parameters)


class GoogleDisplayVideo360DownloadLineItemsOperator(BaseOperator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"clicks",
"impressions",
]
PARAMS = {"level": "ad", "date_preset": "yesterday"}
PARAMETERS = {"level": "ad", "date_preset": "yesterday"}
FACEBOOK_RETURN_VALUE = [
{
"campaign_name": "abcd",
Expand All @@ -51,15 +51,17 @@ def test_execute(self, mock_gcs_hook, mock_ads_hook):
op = FacebookAdsReportToGcsOperator(
facebook_conn_id=FACEBOOK_ADS_CONN_ID,
fields=FIELDS,
params=PARAMS,
parameters=PARAMETERS,
object_name=GCS_OBJ_PATH,
bucket_name=GCS_BUCKET,
task_id="run_operator",
impersonation_chain=IMPERSONATION_CHAIN,
)
op.execute({})
mock_ads_hook.assert_called_once_with(facebook_conn_id=FACEBOOK_ADS_CONN_ID, api_version=API_VERSION)
mock_ads_hook.return_value.bulk_facebook_report.assert_called_once_with(params=PARAMS, fields=FIELDS)
mock_ads_hook.return_value.bulk_facebook_report.assert_called_once_with(
params=PARAMETERS, fields=FIELDS
)
mock_gcs_hook.assert_called_once_with(
gcp_conn_id=GCS_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ class TestGoogleDisplayVideo360RunReportOperator(TestCase):
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.BaseOperator")
def test_execute(self, mock_base_op, hook_mock):
report_id = "QUERY_ID"
params = {"param": "test"}
parameters = {"param": "test"}
op = GoogleDisplayVideo360RunReportOperator(
report_id=report_id,
params=params,
parameters=parameters,
api_version=API_VERSION,
task_id="test_task",
)
Expand All @@ -183,7 +183,7 @@ def test_execute(self, mock_base_op, hook_mock):
api_version=API_VERSION,
impersonation_chain=None,
)
hook_mock.return_value.run_query.assert_called_once_with(query_id=report_id, params=params)
hook_mock.return_value.run_query.assert_called_once_with(query_id=report_id, params=parameters)


class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
Expand Down

0 comments on commit 9140ad8

Please sign in to comment.