Fixup docstring for deprecated DataprocSubmitSparkJobOperator and refactoring system tests (#32743)
moiseenkov committed Jul 26, 2023
1 parent f2e9331 commit 583f407
Showing 4 changed files with 35 additions and 20 deletions.
4 changes: 4 additions & 0 deletions airflow/providers/google/cloud/operators/dataproc.py
@@ -1384,6 +1384,10 @@ def execute(self, context: Context):
 class DataprocSubmitSparkJobOperator(DataprocJobBaseOperator):
     """Start a Spark Job on a Cloud DataProc cluster.
 
+    .. seealso::
+        This operator is deprecated, please use
+        :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`:
+
     :param main_jar: The HCFS URI of the jar file that contains the main class
         (use this or the main_class, not both together).
     :param main_class: Name of the job class. (use this or the main_jar, not both
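The docstring now points users at DataprocSubmitJobOperator. A minimal migration sketch follows, reusing the PROJECT_ID, CLUSTER_NAME, and REGION variables defined in the system tests below; the jar URI and main class are illustrative assumptions, not values from this diff.

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# Instead of the deprecated DataprocSubmitSparkJobOperator, describe the Spark
# job as a dict and pass it to the generic submit operator.
SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        # Illustrative values -- point these at your own jar and main class.
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

spark_task = DataprocSubmitJobOperator(
    task_id="spark_task",
    job=SPARK_JOB,
    region=REGION,
    project_id=PROJECT_ID,
)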
17 changes: 10 additions & 7 deletions tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py
@@ -35,10 +35,8 @@
 DAG_ID = "dataproc_spark"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-CLUSTER_NAME = f"cluster-dataproc-spark-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
-
 
 # Cluster definition
 CLUSTER_CONFIG = {
@@ -54,8 +52,6 @@
     },
 }
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 # Jobs definitions
 # [START how_to_cloud_dataproc_spark_config]
 SPARK_JOB = {
@@ -74,7 +70,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "spark"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
@@ -96,7 +92,14 @@
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    create_cluster >> spark_task >> delete_cluster
+    (
+        # TEST SETUP
+        create_cluster
+        # TEST BODY
+        >> spark_task
+        # TEST TEARDOWN
+        >> delete_cluster
+    )
 
     from tests.system.utils.watcher import watcher
 
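The diff is truncated right after the watcher import. In Airflow system tests the watcher is conventionally wired in just below it; a sketch of that standard pattern (not visible in this truncated diff):

# The teardown task runs with trigger_rule=ALL_DONE, so a failing test body
# would not by itself fail the DAG run; routing every task into watcher()
# propagates the real outcome to the pytest-based test runner.
list(dag.tasks) >> watcher()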
17 changes: 11 additions & 6 deletions tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py
@@ -36,9 +36,8 @@
 DAG_ID = "dataproc_spark_async"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-CLUSTER_NAME = f"dataproc-spark-async-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 # Cluster definition
 CLUSTER_CONFIG = {
@@ -54,8 +53,6 @@
     },
 }
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 # Jobs definitions
 SPARK_JOB = {
     "reference": {"project_id": PROJECT_ID},
@@ -72,7 +69,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "spark", "async"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
@@ -104,7 +101,15 @@
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    create_cluster >> spark_task_async >> spark_task_async_sensor >> delete_cluster
+    (
+        # TEST SETUP
+        create_cluster
+        # TEST BODY
+        >> spark_task_async
+        >> spark_task_async_sensor
+        # TEST TEARDOWN
+        >> delete_cluster
+    )
 
     from tests.system.utils.watcher import watcher
 
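The definitions of spark_task_async and spark_task_async_sensor sit in the collapsed region above. A sketch of the usual submit-then-sense pattern follows; the parameter values and the XCom wiring are assumptions for illustration, not lines from this commit.

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor

spark_task_async = DataprocSubmitJobOperator(
    task_id="spark_task_async",
    job=SPARK_JOB,
    region=REGION,
    project_id=PROJECT_ID,
    asynchronous=True,  # return once the job is submitted; its id lands in XCom
)
spark_task_async_sensor = DataprocJobSensor(
    task_id="spark_task_async_sensor_task",
    region=REGION,
    project_id=PROJECT_ID,
    dataproc_job_id=spark_task_async.output,  # job id pulled from XCom
    poke_interval=10,
)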
17 changes: 10 additions & 7 deletions tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py
@@ -36,10 +36,8 @@
 DAG_ID = "dataproc_spark_deferrable"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-CLUSTER_NAME = f"cluster-dataproc-spark-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
-
 
 # Cluster definition
 CLUSTER_CONFIG = {
@@ -55,8 +53,6 @@
     },
 }
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 # Jobs definitions
 # [START how_to_cloud_dataproc_spark_deferrable_config]
 SPARK_JOB = {
@@ -75,7 +71,7 @@
     schedule_interval="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "spark", "deferrable"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
@@ -97,7 +93,14 @@
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    create_cluster >> spark_task >> delete_cluster
+    (
+        # TEST SETUP
+        create_cluster
+        # TEST BODY
+        >> spark_task
+        # TEST TEARDOWN
+        >> delete_cluster
+    )
 
     from tests.system.utils.watcher import watcher
 
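All three system tests converge on the same cluster-naming scheme, and the .replace("_", "-") is what makes it valid: DAG IDs contain underscores, while Dataproc cluster names may only contain lowercase letters, digits, and hyphens. A quick illustration (the ENV_ID value is made up; the tests read it from the environment):

ENV_ID = "abc123"  # hypothetical value
DAG_ID = "dataproc_spark_deferrable"
print(f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-"))
# -> cluster-abc123-dataproc-spark-deferrable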
