Skip to content

Add Google Ads list accounts operator #8007

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add Google Ads list accounts operator
  • Loading branch information
turbaszek committed Mar 30, 2020
commit da06f97e32b1215ee9c211a766153ba33efdf894
11 changes: 10 additions & 1 deletion airflow/providers/google/ads/example_dags/example_ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import os

from airflow import models
from airflow.providers.google.ads.operators.ads import GoogleAdsToGcsOperator
from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator, GoogleAdsToGcsOperator
from airflow.utils import dates

# [START howto_google_ads_env_variables]
CLIENT_IDS = ["1111111111", "2222222222"]
BUCKET = os.environ.get("GOOGLE_ADS_BUCKET", "gs://test-google-ads-bucket")
GCS_OBJ_PATH = "folder_name/google-ads-api-results.csv"
GCS_ACCOUNTS_CSV = "folder_name/accounts.csv"
QUERY = """
SELECT
segments.date,
Expand Down Expand Up @@ -79,3 +80,11 @@
task_id="run_operator",
)
# [END howto_google_ads_to_gcs_operator]

# [START howto_ads_list_accounts_operator]
list_accounts = GoogleAdsListAccountsOperator(
task_id="list_accounts",
bucket=BUCKET,
object_name=GCS_ACCOUNTS_CSV
)
# [END howto_ads_list_accounts_operator]
51 changes: 47 additions & 4 deletions airflow/providers/google/ads/hooks/ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from tempfile import NamedTemporaryFile
from typing import IO, Any, Dict, Generator, List

from cached_property import cached_property
from google.ads.google_ads.client import GoogleAdsClient
from google.ads.google_ads.errors import GoogleAdsException
from google.ads.google_ads.v2.types import GoogleAdsRow
Expand Down Expand Up @@ -52,7 +53,7 @@ def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
google_ads_conn_id: str = "google_ads_default",
api_version: str = "v2",
api_version: str = "v3",
) -> None:
super().__init__()
self.gcp_conn_id = gcp_conn_id
Expand All @@ -61,8 +62,11 @@ def __init__(
self.api_version = api_version
self.google_ads_config: Dict[str, Any] = {}

def _get_service(self) -> GoogleAdsClient:
"""Connects and authenticates with the Google Ads API using a service account"""
@cached_property
def _get_service(self):
"""
Connects and authenticates with the Google Ads API using a service account
"""
with NamedTemporaryFile("w", suffix=".json") as secrets_temp:
self._get_config()
self._update_config_with_secret(secrets_temp)
Expand All @@ -73,6 +77,21 @@ def _get_service(self) -> GoogleAdsClient:
self.log.error("Google Auth Error: %s", e)
raise

@cached_property
def _get_customer_service(self):
"""
Connects and authenticates with the Google Ads API using a service account
"""
with NamedTemporaryFile("w", suffix=".json") as secrets_temp:
self._get_config()
self._update_config_with_secret(secrets_temp)
try:
client = GoogleAdsClient.load_from_dict(self.google_ads_config)
return client.get_service("CustomerService", version=self.api_version)
except GoogleAuthError as e:
self.log.error("Google Auth Error: %s", e)
raise

def _get_config(self) -> None:
"""
Gets google ads connection from meta db and sets google_ads_config attribute with returned config file
Expand Down Expand Up @@ -112,7 +131,7 @@ def search(
:return: Google Ads API response, converted to Google Ads Row objects
:rtype: list[GoogleAdsRow]
"""
service = self._get_service()
service = self._get_service
iterators = (
service.search(client_id, query=query, page_size=page_size, **kwargs)
for client_id in client_ids
Expand Down Expand Up @@ -150,3 +169,27 @@ def _extract_rows(
"\t\tOn field: %s", field_path_element.field_name
)
raise

def list_accessible_customers(self) -> List[str]:
"""
Returns resource names of customers directly accessible by the user authenticating the call.
The resulting list of customers is based on your OAuth credentials. The request returns a list
of all accounts that you are able to act upon directly given your current credentials. This will
not necessarily include all accounts within the account hierarchy; rather, it will only include
accounts where your authenticated user has been added with admin or other rights in the account.

..seealso::
https://developers.google.com/google-ads/api/reference/rpc

:return: List of names of customers
"""
try:
accessible_customers = self._get_customer_service.list_accessible_customers()
return accessible_customers.resource_names
except GoogleAdsException as ex:
for error in ex.failure.errors:
self.log.error('\tError with message "%s".', error.message)
if error.location:
for field_path_element in error.location.field_path_elements:
self.log.error('\t\tOn field: %s', field_path_element.field_name)
raise
80 changes: 80 additions & 0 deletions airflow/providers/google/ads/operators/ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,83 @@ def execute(self, context: Dict):
gzip=self.gzip,
)
self.log.info("%s uploaded to GCS", self.obj)


class GoogleAdsListAccountsOperator(BaseOperator):
"""
Saves list of customers on GCS in form of a csv file.

The resulting list of customers is based on your OAuth credentials. The request returns a list
of all accounts that you are able to act upon directly given your current credentials. This will
not necessarily include all accounts within the account hierarchy; rather, it will only include
accounts where your authenticated user has been added with admin or other rights in the account.

..seealso::
https://developers.google.com/google-ads/api/reference/rpc


.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GoogleAdsListAccountsOperator`

:param bucket: The GCS bucket to upload to
:type bucket: str
:param object_name: GCS path to save the csv file. Must be the full file path (ex. `path/to/file.csv`)
:type object_name: str
:param gcp_conn_id: Airflow Google Cloud Platform connection ID
:type gcp_conn_id: str
:param google_ads_conn_id: Airflow Google Ads connection ID
:type google_ads_conn_id: str
:param page_size: The number of results per API page request. Max 10,000
:type page_size: int
:param gzip: Option to compress local file or file data for upload
:type gzip: bool
"""

template_fields = ("bucket", "object_name")

@apply_defaults
def __init__(
self,
bucket: str,
object_name: str,
gcp_conn_id: str = "google_cloud_default",
google_ads_conn_id: str = "google_ads_default",
gzip: bool = False,
*args,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
self.bucket = bucket
self.object_name = object_name
self.gcp_conn_id = gcp_conn_id
self.google_ads_conn_id = google_ads_conn_id
self.gzip = gzip

def execute(self, context: Dict):
uri = f"gs://{self.bucket}/{self.object_name}"

ads_hook = GoogleAdsHook(
gcp_conn_id=self.gcp_conn_id,
google_ads_conn_id=self.google_ads_conn_id
)

gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)

with NamedTemporaryFile("w+") as temp_file:
# Download accounts
accounts = ads_hook.list_accessible_customers()
writer = csv.writer(temp_file)
writer.writerows(accounts)
temp_file.flush()

# Upload to GCS
gcs_hook.upload(
bucket_name=self.bucket,
object_name=self.object_name,
gzip=self.gzip,
filename=temp_file.name
)
self.log.info("Uploaded %s to %s", len(accounts), uri)

return uri
19 changes: 19 additions & 0 deletions docs/howto/operator/gcp/ads.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,22 @@ Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.ads.operators.ads.GoogleAdsToGcsOperator`
parameters which allow you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to be used by other operators.

.. _howto/operator:GoogleAdsListAccountsOperator:

Upload Google Ads Accounts to GCS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To upload Google Ads accounts to Google Cloud Storage bucket use the
:class:`~airflow.providers.google.ads.operators.ads.GoogleAdsListAccountsOperator`.

.. exampleinclude:: ../../../../airflow/providers/google/ads/example_dags/example_ads.py
:language: python
:dedent: 4
:start-after: [START howto_ads_list_accounts_operator]
:end-before: [END howto_ads_list_accounts_operator]

Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.ads.operators.ads.GoogleAdsToGcsOperator`
parameters which allow you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to be used by other operators.
19 changes: 19 additions & 0 deletions tests/providers/google/ads/hooks/test_ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ def mock_hook():


class TestGoogleAdsHook:
@mock.patch("airflow.providers.google.ads.hooks.ads.GoogleAdsClient")
def test_get_customer_service(self, mock_client, mock_hook):
mock_hook._get_customer_service()
client = mock_client.load_from_dict
client.assert_called_once_with(mock_hook.google_ads_config)
client.return_value.get_service.assert_called_once_with(
"CustomerService", version=API_VERSION
)

@mock.patch("airflow.providers.google.ads.hooks.ads.GoogleAdsClient")
def test_get_service(self, mock_client, mock_hook):
mock_hook._get_service()
Expand Down Expand Up @@ -66,3 +75,13 @@ def test_search(self, mock_client, mock_hook):
def test_extract_rows(self, mock_hook):
iterators = [[1, 2, 3], [4, 5, 6]]
assert mock_hook._extract_rows(iterators) == sum(iterators, [])

@mock.patch("airflow.providers.google.ads.hooks.ads.GoogleAdsClient")
def test_list_accessible_customers(self, mock_client, mock_hook):
accounts = ["a", "b", "c"]
service = mock_client.load_from_dict.return_value.get_service.return_value
service.list_accessible_customers.return_value = mock.MagicMock(resource_names=accounts)

result = mock_hook.list_accessible_customers()
service.list_accessible_customers.assert_called_once_with()
assert accounts == result
38 changes: 37 additions & 1 deletion tests/providers/google/ads/operators/test_ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from unittest import mock

from airflow.providers.google.ads.operators.ads import GoogleAdsToGcsOperator
from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator, GoogleAdsToGcsOperator

CLIENT_IDS = ["1111111111", "2222222222"]
BUCKET = "gs://test-google-ads-bucket"
Expand Down Expand Up @@ -63,3 +63,39 @@ def test_execute(self, mock_gcs_hook, mock_ads_hook):
mock_gcs_hook.return_value.upload.assert_called_once_with(
bucket_name=BUCKET, object_name=GCS_OBJ_PATH, filename=mock.ANY, gzip=False
)


class TestGoogleAdsListAccountsOperator:
@mock.patch("airflow.providers.google.ads.operators.ads.GoogleAdsHook")
@mock.patch("airflow.providers.google.ads.operators.ads.GCSHook")
@mock.patch("airflow.providers.google.ads.operators.ads.NamedTemporaryFile")
@mock.patch("airflow.providers.google.ads.operators.ads.csv.writer")
def test_execute(self, mocks_csv_writer, mock_tempfile, mock_gcs_hook, mock_ads_hook):
filename = "test.csv"
file_object = mock_tempfile.return_value.__enter__.return_value
file_object.name = filename
accounts = ["a", "b", "c"]
mock_ads_hook.return_value.list_accessible_customers.return_value = accounts

op = GoogleAdsListAccountsOperator(
gcp_conn_id=gcp_conn_id,
google_ads_conn_id=google_ads_conn_id,
object_name=GCS_OBJ_PATH,
bucket=BUCKET,
task_id="run_operator",
)
op.execute({})

mock_ads_hook.assert_called_once_with(
gcp_conn_id=gcp_conn_id, google_ads_conn_id=google_ads_conn_id
)
mock_gcs_hook.assert_called_once_with(gcp_conn_id=gcp_conn_id)

mock_ads_hook.return_value.list_accessible_customers.assert_called_once_with()
mocks_csv_writer.assert_called_once_with(file_object)
mocks_csv_writer.return_value.writerows.assert_called_once_with(accounts)
file_object.flush.assert_called_once_with()

mock_gcs_hook.return_value.upload.assert_called_once_with(
bucket_name=BUCKET, object_name=GCS_OBJ_PATH, filename=filename, gzip=False
)