Skip to content

Commit

Permalink
Add key to skip osm-doc
Browse files Browse the repository at this point in the history
  • Loading branch information
kiselev-dv committed Dec 1, 2016
1 parent dfd6f83 commit 0d109ab
Show file tree
Hide file tree
Showing 2 changed files with 270 additions and 255 deletions.
80 changes: 42 additions & 38 deletions Gazetteer/src/main/java/me/osm/gazetter/join/JoinExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@
import org.slf4j.LoggerFactory;

public class JoinExecutor implements JoinFailuresHandler{

private AddrJointHandler addrPointFormatter = new AddrPointFormatter();

private static final Logger log = LoggerFactory.getLogger(JoinExecutor.class.getName());

private AtomicInteger stripesCounter;

private Set<String> filter;

private boolean dropHghNetGeometries = true;
private boolean cleanStripes = false;

public JoinExecutor(Boolean skipHghNets, Boolean keepHghNetGeometries, Boolean cleanStripes, Set<String> filter) {
this.filter = filter;

if(skipHghNets != null) {
this.buildStreetNetworks = !skipHghNets;
}
Expand All @@ -57,17 +57,17 @@ public JoinExecutor(Boolean skipHghNets, Boolean keepHghNetGeometries, Boolean c

private JoinBoundariesExecutor jbe = new JoinBoundariesExecutor();



public static class StripeFilenameFilter implements FilenameFilter {

@Override
public boolean accept(File dir, String name) {
return name.matches("stripe[\\.\\d-]+\\.gjson(\\.gz)?(?!.)");
}

}

public static final StripeFilenameFilter STRIPE_FILE_FN_FILTER = new StripeFilenameFilter();

public void run(String stripesFolder, String coomonPartFile) {
Expand All @@ -83,7 +83,7 @@ public void run(String stripesFolder, String coomonPartFile) {
"Join stripes done in {}",
DurationFormatUtils.formatDurationHMS(new Date().getTime()
- start));

if (cleanStripes) {
log.info("Clean stripes in {}", stripesFolder);
File folder = new File(stripesFolder);
Expand All @@ -93,10 +93,10 @@ public void run(String stripesFolder, String coomonPartFile) {
}
log.info("Removed {} stripes files", stripesFiles.length);
}

start = new Date().getTime();
jbe.run(stripesFolder, common, filter);

} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -105,12 +105,12 @@ public void run(String stripesFolder, String coomonPartFile) {
"Join boundaries done in {}",
DurationFormatUtils.formatDurationHMS(new Date().getTime()
- start));

start = new Date().getTime();
for(JoinOutHandler h : Options.get().getJoinOutHandlers()) {
h.allDone();
}

log.info(
"All handlers done in {}",
DurationFormatUtils.formatDurationHMS(new Date().getTime()
Expand All @@ -120,44 +120,48 @@ public void run(String stripesFolder, String coomonPartFile) {
private final List<File> fails = Collections.synchronizedList(new ArrayList<File>());

private boolean buildStreetNetworks = true;

private void joinStripes(String stripesFolder, List<JSONObject> common) {

int threads = Options.get().getNumberOfThreads();

LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(threads);
ExecutorService executorService = new ThreadPoolExecutor(threads, threads, 0L,
ExecutorService executorService = new ThreadPoolExecutor(threads, threads, 0L,
TimeUnit.MILLISECONDS, queue);

File folder = new File(stripesFolder);
File[] stripesFiles = folder.listFiles(STRIPE_FILE_FN_FILTER);
stripesCounter = new AtomicInteger(stripesFiles.length);
if (stripesFiles == null) {
log.info("Data directory is empty, nothing to join");
return;
}
stripesCounter = new AtomicInteger(stripesFiles.length);
fails.clear();
for(File stripeF : stripesFiles) {
tryToExecute(common, threads, queue, executorService, stripeF);
}

executorService.shutdown();
try {
while(!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
//still waiting
}

} catch (InterruptedException e) {
throw new RuntimeException("Executor service shutdown failed.", e);
}

if(!fails.isEmpty()) {
log.info("Rerun join for {} from {} files. In one thread.", fails.size(), stripesFiles.length);
}

ArrayList<File> oneThread = new ArrayList<File>(fails);
fails.clear();
for(File stripeF : oneThread ) {
new JoinSliceRunable(addrPointFormatter, stripeF, common,
new JoinSliceRunable(addrPointFormatter, stripeF, common,
filter, buildStreetNetworks, dropHghNetGeometries, this, this).run();
}

if(!fails.isEmpty()) {
log.error("Failed to join: {}", fails);
}
Expand All @@ -166,11 +170,11 @@ private void joinStripes(String stripesFolder, List<JSONObject> common) {
private void tryToExecute(List<JSONObject> common, int threads,
LinkedBlockingQueue<Runnable> queue,
ExecutorService executorService, File stripeF) {

long avaibleRAMMeg = MemorySupervizor.getAvaibleRAMMeg();
if(queue.size() < threads && avaibleRAMMeg > 500) {
log.trace("Send {} to execution queue. Free mem: {}meg", stripeF, avaibleRAMMeg);
executorService.execute(new JoinSliceRunable(addrPointFormatter, stripeF,
executorService.execute(new JoinSliceRunable(addrPointFormatter, stripeF,
common, filter, buildStreetNetworks, dropHghNetGeometries, this, this));
}
else {
Expand All @@ -186,36 +190,36 @@ private void tryToExecute(List<JSONObject> common, int threads,

public static List<JSONObject> getCommonPart(String coomonPartFile) {
List<JSONObject> common = new ArrayList<>();

if(coomonPartFile != null) {

File cpf = new File(coomonPartFile);

if(cpf.exists()) {
try {

JSONArray commonArray = new JSONArray(IOUtils.toString(new FileInputStream(cpf)));
for(int i = 0; i < commonArray.length(); i++) {
common.add(commonArray.getJSONObject(i));
}

} catch (Exception e) {
throw new RuntimeException("Failed to read coomon part.", e);
}
}
}

return common;
}


public AtomicInteger getStripesCounter() {
return stripesCounter;
}

@Override
public void failed(File f) {
fails.add(f);
}

}
Loading

0 comments on commit 0d109ab

Please sign in to comment.