Skip to content

Commit

Permalink
Parallelize host-side hashing and client-side hashing.
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianPugh committed Sep 23, 2024
1 parent 5e68403 commit 8400dfa
Showing 1 changed file with 47 additions and 46 deletions.
93 changes: 47 additions & 46 deletions belay/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,59 +374,59 @@ def sync(
if not folder.exists():
raise ValueError(f'"{folder}" does not exist.')

# Create a list of all files and dirs (on-device).
# This is so we know what to clean up after done syncing.
snippets_to_execute = []

if progress_update:
progress_update(description="Bootstrapping sync...")
if "viper" in self.implementation.emitters:
snippets_to_execute.append("hf_viper")
elif "native" in self.implementation.emitters:
snippets_to_execute.append("hf_native")
else:
snippets_to_execute.append("hf")
with TemporaryDirectory() as tmp_dir, concurrent.futures.ThreadPoolExecutor() as executor:
tmp_dir = Path(tmp_dir)

if self.implementation.name == "circuitpython":
snippets_to_execute.append("ilistdir_circuitpython")
else:
snippets_to_execute.append("ilistdir_micropython")
snippets_to_execute.append("sync_begin")
self._exec_snippet(*snippets_to_execute)

# Remove the keep files from the on-device ``all_files`` set
# so they don't get deleted.
keep_all = folder.is_file() or keep is True
keep = preprocess_keep(keep, dst)
ignore = preprocess_ignore(ignore)

src_files, src_dirs, dst_files = discover_files_dirs(dst, folder, ignore)

if mpy_cross_binary:
dst_files = [
dst_file.with_suffix(".mpy") if dst_file.suffix == ".py" else dst_file for dst_file in dst_files
]
dst_files = [dst_file.as_posix() for dst_file in dst_files]
dst_dirs = generate_dst_dirs(dst, folder, src_dirs)

if keep_all:
self("del __belay_del_fs")
else:
self(f"__belay_del_fs({repr(dst)}, {repr(set(keep + dst_files))}); del __belay_del_fs")
# Create a list of all files and dirs (on-device).
# This is so we know what to clean up after done syncing.
snippets_to_execute = []

# Try and make all remote dirs
if dst_dirs:
if progress_update:
progress_update(description="Creating remote directories...")
self(f"__belay_mkdirs({repr(dst_dirs)})")
# Remove the keep files from the on-device ``all_files`` set
# so they don't get deleted.
keep_all = folder.is_file() or keep is True
keep = preprocess_keep(keep, dst)
ignore = preprocess_ignore(ignore)

with TemporaryDirectory() as tmp_dir, concurrent.futures.ThreadPoolExecutor() as executor:
tmp_dir = Path(tmp_dir)
src_files, src_dirs, dst_files = discover_files_dirs(dst, folder, ignore)

def _preprocess_src_file_hash_helper(src_file):
return preprocess_src_file_hash(tmp_dir, src_file, minify, mpy_cross_binary)

src_files_and_hashes = executor.map(_preprocess_src_file_hash_helper, src_files)
futures = [executor.submit(_preprocess_src_file_hash_helper, src_file) for src_file in src_files]

if progress_update:
progress_update(description="Bootstrapping sync...")
if "viper" in self.implementation.emitters:
snippets_to_execute.append("hf_viper")
elif "native" in self.implementation.emitters:
snippets_to_execute.append("hf_native")
else:
snippets_to_execute.append("hf")

if self.implementation.name == "circuitpython":
snippets_to_execute.append("ilistdir_circuitpython")
else:
snippets_to_execute.append("ilistdir_micropython")
snippets_to_execute.append("sync_begin")
self._exec_snippet(*snippets_to_execute)

if mpy_cross_binary:
dst_files = [
dst_file.with_suffix(".mpy") if dst_file.suffix == ".py" else dst_file for dst_file in dst_files
]
dst_files = [dst_file.as_posix() for dst_file in dst_files]
dst_dirs = generate_dst_dirs(dst, folder, src_dirs)

if keep_all:
self("del __belay_del_fs")
else:
self(f"__belay_del_fs({repr(dst)}, {repr(set(keep + dst_files))}); del __belay_del_fs")

# Try and make all remote dirs
if dst_dirs:
if progress_update:
progress_update(description="Creating remote directories...")
self(f"__belay_mkdirs({repr(dst_dirs)})")

# Get all remote hashes
if progress_update:
Expand All @@ -436,6 +436,7 @@ def _preprocess_src_file_hash_helper(src_file):
if len(dst_hashes) != len(dst_files):
raise InternalError

src_files_and_hashes = [future.result() for future in futures]
puts = []
for (src_file, src_hash), dst_file, dst_hash in zip(src_files_and_hashes, dst_files, dst_hashes):
if src_hash != dst_hash:
Expand Down

0 comments on commit 8400dfa

Please sign in to comment.