# Patch summary for http-disk-server (text before the `diff --git` header is
# commentary only).
#
# Purpose: stop printing 'Server ready!' unconditionally before the device is
# opened. The message is now printed once, by emit_server_ready(), either
# after the HTTP server thread has started or after the first attempt ended.
#
# Changes made by the hunks below:
# - Adds `import json`, the DRBD_BY_RES_PATH constant and the module-level
#   STARTING_SERVER flag (initially True).
# - open_device(dev_path, retry=True): with retry=False an EROFS error from
#   the open call goes to cannot_open() (which raises) instead of falling
#   through to the code that follows the errno check.
# - close_device(fd): adds a `raise` after the EINTR `continue` and an
#   `except Exception` clause that reports the failure with eprint().
# - is_openable(dev_path): new helper. Returns True for non-DRBD paths. For
#   paths under DRBD_BY_RES_PATH it extracts the resource name and runs
#   `drbdsetup status <resource> --json`. It returns False when the process
#   cannot be started, exits non-zero, prints empty or unparsable JSON, or
#   when any connection of the first entry has 'peer-role' == 'Primary'.
# - HttpDiskServer: creates `ready_event` (threading.Event) in __init__,
#   sets it at the top of serve_forever(), and adds wait_startup(), which
#   blocks on that event.
# - run_server(): while STARTING_SERVER is True it calls is_openable() first
#   and opens the device with retry=False; later iterations use the default
#   retry. emit_server_ready() is called after httpd.wait_startup() and again
#   in the final `finally`, so the message is printed exactly once. The 1 s
#   restart sleep is skipped while STARTING_SERVER is still True.
#
# NOTE(review): is_openable() reaches `assert False` for a DRBD device whose
#   path does not start with DRBD_BY_RES_PATH. If asserts are stripped
#   (python -O), `resource_name` would be unbound at the Popen call.
# NOTE(review): `stdout, stderr = p.communicate()` - stderr is not piped in
#   the Popen call, and the name is not used afterwards.
# NOTE(review): ready_event is set before HTTPServer.serve_forever() is
#   entered, so wait_startup() returns once the server thread has started
#   running, not once the request loop is polling.
# NOTE(review): is_openable() uses `subprocess`; this patch only adds
#   `import json` - presumably subprocess is already imported, TODO confirm.
# NOTE(review): the hunks in this copy have lost their original line breaks
#   and indentation; restore them before trying to apply the patch.
diff --git a/http-disk-server b/http-disk-server index 36d912c..ca8dbbe 100755 --- a/http-disk-server +++ b/http-disk-server @@ -35,6 +35,7 @@ from io import BytesIO import argparse import errno import fcntl +import json import os import signal import socket @@ -51,6 +52,7 @@ BLKFLSBUF = (0x12 << 8) + 97 # Flush buffer cache. REQ_LIMIT_SIZE = 1024 * 1024 * 128 # In bytes. DRBD_MAJOR = 147 +DRBD_BY_RES_PATH = '/dev/drbd/by-res/' DRBD_OPEN_SLEEP = 1.0 # In seconds. @@ -59,6 +61,7 @@ DRBD_OPEN_SLEEP = 1.0 # In seconds. OUTPUT_PREFIX = '' SIGTERM_RECEIVED = False +STARTING_SERVER = True def handle_sigterm(*args): global SIGTERM_RECEIVED @@ -93,7 +96,7 @@ def is_drbd_device(path): return False return stat.S_ISBLK(st.st_mode) and os.major(st.st_rdev) == DRBD_MAJOR -def open_device(dev_path): +def open_device(dev_path, retry=True): def cannot_open(e): raise Exception('Cannot open device `{}`: `{}`.'.format(dev_path, e)) @@ -106,7 +109,7 @@ def open_device(dev_path): except OSError as e: if e.errno == errno.EAGAIN or e.errno == errno.EINTR: continue - if e.errno != errno.EROFS: + if e.errno != errno.EROFS or not retry: cannot_open(e) if is_drbd is None: @@ -130,9 +133,51 @@ def close_device(fd): return if e.errno == errno.EINTR: continue + raise + except Exception as e: eprint('Cannot close fd {}: `{}`.'.format(fd, e)) return +def is_openable(dev_path): + if not is_drbd_device(dev_path): + return True # Assume non-DRBD paths are always openable. + + if dev_path.startswith(DRBD_BY_RES_PATH): + prefix_len = len(DRBD_BY_RES_PATH) + res_name_end = dev_path.find('/', prefix_len) + assert res_name_end != -1 + resource_name = dev_path[prefix_len:res_name_end] + else: + assert False # TODO: Implement me. + + try: + p = subprocess.Popen( + ['drbdsetup', 'status', resource_name, '--json'], + stdout=subprocess.PIPE, + close_fds=True, + universal_newlines=True + ) + except OSError: + return False # Binary not installed. 
+ + stdout, stderr = p.communicate() + if p.returncode: + return False + + try: + conf = json.loads(stdout) + if not conf: + return False + + for connection in conf[0]['connections']: + if connection['peer-role'] == 'Primary': + return False + except Exception as e: + eprint('Failed to read DRBD res status: `{}`.'.format(e)) + return False + + return True + def get_device_size(fd): disk_capacity = -1 while True: @@ -312,6 +357,14 @@ class HttpDiskServer(HTTPServer): # Reuse old port to avoid: "Address already in use". self.allow_reuse_address = True HTTPServer.__init__(self, *args, **kwargs) + self.ready_event = threading.Event() + + def serve_forever(self, poll_interval=0.5): + self.ready_event.set() + HTTPServer.serve_forever(self, poll_interval) + + def wait_startup(self): + self.ready_event.wait() # ----------------------------------------------------------------------------- @@ -320,12 +373,19 @@ def run_server(disk, ip, port): if not check_bindable(port): return - # Must be printed to notify parent processes that are waiting - # before starting the NBD server(s). - eprint('Server ready!') - - # Note: `open_device` must be called after the message (never before!) - # to ensure we don't block if the device is already open somewhere. + # Note: We try to open the device and start the HTTP server to handle requests before + # sending the message below to the calling process. If we can't open the device, or if + # there is an issue during the first open call, we emit the message, because there + # is probably a running server on another machine. + # + # The goal of this function is to notify the caller and to be sure to always have + # a running server before sending requests. We try to prevent HTTP errors like + # "Destination Host Unreachable". 
+ def emit_server_ready(): + global STARTING_SERVER + if STARTING_SERVER: + eprint('Server ready!') + STARTING_SERVER = False httpd = None httpd_thread = None @@ -336,7 +396,19 @@ def run_server(disk, ip, port): break try: - disk_fd = open_device(disk) + if STARTING_SERVER: + # It's useful in the case of a DRBD to check if the path + # is openable, mainly when this param is set: + # "DrbdOptions/Resource/auto-promote-timeout". + # In the worst case, we may be stuck in an open call for 1 min. + if not is_openable(disk): + # Emit server ready and wait for openable disk. + continue + # Try to open device without retry first and then emit ready message. + # In case of concurrent calls with DRBD, we may be stuck for many seconds. + disk_fd = open_device(disk, retry=False) + else: + disk_fd = open_device(disk) is_block_device = stat.S_ISBLK(os.fstat(disk_fd).st_mode) HandlerClass = MakeRequestHandler(disk_fd, is_block_device) @@ -344,6 +416,11 @@ def run_server(disk, ip, port): httpd_thread = threading.Thread(target=httpd.serve_forever) httpd_thread.start() + if STARTING_SERVER: + httpd.wait_startup() + # We emit only after effective startup. + emit_server_ready() + while not SIGTERM_RECEIVED: signal.pause() except KeyboardInterrupt: @@ -353,7 +430,8 @@ def run_server(disk, ip, port): eprint('Unhandled exception: `{}`.'.format(e)) eprint(traceback.format_exc()) eprint('Restarting server...') - time.sleep(1) + if not STARTING_SERVER: + time.sleep(1) finally: try: if httpd_thread: @@ -365,8 +443,13 @@ def run_server(disk, ip, port): httpd = None except Exception as e: eprint('Failed to close server: {}.'.format(e)) - close_device(disk_fd) + finally: + close_device(disk_fd) + # Make sure we notify for server startup if the device cannot be opened. + # Or in case of exception during the first open call. It can be triggered + # if we failed to open the volume due to concurrent DRBD calls. 
+ emit_server_ready() # ==============================================================================