Skip to content

Commit

Permalink
multithreaded file read
Browse files Browse the repository at this point in the history
  • Loading branch information
vdemichev committed Jul 21, 2018
1 parent 8db9bac commit 22d1015
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 22 deletions.
Binary file modified DiaNN.exe
Binary file not shown.
103 changes: 81 additions & 22 deletions src/diann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,8 @@ std::vector<Ion> generate_fragments(std::vector<double> &sequence, std::vector<I
bool ProfileSpectrumWarning = false;
class Tandem {
public:
double RT, window_low, window_high;
int MS_level = 0; // 0 - unknown
double RT = 0.0, window_low = 0.0, window_high = 0.0;
std::vector<Peak> peaks;
inline int size() { return peaks.size(); }
inline void resize(int size) { peaks.resize(size); }
Expand All @@ -1452,10 +1453,11 @@ class Tandem {
#ifdef MSTOOLKIT
const double CentroidSpan = (1.0 / 1000000.0) / 20.0;
#define cent_win(x) ((x) * sqrt(x) * CentroidSpan) // for Orbitraps
Tandem(MSToolkit::Spectrum &s, bool centroid) {
RT = s.getRTime();
window_low = s.getSelWindowLower();
window_high = s.getSelWindowUpper();
void init(MSToolkit::Spectrum &s, bool centroid) {
RT = s.getRTime();
window_low = s.getSelWindowLower();
window_high = s.getSelWindowUpper();
MS_level = s.getMsLevel();

if (!centroid) {
resize(s.size());
Expand Down Expand Up @@ -1515,6 +1517,8 @@ class Tandem {
peaks = centroided;
}
}

void init(Tandem &T) { RT = T.RT, window_low = T.window_low, window_high = T.window_high, peaks = T.peaks; }
#endif

inline bool has(float mz) { return (window_low <= mz && window_high > mz); }
Expand Down Expand Up @@ -3238,6 +3242,10 @@ const int fInit = 1 << 2;

class Run {
public:
Lock lock;
volatile int ms1_cnt, ms2_cnt;
std::atomic<int> sp_alloc;

int run_index;
std::string name; // run name
float weights[pN], guide_weights[pN];
Expand Down Expand Up @@ -4043,6 +4051,54 @@ class Run {
}
}

#ifdef MSTOOLKIT
void load_raw(int thread_id, char * file_name, std::vector<Tandem> * spectra) {
const int Block = 5;
MSToolkit::MSReader r;
std::vector<MSToolkit::Spectrum> s(Block);

r.setFilter(MSToolkit::MS1);
r.addFilter(MSToolkit::MS2);

bool first = true, finish = false;;
int start = 0, stop = 0, next = 0;
while (true) {
int curr = sp_alloc.fetch_add(Block);
start = curr + 1, stop = curr + Block;
for (next = start; next <= stop; next++) {
if (first) {
first = false;
if (!r.readFile(file_name, s[next - start], next)) return;
int total = r.getLastScan();
if (total > 1) {
while (!lock.set()) {}
spectra->reserve(total);
lock.free();
}
} else { if (!r.readFile(NULL, s[next - start], next)) { finish = true; break; } }
if (s[next - start].getScanNumber() == 0) { finish = true; break; }
}

if (next > start) {
while (!lock.set()) {}
if (next > spectra->size()) spectra->resize(next);
for (int i = start; i < next; i++) {
bool centroid = (s[i - start].getCentroidStatus() != 1);
int level = s[i - start].getMsLevel();
if (level != 1 && level != 2) continue;

(*spectra)[i - 1].init(s[i - start], centroid);
if (level == 1) ms1_cnt++;
else ms2_cnt++;
}
lock.free();
}

if (finish) break;
}
}
#endif

bool load(char * file_name) { // Spectra in the file should be ordered by the acquisition time
name = std::string(file_name);
if (Verbose >= 1) Time(), std::cout << "Loading run " << name << "\n";
Expand All @@ -4054,27 +4110,30 @@ class Run {

#ifdef MSTOOLKIT
{
MSToolkit::MSReader r;
MSToolkit::Spectrum s;
std::vector<Tandem> spectra;

r.setFilter(MSToolkit::MS1);
r.addFilter(MSToolkit::MS2);
sp_alloc.store(0);
ms1_cnt = ms2_cnt = 0;
std::vector<std::thread> threads;
int max_threads = Max(1, Threads - 1);
for (int i = 0; i < max_threads; i++) threads.push_back(std::thread(&Run::load_raw, this, i, file_name, &spectra));
for (int i = 0; i < max_threads; i++) threads[i].join();

int cnt = 0;
while (true) {
if (!cnt) {
if (!r.readFile(file_name, s, 1)) {
std::cout << "cannot read the file\n";
return false;
}
} else if (!r.readFile(NULL, s)) break;
if (s.getScanNumber() == 0) break;
if (!ms2_cnt) {
std::cout << "Cannot read the file\n";
return false;
}

bool centroid = (s.getCentroidStatus() != 1);
if (s.getMsLevel() == 2) scans.push_back(Tandem(s, centroid));
else if (s.getMsLevel() == 1) ms1.push_back(Tandem(s, centroid));
cnt++;
scans.clear(); scans.resize(ms2_cnt);
ms1.clear(); ms1.resize(ms1_cnt);

int n_ms1 = 0, n_ms2 = 0;
for (auto &s : spectra) {
if (s.MS_level == 1) ms1[n_ms1++].init(s);
else scans[n_ms2++].init(s);
std::vector<Peak>().swap(s.peaks);
}
std::vector<Tandem>().swap(spectra);
}
#endif

Expand Down

0 comments on commit 22d1015

Please sign in to comment.