Skip to content

Commit

Permalink
Life Science assets & system tests migration (AIP-47) (#25548)
Browse files Browse the repository at this point in the history
  • Loading branch information
bkossakowska committed Sep 9, 2022
1 parent 5066844 commit 3a539ff
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 16 deletions.
48 changes: 48 additions & 0 deletions airflow/providers/google/cloud/links/life_sciences.py
@@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import TYPE_CHECKING

from airflow.providers.google.cloud.links.base import BaseGoogleLink

if TYPE_CHECKING:
from airflow.utils.context import Context

BASE_LINK = "https://console.cloud.google.com/lifesciences"
LIFESCIENCES_LIST_LINK = BASE_LINK + "/pipelines?project={project_id}"


class LifeSciencesLink(BaseGoogleLink):
"""Helper class for constructing Life Sciences List link"""

name = "Life Sciences"
key = "lifesciences_key"
format_str = LIFESCIENCES_LIST_LINK

@staticmethod
def persist(
context: "Context",
task_instance,
project_id: str,
):
task_instance.xcom_push(
context=context,
key=LifeSciencesLink.key,
value={
"project_id": project_id,
},
)
10 changes: 9 additions & 1 deletion airflow/providers/google/cloud/operators/life_sciences.py
Expand Up @@ -22,6 +22,7 @@
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.life_sciences import LifeSciencesHook
from airflow.providers.google.cloud.links.life_sciences import LifeSciencesLink

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -57,6 +58,7 @@ class LifeSciencesRunPipelineOperator(BaseOperator):
"api_version",
"impersonation_chain",
)
operator_extra_links = (LifeSciencesLink(),)

def __init__(
self,
Expand Down Expand Up @@ -90,5 +92,11 @@ def execute(self, context: 'Context') -> dict:
api_version=self.api_version,
impersonation_chain=self.impersonation_chain,
)

project_id = self.project_id or hook.project_id
if project_id:
LifeSciencesLink.persist(
context=context,
task_instance=self,
project_id=project_id,
)
return hook.run_pipeline(body=self.body, location=self.location, project_id=self.project_id)
1 change: 1 addition & 0 deletions airflow/providers/google/provider.yaml
Expand Up @@ -1014,6 +1014,7 @@ extra-links:
- airflow.providers.google.cloud.links.cloud_build.CloudBuildListLink
- airflow.providers.google.cloud.links.cloud_build.CloudBuildTriggersListLink
- airflow.providers.google.cloud.links.cloud_build.CloudBuildTriggerDetailsLink
- airflow.providers.google.cloud.links.life_sciences.LifeSciencesLink
- airflow.providers.google.common.links.storage.StorageLink
- airflow.providers.google.common.links.storage.FileDetailsLink

Expand Down
7 changes: 5 additions & 2 deletions tests/providers/google/cloud/operators/test_life_sciences.py
Expand Up @@ -42,7 +42,9 @@ def test_executes(self, mock_hook):
operator = LifeSciencesRunPipelineOperator(
task_id='task-id', body=TEST_BODY, location=TEST_LOCATION, project_id=TEST_PROJECT_ID
)
result = operator.execute(None)
context = mock.MagicMock()
result = operator.execute(context=context)

assert result == TEST_OPERATION

@mock.patch("airflow.providers.google.cloud.operators.life_sciences.LifeSciencesHook")
Expand All @@ -54,5 +56,6 @@ def test_executes_without_project_id(self, mock_hook):
body=TEST_BODY,
location=TEST_LOCATION,
)
result = operator.execute(None)
context = mock.MagicMock()
result = operator.execute(context=context)
assert result == TEST_OPERATION
Expand Up @@ -18,7 +18,7 @@
import pytest

from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
from tests.system.providers.google.cloud.life_sciences.example_life_sciences import BUCKET
from tests.system.providers.google.cloud.life_sciences.example_life_sciences import BUCKET_NAME
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context


Expand All @@ -28,13 +28,13 @@ class GoogleDriveToGCSExampleDagsSystemTest(GoogleSystemTest):
@provide_gcp_context(GCP_GCS_KEY)
def setUp(self):
super().setUp()
self.create_gcs_bucket(BUCKET)
self.create_gcs_bucket(BUCKET_NAME)

@provide_gcp_context(GCP_GCS_KEY)
def test_run_example_dag_function(self):
self.run_dag('example_gdrive_to_gcs', CLOUD_DAG_FOLDER)

@provide_gcp_context(GCP_GCS_KEY)
def tearDown(self):
self.delete_gcs_bucket(BUCKET)
self.delete_gcs_bucket(BUCKET_NAME)
super().tearDown()
1 change: 0 additions & 1 deletion tests/providers/google/cloud/utils/gcp_authenticator.py
Expand Up @@ -48,7 +48,6 @@
GCP_GCS_TRANSFER_KEY = 'gcp_gcs_transfer.json'
GCP_GKE_KEY = "gcp_gke.json"
GCP_KMS_KEY = "gcp_kms.json"
GCP_LIFE_SCIENCES_KEY = 'gcp_life_sciences.json'
GCP_MEMORYSTORE = 'gcp_memorystore.json'
GCP_PUBSUB_KEY = "gcp_pubsub.json"
GCP_SECRET_MANAGER_KEY = 'gcp_secret_manager.json'
Expand Down
Expand Up @@ -18,18 +18,26 @@

import os
from datetime import datetime
from pathlib import Path

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.operators.life_sciences import LifeSciencesRunPipelineOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_gcp_life_sciences"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
DAG_ID = "example_life_sciences"

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project-id")
BUCKET = os.environ.get("GCP_GCS_LIFE_SCIENCES_BUCKET", "INVALID BUCKET NAME")
FILENAME = os.environ.get("GCP_GCS_LIFE_SCIENCES_FILENAME", 'input.in')
LOCATION = os.environ.get("GCP_LIFE_SCIENCES_LOCATION", 'us-central1')
BUCKET_NAME = f"bucket_{DAG_ID}-{ENV_ID}"

FILE_NAME = "file"
LOCATION = "us-central1"

CURRENT_FOLDER = Path(__file__).parent
FILE_LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources" / FILE_NAME)

# [START howto_configure_simple_action_pipeline]
SIMPLE_ACTION_PIPELINE = {
Expand All @@ -53,16 +61,16 @@
"actions": [
{
"imageUri": "google/cloud-sdk",
"commands": ["gsutil", "cp", f"gs://{BUCKET}/{FILENAME}", "/tmp"],
"commands": ["gsutil", "cp", f"gs://{BUCKET_NAME}/{FILE_NAME}", "/tmp"],
},
{"imageUri": "bash", "commands": ["-c", "echo Hello, world"]},
{
"imageUri": "google/cloud-sdk",
"commands": [
"gsutil",
"cp",
f"gs://{BUCKET}/{FILENAME}",
f"gs://{BUCKET}/output.in",
f"gs://{BUCKET_NAME}/{FILE_NAME}",
f"gs://{BUCKET_NAME}/output.in",
],
},
],
Expand All @@ -83,6 +91,14 @@
catchup=False,
tags=['example'],
) as dag:
create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)

upload_file = LocalFilesystemToGCSOperator(
task_id="upload_file",
src=FILE_LOCAL_PATH,
dst=FILE_NAME,
bucket=BUCKET_NAME,
)

# [START howto_run_pipeline]
simple_life_science_action_pipeline = LifeSciencesRunPipelineOperator(
Expand All @@ -97,7 +113,26 @@
task_id='multi-action-pipeline', body=MULTI_ACTION_PIPELINE, project_id=PROJECT_ID, location=LOCATION
)

simple_life_science_action_pipeline >> multiple_life_science_action_pipeline
delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)

chain(
# TEST SETUP
create_bucket,
upload_file,
# TEST BODY
simple_life_science_action_pipeline,
multiple_life_science_action_pipeline,
# TEST TEARDOWN
delete_bucket,
)

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run # noqa: E402
Expand Down
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Empty file.

0 comments on commit 3a539ff

Please sign in to comment.