Skip to content

Commit

Permalink
Move bucket_name validation out of __init__ in Google Marketing P…
Browse files Browse the repository at this point in the history
…latform operators (#19383)
  • Loading branch information
josh-fell committed Nov 28, 2021
1 parent e9e5309 commit fb478c0
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def __init__(
self.api_version = api_version
self.chunk_size = chunk_size
self.gzip = gzip
self.bucket_name = self._set_bucket_name(bucket_name)
self.bucket_name = bucket_name
self.report_name = report_name
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
Expand Down Expand Up @@ -254,8 +254,9 @@ def execute(self, context: dict) -> None:

temp_file.flush()
# Upload the local file to bucket
bucket_name = self._set_bucket_name(self.bucket_name)
gcs_hook.upload(
bucket_name=self.bucket_name,
bucket_name=bucket_name,
object_name=report_name,
gzip=self.gzip,
filename=temp_file.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def __init__(
self.report_id = report_id
self.chunk_size = chunk_size
self.gzip = gzip
self.bucket_name = self._set_bucket_name(bucket_name)
self.bucket_name = bucket_name
self.report_name = report_name
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
Expand Down Expand Up @@ -309,8 +309,9 @@ def execute(self, context: dict):

temp_file.flush()
# Upload the local file to bucket
bucket_name = self._set_bucket_name(self.bucket_name)
gcs_hook.upload(
bucket_name=self.bucket_name,
bucket_name=bucket_name,
object_name=report_name,
gzip=self.gzip,
filename=temp_file.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def __init__(
self.report_id = report_id
self.chunk_size = chunk_size
self.gzip = gzip
self.bucket_name = self._set_bucket_name(bucket_name)
self.bucket_name = bucket_name
self.report_name = report_name
self.impersonation_chain = impersonation_chain

Expand Down Expand Up @@ -232,8 +232,9 @@ def execute(self, context: dict):

temp_file.flush()

bucket_name = self._set_bucket_name(self.bucket_name)
gcs_hook.upload(
bucket_name=self.bucket_name,
bucket_name=bucket_name,
object_name=report_name,
gzip=self.gzip,
filename=temp_file.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
from tempfile import NamedTemporaryFile
from unittest import TestCase, mock

from parameterized import parameterized

from airflow.models import DAG, TaskInstance as TI
from airflow.providers.google.marketing_platform.operators.campaign_manager import (
GoogleCampaignManagerBatchInsertConversionsOperator,
GoogleCampaignManagerBatchUpdateConversionsOperator,
Expand All @@ -27,6 +30,8 @@
GoogleCampaignManagerInsertReportOperator,
GoogleCampaignManagerRunReportOperator,
)
from airflow.utils import timezone
from airflow.utils.session import create_session

API_VERSION = "api_version"
GCP_CONN_ID = "google_cloud_default"
Expand All @@ -46,18 +51,24 @@
],
}

DEFAULT_DATE = timezone.datetime(2021, 1, 1)
PROFILE_ID = "profile_id"
REPORT_ID = "report_id"
FILE_ID = "file_id"
BUCKET_NAME = "test_bucket"
REPORT_NAME = "test_report.csv"
TEMP_FILE_NAME = "test"


class TestGoogleCampaignManagerDeleteReportOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerHook"
)
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator")
def test_execute(self, mock_base_op, hook_mock):
profile_id = "PROFILE_ID"
report_id = "REPORT_ID"
op = GoogleCampaignManagerDeleteReportOperator(
profile_id=profile_id,
report_id=report_id,
profile_id=PROFILE_ID,
report_id=REPORT_ID,
api_version=API_VERSION,
task_id="test_task",
)
Expand All @@ -69,11 +80,19 @@ def test_execute(self, mock_base_op, hook_mock):
impersonation_chain=None,
)
hook_mock.return_value.delete_report.assert_called_once_with(
profile_id=profile_id, report_id=report_id
profile_id=PROFILE_ID, report_id=REPORT_ID
)


class TestGoogleCampaignManagerGetReportOperator(TestCase):
class TestGoogleCampaignManagerDownloadReportOperator(TestCase):
def setUp(self):
with create_session() as session:
session.query(TI).delete()

def tearDown(self):
with create_session() as session:
session.query(TI).delete()

@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.http")
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.tempfile")
@mock.patch(
Expand All @@ -94,24 +113,17 @@ def test_execute(
tempfile_mock,
http_mock,
):
profile_id = "PROFILE_ID"
report_id = "REPORT_ID"
file_id = "FILE_ID"
bucket_name = "test_bucket"
report_name = "test_report.csv"
temp_file_name = "TEST"

http_mock.MediaIoBaseDownload.return_value.next_chunk.return_value = (
None,
True,
)
tempfile_mock.NamedTemporaryFile.return_value.__enter__.return_value.name = temp_file_name
tempfile_mock.NamedTemporaryFile.return_value.__enter__.return_value.name = TEMP_FILE_NAME
op = GoogleCampaignManagerDownloadReportOperator(
profile_id=profile_id,
report_id=report_id,
file_id=file_id,
bucket_name=bucket_name,
report_name=report_name,
profile_id=PROFILE_ID,
report_id=REPORT_ID,
file_id=FILE_ID,
bucket_name=BUCKET_NAME,
report_name=REPORT_NAME,
api_version=API_VERSION,
task_id="test_task",
)
Expand All @@ -123,21 +135,78 @@ def test_execute(
impersonation_chain=None,
)
hook_mock.return_value.get_report_file.assert_called_once_with(
profile_id=profile_id, report_id=report_id, file_id=file_id
profile_id=PROFILE_ID, report_id=REPORT_ID, file_id=FILE_ID
)
gcs_hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
delegate_to=None,
impersonation_chain=None,
)
gcs_hook_mock.return_value.upload.assert_called_once_with(
bucket_name=bucket_name,
object_name=report_name + ".gz",
bucket_name=BUCKET_NAME,
object_name=REPORT_NAME + ".gz",
gzip=True,
filename=TEMP_FILE_NAME,
mime_type="text/csv",
)
xcom_mock.assert_called_once_with(None, key="report_name", value=REPORT_NAME + ".gz")

@parameterized.expand([BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{ ti.xcom_pull(task_ids='f') }}"])
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.http")
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.tempfile")
@mock.patch(
"airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerHook"
)
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.GCSHook")
def test_set_bucket_name(
self,
test_bucket_name,
gcs_hook_mock,
hook_mock,
tempfile_mock,
http_mock,
):
http_mock.MediaIoBaseDownload.return_value.next_chunk.return_value = (
None,
True,
)
tempfile_mock.NamedTemporaryFile.return_value.__enter__.return_value.name = TEMP_FILE_NAME

dag = DAG(
dag_id="test_set_bucket_name",
start_date=DEFAULT_DATE,
schedule_interval=None,
catchup=False,
)

if BUCKET_NAME not in test_bucket_name:

@dag.task
def f():
return BUCKET_NAME

taskflow_op = f()
taskflow_op.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

op = GoogleCampaignManagerDownloadReportOperator(
profile_id=PROFILE_ID,
report_id=REPORT_ID,
file_id=FILE_ID,
bucket_name=test_bucket_name if test_bucket_name != "XComArg" else taskflow_op,
report_name=REPORT_NAME,
api_version=API_VERSION,
task_id="test_task",
dag=dag,
)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

gcs_hook_mock.return_value.upload.assert_called_once_with(
bucket_name=BUCKET_NAME,
object_name=REPORT_NAME + ".gz",
gzip=True,
filename=temp_file_name,
filename=TEMP_FILE_NAME,
mime_type="text/csv",
)
xcom_mock.assert_called_once_with(None, key="report_name", value=report_name + ".gz")


class TestGoogleCampaignManagerInsertReportOperator(TestCase):
Expand All @@ -150,14 +219,12 @@ class TestGoogleCampaignManagerInsertReportOperator(TestCase):
"campaign_manager.GoogleCampaignManagerInsertReportOperator.xcom_push"
)
def test_execute(self, xcom_mock, mock_base_op, hook_mock):
profile_id = "PROFILE_ID"
report = {"report": "test"}
report_id = "test"

hook_mock.return_value.insert_report.return_value = {"id": report_id}
hook_mock.return_value.insert_report.return_value = {"id": REPORT_ID}

op = GoogleCampaignManagerInsertReportOperator(
profile_id=profile_id,
profile_id=PROFILE_ID,
report=report,
api_version=API_VERSION,
task_id="test_task",
Expand All @@ -169,17 +236,16 @@ def test_execute(self, xcom_mock, mock_base_op, hook_mock):
api_version=API_VERSION,
impersonation_chain=None,
)
hook_mock.return_value.insert_report.assert_called_once_with(profile_id=profile_id, report=report)
xcom_mock.assert_called_once_with(None, key="report_id", value=report_id)
hook_mock.return_value.insert_report.assert_called_once_with(profile_id=PROFILE_ID, report=report)
xcom_mock.assert_called_once_with(None, key="report_id", value=REPORT_ID)

def test_prepare_template(self):
profile_id = "PROFILE_ID"
report = {"key": "value"}
with NamedTemporaryFile("w+", suffix=".json") as f:
f.write(json.dumps(report))
f.flush()
op = GoogleCampaignManagerInsertReportOperator(
profile_id=profile_id,
profile_id=PROFILE_ID,
report=f.name,
api_version=API_VERSION,
task_id="test_task",
Expand All @@ -200,16 +266,13 @@ class TestGoogleCampaignManagerRunReportOperator(TestCase):
"campaign_manager.GoogleCampaignManagerRunReportOperator.xcom_push"
)
def test_execute(self, xcom_mock, mock_base_op, hook_mock):
profile_id = "PROFILE_ID"
report_id = "REPORT_ID"
file_id = "FILE_ID"
synchronous = True

hook_mock.return_value.run_report.return_value = {"id": file_id}
hook_mock.return_value.run_report.return_value = {"id": FILE_ID}

op = GoogleCampaignManagerRunReportOperator(
profile_id=profile_id,
report_id=report_id,
profile_id=PROFILE_ID,
report_id=REPORT_ID,
synchronous=synchronous,
api_version=API_VERSION,
task_id="test_task",
Expand All @@ -222,9 +285,9 @@ def test_execute(self, xcom_mock, mock_base_op, hook_mock):
impersonation_chain=None,
)
hook_mock.return_value.run_report.assert_called_once_with(
profile_id=profile_id, report_id=report_id, synchronous=synchronous
profile_id=PROFILE_ID, report_id=REPORT_ID, synchronous=synchronous
)
xcom_mock.assert_called_once_with(None, key="file_id", value=file_id)
xcom_mock.assert_called_once_with(None, key="file_id", value=FILE_ID)


class TestGoogleCampaignManagerBatchInsertConversionsOperator(TestCase):
Expand All @@ -233,18 +296,17 @@ class TestGoogleCampaignManagerBatchInsertConversionsOperator(TestCase):
)
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator")
def test_execute(self, mock_base_op, hook_mock):
profile_id = "PROFILE_ID"
op = GoogleCampaignManagerBatchInsertConversionsOperator(
task_id="insert_conversion",
profile_id=profile_id,
profile_id=PROFILE_ID,
conversions=[CONVERSION],
encryption_source="AD_SERVING",
encryption_entity_type="DCM_ADVERTISER",
encryption_entity_id=123456789,
)
op.execute(None)
hook_mock.return_value.conversions_batch_insert.assert_called_once_with(
profile_id=profile_id,
profile_id=PROFILE_ID,
conversions=[CONVERSION],
encryption_source="AD_SERVING",
encryption_entity_type="DCM_ADVERTISER",
Expand All @@ -259,18 +321,17 @@ class TestGoogleCampaignManagerBatchUpdateConversionOperator(TestCase):
)
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator")
def test_execute(self, mock_base_op, hook_mock):
profile_id = "PROFILE_ID"
op = GoogleCampaignManagerBatchUpdateConversionsOperator(
task_id="update_conversion",
profile_id=profile_id,
profile_id=PROFILE_ID,
conversions=[CONVERSION],
encryption_source="AD_SERVING",
encryption_entity_type="DCM_ADVERTISER",
encryption_entity_id=123456789,
)
op.execute(None)
hook_mock.return_value.conversions_batch_update.assert_called_once_with(
profile_id=profile_id,
profile_id=PROFILE_ID,
conversions=[CONVERSION],
encryption_source="AD_SERVING",
encryption_entity_type="DCM_ADVERTISER",
Expand Down

0 comments on commit fb478c0

Please sign in to comment.