Running multiple chexpert labelers in parallel #34

Open
PabloMessina opened this issue Mar 15, 2022 · 2 comments

PabloMessina commented Mar 15, 2022

Chexpert labeler is a bit slow. On my machine it labels about 4-5 reports per second on average, which is too slow if you want to label tens of thousands of reports quickly. As a workaround, I thought I could take advantage of my machine's multiple cores by running multiple instances of the chexpert labeler over disjoint splits of my report dataset. To that end, I tried the following:

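# Imports used by this method. _get_custom_env, TMP_FOLDER, CHEXPERT_FOLDER,
# CHEXPERT_PYTHON and CHEXPERT_LABELS are defined elsewhere in my code.
import csv
import os
import subprocess
import time

import numpy as np
import pandas as pd

def _invoke_chexpert_labeler_process(self, reports, tmp_suffix='', n_processes=10):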

    n = len(reports)
    if n < 100:
        n_processes = 1

    chunk_size = n // n_processes
    processes = []
    output_paths = []

    if self.verbose:
        print(f'Chexpert labeler: running {n_processes} processes in parallel')

    start = time.time()
    custom_env = _get_custom_env()

    for i in range(n_processes):
        # Define chunk range
        b = i * chunk_size
        e = n if i + 1 == n_processes else b + chunk_size
        
        # Define input & output paths for i-th chunk
        input_path = os.path.join(TMP_FOLDER, f'labeler-input{tmp_suffix}_{i}.csv')
        output_path = os.path.join(TMP_FOLDER, f'labeler-output{tmp_suffix}_{i}.csv')
        output_paths.append(output_path)

        # Create input file
        os.makedirs(TMP_FOLDER, exist_ok=True)
        in_df = pd.DataFrame(reports[b:e])
        in_df.to_csv(input_path, header=False, index=False, quoting=csv.QUOTE_ALL)

        # Build command & call chexpert labeler process
        cmd_cd = f'cd {CHEXPERT_FOLDER}'
        cmd_call = f'{CHEXPERT_PYTHON} label.py --reports_path {input_path} --output_path {output_path}'
        cmd = f'{cmd_cd} && {cmd_call}'
        if self.verbose:
            print(f'({i}) Running chexpert labeler over {len(in_df)} reports ...')
        processes.append(subprocess.Popen(cmd, shell=True, env=custom_env))
    
    out_labels = np.empty((n, len(CHEXPERT_LABELS)), np.int8)
    
    offset = 0        
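    for i, p in enumerate(processes):
        # Wait for subprocess to finish (wait() returns immediately if it already has)
        returncode = p.wait()
        if returncode != 0:
            raise RuntimeError(f'chexpert labeler process {i} exited with code {returncode}')
        if self.verbose:
            print(f'process {i} finished, elapsed time = {time.time() - start}')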
        # Read chexpert-labeler output
        out_df = pd.read_csv(output_paths[i])
        out_df = out_df.fillna(-2)
        out_labels[offset : offset + len(out_df)] = out_df[CHEXPERT_LABELS].to_numpy().astype(np.int8)
        offset += len(out_df)
    
    assert offset == n
    
    return out_labels
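A side note on the chunking in the function above: it gives every leftover report to the last chunk, which is why the run reported in this post has nine chunks of 29 reports and one of 34 (295 reports in total). A minimal sketch of a more even split follows. The helper name `chunk_bounds` is hypothetical and not part of the original code, and a more even split is not claimed to fix the slowdown discussed here.

```python
def chunk_bounds(n_items, n_chunks):
    """Return (begin, end) index pairs that cover range(n_items)
    in n_chunks contiguous slices whose sizes differ by at most one."""
    if n_chunks < 1:
        raise ValueError('n_chunks must be at least 1')
    base, extra = divmod(n_items, n_chunks)
    bounds = []
    begin = 0
    for i in range(n_chunks):
        # The first `extra` chunks each take one additional item.
        end = begin + base + (1 if i < extra else 0)
        bounds.append((begin, end))
        begin = end
    return bounds


if __name__ == '__main__':
    sizes = [end - begin for begin, end in chunk_bounds(295, 10)]
    print(sizes)  # [30, 30, 30, 30, 30, 29, 29, 29, 29, 29]
```

Each `(begin, end)` pair could be used as `reports[begin:end]` in place of the `b` and `e` values computed in the loop.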

Unfortunately, I'm getting this very strange behavior:

Chexpert labeler: running 10 processes in parallel
(0) Running chexpert labeler over 29 reports ...
(1) Running chexpert labeler over 29 reports ...
(2) Running chexpert labeler over 29 reports ...
(3) Running chexpert labeler over 29 reports ...
(4) Running chexpert labeler over 29 reports ...
(5) Running chexpert labeler over 29 reports ...
(6) Running chexpert labeler over 29 reports ...
(7) Running chexpert labeler over 29 reports ...
(8) Running chexpert labeler over 29 reports ...
(9) Running chexpert labeler over 34 reports ...
process 0 finished, elapsed time = 9.482320785522461
process 1 finished, elapsed time = 10.595801830291748
process 2 finished, elapsed time = 203.73371744155884
process 3 finished, elapsed time = 203.74254941940308
process 4 finished, elapsed time = 203.7504105567932
process 5 finished, elapsed time = 209.21588110923767
process 6 finished, elapsed time = 209.2250039577484
process 7 finished, elapsed time = 209.2326741218567
process 8 finished, elapsed time = 209.23797416687012
process 9 finished, elapsed time = 209.24284863471985

As you can see, the first two processes terminate relatively quickly (in about 10 seconds), but for some unknown reason processes 2 through 9 terminate about 200 seconds later. I've run my code several times and I always get the same result.
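One caveat when reading these numbers: the loop waits on the processes in launch order, so the time printed for process i is only an upper bound on when it finished. Processes 3 and 4, for example, may have ended well before process 2 did. The sketch below is one way to record each process's real finish time, using a thread per subprocess. The names `run_timed` and `run_all` are hypothetical, and the commands in the usage section are stand-ins, not the labeler. This only improves the measurement; it does not by itself explain or remove the slowdown.

```python
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_timed(index, cmd, start):
    """Run one command to completion and record when it actually finished."""
    completed = subprocess.run(cmd)  # blocks only this worker thread
    return index, completed.returncode, time.time() - start


def run_all(cmds, max_workers=None):
    """Run all commands concurrently.

    Returns {index: (returncode, elapsed_seconds)}, where elapsed_seconds is
    measured from the common start time to that command's own completion.
    """
    start = time.time()
    results = {}
    workers = max_workers or max(1, len(cmds))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(run_timed, i, cmd, start)
                   for i, cmd in enumerate(cmds)]
        # as_completed yields futures in the order they finish, not launch order.
        for future in as_completed(futures):
            index, returncode, elapsed = future.result()
            results[index] = (returncode, elapsed)
    return results


if __name__ == '__main__':
    # Stand-in commands: process 0 is deliberately slow, process 1 is fast.
    cmds = [
        [sys.executable, '-c', 'import time; time.sleep(1.0)'],
        [sys.executable, '-c', 'pass'],
    ]
    for index, (returncode, elapsed) in sorted(run_all(cmds).items()):
        print(f'process {index}: exit code {returncode}, finished after {elapsed:.2f}s')
```

Passing a smaller `max_workers` (for instance the number of physical cores) limits how many labeler instances run at once, which may be worth trying if the machine is oversubscribed.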

I have two questions:

  • Is it possible to run multiple instances of chexpert labeler in parallel for performance gains?
  • If so, is there example code showing how this can be done? Maybe the way I'm doing it is not optimal (to be honest, I'm not even sure I'm doing it the right way; this is the first time I've tried to parallelize a command using subprocess.Popen).

Thank you very much in advance.

@BardiaKh

This answer is probably too late, but I didn't have much luck with Python's native multiprocessing (mp) either. I ended up spawning multiple processes using bash. For future reference, the code is shared here:

https://github.com/BardiaKh/ParallelCheXpertLabeler

@PabloMessina
Author

@BardiaKh That's great. In my case it turned out that my implementation was fine, as long as it invokes the dockerized version of the chexpert labeler. Running multiple Docker containers with the chexpert labeler in parallel works like a charm. Code here: https://github.com/PabloMessina/MedVQA/blob/5d4835a390b6e5d697bd2f4f3dfef73af95101bb/medvqa/metrics/medical/chexpert.py#L198
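For readers who do not want to follow the link, here is a rough sketch of the one-container-per-chunk idea. It is not the linked implementation. The image name `chexpert-labeler:latest`, the `/data` mount point, the chunk file names, and the assumption that `python label.py` works from the container's default working directory are all illustrative and need adapting to your own Docker build. Only the `--reports_path` and `--output_path` flags are taken from the command shown earlier in this thread.

```python
import os
import subprocess


def build_docker_cmd(host_dir, input_name, output_name,
                     image='chexpert-labeler:latest'):
    """Build the `docker run` argument list for one chunk.

    Assumptions: `image` is a locally built labeler image, and host_dir
    (holding the chunk's input CSV) is mounted at /data in the container.
    """
    return [
        'docker', 'run', '--rm',
        '-v', f'{os.path.abspath(host_dir)}:/data',
        image,
        'python', 'label.py',
        '--reports_path', f'/data/{input_name}',
        '--output_path', f'/data/{output_name}',
    ]


def launch_containers(host_dir, n_chunks):
    """Start one container per chunk, then wait for all of them.

    Expects hypothetical files labeler-input_<i>.csv in host_dir and
    returns the list of container exit codes in chunk order.
    """
    procs = [
        subprocess.Popen(build_docker_cmd(
            host_dir, f'labeler-input_{i}.csv', f'labeler-output_{i}.csv'))
        for i in range(n_chunks)
    ]
    return [p.wait() for p in procs]


if __name__ == '__main__':
    # Print the command for one chunk without starting any container.
    print(' '.join(build_docker_cmd('tmp', 'labeler-input_0.csv',
                                    'labeler-output_0.csv')))
```

Passing the command as a list avoids `shell=True`, so paths containing spaces do not need extra quoting.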
