Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use in-memory notifications for workers #3247

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions gunicorn/arbiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ def reap_workers(self):
worker = self.WORKERS.pop(wpid, None)
if not worker:
continue
worker.tmp.close()
self.cfg.child_exit(self, worker)
except OSError as e:
if e.errno != errno.ECHILD:
Expand Down Expand Up @@ -596,10 +595,6 @@ def spawn_worker(self):
self.WORKERS[pid] = worker
return pid

# Do not inherit the temporary files of other workers
for sibling in self.WORKERS.values():
sibling.tmp.close()

# Process Child
worker.pid = os.getpid()
try:
Expand All @@ -624,7 +619,6 @@ def spawn_worker(self):
finally:
self.log.info("Worker exiting (pid: %s)", worker.pid)
try:
worker.tmp.close()
self.cfg.worker_exit(self, worker)
except Exception:
self.log.warning("Exception during worker exit:\n%s",
Expand Down Expand Up @@ -664,7 +658,6 @@ def kill_worker(self, pid, sig):
if e.errno == errno.ESRCH:
try:
worker = self.WORKERS.pop(pid)
worker.tmp.close()
self.cfg.worker_exit(self, worker)
return
except (KeyError, OSError):
Expand Down
22 changes: 0 additions & 22 deletions gunicorn/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1113,28 +1113,6 @@ class Pidfile(Setting):
"""


class WorkerTmpDir(Setting):
name = "worker_tmp_dir"
section = "Server Mechanics"
cli = ["--worker-tmp-dir"]
meta = "DIR"
validator = validate_string
default = None
desc = """\
A directory to use for the worker heartbeat temporary file.

If not set, the default temporary directory will be used.

.. note::
The current heartbeat system involves calling ``os.fchmod`` on
temporary file handlers and may block a worker for arbitrary time
if the directory is on a disk-backed filesystem.

See :ref:`blocking-os-fchmod` for more detailed information
and a solution for avoiding this problem.
"""


class User(Setting):
name = "user"
section = "Server Mechanics"
Expand Down
3 changes: 1 addition & 2 deletions gunicorn/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(self, age, ppid, sockets, app, timeout, cfg, log):

self.alive = True
self.log = log
self.tmp = WorkerTmp(cfg)
self.tmp = WorkerTmp()

def __str__(self):
return "<Worker %s>" % self.pid
Expand Down Expand Up @@ -109,7 +109,6 @@ def init_process(self):
# Prevent fd inheritance
for s in self.sockets:
util.close_on_exec(s)
util.close_on_exec(self.tmp.fileno())

self.wait_fds = self.sockets + [self.PIPE[0]]

Expand Down
46 changes: 5 additions & 41 deletions gunicorn/workers/workertmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,16 @@
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

import os
import multiprocessing
import time
import platform
import tempfile

from gunicorn import util

PLATFORM = platform.system()
IS_CYGWIN = PLATFORM.startswith('CYGWIN')


class WorkerTmp(object):

def __init__(self, cfg):
old_umask = os.umask(cfg.umask)
fdir = cfg.worker_tmp_dir
if fdir and not os.path.isdir(fdir):
raise RuntimeError("%s doesn't exist. Can't create workertmp." % fdir)
fd, name = tempfile.mkstemp(prefix="wgunicorn-", dir=fdir)
os.umask(old_umask)

# change the owner and group of the file if the worker will run as
# a different user or group, so that the worker can modify the file
if cfg.uid != os.geteuid() or cfg.gid != os.getegid():
util.chown(name, cfg.uid, cfg.gid)

# unlink the file so we don't leak temporary files
try:
if not IS_CYGWIN:
util.unlink(name)
# In Python 3.8, open() emits RuntimeWarning if buffering=1 for binary mode.
# Because we never write to this file, pass 0 to switch buffering off.
self._tmp = os.fdopen(fd, 'w+b', 0)
except Exception:
os.close(fd)
raise
def __init__(self):
self._val = multiprocessing.Value('d', lock=False)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if (in the O_TMPFILE and in the shm case) converting to floating point and back is needlessly weird.. We are reading the clock as a nanosecond-precision unsigned integer, are we not?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By not using the _ns variant we ask for it. No bug, just.. why ask for binary64 (@d) to store uint64 (=Q)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'd' is a double floating point encoding, not binary64. I'm not sure what you mean by asking for it.


def notify(self):
new_time = time.monotonic()
os.utime(self._tmp.fileno(), (new_time, new_time))
self._val.value = time.monotonic()

def last_update(self):
return os.fstat(self._tmp.fileno()).st_mtime

def fileno(self):
return self._tmp.fileno()

def close(self):
return self._tmp.close()
return self._val.value
1 change: 0 additions & 1 deletion tests/test_arbiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ def test_arbiter_reap_workers(mock_os_waitpid):
mock_worker = mock.Mock()
arbiter.WORKERS = {42: mock_worker}
arbiter.reap_workers()
mock_worker.tmp.close.assert_called_with()
arbiter.cfg.child_exit.assert_called_with(arbiter, mock_worker)


Expand Down