Skip to content

Commit ed2bc00

Browse files
authored
Add Google Ads list accounts operator (#8007)
1 parent 99370fe commit ed2bc00

File tree

6 files changed

+212
-6
lines changed

6 files changed

+212
-6
lines changed

airflow/providers/google/ads/example_dags/example_ads.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@
2121
import os
2222

2323
from airflow import models
24-
from airflow.providers.google.ads.operators.ads import GoogleAdsToGcsOperator
24+
from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator, GoogleAdsToGcsOperator
2525
from airflow.utils import dates
2626

2727
# [START howto_google_ads_env_variables]
2828
CLIENT_IDS = ["1111111111", "2222222222"]
2929
BUCKET = os.environ.get("GOOGLE_ADS_BUCKET", "gs://test-google-ads-bucket")
3030
GCS_OBJ_PATH = "folder_name/google-ads-api-results.csv"
31+
GCS_ACCOUNTS_CSV = "folder_name/accounts.csv"
3132
QUERY = """
3233
SELECT
3334
segments.date,
@@ -79,3 +80,11 @@
7980
task_id="run_operator",
8081
)
8182
# [END howto_google_ads_to_gcs_operator]
83+
84+
# [START howto_ads_list_accounts_operator]
85+
list_accounts = GoogleAdsListAccountsOperator(
86+
task_id="list_accounts",
87+
bucket=BUCKET,
88+
object_name=GCS_ACCOUNTS_CSV
89+
)
90+
# [END howto_ads_list_accounts_operator]

airflow/providers/google/ads/hooks/ads.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from tempfile import NamedTemporaryFile
2222
from typing import IO, Any, Dict, Generator, List
2323

24+
from cached_property import cached_property
2425
from google.ads.google_ads.client import GoogleAdsClient
2526
from google.ads.google_ads.errors import GoogleAdsException
2627
from google.ads.google_ads.v2.types import GoogleAdsRow
@@ -52,7 +53,7 @@ def __init__(
5253
self,
5354
gcp_conn_id: str = "google_cloud_default",
5455
google_ads_conn_id: str = "google_ads_default",
55-
api_version: str = "v2",
56+
api_version: str = "v3",
5657
) -> None:
5758
super().__init__()
5859
self.gcp_conn_id = gcp_conn_id
@@ -61,8 +62,11 @@ def __init__(
6162
self.api_version = api_version
6263
self.google_ads_config: Dict[str, Any] = {}
6364

64-
def _get_service(self) -> GoogleAdsClient:
65-
"""Connects and authenticates with the Google Ads API using a service account"""
65+
@cached_property
66+
def _get_service(self):
67+
"""
68+
Connects and authenticates with the Google Ads API using a service account
69+
"""
6670
with NamedTemporaryFile("w", suffix=".json") as secrets_temp:
6771
self._get_config()
6872
self._update_config_with_secret(secrets_temp)
@@ -73,6 +77,21 @@ def _get_service(self) -> GoogleAdsClient:
7377
self.log.error("Google Auth Error: %s", e)
7478
raise
7579

80+
@cached_property
81+
def _get_customer_service(self):
82+
"""
83+
Connects and authenticates with the Google Ads API using a service account
84+
"""
85+
with NamedTemporaryFile("w", suffix=".json") as secrets_temp:
86+
self._get_config()
87+
self._update_config_with_secret(secrets_temp)
88+
try:
89+
client = GoogleAdsClient.load_from_dict(self.google_ads_config)
90+
return client.get_service("CustomerService", version=self.api_version)
91+
except GoogleAuthError as e:
92+
self.log.error("Google Auth Error: %s", e)
93+
raise
94+
7695
def _get_config(self) -> None:
7796
"""
7897
Gets google ads connection from meta db and sets google_ads_config attribute with returned config file
@@ -112,7 +131,7 @@ def search(
112131
:return: Google Ads API response, converted to Google Ads Row objects
113132
:rtype: list[GoogleAdsRow]
114133
"""
115-
service = self._get_service()
134+
service = self._get_service
116135
iterators = (
117136
service.search(client_id, query=query, page_size=page_size, **kwargs)
118137
for client_id in client_ids
@@ -150,3 +169,27 @@ def _extract_rows(
150169
"\t\tOn field: %s", field_path_element.field_name
151170
)
152171
raise
172+
173+
def list_accessible_customers(self) -> List[str]:
174+
"""
175+
Returns resource names of customers directly accessible by the user authenticating the call.
176+
The resulting list of customers is based on your OAuth credentials. The request returns a list
177+
of all accounts that you are able to act upon directly given your current credentials. This will
178+
not necessarily include all accounts within the account hierarchy; rather, it will only include
179+
accounts where your authenticated user has been added with admin or other rights in the account.
180+
181+
..seealso::
182+
https://developers.google.com/google-ads/api/reference/rpc
183+
184+
:return: List of names of customers
185+
"""
186+
try:
187+
accessible_customers = self._get_customer_service.list_accessible_customers()
188+
return accessible_customers.resource_names
189+
except GoogleAdsException as ex:
190+
for error in ex.failure.errors:
191+
self.log.error('\tError with message "%s".', error.message)
192+
if error.location:
193+
for field_path_element in error.location.field_path_elements:
194+
self.log.error('\t\tOn field: %s', field_path_element.field_name)
195+
raise

airflow/providers/google/ads/operators/ads.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,83 @@ def execute(self, context: Dict):
120120
gzip=self.gzip,
121121
)
122122
self.log.info("%s uploaded to GCS", self.obj)
123+
124+
125+
class GoogleAdsListAccountsOperator(BaseOperator):
126+
"""
127+
Saves list of customers on GCS in form of a csv file.
128+
129+
The resulting list of customers is based on your OAuth credentials. The request returns a list
130+
of all accounts that you are able to act upon directly given your current credentials. This will
131+
not necessarily include all accounts within the account hierarchy; rather, it will only include
132+
accounts where your authenticated user has been added with admin or other rights in the account.
133+
134+
..seealso::
135+
https://developers.google.com/google-ads/api/reference/rpc
136+
137+
138+
.. seealso::
139+
For more information on how to use this operator, take a look at the guide:
140+
:ref:`howto/operator:GoogleAdsListAccountsOperator`
141+
142+
:param bucket: The GCS bucket to upload to
143+
:type bucket: str
144+
:param object_name: GCS path to save the csv file. Must be the full file path (ex. `path/to/file.csv`)
145+
:type object_name: str
146+
:param gcp_conn_id: Airflow Google Cloud Platform connection ID
147+
:type gcp_conn_id: str
148+
:param google_ads_conn_id: Airflow Google Ads connection ID
149+
:type google_ads_conn_id: str
150+
:param page_size: The number of results per API page request. Max 10,000
151+
:type page_size: int
152+
:param gzip: Option to compress local file or file data for upload
153+
:type gzip: bool
154+
"""
155+
156+
template_fields = ("bucket", "object_name")
157+
158+
@apply_defaults
159+
def __init__(
160+
self,
161+
bucket: str,
162+
object_name: str,
163+
gcp_conn_id: str = "google_cloud_default",
164+
google_ads_conn_id: str = "google_ads_default",
165+
gzip: bool = False,
166+
*args,
167+
**kwargs,
168+
) -> None:
169+
super().__init__(*args, **kwargs)
170+
self.bucket = bucket
171+
self.object_name = object_name
172+
self.gcp_conn_id = gcp_conn_id
173+
self.google_ads_conn_id = google_ads_conn_id
174+
self.gzip = gzip
175+
176+
def execute(self, context: Dict):
177+
uri = f"gs://{self.bucket}/{self.object_name}"
178+
179+
ads_hook = GoogleAdsHook(
180+
gcp_conn_id=self.gcp_conn_id,
181+
google_ads_conn_id=self.google_ads_conn_id
182+
)
183+
184+
gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
185+
186+
with NamedTemporaryFile("w+") as temp_file:
187+
# Download accounts
188+
accounts = ads_hook.list_accessible_customers()
189+
writer = csv.writer(temp_file)
190+
writer.writerows(accounts)
191+
temp_file.flush()
192+
193+
# Upload to GCS
194+
gcs_hook.upload(
195+
bucket_name=self.bucket,
196+
object_name=self.object_name,
197+
gzip=self.gzip,
198+
filename=temp_file.name
199+
)
200+
self.log.info("Uploaded %s to %s", len(accounts), uri)
201+
202+
return uri

docs/howto/operator/gcp/ads.rst

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,22 @@ Use :ref:`Jinja templating <jinja-templating>` with
4747
:template-fields:`airflow.providers.google.ads.operators.ads.GoogleAdsToGcsOperator`
4848
parameters which allow you to dynamically determine values.
4949
The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to be used by other operators.
50+
51+
.. _howto/operator:GoogleAdsListAccountsOperator:
52+
53+
Upload Google Ads Accounts to GCS
54+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
55+
56+
To upload Google Ads accounts to Google Cloud Storage bucket use the
57+
:class:`~airflow.providers.google.ads.operators.ads.GoogleAdsListAccountsOperator`.
58+
59+
.. exampleinclude:: ../../../../airflow/providers/google/ads/example_dags/example_ads.py
60+
:language: python
61+
:dedent: 4
62+
:start-after: [START howto_ads_list_accounts_operator]
63+
:end-before: [END howto_ads_list_accounts_operator]
64+
65+
Use :ref:`Jinja templating <jinja-templating>` with
66+
:template-fields:`airflow.providers.google.ads.operators.ads.GoogleAdsToGcsOperator`
67+
parameters which allow you to dynamically determine values.
68+
The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to be used by other operators.

tests/providers/google/ads/hooks/test_ads.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ def mock_hook():
3838

3939

4040
class TestGoogleAdsHook:
41+
@mock.patch("airflow.providers.google.ads.hooks.ads.GoogleAdsClient")
42+
def test_get_customer_service(self, mock_client, mock_hook):
43+
mock_hook._get_customer_service()
44+
client = mock_client.load_from_dict
45+
client.assert_called_once_with(mock_hook.google_ads_config)
46+
client.return_value.get_service.assert_called_once_with(
47+
"CustomerService", version=API_VERSION
48+
)
49+
4150
@mock.patch("airflow.providers.google.ads.hooks.ads.GoogleAdsClient")
4251
def test_get_service(self, mock_client, mock_hook):
4352
mock_hook._get_service()
@@ -66,3 +75,13 @@ def test_search(self, mock_client, mock_hook):
6675
def test_extract_rows(self, mock_hook):
6776
iterators = [[1, 2, 3], [4, 5, 6]]
6877
assert mock_hook._extract_rows(iterators) == sum(iterators, [])
78+
79+
@mock.patch("airflow.providers.google.ads.hooks.ads.GoogleAdsClient")
80+
def test_list_accessible_customers(self, mock_client, mock_hook):
81+
accounts = ["a", "b", "c"]
82+
service = mock_client.load_from_dict.return_value.get_service.return_value
83+
service.list_accessible_customers.return_value = mock.MagicMock(resource_names=accounts)
84+
85+
result = mock_hook.list_accessible_customers()
86+
service.list_accessible_customers.assert_called_once_with()
87+
assert accounts == result

tests/providers/google/ads/operators/test_ads.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717
from unittest import mock
1818

19-
from airflow.providers.google.ads.operators.ads import GoogleAdsToGcsOperator
19+
from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator, GoogleAdsToGcsOperator
2020

2121
CLIENT_IDS = ["1111111111", "2222222222"]
2222
BUCKET = "gs://test-google-ads-bucket"
@@ -63,3 +63,39 @@ def test_execute(self, mock_gcs_hook, mock_ads_hook):
6363
mock_gcs_hook.return_value.upload.assert_called_once_with(
6464
bucket_name=BUCKET, object_name=GCS_OBJ_PATH, filename=mock.ANY, gzip=False
6565
)
66+
67+
68+
class TestGoogleAdsListAccountsOperator:
69+
@mock.patch("airflow.providers.google.ads.operators.ads.GoogleAdsHook")
70+
@mock.patch("airflow.providers.google.ads.operators.ads.GCSHook")
71+
@mock.patch("airflow.providers.google.ads.operators.ads.NamedTemporaryFile")
72+
@mock.patch("airflow.providers.google.ads.operators.ads.csv.writer")
73+
def test_execute(self, mocks_csv_writer, mock_tempfile, mock_gcs_hook, mock_ads_hook):
74+
filename = "test.csv"
75+
file_object = mock_tempfile.return_value.__enter__.return_value
76+
file_object.name = filename
77+
accounts = ["a", "b", "c"]
78+
mock_ads_hook.return_value.list_accessible_customers.return_value = accounts
79+
80+
op = GoogleAdsListAccountsOperator(
81+
gcp_conn_id=gcp_conn_id,
82+
google_ads_conn_id=google_ads_conn_id,
83+
object_name=GCS_OBJ_PATH,
84+
bucket=BUCKET,
85+
task_id="run_operator",
86+
)
87+
op.execute({})
88+
89+
mock_ads_hook.assert_called_once_with(
90+
gcp_conn_id=gcp_conn_id, google_ads_conn_id=google_ads_conn_id
91+
)
92+
mock_gcs_hook.assert_called_once_with(gcp_conn_id=gcp_conn_id)
93+
94+
mock_ads_hook.return_value.list_accessible_customers.assert_called_once_with()
95+
mocks_csv_writer.assert_called_once_with(file_object)
96+
mocks_csv_writer.return_value.writerows.assert_called_once_with(accounts)
97+
file_object.flush.assert_called_once_with()
98+
99+
mock_gcs_hook.return_value.upload.assert_called_once_with(
100+
bucket_name=BUCKET, object_name=GCS_OBJ_PATH, filename=filename, gzip=False
101+
)

0 commit comments

Comments
 (0)