Standardise dataproc location param to region (#16034)
* Standardise dataproc location param to region

Standardises the Dataproc hook and operators' `location` parameter to `region`, in line
with the underlying Google Dataproc Python client library.

* Adding back `location` parameter for backward compatibility (a sketch of this shim follows below)

* Fix test

* Update airflow/providers/google/CHANGELOG.rst

Co-authored-by: Jarek Potiuk <[email protected]>
Daniel-Han-Yang and potiuk committed Jul 7, 2021
1 parent 5a5f30f commit b0f7f91
Showing 8 changed files with 849 additions and 159 deletions.
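The commit message above keeps `location` as a deprecated alias of `region` for backward compatibility. Below is a minimal sketch of how such a shim could behave; the helper name `_resolve_region`, the exact warning text, and the call at the end are illustrative assumptions, not necessarily the code in this commit.

import warnings
from typing import Optional


def _resolve_region(region: Optional[str], location: Optional[str]) -> Optional[str]:
    # Illustrative only: prefer the new `region` argument, accept the deprecated
    # `location` alias, and warn when the alias is used.
    if location is not None:
        warnings.warn(
            "Parameter `location` is deprecated, please use `region` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        if region is None:
            return location
    return region


print(_resolve_region(region=None, location="europe-west1"))  # warns, then prints "europe-west1"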
1 change: 0 additions & 1 deletion airflow/providers/google/CHANGELOG.rst
@@ -15,7 +15,6 @@
specific language governing permissions and limitations
under the License.
Changelog
---------

22 changes: 11 additions & 11 deletions airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -170,7 +170,7 @@
         update_mask=UPDATE_MASK,
         graceful_decommission_timeout=TIMEOUT,
         project_id=PROJECT_ID,
-        location=REGION,
+        region=REGION,
     )
     # [END how_to_cloud_dataproc_update_cluster_operator]

@@ -179,7 +179,7 @@
         task_id="create_workflow_template",
         template=WORKFLOW_TEMPLATE,
         project_id=PROJECT_ID,
-        location=REGION,
+        region=REGION,
     )
     # [END how_to_cloud_dataproc_create_workflow_template]

@@ -190,24 +190,24 @@
     # [END how_to_cloud_dataproc_trigger_workflow_template]
 
     pig_task = DataprocSubmitJobOperator(
-        task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
+        task_id="pig_task", job=PIG_JOB, region=REGION, project_id=PROJECT_ID
     )
     spark_sql_task = DataprocSubmitJobOperator(
-        task_id="spark_sql_task", job=SPARK_SQL_JOB, location=REGION, project_id=PROJECT_ID
+        task_id="spark_sql_task", job=SPARK_SQL_JOB, region=REGION, project_id=PROJECT_ID
     )
 
     spark_task = DataprocSubmitJobOperator(
-        task_id="spark_task", job=SPARK_JOB, location=REGION, project_id=PROJECT_ID
+        task_id="spark_task", job=SPARK_JOB, region=REGION, project_id=PROJECT_ID
     )
 
     # [START cloud_dataproc_async_submit_sensor]
     spark_task_async = DataprocSubmitJobOperator(
-        task_id="spark_task_async", job=SPARK_JOB, location=REGION, project_id=PROJECT_ID, asynchronous=True
+        task_id="spark_task_async", job=SPARK_JOB, region=REGION, project_id=PROJECT_ID, asynchronous=True
     )
 
     spark_task_async_sensor = DataprocJobSensor(
         task_id='spark_task_async_sensor_task',
-        location=REGION,
+        region=REGION,
         project_id=PROJECT_ID,
         dataproc_job_id="{{task_instance.xcom_pull(task_ids='spark_task_async')}}",
         poke_interval=10,
@@ -216,20 +216,20 @@
 
     # [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
     pyspark_task = DataprocSubmitJobOperator(
-        task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
+        task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
     )
     # [END how_to_cloud_dataproc_submit_job_to_cluster_operator]
 
     sparkr_task = DataprocSubmitJobOperator(
-        task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
+        task_id="sparkr_task", job=SPARKR_JOB, region=REGION, project_id=PROJECT_ID
     )
 
     hive_task = DataprocSubmitJobOperator(
-        task_id="hive_task", job=HIVE_JOB, location=REGION, project_id=PROJECT_ID
+        task_id="hive_task", job=HIVE_JOB, region=REGION, project_id=PROJECT_ID
     )
 
     hadoop_task = DataprocSubmitJobOperator(
-        task_id="hadoop_task", job=HADOOP_JOB, location=REGION, project_id=PROJECT_ID
+        task_id="hadoop_task", job=HADOOP_JOB, region=REGION, project_id=PROJECT_ID
     )
 
     # [START how_to_cloud_dataproc_delete_cluster_operator]
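Taken together, the example-DAG hunks above amount to the following usage pattern. This is a self-contained sketch of the asynchronous submit-plus-sensor flow with the renamed parameter, assuming a provider release that includes this change; the DAG id, the placeholder project/region/cluster values, and the SPARK_JOB payload are illustrative assumptions rather than values from this commit.

from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor

PROJECT_ID = "my-project"         # assumed placeholder
REGION = "europe-west1"           # assumed placeholder
CLUSTER_NAME = "example-cluster"  # assumed placeholder

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

with models.DAG(
    "example_dataproc_region_param",
    start_date=datetime(2021, 7, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Submit the Spark job asynchronously; the Dataproc job id is pushed to XCom.
    spark_task_async = DataprocSubmitJobOperator(
        task_id="spark_task_async",
        job=SPARK_JOB,
        region=REGION,  # renamed from `location`
        project_id=PROJECT_ID,
        asynchronous=True,
    )

    # Wait for that job to finish, again addressing the cluster by `region`.
    spark_task_async_sensor = DataprocJobSensor(
        task_id="spark_task_async_sensor_task",
        region=REGION,
        project_id=PROJECT_ID,
        dataproc_job_id="{{ task_instance.xcom_pull(task_ids='spark_task_async') }}",
        poke_interval=10,
    )

    spark_task_async >> spark_task_async_sensor

Existing DAGs that still pass `location=REGION` should keep working during the deprecation window, per the backward-compatibility bullet in the commit message.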
