Skip to content

Commit

Permalink
Cloud Video Intelligence Operators assets & system tests migration (A…
Browse files Browse the repository at this point in the history
…IP-47) (#26132)

 Cloud Video Intelligence Operators assets & system tests migration (AIP-47)
  • Loading branch information
bkossakowska committed Sep 10, 2022
1 parent 954349a commit 23ad7e2
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,20 @@ Using the operator

Input uri is an uri to a file in Google Cloud Storage

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_video_intelligence.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/video_intelligence/example_video_intelligence.py
:language: python
:start-after: [START howto_operator_video_intelligence_other_args]
:end-before: [END howto_operator_video_intelligence_other_args]

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_video_intelligence.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/video_intelligence/example_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_labels]
:end-before: [END howto_operator_video_intelligence_detect_labels]

You can use the annotation output via Xcom:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_video_intelligence.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/video_intelligence/example_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_labels_result]
Expand Down Expand Up @@ -87,23 +87,23 @@ Arguments

Input uri is an uri to a file in Google Cloud Storage

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_video_intelligence.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/video_intelligence/example_video_intelligence.py
:language: python
:start-after: [START howto_operator_video_intelligence_other_args]
:end-before: [END howto_operator_video_intelligence_other_args]

Using the operator
""""""""""""""""""

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_video_intelligence.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/video_intelligence/example_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_explicit_content]
:end-before: [END howto_operator_video_intelligence_detect_explicit_content]

You can use the annotation output via Xcom:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_video_intelligence.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/video_intelligence/example_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_explicit_content_result]
Expand Down Expand Up @@ -139,23 +139,23 @@ Arguments

Input uri is an uri to a file in Google Cloud Storage

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_video_intelligence.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/video_intelligence/example_video_intelligence.py
:language: python
:start-after: [START howto_operator_video_intelligence_other_args]
:end-before: [END howto_operator_video_intelligence_other_args]

Using the operator
""""""""""""""""""

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_video_intelligence.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/video_intelligence/example_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_video_shots]
:end-before: [END howto_operator_video_intelligence_detect_video_shots]

You can use the annotation output via Xcom:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_video_intelligence.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/video_intelligence/example_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_video_shots_result]
Expand Down

This file was deleted.

16 changes: 16 additions & 0 deletions tests/system/providers/google/cloud/video_intelligence/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,61 @@
This DAG relies on the following OS environment variables:
* GCP_BUCKET_NAME - Google Cloud Storage bucket where the file exists.
* BUCKET_NAME - Google Cloud Storage bucket where the file exists.
"""
import os
from datetime import datetime

from google.api_core.retry import Retry

from airflow import models
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.operators.video_intelligence import (
CloudVideoIntelligenceDetectVideoExplicitContentOperator,
CloudVideoIntelligenceDetectVideoLabelsOperator,
CloudVideoIntelligenceDetectVideoShotsOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")

DAG_ID = "example_gcp_video_intelligence"

# Public bucket holding the sample data
BUCKET_NAME_SRC = "cloud-samples-data"
# Path to the data inside the public bucket
PATH_SRC = "video/cat.mp4"

# [START howto_operator_video_intelligence_os_args]
GCP_BUCKET_NAME = os.environ.get("GCP_VIDEO_INTELLIGENCE_BUCKET_NAME", "INVALID BUCKET NAME")
BUCKET_NAME_DST = f"bucket-src-{DAG_ID}-{ENV_ID}"
# [END howto_operator_video_intelligence_os_args]

FILE_NAME = "video.mp4"

# [START howto_operator_video_intelligence_other_args]
INPUT_URI = f"gs://{GCP_BUCKET_NAME}/video.mp4"
INPUT_URI = f"gs://{BUCKET_NAME_DST}/{FILE_NAME}"
# [END howto_operator_video_intelligence_other_args]


with models.DAG(
"example_gcp_video_intelligence",
DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:

create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME_DST)

copy_single_file = GCSToGCSOperator(
task_id="copy_single_gcs_file",
source_bucket=BUCKET_NAME_SRC,
source_object=PATH_SRC,
destination_bucket=BUCKET_NAME_DST,
destination_object=FILE_NAME,
)

# [START howto_operator_video_intelligence_detect_labels]
detect_video_label = CloudVideoIntelligenceDetectVideoLabelsOperator(
input_uri=INPUT_URI,
Expand Down Expand Up @@ -110,6 +133,33 @@
)
# [END howto_operator_video_intelligence_detect_video_shots_result]

detect_video_label >> detect_video_label_result
detect_video_explicit_content >> detect_video_explicit_content_result
detect_video_shots >> detect_video_shots_result
delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME_DST, trigger_rule=TriggerRule.ALL_DONE
)

chain(
# TEST SETUP
create_bucket,
copy_single_file,
# TEST BODY
detect_video_label,
detect_video_label_result,
detect_video_explicit_content,
detect_video_explicit_content_result,
detect_video_shots,
detect_video_shots_result,
# TEST TEARDOWN
delete_bucket,
)

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

0 comments on commit 23ad7e2

Please sign in to comment.