Skip to content

Commit

Permalink
Added Upload Multiple Entity Read Files to specified big query dataset (
Browse files Browse the repository at this point in the history
#8610)

Co-authored-by: michalslowikowski00 <[email protected]>
  • Loading branch information
michalslowikowski00 and michalslowikowski00 committed May 10, 2020
1 parent cbebed2 commit 79ef8be
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from airflow import models
from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator,
Expand All @@ -38,15 +39,14 @@
BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://test-display-video-bucket")
ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567)
OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv")
PATH_TO_UPLOAD_FILE = os.environ.get(
"GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt"
)
PATH_TO_SAVED_FILE = os.environ.get(
"GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt"
)
PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt")
PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt")
BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1")
BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test")
GMP_PARTNER_ID = os.environ.get("GMP_PARTNER_ID", 123)
ENTITY_TYPE = os.environ.get("GMP_ENTITY_TYPE", "LineItem")
ERF_SOURCE_OBJECT = GoogleDisplayVideo360Hook.erf_uri(GMP_PARTNER_ID, ENTITY_TYPE)

REPORT = {
"kind": "doubleclickbidmanager#query",
Expand All @@ -68,15 +68,17 @@

PARAMS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}

BODY_REQUEST: Dict = {
CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST: Dict = {
"version": SDF_VERSION,
"advertiserId": ADVERTISER_ID,
"inventorySourceFilter": {"inventorySourceIds": []},
}
# [END howto_display_video_env_variables]

# download_line_items variables
REQUEST_BODY = {"filterType": ADVERTISER_ID, "format": "CSV", "fileSpec": "EWF"}
DOWNLOAD_LINE_ITEMS_REQUEST: Dict = {
"filterType": ADVERTISER_ID,
"format": "CSV",
"fileSpec": "EWF"}
# [END howto_display_video_env_variables]

default_args = {"start_date": dates.days_ago(1)}

Expand Down Expand Up @@ -119,10 +121,20 @@
)
# [END howto_google_display_video_deletequery_report_operator]

# [START howto_google_display_video_upload_multiple_entity_read_files_to_big_query]
upload_erf_to_bq = GCSToBigQueryOperator(
task_id='upload_erf_to_bq',
bucket=BUCKET,
source_objects=ERF_SOURCE_OBJECT,
destination_project_dataset_table=f"{BQ_DATA_SET}.gcs_to_bq_table",
write_disposition='WRITE_TRUNCATE',
dag=dag)
# [END howto_google_display_video_upload_multiple_entity_read_files_to_big_query]

# [START howto_google_display_video_download_line_items_operator]
download_line_items = GoogleDisplayVideo360DownloadLineItemsOperator(
task_id="download_line_items",
request_body=REQUEST_BODY,
request_body=DOWNLOAD_LINE_ITEMS_REQUEST,
bucket_name=BUCKET,
object_name=OBJECT_NAME,
gzip=False,
Expand All @@ -139,7 +151,7 @@

# [START howto_google_display_video_create_sdf_download_task_operator]
create_sdf_download_task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator(
task_id="create_sdf_download_task", body_request=BODY_REQUEST
task_id="create_sdf_download_task", body_request=CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST
)
operation_name = '{{ task_instance.xcom_pull("create_sdf_download_task")["name"] }}'
# [END howto_google_display_video_create_sdf_download_task_operator]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,28 @@ def get_conn_to_display_video(self) -> Resource:
)
return self._conn

@staticmethod
def erf_uri(partner_id, entity_type) -> List[str]:
"""
Return URI for all Entity Read Files in bucket.
For example, if you were generating a file name to retrieve the entity read file
for partner 123 accessing the line_item table from April 2, 2013, your filename
would look something like this:
gdbm-123/entity/20130402.0.LineItem.json
More information:
https://developers.google.com/bid-manager/guides/entity-read/overview
:param partner_id The numeric ID of your Partner.
:type partner_id: int
:param entity_type: The type of file Partner, Advertiser, InsertionOrder,
LineItem, Creative, Pixel, InventorySource, UserList, UniversalChannel, and summary.
:type entity_type: str
"""

return [f"gdbm-{partner_id}/entity/{{{{ ds_nodash }}}}.*.{entity_type}.json"]

def create_query(self, query: Dict[str, Any]) -> Dict:
"""
Creates a query.
Expand Down Expand Up @@ -125,7 +147,7 @@ def list_queries(self, ) -> List[Dict]:
.listqueries()
.execute(num_retries=self.num_retries)
)
return response.get("queries", [])
return response.get('queries', [])

def run_query(self, query_id: str, params: Dict[str, Any]) -> None:
"""
Expand Down

0 comments on commit 79ef8be

Please sign in to comment.