Skip to content

Commit 70bf307

Browse files
author
Tobiasz Kędzierski
authored
Add How To Guide for Dataflow (#13461)
1 parent f7fe363 commit 70bf307

File tree

7 files changed

+358
-2
lines changed

7 files changed

+358
-2
lines changed

airflow/providers/google/cloud/example_dags/example_dataflow.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
tags=['example'],
6666
) as dag_native_java:
6767

68-
# [START howto_operator_start_java_job]
68+
# [START howto_operator_start_java_job_jar_on_gcs]
6969
start_java_job = DataflowCreateJavaJobOperator(
7070
task_id="start-java-job",
7171
jar=GCS_JAR,
@@ -78,8 +78,9 @@
7878
check_if_running=CheckJobRunning.IgnoreJob,
7979
location='europe-west3',
8080
)
81-
# [END howto_operator_start_java_job]
81+
# [END howto_operator_start_java_job_jar_on_gcs]
8282

83+
# [START howto_operator_start_java_job_local_jar]
8384
jar_to_local = GCSToLocalFilesystemOperator(
8485
task_id="jar-to-local",
8586
bucket=GCS_JAR_BUCKET_NAME,
@@ -99,6 +100,7 @@
99100
check_if_running=CheckJobRunning.WaitForRun,
100101
)
101102
jar_to_local >> start_java_job_local
103+
# [END howto_operator_start_java_job_local_jar]
102104

103105
with models.DAG(
104106
"example_gcp_dataflow_native_python",
@@ -144,6 +146,7 @@
144146
schedule_interval=None, # Override to match your needs
145147
tags=['example'],
146148
) as dag_native_python_async:
149+
# [START howto_operator_start_python_job_async]
147150
start_python_job_async = DataflowCreatePythonJobOperator(
148151
task_id="start-python-job-async",
149152
py_file=GCS_PYTHON,
@@ -158,14 +161,18 @@
158161
location='europe-west3',
159162
wait_until_finished=False,
160163
)
164+
# [END howto_operator_start_python_job_async]
161165

166+
# [START howto_sensor_wait_for_job_status]
162167
wait_for_python_job_async_done = DataflowJobStatusSensor(
163168
task_id="wait-for-python-job-async-done",
164169
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
165170
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
166171
location='europe-west3',
167172
)
173+
# [END howto_sensor_wait_for_job_status]
168174

175+
# [START howto_sensor_wait_for_job_metric]
169176
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
170177
"""Check if the metric is greater than or equal to the given value."""
171178

@@ -187,7 +194,9 @@ def callback(metrics: List[Dict]) -> bool:
187194
location='europe-west3',
188195
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
189196
)
197+
# [END howto_sensor_wait_for_job_metric]
190198

199+
# [START howto_sensor_wait_for_job_message]
191200
def check_message(messages: List[dict]) -> bool:
192201
"""Check message"""
193202
for message in messages:
@@ -201,7 +210,9 @@ def check_message(messages: List[dict]) -> bool:
201210
location='europe-west3',
202211
callback=check_message,
203212
)
213+
# [END howto_sensor_wait_for_job_message]
204214

215+
# [START howto_sensor_wait_for_job_autoscaling_event]
205216
def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
206217
"""Check autoscaling event"""
207218
for autoscaling_event in autoscaling_events:
@@ -215,6 +226,7 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
215226
location='europe-west3',
216227
callback=check_autoscaling_event,
217228
)
229+
# [END howto_sensor_wait_for_job_autoscaling_event]
218230

219231
start_python_job_async >> wait_for_python_job_async_done
220232
start_python_job_async >> wait_for_python_job_async_metric
@@ -229,9 +241,11 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
229241
schedule_interval=None, # Override to match your needs
230242
tags=['example'],
231243
) as dag_template:
244+
# [START howto_operator_start_template_job]
232245
start_template_job = DataflowTemplatedJobStartOperator(
233246
task_id="start-template-job",
234247
template='gs://dataflow-templates/latest/Word_Count',
235248
parameters={'inputFile': "gs://dataflow-samples/shakespeare/kinglear.txt", 'output': GCS_OUTPUT},
236249
location='europe-west3',
237250
)
251+
# [END howto_operator_start_template_job]

airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
start_date=days_ago(1),
4545
schedule_interval=None, # Override to match your needs
4646
) as dag_flex_template:
47+
# [START howto_operator_start_template_job]
4748
start_flex_template = DataflowStartFlexTemplateOperator(
4849
task_id="start_flex_template_streaming_beam_sql",
4950
body={
@@ -59,3 +60,4 @@
5960
do_xcom_push=True,
6061
location=BQ_FLEX_TEMPLATE_LOCATION,
6162
)
63+
# [END howto_operator_start_template_job]

airflow/providers/google/cloud/example_dags/example_dataflow_sql.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
schedule_interval=None, # Override to match your needs
4040
tags=['example'],
4141
) as dag_sql:
42+
# [START howto_operator_start_sql_job]
4243
start_sql = DataflowStartSqlJobOperator(
4344
task_id="start_sql_query",
4445
job_name=DATAFLOW_SQL_JOB_NAME,
@@ -61,3 +62,4 @@
6162
location=DATAFLOW_SQL_LOCATION,
6263
do_xcom_push=True,
6364
)
65+
# [END howto_operator_start_sql_job]

airflow/providers/google/cloud/operators/dataflow.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ class DataflowCreateJavaJobOperator(BaseOperator):
8787
For more detail on job submission have a look at the reference:
8888
https://cloud.google.com/dataflow/pipelines/specifying-exec-params
8989
90+
.. seealso::
91+
For more information on how to use this operator, take a look at the guide:
92+
:ref:`howto/operator:DataflowCreateJavaJobOperator`
93+
9094
:param jar: The reference to a self executing DataFlow jar (templated).
9195
:type jar: str
9296
:param job_name: The 'jobName' to use when executing the DataFlow job
@@ -321,6 +325,10 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
321325
Start a Templated Cloud DataFlow job. The parameters of the operation
322326
will be passed to the job.
323327
328+
.. seealso::
329+
For more information on how to use this operator, take a look at the guide:
330+
:ref:`howto/operator:DataflowTemplatedJobStartOperator`
331+
324332
:param template: The reference to the DataFlow template.
325333
:type template: str
326334
:param job_name: The 'jobName' to use when executing the DataFlow template
@@ -543,6 +551,10 @@ class DataflowStartFlexTemplateOperator(BaseOperator):
543551
"""
544552
Starts flex templates with the Dataflow pipeline.
545553
554+
.. seealso::
555+
For more information on how to use this operator, take a look at the guide:
556+
:ref:`howto/operator:DataflowStartFlexTemplateOperator`
557+
546558
:param body: The request body. See:
547559
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
548560
:param location: The location of the Dataflow job (for example europe-west1)
@@ -659,6 +671,14 @@ class DataflowStartSqlJobOperator(BaseOperator):
659671
"""
660672
Starts Dataflow SQL query.
661673
674+
.. seealso::
675+
For more information on how to use this operator, take a look at the guide:
676+
:ref:`howto/operator:DataflowStartSqlJobOperator`
677+
678+
.. warning::
679+
This operator requires the ``gcloud`` command (Google Cloud SDK) to be installed on the Airflow worker:
680+
`Google Cloud SDK installation guide <https://cloud.google.com/sdk/docs/install>`__
681+
662682
:param job_name: The unique name to assign to the Cloud Dataflow job.
663683
:type job_name: str
664684
:param query: The SQL query to execute.
@@ -764,6 +784,10 @@ class DataflowCreatePythonJobOperator(BaseOperator):
764784
For more detail on job submission have a look at the reference:
765785
https://cloud.google.com/dataflow/pipelines/specifying-exec-params
766786
787+
.. seealso::
788+
For more information on how to use this operator, take a look at the guide:
789+
:ref:`howto/operator:DataflowCreatePythonJobOperator`
790+
767791
:param py_file: Reference to the python dataflow pipeline file.py, e.g.,
768792
/some/local/file/path/to/your/python/pipeline/file. (templated)
769793
:type py_file: str

airflow/providers/google/cloud/sensors/dataflow.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ class DataflowJobStatusSensor(BaseSensorOperator):
3232
"""
3333
Checks for the status of a job in Google Cloud Dataflow.
3434
35+
.. seealso::
36+
For more information on how to use this operator, take a look at the guide:
37+
:ref:`howto/operator:DataflowJobStatusSensor`
38+
3539
:param job_id: ID of the job to be checked.
3640
:type job_id: str
3741
:param expected_statuses: The expected state of the operation.
@@ -122,6 +126,10 @@ class DataflowJobMetricsSensor(BaseSensorOperator):
122126
"""
123127
Checks the metrics of a job in Google Cloud Dataflow.
124128
129+
.. seealso::
130+
For more information on how to use this operator, take a look at the guide:
131+
:ref:`howto/operator:DataflowJobMetricsSensor`
132+
125133
:param job_id: ID of the job to be checked.
126134
:type job_id: str
127135
:param callback: callback which is called with list of read job metrics
@@ -212,6 +220,10 @@ class DataflowJobMessagesSensor(BaseSensorOperator):
212220
"""
213221
Checks for the job message in Google Cloud Dataflow.
214222
223+
.. seealso::
224+
For more information on how to use this operator, take a look at the guide:
225+
:ref:`howto/operator:DataflowJobMessagesSensor`
226+
215227
:param job_id: ID of the job to be checked.
216228
:type job_id: str
217229
:param callback: callback which is called with list of read job metrics
@@ -302,6 +314,10 @@ class DataflowJobAutoScalingEventsSensor(BaseSensorOperator):
302314
"""
303315
Checks for the job autoscaling event in Google Cloud Dataflow.
304316
317+
.. seealso::
318+
For more information on how to use this operator, take a look at the guide:
319+
:ref:`howto/operator:DataflowJobAutoScalingEventsSensor`
320+
305321
:param job_id: ID of the job to be checked.
306322
:type job_id: str
307323
:param callback: callback which is called with list of read job metrics

airflow/providers/google/provider.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ integrations:
201201
tags: [gcp]
202202
- integration-name: Google Dataflow
203203
external-doc-url: https://cloud.google.com/dataflow/
204+
how-to-guide:
205+
- /docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
204206
logo: /integration-logos/gcp/Cloud-Dataflow.png
205207
tags: [gcp]
206208
- integration-name: Google Data Fusion

0 commit comments

Comments
 (0)