Fix GCSToGoogleDriveOperator and gdrive system tests (#34545)
moiseenkov committed Oct 12, 2023
1 parent b7f532a commit 4dc2c40
Showing 6 changed files with 281 additions and 48 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/google/suite/transfers/gcs_to_gdrive.py
@@ -95,7 +95,7 @@ def __init__(
         source_bucket: str,
         source_object: str,
         destination_object: str | None = None,
-        destination_folder_id: str | None = None,
+        destination_folder_id: str = "root",
         move_object: bool = False,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
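The practical effect of the one-line change above: when destination_folder_id is omitted, the operator now targets "root" (the Drive API's alias for the root folder) instead of passing None through to the Drive hook. A minimal usage sketch, with placeholder bucket and object names that are not taken from this commit:

from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator

# Sketch: destination_folder_id is omitted, so with this fix the file lands
# under the Drive root ("root" is the Drive API alias for the root folder).
copy_to_drive_root = GCSToGoogleDriveOperator(
    task_id="copy_to_drive_root",
    source_bucket="my-bucket",                 # placeholder
    source_object="tmp/report.avro",           # placeholder
    destination_object="backups/report.avro",  # path created on the Drive side
)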
14 changes: 7 additions & 7 deletions tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
@@ -66,7 +66,7 @@ def test_should_copy_single_file(self, mock_named_temporary_file, mock_gdrive, m
                 mock.call().upload_file(
                     local_location="TMP1",
                     remote_location="copied_sales/2017/january-backup.avro",
-                    folder_id=None,
+                    folder_id="root",
                 ),
             ]
         )
@@ -156,13 +156,13 @@ def test_should_copy_files(self, mock_named_temporary_file, mock_gdrive, mock_gc
                     impersonation_chain=IMPERSONATION_CHAIN,
                 ),
                 mock.call().upload_file(
-                    local_location="TMP1", remote_location="sales/A.avro", folder_id=None
+                    local_location="TMP1", remote_location="sales/A.avro", folder_id="root"
                 ),
                 mock.call().upload_file(
-                    local_location="TMP2", remote_location="sales/B.avro", folder_id=None
+                    local_location="TMP2", remote_location="sales/B.avro", folder_id="root"
                 ),
                 mock.call().upload_file(
-                    local_location="TMP3", remote_location="sales/C.avro", folder_id=None
+                    local_location="TMP3", remote_location="sales/C.avro", folder_id="root"
                 ),
             ]
         )
@@ -210,13 +210,13 @@ def test_should_move_files(self, mock_named_temporary_file, mock_gdrive, mock_gc
                     impersonation_chain=IMPERSONATION_CHAIN,
                 ),
                 mock.call().upload_file(
-                    local_location="TMP1", remote_location="sales/A.avro", folder_id=None
+                    local_location="TMP1", remote_location="sales/A.avro", folder_id="root"
                 ),
                 mock.call().upload_file(
-                    local_location="TMP2", remote_location="sales/B.avro", folder_id=None
+                    local_location="TMP2", remote_location="sales/B.avro", folder_id="root"
                 ),
                 mock.call().upload_file(
-                    local_location="TMP3", remote_location="sales/C.avro", folder_id=None
+                    local_location="TMP3", remote_location="sales/C.avro", folder_id="root"
                 ),
             ]
         )
93 changes: 78 additions & 15 deletions tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
@@ -23,109 +23,172 @@
 """
 from __future__ import annotations

+import json
+import logging
 import os
 from datetime import datetime
 from pathlib import Path

+from airflow.decorators import task
+from airflow.models import Connection
 from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
 from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
+from airflow.settings import Session
 from airflow.utils.trigger_rule import TriggerRule

 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", "abcd1234")
+FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", None)

 DAG_ID = "example_gcs_to_gdrive"

 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"

 TMP_PATH = "tmp"

+WORK_DIR = f"folder_{DAG_ID}_{ENV_ID}".replace("-", "_")
 CURRENT_FOLDER = Path(__file__).parent
 LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources")

 FILE_LOCAL_PATH = str(Path(LOCAL_PATH))
 FILE_NAME = "example_upload.txt"

+log = logging.getLogger(__name__)
+

 with DAG(
     DAG_ID,
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "gcs"],
+    tags=["example", "gcs", "gdrive"],
 ) as dag:
+
+    @task
+    def create_temp_gcp_connection():
+        conn = Connection(
+            conn_id=CONNECTION_ID,
+            conn_type="google_cloud_platform",
+        )
+        conn_extra_json = json.dumps(
+            {
+                "scope": "https://www.googleapis.com/auth/drive,"
+                "https://www.googleapis.com/auth/cloud-platform"
+            }
+        )
+        conn.set_extra(conn_extra_json)
+
+        session: Session = Session()
+        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
+            log.warning("Connection %s already exists", CONNECTION_ID)
+            return None
+        session.add(conn)
+        session.commit()
+
+    create_temp_gcp_connection_task = create_temp_gcp_connection()
+
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
     )

-    upload_file = LocalFilesystemToGCSOperator(
-        task_id="upload_file",
+    upload_file_1 = LocalFilesystemToGCSOperator(
+        task_id="upload_file_1",
         src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
         dst=f"{TMP_PATH}/{FILE_NAME}",
         bucket=BUCKET_NAME,
     )

     upload_file_2 = LocalFilesystemToGCSOperator(
-        task_id="upload_fil_2",
+        task_id="upload_file_2",
         src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
         dst=f"{TMP_PATH}/2_{FILE_NAME}",
         bucket=BUCKET_NAME,
     )
     # [START howto_operator_gcs_to_gdrive_copy_single_file]
     copy_single_file = GCSToGoogleDriveOperator(
         task_id="copy_single_file",
+        gcp_conn_id=CONNECTION_ID,
         source_bucket=BUCKET_NAME,
         source_object=f"{TMP_PATH}/{FILE_NAME}",
-        destination_object=f"copied_tmp/copied_{FILE_NAME}",
+        destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
     )
     # [END howto_operator_gcs_to_gdrive_copy_single_file]

     # [START howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
     copy_single_file_into_folder = GCSToGoogleDriveOperator(
         task_id="copy_single_file_into_folder",
+        gcp_conn_id=CONNECTION_ID,
         source_bucket=BUCKET_NAME,
         source_object=f"{TMP_PATH}/{FILE_NAME}",
-        destination_object=f"copied_tmp/copied_{FILE_NAME}",
+        destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
         destination_folder_id=FOLDER_ID,
     )
     # [END howto_operator_gcs_to_gdrive_copy_single_file_into_folder]

     # [START howto_operator_gcs_to_gdrive_copy_files]
     copy_files = GCSToGoogleDriveOperator(
         task_id="copy_files",
+        gcp_conn_id=CONNECTION_ID,
         source_bucket=BUCKET_NAME,
         source_object=f"{TMP_PATH}/*",
-        destination_object="copied_tmp/",
+        destination_object=f"{WORK_DIR}/",
     )
     # [END howto_operator_gcs_to_gdrive_copy_files]

     # [START howto_operator_gcs_to_gdrive_move_files]
     move_files = GCSToGoogleDriveOperator(
         task_id="move_files",
+        gcp_conn_id=CONNECTION_ID,
         source_bucket=BUCKET_NAME,
         source_object=f"{TMP_PATH}/*.txt",
+        destination_object=f"{WORK_DIR}/",
         move_object=True,
     )
     # [END howto_operator_gcs_to_gdrive_move_files]

+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def remove_files_from_drive():
+        service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
+        root_path = (
+            service.files()
+            .list(q=f"name = '{WORK_DIR}' and mimeType = 'application/vnd.google-apps.folder'")
+            .execute()
+        )
+        if files := root_path["files"]:
+            batch = service.new_batch_http_request()
+            for file in files:
+                log.info("Preparing to remove file: %s", file)
+                batch.add(service.files().delete(fileId=file["id"]))
+            batch.execute()
+            log.info("Selected files removed.")
+
+    remove_files_from_drive_task = remove_files_from_drive()
+
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )

+    delete_temp_gcp_connection_task = BashOperator(
+        task_id="delete_temp_gcp_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # TEST SETUP
+    create_bucket >> [upload_file_1, upload_file_2]
     (
-        # TEST SETUP
-        create_bucket
-        >> upload_file
-        >> upload_file_2
+        [upload_file_1, upload_file_2, create_temp_gcp_connection_task]
         # TEST BODY
         >> copy_single_file
         >> copy_single_file_into_folder
         >> copy_files
         >> move_files
         # TEST TEARDOWN
-        >> delete_bucket
+        >> remove_files_from_drive_task
+        >> [delete_bucket, delete_temp_gcp_connection_task]
     )

     from tests.system.utils.watcher import watcher
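One way to eyeball the result of copy_files and move_files outside the DAG is to query Drive for the WORK_DIR folder and list its children. The helper below is hypothetical (not part of this commit); it assumes the same temporary connection and the Drive v3 files().list query syntax already used in the teardown task above:

from airflow.providers.google.suite.hooks.drive import GoogleDriveHook

# Hypothetical verification helper, not part of the commit: find the Drive
# folder the test created and list the files placed inside it.
def list_drive_folder(conn_id: str, folder_name: str) -> list[dict]:
    service = GoogleDriveHook(gcp_conn_id=conn_id).get_conn()
    folders = (
        service.files()
        .list(q=f"name = '{folder_name}' and mimeType = 'application/vnd.google-apps.folder'")
        .execute()
        .get("files", [])
    )
    if not folders:
        return []
    folder_id = folders[0]["id"]
    return (
        service.files()
        .list(q=f"'{folder_id}' in parents", fields="files(id, name)")
        .execute()
        .get("files", [])
    )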
74 changes: 65 additions & 9 deletions tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py
@@ -17,16 +17,23 @@
 # under the License.
 from __future__ import annotations

+import json
+import logging
 import os
 from datetime import datetime
 from pathlib import Path

+from airflow.decorators import task
+from airflow.models import Connection
 from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.gdrive_to_gcs import GoogleDriveToGCSOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
 from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
 from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
+from airflow.settings import Session
 from airflow.utils.trigger_rule import TriggerRule

 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
@@ -35,20 +35,48 @@
 DAG_ID = "example_gdrive_to_gcs_with_gdrive_sensor"

 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"

 OBJECT = "abc123xyz"
 FOLDER_ID = ""
 FILE_NAME = "example_upload.txt"
+DRIVE_FILE_NAME = f"example_upload_{DAG_ID}_{ENV_ID}.txt"
 LOCAL_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)

+log = logging.getLogger(__name__)
+

 with DAG(
     DAG_ID,
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "gcs", "gdrive"],
 ) as dag:
+
+    @task
+    def create_temp_gcp_connection():
+        conn = Connection(
+            conn_id=CONNECTION_ID,
+            conn_type="google_cloud_platform",
+        )
+        conn_extra_json = json.dumps(
+            {
+                "scope": "https://www.googleapis.com/auth/drive,"
+                "https://www.googleapis.com/auth/cloud-platform"
+            }
+        )
+        conn.set_extra(conn_extra_json)
+
+        session: Session = Session()
+        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
+            log.warning("Connection %s already exists", CONNECTION_ID)
+            return None
+        session.add(conn)
+        session.commit()
+
+    create_temp_gcp_connection_task = create_temp_gcp_connection()
+
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
     )
@@ -62,41 +62,62 @@

     copy_single_file = GCSToGoogleDriveOperator(
         task_id="copy_single_file",
+        gcp_conn_id=CONNECTION_ID,
         source_bucket=BUCKET_NAME,
         source_object=FILE_NAME,
-        destination_object=FILE_NAME,
+        destination_object=DRIVE_FILE_NAME,
     )

     # [START detect_file]
     detect_file = GoogleDriveFileExistenceSensor(
-        task_id="detect_file", folder_id=FOLDER_ID, file_name=FILE_NAME
+        task_id="detect_file",
+        folder_id=FOLDER_ID,
+        file_name=DRIVE_FILE_NAME,
+        gcp_conn_id=CONNECTION_ID,
     )
     # [END detect_file]

     # [START upload_gdrive_to_gcs]
     upload_gdrive_to_gcs = GoogleDriveToGCSOperator(
         task_id="upload_gdrive_object_to_gcs",
+        gcp_conn_id=CONNECTION_ID,
         folder_id=FOLDER_ID,
-        file_name=FILE_NAME,
+        file_name=DRIVE_FILE_NAME,
         bucket_name=BUCKET_NAME,
         object_name=OBJECT,
     )
     # [END upload_gdrive_to_gcs]

+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def remove_files_from_drive():
+        service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
+        response = service.files().list(q=f"name = '{DRIVE_FILE_NAME}'").execute()
+        if files := response["files"]:
+            file = files[0]
+            log.info("Deleting file %s...", file)
+            service.files().delete(fileId=file["id"]).execute()
+            log.info("Done.")
+
+    remove_files_from_drive_task = remove_files_from_drive()
+
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )

+    delete_temp_gcp_connection_task = BashOperator(
+        task_id="delete_temp_gcp_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     (
         # TEST SETUP
-        create_bucket
-        >> upload_file
-        >> copy_single_file
+        [create_bucket >> upload_file >> copy_single_file, create_temp_gcp_connection_task]
         # TEST BODY
         >> detect_file
         >> upload_gdrive_to_gcs
         # TEST TEARDOWN
-        >> delete_bucket
+        >> remove_files_from_drive_task
+        >> [delete_bucket, delete_temp_gcp_connection_task]
     )

     from tests.system.utils.watcher import watcher
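For reference, GoogleDriveFileExistenceSensor delegates its poke to GoogleDriveHook. A standalone version of the same check might look like the sketch below, assuming GoogleDriveHook.exists keeps its folder_id/file_name signature; the connection id and file name are placeholders:

from airflow.providers.google.suite.hooks.drive import GoogleDriveHook

# Sketch of the check the sensor repeats each poke interval; values are
# placeholders, and GoogleDriveHook.exists' signature is assumed, not verified.
hook = GoogleDriveHook(gcp_conn_id="my_temp_gcp_connection")
if hook.exists(folder_id="", file_name="example_upload.txt"):
    print("File is visible in Google Drive")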