Skip to content

Commit

Permalink
WARC spout to emit captures into topology (implements apache#755)
Browse files Browse the repository at this point in the history
- remove spout-internal sleep
- upgrade jwarc dependency to 0.13.0
  • Loading branch information
sebastian-nagel committed Jun 23, 2020
1 parent 1037407 commit 9795f5b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 25 deletions.
2 changes: 1 addition & 1 deletion external/warc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<properties>
<commons-codec.version>1.10</commons-codec.version>
<jwarc.version>0.12.0</jwarc.version>
<jwarc.version>0.13.0</jwarc.version>
<mockito-all.version>1.10.8</mockito-all.version>
<wiremock.version>2.19.0</wiremock.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,7 @@ private void openWARC() {
LOG.info("Reading WARC file {}", warcFileInProgress);
ReadableByteChannel warcChannel = null;
try {
if (warcFileInProgress.matches("^https?://.*")) {
URL warcUrl = new URL(warcFileInProgress);
warcChannel = Channels.newChannel(warcUrl.openStream());
} else {
warcChannel = FileChannel.open(Paths.get(warcFileInProgress));
}
warcChannel = openChannel(warcFileInProgress);
warcReader = new WarcReader(warcChannel);
} catch (IOException e) {
LOG.error("Failed to open WARC file " + warcFileInProgress, e);
Expand All @@ -154,6 +149,16 @@ private void openWARC() {
}
}

private static ReadableByteChannel openChannel(String path)
throws IOException {
if (path.matches("^https?://.*")) {
URL warcUrl = new URL(path);
return Channels.newChannel(warcUrl.openStream());
} else {
return FileChannel.open(Paths.get(path));
}
}

private void closeWARC() {
LOG.info("Finished reading WARC file {}", warcFileInProgress);
try {
Expand Down Expand Up @@ -375,10 +380,6 @@ public void nextTuple() {

if (warcReader == null && buffer.isEmpty()) {
// input exhausted
try {
Thread.sleep(10);
} catch (InterruptedException e1) {
}
return;
}

Expand All @@ -390,13 +391,11 @@ public void nextTuple() {
if (warcType == null) {
LOG.warn("No type for {}", record.get().getClass());
} else {
eventCounter.scope(
"warc_skipped_record_of_type_" + record.get().type())
eventCounter.scope("warc_skipped_record_of_type_" + warcType)
.incr();
LOG.debug("Skipped WARC record of type {}",
record.get().type());
LOG.debug("Skipped WARC record of type {}", warcType);
}
if (storeHTTPHeaders && "request".equals(warcType)) {
if (storeHTTPHeaders && record.get() instanceof WarcRequest) {
// store request records to be able to add HTTP request
// header to metadata
precedingWarcRequest = (WarcRequest) record.get();
Expand Down Expand Up @@ -501,7 +500,6 @@ public void nextTuple() {

_collector.emit(new Values(url, content, metadata));

sleep(sleepEmitFetched, 0);
return;
}

Expand All @@ -510,14 +508,6 @@ public void nextTuple() {
// redirects, 404s, etc.
_collector.emit(Constants.StatusStreamName,
new Values(url, metadata, status), url);
sleep(0, 1000);
}

private void sleep(long millis, int nanos) {
try {
Thread.sleep(millis, nanos);
} catch (InterruptedException e) {
}
}

@Override
Expand Down

0 comments on commit 9795f5b

Please sign in to comment.