Skip to content

Commit

Permalink
[AIRFLOW-7075] Operators for storing information from GCS into GA (#7743
Browse files Browse the repository at this point in the history
)
  • Loading branch information
turbaszek committed Mar 30, 2020
1 parent dd29724 commit 7790239
Show file tree
Hide file tree
Showing 10 changed files with 761 additions and 105 deletions.
4 changes: 4 additions & 0 deletions airflow/providers/google/cloud/hooks/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ def download(self, bucket_name, object_name, filename=None):
:param filename: If set, a local file path where the file should be written to.
:type filename: str
"""

# TODO: future improvement check file size before downloading,
# to check for local space availability

client = self.get_conn()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name=object_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@

from airflow import models
from airflow.providers.google.marketing_platform.operators.analytics import (
GoogleAnalyticsDataImportUploadOperator, GoogleAnalyticsDeletePreviousDataUploadsOperator,
GoogleAnalyticsGetAdsLinkOperator, GoogleAnalyticsListAccountsOperator,
GoogleAnalyticsRetrieveAdsLinksListOperator,
GoogleAnalyticsModifyFileHeadersDataImportOperator, GoogleAnalyticsRetrieveAdsLinksListOperator,
)
from airflow.utils import dates

ACCOUNT_ID = os.environ.get("GA_ACCOUNT_ID", "123456789")
WEB_PROPERTY = os.environ.get("WEB_PROPERTY_ID", "UA-12345678-1")

BUCKET = os.environ.get("GMP_ANALYTICS_BUCKET", "test-airflow-analytics-bucket")
BUCKET_FILENAME = "data.csv"
WEB_PROPERTY_ID = os.environ.get("GA_WEB_PROPERTY", "UA-12345678-1")
WEB_PROPERTY_AD_WORDS_LINK_ID = os.environ.get(
"WEB_PROPERTY_AD_WORDS_LINK_ID", "rQafFTPOQdmkx4U-fxUfhj"
"GA_WEB_PROPERTY_AD_WORDS_LINK_ID", "rQafFTPOQdmkx4U-fxUfhj"
)
DATA_ID = "kjdDu3_tQa6n8Q1kXFtSmg"

default_args = {"start_date": dates.days_ago(1)}

Expand All @@ -46,14 +51,38 @@
# [START howto_marketing_platform_get_ads_link_operator]
get_ad_words_link = GoogleAnalyticsGetAdsLinkOperator(
web_property_ad_words_link_id=WEB_PROPERTY_AD_WORDS_LINK_ID,
web_property_id=WEB_PROPERTY,
web_property_id=WEB_PROPERTY_ID,
account_id=ACCOUNT_ID,
task_id="get_ad_words_link",
)
# [END howto_marketing_platform_get_ads_link_operator]

# [START howto_marketing_platform_retrieve_ads_links_list_operator]
list_ad_words_link = GoogleAnalyticsRetrieveAdsLinksListOperator(
task_id="list_ad_link", account_id=ACCOUNT_ID, web_property_id=WEB_PROPERTY
task_id="list_ad_link", account_id=ACCOUNT_ID, web_property_id=WEB_PROPERTY_ID
)
# [END howto_marketing_platform_retrieve_ads_links_list_operator]

upload = GoogleAnalyticsDataImportUploadOperator(
task_id="upload",
storage_bucket=BUCKET,
storage_name_object=BUCKET_FILENAME,
account_id=ACCOUNT_ID,
web_property_id=WEB_PROPERTY_ID,
custom_data_source_id=DATA_ID,
)

delete = GoogleAnalyticsDeletePreviousDataUploadsOperator(
task_id="delete",
account_id=ACCOUNT_ID,
web_property_id=WEB_PROPERTY_ID,
custom_data_source_id=DATA_ID,
)

transform = GoogleAnalyticsModifyFileHeadersDataImportOperator(
task_id="transform",
storage_bucket=BUCKET,
storage_name_object=BUCKET_FILENAME,
)

upload >> [delete, transform]
164 changes: 134 additions & 30 deletions airflow/providers/google/marketing_platform/hooks/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from googleapiclient.discovery import Resource, build
from googleapiclient.http import MediaFileUpload

from airflow.providers.google.cloud.hooks.base import CloudBaseHook

Expand All @@ -30,15 +31,30 @@ class GoogleAnalyticsHook(CloudBaseHook):
def __init__(
self,
api_version: str = "v3",
gcp_connection_id: str = "google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.api_version = api_version
self.gcp_connection_is = gcp_connection_id
self.gcp_connection_is = gcp_conn_id
self._conn = None

def _paginate(self, resource: Resource, list_args: Optional[Dict[str, Any]] = None):
list_args = list_args or {}
result: List[Dict] = []
while True:
# start index has value 1
request = resource.list(start_index=len(result) + 1, **list_args) # pylint: disable=no-member
response = request.execute(num_retries=self.num_retries)
result.extend(response.get("items", []))
# result is the number of fetched links from Analytics
# when all links will be added to the result
# the loop will break
if response["totalResults"] <= len(result):
break
return result

def get_conn(self) -> Resource:
"""
Retrieves connection to Google Analytics 360.
Expand All @@ -59,19 +75,9 @@ def list_accounts(self) -> List[Dict[str, Any]]:
"""

self.log.info("Retrieving accounts list...")
result = [] # type: List[Dict]
conn = self.get_conn()
accounts = conn.management().accounts() # pylint: disable=no-member
while True:
# start index has value 1
request = accounts.list(start_index=len(result) + 1)
response = request.execute(num_retries=self.num_retries)
result.extend(response.get("items", []))
# result is the number of fetched accounts from Analytics
# when all accounts will be add to the result
# the loop will be break
if response["totalResults"] <= len(result):
break
result = self._paginate(accounts)
return result

def get_ad_words_link(
Expand Down Expand Up @@ -121,21 +127,119 @@ def list_ad_words_links(
"""

self.log.info("Retrieving ad words list...")
result = [] # type: List[Dict]
conn = self.get_conn()
ads_links = conn.management().webPropertyAdWordsLinks() # pylint: disable=no-member
while True:
# start index has value 1
request = ads_links.list(
accountId=account_id,
webPropertyId=web_property_id,
start_index=len(result) + 1,
)
response = request.execute(num_retries=self.num_retries)
result.extend(response.get("items", []))
# result is the number of fetched links from Analytics
# when all links will be added to the result
# the loop will break
if response["totalResults"] <= len(result):
break
ads_links = (
conn.management().webPropertyAdWordsLinks() # pylint: disable=no-member
)
list_args = {"accountId": account_id, "webPropertyId": web_property_id}
result = self._paginate(ads_links, list_args)
return result

def upload_data(
self,
file_location: str,
account_id: str,
web_property_id: str,
custom_data_source_id: str,
resumable_upload: bool = False,
) -> None:

"""
Uploads file to GA via the Data Import API
:param file_location: The path and name of the file to upload.
:type file_location: str
:param account_id: The GA account Id to which the data upload belongs.
:type account_id: str
:param web_property_id: UA-string associated with the upload.
:type web_property_id: str
:param custom_data_source_id: Custom Data Source Id to which this data import belongs.
:type custom_data_source_id: str
:param resumable_upload: flag to upload the file in a resumable fashion, using a
series of at least two requests.
:type resumable_upload: bool
"""

media = MediaFileUpload(
file_location,
mimetype="application/octet-stream",
resumable=resumable_upload,
)

self.log.info(
"Uploading file to GA file for accountId: %s, webPropertyId:%s and customDataSourceId:%s ",
account_id,
web_property_id,
custom_data_source_id,
)

self.get_conn().management().uploads().uploadData( # pylint: disable=no-member
accountId=account_id,
webPropertyId=web_property_id,
customDataSourceId=custom_data_source_id,
media_body=media,
).execute()

def delete_upload_data(
self,
account_id: str,
web_property_id: str,
custom_data_source_id: str,
delete_request_body: Dict[str, Any],
) -> None:
"""
Deletes the uploaded data for a given account/property/dataset
:param account_id: The GA account Id to which the data upload belongs.
:type account_id: str
:param web_property_id: UA-string associated with the upload.
:type web_property_id: str
:param custom_data_source_id: Custom Data Source Id to which this data import belongs.
:type custom_data_source_id: str
:param delete_request_body: Dict of customDataImportUids to delete.
:type delete_request_body: dict
"""

self.log.info(
"Deleting previous uploads to GA file for accountId:%s, "
"webPropertyId:%s and customDataSourceId:%s ",
account_id,
web_property_id,
custom_data_source_id,
)

self.get_conn().management().uploads().deleteUploadData( # pylint: disable=no-member
accountId=account_id,
webPropertyId=web_property_id,
customDataSourceId=custom_data_source_id,
body=delete_request_body,
).execute()

def list_uploads(
self, account_id, web_property_id, custom_data_source_id
) -> List[Dict[str, Any]]:
"""
Get list of data upload from GA
:param account_id: The GA account Id to which the data upload belongs.
:type account_id: str
:param web_property_id: UA-string associated with the upload.
:type web_property_id: str
:param custom_data_source_id: Custom Data Source Id to which this data import belongs.
:type custom_data_source_id: str
"""
self.log.info(
"Getting list of uploads for accountId:%s, webPropertyId:%s and customDataSourceId:%s ",
account_id,
web_property_id,
custom_data_source_id,
)

uploads = self.get_conn().management().uploads() # pylint: disable=no-member
list_args = {
"accountId": account_id,
"webPropertyId": web_property_id,
"customDataSourceId": custom_data_source_id,
}
result = self._paginate(uploads, list_args)
return result

0 comments on commit 7790239

Please sign in to comment.