Skip to content

Commit

Permalink
Try to open device and start HTTP server before notifying the user
Browse files Browse the repository at this point in the history
We try to open the device and start the HTTP server to handle requests before
sending the startup message to the calling process.
Before this modification, the startup message was always sent before opening the disk
and the start of the HTTP server. And it could cause several invalid HTTP requests
if no server was reachable on any machine.

The goal is to quickly notify the calling process and to be sure to always have
a running server before sending requests. We try to prevent HTTP errors like
"Destination Host Unreachable".

Signed-off-by: Ronan Abhamon <[email protected]>
  • Loading branch information
Wescoeur committed Jul 31, 2024
1 parent 6dc48fd commit e56583d
Showing 1 changed file with 94 additions and 11 deletions.
105 changes: 94 additions & 11 deletions http-disk-server
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ from io import BytesIO
import argparse
import errno
import fcntl
import json
import os
import signal
import socket
Expand All @@ -51,6 +52,7 @@ BLKFLSBUF = (0x12 << 8) + 97 # Flush buffer cache.
REQ_LIMIT_SIZE = 1024 * 1024 * 128 # In bytes.

DRBD_MAJOR = 147
DRBD_BY_RES_PATH = '/dev/drbd/by-res/'

DRBD_OPEN_SLEEP = 1.0 # In seconds.

Expand All @@ -59,6 +61,7 @@ DRBD_OPEN_SLEEP = 1.0 # In seconds.
OUTPUT_PREFIX = ''

SIGTERM_RECEIVED = False
STARTING_SERVER = True

def handle_sigterm(*args):
global SIGTERM_RECEIVED
Expand Down Expand Up @@ -93,7 +96,7 @@ def is_drbd_device(path):
return False
return stat.S_ISBLK(st.st_mode) and os.major(st.st_rdev) == DRBD_MAJOR

def open_device(dev_path):
def open_device(dev_path, retry=True):
def cannot_open(e):
raise Exception('Cannot open device `{}`: `{}`.'.format(dev_path, e))

Expand All @@ -106,7 +109,7 @@ def open_device(dev_path):
except OSError as e:
if e.errno == errno.EAGAIN or e.errno == errno.EINTR:
continue
if e.errno != errno.EROFS:
if e.errno != errno.EROFS or not retry:
cannot_open(e)

if is_drbd is None:
Expand All @@ -130,9 +133,51 @@ def close_device(fd):
return
if e.errno == errno.EINTR:
continue
raise
except Exception as e:
eprint('Cannot close fd {}: `{}`.'.format(fd, e))
return

def is_openable(dev_path):
if not is_drbd_device(dev_path):
return True # Assume non-DRBD paths are always openable.

if dev_path.startswith(DRBD_BY_RES_PATH):
prefix_len = len(DRBD_BY_RES_PATH)
res_name_end = dev_path.find('/', prefix_len)
assert res_name_end != -1
resource_name = dev_path[prefix_len:res_name_end]
else:
assert False # TODO: Implement me.

try:
p = subprocess.Popen(
['drbdsetup', 'status', resource_name, '--json'],
stdout=subprocess.PIPE,
close_fds=True,
universal_newlines=True
)
except OSError:
return False # Binary not installed.

stdout, stderr = p.communicate()
if p.returncode:
return False

try:
conf = json.loads(stdout)
if not conf:
return False

for connection in conf[0]['connections']:
if connection['peer-role'] == 'Primary':
return False
except Exception as e:
eprint('Failed to read DRBD res status: `{}`.'.format(e))
return False

return True

def get_device_size(fd):
disk_capacity = -1
while True:
Expand Down Expand Up @@ -312,6 +357,14 @@ class HttpDiskServer(HTTPServer):
# Reuse old port to avoid: "Address already in use".
self.allow_reuse_address = True
HTTPServer.__init__(self, *args, **kwargs)
self.ready_event = threading.Event()

def serve_forever(self, poll_interval=0.5):
self.ready_event.set()
HTTPServer.serve_forever(self, poll_interval)

def wait_startup(self):
self.ready_event.wait()

# -----------------------------------------------------------------------------

Expand All @@ -320,12 +373,19 @@ def run_server(disk, ip, port):
if not check_bindable(port):
return

# Must be printed to notify parent processes that are waiting
# before starting the NBD server(s).
eprint('Server ready!')

# Note: `open_device` must be called after the message (never before!)
# to ensure we don't block if the device is already open somewhere.
# Note: We try to open the device and start the HTTP server to handle requests before
# sending the message below to the calling process. If we can't open the device, or if
# there is an issue during the first open call, we emit the message, because there
# is probably a running server on another machine.
#
# The goal of this function is to notify the caller and to be sure to always have
# a running server before sending requests. We try to prevent HTTP errors like
# "Destination Host Unreachable".
def emit_server_ready():
global STARTING_SERVER
if STARTING_SERVER:
eprint('Server ready!')
STARTING_SERVER = False

httpd = None
httpd_thread = None
Expand All @@ -336,14 +396,31 @@ def run_server(disk, ip, port):
break

try:
disk_fd = open_device(disk)
if STARTING_SERVER:
# It's useful in the case of a DRBD to check if the path
# is openable, mainly when this param is set:
# "DrbdOptions/Resource/auto-promote-timeout".
# In the worst case, we may be stuck in an open call for 1 min.
if not is_openable(disk):
# Emit server ready and wait for openable disk.
continue
# Try to open device without retry first and then emit ready message.
# In case of concurrent calls with DRBD, we may stuck for many seconds.
disk_fd = open_device(disk, retry=False)
else:
disk_fd = open_device(disk)
is_block_device = stat.S_ISBLK(os.fstat(disk_fd).st_mode)

HandlerClass = MakeRequestHandler(disk_fd, is_block_device)
httpd = HttpDiskServer((ip or '', port), HandlerClass)
httpd_thread = threading.Thread(target=httpd.serve_forever)
httpd_thread.start()

if STARTING_SERVER:
httpd.wait_startup()
# We emit only after effective startup.
emit_server_ready()

while not SIGTERM_RECEIVED:
signal.pause()
except KeyboardInterrupt:
Expand All @@ -353,7 +430,8 @@ def run_server(disk, ip, port):
eprint('Unhandled exception: `{}`.'.format(e))
eprint(traceback.format_exc())
eprint('Restarting server...')
time.sleep(1)
if not STARTING_SERVER:
time.sleep(1)
finally:
try:
if httpd_thread:
Expand All @@ -365,8 +443,13 @@ def run_server(disk, ip, port):
httpd = None
except Exception as e:
eprint('Failed to close server: {}.'.format(e))
close_device(disk_fd)
finally:
close_device(disk_fd)

# Make sure we notify for server startup if the device cannot be opened.
# Or in case of exception during the first open call. It can be triggered
# if we failed to open the volume due to concurrent DRBD calls.
emit_server_ready()

# ==============================================================================

Expand Down

0 comments on commit e56583d

Please sign in to comment.