Add support of path parameter for GCloud Storage Transfer Service operators (#17446)

Co-authored-by: ekarimovDH <[email protected]>
eskarimov and ekarimovDH committed Oct 30, 2021
1 parent a355213 commit a3c9956
Showing 6 changed files with 100 additions and 15 deletions.
@@ -80,6 +80,7 @@ class GcpTransferOperationStatus:
NAME = 'name'
OBJECT_CONDITIONS = 'object_conditions'
OPERATIONS = 'operations'
+ PATH = 'path'
PROJECT_ID = 'projectId'
SCHEDULE = 'schedule'
SCHEDULE_END_DATE = 'scheduleEndDate'
8 changes: 3 additions & 5 deletions airflow/providers/google/cloud/hooks/gcs.py
@@ -37,6 +37,7 @@
from google.cloud.exceptions import GoogleCloudError

from airflow.exceptions import AirflowException
+ from airflow.providers.google.cloud.utils.helpers import normalize_directory_path
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from airflow.utils import timezone
from airflow.version import version
@@ -1065,8 +1066,8 @@ def sync(
source_bucket_obj = client.bucket(source_bucket)
destination_bucket_obj = client.bucket(destination_bucket)
# Normalize parameters when they are passed
- source_object = self._normalize_directory_path(source_object)
- destination_object = self._normalize_directory_path(destination_object)
+ source_object = normalize_directory_path(source_object)
+ destination_object = normalize_directory_path(destination_object)
# Calculate the number of characters that remove from the name, because they contain information
# about the parent's path
source_object_prefix_len = len(source_object) if source_object else 0
@@ -1137,9 +1138,6 @@ def _calculate_sync_destination_path(
else blob.name[source_object_prefix_len:]
)

- def _normalize_directory_path(self, source_object: Optional[str]) -> Optional[str]:
-     return source_object + "/" if source_object and not source_object.endswith("/") else source_object

@staticmethod
def _prepare_sync_plan(
source_bucket: storage.Bucket,
@@ -39,6 +39,7 @@
MONTH,
NAME,
OBJECT_CONDITIONS,
+ PATH,
PROJECT_ID,
SCHEDULE,
SCHEDULE_END_DATE,
@@ -53,6 +54,7 @@
CloudDataTransferServiceHook,
GcpTransferJobsStatus,
)
+ from airflow.providers.google.cloud.utils.helpers import normalize_directory_path


class TransferJobPreprocessor:
@@ -763,6 +765,10 @@ class CloudDataTransferServiceS3ToGCSOperator(BaseOperator):
:param gcs_bucket: The destination Google Cloud Storage bucket
where you want to store the files. (templated)
:type gcs_bucket: str
+ :param s3_path: Optional root path where the source objects are. (templated)
+ :type s3_path: str
+ :param gcs_path: Optional root path for transferred objects. (templated)
+ :type gcs_path: str
:param project_id: Optional ID of the Google Cloud Console project that
owns the job
:type project_id: str
@@ -815,6 +821,8 @@ class CloudDataTransferServiceS3ToGCSOperator(BaseOperator):
'gcp_conn_id',
's3_bucket',
'gcs_bucket',
+ 's3_path',
+ 'gcs_path',
'description',
'object_conditions',
'google_impersonation_chain',
@@ -826,6 +834,8 @@ def __init__(
*,
s3_bucket: str,
gcs_bucket: str,
+ s3_path: Optional[str] = None,
+ gcs_path: Optional[str] = None,
project_id: Optional[str] = None,
aws_conn_id: str = 'aws_default',
gcp_conn_id: str = 'google_cloud_default',
@@ -844,6 +854,8 @@ def __init__(
super().__init__(**kwargs)
self.s3_bucket = s3_bucket
self.gcs_bucket = gcs_bucket
+ self.s3_path = s3_path
+ self.gcs_path = gcs_path
self.project_id = project_id
self.aws_conn_id = aws_conn_id
self.gcp_conn_id = gcp_conn_id
@@ -884,8 +896,14 @@ def _create_body(self) -> dict:
DESCRIPTION: self.description,
STATUS: GcpTransferJobsStatus.ENABLED,
TRANSFER_SPEC: {
- AWS_S3_DATA_SOURCE: {BUCKET_NAME: self.s3_bucket},
- GCS_DATA_SINK: {BUCKET_NAME: self.gcs_bucket},
+ AWS_S3_DATA_SOURCE: {
+     BUCKET_NAME: self.s3_bucket,
+     PATH: normalize_directory_path(self.s3_path),
+ },
+ GCS_DATA_SINK: {
+     BUCKET_NAME: self.gcs_bucket,
+     PATH: normalize_directory_path(self.gcs_path),
+ },
},
}
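For context, a minimal usage sketch of the new S3-to-GCS parameters. The task_id, bucket names, and path prefixes are hypothetical, and the import path assumes the operator's module in the google provider:

    from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
        CloudDataTransferServiceS3ToGCSOperator,
    )

    transfer_s3_to_gcs = CloudDataTransferServiceS3ToGCSOperator(
        task_id="s3_to_gcs_transfer",
        s3_bucket="example-s3-bucket",      # hypothetical source bucket
        gcs_bucket="example-gcs-bucket",    # hypothetical destination bucket
        s3_path="data/2021",                # normalized to "data/2021/" when the request body is built
        gcs_path="landing",                 # normalized to "landing/" when the request body is built
    )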

@@ -935,6 +953,10 @@ class CloudDataTransferServiceGCSToGCSOperator(BaseOperator):
:param destination_bucket: The destination Google Cloud Storage bucket
where the object should be. (templated)
:type destination_bucket: str
+ :param source_path: Optional root path where the source objects are. (templated)
+ :type source_path: str
+ :param destination_path: Optional root path for transferred objects. (templated)
+ :type destination_path: str
:param project_id: The ID of the Google Cloud Console project that
owns the job
:type project_id: str
@@ -985,6 +1007,8 @@ class CloudDataTransferServiceGCSToGCSOperator(BaseOperator):
'gcp_conn_id',
'source_bucket',
'destination_bucket',
+ 'source_path',
+ 'destination_path',
'description',
'object_conditions',
'google_impersonation_chain',
@@ -996,6 +1020,8 @@ def __init__(
*,
source_bucket: str,
destination_bucket: str,
+ source_path: Optional[str] = None,
+ destination_path: Optional[str] = None,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
@@ -1013,6 +1039,8 @@ def __init__(
super().__init__(**kwargs)
self.source_bucket = source_bucket
self.destination_bucket = destination_bucket
+ self.source_path = source_path
+ self.destination_path = destination_path
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -1053,8 +1081,14 @@ def _create_body(self) -> dict:
DESCRIPTION: self.description,
STATUS: GcpTransferJobsStatus.ENABLED,
TRANSFER_SPEC: {
- GCS_DATA_SOURCE: {BUCKET_NAME: self.source_bucket},
- GCS_DATA_SINK: {BUCKET_NAME: self.destination_bucket},
+ GCS_DATA_SOURCE: {
+     BUCKET_NAME: self.source_bucket,
+     PATH: normalize_directory_path(self.source_path),
+ },
+ GCS_DATA_SINK: {
+     BUCKET_NAME: self.destination_bucket,
+     PATH: normalize_directory_path(self.destination_path),
+ },
},
}

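A similar hedged sketch for the GCS-to-GCS operator, imported from the same module as above; values are again hypothetical. With these arguments the generated transfer job body would carry the bucket names plus the normalized "path" prefixes ("backups/daily/" and "archive/"):

    transfer_gcs_to_gcs = CloudDataTransferServiceGCSToGCSOperator(
        task_id="gcs_to_gcs_transfer",
        source_bucket="example-source-bucket",            # hypothetical source bucket
        destination_bucket="example-destination-bucket",  # hypothetical destination bucket
        source_path="backups/daily",
        destination_path="archive",
    )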
24 changes: 24 additions & 0 deletions airflow/providers/google/cloud/utils/helpers.py
@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""This module contains helper functions for Google Cloud operators."""
from typing import Optional


def normalize_directory_path(source_object: Optional[str]) -> Optional[str]:
"""Makes sure dir path ends with a slash"""
return source_object + "/" if source_object and not source_object.endswith("/") else source_object
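A quick illustration of the helper's behavior with hypothetical inputs, mirroring the unit tests added below:

    normalize_directory_path("data/2021")   # -> "data/2021/"
    normalize_directory_path("data/2021/")  # -> "data/2021/" (already normalized)
    normalize_directory_path(None)          # -> None, so an unset path is passed through unchanged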
@@ -39,6 +39,7 @@
HTTP_DATA_SOURCE,
LIST_URL,
NAME,
+ PATH,
SCHEDULE,
SCHEDULE_END_DATE,
SCHEDULE_START_DATE,
@@ -76,6 +77,8 @@
OPERATION_NAME = "operation-name"
AWS_BUCKET_NAME = "aws-bucket-name"
GCS_BUCKET_NAME = "gcp-bucket-name"
+ SOURCE_PATH = None
+ DESTINATION_PATH = None
DESCRIPTION = "description"

DEFAULT_DATE = timezone.datetime(2017, 1, 1)
@@ -102,16 +105,16 @@
START_TIME_OF_DAY: {'hours': 11, 'minutes': 42, 'seconds': 43},
}

- SOURCE_AWS = {AWS_S3_DATA_SOURCE: {BUCKET_NAME: AWS_BUCKET_NAME}}
- SOURCE_GCS = {GCS_DATA_SOURCE: {BUCKET_NAME: GCS_BUCKET_NAME}}
+ SOURCE_AWS = {AWS_S3_DATA_SOURCE: {BUCKET_NAME: AWS_BUCKET_NAME, PATH: SOURCE_PATH}}
+ SOURCE_GCS = {GCS_DATA_SOURCE: {BUCKET_NAME: GCS_BUCKET_NAME, PATH: SOURCE_PATH}}
SOURCE_HTTP = {HTTP_DATA_SOURCE: {LIST_URL: "http://example.com"}}

VALID_TRANSFER_JOB_BASE = {
NAME: JOB_NAME,
DESCRIPTION: DESCRIPTION,
STATUS: 'ENABLED',
SCHEDULE: SCHEDULE_DICT,
- TRANSFER_SPEC: {GCS_DATA_SINK: {BUCKET_NAME: GCS_BUCKET_NAME}},
+ TRANSFER_SPEC: {GCS_DATA_SINK: {BUCKET_NAME: GCS_BUCKET_NAME, PATH: DESTINATION_PATH}},
} # type: Dict
VALID_TRANSFER_JOB_GCS = deepcopy(VALID_TRANSFER_JOB_BASE)
VALID_TRANSFER_JOB_GCS[TRANSFER_SPEC].update(deepcopy(SOURCE_GCS))
@@ -124,16 +127,16 @@
STATUS: 'ENABLED',
SCHEDULE: SCHEDULE_NATIVE,
TRANSFER_SPEC: {
- GCS_DATA_SOURCE: {BUCKET_NAME: GCS_BUCKET_NAME},
- GCS_DATA_SINK: {BUCKET_NAME: GCS_BUCKET_NAME},
+ GCS_DATA_SOURCE: {BUCKET_NAME: GCS_BUCKET_NAME, PATH: SOURCE_PATH},
+ GCS_DATA_SINK: {BUCKET_NAME: GCS_BUCKET_NAME, PATH: DESTINATION_PATH},
},
}

VALID_TRANSFER_JOB_RAW = {
DESCRIPTION: DESCRIPTION,
STATUS: 'ENABLED',
SCHEDULE: SCHEDULE_DICT,
- TRANSFER_SPEC: {GCS_DATA_SINK: {BUCKET_NAME: GCS_BUCKET_NAME}},
+ TRANSFER_SPEC: {GCS_DATA_SINK: {BUCKET_NAME: GCS_BUCKET_NAME, PATH: DESTINATION_PATH}},
} # type: Dict

VALID_TRANSFER_JOB_GCS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
25 changes: 25 additions & 0 deletions tests/providers/google/cloud/utils/test_helpers.py
@@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.providers.google.cloud.utils.helpers import normalize_directory_path


class TestHelpers:
def test_normalize_directory_path(self):
assert normalize_directory_path("dir_path") == "dir_path/"
assert normalize_directory_path("dir_path/") == "dir_path/"
assert normalize_directory_path(None) is None
