Skip to content

Commit

Permalink
Added GoogleDisplayVideo360UploadLineItemsOperator (#8216)
Browse files Browse the repository at this point in the history
Co-authored-by: michalslowikowski00 <[email protected]>
  • Loading branch information
michalslowikowski00 and michalslowikowski00 committed Apr 13, 2020
1 parent 62a0396 commit 327b0a9
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 4 deletions.
Expand Up @@ -24,14 +24,17 @@
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
GoogleDisplayVideo360UploadLineItemsOperator,
)
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360ReportSensor,
)
from airflow.utils import dates

# [START howto_display_video_env_variables]

BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://test-display-video-bucket")
OBJECT_NAME = os.environ.get("OBJECT_NAME", "file.csv")
REPORT = {
"kind": "doubleclickbidmanager#query",
"metadata": {
Expand Down Expand Up @@ -92,6 +95,13 @@
delete_report = GoogleDisplayVideo360DeleteReportOperator(
report_id=report_id, task_id="delete_report"
)
# [END howto_google_display_video_deletequery_report_operator]
# # [END howto_google_display_video_deletequery_report_operator]

# [START howto_google_display_video_upload_line_items_operator]
upload_line_items = GoogleDisplayVideo360UploadLineItemsOperator(
task_id="upload_line_items",
bucket_name=BUCKET,
object_name=OBJECT_NAME,
)
# [END howto_google_display_video_upload_line_items_operator]
create_report >> run_report >> wait_for_report >> get_report >> delete_report
24 changes: 24 additions & 0 deletions airflow/providers/google/marketing_platform/hooks/display_video.py
Expand Up @@ -127,3 +127,27 @@ def run_query(self, query_id: str, params: Dict[str, Any]) -> None:
.runquery(queryId=query_id, body=params)
.execute(num_retries=self.num_retries)
)

def upload_line_items(self, line_items: Any) -> List[Dict[str, Any]]:
"""
Uploads line items in CSV format.
:param line_items: downloaded data from GCS and passed to the body request
:type line_items: Any
:return: response body.
:rtype: List[Dict[str, Any]]
"""

request_body = {
"lineItems": line_items,
"dryRun": False,
"format": "CSV",
}

response = (
self.get_conn() # pylint: disable=no-member
.lineitems()
.uploadlineitems(body=request_body)
.execute(num_retries=self.num_retries)
)
return response
Expand Up @@ -247,11 +247,11 @@ def execute(self, context: Dict):
resource = hook.get_query(query_id=self.report_id)
# Check if report is ready
if resource["metadata"]["running"]:
raise AirflowException('Report {} is still running'.format(self.report_id))
raise AirflowException("Report {} is still running".format(self.report_id))

# If no custom report_name provided, use DV360 name
file_url = resource["metadata"]["googleCloudStoragePathForLatestReport"]
report_name = self.report_name or urlparse(file_url).path.split('/')[2]
report_name = self.report_name or urlparse(file_url).path.split("/")[2]
report_name = self._resolve_file_name(report_name)

# Download the report
Expand All @@ -275,7 +275,7 @@ def execute(self, context: Dict):
self.bucket_name,
report_name,
)
self.xcom_push(context, key='report_name', value=report_name)
self.xcom_push(context, key="report_name", value=report_name)


class GoogleDisplayVideo360RunReportOperator(BaseOperator):
Expand Down Expand Up @@ -336,3 +336,73 @@ def execute(self, context: Dict):
self.params,
)
hook.run_query(query_id=self.report_id, params=self.params)


class GoogleDisplayVideo360UploadLineItemsOperator(BaseOperator):
"""
Uploads line items in CSV format.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GoogleDisplayVideo360UploadLineItemsOperator`
.. seealso::
Check also the official API docs:
`https://developers.google.com/bid-manager/v1.1/lineitems/uploadlineitems`
:param request_body: request to upload line items.
:type request_body: Dict[str, Any]
:param bucket_name: The bucket form data is downloaded.
:type bucket_name: str
:param object_name: The object to fetch.
:type object_name: str,
:param filename: The filename to fetch.
:type filename: str,
:param dry_run: Upload status without actually persisting the line items.
:type filename: str,
"""

template_fields = (
"bucket_name",
"object_name",
)

@apply_defaults
def __init__(
self,
bucket_name: str,
object_name: str,
api_version: str = "v1.1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.bucket_name = bucket_name
self.object_name = object_name
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to

def execute(self, context: Dict):
gcs_hook = GCSHook(
gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
)
hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
api_version=self.api_version,
)

self.log.info("Uploading file %s...")
# Saving file in the temporary directory,
# downloaded file from the GCS could be a 1GB size or even more
with tempfile.NamedTemporaryFile("w+") as f:
line_items = gcs_hook.download(
bucket_name=self.bucket_name,
object_name=self.object_name,
filename=f.name,
)
f.flush()
hook.upload_line_items(line_items=line_items)
19 changes: 19 additions & 0 deletions docs/howto/operator/gcp/display_video.rst
Expand Up @@ -119,3 +119,22 @@ To run Display&Video 360 report use
Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360RunReportOperator`
parameters which allow you to dynamically determine values.


.. _howto/operator:GoogleDisplayVideo360UploadLineItemsOperator:

Upload line items
^^^^^^^^^^^^^^^^^

To run Display&Video 360 uploading line items use
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360UploadLineItemsOperator`.

.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
:language: python
:dedent: 4
:start-after: [START howto_google_display_video_upload_line_items_operator]
:end-before: [END howto_google_display_video_upload_line_items_operator]

Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360UploadLineItemsOperator`
parameters which allow you to dynamically determine values.
Expand Up @@ -136,3 +136,52 @@ def test_run_query(self, get_conn_mock):
get_conn_mock.return_value.queries.return_value.runquery.assert_called_once_with(
queryId=query_id, body=params
)

@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_upload_line_items_should_be_called_once(self, get_conn_mock):
line_items = ["this", "is", "super", "awesome", "test"]

self.hook.upload_line_items(line_items)
get_conn_mock.return_value \
.lineitems.return_value \
.uploadlineitems.assert_called_once()

@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_upload_line_items_should_be_called_with_params(self, get_conn_mock):
line_items = "I spent too much time on this"
request_body = {
"lineItems": line_items,
"dryRun": False,
"format": "CSV",
}

self.hook.upload_line_items(line_items)

get_conn_mock.return_value \
.lineitems.return_value \
.uploadlineitems.assert_called_once_with(body=request_body)

@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_upload_line_items_should_return_equal_values(self, get_conn_mock):
line_items = {
"lineItems": "string",
"format": "string",
"dryRun": False
}
return_value = "TEST"
get_conn_mock.return_value \
.lineitems.return_value \
.uploadlineitems.return_value \
.execute.return_value = return_value
result = self.hook.upload_line_items(line_items)

self.assertEqual(return_value, result)
Expand Up @@ -21,10 +21,12 @@
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
GoogleDisplayVideo360UploadLineItemsOperator,
)

API_VERSION = "api_version"
GCP_CONN_ID = "google_cloud_default"
DELEGATE_TO = None


class TestGoogleDisplayVideo360CreateReportOperator(TestCase):
Expand Down Expand Up @@ -179,3 +181,52 @@ def test_execute(self, mock_base_op, hook_mock):
hook_mock.return_value.run_query.assert_called_once_with(
query_id=report_id, params=params
)


class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.tempfile"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.GoogleDisplayVideo360Hook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.GCSHook"
)
def test_execute(self, gcs_hook_mock, hook_mock, mock_tempfile):
filename = "filename"
object_name = "object_name"
bucket_name = "bucket_name"
line_items = "holy_hand_grenade"
gcs_hook_mock.return_value.download.return_value = line_items
mock_tempfile.NamedTemporaryFile.return_value.__enter__.return_value.name = filename

op = GoogleDisplayVideo360UploadLineItemsOperator(
bucket_name=bucket_name,
object_name=object_name,
api_version=API_VERSION,
gcp_conn_id=GCP_CONN_ID,
task_id="test_task",
)
op.execute(context=None)
hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
api_version=API_VERSION,
delegate_to=DELEGATE_TO
)

gcs_hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
delegate_to=DELEGATE_TO,
)

gcs_hook_mock.return_value.download.assert_called_once_with(
bucket_name=bucket_name,
object_name=object_name,
filename=filename,
)
hook_mock.return_value.upload_line_items.assert_called_once()
hook_mock.return_value.upload_line_items.assert_called_once_with(line_items=line_items)

0 comments on commit 327b0a9

Please sign in to comment.