Skip to content

Commit

Permalink
Update DV360 operators to use API v2 (#30326)
Browse files Browse the repository at this point in the history
* Update DV360 operators to use API v2

* Update display_video.rst

* fixup! Update display_video.rst

* fixup! Update display_video.rst

---------

Co-authored-by: Jarek Potiuk <[email protected]>
  • Loading branch information
Łukasz Wyszomirski and potiuk committed Apr 9, 2023
1 parent 806b027 commit 71db47a
Show file tree
Hide file tree
Showing 13 changed files with 928 additions and 18 deletions.
10 changes: 8 additions & 2 deletions airflow/providers/google/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@
Changelog
---------

8.12.1
......
9.0.0
.....

Breaking changes
~~~~~~~~~~~~~~~~

Google announced sunset of Bid manager API v1 and v1.1 by April 27, 2023 for more information
please check: `docs <https://developers.google.com/bid-manager/v1.1>`_ As a result default value of api_version in GoogleDisplayVideo360Hook and related operators updated to v2

This version of provider contains a temporary workaround to issue with ``v11`` version of
google-ads API being discontinued, while the google provider dependencies preventing installing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,22 @@
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateQueryOperator,
GoogleDisplayVideo360CreateReportOperator,
GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadLineItemsOperator,
GoogleDisplayVideo360DownloadReportOperator,
GoogleDisplayVideo360DownloadReportV2Operator,
GoogleDisplayVideo360RunQueryOperator,
GoogleDisplayVideo360RunReportOperator,
GoogleDisplayVideo360SDFtoGCSOperator,
GoogleDisplayVideo360UploadLineItemsOperator,
)
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360GetSDFDownloadOperationSensor,
GoogleDisplayVideo360ReportSensor,
GoogleDisplayVideo360RunQuerySensor,
)

# [START howto_display_video_env_variables]
Expand All @@ -50,7 +54,7 @@
PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt")
PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt")
BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1")
SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_5")
BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test")
GMP_PARTNER_ID = os.environ.get("GMP_PARTNER_ID", 123)
ENTITY_TYPE = os.environ.get("GMP_ENTITY_TYPE", "LineItem")
Expand All @@ -74,7 +78,25 @@
"schedule": {"frequency": "ONE_TIME"},
}

PARAMETERS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
REPORT_V2 = {
"metadata": {
"title": "Airflow Test Report",
"dataRange": {"range": "LAST_7_DAYS"},
"format": "CSV",
"sendNotification": False,
},
"params": {
"type": "STANDARD",
"groupBys": ["FILTER_DATE", "FILTER_PARTNER"],
"filters": [{"type": "FILTER_PARTNER", "value": ADVERTISER_ID}],
"metrics": ["METRIC_IMPRESSIONS", "METRIC_CLICKS"],
},
"schedule": {"frequency": "ONE_TIME"},
}

PARAMETERS = {
"dataRange": {"range": "LAST_7_DAYS"},
}

CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST: dict = {
"version": SDF_VERSION,
Expand Down Expand Up @@ -209,3 +231,46 @@

# Task dependency created via `XComArgs`:
# save_sdf_in_gcs >> upload_sdf_to_big_query

with models.DAG(
"example_display_video_v2",
start_date=START_DATE,
catchup=False,
) as dag:
# [START howto_google_display_video_create_query_operator]
create_query_v2 = GoogleDisplayVideo360CreateQueryOperator(body=REPORT_V2, task_id="create_query")

query_id = cast(str, XComArg(create_query_v2, key="query_id"))
# [END howto_google_display_video_create_query_operator]

# [START howto_google_display_video_run_query_report_operator]
run_query_v2 = GoogleDisplayVideo360RunQueryOperator(
query_id=query_id, parameters=PARAMETERS, task_id="run_report"
)

query_id = cast(str, XComArg(run_query_v2, key="query_id"))
report_id = cast(str, XComArg(run_query_v2, key="report_id"))
# [END howto_google_display_video_run_query_report_operator]

# [START howto_google_display_video_wait_run_query_sensor]
wait_for_query = GoogleDisplayVideo360RunQuerySensor(
task_id="wait_for_query",
query_id=query_id,
report_id=report_id,
)
# [END howto_google_display_video_wait_run_query_sensor]

# [START howto_google_display_video_get_report_operator]
get_report_v2 = GoogleDisplayVideo360DownloadReportV2Operator(
query_id=query_id,
report_id=report_id,
task_id="get_report",
bucket_name=BUCKET,
report_name="test1.csv",
)
# # [END howto_google_display_video_get_report_operator]
# # [START howto_google_display_video_delete_query_report_operator]
delete_report_v2 = GoogleDisplayVideo360DeleteReportOperator(report_id=report_id, task_id="delete_report")
# # [END howto_google_display_video_delete_query_report_operator]

create_query_v2 >> run_query_v2 >> wait_for_query >> get_report_v2 >> delete_report_v2
59 changes: 51 additions & 8 deletions airflow/providers/google/marketing_platform/hooks/display_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""This module contains Google DisplayVideo hook."""
from __future__ import annotations

import warnings
from typing import Any, Sequence

from googleapiclient.discovery import Resource, build
Expand All @@ -32,7 +33,7 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):

def __init__(
self,
api_version: str = "v1",
api_version: str = "v2",
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
Expand All @@ -42,6 +43,11 @@ def __init__(
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
if api_version in ["v1", "v1.1"]:
warnings.warn(
f"API {api_version} is deprecated and shortly will be removed please use v2",
DeprecationWarning,
)
self.api_version = api_version

def get_conn(self) -> Resource:
Expand Down Expand Up @@ -93,7 +99,10 @@ def create_query(self, query: dict[str, Any]) -> dict:
:param query: Query object to be passed to request body.
"""
response = self.get_conn().queries().createquery(body=query).execute(num_retries=self.num_retries)
if self.api_version in ["v1", "v1.1"]:
response = self.get_conn().queries().createquery(body=query).execute(num_retries=self.num_retries)
else:
response = self.get_conn().queries().create(body=query).execute(num_retries=self.num_retries)
return response

def delete_query(self, query_id: str) -> None:
Expand All @@ -102,33 +111,67 @@ def delete_query(self, query_id: str) -> None:
:param query_id: Query ID to delete.
"""
(self.get_conn().queries().deletequery(queryId=query_id).execute(num_retries=self.num_retries))
if self.api_version in ["v1", "v1.1"]:
self.get_conn().queries().deletequery(queryId=query_id).execute(num_retries=self.num_retries)
else:
self.get_conn().queries().delete(queryId=query_id).execute(num_retries=self.num_retries)

def get_query(self, query_id: str) -> dict:
"""
Retrieves a stored query.
:param query_id: Query ID to retrieve.
"""
response = self.get_conn().queries().getquery(queryId=query_id).execute(num_retries=self.num_retries)
if self.api_version in ["v1", "v1.1"]:
response = (
self.get_conn().queries().getquery(queryId=query_id).execute(num_retries=self.num_retries)
)
else:
response = self.get_conn().queries().get(queryId=query_id).execute(num_retries=self.num_retries)
return response

def list_queries(self) -> list[dict]:
"""Retrieves stored queries."""
response = self.get_conn().queries().listqueries().execute(num_retries=self.num_retries)
if self.api_version in ["v1", "v1.1"]:
response = self.get_conn().queries().listqueries().execute(num_retries=self.num_retries)
else:
response = self.get_conn().queries().list().execute(num_retries=self.num_retries)
return response.get("queries", [])

def run_query(self, query_id: str, params: dict[str, Any] | None) -> None:
def run_query(self, query_id: str, params: dict[str, Any] | None) -> dict:
"""
Runs a stored query to generate a report.
:param query_id: Query ID to run.
:param params: Parameters for the report.
"""
(
if self.api_version in ["v1", "v1.1"]:
return (
self.get_conn()
.queries()
.runquery(queryId=query_id, body=params)
.execute(num_retries=self.num_retries)
)
else:
return (
self.get_conn()
.queries()
.run(queryId=query_id, body=params)
.execute(num_retries=self.num_retries)
)

def get_report(self, query_id: str, report_id: str) -> dict:
"""
Retrieves a report.
:param query_id: Query ID for which report was generated.
:param report_id: Report ID to retrieve.
"""
return (
self.get_conn()
.queries()
.runquery(queryId=query_id, body=params)
.reports()
.get(queryId=query_id, reportId=report_id)
.execute(num_retries=self.num_retries)
)

Expand Down

0 comments on commit 71db47a

Please sign in to comment.