Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for soft delete #1229

Merged
merged 7 commits into from
Mar 18, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Next Next commit
integration test
  • Loading branch information
cojenco committed Feb 22, 2024
commit dc17b12f8b12a4aa08bad085c5e7817aa450c0bc
49 changes: 49 additions & 0 deletions tests/system/test_bucket.py
Expand Up @@ -1141,3 +1141,52 @@ def test_config_autoclass_w_existing_bucket(
assert (
bucket.autoclass_terminal_storage_class_update_time != previous_tsc_update_time
)


def test_soft_delete_policy(
storage_client,
buckets_to_delete,
):
# Create a bucket with soft delete policy.
duration_secs = 7 * 86400
bucket = storage_client.bucket(_helpers.unique_name("w-soft-delete"))
bucket.soft_delete_retention_duration_seconds = duration_secs
bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket)
buckets_to_delete.append(bucket)

assert bucket.soft_delete_retention_duration_seconds == duration_secs
assert isinstance(bucket.soft_delete_effective_time, datetime.datetime)

# Insert an object and get object metadata prior soft-deleted.
payload = b"DEADBEEF"
blob_name = _helpers.unique_name("soft-delete")
blob = bucket.blob(blob_name)
blob.upload_from_string(payload)

blob = bucket.get_blob(blob_name)
gen = blob.generation
assert blob.soft_delete_time is None
assert blob.hard_delete_time is None

# Delete the object to enter soft-deleted state.
blob.delete()

iter_default = bucket.list_blobs()
assert len(list(iter_default)) == 0
iter_w_soft_delete = bucket.list_blobs(soft_deleted=True)
assert len(list(iter_w_soft_delete)) > 0

# Get the soft-deleted object.
soft_deleted_blob = bucket.get_blob(blob_name, generation=gen, soft_deleted=True)
assert soft_deleted_blob.soft_delete_time is not None
assert soft_deleted_blob.hard_delete_time is not None

# Restore the soft-deleted object.
restored_blob = soft_deleted_blob.restore()
assert restored_blob.generation != gen

# Patch the soft delete policy on an existing bucket.
new_duration_secs = 10 * 86400
bucket.soft_delete_retention_duration_seconds = new_duration_secs
bucket.patch()
assert bucket.soft_delete_retention_duration_seconds == new_duration_secs