Add How To Guide for Dataflow (#13461)
Tobiasz Kędzierski committed Jan 21, 2021
1 parent f7fe363 commit 70bf307
Showing 7 changed files with 358 additions and 2 deletions.
18 changes: 16 additions & 2 deletions airflow/providers/google/cloud/example_dags/example_dataflow.py
@@ -65,7 +65,7 @@
tags=['example'],
) as dag_native_java:

# [START howto_operator_start_java_job]
# [START howto_operator_start_java_job_jar_on_gcs]
start_java_job = DataflowCreateJavaJobOperator(
task_id="start-java-job",
jar=GCS_JAR,
@@ -78,8 +78,9 @@
check_if_running=CheckJobRunning.IgnoreJob,
location='europe-west3',
)
# [END howto_operator_start_java_job]
# [END howto_operator_start_java_job_jar_on_gcs]

# [START howto_operator_start_java_job_local_jar]
jar_to_local = GCSToLocalFilesystemOperator(
task_id="jar-to-local",
bucket=GCS_JAR_BUCKET_NAME,
@@ -99,6 +100,7 @@
check_if_running=CheckJobRunning.WaitForRun,
)
jar_to_local >> start_java_job_local
# [END howto_operator_start_java_job_local_jar]
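
The hunk above collapses the body of start_java_job_local. A minimal sketch of the local-jar pattern it anchors (download the jar from GCS, then run it from the worker's filesystem) could look as follows; it belongs inside the dag_native_java context, and the /tmp path, the options dict, and the placeholder constants are assumptions:

    from airflow.providers.google.cloud.operators.dataflow import (
        CheckJobRunning,
        DataflowCreateJavaJobOperator,
    )
    from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

    GCS_JAR_BUCKET_NAME = "my-bucket"       # placeholder
    GCS_JAR_OBJECT_NAME = "word-count.jar"  # placeholder
    GCS_OUTPUT = "gs://my-bucket/output"    # placeholder

    # Download the self-executing jar to the worker, then launch it locally.
    jar_to_local = GCSToLocalFilesystemOperator(
        task_id="jar-to-local",
        bucket=GCS_JAR_BUCKET_NAME,
        object_name=GCS_JAR_OBJECT_NAME,
        filename="/tmp/dataflow-{{ ds_nodash }}.jar",
    )
    start_java_job_local = DataflowCreateJavaJobOperator(
        task_id="start-java-job-local",
        jar="/tmp/dataflow-{{ ds_nodash }}.jar",  # run the jar downloaded above
        job_name="{{ task.task_id }}",
        options={"output": GCS_OUTPUT},
        check_if_running=CheckJobRunning.WaitForRun,
    )
    jar_to_local >> start_java_job_local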

with models.DAG(
"example_gcp_dataflow_native_python",
@@ -144,6 +146,7 @@
schedule_interval=None, # Override to match your needs
tags=['example'],
) as dag_native_python_async:
# [START howto_operator_start_python_job_async]
start_python_job_async = DataflowCreatePythonJobOperator(
task_id="start-python-job-async",
py_file=GCS_PYTHON,
@@ -158,14 +161,18 @@
location='europe-west3',
wait_until_finished=False,
)
# [END howto_operator_start_python_job_async]

# [START howto_sensor_wait_for_job_status]
wait_for_python_job_async_done = DataflowJobStatusSensor(
task_id="wait-for-python-job-async-done",
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
location='europe-west3',
)
# [END howto_sensor_wait_for_job_status]

# [START howto_sensor_wait_for_job_metric]
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
"""Check is metric greater than equals to given value."""

@@ -187,7 +194,9 @@ def callback(metrics: List[Dict]) -> bool:
location='europe-west3',
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
)
# [END howto_sensor_wait_for_job_metric]
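
The callback body is collapsed in the hunk above. A sketch of such a callback, assuming the MetricUpdate dict shape from the Dataflow REST API (metric identity under name.context, value under scalar), could be:

    from typing import Callable, Dict, List

    def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
        """Check if the metric is greater than or equal to the given value."""

        def callback(metrics: List[Dict]) -> bool:
            for metric in metrics:
                context = metric.get("name", {}).get("context", {})
                # Skip tentative updates; compare only the committed metric value.
                if context.get("original_name") == metric_name and not context.get("tentative"):
                    return metric["scalar"] >= value
            return False  # metric not reported yet, keep poking

        return callback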

# [START howto_sensor_wait_for_job_message]
def check_message(messages: List[dict]) -> bool:
"""Check message"""
for message in messages:
@@ -201,7 +210,9 @@ def check_message(messages: List[dict]) -> bool:
location='europe-west3',
callback=check_message,
)
# [END howto_sensor_wait_for_job_message]
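
Similarly, a sketch of a message callback, assuming JobMessage dicts with a messageText field (the matched text below is illustrative):

    from typing import List

    def check_message(messages: List[dict]) -> bool:
        """Return True once the expected text appears in the job messages."""
        for message in messages:
            if "Adding workflow start and stop steps." in message.get("messageText", ""):
                return True
        return False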

# [START howto_sensor_wait_for_job_autoscaling_event]
def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
"""Check autoscaling event"""
for autoscaling_event in autoscaling_events:
@@ -215,6 +226,7 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
location='europe-west3',
callback=check_autoscaling_event,
)
# [END howto_sensor_wait_for_job_autoscaling_event]
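
And a sketch of an autoscaling-event callback, assuming AutoscalingEvent dicts whose human-readable text sits under description.messageText (the matched text is illustrative):

    from typing import List

    def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
        """Return True once a worker pool start event is observed."""
        for autoscaling_event in autoscaling_events:
            if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
                return True
        return False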

start_python_job_async >> wait_for_python_job_async_done
start_python_job_async >> wait_for_python_job_async_metric
@@ -229,9 +241,11 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
schedule_interval=None, # Override to match your needs
tags=['example'],
) as dag_template:
# [START howto_operator_start_template_job]
start_template_job = DataflowTemplatedJobStartOperator(
task_id="start-template-job",
template='gs://dataflow-templates/latest/Word_Count',
parameters={'inputFile': "gs://dataflow-samples/shakespeare/kinglear.txt", 'output': GCS_OUTPUT},
location='europe-west3',
)
# [END howto_operator_start_template_job]
airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
@@ -44,6 +44,7 @@
start_date=days_ago(1),
schedule_interval=None, # Override to match your needs
) as dag_flex_template:
# [START howto_operator_start_template_job]
start_flex_template = DataflowStartFlexTemplateOperator(
task_id="start_flex_template_streaming_beam_sql",
body={
@@ -59,3 +60,4 @@
do_xcom_push=True,
location=BQ_FLEX_TEMPLATE_LOCATION,
)
# [END howto_operator_start_template_job]
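
The request body is collapsed in the hunk above. A sketch of its shape, following the projects.locations.flexTemplates/launch request body documented for the Dataflow API; all values below are placeholders:

    from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator

    start_flex_template = DataflowStartFlexTemplateOperator(
        task_id="start_flex_template_streaming_beam_sql",
        body={
            "launchParameter": {
                # Template spec produced by `gcloud dataflow flex-template build`.
                "containerSpecGcsPath": "gs://my-bucket/templates/streaming-beam-sql.json",
                "jobName": "streaming-beam-sql",
                "parameters": {
                    "inputSubscription": "my-subscription",
                    "outputTable": "my-project:my_dataset.streaming_beam_sql",
                },
            }
        },
        do_xcom_push=True,
        location="europe-west1",
    )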
airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
@@ -39,6 +39,7 @@
schedule_interval=None, # Override to match your needs
tags=['example'],
) as dag_sql:
# [START howto_operator_start_sql_job]
start_sql = DataflowStartSqlJobOperator(
task_id="start_sql_query",
job_name=DATAFLOW_SQL_JOB_NAME,
@@ -61,3 +62,4 @@
location=DATAFLOW_SQL_LOCATION,
do_xcom_push=True,
)
# [END howto_operator_start_sql_job]
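
The query and options are collapsed in the hunk above. A sketch of the full call, assuming a BigQuery source and sink; the options keys are forwarded as `gcloud dataflow sql query` flags, and all values are placeholders:

    from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator

    start_sql = DataflowStartSqlJobOperator(
        task_id="start_sql_query",
        job_name="sample-dataflow-sql-job",
        query="""
            SELECT sales_region, COUNT(state_id) AS count_state
            FROM bigquery.table.`my-project`.`my_dataset`.beam_input
            GROUP BY sales_region;
        """,
        options={
            # Forwarded to `gcloud dataflow sql query` as --bigquery-* flags.
            "bigquery-project": "my-project",
            "bigquery-dataset": "my_dataset",
            "bigquery-table": "beam_output",
        },
        location="europe-west3",
        do_xcom_push=True,
    )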
24 changes: 24 additions & 0 deletions airflow/providers/google/cloud/operators/dataflow.py
@@ -87,6 +87,10 @@ class DataflowCreateJavaJobOperator(BaseOperator):
For more detail on job submission have a look at the reference:
https://cloud.google.com/dataflow/pipelines/specifying-exec-params
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:DataflowCreateJavaJobOperator`
:param jar: The reference to a self executing DataFlow jar (templated).
:type jar: str
:param job_name: The 'jobName' to use when executing the DataFlow job
@@ -321,6 +325,10 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
Start a Templated Cloud DataFlow job. The parameters of the operation
will be passed to the job.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:DataflowTemplatedJobStartOperator`
:param template: The reference to the DataFlow template.
:type template: str
:param job_name: The 'jobName' to use when executing the DataFlow template
@@ -543,6 +551,10 @@ class DataflowStartFlexTemplateOperator(BaseOperator):
"""
Starts a Dataflow job from a Flex Template.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:DataflowStartFlexTemplateOperator`
:param body: The request body. See:
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
:param location: The location of the Dataflow job (for example europe-west1)
@@ -659,6 +671,14 @@ class DataflowStartSqlJobOperator(BaseOperator):
"""
Starts a Dataflow SQL query.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:DataflowStartSqlJobOperator`
.. warning::
    This operator requires the ``gcloud`` command (`Google Cloud SDK
    <https://cloud.google.com/sdk/docs/install>`__) to be installed on the Airflow worker.
:param job_name: The unique name to assign to the Cloud Dataflow job.
:type job_name: str
:param query: The SQL query to execute.
@@ -764,6 +784,10 @@ class DataflowCreatePythonJobOperator(BaseOperator):
For more detail on job submission have a look at the reference:
https://cloud.google.com/dataflow/pipelines/specifying-exec-params
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:DataflowCreatePythonJobOperator`
:param py_file: Reference to the python dataflow pipeline file.py, e.g.,
/some/local/file/path/to/your/python/pipeline/file. (templated)
:type py_file: str
16 changes: 16 additions & 0 deletions airflow/providers/google/cloud/sensors/dataflow.py
@@ -32,6 +32,10 @@ class DataflowJobStatusSensor(BaseSensorOperator):
"""
Checks for the status of a job in Google Cloud Dataflow.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:DataflowJobStatusSensor`
:param job_id: ID of the job to be checked.
:type job_id: str
:param expected_statuses: The expected state of the operation.
@@ -122,6 +126,10 @@ class DataflowJobMetricsSensor(BaseSensorOperator):
"""
Checks the metrics of a job in Google Cloud Dataflow.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:DataflowJobMetricsSensor`
:param job_id: ID of the job to be checked.
:type job_id: str
:param callback: callback which is called with the list of job metrics read so far
@@ -212,6 +220,10 @@ class DataflowJobMessagesSensor(BaseSensorOperator):
"""
Checks for the job message in Google Cloud Dataflow.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:DataflowJobMessagesSensor`
:param job_id: ID of the job to be checked.
:type job_id: str
:param callback: callback which is called with the list of job messages read so far
@@ -302,6 +314,10 @@ class DataflowJobAutoScalingEventsSensor(BaseSensorOperator):
"""
Checks for the job autoscaling event in Google Cloud Dataflow.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:DataflowJobAutoScalingEventsSensor`
:param job_id: ID of the job to be checked.
:type job_id: str
:param callback: callback which is called with the list of autoscaling events read so far
2 changes: 2 additions & 0 deletions airflow/providers/google/provider.yaml
@@ -201,6 +201,8 @@ integrations:
tags: [gcp]
- integration-name: Google Dataflow
external-doc-url: https://cloud.google.com/dataflow/
how-to-guide:
- /docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
logo: /integration-logos/gcp/Cloud-Dataflow.png
tags: [gcp]
- integration-name: Google Data Fusion
