black formatting
pattonw committed Oct 25, 2024
1 parent fa8a696 commit 49fc711
Showing 23 changed files with 1 addition and 108 deletions.
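
Nearly all of the 108 deletions below follow one Black rule: a blank line sitting directly after a `def`/`class` signature (or another block opener such as `try:`) is removed. The single addition, in daisy/serial_server.py, joins a short multi-line signature onto one line. A minimal sketch of the blank-line pattern, using Block.copy from daisy/block.py:

    # before: blank line directly after the signature
    def copy(self):

        return copy.deepcopy(self)

    # after black: the blank line is gone; behavior is unchanged
    def copy(self):
        return copy.deepcopy(self)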
3 changes: 0 additions & 3 deletions daisy/block.py
@@ -68,7 +68,6 @@ class Block(Freezable):
     """

     def __init__(self, total_roi, read_roi, write_roi, block_id=None, task_id=None):
-
         self.read_roi = read_roi
         self.write_roi = write_roi
         self.requested_write_roi = write_roi  # save original write_roi
@@ -83,7 +82,6 @@ def __init__(self, total_roi, read_roi, write_roi, block_id=None, task_id=None):
         self.freeze()

     def copy(self):
-
         return copy.deepcopy(self)

     def __compute_block_id(self, total_roi, write_roi, shift=None):
@@ -95,7 +93,6 @@ def __compute_block_id(self, total_roi, write_roi, shift=None):
         return block_id

     def __repr__(self):
-
         return "%s/%d with read ROI %s and write ROI %s" % (
             self.block_id[0],
             self.block_id[1],
4 changes: 0 additions & 4 deletions daisy/block_bookkeeper.py
@@ -2,15 +2,13 @@


 class BlockLog:
-
     def __init__(self, block, stream):
         self.block = block
         self.stream = stream
         self.time_sent = time.time()


 class BlockBookkeeper:
-
     def __init__(self, processing_timeout=None):
         self.processing_timeout = processing_timeout
         self.sent_blocks = {}
@@ -68,7 +66,6 @@ def get_lost_blocks(self):

         lost_block_ids = []
         for block_id, log in self.sent_blocks.items():
-
             # is the stream to the client still alive?
             if log.stream.closed():
                 lost_block_ids.append(block_id)
@@ -80,7 +77,6 @@

         lost_blocks = []
         for block_id in lost_block_ids:
-
             lost_block = self.sent_blocks[block_id].block
             lost_blocks.append(lost_block)
             del self.sent_blocks[block_id]
7 changes: 0 additions & 7 deletions daisy/blocks.py
@@ -106,7 +106,6 @@ def compute_level_conflicts(self):
         prev_level_offset = None

         for level, level_offset in enumerate(self.level_offsets):
-
             # get conflicts to previous level
             if prev_level_offset is not None and self.read_write_conflict:
                 conflict_offsets = self.get_conflict_offsets(
@@ -121,13 +120,11 @@
         return level_conflict_offsets

     def create_dependency_graph(self):
-
         blocks = []

         for level_offset, level_conflicts in zip(
             self.level_offsets, self.level_conflicts
         ):
-
             # all block offsets of the current level (relative to total ROI
             # start)
             block_dim_offsets = [
@@ -237,7 +234,6 @@ def get_conflict_offsets(self, level_offset, prev_level_offset, level_stride):
         return conflict_offsets

     def enumerate_dependencies(self, conflict_offsets, block_offsets):
-
         inclusion_criteria = {
             "valid": lambda b: self.total_roi.contains(b.read_roi),
             "overhang": lambda b: self.total_roi.contains(b.write_roi.get_begin()),
@@ -253,7 +249,6 @@ def enumerate_dependencies(self, conflict_offsets, block_offsets):
         blocks = []

         for block_offset in block_offsets:
-
             # create a block shifted by the current offset
             block = Block(
                 self.total_roi,
@@ -269,7 +264,6 @@ def enumerate_dependencies(self, conflict_offsets, block_offsets):
             # get all blocks in conflict with the current block
             conflicts = []
             for conflict_offset in conflict_offsets:
-
                 conflict = Block(
                     self.total_roi,
                     block.read_roi + conflict_offset,
@@ -289,7 +283,6 @@ def enumerate_dependencies(self, conflict_offsets, block_offsets):
         return blocks

     def shrink_possible(self, block):
-
         if not self.total_roi.contains(block.write_roi.get_begin()):
             return False

11 changes: 0 additions & 11 deletions daisy/cl_monitor.py
@@ -27,15 +27,12 @@ def emit(self, record):


 class TaskSummary:
-
     def __init__(self, state):
-
         self.block_failures = []
         self.state = state


 class BlockFailure:
-
     def __init__(self, block, exception, worker_id):
         self.block = block
         self.exception = exception
@@ -49,7 +46,6 @@ def __repr__(self):


 class CLMonitor(ServerObserver):
-
     def __init__(self, server):
         super().__init__(server)
         self.progresses = {}
@@ -72,7 +68,6 @@ def _wrap_logging_handlers(self):
                 logger.handlers[i] = TqdmLoggingHandler(logger.handlers[i])

     def _is_tty_stream_handler(self, handler):
-
         return (
             hasattr(handler, "stream")
             and hasattr(handler.stream, "isatty")
@@ -86,18 +81,15 @@ def on_release_block(self, task_id, task_state):
         self._update_state(task_id, task_state)

     def on_block_failure(self, block, exception, context):
-
         task_id = block.block_id[0]
         self.summaries[task_id].block_failures.append(
             BlockFailure(block, exception, context["worker_id"])
         )

     def on_task_start(self, task_id, task_state):
-
         self.summaries[task_id] = TaskSummary(task_state)

     def on_task_done(self, task_id, task_state):
-
         if task_id not in self.summaries:
             self.summaries[task_id] = TaskSummary(task_state)
         else:
@@ -114,7 +106,6 @@ def on_task_done(self, task_id, task_state):
             self.progresses[task_id].close()

     def on_server_exit(self):
-
         for task_id, progress in self.progresses.items():
             progress.close()

@@ -125,7 +116,6 @@ def on_server_exit(self):
         max_entries = 100

         for task_id, summary in self.summaries.items():
-
             num_block_failures = len(summary.block_failures)

             print()
@@ -166,7 +156,6 @@ def on_server_exit(self):
             print(" all blocks processed successfully")

     def _update_state(self, task_id, task_state):
-
         if task_id not in self.progresses:
             total = task_state.total_block_count
             self.progresses[task_id] = tqdm_auto(
11 changes: 0 additions & 11 deletions daisy/context.py
@@ -7,23 +7,18 @@


 class Context:
-
     ENV_VARIABLE = "DAISY_CONTEXT"

     def __init__(self, **kwargs):
-
         self.__dict = dict(logdir=get_log_basedir(), **kwargs)

     def copy(self):
-
         return copy.deepcopy(self)

     def to_env(self):
-
         return ":".join("%s=%s" % (k, v) for k, v in self.__dict.items())

     def __setitem__(self, k, v):
-
         k = str(k)
         v = str(v)

@@ -35,26 +30,20 @@ def __setitem__(self, k, v):
         self.__dict[k] = v

     def __getitem__(self, k):
-
         return self.__dict[k]

     def get(self, k, v=None):
-
         return self.__dict.get(k, v)

     def __repr__(self):
-
         return self.to_env()

     @staticmethod
     def from_env():
-
         try:
-
             tokens = os.environ[Context.ENV_VARIABLE].split(":")

         except KeyError:
-
             logger.error("%s environment variable not found!", Context.ENV_VARIABLE)
             raise

5 changes: 0 additions & 5 deletions daisy/dependency_graph.py
@@ -206,7 +206,6 @@ def _num_level_blocks(self, level):
         return num_blocks

     def level_blocks(self, level):
-
         for block_offset in self._compute_level_block_offsets(level):
             block = Block(
                 self.total_read_roi,
@@ -303,7 +302,6 @@ def upstream(self, block):
         return conflicts

     def enumerate_all_dependencies(self):
-
         self._level_block_offsets = self.compute_level_block_offsets()

         for level in range(self.num_levels):
@@ -407,7 +405,6 @@ def compute_level_conflicts(self) -> List[List[Coordinate]]:
         prev_level_offset = None

         for level, level_offset in enumerate(self._level_offsets):
-
             # get conflicts to previous level
             if prev_level_offset is not None and self.read_write_conflict:
                 conflict_offsets: List[Coordinate] = self.get_conflict_offsets(
@@ -451,7 +448,6 @@ def compute_level_block_offsets(self) -> List[List[Coordinate]]:
         level_block_offsets = []

         for level in range(self.num_levels):
-
             level_block_offsets.append(list(self._compute_level_block_offsets(level)))

         return level_block_offsets
@@ -481,7 +477,6 @@ def get_offsets(op, ls):
         return conflict_offsets

     def shrink_possible(self, block):
-
         return self.total_write_roi.contains(block.write_roi.begin)

     def shrink(self, block):
1 change: 0 additions & 1 deletion daisy/freezable.py
@@ -1,5 +1,4 @@
 class Freezable(object):
-
     __isfrozen = False

     def __setattr__(self, key, value):
2 changes: 0 additions & 2 deletions daisy/messages/block_failed.py
@@ -2,8 +2,6 @@


 class BlockFailed(ClientException):
-
     def __init__(self, exception, block, context):
-
         super().__init__(exception, context)
         self.block = block
1 change: 0 additions & 1 deletion daisy/scheduler.py
@@ -84,7 +84,6 @@ def acquire_block(self, task_id):
         while True:
             block = self.task_queues[task_id].get_next()
             if block is not None:
-
                 # update states
                 self.task_states[task_id].ready_count -= 1
                 self.task_states[task_id].processing_count += 1
4 changes: 1 addition & 3 deletions daisy/serial_server.py
@@ -12,9 +12,7 @@ class SerialServer(ServerObservee):
     def __init__(self):
         super().__init__()

-    def run_blockwise(
-        self, tasks: list[Task], scheduler=None
-    ) -> dict[str, TaskState]:
+    def run_blockwise(self, tasks: list[Task], scheduler=None) -> dict[str, TaskState]:
         if scheduler is None:
             scheduler = Scheduler(tasks)
         else:
3 changes: 0 additions & 3 deletions daisy/server_observer.py
@@ -1,7 +1,5 @@
 class ServerObserver:
-
     def __init__(self, server):
-
         self.server = server
         server.register_observer(self)

@@ -25,7 +23,6 @@ def on_server_exit(self):


 class ServerObservee:
-
     def __init__(self):
         self.observers = []

7 changes: 0 additions & 7 deletions daisy/task_worker_pools.py
@@ -7,9 +7,7 @@


 class TaskWorkerPools(ServerObserver):
-
     def __init__(self, tasks, server, max_block_failures=3):
-
         super().__init__(server)

         logger.debug("Creating worker pools")
@@ -26,7 +24,6 @@ def __init__(self, tasks, server, max_block_failures=3):
         self.failure_counts = {}

     def recruit_workers(self, tasks):
-
         for task_id, worker_pool in self.worker_pools.items():
             if task_id in tasks:
                 logger.debug(
@@ -37,18 +34,15 @@ def recruit_workers(self, tasks):
                 worker_pool.set_num_workers(tasks[task_id].num_workers)

     def stop(self):
-
         logger.debug("Stopping all workers")
         for worker_pool in self.worker_pools.values():
             worker_pool.stop()

     def check_worker_health(self):
-
         for worker_pool in self.worker_pools.values():
             worker_pool.check_for_errors()

     def on_block_failure(self, block, exception, context):
-
         task_id = context["task_id"]
         worker_id = int(context["worker_id"])

@@ -61,7 +55,6 @@ def on_block_failure(self, block, exception, context):
         self.failure_counts[task_id][worker_id] += 1

         if self.failure_counts[task_id][worker_id] > self.max_block_failures:
-
             logger.error(
                 "Worker %s failed too many times, restarting this worker...", context
             )
3 changes: 0 additions & 3 deletions daisy/tcp/io_looper.py
@@ -28,11 +28,9 @@ def clear():
         IOLooper.ioloops = {}

     def __init__(self):
-
         pid = os.getpid()

         if pid not in IOLooper.threads:
-
             logger.debug("Creating new IOLoop for process %d...", pid)
             self.ioloop = tornado.ioloop.IOLoop()
             self.ioloops[pid] = self.ioloop
@@ -44,6 +42,5 @@ def __init__(self):
             IOLooper.threads[pid].start()

         else:
-
             logger.debug("Reusing IOLoop for process %d...", pid)
             self.ioloop = self.ioloops[pid]
(Diffs for the remaining 10 of the 23 changed files are not shown.)
