[AIRFLOW-6281] Create guide for GCS to GCS transfer operators (#8442)
wkhudgins92 committed Apr 26, 2020
1 parent 3f8f4ad commit 37fdfa9
Showing 2 changed files with 244 additions and 25 deletions.
97 changes: 88 additions & 9 deletions airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
@@ -22,7 +22,7 @@
import os

from airflow import models
from airflow.providers.google.cloud.operators.gcs_to_gcs import GCSSynchronizeBuckets
from airflow.providers.google.cloud.operators.gcs_to_gcs import GCSSynchronizeBuckets, GCSToGCSOperator
from airflow.utils.dates import days_ago

default_args = {"start_date": days_ago(1)}
@@ -36,27 +36,106 @@
BUCKET_3_SRC = os.environ.get("GCP_GCS_BUCKET_3_SRC", "test-gcs-sync-3-src")
BUCKET_3_DST = os.environ.get("GCP_GCS_BUCKET_3_DST", "test-gcs-sync-3-dst")

OBJECT_1 = os.environ.get("GCP_GCS_OBJECT_1", "test-gcs-to-gcs-1")
OBJECT_2 = os.environ.get("GCP_GCS_OBJECT_2", "test-gcs-to-gcs-2")

with models.DAG(
"example_gcs_to_gcs", default_args=default_args, schedule_interval=None, tags=['example']
) as dag:
sync_full_bucket = GCSSynchronizeBuckets(
task_id="sync-full-bucket",
# [START howto_synch_bucket]
sync_bucket = GCSSynchronizeBuckets(
task_id="sync_bucket",
source_bucket=BUCKET_1_SRC,
destination_bucket=BUCKET_1_DST
)
# [END howto_synch_bucket]

sync_to_subdirectory_and_delete_extra_files = GCSSynchronizeBuckets(
task_id="sync_to_subdirectory_and_delete_extra_files",
# [START howto_synch_full_bucket]
sync_full_bucket = GCSSynchronizeBuckets(
task_id="sync_full_bucket",
source_bucket=BUCKET_1_SRC,
destination_bucket=BUCKET_1_DST,
destination_object="subdir/",
delete_extra_files=True,
allow_overwrite=True
)
# [END howto_synch_full_bucket]

# [START howto_synch_to_subdir]
sync_to_subdirectory = GCSSynchronizeBuckets(
task_id="sync_to_subdirectory",
source_bucket=BUCKET_1_SRC,
destination_bucket=BUCKET_1_DST,
destination_object="subdir/"
)
# [END howto_synch_to_subdir]

sync_from_subdirectory_and_allow_overwrite_and_non_recursive = GCSSynchronizeBuckets(
task_id="sync_from_subdirectory_and_allow_overwrite_and_non_recursive",
# [START howto_sync_from_subdir]
sync_from_subdirectory = GCSSynchronizeBuckets(
task_id="sync_from_subdirectory",
source_bucket=BUCKET_1_SRC,
source_object="subdir/",
destination_bucket=BUCKET_1_DST
)
# [END howto_sync_from_subdir]

# [START howto_operator_gcs_to_gcs_single_file]
copy_single_file = GCSToGCSOperator(
task_id="copy_single_gcs_file",
source_bucket=BUCKET_1_SRC,
source_object=OBJECT_1,
destination_bucket=BUCKET_1_DST, # If not supplied the source_bucket value will be used
destination_object="backup_" + OBJECT_1 # If not supplied the source_object value will be used
)
# [END howto_operator_gcs_to_gcs_single_file]

# [START howto_operator_gcs_to_gcs_wildcard]
copy_files_with_wildcard = GCSToGCSOperator(
task_id="copy_files_with_wildcard",
source_bucket=BUCKET_1_SRC,
source_object="data/*.txt",
destination_bucket=BUCKET_1_DST,
destination_object="backup/"
)
# [END howto_operator_gcs_to_gcs_wildcard]

# [START howto_operator_gcs_to_gcs_delimiter]
copy_files_with_delimiter = GCSToGCSOperator(
task_id="copy_files_with_delimiter",
source_bucket=BUCKET_1_SRC,
source_object="data/",
destination_bucket=BUCKET_1_DST,
destination_object="backup/",
delimiter='.txt'
)
# [END howto_operator_gcs_to_gcs_delimiter]

# [START howto_operator_gcs_to_gcs_list]
copy_files_with_list = GCSToGCSOperator(
task_id="copy_files_with_list",
source_bucket=BUCKET_1_SRC,
source_objects=[OBJECT_1, OBJECT_2],  # Each element can be a specific file or a wildcard expression
destination_bucket=BUCKET_1_DST,
destination_object="backup/"
)
# [END howto_operator_gcs_to_gcs_list]

# [START howto_operator_gcs_to_gcs_single_file_move]
move_single_file = GCSToGCSOperator(
task_id="move_single_file",
source_bucket=BUCKET_1_SRC,
source_object=OBJECT_1,
destination_bucket=BUCKET_1_DST,
destination_object="backup_" + OBJECT_1,
move_object=True
)
# [END howto_operator_gcs_to_gcs_single_file_move]

# [START howto_operator_gcs_to_gcs_list_move]
move_files_with_list = GCSToGCSOperator(
task_id="move_files_with_list",
source_bucket=BUCKET_1_SRC,
source_objects=[OBJECT_1, OBJECT_2],
destination_bucket=BUCKET_1_DST,
recursive=False,
destination_object="backup/"
)
# [END howto_operator_gcs_to_gcs_list_move]
172 changes: 156 additions & 16 deletions docs/howto/operator/gcp/gcs_to_gcs.rst
@@ -18,8 +18,10 @@
Transfer data in Google Cloud Storage
=====================================

This page will show operators whose purpose is to copy data as part of the Google Cloud Service.
Each of them has its own specific use cases, as well as limitations.
`Google Cloud Storage <https://cloud.google.com/storage/>`__ (GCS) is used to store large amounts of data from various
applications. Note that files are called objects in GCS terminology, so the terms "object" and "file" are used
interchangeably in this guide. There are several operators whose purpose is to copy data as part of the Google Cloud
Service. This page shows how to use these operators.

Overview
--------
@@ -33,16 +33,15 @@ task, use the operator
You can also use the previous operator for this service -
:class:`~airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceGCSToGCSOperator`

These operators does not control the copying process locally, but uses Google resources, which allows to
perform this task faster and more economically. The economic effects are especially prominent when the
existence of Airflow is not found in the Google Cloud Platform, because this operator allows egress
traffic reductions.
These operators do not control the copying process locally, but use Google resources, which allows them to
perform this task faster and more economically. The economic effects are especially prominent when
Airflow is not hosted in Google Cloud Platform, because these operators reduce egress traffic.

This operator modifies source objects if the option that specifies whether objects should be deleted
These operators modify source objects if the option that specifies whether objects should be deleted
from the source after they are transferred to the sink is enabled.

When you use this service, you can specify whether overwriting objects that already exist in the sink is
allowed, whether objects that exist only in the sink should be deleted, or whether objects should be deleted
When you use the Google Cloud Data Transfer service, you can specify whether overwriting objects that already exist in
the sink is allowed, whether objects that exist only in the sink should be deleted, or whether objects should be deleted
from the source after they are transferred to the sink.

Source objects can be specified using include and exclusion prefixes, as well as based on the file
@@ -57,6 +58,12 @@ There are two operators that are used to copy data, where the entire process is

They are described in the sections below.

Prerequisite Tasks
------------------

.. include:: _partials/prerequisite_tasks.rst


Operators
---------

@@ -65,19 +72,92 @@ Operators
GCSToGCSOperator
~~~~~~~~~~~~~~~~


:class:`~airflow.providers.google.cloud.operators.gcs_to_gcs.GCSToGCSOperator` allows you to copy
one or more files. The copying always takes place without taking into account the initial state of
the destination bucket.
one or more files within GCS. The files may be copied between two different buckets or within one bucket.
The copying always takes place without taking into account the initial state of the destination bucket.

This operator never deletes data in the destination bucket and it deletes objects in the source bucket
if the file move option is active.
This operator only deletes objects in the source bucket if the file move option is active. When copying files
between two different buckets, this operator never deletes data in the destination bucket.

When you use this operator, you can specify whether objects should be deleted from the source after
they are transferred to the sink. Source objects can be specified using single wildcard, as
they are transferred to the sink. Source objects can be specified using a single wildcard, as
well as based on the file modification date.

The way this operator works can be compared to the ``cp`` command.
The way this operator works by default can be compared to the ``cp`` command. When the file move option is active, this
operator functions like the ``mv`` command.

Below are examples of using the GCSToGCSOperator to copy a single file, to copy multiple files (with a wildcard,
a delimiter, or a list), to move a single file, and to move multiple files.

Copy single file
----------------

The following example copies a single file, ``OBJECT_1``, from the ``BUCKET_1_SRC`` GCS bucket to the ``BUCKET_1_DST`` bucket.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcs_to_gcs_single_file]
:end-before: [END howto_operator_gcs_to_gcs_single_file]

Copy multiple files
-------------------

There are several ways to copy multiple files, various examples of which are presented below.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcs_to_gcs_wildcard]
:end-before: [END howto_operator_gcs_to_gcs_wildcard]

The ``source_object`` value may contain one wildcard, denoted as ``*``. All files matching the wildcard expression will
be copied. In this example, all files ending with ``.txt`` in the ``data`` folder of ``BUCKET_1_SRC`` will be copied to
the ``backup`` folder in ``BUCKET_1_DST``, with file names unchanged.
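The single-wildcard rule can be sketched in plain Python. This is an illustration of the matching behaviour described above, not the operator's actual implementation:

```python
def matches_single_wildcard(object_name: str, pattern: str) -> bool:
    """Illustrative check: a pattern may contain at most one '*'.

    The text before '*' is treated as a prefix and the text after it
    as a suffix, mirroring how a single wildcard narrows the copy.
    """
    if pattern.count("*") > 1:
        raise ValueError("only one wildcard is supported")
    if "*" not in pattern:
        return object_name == pattern
    prefix, suffix = pattern.split("*", 1)
    return object_name.startswith(prefix) and object_name.endswith(suffix)


# Only objects under data/ ending in .txt match "data/*.txt"
names = ["data/a.txt", "data/b.csv", "logs/c.txt"]
matched = [n for n in names if matches_single_wildcard(n, "data/*.txt")]
```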

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcs_to_gcs_delimiter]
:end-before: [END howto_operator_gcs_to_gcs_delimiter]

The ``delimiter`` field may be specified to select any source files starting with ``source_object`` and ending with the
value supplied to ``delimiter``. This example uses the ``delimiter`` value to implement the same functionality as the
prior example.
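The prefix-plus-delimiter selection can be illustrated with a short sketch (a simplified model of the filtering, not the operator's code):

```python
def select_by_delimiter(object_names, source_object: str, delimiter: str):
    """Keep objects that start with `source_object` and end with `delimiter`."""
    return [
        name
        for name in object_names
        if name.startswith(source_object) and name.endswith(delimiter)
    ]


# Equivalent to the "data/*.txt" wildcard example above
objects = ["data/a.txt", "data/b.csv", "other/c.txt"]
selected = select_by_delimiter(objects, "data/", ".txt")
```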

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcs_to_gcs_list]
:end-before: [END howto_operator_gcs_to_gcs_list]

Lastly, files may be copied by omitting the ``source_object`` argument and instead supplying a list to the
``source_objects`` argument. In this example, ``OBJECT_1`` and ``OBJECT_2`` will be copied from ``BUCKET_1_SRC`` to
``BUCKET_1_DST``. Instead of specific file names, the list can contain one or more wildcard expressions, each with no
more than one wildcard. Supplying a list of size 1 functions the same as supplying a value to the ``source_object``
argument.
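Under these rules, resolving a ``source_objects`` list against a bucket listing might look like the following sketch. It is an illustration only; the helper name and logic are assumptions, not the operator's implementation:

```python
def expand_source_objects(bucket_listing, source_objects):
    """Resolve a mix of exact names and single-wildcard patterns."""
    resolved = []
    for entry in source_objects:
        if "*" in entry:
            # At most one wildcard: split into prefix and suffix
            prefix, suffix = entry.split("*", 1)
            resolved.extend(
                name
                for name in bucket_listing
                if name.startswith(prefix) and name.endswith(suffix)
            )
        elif entry in bucket_listing:
            resolved.append(entry)
    return resolved


listing = ["test-gcs-to-gcs-1", "test-gcs-to-gcs-2", "notes/readme.md"]
to_copy = expand_source_objects(listing, ["test-gcs-to-gcs-1", "notes/*.md"])
```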

Move single file
----------------

Supplying ``True`` to the ``move_object`` argument causes the operator to delete ``source_object`` once the copy is complete.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcs_to_gcs_single_file_move]
:end-before: [END howto_operator_gcs_to_gcs_single_file_move]

Move multiple files
-------------------

Multiple files may be moved by supplying ``True`` to the ``move_object`` argument. The same rules concerning wildcards
and the ``delimiter`` argument apply to moves as well as copies.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcs_to_gcs_list_move]
:end-before: [END howto_operator_gcs_to_gcs_list_move]


.. _howto/operator:GCSSynchronizeBuckets:
@@ -98,3 +178,63 @@ objects that exist only in the sink should be deleted, whether subdirectories ar
which subdirectory is to be processed.

The way this operator works can be compared to the ``rsync`` command.

Basic Synchronization
---------------------

The following example will ensure all files in ``BUCKET_1_SRC``, including any in subdirectories, are also in
``BUCKET_1_DST``. It will not overwrite identically named files in ``BUCKET_1_DST`` if they already exist. It will not
delete any files in ``BUCKET_1_DST`` not in ``BUCKET_1_SRC``.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_synch_bucket]
:end-before: [END howto_synch_bucket]

Full Bucket Synchronization
---------------------------

This example will ensure all files in ``BUCKET_1_SRC``, including any in subdirectories, are also in
``BUCKET_1_DST``. It will overwrite identically named files in ``BUCKET_1_DST`` if they already exist. It will
delete any files in ``BUCKET_1_DST`` not in ``BUCKET_1_SRC``.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_synch_full_bucket]
:end-before: [END howto_synch_full_bucket]
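The basic and full synchronization modes above differ only in two flags. Their effect on a destination listing can be sketched as follows, as a simplified model of the behaviour rather than GCSSynchronizeBuckets itself:

```python
def plan_sync(src, dst, allow_overwrite=False, delete_extra_files=False):
    """Return (objects to copy, objects to delete) for a one-way sync."""
    # Always copy objects missing from the destination
    to_copy = {name for name in src if name not in dst}
    if allow_overwrite:
        # Also re-copy objects that already exist in the destination
        to_copy |= {name for name in src if name in dst}
    # Optionally delete destination objects absent from the source
    to_delete = {name for name in dst if name not in src} if delete_extra_files else set()
    return to_copy, to_delete


src = {"a.txt", "b.txt"}
dst = {"b.txt", "stale.txt"}

# Basic sync: copy only missing files, never delete.
copy_basic, delete_basic = plan_sync(src, dst)

# Full sync: overwrite matches and delete extras.
copy_full, delete_full = plan_sync(src, dst, allow_overwrite=True, delete_extra_files=True)
```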

Synchronize to a Subdirectory
-----------------------------

The following example will ensure all files in ``BUCKET_1_SRC``, including any in subdirectories, are also in the
``subdir`` folder in ``BUCKET_1_DST``. It will not overwrite identically named files in ``BUCKET_1_DST/subdir`` if they
already exist and it will not delete any files in ``BUCKET_1_DST/subdir`` not in ``BUCKET_1_SRC``.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_synch_to_subdir]
:end-before: [END howto_synch_to_subdir]

Synchronize from a Subdirectory
-------------------------------

This example will ensure all files in ``BUCKET_1_SRC/subdir``, including any in subdirectories, are also in
``BUCKET_1_DST``. It will not overwrite identically named files in ``BUCKET_1_DST`` if they
already exist and it will not delete any files in ``BUCKET_1_DST`` not in ``BUCKET_1_SRC/subdir``.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_sync_from_subdir]
:end-before: [END howto_sync_from_subdir]

Reference
---------

For further information, look at:

* `Google Cloud Storage Documentation <https://cloud.google.com/storage/>`__
* `Google Cloud Storage Utilities rsync command <https://cloud.google.com/storage/docs/gsutil/commands/rsync>`__
