
Commit 639972d

Add support for latest Apache Beam SDK in Dataflow operators (#9323)
* Add support for Apache Beam latest SDK in Dataflow operators
* fixup! Add support for Apache Beam latest SDK in Dataflow operators
* fixup! fixup! Add support for Apache Beam latest SDK in Dataflow operators
* fixup! fixup! fixup! Add support for Apache Beam latest SDK in Dataflow operators
1 parent 47bddf7 commit 639972d

4 files changed (+147, −53 lines)

airflow/providers/google/cloud/example_dags/example_dataflow.py

Lines changed: 17 additions & 4 deletions
@@ -49,11 +49,11 @@
 }
 
 with models.DAG(
-    "example_gcp_dataflow",
+    "example_gcp_dataflow_native_java",
     default_args=default_args,
     schedule_interval=None,  # Override to match your needs
     tags=['example'],
-) as dag:
+) as dag_native_java:
 
     # [START howto_operator_start_java_job]
     start_java_job = DataflowCreateJavaJobOperator(
@@ -90,6 +90,13 @@
     )
     jar_to_local >> start_java_job_local
 
+with models.DAG(
+    "example_gcp_dataflow_native_python",
+    default_args=default_args,
+    schedule_interval=None,  # Override to match your needs
+    tags=['example'],
+) as dag_native_python:
+
     # [START howto_operator_start_python_job]
     start_python_job = DataflowCreatePythonJobOperator(
         task_id="start-python-job",
@@ -100,7 +107,7 @@
             'output': GCS_OUTPUT,
         },
         py_requirements=[
-            'apache-beam[gcp]>=2.14.0'
+            'apache-beam[gcp]==2.21.0'
         ],
         py_interpreter='python3',
         py_system_site_packages=False,
@@ -117,12 +124,18 @@
             'output': GCS_OUTPUT,
         },
         py_requirements=[
-            'apache-beam[gcp]>=2.14.0'
+            'apache-beam[gcp]==2.14.0'
         ],
         py_interpreter='python3',
         py_system_site_packages=False
     )
 
+with models.DAG(
+    "example_gcp_dataflow_template",
+    default_args=default_args,
+    schedule_interval=None,  # Override to match your needs
+    tags=['example'],
+) as dag_template:
     start_template_job = DataflowTemplatedJobStartOperator(
         task_id="start-template-job",
         template='gs://dataflow-templates/latest/Word_Count',
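
As an aside, here is a minimal, self-contained sketch of what one of the resulting example DAGs (the native-Python one) boils down to. The py_file path, output bucket, and default_args below are illustrative placeholders for the GCS_* constants and defaults the example file actually defines:

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from airflow.utils.dates import days_ago

# Placeholder defaults; the example DAG file defines its own default_args.
default_args = {"start_date": days_ago(1)}

with models.DAG(
    "example_gcp_dataflow_native_python",
    default_args=default_args,
    schedule_interval=None,  # Override to match your needs
    tags=["example"],
) as dag_native_python:
    start_python_job = DataflowCreatePythonJobOperator(
        task_id="start-python-job",
        py_file="gs://my-bucket/wordcount.py",  # placeholder for the example's pipeline file constant
        options={
            "output": "gs://my-bucket/output",  # placeholder for GCS_OUTPUT
        },
        py_requirements=["apache-beam[gcp]==2.21.0"],
        py_interpreter="python3",
        py_system_site_packages=False,
    )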

airflow/providers/google/cloud/hooks/dataflow.py

Lines changed: 44 additions & 41 deletions
@@ -22,6 +22,7 @@
 import json
 import re
 import select
+import shlex
 import subprocess
 import time
 import uuid
@@ -42,10 +43,9 @@
 DEFAULT_DATAFLOW_LOCATION = 'us-central1'
 
 
-# https://github.com/apache/beam/blob/75eee7857bb80a0cdb4ce99ae3e184101092e2ed/sdks/go/pkg/beam/runners/
-# universal/runnerlib/execute.go#L85
 JOB_ID_PATTERN = re.compile(
-    r'https?://console\.cloud\.google\.com/dataflow/jobsDetail/locations/.+?/jobs/([a-z|0-9|A-Z|\-|\_]+).*?')
+    r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
+)
 
 RT = TypeVar('RT')  # pylint: disable=invalid-name
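The new JOB_ID_PATTERN above covers both the Java SDK's "Submitted job: <id>" line and the Python SDK's "Created job with id: [<id>]" line (both formats appear in the test fixtures later in this commit). A minimal sketch of how the named groups are used; the job id value here is a made-up placeholder:

import re

JOB_ID_PATTERN = re.compile(
    r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
)

# Placeholder job id, for illustration only.
job_id = "2020-06-15_07_57_27-1234567890"

for line in (
    f"Submitted job: {job_id}",                    # Java SDK log line
    f"INFO:root:Created job with id: [{job_id}]",  # Python SDK log line
):
    matched = JOB_ID_PATTERN.search(line)
    if matched:
        # Exactly one named group matches per line; the other is None.
        print(matched.group('job_id_java') or matched.group('job_id_python'))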
@@ -319,48 +319,55 @@ def __init__(
         on_new_job_id_callback: Optional[Callable[[str], None]] = None
     ) -> None:
         super().__init__()
-        self.log.info("Running command: %s", ' '.join(cmd))
+        self.log.info("Running command: %s", ' '.join(shlex.quote(c) for c in cmd))
         self.on_new_job_id_callback = on_new_job_id_callback
+        self.job_id: Optional[str] = None
         self._proc = subprocess.Popen(
             cmd,
             shell=False,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
             close_fds=True)
 
-    def _read_line_by_fd(self, fd):
-        if fd == self._proc.stderr.fileno():
-            line = self._proc.stderr.readline().decode()
-            if line:
-                self.log.warning(line[:-1])
-            return line
+    def _process_fd(self, fd):
+        """
+        Prints output to logs and lookup for job ID in each line.
+
+        :param fd: File descriptor.
+        """
+        if fd == self._proc.stderr:
+            while True:
+                line = self._proc.stderr.readline().decode()
+                if not line:
+                    return
+                self._process_line_and_extract_job_id(line)
+                self.log.warning(line.rstrip("\n"))
 
-        if fd == self._proc.stdout.fileno():
-            line = self._proc.stdout.readline().decode()
-            if line:
-                self.log.info(line[:-1])
-            return line
+        if fd == self._proc.stdout:
+            while True:
+                line = self._proc.stdout.readline().decode()
+                if not line:
+                    return
+                self._process_line_and_extract_job_id(line)
+                self.log.info(line.rstrip("\n"))
 
         raise Exception("No data in stderr or in stdout.")
 
-    def _extract_job(self, line: str) -> Optional[str]:
+    def _process_line_and_extract_job_id(self, line: str) -> None:
         """
         Extracts job_id.
 
         :param line: URL from which job_id has to be extracted
         :type line: str
-        :return: job_id or None if no match
-        :rtype: Optional[str]
         """
         # Job id info: https://goo.gl/SE29y9.
         matched_job = JOB_ID_PATTERN.search(line)
         if matched_job:
-            job_id = matched_job.group(1)
+            job_id = matched_job.group('job_id_java') or matched_job.group('job_id_python')
             self.log.info("Found Job ID: %s", job_id)
+            self.job_id = job_id
             if self.on_new_job_id_callback:
                 self.on_new_job_id_callback(job_id)
-            return job_id
-        return None
 
     def wait_for_done(self) -> Optional[str]:
         """
@@ -369,35 +376,31 @@ def wait_for_done(self) -> Optional[str]:
         :return: Job id
         :rtype: Optional[str]
         """
-        reads = [self._proc.stderr.fileno() if self._proc.stderr else 0,
-                 self._proc.stdout.fileno() if self._proc.stdout else 0]
         self.log.info("Start waiting for DataFlow process to complete.")
-        job_id = None
-        # Make sure logs are processed regardless whether the subprocess is
-        # terminated.
-        process_ends = False
+        self.job_id = None
+        reads = [self._proc.stderr, self._proc.stdout]
         while True:
             # Wait for at least one available fd.
-            readable_fbs, _, _ = select.select(reads, [], [], 5)
-            if readable_fbs is None:
+            readable_fds, _, _ = select.select(reads, [], [], 5)
+            if readable_fds is None:
                 self.log.info("Waiting for DataFlow process to complete.")
                 continue
 
-            # Read available fds.
-            for readable_fb in readable_fbs:
-                line = self._read_line_by_fd(readable_fb)
-                if line and not job_id:
-                    job_id = job_id or self._extract_job(line)
+            for readable_fd in readable_fds:
+                self._process_fd(readable_fd)
 
-            if process_ends:
-                break
             if self._proc.poll() is not None:
-                # Mark process completion but allows its outputs to be consumed.
-                process_ends = True
+                break
+
+        # Corner case: check if more output was created between the last read and the process termination
+        for readable_fd in reads:
+            self._process_fd(readable_fd)
+
+        self.log.info("Process exited with return code: %s", self._proc.returncode)
+
         if self._proc.returncode != 0:
-            raise Exception("DataFlow failed with return code {}".format(
-                self._proc.returncode))
-        return job_id
+            raise Exception("DataFlow failed with return code {}".format(self._proc.returncode))
+        return self.job_id
 
 
 class DataflowHook(GoogleBaseHook):
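
Two details of the rewritten wait loop above are easy to miss: select.select() accepts any object with a fileno() method, so the pipe file objects can be passed in (and compared by identity afterwards) instead of raw descriptors, and the final drain pass catches output produced between the last select() wake-up and process exit. A stripped-down, runnable sketch of the same pattern, not the hook itself, using a toy command in place of the Beam/Dataflow invocation:

import select
import subprocess

proc = subprocess.Popen(
    ["bash", "-c", "echo 'Submitted job: job-123'; echo 'some warning' >&2"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    close_fds=True,
)
reads = [proc.stderr, proc.stdout]

def drain(pipe):
    # Read every complete line currently available on the pipe.
    while True:
        line = pipe.readline().decode()
        if not line:
            return
        print(line.rstrip("\n"))

while True:
    # File objects are valid arguments to select(); it returns the same objects back.
    readable, _, _ = select.select(reads, [], [], 5)
    for pipe in readable:
        drain(pipe)
    if proc.poll() is not None:
        break

# Corner case: output emitted between the last read and process termination.
for pipe in reads:
    drain(pipe)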

tests/providers/google/cloud/hooks/test_dataflow.py

Lines changed: 76 additions & 6 deletions
@@ -18,6 +18,7 @@
 #
 
 import copy
+import shlex
 import unittest
 from typing import Any, Dict
 
@@ -1026,14 +1027,83 @@ def test_dataflow_job_cancel_job(self):
         mock_batch.execute.assert_called_once()
 
 
+APACHE_BEAM_V_2_14_0_JAVA_SDK_LOG = f""""\
+Dataflow SDK version: 2.14.0
+Jun 15, 2020 2:57:28 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow\
+/jobsDetail/locations/europe-west3/jobs/{TEST_JOB_ID}?project=XXX
+Submitted job: {TEST_JOB_ID}
+Jun 15, 2020 2:57:28 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To cancel the job using the 'gcloud' tool, run:
+> gcloud dataflow jobs --project=XXX cancel --region=europe-west3 {TEST_JOB_ID}
+"""
+
+APACHE_BEAM_V_2_22_0_JAVA_SDK_LOG = f""""\
+INFO: Dataflow SDK version: 2.22.0
+Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow\
+/jobs/europe-west3/{TEST_JOB_ID}?project=XXXX
+Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: Submitted job: {TEST_JOB_ID}
+Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To cancel the job using the 'gcloud' tool, run:
+> gcloud dataflow jobs --project=XXX cancel --region=europe-west3 {TEST_JOB_ID}
+"""
+
+APACHE_BEAM_V_2_14_0_PYTHON_SDK_LOG = f""""\
+INFO:root:Completed GCS upload to gs://test-dataflow-example/staging/start-python-job-local-5bcf3d71.\
+1592286375.000962/apache_beam-2.14.0-cp37-cp37m-manylinux1_x86_64.whl in 0 seconds.
+INFO:root:Create job: <Job
+createTime: '2020-06-16T05:46:20.911857Z'
+currentStateTime: '1970-01-01T00:00:00Z'
+id: '{TEST_JOB_ID}'
+location: 'us-central1'
+name: 'start-python-job-local-5bcf3d71'
+projectId: 'XXX'
+stageStates: []
+startTime: '2020-06-16T05:46:20.911857Z'
+steps: []
+tempFiles: []
+type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
+INFO:root:Created job with id: [{TEST_JOB_ID}]
+INFO:root:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/\
+dataflow/jobsDetail/locations/us-central1/jobs/{TEST_JOB_ID}?project=XXX
+"""
+
+APACHE_BEAM_V_2_22_0_PYTHON_SDK_LOG = f""""\
+INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://test-dataflow-example/\
+staging/start-python-job-local-5bcf3d71.1592286719.303624/apache_beam-2.22.0-cp37-cp37m-manylinux1_x86_64.whl\
+in 1 seconds.
+INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job
+createTime: '2020-06-16T05:52:04.095216Z'
+currentStateTime: '1970-01-01T00:00:00Z'
+id: '{TEST_JOB_ID}'
+location: 'us-central1'
+name: 'start-python-job-local-5bcf3d71'
+projectId: 'XXX'
+stageStates: []
+startTime: '2020-06-16T05:52:04.095216Z'
+steps: []
+tempFiles: []
+type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
+INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [{TEST_JOB_ID}]
+INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: {TEST_JOB_ID}
+INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please \
+navigate to https://console.cloud.google.com/dataflow/jobs/us-central1/{TEST_JOB_ID}?project=XXX
+"""
+
+
 class TestDataflow(unittest.TestCase):
 
-    def test_data_flow_valid_job_id(self):
-        cmd = [
-            'echo', 'additional unit test lines.\n' +
-            'https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/'
-            'jobs/{}?project=XXX'.format(TEST_JOB_ID)
-        ]
+    @parameterized.expand([
+        (APACHE_BEAM_V_2_14_0_JAVA_SDK_LOG, ),
+        (APACHE_BEAM_V_2_22_0_JAVA_SDK_LOG, ),
+        (APACHE_BEAM_V_2_14_0_PYTHON_SDK_LOG, ),
+        (APACHE_BEAM_V_2_22_0_PYTHON_SDK_LOG, ),
+    ], name_func=lambda func, num, p: f"{func.__name__}_{num}")
+    def test_data_flow_valid_job_id(self, log):
+        echos = ";".join([f"echo {shlex.quote(line)}" for line in log.split("\n")])
+        cmd = ["bash", "-c", echos]
         self.assertEqual(_DataflowRunner(cmd).wait_for_done(), TEST_JOB_ID)
 
     def test_data_flow_missing_job_id(self):

tests/providers/google/cloud/operators/test_dataflow_system.py

Lines changed: 10 additions & 2 deletions
@@ -25,5 +25,13 @@
 @pytest.mark.credential_file(GCP_DATAFLOW_KEY)
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
-    def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+    def test_run_example_gcp_dataflow_native_java(self):
+        self.run_dag('example_gcp_dataflow_native_java', CLOUD_DAG_FOLDER)
+
+    @provide_gcp_context(GCP_DATAFLOW_KEY)
+    def test_run_example_gcp_dataflow_native_python(self):
+        self.run_dag('example_gcp_dataflow_native_python', CLOUD_DAG_FOLDER)
+
+    @provide_gcp_context(GCP_DATAFLOW_KEY)
+    def test_run_example_gcp_dataflow_template(self):
+        self.run_dag('example_gcp_dataflow_template', CLOUD_DAG_FOLDER)
