Skip to content

Commit

Permalink
Fix Google DLP example and improve ops idempotency (#10608)
Browse files — browse the repository at this point in the history
  • Branch information: turbaszek committed Aug 28, 2020
1 parent 3867f76 commit 5ae82a5
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 30 deletions.
58 changes: 31 additions & 27 deletions airflow/providers/google/cloud/example_dags/example_dlp.py
Expand Up @@ -52,22 +52,25 @@
)
INSPECT_CONFIG = InspectConfig(info_types=[{"name": "PHONE_NUMBER"}, {"name": "US_TOLLFREE_PHONE_NUMBER"}])
INSPECT_TEMPLATE = InspectTemplate(inspect_config=INSPECT_CONFIG)
OUTPUT_BUCKET = os.environ.get("DLP_OUTPUT_BUCKET", "gs://test-dlp-airflow")
OUTPUT_FILENAME = "test.txt"

OBJECT_GCS_URI = os.path.join(OUTPUT_BUCKET, "tmp")
OBJECT_GCS_OUTPUT_URI = os.path.join(OUTPUT_BUCKET, "tmp", OUTPUT_FILENAME)

with models.DAG(
"example_gcp_dlp",
schedule_interval=None, # Override to match your needs
start_date=days_ago(1),
tags=['example'],
) as dag:
) as dag1:
# [START howto_operator_dlp_create_inspect_template]
create_template = CloudDLPCreateInspectTemplateOperator(
project_id=GCP_PROJECT,
inspect_template=INSPECT_TEMPLATE,
template_id=TEMPLATE_ID,
task_id="create_template",
do_xcom_push=True,
dag=dag,
)
# [END howto_operator_dlp_create_inspect_template]

Expand All @@ -77,37 +80,42 @@
project_id=GCP_PROJECT,
item=ITEM,
inspect_template_name="{{ task_instance.xcom_pull('create_template', key='return_value')['name'] }}",
dag=dag,
)
# [END howto_operator_dlp_use_inspect_template]

# [START howto_operator_dlp_delete_inspect_template]
delete_template = CloudDLPDeleteInspectTemplateOperator(
task_id="delete_template", template_id=TEMPLATE_ID, project_id=GCP_PROJECT, dag=dag,
task_id="delete_template", template_id=TEMPLATE_ID, project_id=GCP_PROJECT,
)
# [END howto_operator_dlp_delete_inspect_template]

create_template >> inspect_content >> delete_template


CUSTOM_INFO_TYPES = [{"info_type": {"name": "C_MRN"}, "regex": {"pattern": "[1-9]{3}-[1-9]{1}-[1-9]{5}"},}]
CUSTOM_INFO_TYPE_ID = "custom_info_type"
UPDATE_CUSTOM_INFO_TYPE = [
{"info_type": {"name": "C_MRN"}, "regex": {"pattern": "[a-z]{3}-[a-z]{1}-[a-z]{5}"},}
]
CUSTOM_INFO_TYPES = {
"large_custom_dictionary": {
"output_path": {"path": OBJECT_GCS_OUTPUT_URI},
"cloud_storage_file_set": {"url": OBJECT_GCS_URI + "/"},
}
}
UPDATE_CUSTOM_INFO_TYPE = {
"large_custom_dictionary": {
"output_path": {"path": OBJECT_GCS_OUTPUT_URI},
"cloud_storage_file_set": {"url": OBJECT_GCS_URI + "/"},
}
}

with models.DAG(
"example_gcp_dlp_info_types",
schedule_interval=None,
start_date=days_ago(1),
tags=["example", "dlp", "info-types"],
) as dag:
) as dag2:
# [START howto_operator_dlp_create_info_type]
create_info_type = CloudDLPCreateStoredInfoTypeOperator(
project_id=GCP_PROJECT,
config=CUSTOM_INFO_TYPES,
stored_info_type_id=CUSTOM_INFO_TYPE_ID,
dag=dag,
task_id="create_info_type",
)
# [END howto_operator_dlp_create_info_type]
Expand All @@ -116,57 +124,53 @@
project_id=GCP_PROJECT,
stored_info_type_id=CUSTOM_INFO_TYPE_ID,
config=UPDATE_CUSTOM_INFO_TYPE,
dag=dag,
task_id="update_info_type",
)
# [END howto_operator_dlp_update_info_type]
# [START howto_operator_dlp_delete_info_type]
delete_info_type = CloudDLPDeleteStoredInfoTypeOperator(
project_id=GCP_PROJECT, stored_info_type_id=CUSTOM_INFO_TYPE_ID, dag=dag, task_id="delete_info_type",
project_id=GCP_PROJECT, stored_info_type_id=CUSTOM_INFO_TYPE_ID, task_id="delete_info_type",
)
# [END howto_operator_dlp_delete_info_type]
create_info_type >> update_info_type >> delete_info_type

SCHEDULE = {"recurrence_period_duration": {"seconds": 60 * 60 * 24}}
JOB = {
"inspect_config": INSPECT_CONFIG,
}

JOB_TRIGGER = {
"inspect_job": JOB,
"triggers": [{"schedule": SCHEDULE}],
"inspect_job": {
"storage_config": {
"datastore_options": {"partition_id": {"project_id": GCP_PROJECT}, "kind": {"name": "test"}}
}
},
"triggers": [{"schedule": {"recurrence_period_duration": {"seconds": 60 * 60 * 24}}}],
"status": "HEALTHY",
}

TRIGGER_ID = "example_trigger"

with models.DAG(
"example_gcp_dlp_job", schedule_interval=None, start_date=days_ago(1), tags=["example", "dlp_job"],
) as dag: # [START howto_operator_dlp_create_job_trigger]
"example_gcp_dlp_job", schedule_interval=None, start_date=days_ago(1), tags=["example", "dlp_job"]
) as dag3: # [START howto_operator_dlp_create_job_trigger]
create_trigger = CloudDLPCreateJobTriggerOperator(
project_id=GCP_PROJECT,
job_trigger=JOB_TRIGGER,
trigger_id=TRIGGER_ID,
dag=dag,
task_id="create_trigger",
)
# [END howto_operator_dlp_create_job_trigger]
UPDATED_SCHEDULE = {"recurrence_period_duration": {"seconds": 2 * 60 * 60 * 24}}

JOB_TRIGGER["triggers"] = [{"schedule": UPDATED_SCHEDULE}]
JOB_TRIGGER["triggers"] = [{"schedule": {"recurrence_period_duration": {"seconds": 2 * 60 * 60 * 24}}}]

# [START howto_operator_dlp_update_job_trigger]
update_trigger = CloudDLPUpdateJobTriggerOperator(
project_id=GCP_PROJECT,
job_trigger_id=TRIGGER_ID,
job_trigger=JOB_TRIGGER,
dag=dag,
task_id="update_info_type",
)
# [END howto_operator_dlp_update_job_trigger]
# [START howto_operator_dlp_delete_job_trigger]
delete_trigger = CloudDLPDeleteJobTriggerOperator(
project_id=GCP_PROJECT, job_trigger_id=TRIGGER_ID, dag=dag, task_id="delete_info_type",
project_id=GCP_PROJECT, job_trigger_id=TRIGGER_ID, task_id="delete_info_type"
)
# [END howto_operator_dlp_delete_job_trigger]
create_trigger >> update_trigger >> delete_trigger

8 changes: 6 additions & 2 deletions airflow/providers/google/cloud/operators/dlp.py
Expand Up @@ -520,7 +520,9 @@ def execute(self, context):
timeout=self.timeout,
metadata=self.metadata,
)
except AlreadyExists:
except InvalidArgument as e:
if "already in use" not in e.message:
raise
trigger = hook.get_job_trigger(
project_id=self.project_id,
job_trigger_id=self.trigger_id,
Expand Down Expand Up @@ -621,7 +623,9 @@ def execute(self, context):
timeout=self.timeout,
metadata=self.metadata,
)
except AlreadyExists:
except InvalidArgument as e:
if "already exists" not in e.message:
raise
info = hook.get_stored_info_type(
organization_id=self.organization_id,
project_id=self.project_id,
Expand Down
20 changes: 19 additions & 1 deletion tests/providers/google/cloud/operators/test_dlp_system.py
Expand Up @@ -25,11 +25,29 @@

from tests.providers.google.cloud.utils.gcp_authenticator import GCP_DLP_KEY
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
from airflow.providers.google.cloud.example_dags.example_dlp import OUTPUT_BUCKET, OUTPUT_FILENAME


@pytest.fixture(scope="class")
def helper():
GoogleSystemTest.create_gcs_bucket(OUTPUT_BUCKET)
GoogleSystemTest.upload_content_to_gcs("aaaa\nbbbb", OUTPUT_BUCKET, f"tmp/{OUTPUT_FILENAME}")
yield
GoogleSystemTest.delete_gcs_bucket(OUTPUT_BUCKET)


@pytest.mark.backend("mysql", "postgres")
@pytest.mark.usefixtures("helper")
@pytest.mark.credential_file(GCP_DLP_KEY)
class GcpDLPExampleDagsSystemTest(GoogleSystemTest):
@provide_gcp_context(GCP_DLP_KEY)
def test_run_example_dag_function(self):
def test_run_example_dag(self):
self.run_dag('example_gcp_dlp', CLOUD_DAG_FOLDER)

@provide_gcp_context(GCP_DLP_KEY)
def test_run_example_info_types(self):
self.run_dag('example_gcp_dlp_info_types', CLOUD_DAG_FOLDER)

@provide_gcp_context(GCP_DLP_KEY)
def test_run_example_dlp_job(self):
self.run_dag('example_gcp_dlp_job', CLOUD_DAG_FOLDER)

0 comments on commit 5ae82a5

Please sign in to comment.