Skip to content

Commit

Permalink
Split stream and process mutations with streaming JSON parsing (#619)
Browse files Browse the repository at this point in the history
* split-stream

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* looking goood!

* ok actually it wasn't working and still isn't

* now really looking good

* move dep

* add more logging

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
theosanderson and pre-commit-ci[bot] authored Oct 8, 2024
1 parent faa18f2 commit 88d5a55
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 33 deletions.
1 change: 1 addition & 0 deletions taxonium_component/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
},
"dependencies": {},
"devDependencies": {
"stream-json": "^1.8.0",
"@fontsource/roboto": "^5.0.1",
"@headlessui/react": "^1.7.17",
"@jbrowse/core": "^2.5.0",
Expand Down
7 changes: 6 additions & 1 deletion taxonium_component/src/webworkers/localBackendWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
import { processNewickAndMetadata } from "../utils/processNewick.js";
import { processNextstrain } from "../utils/processNextstrain.js";
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
import { parser } from "stream-json";
import { streamValues } from "stream-json/streamers/StreamValues";

console.log("worker starting");
postMessage({ data: "Worker starting" });
Expand Down Expand Up @@ -211,8 +213,11 @@ onmessage = async (event) => {
processedUploadedData = await processJsonl(
data.data,
sendStatusMessage,
ReadableWebToNodeStream
ReadableWebToNodeStream,
parser,
streamValues
);

console.log("processedUploadedData created");
} else if (
data.type === "upload" &&
Expand Down
12 changes: 12 additions & 0 deletions taxonium_component/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -10164,6 +10164,11 @@ stream-browserify@^3.0.0:
inherits "~2.0.4"
readable-stream "^3.5.0"

stream-chain@^2.2.5:
version "2.2.5"
resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09"
integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA==

stream-http@^3.2.0:
version "3.2.0"
resolved "https://registry.yarnpkg.com/stream-http/-/stream-http-3.2.0.tgz#1872dfcf24cb15752677e40e5c3f9cc1926028b5"
Expand All @@ -10174,6 +10179,13 @@ stream-http@^3.2.0:
readable-stream "^3.6.0"
xtend "^4.0.2"

stream-json@^1.8.0:
version "1.8.0"
resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.8.0.tgz#53f486b2e3b4496c506131f8d7260ba42def151c"
integrity sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw==
dependencies:
stream-chain "^2.2.5"

stream-shift@^1.0.0:
version "1.0.1"
resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.1.tgz#d7088281559ab2778424279b0877da3c392d5a3d"
Expand Down
185 changes: 153 additions & 32 deletions taxonium_data_handling/importing.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,91 @@
import zlib from "zlib";
import stream from "stream";
import buffer from "buffer";
import { send } from "process";

class ChunkCounterStream extends stream.PassThrough {
constructor(sendStatusMessage, options = {}) {
super(options);
this.sendStatusMessage = sendStatusMessage;
this.chunkCount = 0;
}

_transform(chunk, encoding, callback) {
this.chunkCount++;
if (this.chunkCount % 100 === 0) {
this.sendStatusMessage({
message: `Processed ${this.chunkCount} groups of mutations`,
count: this.chunkCount,
});
}

// Pass the chunk through unchanged
this.push(chunk);
callback();
}

_flush(callback) {
this.sendStatusMessage({
message: `Finished processing. Total chunks: ${this.chunkCount}`,
count: this.chunkCount,
finished: true,
});
callback();
}
}

class StreamSplitter extends stream.Transform {
constructor(headerParser, dataParser, options = {}) {
super(options);
this.headerParser = headerParser;
this.dataParser = dataParser;
this.firstPart = true;
this.buffer = null; // Buffer to hold partial data
}

_transform(chunk, encoding, callback) {
let data = chunk;
let newlineIndex = data.indexOf(10); // ASCII code for '\n'

if (this.firstPart) {
if (newlineIndex !== -1) {
// Found newline, split the data
const headerData = data.slice(0, newlineIndex);
const restData = data.slice(newlineIndex + 1);

// Write header data to headerParser
this.headerParser.write(headerData);
this.headerParser.end();

// Write restData to dataParser
if (restData.length > 0) {
this.dataParser.write(restData);
}

this.firstPart = false;
} else {
// No newline found, store data in buffer
this.headerParser.write(data);
}
} else {
// After header is processed, pass data to dataParser
this.dataParser.write(data);
}

callback();
}

_flush(callback) {
if (this.firstPart && this.buffer) {
// No newline found in the entire stream, treat entire data as header
this.headerParser.write(this.buffer);
this.headerParser.end();
this.firstPart = false;
}
this.dataParser.end();
callback();
}
}

const roundToDp = (number, dp) => {
return Math.round(number * Math.pow(10, dp)) / Math.pow(10, dp);
Expand Down Expand Up @@ -28,8 +113,57 @@ function reduceMaxOrMin(array, accessFunction, maxOrMin) {
}
}

export const setUpStream = (the_stream, data, sendStatusMessage) => {
export const setUpStream = (
the_stream,
data,
sendStatusMessage,
parser,
streamValues
) => {
// Header parser
const headerParser = parser({ jsonStreaming: true });
const headerPipeline = headerParser.pipe(streamValues());
headerPipeline.on("data", (chunk) => {
data.header = chunk.value;
data.nodes = [];
data.node_to_mut = {};
});
headerPipeline.on("error", (err) => {
console.error("Header parser error:", err);
});

// Data parser for the rest of the stream
let lineBuffer = "";
let line_number = 0;
const dataParser = new stream.Writable({
write(chunk, encoding, callback) {
const chunkStr = chunk.toString();
let start = 0;
let end = chunkStr.indexOf("\n");

while (end !== -1) {
lineBuffer += chunkStr.slice(start, end);
processLine(lineBuffer, line_number);
line_number++;
lineBuffer = "";
start = end + 1;
end = chunkStr.indexOf("\n", start);
}

lineBuffer += chunkStr.slice(start);
callback();
},
final(callback) {
if (lineBuffer) {
processLine(lineBuffer, line_number);
}
callback();
},
});

function processLine(line, line_number) {
if (line.trim() === "") return;

if ((line_number % 10000 === 0 && line_number > 0) || line_number == 500) {
console.log(`Processed ${formatNumber(line_number)} lines`);
if (data.header.total_nodes) {
Expand All @@ -45,44 +179,31 @@ export const setUpStream = (the_stream, data, sendStatusMessage) => {
});
}
}
// console.log("LINE",line_number,line);
const decoded = JSON.parse(line);
if (line_number === 0) {
data.header = decoded;
data.nodes = [];
data.node_to_mut = {};
} else {
data.node_to_mut[decoded.node_id] = decoded.mutations; // this is an int to ints map
data.nodes.push(decoded);
}
data.node_to_mut[decoded.node_id] = decoded.mutations;
data.nodes.push(decoded);
}
let cur_line = "";
let line_counter = 0;
the_stream.on("data", function (data) {
cur_line += data.toString();
if (cur_line.includes("\n")) {
const lines = cur_line.split("\n");
cur_line = lines.pop();
lines.forEach((line) => {
processLine(line, line_counter);
line_counter++;
});
}
});

the_stream.on("error", function (err) {
console.log(err);
});

the_stream.on("end", function () {
console.log("end");
const chunkCounterStream = new ChunkCounterStream(sendStatusMessage);
chunkCounterStream.pipe(headerParser);
const splitter = new StreamSplitter(chunkCounterStream, dataParser);

// Pipe the input stream through the splitter
the_stream
.pipe(splitter)
.on("error", (err) => console.error("Splitter error:", err));

// Handle the completion of the dataParser
dataParser.on("finish", () => {
console.log("Finished processing the stream");
});
};

export const processJsonl = async (
jsonl,
sendStatusMessage,
ReadableWebToNodeStream
ReadableWebToNodeStream,
parser,
streamValues
) => {
console.log(
"Worker processJsonl" //, jsonl
Expand All @@ -98,7 +219,7 @@ export const processJsonl = async (
the_stream = new stream.PassThrough();
}
let new_data = {};
setUpStream(the_stream, new_data, sendStatusMessage);
setUpStream(the_stream, new_data, sendStatusMessage, parser, streamValues);

if (status === "loaded") {
const dataAsArrayBuffer = data;
Expand Down

0 comments on commit 88d5a55

Please sign in to comment.