From 9795f5b6d339e2b0a979b2ef67bc2af4a002d4b8 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Tue, 23 Jun 2020 16:14:20 +0200 Subject: [PATCH] WARC spout to emit captures into topology (implements #755) - remove spout-internal sleep - upgrade jwarc dependency to 0.13.0 --- external/warc/pom.xml | 2 +- .../stormcrawler/warc/WARCSpout.java | 38 +++++++------------ 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/external/warc/pom.xml b/external/warc/pom.xml index 831ba51f5..c2d6fb55f 100644 --- a/external/warc/pom.xml +++ b/external/warc/pom.xml @@ -18,7 +18,7 @@ 1.10 - 0.12.0 + 0.13.0 1.10.8 2.19.0 diff --git a/external/warc/src/main/java/com/digitalpebble/stormcrawler/warc/WARCSpout.java b/external/warc/src/main/java/com/digitalpebble/stormcrawler/warc/WARCSpout.java index 1b37ca23d..11e990f3e 100644 --- a/external/warc/src/main/java/com/digitalpebble/stormcrawler/warc/WARCSpout.java +++ b/external/warc/src/main/java/com/digitalpebble/stormcrawler/warc/WARCSpout.java @@ -135,12 +135,7 @@ private void openWARC() { LOG.info("Reading WARC file {}", warcFileInProgress); ReadableByteChannel warcChannel = null; try { - if (warcFileInProgress.matches("^https?://.*")) { - URL warcUrl = new URL(warcFileInProgress); - warcChannel = Channels.newChannel(warcUrl.openStream()); - } else { - warcChannel = FileChannel.open(Paths.get(warcFileInProgress)); - } + warcChannel = openChannel(warcFileInProgress); warcReader = new WarcReader(warcChannel); } catch (IOException e) { LOG.error("Failed to open WARC file " + warcFileInProgress, e); @@ -154,6 +149,16 @@ private void openWARC() { } } + private static ReadableByteChannel openChannel(String path) + throws IOException { + if (path.matches("^https?://.*")) { + URL warcUrl = new URL(path); + return Channels.newChannel(warcUrl.openStream()); + } else { + return FileChannel.open(Paths.get(path)); + } + } + private void closeWARC() { LOG.info("Finished reading WARC file {}", warcFileInProgress); try { @@ -375,10 +380,6 @@ public void nextTuple() { if (warcReader == null && buffer.isEmpty()) { // input exhausted - try { - Thread.sleep(10); - } catch (InterruptedException e1) { - } return; } @@ -390,13 +391,11 @@ public void nextTuple() { if (warcType == null) { LOG.warn("No type for {}", record.get().getClass()); } else { - eventCounter.scope( - "warc_skipped_record_of_type_" + record.get().type()) + eventCounter.scope("warc_skipped_record_of_type_" + warcType) .incr(); - LOG.debug("Skipped WARC record of type {}", - record.get().type()); + LOG.debug("Skipped WARC record of type {}", warcType); } - if (storeHTTPHeaders && "request".equals(warcType)) { + if (storeHTTPHeaders && record.get() instanceof WarcRequest) { // store request records to be able to add HTTP request // header to metadata precedingWarcRequest = (WarcRequest) record.get(); @@ -501,7 +500,6 @@ public void nextTuple() { _collector.emit(new Values(url, content, metadata)); - sleep(sleepEmitFetched, 0); return; } @@ -510,14 +508,6 @@ public void nextTuple() { // redirects, 404s, etc. _collector.emit(Constants.StatusStreamName, new Values(url, metadata, status), url); - sleep(0, 1000); - } - - private void sleep(long millis, int nanos) { - try { - Thread.sleep(millis, nanos); - } catch (InterruptedException e) { - } } @Override