Skip to content

Commit

Permalink
Display Video 360 cleanup v1 API usage (#30577)
Browse files Browse the repository at this point in the history
* Display Video 360 cleanup v1 API usage

* Update docs
  • Loading branch information
Łukasz Wyszomirski committed Apr 14, 2023
1 parent 6f2277c commit 57c09e5
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 758 deletions.
Expand Up @@ -30,20 +30,16 @@
from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateQueryOperator,
GoogleDisplayVideo360CreateReportOperator,
GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadLineItemsOperator,
GoogleDisplayVideo360DownloadReportOperator,
GoogleDisplayVideo360DownloadReportV2Operator,
GoogleDisplayVideo360RunQueryOperator,
GoogleDisplayVideo360RunReportOperator,
GoogleDisplayVideo360SDFtoGCSOperator,
GoogleDisplayVideo360UploadLineItemsOperator,
)
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360GetSDFDownloadOperationSensor,
GoogleDisplayVideo360ReportSensor,
GoogleDisplayVideo360RunQuerySensor,
)

Expand All @@ -60,24 +56,6 @@
ENTITY_TYPE = os.environ.get("GMP_ENTITY_TYPE", "LineItem")
ERF_SOURCE_OBJECT = GoogleDisplayVideo360Hook.erf_uri(GMP_PARTNER_ID, ENTITY_TYPE)

REPORT = {
"kind": "doubleclickbidmanager#query",
"metadata": {
"title": "Polidea Test Report",
"dataRange": "LAST_7_DAYS",
"format": "CSV",
"sendNotification": False,
},
"params": {
"type": "TYPE_GENERAL",
"groupBys": ["FILTER_DATE", "FILTER_PARTNER"],
"filters": [{"type": "FILTER_PARTNER", "value": 1486931}],
"metrics": ["METRIC_IMPRESSIONS", "METRIC_CLICKS"],
"includeInviteData": True,
},
"schedule": {"frequency": "ONE_TIME"},
}

REPORT_V2 = {
"metadata": {
"title": "Airflow Test Report",
Expand Down Expand Up @@ -109,48 +87,6 @@

START_DATE = datetime(2021, 1, 1)

with models.DAG(
"example_display_video",
start_date=START_DATE,
catchup=False,
) as dag1:
# [START howto_google_display_video_createquery_report_operator]
create_report = GoogleDisplayVideo360CreateReportOperator(body=REPORT, task_id="create_report")
report_id = cast(str, XComArg(create_report, key="report_id"))
# [END howto_google_display_video_createquery_report_operator]

# [START howto_google_display_video_runquery_report_operator]
run_report = GoogleDisplayVideo360RunReportOperator(
report_id=report_id, parameters=PARAMETERS, task_id="run_report"
)
# [END howto_google_display_video_runquery_report_operator]

# [START howto_google_display_video_wait_report_operator]
wait_for_report = GoogleDisplayVideo360ReportSensor(task_id="wait_for_report", report_id=report_id)
# [END howto_google_display_video_wait_report_operator]

# [START howto_google_display_video_getquery_report_operator]
get_report = GoogleDisplayVideo360DownloadReportOperator(
report_id=report_id,
task_id="get_report",
bucket_name=BUCKET,
report_name="test1.csv",
)
# [END howto_google_display_video_getquery_report_operator]

# [START howto_google_display_video_deletequery_report_operator]
delete_report = GoogleDisplayVideo360DeleteReportOperator(report_id=report_id, task_id="delete_report")
# [END howto_google_display_video_deletequery_report_operator]

run_report >> wait_for_report >> get_report >> delete_report

# Task dependencies created via `XComArgs`:
# create_report >> run_report
# create_report >> wait_for_report
# create_report >> get_report
# create_report >> delete_report


with models.DAG(
"example_display_video_misc",
start_date=START_DATE,
Expand Down
45 changes: 7 additions & 38 deletions airflow/providers/google/marketing_platform/hooks/display_video.py
Expand Up @@ -18,7 +18,6 @@
"""This module contains Google DisplayVideo hook."""
from __future__ import annotations

import warnings
from typing import Any, Sequence

from googleapiclient.discovery import Resource, build
Expand All @@ -43,11 +42,6 @@ def __init__(
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
if api_version in ["v1", "v1.1"]:
warnings.warn(
f"API {api_version} is deprecated and shortly will be removed please use v2",
DeprecationWarning,
)
self.api_version = api_version

def get_conn(self) -> Resource:
Expand Down Expand Up @@ -99,10 +93,7 @@ def create_query(self, query: dict[str, Any]) -> dict:
:param query: Query object to be passed to request body.
"""
if self.api_version in ["v1", "v1.1"]:
response = self.get_conn().queries().createquery(body=query).execute(num_retries=self.num_retries)
else:
response = self.get_conn().queries().create(body=query).execute(num_retries=self.num_retries)
response = self.get_conn().queries().create(body=query).execute(num_retries=self.num_retries)
return response

def delete_query(self, query_id: str) -> None:
Expand All @@ -111,31 +102,20 @@ def delete_query(self, query_id: str) -> None:
:param query_id: Query ID to delete.
"""
if self.api_version in ["v1", "v1.1"]:
self.get_conn().queries().deletequery(queryId=query_id).execute(num_retries=self.num_retries)
else:
self.get_conn().queries().delete(queryId=query_id).execute(num_retries=self.num_retries)
self.get_conn().queries().delete(queryId=query_id).execute(num_retries=self.num_retries)

def get_query(self, query_id: str) -> dict:
"""
Retrieves a stored query.
:param query_id: Query ID to retrieve.
"""
if self.api_version in ["v1", "v1.1"]:
response = (
self.get_conn().queries().getquery(queryId=query_id).execute(num_retries=self.num_retries)
)
else:
response = self.get_conn().queries().get(queryId=query_id).execute(num_retries=self.num_retries)
response = self.get_conn().queries().get(queryId=query_id).execute(num_retries=self.num_retries)
return response

def list_queries(self) -> list[dict]:
"""Retrieves stored queries."""
if self.api_version in ["v1", "v1.1"]:
response = self.get_conn().queries().listqueries().execute(num_retries=self.num_retries)
else:
response = self.get_conn().queries().list().execute(num_retries=self.num_retries)
response = self.get_conn().queries().list().execute(num_retries=self.num_retries)
return response.get("queries", [])

def run_query(self, query_id: str, params: dict[str, Any] | None) -> dict:
Expand All @@ -145,20 +125,9 @@ def run_query(self, query_id: str, params: dict[str, Any] | None) -> dict:
:param query_id: Query ID to run.
:param params: Parameters for the report.
"""
if self.api_version in ["v1", "v1.1"]:
return (
self.get_conn()
.queries()
.runquery(queryId=query_id, body=params)
.execute(num_retries=self.num_retries)
)
else:
return (
self.get_conn()
.queries()
.run(queryId=query_id, body=params)
.execute(num_retries=self.num_retries)
)
return (
self.get_conn().queries().run(queryId=query_id, body=params).execute(num_retries=self.num_retries)
)

def get_report(self, query_id: str, report_id: str) -> dict:
"""
Expand Down

0 comments on commit 57c09e5

Please sign in to comment.