Add support for latest Apache Beam SDK in Dataflow operators (#9323)
* A

* Add support for Apache Beam latest SDK in Dataflow operators

* fixup! Add support for Apache Beam latest SDK in Dataflow operators

* fixup! fixup! Add support for Apache Beam latest SDK in Dataflow operators

* fixup! fixup! fixup! Add support for Apache Beam latest SDK in Dataflow operators
mik-laj committed Jun 16, 2020
1 parent 47bddf7 commit 639972d
Showing 4 changed files with 147 additions and 53 deletions.
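
For context, the heart of the change is the new job-ID regex in airflow/providers/google/cloud/hooks/dataflow.py: instead of scraping the Dataflow monitoring-console URL (whose path format differs between the 2.14 and 2.22 SDK logs shown in the tests below), the hook now matches the "Submitted job:" line printed by the Java SDK and the "Created job with id: [...]" line printed by the Python SDK. A minimal sketch of that pattern in isolation — the log lines and job IDs here are illustrative placeholders, not the TEST_JOB_ID fixture used by the tests:

import re

# Pattern introduced by this commit in hooks/dataflow.py.
JOB_ID_PATTERN = re.compile(
    r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
)

# Illustrative SDK log lines with placeholder job IDs.
sample_lines = [
    "INFO: Submitted job: 2020-06-15_06_09_03-1234567890",              # Java SDK
    "INFO:root:Created job with id: [2020-06-16_05_46_20-1234567890]",  # Python SDK
]

for line in sample_lines:
    matched = JOB_ID_PATTERN.search(line)
    if matched:
        # Exactly one named group is populated, depending on which SDK produced the line.
        print(matched.group('job_id_java') or matched.group('job_id_python'))
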
21 changes: 17 additions & 4 deletions airflow/providers/google/cloud/example_dags/example_dataflow.py
@@ -49,11 +49,11 @@
}

with models.DAG(
"example_gcp_dataflow",
"example_gcp_dataflow_native_java",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=['example'],
) as dag:
) as dag_native_java:

# [START howto_operator_start_java_job]
start_java_job = DataflowCreateJavaJobOperator(
@@ -90,6 +90,13 @@
)
jar_to_local >> start_java_job_local

with models.DAG(
"example_gcp_dataflow_native_python",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=['example'],
) as dag_native_python:

# [START howto_operator_start_python_job]
start_python_job = DataflowCreatePythonJobOperator(
task_id="start-python-job",
@@ -100,7 +107,7 @@
'output': GCS_OUTPUT,
},
py_requirements=[
'apache-beam[gcp]>=2.14.0'
'apache-beam[gcp]==2.21.0'
],
py_interpreter='python3',
py_system_site_packages=False,
@@ -117,12 +124,18 @@
'output': GCS_OUTPUT,
},
py_requirements=[
'apache-beam[gcp]>=2.14.0'
'apache-beam[gcp]==2.14.0'
],
py_interpreter='python3',
py_system_site_packages=False
)

with models.DAG(
"example_gcp_dataflow_template",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=['example'],
) as dag_template:
start_template_job = DataflowTemplatedJobStartOperator(
task_id="start-template-job",
template='gs://dataflow-templates/latest/Word_Count',
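
The single example_gcp_dataflow DAG is split into three DAGs above (native Java, native Python, and template), and the Python examples now pin the Beam SDK to an exact version instead of the open-ended >=2.14.0 constraint. A minimal, stand-alone sketch of the pinned-SDK operator call — the GCS paths and start date below are illustrative placeholders, not values from the example file:

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from airflow.utils.dates import days_ago

# Placeholder GCS locations, for illustration only.
GCS_PYTHON = 'gs://my-bucket/wordcount.py'
GCS_OUTPUT = 'gs://my-bucket/output'

with models.DAG(
    "example_gcp_dataflow_native_python",
    default_args={'start_date': days_ago(1)},
    schedule_interval=None,  # Override to match your needs
    tags=['example'],
) as dag_native_python:
    start_python_job = DataflowCreatePythonJobOperator(
        task_id="start-python-job",
        py_file=GCS_PYTHON,
        options={'output': GCS_OUTPUT},
        py_requirements=['apache-beam[gcp]==2.21.0'],  # pinned, matching the updated example
        py_interpreter='python3',
        py_system_site_packages=False,
    )
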
85 changes: 44 additions & 41 deletions airflow/providers/google/cloud/hooks/dataflow.py
@@ -22,6 +22,7 @@
import json
import re
import select
import shlex
import subprocess
import time
import uuid
@@ -42,10 +43,9 @@
DEFAULT_DATAFLOW_LOCATION = 'us-central1'


# https://github.com/apache/beam/blob/75eee7857bb80a0cdb4ce99ae3e184101092e2ed/sdks/go/pkg/beam/runners/
# universal/runnerlib/execute.go#L85
JOB_ID_PATTERN = re.compile(
r'https?://console\.cloud\.google\.com/dataflow/jobsDetail/locations/.+?/jobs/([a-z|0-9|A-Z|\-|\_]+).*?')
r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
)

RT = TypeVar('RT') # pylint: disable=invalid-name

@@ -319,48 +319,55 @@ def __init__(
on_new_job_id_callback: Optional[Callable[[str], None]] = None
) -> None:
super().__init__()
self.log.info("Running command: %s", ' '.join(cmd))
self.log.info("Running command: %s", ' '.join(shlex.quote(c) for c in cmd))
self.on_new_job_id_callback = on_new_job_id_callback
self.job_id: Optional[str] = None
self._proc = subprocess.Popen(
cmd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True)

def _read_line_by_fd(self, fd):
if fd == self._proc.stderr.fileno():
line = self._proc.stderr.readline().decode()
if line:
self.log.warning(line[:-1])
return line
def _process_fd(self, fd):
"""
Prints output to the logs and looks for the job ID in each line.
:param fd: File descriptor.
"""
if fd == self._proc.stderr:
while True:
line = self._proc.stderr.readline().decode()
if not line:
return
self._process_line_and_extract_job_id(line)
self.log.warning(line.rstrip("\n"))

if fd == self._proc.stdout.fileno():
line = self._proc.stdout.readline().decode()
if line:
self.log.info(line[:-1])
return line
if fd == self._proc.stdout:
while True:
line = self._proc.stdout.readline().decode()
if not line:
return
self._process_line_and_extract_job_id(line)
self.log.info(line.rstrip("\n"))

raise Exception("No data in stderr or in stdout.")

def _extract_job(self, line: str) -> Optional[str]:
def _process_line_and_extract_job_id(self, line: str) -> None:
"""
Extracts job_id.
:param line: URL from which job_id has to be extracted
:type line: str
:return: job_id or None if no match
:rtype: Optional[str]
"""
# Job id info: https://goo.gl/SE29y9.
matched_job = JOB_ID_PATTERN.search(line)
if matched_job:
job_id = matched_job.group(1)
job_id = matched_job.group('job_id_java') or matched_job.group('job_id_python')
self.log.info("Found Job ID: %s", job_id)
self.job_id = job_id
if self.on_new_job_id_callback:
self.on_new_job_id_callback(job_id)
return job_id
return None

def wait_for_done(self) -> Optional[str]:
"""
@@ -369,35 +376,31 @@ def wait_for_done(self) -> Optional[str]:
:return: Job id
:rtype: Optional[str]
"""
reads = [self._proc.stderr.fileno() if self._proc.stderr else 0,
self._proc.stdout.fileno() if self._proc.stdout else 0]
self.log.info("Start waiting for DataFlow process to complete.")
job_id = None
# Make sure logs are processed regardless whether the subprocess is
# terminated.
process_ends = False
self.job_id = None
reads = [self._proc.stderr, self._proc.stdout]
while True:
# Wait for at least one available fd.
readable_fbs, _, _ = select.select(reads, [], [], 5)
if readable_fbs is None:
readable_fds, _, _ = select.select(reads, [], [], 5)
if readable_fds is None:
self.log.info("Waiting for DataFlow process to complete.")
continue

# Read available fds.
for readable_fb in readable_fbs:
line = self._read_line_by_fd(readable_fb)
if line and not job_id:
job_id = job_id or self._extract_job(line)
for readable_fd in readable_fds:
self._process_fd(readable_fd)

if process_ends:
break
if self._proc.poll() is not None:
# Mark process completion but allows its outputs to be consumed.
process_ends = True
break

# Corner case: check if more output was created between the last read and the process termination
for readable_fd in reads:
self._process_fd(readable_fd)

self.log.info("Process exited with return code: %s", self._proc.returncode)

if self._proc.returncode != 0:
raise Exception("DataFlow failed with return code {}".format(
self._proc.returncode))
return job_id
raise Exception("DataFlow failed with return code {}".format(self._proc.returncode))
return self.job_id


class DataflowHook(GoogleBaseHook):
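
The reworked wait_for_done above multiplexes the subprocess's stdout and stderr with select.select, scans every line for a job ID, and drains both pipes once more after poll() reports termination so that output produced just before exit is not lost. A stand-alone sketch of that consumption pattern (POSIX only, bash assumed; the echo command and plain print() are stand-ins for the Dataflow command, _process_fd, and the Airflow logger):

import select
import subprocess

# Stand-in for the Dataflow command: write one line to stdout and one to stderr.
proc = subprocess.Popen(
    ["bash", "-c", "echo out-line; echo err-line >&2"],
    shell=False,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    close_fds=True,
)

def process_fd(fd):
    # Drain one pipe line by line until readline() returns an empty string (EOF).
    while True:
        line = fd.readline().decode()
        if not line:
            return
        print(line.rstrip("\n"))

reads = [proc.stderr, proc.stdout]
while True:
    # Wait up to 5 s for at least one pipe to become readable.
    readable_fds, _, _ = select.select(reads, [], [], 5)
    for readable_fd in readable_fds:
        process_fd(readable_fd)
    if proc.poll() is not None:
        break

# Corner case: output created between the last read and process termination.
for readable_fd in reads:
    process_fd(readable_fd)

print("return code:", proc.returncode)
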
82 changes: 76 additions & 6 deletions tests/providers/google/cloud/hooks/test_dataflow.py
@@ -18,6 +18,7 @@
#

import copy
import shlex
import unittest
from typing import Any, Dict

@@ -1026,14 +1027,83 @@ def test_dataflow_job_cancel_job(self):
mock_batch.execute.assert_called_once()


APACHE_BEAM_V_2_14_0_JAVA_SDK_LOG = f""""\
Dataflow SDK version: 2.14.0
Jun 15, 2020 2:57:28 PM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow\
/jobsDetail/locations/europe-west3/jobs/{TEST_JOB_ID}?project=XXX
Submitted job: {TEST_JOB_ID}
Jun 15, 2020 2:57:28 PM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: To cancel the job using the 'gcloud' tool, run:
> gcloud dataflow jobs --project=XXX cancel --region=europe-west3 {TEST_JOB_ID}
"""

APACHE_BEAM_V_2_22_0_JAVA_SDK_LOG = f""""\
INFO: Dataflow SDK version: 2.22.0
Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow\
/jobs/europe-west3/{TEST_JOB_ID}?project=XXXX
Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: Submitted job: {TEST_JOB_ID}
Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: To cancel the job using the 'gcloud' tool, run:
> gcloud dataflow jobs --project=XXX cancel --region=europe-west3 {TEST_JOB_ID}
"""

APACHE_BEAM_V_2_14_0_PYTHON_SDK_LOG = f""""\
INFO:root:Completed GCS upload to gs://test-dataflow-example/staging/start-python-job-local-5bcf3d71.\
1592286375.000962/apache_beam-2.14.0-cp37-cp37m-manylinux1_x86_64.whl in 0 seconds.
INFO:root:Create job: <Job
createTime: '2020-06-16T05:46:20.911857Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '{TEST_JOB_ID}'
location: 'us-central1'
name: 'start-python-job-local-5bcf3d71'
projectId: 'XXX'
stageStates: []
startTime: '2020-06-16T05:46:20.911857Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:root:Created job with id: [{TEST_JOB_ID}]
INFO:root:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/\
dataflow/jobsDetail/locations/us-central1/jobs/{TEST_JOB_ID}?project=XXX
"""

APACHE_BEAM_V_2_22_0_PYTHON_SDK_LOG = f""""\
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://test-dataflow-example/\
staging/start-python-job-local-5bcf3d71.1592286719.303624/apache_beam-2.22.0-cp37-cp37m-manylinux1_x86_64.whl\
in 1 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job
createTime: '2020-06-16T05:52:04.095216Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '{TEST_JOB_ID}'
location: 'us-central1'
name: 'start-python-job-local-5bcf3d71'
projectId: 'XXX'
stageStates: []
startTime: '2020-06-16T05:52:04.095216Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [{TEST_JOB_ID}]
INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: {TEST_JOB_ID}
INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please \
navigate to https://console.cloud.google.com/dataflow/jobs/us-central1/{TEST_JOB_ID}?project=XXX
"""


class TestDataflow(unittest.TestCase):

def test_data_flow_valid_job_id(self):
cmd = [
'echo', 'additional unit test lines.\n' +
'https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/'
'jobs/{}?project=XXX'.format(TEST_JOB_ID)
]
@parameterized.expand([
(APACHE_BEAM_V_2_14_0_JAVA_SDK_LOG, ),
(APACHE_BEAM_V_2_22_0_JAVA_SDK_LOG, ),
(APACHE_BEAM_V_2_14_0_PYTHON_SDK_LOG, ),
(APACHE_BEAM_V_2_22_0_PYTHON_SDK_LOG, ),
], name_func=lambda func, num, p: f"{func.__name__}_{num}")
def test_data_flow_valid_job_id(self, log):
echos = ";".join([f"echo {shlex.quote(line)}" for line in log.split("\n")])
cmd = ["bash", "-c", echos]
self.assertEqual(_DataflowRunner(cmd).wait_for_done(), TEST_JOB_ID)

def test_data_flow_missing_job_id(self):
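
The updated test replays each sample SDK log through a real subprocess by turning every log line into a quoted echo and running the result under bash -c, which is what the shlex.quote import above is for. A small sketch of just that command construction, using a placeholder two-line log instead of the full APACHE_BEAM_V_* samples:

import shlex
import subprocess

# Placeholder log; the real tests use the SDK log samples defined above.
log = "INFO: Submitted job: 2020-06-15_06_09_03-1234567890\nsecond line"

# One quoted `echo` per line, joined with ';' so bash replays the log verbatim.
echos = ";".join(f"echo {shlex.quote(line)}" for line in log.split("\n"))
cmd = ["bash", "-c", echos]

print(subprocess.run(cmd, capture_output=True, text=True).stdout)
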
12 changes: 10 additions & 2 deletions tests/providers/google/cloud/operators/test_dataflow_system.py
@@ -25,5 +25,13 @@
@pytest.mark.credential_file(GCP_DATAFLOW_KEY)
class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
@provide_gcp_context(GCP_DATAFLOW_KEY)
def test_run_example_dag_function(self):
self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
def test_run_example_gcp_dataflow_native_java(self):
self.run_dag('example_gcp_dataflow_native_java', CLOUD_DAG_FOLDER)

@provide_gcp_context(GCP_DATAFLOW_KEY)
def test_run_example_gcp_dataflow_native_python(self):
self.run_dag('example_gcp_dataflow_native_python', CLOUD_DAG_FOLDER)

@provide_gcp_context(GCP_DATAFLOW_KEY)
def test_run_example_gcp_dataflow_template(self):
self.run_dag('example_gcp_dataflow_template', CLOUD_DAG_FOLDER)
