Skip to content

Improve GCSToSFTPOperator paths handling #11284

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 5, 2021

Conversation

TobKed
Copy link
Contributor

@TobKed TobKed commented Oct 5, 2020

Improve GCSToSFTPOperator paths handling.

Now for every file on GCS is created separate folder on SFTP.

now:

copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
        task_id="file-copy-gsc-to-sftp",
        source_bucket="bucket_name",
        source_object="folder/subfolder/file.txt",
        destination_path="/tmp",
    )
# downloads file to "/tmp/folder/subfolder/file.txt"

EDIT:
I made previous behavior default (for backward compatibility) but possible to change it by ignore_gcs_path_in_destination argument:

copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
        task_id="file-copy-gsc-to-sftp",
        source_bucket="bucket_name",
        source_object="folder/subfolder/file.txt",
        destination_path="/tmp",
        keep_directory_structure=False,
    )
# downloads file to "/tmp/file.txt"

@boring-cyborg boring-cyborg bot added the provider:google Google (including GCP) related issues label Oct 5, 2020
@github-actions
Copy link

github-actions bot commented Oct 5, 2020

The CI and PROD Docker Images for the build are prepared in a separate "Build Image" workflow,
that you will not see in the list of checks (you will see "Wait for images" jobs instead).

You can checks the status of those images in The workflow run

@TobKed
Copy link
Contributor Author

TobKed commented Oct 5, 2020

@turbaszek
Copy link
Member

Should we make this configurable to avoid breaking change? I see the point of downloading a single file but this may be problematic when downloading multiple files with the same name (for example structure like output/2020-09-20/data.csv and downloading data for multiple days). WDYT @TobKed ?

@olchas
Copy link
Contributor

olchas commented Oct 6, 2020

Should we make this configurable to avoid breaking change? I see the point of downloading a single file but this may be problematic when downloading multiple files with the same name (for example structure like output/2020-09-20/data.csv and downloading data for multiple days). WDYT @TobKed ?

From what I understood by reading the code, only the part up to the wildcard character is stripped from the source_object path. In other word, when downloading multiple files from your example, you would set source_object to something like this: output/*. Then, when uploading them to destination_path, only the output would be stripped from every collected object's path, so you should get multiple directories for multiple days in your destination.

Did I get this right, @TobKed?

@TobKed
Copy link
Contributor Author

TobKed commented Oct 6, 2020

@olchas , yes, you are right. In your example of wildcard folders will be created.

@turbaszek I made this behavior optional for backward compatibility. PTAL

@@ -52,6 +52,9 @@ class GCSToSFTPOperator(BaseOperator):
:param destination_path: The sftp remote path. This is the specified directory path for
uploading to the SFTP server.
:type destination_path: str
:param ignore_gcs_path_in_destination: (Optional) When set to False the path of the file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It confused me a little bit. Can you explain this parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example: If you want to copy file from bucket: gs://bucket_name/data/2020/file.txt to /tmp directory, by default it will be copied reflecting object path on the bucket, so file path on sftp will be /tmp/data/2020/file.txt. If parameter ignore_gcs_path_in_destination will be set to True the file path on sftp will be /tmp/file.txt.

I am open for proposals for better name for this parameter :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. I do not have better solution.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an example to the description?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Example was added.

@TobKed TobKed force-pushed the improve-gcs-to-sftp branch from 590be6c to 6b1e58e Compare October 7, 2020 08:39
Comment on lines 151 to 155
if self.ignore_gcs_path_in_destination:
source_object_basename = os.path.basename(self.source_object)
destination_path = os.path.join(self.destination_path, source_object_basename)
else:
destination_path = os.path.join(self.destination_path, self.source_object)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

def _resolve_destination_path(self, prefix=None):
    if self.ignore_gcs_path_in_destination:
        source_object_basename = os.path.basename(self.source_object, start=prefix)
        return os.path.join(self.destination_path, source_object_basename)
    else:
        return os.path.join(self.destination_path, self.source_object)

then we can avoid duplication of the code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @turbaszek , it is a great suggestion.
I modified it slightly: 9b9ef60

@TobKed TobKed force-pushed the improve-gcs-to-sftp branch 2 times, most recently from ec06638 to e7694f4 Compare October 9, 2020 06:26
@@ -52,6 +52,9 @@ class GCSToSFTPOperator(BaseOperator):
:param destination_path: The sftp remote path. This is the specified directory path for
uploading to the SFTP server.
:type destination_path: str
:param ignore_gcs_path_in_destination: (Optional) When set to False the path of the file
Copy link
Member

@mik-laj mik-laj Oct 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:param ignore_gcs_path_in_destination: (Optional) When set to False the path of the file
:param use_relative_paths: (Optional) When set to False the path of the file

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or keep_directory_structure

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed it to keep_directory_structure. Thanks @mik-laj !

DESTINATION_SFTP = "destination_path"


# pylint: disable=unused-argument
class TestGoogleCloudStorageToSFTPOperator(unittest.TestCase):
@parameterized.expand(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect 💪

@TobKed TobKed force-pushed the improve-gcs-to-sftp branch 2 times, most recently from 5276964 to 40f928f Compare October 9, 2020 13:04
@turbaszek turbaszek requested a review from mik-laj October 11, 2020 14:47
task_id="file-move-gsc-to-sftp",
source_bucket=BUCKET_SRC,
source_object=OBJECT_SRC_2,
destination_path=DESTINATION_PATH_1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use values here instead of undefined constants?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed it. Thank you! FIxed.

@TobKed TobKed force-pushed the improve-gcs-to-sftp branch from 40f928f to 19d2004 Compare October 12, 2020 16:53
@TobKed
Copy link
Contributor Author

TobKed commented Oct 12, 2020

I rebased on the latest master and fixed the logic mistake. Previously i forget that after changing parameter from ignore_gcs_path_in_destination to keep_directory_structure the logic of resolving paths should be inverted.

@turbaszek turbaszek requested a review from mik-laj October 21, 2020 14:17
@mik-laj
Copy link
Member

mik-laj commented Oct 21, 2020

We test it together with the customer. Pllease wait for customer feedback.

@kaxil
Copy link
Member

kaxil commented Nov 4, 2020

Can you please rebase your PR on latest Master since we have applied Black and PyUpgrade on Master.

It will help if your squash your commits into single commit first so that there are less conflicts.

@TobKed TobKed force-pushed the improve-gcs-to-sftp branch 2 times, most recently from e85e838 to 0cfa7d9 Compare November 20, 2020 17:47
@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@TobKed TobKed force-pushed the improve-gcs-to-sftp branch from 7da3d9b to 8466393 Compare November 21, 2020 15:18
@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@TobKed TobKed force-pushed the improve-gcs-to-sftp branch from 8466393 to a3192d4 Compare November 23, 2020 12:35
@TobKed TobKed requested a review from turbaszek November 27, 2020 11:33
@turbaszek
Copy link
Member

@mik-laj any feedback about your tests?

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Nov 27, 2020
@github-actions
Copy link

The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it!

@mik-laj
Copy link
Member

mik-laj commented Nov 27, 2020

@mik-laj any feedback about your tests?

As far as I know, they were successful. Tobias has more details.

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@TobKed TobKed force-pushed the improve-gcs-to-sftp branch 2 times, most recently from 7b59717 to 50931c9 Compare February 3, 2021 19:43
@TobKed TobKed force-pushed the improve-gcs-to-sftp branch from 50931c9 to 2036748 Compare February 5, 2021 12:01
@potiuk potiuk merged commit 5d7d46b into apache:master Feb 5, 2021
@potiuk potiuk deleted the improve-gcs-to-sftp branch February 5, 2021 19:15
@marksworn
Copy link

marksworn commented Feb 18, 2021

This does not appear to be working for me - the documentation is unclear which should be used to exclude the GCS path structure from the destination path - True or False? I have tested with both and with both it is still adding elements of the GCS filepath that I do not want to include.

This is a useful feature in some cases, but it feels like it should be default that the user controls definition of the source and destination paths without any unexpected behaviour.

With keep_directory_structure=False and destination set to /Activity/pending/:

Executing copy of gs://<bucket>/Activity/2021/02/activity_20210217.csv to /Activity/pending/Activity/2021/02/activity_20210217.csv

With keep_directory_structure=True:

Executing copy of gs://<bucket>/Activity/2021/02/activity_20210217.csv to /Activity/pending/Activity/2021/02/activity_20210217.csv

@TobKed
Copy link
Contributor Author

TobKed commented Feb 22, 2021

@marksworn I am sorry to hear that. Would like to create issue with link to this PR? Would you like to create fix for it? I would be happy to assist you with this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers full tests needed We need to run full set of tests for this PR to merge provider:google Google (including GCP) related issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants