Add support for BeamGoPipelineOperator (#20386)
closes: #20283

In this PR:
- [x] Upgrade the minimum package requirement for apache-beam to 2.33.0 (the first stable release of the Beam Go SDK)
- [x] Refactor `operators/beam.py` with an abstract `BeamBasePipelineOperator` class to factor out initialization and common code; also fix the mypy hook on ``BeamDataflowMixin``
- [x] Add `BeamRunGoPipelineOperator` and `BeamHook.start_go_pipeline` (+ tests)
- [x] Add `utils/go_module.py` to handle module initialization and dependency installation (+ tests); see the sketch after this list
- [x] Slightly modify `process_util` (+ tests) to accept an extra optional parameter `cwd`, so the build can run from the module directory
- [x]  Write docs
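
For reference, here is a minimal sketch of what the `utils/go_module.py` helpers could look like. The function names match the `init_module` and `install_dependencies` calls used by the hook below, but the exact signatures and error handling shown here are assumptions, not the committed implementation:

```python
import os
import subprocess


def init_module(go_module_name: str, go_module_path: str) -> None:
    """Initialize a Go module in go_module_path unless a go.mod file already exists."""
    if os.path.isfile(os.path.join(go_module_path, "go.mod")):
        return  # module already initialized, nothing to do
    subprocess.run(["go", "mod", "init", go_module_name], cwd=go_module_path, check=True)


def install_dependencies(go_module_path: str) -> None:
    """Resolve and install the module's dependencies with `go mod tidy`."""
    subprocess.run(["go", "mod", "tidy"], cwd=go_module_path, check=True)
```

Keeping the Go tooling calls in a small utility module leaves `BeamHook.start_go_pipeline` focused on building and running the `go run` command.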
pierrejeambrun committed Feb 13, 2022
1 parent cca2f94 commit da485da
Showing 17 changed files with 771 additions and 163 deletions.
114 changes: 113 additions & 1 deletion airflow/providers/apache/beam/example_dags/example_beam.py
@@ -25,6 +25,7 @@

from airflow import models
from airflow.providers.apache.beam.operators.beam import (
BeamRunGoPipelineOperator,
BeamRunJavaPipelineOperator,
BeamRunPythonPipelineOperator,
)
@@ -43,7 +44,10 @@
GCS_PYTHON_DATAFLOW_ASYNC = os.environ.get(
'APACHE_BEAM_PYTHON_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.py'
)

GCS_GO = os.environ.get('APACHE_BEAM_GO', 'gs://INVALID BUCKET NAME/wordcount_debugging.go')
GCS_GO_DATAFLOW_ASYNC = os.environ.get(
'APACHE_BEAM_GO_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.go'
)
GCS_JAR_DIRECT_RUNNER = os.environ.get(
'APACHE_BEAM_DIRECT_RUNNER_JAR',
'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-DirectRunner.jar',
@@ -323,3 +327,111 @@

start_python_job_dataflow_runner_async >> wait_for_python_job_dataflow_runner_async_done
# [END howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]


with models.DAG(
"example_beam_native_go",
start_date=START_DATE,
schedule_interval="@once",
catchup=False,
default_args=DEFAULT_ARGS,
tags=['example'],
) as dag_native_go:

# [START howto_operator_start_go_direct_runner_pipeline_local_file]
start_go_pipeline_local_direct_runner = BeamRunGoPipelineOperator(
task_id="start_go_pipeline_local_direct_runner",
go_file='files/apache_beam/examples/wordcount.go',
)
# [END howto_operator_start_go_direct_runner_pipeline_local_file]

# [START howto_operator_start_go_direct_runner_pipeline_gcs_file]
start_go_pipeline_direct_runner = BeamRunGoPipelineOperator(
task_id="start_go_pipeline_direct_runner",
go_file=GCS_GO,
pipeline_options={"output": GCS_OUTPUT},
)
# [END howto_operator_start_go_direct_runner_pipeline_gcs_file]

# [START howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
start_go_pipeline_dataflow_runner = BeamRunGoPipelineOperator(
task_id="start_go_pipeline_dataflow_runner",
runner="DataflowRunner",
go_file=GCS_GO,
pipeline_options={
'tempLocation': GCS_TMP,
'stagingLocation': GCS_STAGING,
'output': GCS_OUTPUT,
'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
},
dataflow_config=DataflowConfiguration(
job_name='{{task.task_id}}', project_id=GCP_PROJECT_ID, location="us-central1"
),
)
# [END howto_operator_start_go_dataflow_runner_pipeline_gcs_file]

start_go_pipeline_local_spark_runner = BeamRunGoPipelineOperator(
task_id="start_go_pipeline_local_spark_runner",
go_file='/files/apache_beam/examples/wordcount.go',
runner="SparkRunner",
pipeline_options={
'endpoint': '/your/spark/endpoint',
},
)

start_go_pipeline_local_flink_runner = BeamRunGoPipelineOperator(
task_id="start_go_pipeline_local_flink_runner",
go_file='/files/apache_beam/examples/wordcount.go',
runner="FlinkRunner",
pipeline_options={
'output': '/tmp/start_go_pipeline_local_flink_runner',
},
)

(
[
start_go_pipeline_local_direct_runner,
start_go_pipeline_direct_runner,
]
>> start_go_pipeline_local_flink_runner
>> start_go_pipeline_local_spark_runner
)


with models.DAG(
"example_beam_native_go_dataflow_async",
default_args=DEFAULT_ARGS,
start_date=START_DATE,
schedule_interval="@once",
catchup=False,
tags=['example'],
) as dag_native_go_dataflow_async:
# [START howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
start_go_job_dataflow_runner_async = BeamRunGoPipelineOperator(
task_id="start_go_job_dataflow_runner_async",
runner="DataflowRunner",
go_file=GCS_GO_DATAFLOW_ASYNC,
pipeline_options={
'tempLocation': GCS_TMP,
'stagingLocation': GCS_STAGING,
'output': GCS_OUTPUT,
'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
},
dataflow_config=DataflowConfiguration(
job_name='{{task.task_id}}',
project_id=GCP_PROJECT_ID,
location="us-central1",
wait_until_finished=False,
),
)

wait_for_go_job_dataflow_runner_async_done = DataflowJobStatusSensor(
task_id="wait-for-go-job-async-done",
job_id="{{task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
project_id=GCP_PROJECT_ID,
location='us-central1',
)

start_go_job_dataflow_runner_async >> wait_for_go_job_dataflow_runner_async_done
# [END howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
51 changes: 50 additions & 1 deletion airflow/providers/apache/beam/hooks/beam.py
@@ -17,6 +17,7 @@
# under the License.
"""This module contains a Apache Beam Hook."""
import json
import os
import select
import shlex
import subprocess
@@ -26,6 +27,7 @@

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.providers.google.go_module_utils import init_module, install_dependencies
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.python_virtualenv import prepare_virtualenv

@@ -80,12 +82,14 @@ class BeamCommandRunner(LoggingMixin):
:param cmd: Parts of the command to be run in subprocess
:param process_line_callback: Optional callback which can be used to process
stdout and stderr to detect job id
:param working_directory: Working directory for the subprocess (passed as ``cwd`` to ``subprocess.Popen``)
"""

def __init__(
self,
cmd: List[str],
process_line_callback: Optional[Callable[[str], None]] = None,
working_directory: Optional[str] = None,
) -> None:
super().__init__()
self.log.info("Running command: %s", " ".join(shlex.quote(c) for c in cmd))
@@ -94,6 +98,7 @@ def __init__(

self._proc = subprocess.Popen(
cmd,
cwd=working_directory,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
@@ -169,6 +174,7 @@ def _start_pipeline(
variables: dict,
command_prefix: List[str],
process_line_callback: Optional[Callable[[str], None]] = None,
working_directory: Optional[str] = None,
) -> None:
cmd = command_prefix + [
f"--runner={self.runner}",
@@ -178,6 +184,7 @@
cmd_runner = BeamCommandRunner(
cmd=cmd,
process_line_callback=process_line_callback,
working_directory=working_directory,
)
cmd_runner.wait_for_done()

@@ -195,6 +202,7 @@ def start_python_pipeline(
Starts Apache Beam python pipeline.
:param variables: Variables passed to the pipeline.
:param py_file: Path to the python file to execute.
:param py_options: Additional Python options, e.g., ``["-m", "-v"]``.
:param py_interpreter: Python version of the Apache Beam pipeline.
If None, this defaults to the python3.
@@ -210,7 +218,8 @@
See virtualenv documentation for more information.
This option is only relevant if the ``py_requirements`` parameter is not None.
:param on_new_job_id_callback: Callback called when the job ID is known.
:param process_line_callback: (optional) Callback that can be used to process each line of
the stdout and stderr file descriptors.
"""
if "labels" in variables:
variables["labels"] = [f"{key}={value}" for key, value in variables["labels"].items()]
@@ -265,6 +274,8 @@ def start_java_pipeline(
:param variables: Variables passed to the job.
:param jar: Name of the jar for the pipeline
:param job_class: Name of the java class for the pipeline.
:param process_line_callback: (optional) Callback that can be used to process each line of
the stdout and stderr file descriptors.
"""
if "labels" in variables:
variables["labels"] = json.dumps(variables["labels"], separators=(",", ":"))
@@ -275,3 +286,41 @@
command_prefix=command_prefix,
process_line_callback=process_line_callback,
)

def start_go_pipeline(
self,
variables: dict,
go_file: str,
process_line_callback: Optional[Callable[[str], None]] = None,
should_init_module: bool = False,
) -> None:
"""
Starts Apache Beam Go pipeline.
:param variables: Variables passed to the job.
:param go_file: Path to the Go file with your beam pipeline.
:param process_line_callback: (optional) Callback that can be used to process each line of
the stdout and stderr file descriptors.
:param should_init_module: If False (default), the hook simply executes a ``go run`` command. If True,
it first initializes the module and installs its dependencies with ``go mod init`` and ``go mod tidy``,
which is useful when the source was pulled with GCSHook.
"""
if "labels" in variables:
variables["labels"] = json.dumps(variables["labels"], separators=(",", ":"))

working_directory = os.path.dirname(go_file)
basename = os.path.basename(go_file)

if should_init_module:
init_module("main", working_directory)
install_dependencies(working_directory)

command_prefix = ["go", "run", basename]
self._start_pipeline(
variables=variables,
command_prefix=command_prefix,
process_line_callback=process_line_callback,
working_directory=working_directory,
)

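For illustration, a hedged sketch of calling the new hook method directly (the `BeamRunGoPipelineOperator` wraps this call); the file path and pipeline options below are placeholders, not values from this commit:

```python
from airflow.providers.apache.beam.hooks.beam import BeamHook

hook = BeamHook(runner="DirectRunner")
hook.start_go_pipeline(
    variables={"output": "/tmp/wordcount_output"},  # placeholder pipeline options
    go_file="/files/apache_beam/examples/wordcount.go",  # placeholder path
    # True is useful when the .go file was just downloaded (e.g. with GCSHook)
    # into a directory without a go.mod: the hook then runs `go mod init`
    # and `go mod tidy` before `go run`.
    should_init_module=True,
)
```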