Fix processor cleanup on DagFileProcessorManager #22685

Merged
merged 16 commits on Apr 6, 2022

Conversation

pcolladosoto
Contributor

This PR fixes the cleanup procedures of the DAGFileProcessors being spawned by the DagFileProcessorManager instance. Thus, this PR closes #22191.

We came across this bug when a timeout triggered the forceful killing of these DAGFileProcessors.

We have had a somewhat lengthy discussion on the issue (#22191), which we encourage anyone to read for more insight into the cause, discovery and eventual fix. People on that thread were extremely helpful!

We have not included tests because we're unsure of how to test a behaviour such as this one. If pointed in the right direction, we would be more than happy to add them. We can say, however, that we applied these changes to our own production Airflow instance and we haven't encountered the issue since.

On top of that, we would more than welcome any suggestions on the code: for instance, when cleaning up a dictionary we're iterating over, we decided to take note of the problematic DAGFileProcessors and then remove them in a second for loop. Our programming background is much stronger in other languages, so we feel really uncomfortable pushing Python 'to the limit' in terms of relying on its implementation to make design choices. If there's anything that can be done better, by all means say so.
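
As a minimal, self-contained illustration of the two-loop pattern just described (the class, dictionary keys and flag below are made up for the example and are not Airflow's actual structures):

from dataclasses import dataclass


@dataclass
class FakeProcessor:
    timed_out: bool


# file path -> processor; purely illustrative data
processors = {
    "/dags/a.py": FakeProcessor(timed_out=True),
    "/dags/b.py": FakeProcessor(timed_out=False),
}

# First pass: take note of the problematic entries instead of mutating the
# dictionary while iterating over it.
to_remove = [path for path, proc in processors.items() if proc.timed_out]

# Second pass: remove them once the iteration is over.
for path in to_remove:
    del processors[path]

print(processors)  # only the healthy processor remains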

Another fertile topic for discussion is how to wait() for the processes being killed through the SIGKILL signal. This was brought up by @dlesco on #22191, and we agree with him on adding an optional timeout to the operation to avoid blocking under very bizarre circumstances (which the current solution would do). However, we decided to contribute our initial approach and then iterate on solutions within the PR.
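
As a rough sketch of what such an optional timeout could look like (a hypothetical helper, not code from this PR; it only assumes an object exposing a poll() method):

import time
from typing import Optional


def wait_until_exited(process, timeout: Optional[float] = None, interval: float = 0.001) -> bool:
    """Poll until `process` has exited; give up after `timeout` seconds if one is given.

    Returns True if the process exited, False if the timeout expired first.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while process.poll() is None:
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(interval)
    return True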

Thanks a lot for letting me contribute to a tool with the expertise and size of Airflow: it's truly an honour.

Hope to make the merge as seamless as possible!

closes: #22191

References to processors weren't being cleaned up after
killing them in the event of a timeout. This led to
a crash caused by an unhandled exception when trying to
read from a closed end of a pipe.
@boring-cyborg boring-cyborg bot added the area:Scheduler Scheduler or dag parsing Issues label Apr 1, 2022
@boring-cyborg

boring-cyborg bot commented Apr 1, 2022

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack

When calling `_kill_process()` we're generating
zombies which weren't being `wait()`ed for. This
led to a process leak we fix by just calling
`waitpid()` on the appropriate PIDs.
@@ -231,6 +231,9 @@ def _kill_process(self) -> None:
        if self._process.is_alive() and self._process.pid:
            self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
            os.kill(self._process.pid, signal.SIGKILL)

            # Reap the spawned zombie
            os.waitpid(self._process.pid, 0)
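
As a standalone, POSIX-only illustration of the kill-then-reap pattern this hunk relies on (not Airflow code):

import os
import signal
import time

pid = os.fork()
if pid == 0:
    # Child: sleep long enough for the parent to kill it.
    time.sleep(60)
    os._exit(0)

os.kill(pid, signal.SIGKILL)
# Blocking reap: waitpid() returns (pid, status) once the child has exited,
# removing the zombie entry from the process table.
reaped_pid, status = os.waitpid(pid, 0)
print(reaped_pid == pid, os.WIFSIGNALED(status))  # True True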
Member

Indeed, there might be a short delay before even SIGKILL gets processed, so waiting here makes sense.

Contributor

I don't know if there's anything here that could be racy, but there are some details here: https://bugs.python.org/issue42558.

Member

Ouch. Very interesting. So it seems that this one (or similar) is safer (as waitpid might crash for Python 3.9+) :)

Do I read it right?

while self._process.poll() is None:
    sleep(0.001)

Contributor

Yes that is how I read it too.

@pcolladosoto you wanna make that change? I don't think I had anything else that needed changing.

Contributor Author

Hi @malthe! I just pushed the changes: I decided to import the time module so that I could call time.sleep() within the while loop. Hope I made the right choice... You can check out the changes on 27502be. Thanks a lot for the input! This is something that I'm sure will come in handy for future projects.
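
For readers who want to try the idea in isolation: a minimal, standalone, POSIX-only variant of that loop which sticks to multiprocessing's public API (Process.exitcode calls poll() on the underlying Popen object). This is an illustration, not the exact code merged in 27502be:

import multiprocessing
import os
import signal
import time


def target() -> None:
    time.sleep(60)


if __name__ == "__main__":
    proc = multiprocessing.Process(target=target)
    proc.start()
    os.kill(proc.pid, signal.SIGKILL)

    # Let multiprocessing reap the child itself: polling exitcode updates the
    # library's own bookkeeping instead of racing it with a manual os.waitpid().
    while proc.exitcode is None:
        time.sleep(0.001)
    print(proc.exitcode)  # -9, i.e. terminated by SIGKILL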

@potiuk
Member

potiuk commented Apr 1, 2022

This looks like a really great fix. @ashb @uranusjr @malthe -> the usual suspects :). Can you also take a look please?

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Apr 1, 2022
@github-actions

github-actions bot commented Apr 1, 2022

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

According to @potiuk's and @malthe's input, the way
we were reaping the zombies could cause some racy and
unwanted situations. As seen in the discussion over at
`https://bugs.python.org/issue42558` we can safely
reap the spawned zombies with the changes we have
introduced.
pcolladosoto and others added 2 commits April 2, 2022 21:15
As suggested by @potiuk, explaining why we chose to actively wait in a scenario such as this one can indeed be useful for anybody taking a look at the code some time from now...

Co-authored-by: Jarek Potiuk <[email protected]>
After accepting the changes proposed on the PR
we found a small typo (we make those on a daily basis)
and a trailing whitespace we thought was nice to delete.
Hope we made the right choice!
pcolladosoto and others added 7 commits April 3, 2022 18:28
We were calling `poll()` through the `_process` attribute
and, as shown by the static checks triggered by GitHub,
it's not defined for the `BaseProcess` class. We instead
have to call `poll()` through `BaseProcess`'s `_popen`
attribute.
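A tiny standalone illustration of the distinction this commit describes; note that `_popen` is a private attribute of CPython's multiprocessing, so this is an assumption-laden sketch rather than supported API usage:

import multiprocessing
import time


def target() -> None:
    time.sleep(0.1)


if __name__ == "__main__":
    proc = multiprocessing.Process(target=target)
    proc.start()

    # BaseProcess itself defines no poll(); the Popen helper created by start() does.
    print(hasattr(proc, "poll"))  # False
    print(proc._popen.poll())     # None while the child is still running

    proc.join()
    print(proc._popen.poll())     # 0 once the child has exited cleanly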
@pcolladosoto
Contributor Author

Hi @potiuk! Is there anything else that needs to be done on my end?

Thanks for your time!

@potiuk
Member

potiuk commented Apr 4, 2022

I guess fixing the static checks and the failing tests, since you asked. But I think GitHub already told you so by all the notifications, and you can see it in the red status of the PR.

Some changes have been force pushed!
After reading through `multiprocessing`'s implementation we
really didn't know why the static check on line `239` was
failing: the process should contain a `_popen` attribute...
That's when we found line `223` and discovered the trailing
`# type: ignore` comment. After reading up on it we found
that it instructs *MyPy* not to statically check that very
line. Given we're having trouble with the exact same attribute
we decided to include the same directive for the static checker.
Hope we made the right call!
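A self-contained sketch of how the per-line directive behaves, using a made-up helper shaped roughly like the code under discussion:

import multiprocessing
import time


def wait_for_exit(process: multiprocessing.Process) -> None:
    # mypy flags `_popen` because it is not part of BaseProcess's annotated,
    # public interface; `# type: ignore` silences the check on this line only,
    # leaving the rest of the file fully type-checked.
    while process._popen.poll() is None:  # type: ignore
        time.sleep(0.001)


if __name__ == "__main__":
    proc = multiprocessing.Process(target=time.sleep, args=(0.1,))
    proc.start()
    wait_for_exit(proc)
    print(proc.exitcode)  # 0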
We hadn't updated the tests for the method whose
body we've altered. This caused the tests to fail
when trying to retrieve a processor's *waitable*,
a property similar to a *file descriptor* in
UNIX-like systems. We have added a mock property to
the `processor` and we've also updated the `manager`'s
attributes so as to faithfully recreate the state of
the data structures at a moment when a `processor`
is to be terminated.

Please note the `assertions` at the end are meant to
check we reach the `manager`'s expected state. We have
chosen to check the number of processors against an
explicit value because we're defining `manager._processors`
explicitly within the test. On the other hand, `manager.waitables`
can have a different length depending on the call to
`DagFileProcessorManager`'s `__init__()`. In this test the
expected initial length is `1` given we're passing `MagicMock()`
as the `signal_conn` when instantiating the manager. However,
if this were to be changed the tests would 'inexplicably' fail.
Instead of checking `manager.waitables`' length against a hardcoded
value we decided to instead compare it to its initial length
so as to emphasize we're interested in the change in length, not
its absolute value.
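A toy reconstruction of the assertion strategy described above, with a stand-in class instead of the real DagFileProcessorManager; all names here are made up:

from unittest import mock


class FakeManager:
    """Stand-in for the manager, just enough to show the assertion style."""

    def __init__(self, signal_conn):
        # The real manager registers the signal connection as a waitable in
        # __init__, which is why the initial length should not be assumed to be zero.
        self.waitables = {signal_conn: signal_conn}
        self._processors = {}

    def register(self, path, processor):
        self._processors[path] = processor
        self.waitables[processor.waitable_handle] = processor

    def kill_timed_out_processors(self):
        for path, processor in list(self._processors.items()):
            del self.waitables[processor.waitable_handle]
            del self._processors[path]


processor = mock.MagicMock()
manager = FakeManager(signal_conn=mock.MagicMock())

# Record the initial length instead of hardcoding it, so later changes to
# __init__ don't make the assertion fail 'inexplicably'.
initial_waitables = len(manager.waitables)

manager.register("/dags/slow.py", processor)
manager.kill_timed_out_processors()

assert len(manager._processors) == 0
assert len(manager.waitables) == initial_waitables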
One of the methods we are to mock required a rather
long `@mock.patch` decorator which didn't pass the
checks made by `black` on the pre-commit hooks. On
top of that, we messed up the ordering of the
`@mock.patch` decorators which meant we didn't
set them up properly. This manifested as a `KeyError`
on the method we're currently testing. O_o
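The decorator-ordering gotcha mentioned here, shown in a minimal form unrelated to the actual test:

import os
from unittest import mock


@mock.patch("os.getpid")  # outermost patch -> last mock argument
@mock.patch("os.getcwd")  # innermost patch -> first mock argument
def check(mock_getcwd, mock_getpid):
    # Decorators are applied bottom-up, so the one closest to the function
    # supplies the first positional mock; mixing the order up silently hands
    # the test the wrong mocks and surfaces as confusing downstream errors.
    mock_getcwd.return_value = "/tmp"
    mock_getpid.return_value = 1234
    print(os.getcwd(), os.getpid())  # /tmp 1234


check()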
@potiuk potiuk merged commit 4a06f89 into apache:main Apr 6, 2022
@boring-cyborg

boring-cyborg bot commented Apr 6, 2022

Awesome work, congrats on your first merged pull request!

@ephraimbuddy ephraimbuddy added the type:bug-fix Changelog: Bug Fixes label Apr 7, 2022
@ephraimbuddy ephraimbuddy added this to the Airflow 2.2.6 milestone Apr 7, 2022
@eladkal eladkal modified the milestones: Airflow 2.2.6, Airflow 2.3.0 Apr 12, 2022
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request Aug 30, 2022
Fix processor cleanup on DagFileProcessorManager - cherrypick from
Community apache/airflow#22685

Change-Id: I7e505840325b5f61bd96238424caedf9f9afe19e
GitOrigin-RevId: 9e20cd0bf3bd2fa67115b1d4a81a6e2009e49936
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request Aug 30, 2022
Fix processor cleanup on DagFileProcessorManager - cherrypick from
Community apache/airflow#22685

Change-Id: Iaebb1431f78b220d444810ba9f0c854186d7e07b
GitOrigin-RevId: 9972c3d38fee1bbfd7216a1db97d1b17bb88e7d3
Development

Successfully merging this pull request may close these issues.

dag_processing code needs to handle OSError("handle is closed") in poll() and recv() calls
6 participants