Skip to content

Commit

Permalink
Less aggressive eager upgrade of requirements (#8267)
Browse files Browse the repository at this point in the history
With this change, requirements are only eagerly upgraded during
requirements generation when setup.py changes. They are also
eagerly upgraded when you run ./breeze generate-requirements
locally. Still the cron job will use the eager update mechanism
when building the docker image which means that CRON jobs will
still detect cases where an upgrade of requirements causes failure
either at installation time or during tests.
  • Loading branch information
potiuk committed Apr 13, 2020
1 parent 0a1dc27 commit 45c8983
Show file tree
Hide file tree
Showing 14 changed files with 56 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ COPY requirements/requirements-python${PYTHON_MAJOR_MINOR_VERSION}.txt \
# But in cron job we will install latest versions matching setup.py to see if there is no breaking change
RUN \
if [[ "${UPGRADE_TO_LATEST_REQUIREMENTS}" == "true" ]]; then \
pip install -e ".[${AIRFLOW_EXTRAS}]" --upgrade; \
pip install -e ".[${AIRFLOW_EXTRAS}]" --upgrade --upgrade-strategy eager; \
else \
pip install -e ".[${AIRFLOW_EXTRAS}]" \
--constraint ${AIRFLOW_SOURCES}/requirements/requirements-python${PYTHON_MAJOR_MINOR_VERSION}.txt ; \
Expand Down
3 changes: 2 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ def __init__(

self._description = description
# set file location to caller source path
self.fileloc = sys._getframe().f_back.f_code.co_filename
back = sys._getframe().f_back
self.fileloc = back.f_code.co_filename if back else ""
self.task_dict: Dict[str, BaseOperator] = dict()

# set timezone from start_date
Expand Down
9 changes: 5 additions & 4 deletions airflow/providers/apache/pinot/hooks/pinot.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,11 @@ def run_cli(self, cmd: list, verbose: Optional[bool] = True):
env=env)

stdout = ""
for line in iter(sub_process.stdout.readline, b''):
stdout += line.decode("utf-8")
if verbose:
self.log.info(line.decode("utf-8").strip())
if sub_process.stdout:
for line in iter(sub_process.stdout.readline, b''):
stdout += line.decode("utf-8")
if verbose:
self.log.info(line.decode("utf-8").strip())

sub_process.wait()

Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/google/cloud/hooks/cloud_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,8 @@ def start_proxy(self) -> None:
stdin=PIPE, stdout=PIPE, stderr=PIPE)
self.log.info("The pid of cloud_sql_proxy: %s", self.sql_proxy_process.pid)
while True:
line = self.sql_proxy_process.stderr.readline().decode('utf-8')
line = self.sql_proxy_process.stderr.readline().decode('utf-8') \
if self.sql_proxy_process.stderr else ""
return_code = self.sql_proxy_process.poll()
if line == '' and return_code is not None:
self.sql_proxy_process = None
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/google/cloud/hooks/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ def wait_for_done(self) -> Optional[str]:
:return: Job id
:rtype: Optional[str]
"""
reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
reads = [self._proc.stderr.fileno() if self._proc.stderr else 0,
self._proc.stdout.fileno() if self._proc.stdout else 0]
self.log.info("Start waiting for DataFlow process to complete.")
job_id = None
# Make sure logs are processed regardless whether the subprocess is
Expand Down
5 changes: 3 additions & 2 deletions airflow/providers/google/cloud/operators/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,9 @@ def execute(self, context: Dict):
close_fds=True
)
self.log.info("Process output:")
for line in iter(process.stdout.readline, b''):
self.log.info(line.decode(self.output_encoding).rstrip())
if process.stdout:
for line in iter(process.stdout.readline, b''):
self.log.info(line.decode(self.output_encoding).rstrip())

process.wait()
if process.returncode > 0:
Expand Down
4 changes: 3 additions & 1 deletion airflow/security/kerberos.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ def renew_from_kt(principal: str, keytab: str):
if subp.returncode != 0:
log.error(
"Couldn't reinit from keytab! `kinit' exited with %s.\n%s\n%s",
subp.returncode, "\n".join(subp.stdout.readlines()), "\n".join(subp.stderr.readlines())
subp.returncode,
"\n".join(subp.stdout.readlines() if subp.stdout else []),
"\n".join(subp.stderr.readlines() if subp.stderr else [])
)
sys.exit(subp.returncode)

Expand Down
7 changes: 4 additions & 3 deletions airflow/utils/process_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ def execute_in_subprocess(cmd: List[str]):
close_fds=True
)
log.info("Output:")
with proc.stdout:
for line in iter(proc.stdout.readline, b''):
log.info("%s", line.decode().rstrip())
if proc.stdout:
with proc.stdout:
for line in iter(proc.stdout.readline, b''):
log.info("%s", line.decode().rstrip())

exit_code = proc.wait()
if exit_code != 0:
Expand Down
12 changes: 6 additions & 6 deletions requirements/requirements-python3.6.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ cloudant==2.12.0
cloudpickle==1.3.0
colorama==0.4.3
colorlog==4.0.2
coverage==5.0.4
coverage==5.1
croniter==0.3.31
cryptography==2.9
cx-Oracle==7.3.0
Expand Down Expand Up @@ -182,7 +182,7 @@ jmespath==0.9.5
json-merge-patch==0.2
jsondiff==1.1.2
jsonpatch==1.25
jsonpickle==1.3
jsonpickle==1.4
jsonpointer==2.0
jsonschema==3.2.0
jupyter-client==6.1.2
Expand All @@ -205,15 +205,15 @@ msrest==0.6.13
msrestazure==0.6.3
multi-key-dict==2.0.3
mypy-extensions==0.4.3
mypy==0.740
mypy==0.770
mysql-connector-python==8.0.18
mysqlclient==1.3.14
nbclient==0.2.0
nbformat==5.0.5
nest-asyncio==1.3.2
networkx==2.4
nodeenv==1.3.5
nteract-scrapbook==0.3.1
nteract-scrapbook==0.4.1
ntlm-auth==1.4.0
numpy==1.18.2
oauthlib==3.1.0
Expand All @@ -222,7 +222,7 @@ packaging==20.3
pandas-gbq==0.13.1
pandas==1.0.3
papermill==2.1.0
parameterized==0.7.1
parameterized==0.7.3
paramiko==2.7.1
parso==0.6.2
pathspec==0.8.0
Expand Down Expand Up @@ -349,7 +349,7 @@ websocket-client==0.57.0
wrapt==1.12.1
xmltodict==0.12.0
yamllint==1.21.0
yandexcloud==0.31.0
yandexcloud==0.32.0
zdesk==2.7.1
zict==2.0.0
zipp==3.1.0
12 changes: 6 additions & 6 deletions requirements/requirements-python3.7.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ cloudant==2.12.0
cloudpickle==1.3.0
colorama==0.4.3
colorlog==4.0.2
coverage==5.0.4
coverage==5.1
croniter==0.3.31
cryptography==2.9
cx-Oracle==7.3.0
Expand Down Expand Up @@ -181,7 +181,7 @@ jmespath==0.9.5
json-merge-patch==0.2
jsondiff==1.1.2
jsonpatch==1.25
jsonpickle==1.3
jsonpickle==1.4
jsonpointer==2.0
jsonschema==3.2.0
jupyter-client==6.1.2
Expand All @@ -204,15 +204,15 @@ msrest==0.6.13
msrestazure==0.6.3
multi-key-dict==2.0.3
mypy-extensions==0.4.3
mypy==0.740
mypy==0.770
mysql-connector-python==8.0.18
mysqlclient==1.3.14
nbclient==0.2.0
nbformat==5.0.5
nest-asyncio==1.3.2
networkx==2.4
nodeenv==1.3.5
nteract-scrapbook==0.3.1
nteract-scrapbook==0.4.1
ntlm-auth==1.4.0
numpy==1.18.2
oauthlib==3.1.0
Expand All @@ -221,7 +221,7 @@ packaging==20.3
pandas-gbq==0.13.1
pandas==1.0.3
papermill==2.1.0
parameterized==0.7.1
parameterized==0.7.3
paramiko==2.7.1
parso==0.6.2
pathspec==0.8.0
Expand Down Expand Up @@ -346,7 +346,7 @@ websocket-client==0.57.0
wrapt==1.12.1
xmltodict==0.12.0
yamllint==1.21.0
yandexcloud==0.31.0
yandexcloud==0.32.0
zdesk==2.7.1
zict==2.0.0
zipp==3.1.0
1 change: 1 addition & 0 deletions requirements/setup-3.6.md5
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1a00e376586cbebf4184d1b9adfb6d7f /opt/airflow/setup.py
1 change: 1 addition & 0 deletions requirements/setup-3.7.md5
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1a00e376586cbebf4184d1b9adfb6d7f /opt/airflow/setup.py
2 changes: 1 addition & 1 deletion scripts/ci/_utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ function initialize_common_environment {

# upgrade while generating requirements should only happen in localy run
# pre-commits or in cron job
if [[ ${LOCAL_RUN} == "true" || "${TRAVIS_EVENT_TYPE:=}" == "cron" ]]; then
if [[ ${LOCAL_RUN} == "true" ]]; then
export UPGRADE_WHILE_GENERATING_REQUIREMENTS="true"
else
export UPGRADE_WHILE_GENERATING_REQUIREMENTS="false"
Expand Down
19 changes: 19 additions & 0 deletions scripts/ci/in_container/run_generate_requirements.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ HANDLERS="$( trap -p EXIT | cut -f2 -d \' )"
# shellcheck disable=SC2064
trap "${HANDLERS}${HANDLERS:+;}in_container_fix_ownership" EXIT

STORED_SETUP_PY_HASH_FILE="${AIRFLOW_SOURCES}/requirements/setup-${PYTHON_MAJOR_MINOR_VERSION}.md5"

CURRENT_SETUP_PY_HASH=$(md5sum "${AIRFLOW_SOURCES}/setup.py")
STORED_SETUP_PY_HASH=$(cat "${STORED_SETUP_PY_HASH_FILE}" 2>/dev/null || true)

if [[ ${STORED_SETUP_PY_HASH} != "${CURRENT_SETUP_PY_HASH}" ]]; then
echo
echo "Setup.py changed since last time requirements were generated"
echo
echo "Switching to eager update strategy for the requirements"
echo
UPGRADE_WHILE_GENERATING_REQUIREMENTS="true"
fi

# Upgrading requirements will happen only in CRON job to see that we have some
# new requirements released
if [[ ${UPGRADE_WHILE_GENERATING_REQUIREMENTS} == "true" ]]; then
Expand Down Expand Up @@ -52,6 +66,11 @@ echo
echo "Requirements generated in ${GENERATED_REQUIREMENTS_FILE}"
echo

echo
echo "Storing setup.py hash in ${STORED_SETUP_PY_HASH_FILE}"
echo
echo "${CURRENT_SETUP_PY_HASH}" > "${STORED_SETUP_PY_HASH_FILE}"

set +e
# Fail in case diff shows difference
diff --color=always "${OLD_REQUIREMENTS_FILE}" "${GENERATED_REQUIREMENTS_FILE}"
Expand Down

0 comments on commit 45c8983

Please sign in to comment.