Add GCS timespan transform operator (#13996)
This change adds a new GCS transform operator. It is based on the existing transform operator, with the addition that it transforms multiple files that match a prefix and were updated within a time span. The time span is defined implicitly: it is the interval between the current execution timestamp of the DAG run (time-span start) and the next execution timestamp of the DAG (time-span end).
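For illustration (hypothetical dates, assuming a daily schedule), an object is picked up when its update time falls in the half-open interval between the two execution timestamps, matching the timespan_start <= updated < timespan_end check in the new hook method below:

# Illustration only: hypothetical dates, daily schedule assumed.
from datetime import datetime, timezone

timespan_start = datetime(2021, 3, 15, tzinfo=timezone.utc)  # this DAG run's execution timestamp
timespan_end = datetime(2021, 3, 16, tzinfo=timezone.utc)    # the next DAG run's execution timestamp

object_update_time = datetime(2021, 3, 15, 17, 30, tzinfo=timezone.utc)
assert timespan_start <= object_update_time < timespan_end   # object is picked up by this run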

The use case is that some entity generates an undefined number of files at irregular intervals. The operator picks up all files that were updated since it last ran. Typically the transform script iterates over the files, opens them, extracts some information, collates the results into one or more files, and uploads those to GCS. The result files can then be loaded into BigQuery, processed further, or served via a webserver.
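As an illustration, a minimal transform script could look like the sketch below. It is not part of this commit, and the calling convention (source directory, destination directory, and the timespan bounds appended as command-line arguments) is an assumption here; check the operator's documentation for the exact arguments it passes.

# transform_script.py -- hypothetical sketch, not part of this commit.
# Assumed calling convention: <source_dir> <destination_dir> <timespan_start> <timespan_end>.
# Files downloaded for the timespan are staged in <source_dir>; everything written to
# <destination_dir> is uploaded to the destination bucket/prefix by the operator.
# Assumes plain-text input files.
import sys
from pathlib import Path

source_dir = Path(sys.argv[1])
destination_dir = Path(sys.argv[2])
timespan_start, timespan_end = sys.argv[3], sys.argv[4]

# Collate all staged files into a single result file.
lines = []
for source_file in sorted(p for p in source_dir.rglob("*") if p.is_file()):
    lines.extend(source_file.read_text().splitlines())

(destination_dir / "collated.txt").write_text("\n".join(lines) + "\n")
print(f"Collated {len(lines)} lines for timespan {timespan_start} .. {timespan_end}")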
sschaetz committed Mar 15, 2021
1 parent fd37390 commit ddc9133
Showing 7 changed files with 783 additions and 23 deletions.
@@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for the Google Cloud Storage time-span file transform operator.
"""

import os

from airflow import models
from airflow.providers.google.cloud.operators.gcs import GCSTimeSpanFileTransformOperator
from airflow.utils.dates import days_ago
from airflow.utils.state import State

SOURCE_BUCKET = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
SOURCE_PREFIX = "gcs_timespan_file_transform_source"
SOURCE_GCP_CONN_ID = "google_cloud_default"
DESTINATION_BUCKET = SOURCE_BUCKET
DESTINATION_PREFIX = "gcs_timespan_file_transform_destination"
DESTINATION_GCP_CONN_ID = "google_cloud_default"

PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
'GCP_GCS_PATH_TO_TRANSFORM_SCRIPT', 'test_gcs_timespan_transform_script.py'
)


with models.DAG(
"example_gcs_timespan_file_transform",
start_date=days_ago(1),
schedule_interval=None,
tags=['example'],
) as dag:

# [START howto_operator_gcs_timespan_file_transform_operator_Task]
gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
task_id="gcs_timespan_transform_files",
source_bucket=SOURCE_BUCKET,
source_prefix=SOURCE_PREFIX,
source_gcp_conn_id=SOURCE_GCP_CONN_ID,
destination_bucket=DESTINATION_BUCKET,
destination_prefix=DESTINATION_PREFIX,
destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
transform_script=["python", PATH_TO_TRANSFORM_SCRIPT],
)
# [END howto_operator_gcs_timespan_file_transform_operator_Task]


if __name__ == '__main__':
dag.clear(dag_run_state=State.NONE)
dag.run()
167 changes: 145 additions & 22 deletions airflow/providers/google/cloud/hooks/gcs.py
@@ -21,9 +21,11 @@
import gzip as gz
import os
import shutil
import time
import warnings
from contextlib import contextmanager
from datetime import datetime
from functools import partial
from io import BytesIO
from os import path
from tempfile import NamedTemporaryFile
@@ -32,9 +34,11 @@

from google.api_core.exceptions import NotFound
from google.cloud import storage
from google.cloud.exceptions import GoogleCloudError

from airflow.exceptions import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from airflow.utils import timezone
from airflow.version import version

RT = TypeVar('RT') # pylint: disable=invalid-name
@@ -266,6 +270,7 @@ def download(
filename: Optional[str] = None,
chunk_size: Optional[int] = None,
timeout: Optional[int] = DEFAULT_TIMEOUT,
num_max_attempts: int = 1,
) -> Union[str, bytes]:
"""
Downloads a file from Google Cloud Storage.
@@ -285,20 +290,43 @@
:type chunk_size: int
:param timeout: Request timeout in seconds.
:type timeout: int
:param num_max_attempts: Number of attempts to download the file.
:type num_max_attempts: int
"""
# TODO: future improvement check file size before downloading,
# to check for local space availability

client = self.get_conn()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)

if filename:
blob.download_to_filename(filename, timeout=timeout)
self.log.info('File downloaded to %s', filename)
return filename
else:
return blob.download_as_string()
num_file_attempts = 0

while num_file_attempts < num_max_attempts:
try:
num_file_attempts += 1
client = self.get_conn()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)

if filename:
blob.download_to_filename(filename, timeout=timeout)
self.log.info('File downloaded to %s', filename)
return filename
else:
return blob.download_as_string()

except GoogleCloudError:
if num_file_attempts == num_max_attempts:
self.log.error(
'Download attempt of object: %s from %s has failed. Attempt: %s, max %s.',
object_name,
bucket_name,
num_file_attempts,
num_max_attempts,
)
raise

# Wait with exponential backoff scheme before retrying.
timeout_seconds = 1.0 * 2 ** (num_file_attempts - 1)
time.sleep(timeout_seconds)
continue

@_fallback_object_url_to_object_name_and_bucket_name()
@contextmanager
@@ -362,7 +390,7 @@ def provide_file_and_upload(
tmp_file.flush()
self.upload(bucket_name=bucket_name, object_name=object_name, filename=tmp_file.name)

def upload(
def upload( # pylint: disable=too-many-arguments
self,
bucket_name: str,
object_name: str,
@@ -373,6 +401,7 @@
encoding: str = 'utf-8',
chunk_size: Optional[int] = None,
timeout: Optional[int] = DEFAULT_TIMEOUT,
num_max_attempts: int = 1,
) -> None:
"""
Uploads a local file or file data as string or bytes to Google Cloud Storage.
@@ -395,7 +424,38 @@
:type chunk_size: int
:param timeout: Request timeout in seconds.
:type timeout: int
:param num_max_attempts: Number of attempts to upload the file.
:type num_max_attempts: int
"""

def _call_with_retry(f: Callable[[], None]) -> None:
"""Helper functions to upload a file or a string with a retry mechanism and exponential back-off.
:param f: Callable that should be retried.
:type f: Callable[[], None]
"""
num_file_attempts = 0

while num_file_attempts < num_max_attempts:
try:
num_file_attempts += 1
f()

except GoogleCloudError as e:
if num_file_attempts == num_max_attempts:
self.log.error(
'Upload attempt of object: %s to %s has failed. Attempt: %s, max %s.',
object_name,
bucket_name,
num_file_attempts,
num_max_attempts,
)
raise e

# Wait with exponential backoff scheme before retrying.
timeout_seconds = 1.0 * 2 ** (num_file_attempts - 1)
time.sleep(timeout_seconds)
continue

client = self.get_conn()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)
@@ -416,7 +476,10 @@ def upload(
shutil.copyfileobj(f_in, f_out)
filename = filename_gz

blob.upload_from_filename(filename=filename, content_type=mime_type, timeout=timeout)
_call_with_retry(
partial(blob.upload_from_filename, filename=filename, content_type=mime_type, timeout=timeout)
)

if gzip:
os.remove(filename)
self.log.info('File %s uploaded to %s in %s bucket', filename, object_name, bucket_name)
@@ -430,7 +493,9 @@ def upload(
with gz.GzipFile(fileobj=out, mode="w") as f:
f.write(data)
data = out.getvalue()
blob.upload_from_string(data, content_type=mime_type, timeout=timeout)

_call_with_retry(partial(blob.upload_from_string, data, content_type=mime_type, timeout=timeout))

self.log.info('Data stream uploaded to %s in %s bucket', object_name, bucket_name)
else:
raise ValueError("'filename' and 'data' parameter missing. One is required to upload to gcs.")
@@ -481,10 +546,9 @@ def is_updated_after(self, bucket_name: str, object_name: str, ts: datetime) ->
"""
blob_update_time = self.get_blob_update_time(bucket_name, object_name)
if blob_update_time is not None:
import dateutil.tz

if not ts.tzinfo:
ts = ts.replace(tzinfo=dateutil.tz.tzutc())
ts = ts.replace(tzinfo=timezone.utc)
self.log.info("Verify object date: %s > %s", blob_update_time, ts)
if blob_update_time > ts:
return True
@@ -508,12 +572,11 @@ def is_updated_between(
"""
blob_update_time = self.get_blob_update_time(bucket_name, object_name)
if blob_update_time is not None:
import dateutil.tz

if not min_ts.tzinfo:
min_ts = min_ts.replace(tzinfo=dateutil.tz.tzutc())
min_ts = min_ts.replace(tzinfo=timezone.utc)
if not max_ts.tzinfo:
max_ts = max_ts.replace(tzinfo=dateutil.tz.tzutc())
max_ts = max_ts.replace(tzinfo=timezone.utc)
self.log.info("Verify object date: %s is between %s and %s", blob_update_time, min_ts, max_ts)
if min_ts <= blob_update_time < max_ts:
return True
@@ -533,10 +596,9 @@ def is_updated_before(self, bucket_name: str, object_name: str, ts: datetime) ->
"""
blob_update_time = self.get_blob_update_time(bucket_name, object_name)
if blob_update_time is not None:
import dateutil.tz

if not ts.tzinfo:
ts = ts.replace(tzinfo=dateutil.tz.tzutc())
ts = ts.replace(tzinfo=timezone.utc)
self.log.info("Verify object date: %s < %s", blob_update_time, ts)
if blob_update_time < ts:
return True
@@ -558,8 +620,6 @@ def is_older_than(self, bucket_name: str, object_name: str, seconds: int) -> boo
if blob_update_time is not None:
from datetime import timedelta

from airflow.utils import timezone

current_time = timezone.utcnow()
given_time = current_time - timedelta(seconds=seconds)
self.log.info("Verify object date: %s is older than %s", blob_update_time, given_time)
@@ -650,6 +710,69 @@ def list(self, bucket_name, versions=None, max_results=None, prefix=None, delimi
break
return ids

def list_by_timespan(
self,
bucket_name: str,
timespan_start: datetime,
timespan_end: datetime,
versions: Optional[bool] = None,
max_results: Optional[int] = None,
prefix: Optional[str] = None,
delimiter: Optional[str] = None,
) -> list:
"""
List all objects from the bucket that have the given string prefix in their name and were
updated in the time span between ``timespan_start`` (inclusive) and ``timespan_end`` (exclusive).

:param bucket_name: bucket name
:type bucket_name: str
:param timespan_start: will return objects that were updated at or after this datetime (UTC)
:type timespan_start: datetime
:param timespan_end: will return objects that were updated before this datetime (UTC)
:type timespan_end: datetime
:param versions: if true, list all versions of the objects
:type versions: bool
:param max_results: max count of items to return in a single page of responses
:type max_results: int
:param prefix: prefix string which filters objects whose name begin with
this prefix
:type prefix: str
:param delimiter: filters objects based on the delimiter (e.g. '.csv')
:type delimiter: str
:return: a list of object names matching the filtering criteria
"""
client = self.get_conn()
bucket = client.bucket(bucket_name)

ids = []
page_token = None

while True:
blobs = bucket.list_blobs(
max_results=max_results,
page_token=page_token,
prefix=prefix,
delimiter=delimiter,
versions=versions,
)

blob_names = []
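# Keep only blobs whose update time falls in the half-open interval [timespan_start, timespan_end).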
for blob in blobs:
if timespan_start <= blob.updated.replace(tzinfo=timezone.utc) < timespan_end:
blob_names.append(blob.name)

prefixes = blobs.prefixes
if prefixes:
ids += list(prefixes)
else:
ids += blob_names

page_token = blobs.next_page_token
if page_token is None:
# empty next page token
break
return ids

def get_size(self, bucket_name: str, object_name: str) -> int:
"""
Gets the size of a file in Google Cloud Storage.
