Skip to content

Commit

Permalink
merge adjacent intervals from different threads
Browse files Browse the repository at this point in the history
  • Loading branch information
glennhickey committed Apr 5, 2024
1 parent 89af9b6 commit 7ce91eb
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 24 deletions.
76 changes: 53 additions & 23 deletions src/rgfa.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ void RGFACover::compute(const PathHandleGraph* graph,
this->num_ref_intervals = this->rgfa_intervals.size();

#ifdef debug
#pragma omp critical(cerr)
cerr << "[rgfa] Selected " << rgfa_intervals.size() << " rank=0 reference paths" << endl;
#endif

Expand All @@ -148,8 +149,6 @@ void RGFACover::compute(const PathHandleGraph* graph,

vector<const Snarl*> queue = {snarl};

cerr << "top level snarl " << pb2json(*snarl) << endl;

while(!queue.empty()) {
const Snarl* cur_snarl = queue.back();
queue.pop_back();
Expand All @@ -168,24 +167,23 @@ void RGFACover::compute(const PathHandleGraph* graph,
}
});

// now we need to fold up the thread covers
// now we need to fold up the thread covers
for (int64_t t = 0; t < thread_count; ++t) {
int64_t offset = this->rgfa_intervals.size();

// todo: figure out one-shot stl invocation to move
for (int64_t j = 0; j < rgfa_intervals_vector[t].size(); ++j) {
this->rgfa_intervals.push_back(rgfa_intervals_vector[t][j]);
}
#ifdef debug
#pragma omp critical(cerr)
cerr << "Adding " << rgfa_intervals_vector[t].size() << " rgfa intervals from thread " << t << endl;
#endif
rgfa_intervals_vector[t].clear();

for (const auto& node_interval : node_to_interval_vector[t]) {
this->node_to_interval[node_interval.first] = node_interval.second + offset;
// important to go through function rather than do a raw copy since
// inter-top-level snarl merging may need to happen
for (int64_t j = 0; j < rgfa_intervals_vector[t].size(); ++j) {
// the true flag at the end disables the overlap check. since they were computed
// in separate threads, snarls can overlap by a single node
add_interval(this->rgfa_intervals, this->node_to_interval, rgfa_intervals_vector[t][j], true);
}
rgfa_intervals_vector[t].clear();
node_to_interval_vector[t].clear();
}


// todo: we could optionally go through uncovered nodes and try to greedily search
// for rgfa intervals at this point, since it seems for some graphs there are
Expand Down Expand Up @@ -345,7 +343,10 @@ void RGFACover::load(const PathHandleGraph* graph,

void RGFACover::apply(MutablePathMutableHandleGraph* mutable_graph) {
assert(this->graph == static_cast<PathHandleGraph*>(mutable_graph));

#ifdef debug
cerr << "applying rGFA cover with " << this->num_ref_intervals << " ref intervals "
<< " and " << this->rgfa_intervals.size() << " total intervals" << endl;
#endif
// index paths isued in rgfa cover
unordered_map<step_handle_t, int64_t> step_to_offset;
unordered_set<path_handle_t> source_path_set;
Expand Down Expand Up @@ -484,6 +485,7 @@ void RGFACover::compute_snarl(const Snarl& snarl, PathTraversalFinder& path_trav
}
}
#ifdef debug
#pragma omp critical(cerr)
cerr << "doing snarl " << pb2json(snarl.start()) << "-" << pb2json(snarl.end()) << " with " << travs.size() << " travs" << endl;
#endif

Expand Down Expand Up @@ -537,9 +539,12 @@ void RGFACover::compute_snarl(const Snarl& snarl, PathTraversalFinder& path_trav
const pair<int64_t, int64_t>& uncovered_interval = best_stats_fragment.fragment;

#ifdef debug
#pragma omp critical(cerr)
{
cerr << "best trav: ";
for (auto xx : trav) cerr << " " << graph->get_id(graph->get_handle_of_step(xx));
cerr << endl << "uncovered interval [" << uncovered_interval.first << "," << uncovered_interval.second << "]" <<endl;
}
#endif


Expand Down Expand Up @@ -582,6 +587,7 @@ void RGFACover::compute_snarl(const Snarl& snarl, PathTraversalFinder& path_trav
graph->get_next_step(trav.at(uncovered_interval.second - 1)));
#ifdef debug
int64_t interval_length = uncovered_interval.second - uncovered_interval.first;
#pragma omp critical(cerr)
cerr << "adding interval with length " << interval_length << endl;
#endif
add_interval(thread_rgfa_intervals, thread_node_to_interval, new_interval);
Expand Down Expand Up @@ -619,9 +625,17 @@ vector<pair<int64_t, int64_t>> RGFACover::get_uncovered_intervals(const vector<s

bool RGFACover::add_interval(vector<pair<step_handle_t, step_handle_t>>& thread_rgfa_intervals,
unordered_map<nid_t, int64_t>& thread_node_to_interval,
const pair<step_handle_t, step_handle_t>& new_interval) {
const pair<step_handle_t, step_handle_t>& new_interval,
bool global) {

#ifdef debug
#pragma omp critical(cerr)
cerr << "adding interval " << graph->get_path_name(graph->get_path_handle_of_step(new_interval.first))
<< ":" << graph->get_id(graph->get_handle_of_step(new_interval.first))
<< "-" << graph->get_id(graph->get_handle_of_step(new_interval.second)) << endl;
#endif
bool merged = false;
int64_t merged_interval_idx = -1;
path_handle_t path_handle = graph->get_path_handle_of_step(new_interval.first);

// check the before-first step. if it's in an interval then it must be immediately
Expand All @@ -630,11 +644,19 @@ bool RGFACover::add_interval(vector<pair<step_handle_t, step_handle_t>>& thread_
if (before_first_step != graph->path_front_end(graph->get_path_handle_of_step(before_first_step))) {
nid_t prev_node_id = graph->get_id(graph->get_handle_of_step(before_first_step));
if (thread_node_to_interval.count(prev_node_id)) {
pair<step_handle_t, step_handle_t>& prev_interval = thread_rgfa_intervals[thread_node_to_interval[prev_node_id]];
int64_t prev_idx = thread_node_to_interval[prev_node_id];
pair<step_handle_t, step_handle_t>& prev_interval = thread_rgfa_intervals[prev_idx];
if (graph->get_path_handle_of_step(prev_interval.first) == path_handle) {
assert(prev_interval.second == new_interval.first);
#ifdef debug
#pragma omp critical(cerr)
cerr << "prev interval found" << graph->get_path_name(graph->get_path_handle_of_step(prev_interval.first))
<< ":" << graph->get_id(graph->get_handle_of_step(prev_interval.first))
<< "-" << graph->get_id(graph->get_handle_of_step(prev_interval.second)) << endl;
#endif
assert(global || prev_interval.second == new_interval.first);
prev_interval.second = new_interval.second;
merged = true;
merged_interval_idx = prev_idx;
}
}
}
Expand All @@ -644,24 +666,32 @@ bool RGFACover::add_interval(vector<pair<step_handle_t, step_handle_t>>& thread_
if (new_interval.second != graph->path_end(graph->get_path_handle_of_step(new_interval.second))) {
nid_t next_node_id = graph->get_id(graph->get_handle_of_step(new_interval.second));
if (thread_node_to_interval.count(next_node_id)) {
pair<step_handle_t, step_handle_t>& next_interval = thread_rgfa_intervals[thread_node_to_interval[next_node_id]];
int64_t next_idx = thread_node_to_interval[next_node_id];
pair<step_handle_t, step_handle_t>& next_interval = thread_rgfa_intervals[next_idx];
path_handle_t next_path = graph->get_path_handle_of_step(next_interval.first);
if (graph->get_path_handle_of_step(next_interval.first) == path_handle) {
assert(next_interval.first == new_interval.second);
#ifdef debug
#pragma omp critical(cerr)
cerr << "next interval found" << graph->get_path_name(graph->get_path_handle_of_step(next_interval.first))
<< ":" << graph->get_id(graph->get_handle_of_step(next_interval.first))
<< "-" << graph->get_id(graph->get_handle_of_step(next_interval.second)) << endl;
#endif
assert(global || next_interval.first == new_interval.second);
next_interval.first = new_interval.first;
merged = true;
merged_interval_idx = next_idx;
}
}
}

// add the interval to the local (thread safe) structures
if (!merged) {
for (step_handle_t step = new_interval.first; step != new_interval.second; step = graph->get_next_step(step)) {
thread_node_to_interval[graph->get_id(graph->get_handle_of_step(step))] = thread_rgfa_intervals.size();
}
merged_interval_idx = thread_rgfa_intervals.size();
thread_rgfa_intervals.push_back(new_interval);
}

for (step_handle_t step = new_interval.first; step != new_interval.second; step = graph->get_next_step(step)) {
thread_node_to_interval[graph->get_id(graph->get_handle_of_step(step))] = merged_interval_idx;
}
return !merged;
}

Expand Down
3 changes: 2 additions & 1 deletion src/rgfa.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ class RGFACover {
// returns true if a new interval was added, false if an existing interval was updated
bool add_interval(vector<pair<step_handle_t, step_handle_t>>& thread_rgfa_intervals,
unordered_map<nid_t, int64_t>& thread_node_to_interval,
const pair<step_handle_t, step_handle_t>& new_interval);
const pair<step_handle_t, step_handle_t>& new_interval,
bool global = false);

// get the total coverage of a traversal (sum of step lengths)
int64_t get_coverage(const vector<step_handle_t>& trav, const pair<int64_t, int64_t>& uncovered_interval);
Expand Down

1 comment on commit 7ce91eb

@adamnovak
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vg CI tests complete for branch rgfa2. View the full report here.

16 tests passed, 0 tests failed and 0 tests skipped in 17070 seconds

Please sign in to comment.