Skip to content

Commit

Permalink
Add ability to specify a maximum modified time for objects in GCSToGCSOperator (#7791)
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed Mar 22, 2020
1 parent 8c56388 commit 0daf5d7
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 16 deletions.
96 changes: 86 additions & 10 deletions airflow/providers/google/cloud/hooks/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,39 +272,115 @@ def exists(self, bucket_name, object_name):
blob = bucket.blob(blob_name=object_name)
return blob.exists()

def get_blob_update_time(self, bucket_name, object_name):
    """
    Get the update time of a file in Google Cloud Storage

    :param bucket_name: The Google Cloud Storage bucket where the object is.
    :type bucket_name: str
    :param object_name: The name of the blob to get updated time from the Google cloud
        storage bucket.
    :type object_name: str
    :return: the blob's last-modified timestamp (``blob.updated``)
    :rtype: datetime.datetime
    :raises ValueError: if the object does not exist in the bucket
    """
    client = self.get_conn()
    bucket = client.bucket(bucket_name)
    # get_blob() returns None (instead of raising) when the object is missing,
    # so turn that into an explicit error for callers.
    blob = bucket.get_blob(blob_name=object_name)

    if blob is None:
        raise ValueError("Object ({}) not found in Bucket ({})".format(
            object_name, bucket_name))
    return blob.updated
def is_updated_after(self, bucket_name, object_name, ts):
    """
    Checks if a blob_name is updated in Google Cloud Storage.

    :param bucket_name: The Google Cloud Storage bucket where the object is.
    :type bucket_name: str
    :param object_name: The name of the object to check in the Google cloud
        storage bucket.
    :type object_name: str
    :param ts: The timestamp to check against.
    :type ts: datetime.datetime
    """
    blob_update_time = self.get_blob_update_time(bucket_name, object_name)
    if blob_update_time is None:
        return False

    import dateutil.tz

    # Naive timestamps are interpreted as UTC, matching GCS metadata.
    if not ts.tzinfo:
        ts = ts.replace(tzinfo=dateutil.tz.tzutc())

    self.log.info("Verify object date: %s > %s", blob_update_time, ts)
    return blob_update_time > ts

def is_updated_between(self, bucket_name, object_name, min_ts, max_ts):
    """
    Checks if a blob_name was updated between two timestamps in Google Cloud Storage.

    :param bucket_name: The Google Cloud Storage bucket where the object is.
    :type bucket_name: str
    :param object_name: The name of the object to check in the Google cloud
        storage bucket.
    :type object_name: str
    :param min_ts: The minimum timestamp to check against.
    :type min_ts: datetime.datetime
    :param max_ts: The maximum timestamp to check against.
    :type max_ts: datetime.datetime
    """
    blob_update_time = self.get_blob_update_time(bucket_name, object_name)
    if blob_update_time is None:
        return False

    import dateutil.tz

    # Naive timestamps are interpreted as UTC, matching GCS metadata.
    if not min_ts.tzinfo:
        min_ts = min_ts.replace(tzinfo=dateutil.tz.tzutc())
    if not max_ts.tzinfo:
        max_ts = max_ts.replace(tzinfo=dateutil.tz.tzutc())

    self.log.info("Verify object date: %s is between %s and %s", blob_update_time, min_ts, max_ts)
    # Half-open interval: min_ts inclusive, max_ts exclusive.
    return min_ts <= blob_update_time < max_ts

def is_updated_before(self, bucket_name, object_name, ts):
    """
    Checks if a blob_name was updated before the given time in Google Cloud Storage.

    :param bucket_name: The Google Cloud Storage bucket where the object is.
    :type bucket_name: str
    :param object_name: The name of the object to check in the Google cloud
        storage bucket.
    :type object_name: str
    :param ts: The timestamp to check against.
    :type ts: datetime.datetime
    """
    blob_update_time = self.get_blob_update_time(bucket_name, object_name)
    if blob_update_time is None:
        return False

    import dateutil.tz

    # Naive timestamps are interpreted as UTC, matching GCS metadata.
    if not ts.tzinfo:
        ts = ts.replace(tzinfo=dateutil.tz.tzutc())

    self.log.info("Verify object date: %s < %s", blob_update_time, ts)
    return blob_update_time < ts

def is_older_than(self, bucket_name, object_name, seconds):
    """
    Check if object is older than given time

    :param bucket_name: The Google Cloud Storage bucket where the object is.
    :type bucket_name: str
    :param object_name: The name of the object to check in the Google cloud
        storage bucket.
    :type object_name: str
    :param seconds: The time in seconds to check against
    :type seconds: int
    """
    blob_update_time = self.get_blob_update_time(bucket_name, object_name)
    if blob_update_time is None:
        return False

    from airflow.utils import timezone
    from datetime import timedelta

    # Cutoff is "seconds ago" relative to now (UTC).
    given_time = timezone.utcnow() - timedelta(seconds=seconds)
    self.log.info("Verify object date: %s is older than %s", blob_update_time, given_time)
    return blob_update_time < given_time

def delete(self, bucket_name, object_name):
Expand Down
51 changes: 49 additions & 2 deletions airflow/providers/google/cloud/operators/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ class GCSToGCSOperator(BaseOperator):
only if they were modified after last_modified_time.
If tzinfo has not been set, UTC will be assumed.
:type last_modified_time: datetime.datetime
:param maximum_modified_time: When specified, the objects will be copied or moved,
only if they were modified before maximum_modified_time.
If tzinfo has not been set, UTC will be assumed.
:type maximum_modified_time: datetime.datetime
:param is_older_than: When specified, the objects will be copied if they are older
than the specified time in seconds.
:type is_older_than: int
:Example:
Expand Down Expand Up @@ -174,6 +181,8 @@ def __init__(self, # pylint: disable=too-many-arguments
google_cloud_storage_conn_id=None,
delegate_to=None,
last_modified_time=None,
maximum_modified_time=None,
is_older_than=None,
*args,
**kwargs):
super().__init__(*args, **kwargs)
Expand All @@ -193,6 +202,8 @@ def __init__(self, # pylint: disable=too-many-arguments
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.last_modified_time = last_modified_time
self.maximum_modified_time = maximum_modified_time
self.is_older_than = is_older_than

def execute(self, context):

Expand Down Expand Up @@ -277,13 +288,49 @@ def _copy_source_with_wildcard(self, hook, prefix):
destination_object=destination_object)

def _copy_single_object(self, hook, source_object, destination_object):
if self.last_modified_time is not None:
if self.is_older_than:
# Here we check if the given object is older than the given time
# If given, last_modified_time and maximum_modified_time is ignored
if hook.is_older_than(self.source_bucket,
source_object,
self.is_older_than
):
self.log.info("Object is older than %s seconds ago", self.is_older_than)
else:
self.log.debug("Object is not older than %s seconds ago", self.is_older_than)
return
elif self.last_modified_time and self.maximum_modified_time:
# check to see if object was modified between last_modified_time and
# maximum_modified_time
if hook.is_updated_between(self.source_bucket,
source_object,
self.last_modified_time,
self.maximum_modified_time
):
self.log.info("Object has been modified between %s and %s",
self.last_modified_time, self.maximum_modified_time)
else:
self.log.debug("Object was not modified between %s and %s",
self.last_modified_time, self.maximum_modified_time)
return

elif self.last_modified_time is not None:
# Check to see if object was modified after last_modified_time
if hook.is_updated_after(self.source_bucket,
source_object,
self.last_modified_time):
self.log.debug("Object has been modified after %s ", self.last_modified_time)
self.log.info("Object has been modified after %s ", self.last_modified_time)
else:
self.log.debug("Object was not modified after %s ", self.last_modified_time)
return
elif self.maximum_modified_time is not None:
# Check to see if object was modified before maximum_modified_time
if hook.is_updated_before(self.source_bucket,
source_object,
self.maximum_modified_time):
self.log.info("Object has been modified before %s ", self.maximum_modified_time)
else:
self.log.debug("Object was not modified before %s ", self.maximum_modified_time)
return

self.log.info('Executing copy of gs://%s/%s to gs://%s/%s',
Expand Down
80 changes: 76 additions & 4 deletions tests/providers/google/cloud/hooks/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import os
import tempfile
import unittest
from datetime import datetime
from datetime import datetime, timedelta
from unittest import mock

import dateutil
from google.cloud import exceptions, storage

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks import gcs
from airflow.utils import timezone
from airflow.version import version
from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id

Expand Down Expand Up @@ -70,7 +71,7 @@ def setUp(self):
new=mock_base_gcp_hook_default_project_id,
):
self.gcs_hook = gcs.GCSHook(
google_cloud_storage_conn_id='test')
gcp_conn_id='test')

@mock.patch(
'airflow.providers.google.cloud.hooks.base.CloudBaseHook.client_info',
Expand Down Expand Up @@ -152,6 +153,77 @@ def test_is_updated_after(self, mock_service):
# Then
self.assertTrue(response)

@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
def test_is_updated_before(self, mock_service):
    # The mocked blob was last updated in Aug 2019, well before the checked
    # timestamp (Jan 2020), so the hook must report True.
    blob = mock_service.return_value.bucket.return_value.get_blob.return_value
    blob.updated = datetime(2019, 8, 28, 14, 7, 20, 700000, dateutil.tz.tzutc())

    response = self.gcs_hook.is_updated_before(
        bucket_name='test_bucket', object_name='test_object',
        ts=datetime(2020, 1, 1, 1, 1, 1)
    )

    self.assertTrue(response)

@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
def test_is_updated_between(self, mock_service):
    # The mocked blob's update time (Aug 2019) lies inside the
    # [Jan 2018, Jan 2020) window, so the hook must report True.
    blob = mock_service.return_value.bucket.return_value.get_blob.return_value
    blob.updated = datetime(2019, 8, 28, 14, 7, 20, 700000, dateutil.tz.tzutc())

    response = self.gcs_hook.is_updated_between(
        bucket_name='test_bucket', object_name='test_object',
        min_ts=datetime(2018, 1, 1, 1, 1, 1),
        max_ts=datetime(2020, 1, 1, 1, 1, 1)
    )

    self.assertTrue(response)

@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
def test_is_older_than_with_true_cond(self, mock_service):
    # A fixed timestamp in the past (Jan 2020) is always more than 24h old,
    # so the hook must report True.
    blob = mock_service.return_value.bucket.return_value.get_blob.return_value
    blob.updated = datetime(2020, 1, 28, 14, 7, 20, 700000, dateutil.tz.tzutc())

    response = self.gcs_hook.is_older_than(
        bucket_name='test_bucket', object_name='test_object',
        seconds=86400  # 24hr
    )

    self.assertTrue(response)

@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
def test_is_older_than_with_false_cond(self, mock_service):
    # An update time two days in the future can never be older than
    # 24h before now, so the hook must report False.
    blob = mock_service.return_value.bucket.return_value.get_blob.return_value
    blob.updated = timezone.utcnow() + timedelta(days=2)

    response = self.gcs_hook.is_older_than(
        bucket_name='test_bucket', object_name='test_object',
        seconds=86400  # 24hr
    )

    self.assertFalse(response)

@mock.patch('google.cloud.storage.Bucket')
@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
def test_copy(self, mock_service, mock_bucket):
Expand Down Expand Up @@ -591,7 +663,7 @@ class TestGCSHookUpload(unittest.TestCase):
def setUp(self):
with mock.patch(BASE_STRING.format('CloudBaseHook.__init__')):
self.gcs_hook = gcs.GCSHook(
google_cloud_storage_conn_id='test'
gcp_conn_id='test'
)

# generate a 384KiB test file (larger than the minimum 256KiB multipart chunk size)
Expand Down Expand Up @@ -735,7 +807,7 @@ def setUp(self):
with mock.patch(
GCS_STRING.format("CloudBaseHook.__init__"), new=mock_base_gcp_hook_default_project_id
):
self.gcs_hook = gcs.GCSHook(google_cloud_storage_conn_id="test")
self.gcs_hook = gcs.GCSHook(gcp_conn_id="test")

@mock.patch(GCS_STRING.format("GCSHook.copy"))
@mock.patch(GCS_STRING.format("GCSHook.rewrite"))
Expand Down
46 changes: 46 additions & 0 deletions tests/providers/google/cloud/operators/test_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
DELIMITER = '.json'

MOD_TIME_1 = datetime(2016, 1, 1)
MOD_TIME_2 = datetime(2019, 1, 1)


class TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase):
Expand Down Expand Up @@ -257,6 +258,35 @@ def test_no_prefix_with_last_modified_time_with_true_cond(self, mock_hook):
mock_hook.return_value.rewrite.assert_called_once_with(
TEST_BUCKET, 'test_object.txt', DESTINATION_BUCKET, 'test_object.txt')

@mock.patch('airflow.providers.google.cloud.operators.gcs_to_gcs.GCSHook')
def test_no_prefix_with_maximum_modified_time_with_true_cond(self, mock_hook):
    # When the hook reports the object was modified before
    # maximum_modified_time, the copy (rewrite) must proceed.
    mock_hook.return_value.is_updated_before.return_value = True
    operator = GCSToGCSOperator(
        task_id=TASK_ID,
        source_bucket=TEST_BUCKET,
        source_object=SOURCE_OBJECT_NO_WILDCARD,
        destination_bucket=DESTINATION_BUCKET,
        destination_object=SOURCE_OBJECT_NO_WILDCARD,
        maximum_modified_time=MOD_TIME_1,
    )

    operator.execute(None)

    mock_hook.return_value.rewrite.assert_called_once_with(
        TEST_BUCKET, 'test_object.txt', DESTINATION_BUCKET, 'test_object.txt')

@mock.patch('airflow.providers.google.cloud.operators.gcs_to_gcs.GCSHook')
def test_exe_last_modified_time_and_maximum_modified_time_with_true_cond(self, mock_hook):
    # With both bounds set, the operator should consult is_updated_between;
    # a True result means the copy (rewrite) must proceed.
    mock_hook.return_value.is_updated_between.return_value = True
    operator = GCSToGCSOperator(
        task_id=TASK_ID,
        source_bucket=TEST_BUCKET,
        source_object=SOURCE_OBJECT_NO_WILDCARD,
        destination_bucket=DESTINATION_BUCKET,
        destination_object=SOURCE_OBJECT_NO_WILDCARD,
        last_modified_time=MOD_TIME_1,
        maximum_modified_time=MOD_TIME_2,
    )

    operator.execute(None)

    mock_hook.return_value.rewrite.assert_called_once_with(
        TEST_BUCKET, 'test_object.txt', DESTINATION_BUCKET, 'test_object.txt')

@mock.patch('airflow.providers.google.cloud.operators.gcs_to_gcs.GCSHook')
def test_execute_no_prefix_with_no_last_modified_time(self, mock_hook):
operator = GCSToGCSOperator(
Expand All @@ -283,6 +313,22 @@ def test_no_prefix_with_last_modified_time_with_false_cond(self, mock_hook):
operator.execute(None)
mock_hook.return_value.rewrite.assert_not_called()

@mock.patch('airflow.providers.google.cloud.operators.gcs_to_gcs.GCSHook')
def test_executes_with_is_older_than_with_true_cond(self, mock_hook):
    # is_older_than takes precedence over the modified-time bounds, which are
    # also supplied here to exercise that precedence; a True result means the
    # copy (rewrite) must proceed.
    mock_hook.return_value.is_older_than.return_value = True
    operator = GCSToGCSOperator(
        task_id=TASK_ID,
        source_bucket=TEST_BUCKET,
        source_object=SOURCE_OBJECT_NO_WILDCARD,
        destination_bucket=DESTINATION_BUCKET,
        destination_object=SOURCE_OBJECT_NO_WILDCARD,
        last_modified_time=MOD_TIME_1,
        maximum_modified_time=MOD_TIME_2,
        is_older_than=3600,
    )

    operator.execute(None)

    mock_hook.return_value.rewrite.assert_called_once_with(
        TEST_BUCKET, 'test_object.txt', DESTINATION_BUCKET, 'test_object.txt')

@mock.patch('airflow.providers.google.cloud.operators.gcs_to_gcs.GCSHook')
def test_execute_more_than_1_wildcard(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
Expand Down

0 comments on commit 0daf5d7

Please sign in to comment.