Create Endpoint and Model Service, Batch Prediction and Hyperparameter Tuning Jobs operators for Vertex AI service (#22088)
MaksYermak committed Mar 27, 2022
1 parent d91b833 commit ca4b8d1
Showing 18 changed files with 5,657 additions and 2 deletions.
292 changes: 292 additions & 0 deletions airflow/providers/google/cloud/example_dags/example_vertex_ai.py
@@ -16,6 +16,9 @@
# specific language governing permissions and limitations
# under the License.

# mypy ignore arg types (for templated fields)
# type: ignore[arg-type]

"""
Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
Cloud Platform.
@@ -28,11 +31,16 @@
* PYTHON_PACKAGE_GSC_URI - path to test model in archive.
* LOCAL_TRAINING_SCRIPT_PATH - path to local training script.
* DATASET_ID - ID of dataset which will be used in training process.
* MODEL_ID - ID of the model which will be used in the prediction process.
* MODEL_ARTIFACT_URI - path to a GCS directory containing the saved model artifacts.
"""
import os
from datetime import datetime
from uuid import uuid4

from google.cloud import aiplatform
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value

from airflow import models
@@ -45,6 +53,11 @@
DeleteAutoMLTrainingJobOperator,
ListAutoMLTrainingJobOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job import (
CreateBatchPredictionJobOperator,
DeleteBatchPredictionJobOperator,
ListBatchPredictionJobsOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
CreateCustomContainerTrainingJobOperator,
CreateCustomPythonPackageTrainingJobOperator,
@@ -61,6 +74,25 @@
ListDatasetsOperator,
UpdateDatasetOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.endpoint_service import (
CreateEndpointOperator,
DeleteEndpointOperator,
DeployModelOperator,
ListEndpointsOperator,
UndeployModelOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job import (
CreateHyperparameterTuningJobOperator,
DeleteHyperparameterTuningJobOperator,
GetHyperparameterTuningJobOperator,
ListHyperparameterTuningJobOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.model_service import (
DeleteModelOperator,
ExportModelOperator,
ListModelsOperator,
UploadModelOperator,
)

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
REGION = os.environ.get("GCP_LOCATION", "us-central1")
@@ -157,6 +189,52 @@
{"numeric": {"column_name": "PhotoAmt"}},
]

MODEL_ID = os.environ.get("MODEL_ID", "test-model-id")
MODEL_ARTIFACT_URI = os.environ.get("MODEL_ARTIFACT_URI", "path_to_folder_with_model_artifacts")
MODEL_NAME = f"projects/{PROJECT_ID}/locations/{REGION}/models/{MODEL_ID}"
JOB_DISPLAY_NAME = f"temp_create_batch_prediction_job_test_{uuid4()}"
BIGQUERY_SOURCE = f"bq://{PROJECT_ID}.test_iowa_liquor_sales_forecasting_us.2021_sales_predict"
GCS_DESTINATION_PREFIX = "gs://test-vertex-ai-bucket-us/output"
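# An empty dict parsed into a protobuf Value: no additional model parameters
# are passed to the batch prediction job below.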
MODEL_PARAMETERS = json_format.ParseDict({}, Value())

ENDPOINT_CONF = {
"display_name": f"endpoint_test_{uuid4()}",
}
DEPLOYED_MODEL = {
# format: 'projects/{project}/locations/{location}/models/{model}'
'model': f"projects/{PROJECT_ID}/locations/{REGION}/models/{MODEL_ID}",
'display_name': f"temp_endpoint_test_{uuid4()}",
"dedicated_resources": {
"machine_spec": {
"machine_type": "n1-standard-2",
"accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
"accelerator_count": 1,
},
'min_replica_count': 1,
"max_replica_count": 1,
},
}

MODEL_OUTPUT_CONFIG = {
"artifact_destination": {
"output_uri_prefix": STAGING_BUCKET,
},
"export_format_id": "custom-trained",
}
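# MODEL_OUTPUT_CONFIG above exports the uploaded model's artifacts in the
# "custom-trained" format to the staging bucket; MODEL_OBJ below describes the
# Model resource to upload, with artifact_uri pointing at the saved model files
# and container_spec at the serving image (remaining container fields are left
# at their defaults).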
MODEL_OBJ = {
"display_name": f"model-{str(uuid4())}",
"artifact_uri": MODEL_ARTIFACT_URI,
"container_spec": {
"image_uri": MODEL_SERVING_CONTAINER_URI,
"command": [],
"args": [],
"env": [],
"ports": [],
"predict_route": "",
"health_route": "",
},
}

with models.DAG(
"example_gcp_vertex_ai_custom_jobs",
schedule_interval="@once",
@@ -466,3 +544,217 @@
project_id=PROJECT_ID,
)
# [END how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]

with models.DAG(
"example_gcp_vertex_ai_batch_prediction_job",
schedule_interval="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
) as batch_prediction_job_dag:
# [START how_to_cloud_vertex_ai_create_batch_prediction_job_operator]
create_batch_prediction_job = CreateBatchPredictionJobOperator(
task_id="create_batch_prediction_job",
job_display_name=JOB_DISPLAY_NAME,
model_name=MODEL_NAME,
predictions_format="csv",
bigquery_source=BIGQUERY_SOURCE,
gcs_destination_prefix=GCS_DESTINATION_PREFIX,
model_parameters=MODEL_PARAMETERS,
region=REGION,
project_id=PROJECT_ID,
)
# [END how_to_cloud_vertex_ai_create_batch_prediction_job_operator]

# [START how_to_cloud_vertex_ai_list_batch_prediction_job_operator]
list_batch_prediction_job = ListBatchPredictionJobsOperator(
task_id="list_batch_prediction_jobs",
region=REGION,
project_id=PROJECT_ID,
)
# [END how_to_cloud_vertex_ai_list_batch_prediction_job_operator]

# [START how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]
delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
task_id="delete_batch_prediction_job",
batch_prediction_job_id=create_batch_prediction_job.output['batch_prediction_job_id'],
region=REGION,
project_id=PROJECT_ID,
)
# [END how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]

create_batch_prediction_job >> delete_batch_prediction_job
list_batch_prediction_job

with models.DAG(
"example_gcp_vertex_ai_endpoint",
schedule_interval="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
) as endpoint_dag:
# [START how_to_cloud_vertex_ai_create_endpoint_operator]
create_endpoint = CreateEndpointOperator(
task_id="create_endpoint",
endpoint=ENDPOINT_CONF,
region=REGION,
project_id=PROJECT_ID,
)
# [END how_to_cloud_vertex_ai_create_endpoint_operator]

# [START how_to_cloud_vertex_ai_delete_endpoint_operator]
delete_endpoint = DeleteEndpointOperator(
task_id="delete_endpoint",
endpoint_id=create_endpoint.output['endpoint_id'],
region=REGION,
project_id=PROJECT_ID,
)
# [END how_to_cloud_vertex_ai_delete_endpoint_operator]

# [START how_to_cloud_vertex_ai_list_endpoints_operator]
list_endpoints = ListEndpointsOperator(
task_id="list_endpoints",
region=REGION,
project_id=PROJECT_ID,
)
# [END how_to_cloud_vertex_ai_list_endpoints_operator]

# [START how_to_cloud_vertex_ai_deploy_model_operator]
deploy_model = DeployModelOperator(
task_id="deploy_model",
endpoint_id=create_endpoint.output['endpoint_id'],
deployed_model=DEPLOYED_MODEL,
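# In traffic_split, the key "0" refers to the model being deployed in this
# request, so it receives 100% of the endpoint's traffic.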
traffic_split={'0': 100},
region=REGION,
project_id=PROJECT_ID,
)
# [END how_to_cloud_vertex_ai_deploy_model_operator]

# [START how_to_cloud_vertex_ai_undeploy_model_operator]
undeploy_model = UndeployModelOperator(
task_id="undeploy_model",
endpoint_id=create_endpoint.output['endpoint_id'],
deployed_model_id=deploy_model.output['deployed_model_id'],
region=REGION,
project_id=PROJECT_ID,
)
# [END how_to_cloud_vertex_ai_undeploy_model_operator]

create_endpoint >> deploy_model >> undeploy_model >> delete_endpoint
list_endpoints

with models.DAG(
"example_gcp_vertex_ai_hyperparameter_tuning_job",
schedule_interval="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
) as hyperparameter_tuning_job_dag:
# [START how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]
create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
task_id="create_hyperparameter_tuning_job",
staging_bucket=STAGING_BUCKET,
display_name=f"horses-humans-hyptertune-{DISPLAY_NAME}",
worker_pool_specs=[
{
"machine_spec": {
"machine_type": MACHINE_TYPE,
"accelerator_type": ACCELERATOR_TYPE,
"accelerator_count": ACCELERATOR_COUNT,
},
"replica_count": REPLICA_COUNT,
"container_spec": {
"image_uri": f"gcr.io/{PROJECT_ID}/horse-human:hypertune",
},
}
],
sync=False,
region=REGION,
project_id=PROJECT_ID,
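# Search over the learning rate (log scale), momentum and number of neurons;
# max_trial_count and parallel_trial_count below bound the search budget.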
parameter_spec={
'learning_rate': aiplatform.hyperparameter_tuning.DoubleParameterSpec(
min=0.01, max=1, scale='log'
),
'momentum': aiplatform.hyperparameter_tuning.DoubleParameterSpec(min=0, max=1, scale='linear'),
'num_neurons': aiplatform.hyperparameter_tuning.DiscreteParameterSpec(
values=[64, 128, 512], scale='linear'
),
},
metric_spec={
'accuracy': 'maximize',
},
max_trial_count=15,
parallel_trial_count=3,
)
# [END how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]

# [START how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]
get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
task_id="get_hyperparameter_tuning_job",
project_id=PROJECT_ID,
region=REGION,
hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
)
# [END how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]

# [START how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]
delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
task_id="delete_hyperparameter_tuning_job",
project_id=PROJECT_ID,
region=REGION,
hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
)
# [END how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]

# [START how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]
list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
task_id="list_hyperparameter_tuning_job",
region=REGION,
project_id=PROJECT_ID,
)
# [END how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]

create_hyperparameter_tuning_job >> get_hyperparameter_tuning_job >> delete_hyperparameter_tuning_job
list_hyperparameter_tuning_job

with models.DAG(
"example_gcp_vertex_ai_model_service",
schedule_interval="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
) as model_service_dag:
# [START how_to_cloud_vertex_ai_upload_model_operator]
upload_model = UploadModelOperator(
task_id="upload_model",
region=REGION,
project_id=PROJECT_ID,
model=MODEL_OBJ,
)
# [END how_to_cloud_vertex_ai_upload_model_operator]

# [START how_to_cloud_vertex_ai_export_model_operator]
export_model = ExportModelOperator(
task_id="export_model",
project_id=PROJECT_ID,
region=REGION,
model_id=upload_model.output["model_id"],
output_config=MODEL_OUTPUT_CONFIG,
)
# [END how_to_cloud_vertex_ai_export_model_operator]

# [START how_to_cloud_vertex_ai_delete_model_operator]
delete_model = DeleteModelOperator(
task_id="delete_model",
project_id=PROJECT_ID,
region=REGION,
model_id=upload_model.output["model_id"],
)
# [END how_to_cloud_vertex_ai_delete_model_operator]

# [START how_to_cloud_vertex_ai_list_models_operator]
list_models = ListModelsOperator(
task_id="list_models",
region=REGION,
project_id=PROJECT_ID,
)
# [END how_to_cloud_vertex_ai_list_models_operator]

upload_model >> export_model >> delete_model
list_models
