Skip to content

Commit

Permalink
Added GoogleDisplayVideo360DownloadLineItemsOperator (#8174)
Browse files Browse the repository at this point in the history
Co-authored-by: michalslowikowski00 <[email protected]>
  • Loading branch information
michalslowikowski00 and michalslowikowski00 committed Apr 23, 2020
1 parent ffcbb22 commit 912aa4b
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 18 deletions.
Expand Up @@ -23,18 +23,19 @@
from airflow import models
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
GoogleDisplayVideo360UploadLineItemsOperator,
GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator,
GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator,
)
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360ReportSensor,
)
from airflow.utils import dates

# [START howto_display_video_env_variables]

BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://test-display-video-bucket")
OBJECT_NAME = os.environ.get("OBJECT_NAME", "file.csv")
ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567)
OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv")

REPORT = {
"kind": "doubleclickbidmanager#query",
"metadata": {
Expand All @@ -56,6 +57,13 @@
PARAMS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
# [END howto_display_video_env_variables]

# download_line_items variables
REQUEST_BODY = {
"filterType": ADVERTISER_ID,
"format": "CSV",
"fileSpec": "EWF"
}

default_args = {"start_date": dates.days_ago(1)}

with models.DAG(
Expand Down Expand Up @@ -95,7 +103,17 @@
delete_report = GoogleDisplayVideo360DeleteReportOperator(
report_id=report_id, task_id="delete_report"
)
# # [END howto_google_display_video_deletequery_report_operator]
# [END howto_google_display_video_deletequery_report_operator]

# [START howto_google_display_video_download_line_items_operator]
download_line_items = GoogleDisplayVideo360DownloadLineItemsOperator(
task_id="download_line_items",
request_body=REQUEST_BODY,
bucket_name=BUCKET,
object_name=OBJECT_NAME,
gzip=False,
)
# [END howto_google_display_video_download_line_items_operator]

# [START howto_google_display_video_upload_line_items_operator]
upload_line_items = GoogleDisplayVideo360UploadLineItemsOperator(
Expand Down
19 changes: 19 additions & 0 deletions airflow/providers/google/marketing_platform/hooks/display_video.py
Expand Up @@ -18,6 +18,7 @@
"""
This module contains Google DisplayVideo hook.
"""

from typing import Any, Dict, List, Optional

from googleapiclient.discovery import Resource, build
Expand Down Expand Up @@ -151,3 +152,21 @@ def upload_line_items(self, line_items: Any) -> List[Dict[str, Any]]:
.execute(num_retries=self.num_retries)
)
return response

def download_line_items(self, request_body: Dict[str, Any]) -> List[Any]:
"""
Retrieves line items in CSV format.
:param request_body: dictionary with parameters that should be passed into.
More information about it can be found here:
https://developers.google.com/bid-manager/v1.1/lineitems/downloadlineitems
:type request_body: Dict[str, Any]
"""

response = (
self.get_conn() # pylint: disable=no-member
.lineitems()
.downloadlineitems(body=request_body)
.execute(num_retries=self.num_retries)
)
return response["lineItems"]
Expand Up @@ -18,10 +18,11 @@
"""
This module contains Google DisplayVideo operators.
"""
import csv
import shutil
import tempfile
import urllib.request
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -221,13 +222,9 @@ def __init__(
self.delegate_to = delegate_to

def _resolve_file_name(self, name: str) -> str:
csv = ".csv"
gzip = ".gz"
if not name.endswith(csv):
name += csv
if self.gzip:
name += gzip
return name
new_name = name if name.endswith(".csv") else f"{name}.csv"
new_name = f"{new_name}.gz" if self.gzip else new_name
return new_name

@staticmethod
def _set_bucket_name(name: str) -> str:
Expand All @@ -247,11 +244,11 @@ def execute(self, context: Dict):
resource = hook.get_query(query_id=self.report_id)
# Check if report is ready
if resource["metadata"]["running"]:
raise AirflowException("Report {} is still running".format(self.report_id))
raise AirflowException(f"Report {self.report_id} is still running")

# If no custom report_name provided, use DV360 name
file_url = resource["metadata"]["googleCloudStoragePathForLatestReport"]
report_name = self.report_name or urlparse(file_url).path.split("/")[2]
report_name = self.report_name or urlparse(file_url).path.split("/")[-1]
report_name = self._resolve_file_name(report_name)

# Download the report
Expand Down Expand Up @@ -299,7 +296,7 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator):
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the
:param delegate_to: The account to impersonate, if any. For this to work, the service account making the
request must have domain-wide delegation enabled.
:type delegate_to: str
"""
Expand Down Expand Up @@ -338,6 +335,73 @@ def execute(self, context: Dict):
hook.run_query(query_id=self.report_id, params=self.params)


class GoogleDisplayVideo360DownloadLineItemsOperator(BaseOperator):
"""
Retrieves line items in CSV format.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GoogleDisplayVideo360DownloadLineItemsOperator`
.. seealso::
Check also the official API docs:
`https://developers.google.com/bid-manager/v1.1/lineitems/downloadlineitems`
:param request_body: dictionary with parameters that should be passed into.
More information about it can be found here:
https://developers.google.com/bid-manager/v1.1/lineitems/downloadlineitems
:type request_body: Dict[str, Any],
"""

template_fields = ("request_body", "bucket_name", "object_name")

@apply_defaults
def __init__(
self,
request_body: Dict[str, Any],
bucket_name: str,
object_name: str,
gzip: bool = False,
api_version: str = "v1.1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.request_body = request_body
self.object_name = object_name
self.bucket_name = bucket_name
self.gzip = gzip
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to

def execute(self, context: Dict) -> str:
gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
delegate_to=self.delegate_to,
)

self.log.info("Retrieving report...")
content: List[str] = hook.download_line_items(request_body=self.request_body)

with tempfile.NamedTemporaryFile("w+") as temp_file:
writer = csv.writer(temp_file)
writer.writerows(content)
temp_file.flush()
gcs_hook.upload(
bucket_name=self.bucket_name,
object_name=self.object_name,
filename=temp_file.name,
mime_type="text/csv",
gzip=self.gzip,
)
return f"{self.bucket_name}/{self.object_name}"


class GoogleDisplayVideo360UploadLineItemsOperator(BaseOperator):
"""
Uploads line items in CSV format.
Expand Down
29 changes: 29 additions & 0 deletions docs/howto/operator/gcp/display_video.rst
Expand Up @@ -121,6 +121,35 @@ Use :ref:`Jinja templating <jinja-templating>` with
parameters which allow you to dynamically determine values.


.. _howto/operator:GoogleDisplayVideo360DownloadLineItemsOperator:

Downloading Line Items
^^^^^^^^^^^^^^^^^^^^^^

The operator accepts body request:

- consistent with `Google API <https://developers.google.com/bid-manager/v1.1/lineitems/downloadlineitems>`_ ::

REQUEST_BODY = {
"filterType": ADVERTISER_ID,
"format": "CSV",
"fileSpec": "EWF"
}

To download line items in CSV format report use
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360DownloadLineItemsOperator`.

.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
:language: python
:dedent: 4
:start-after: [START howto_google_display_video_download_line_items_operator]
:end-before: [END howto_google_display_video_download_line_items_operator]

Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360DownloadLineItemsOperator`
parameters which allow you to dynamically determine values.


.. _howto/operator:GoogleDisplayVideo360UploadLineItemsOperator:

Upload line items
Expand Down
Expand Up @@ -137,6 +137,60 @@ def test_run_query(self, get_conn_mock):
queryId=query_id, body=params
)

@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_download_line_items_should_be_called_once(self, get_conn_mock):
request_body = {
"filterType": "filter_type",
"filterIds": [],
"format": "format",
"fileSpec": "file_spec"
}
self.hook.download_line_items(request_body=request_body)
get_conn_mock.return_value\
.lineitems.return_value\
.downloadlineitems.assert_called_once()

@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_download_line_items_should_be_called_with_params(self, get_conn_mock):
request_body = {
"filterType": "filter_type",
"filterIds": [],
"format": "format",
"fileSpec": "file_spec"
}
self.hook.download_line_items(request_body=request_body)

get_conn_mock.return_value \
.lineitems.return_value \
.downloadlineitems.assert_called_once_with(body=request_body)

@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_download_line_items_should_return_equal_values(self, get_conn_mock):
line_item = ["holy_hand_grenade"]
response = {"lineItems": line_item}
request_body = {
"filterType": "filter_type",
"filterIds": [],
"format": "format",
"fileSpec": "file_spec"
}

get_conn_mock.return_value \
.lineitems.return_value \
.downloadlineitems.return_value.execute.return_value = response

result = self.hook.download_line_items(request_body)
self.assertEqual(line_item, result)

@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
Expand Down
Expand Up @@ -20,8 +20,8 @@

from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
GoogleDisplayVideo360UploadLineItemsOperator,
GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator,
GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator,
)

API_VERSION = "api_version"
Expand Down Expand Up @@ -183,6 +183,65 @@ def test_execute(self, mock_base_op, hook_mock):
)


class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.GoogleDisplayVideo360Hook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.GCSHook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.tempfile"
)
def test_execute(self, mock_temp, gcs_hook_mock, hook_mock):
request_body = {
"filterType": "filter_type",
"filterIds": [],
"format": "format",
"fileSpec": "file_spec"
}
bucket_name = "bucket_name"
object_name = "object_name"
filename = "test"
mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
gzip = False

op = GoogleDisplayVideo360DownloadLineItemsOperator(
request_body=request_body,
bucket_name=bucket_name,
object_name=object_name,
gzip=gzip,
api_version=API_VERSION,
gcp_conn_id=GCP_CONN_ID,
delegate_to=DELEGATE_TO,
task_id="test_task",
)

op.execute(context=None)

gcs_hook_mock.return_value.upload.assert_called_with(
bucket_name=bucket_name,
object_name=object_name,
filename=filename,
gzip=gzip,
mime_type='text/csv',
)

gcs_hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
delegate_to=DELEGATE_TO,
)
hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO
)
hook_mock.return_value.download_line_items.assert_called_once_with(
request_body=request_body
)


class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
Expand Down

0 comments on commit 912aa4b

Please sign in to comment.