Skip to content

Commit

Permalink
Add Google Cloud Memorystore Memcached Operators (#10121)
Browse files Browse the repository at this point in the history
Co-authored-by: Tobiasz Kędzierski <[email protected]>
Co-authored-by: Kamil Breguła <[email protected]>
  • Loading branch information
3 people committed Oct 22, 2020
1 parent b9d677c commit 9150330
Show file tree
Hide file tree
Showing 9 changed files with 1,687 additions and 21 deletions.
131 changes: 115 additions & 16 deletions airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from urllib.parse import urlparse

from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest, Instance
from google.cloud.memcache_v1beta2.types import cloud_memcache

from airflow import models
from airflow.operators.bash import BashOperator
Expand All @@ -37,15 +38,29 @@
CloudMemorystoreListInstancesOperator,
CloudMemorystoreScaleInstanceOperator,
CloudMemorystoreUpdateInstanceOperator,
CloudMemorystoreMemcachedApplyParametersOperator,
CloudMemorystoreMemcachedCreateInstanceOperator,
CloudMemorystoreMemcachedDeleteInstanceOperator,
CloudMemorystoreMemcachedGetInstanceOperator,
CloudMemorystoreMemcachedListInstancesOperator,
CloudMemorystoreMemcachedUpdateInstanceOperator,
CloudMemorystoreMemcachedUpdateParametersOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSBucketCreateAclEntryOperator
from airflow.utils import dates

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")

INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystore")
INSTANCE_NAME_2 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-2")
INSTANCE_NAME_3 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-3")
MEMORYSTORE_REDIS_INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystoreredis-")
MEMORYSTORE_REDIS_INSTANCE_NAME_2 = os.environ.get(
"GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-redis-2"
)
MEMORYSTORE_REDIS_INSTANCE_NAME_3 = os.environ.get(
"GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-redis-3"
)
MEMORYSTORE_MEMCACHED_INSTANCE_NAME = os.environ.get(
"GCP_MEMORYSTORE_INSTANCE_NAME4", "test-memorystore-memcached-1"
)

EXPORT_GCS_URL = os.environ.get("GCP_MEMORYSTORE_EXPORT_GCS_URL", "gs://test-memorystore/my-export.rdb")
EXPORT_GCS_URL_PARTS = urlparse(EXPORT_GCS_URL)
Expand All @@ -57,9 +72,13 @@

SECOND_INSTANCE = {"tier": Instance.Tier.STANDARD_HA, "memory_size_gb": 3}

# [START howto_operator_memcached_instance]
MEMCACHED_INSTANCE = {"name": "", "node_count": 1, "node_config": {"cpu_count": 1, "memory_size_mb": 1024}}
# [END howto_operator_memcached_instance]


with models.DAG(
"gcp_cloud_memorystore",
"gcp_cloud_memorystore_redis",
schedule_interval=None, # Override to match your needs
start_date=dates.days_ago(1),
tags=['example'],
Expand All @@ -68,7 +87,7 @@
create_instance = CloudMemorystoreCreateInstanceOperator(
task_id="create-instance",
location="europe-north1",
instance_id=INSTANCE_NAME,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
instance=FIRST_INSTANCE,
project_id=GCP_PROJECT_ID,
)
Expand All @@ -84,7 +103,7 @@
create_instance_2 = CloudMemorystoreCreateInstanceOperator(
task_id="create-instance-2",
location="europe-north1",
instance_id=INSTANCE_NAME_2,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
instance=SECOND_INSTANCE,
project_id=GCP_PROJECT_ID,
)
Expand All @@ -93,7 +112,7 @@
get_instance = CloudMemorystoreGetInstanceOperator(
task_id="get-instance",
location="europe-north1",
instance=INSTANCE_NAME,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
do_xcom_push=True,
)
Expand All @@ -109,7 +128,7 @@
failover_instance = CloudMemorystoreFailoverInstanceOperator(
task_id="failover-instance",
location="europe-north1",
instance=INSTANCE_NAME_2,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
data_protection_mode=FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS,
project_id=GCP_PROJECT_ID,
)
Expand All @@ -131,7 +150,7 @@
update_instance = CloudMemorystoreUpdateInstanceOperator(
task_id="update-instance",
location="europe-north1",
instance_id=INSTANCE_NAME,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
update_mask={"paths": ["memory_size_gb"]},
instance={"memory_size_gb": 2},
Expand All @@ -152,7 +171,7 @@
export_instance = CloudMemorystoreExportInstanceOperator(
task_id="export-instance",
location="europe-north1",
instance=INSTANCE_NAME,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
)
Expand All @@ -162,30 +181,33 @@
import_instance = CloudMemorystoreImportOperator(
task_id="import-instance",
location="europe-north1",
instance=INSTANCE_NAME_2,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_import_instance]

# [START howto_operator_delete_instance]
delete_instance = CloudMemorystoreDeleteInstanceOperator(
task_id="delete-instance", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID
task_id="delete-instance",
location="europe-north1",
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_delete_instance]

delete_instance_2 = CloudMemorystoreDeleteInstanceOperator(
task_id="delete-instance-2",
location="europe-north1",
instance=INSTANCE_NAME_2,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
project_id=GCP_PROJECT_ID,
)

# [END howto_operator_create_instance_and_import]
create_instance_and_import = CloudMemorystoreCreateInstanceAndImportOperator(
task_id="create-instance-and-import",
location="europe-north1",
instance_id=INSTANCE_NAME_3,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
instance=FIRST_INSTANCE,
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
Expand All @@ -196,7 +218,7 @@
scale_instance = CloudMemorystoreScaleInstanceOperator(
task_id="scale-instance",
location="europe-north1",
instance_id=INSTANCE_NAME_3,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
project_id=GCP_PROJECT_ID,
memory_size_gb=3,
)
Expand All @@ -206,7 +228,7 @@
export_and_delete_instance = CloudMemorystoreExportAndDeleteInstanceOperator(
task_id="export-and-delete-instance",
location="europe-north1",
instance=INSTANCE_NAME_3,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
)
Expand All @@ -229,3 +251,80 @@
failover_instance >> delete_instance_2

export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance

with models.DAG(
"gcp_cloud_memorystore_memcached",
schedule_interval=None, # Override to match your needs
start_date=dates.days_ago(1),
tags=['example'],
) as dag_memcache:
# [START howto_operator_create_instance_memcached]
create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
task_id="create-instance",
location="europe-north1",
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
instance=MEMCACHED_INSTANCE,
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_create_instance_memcached]

# [START howto_operator_delete_instance_memcached]
delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
task_id="delete-instance",
location="europe-north1",
instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_delete_instance_memcached]

# [START howto_operator_get_instance_memcached]
get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator(
task_id="get-instance",
location="europe-north1",
instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_get_instance_memcached]

# [START howto_operator_list_instances_memcached]
list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator(
task_id="list-instances", location="-", project_id=GCP_PROJECT_ID
)
# [END howto_operator_list_instances_memcached]

# # [START howto_operator_update_instance_memcached]
update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
task_id="update-instance",
location="europe-north1",
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
update_mask=cloud_memcache.field_mask.FieldMask(paths=["node_count"]),
instance={"node_count": 2},
)
# [END howto_operator_update_instance_memcached]

# [START howto_operator_update_and_apply_parameters_memcached]
update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
task_id="update-parameters",
location="europe-north1",
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
update_mask={"paths": ["params"]},
parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}},
)

apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
task_id="apply-parameters",
location="europe-north1",
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
node_ids=["node-a-1"],
apply_all=False,
)

# update_parameters >> apply_parameters
# [END howto_operator_update_and_apply_parameters_memcached]

create_memcached_instance >> [list_memcached_instances, get_memcached_instance]
create_memcached_instance >> update_memcached_instance >> update_memcached_parameters
update_memcached_parameters >> apply_memcached_parameters >> delete_memcached_instance

0 comments on commit 9150330

Please sign in to comment.