From 4b4cb333f0c4736aa0aaa618a3e1d04b57451c73 Mon Sep 17 00:00:00 2001
From: Guillaume NICOLAS
Date: Wed, 31 Jul 2024 17:28:04 +0200
Subject: [PATCH 1/8] feat: add OVERRIDE_LH_PROJECT variable to override observed domains project

---
 bin/automated-update.js | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/bin/automated-update.js b/bin/automated-update.js
index d46cc51..e8982e7 100644
--- a/bin/automated-update.js
+++ b/bin/automated-update.js
@@ -9,6 +9,7 @@ const HA_REQUESTS_TABLE_REGEX = /`httparchive\.requests\.\w+`/g
 const HA_LH_TABLE_REGEX = /`httparchive\.lighthouse\.\w+`/g
 const LH_3P_TABLE_REGEX = /`lighthouse-infrastructure\.third_party_web\.\w+`/g
 const DATE_UNDERSCORE_REGEX = /\d{4}_\d{2}_\d{2}/g
+const LH_REGEX = /lighthouse-infrastructure/g
 
 const TABLE_REPLACEMENTS = process.env.USE_SAMPLE_DATA
   ? [
@@ -20,6 +21,7 @@ const TABLE_REPLACEMENTS = process.env.USE_SAMPLE_DATA
       [process.env.OVERRIDE_HA_LH_TABLE, HA_LH_TABLE_REGEX],
       [process.env.OVERRIDE_HA_REQUESTS_TABLE, HA_REQUESTS_TABLE_REGEX],
       [process.env.OVERRIDE_LH_3P_TABLE, LH_3P_TABLE_REGEX],
+      [process.env.OVERRIDE_LH_PROJECT, LH_REGEX],
     ].filter(([override]) => override)
 
 function getQueryForTable(filename, dateUnderscore) {

From 4c2e9289c611ade9c22bad5b1607defad7789339 Mon Sep 17 00:00:00 2001
From: Guillaume NICOLAS
Date: Wed, 31 Jul 2024 17:30:38 +0200
Subject: [PATCH 2/8] feat: run bigquery in a stream to handle large results

---
 bin/automated-update.js | 125 ++++++++++++++++++++++++----------------
 1 file changed, 76 insertions(+), 49 deletions(-)

diff --git a/bin/automated-update.js b/bin/automated-update.js
index e8982e7..56fc1d1 100644
--- a/bin/automated-update.js
+++ b/bin/automated-update.js
@@ -53,6 +53,23 @@ async function withExistenceCheck(name, {checkExistenceFn, actionFn, deleteFn, e
   await actionFn()
 }
 
+function runQueryStream(query, onData, onEnd) {
+  return new Promise((resolve, reject) => {
+    new BigQuery()
+      .createQueryStream({
+        query,
+        location: 'US',
+      })
+      .on('data', onData)
+      .on('end', onEnd)
+      .on('finish', resolve)
+      .on('error', (...params) => {
+        console.error('error in query stream', ...params)
+        reject(...params)
+      })
+  })
+}
+
 async function getTargetDatasetDate() {
   const msInDay = 24 * 60 * 60 * 1000
   const daysIntoCurrentMonth = new Date().getDate()
@@ -101,44 +118,48 @@ async function main() {
   await withExistenceCheck(observedDomainsFilename, {
     checkExistenceFn: () => fs.existsSync(observedDomainsFilename),
     actionFn: async () => {
-      const bqClient = new BigQuery()
+      console.log(`Start observed domains query`)
 
-      const queryOptions = {
-        query: allObservedDomainsQuery,
+      const fileWriterStream = fs.createWriteStream(observedDomainsFilename)
+      const tableWriterStream = new BigQuery()
+        .dataset('third_party_web')
+        .table(dateStringUnderscore)
+      const start = Date.now()
+      const nbRows = 0
+      const schema = {
+        schema: [
+          {name: 'domain', type: 'STRING'},
+          {name: 'canonicalDomain', type: 'STRING'},
+          {name: 'category', type: 'STRING'},
+        ],
         location: 'US',
       }
-      const [job] = await bqClient.createQueryJob(queryOptions)
-      console.log(`Job ${job.id} started.`)
-
-      // Wait for the query to finish
-      const [rows] = await job.getQueryResults()
-
-      console.log('Wrote', rows.length, 'rows to', observedDomainsFilename)
-      fs.writeFileSync(observedDomainsFilename, JSON.stringify(rows))
-
-      const rowsForNewTable = rows.map(row => {
-        const entity = getEntity(row.domain)
-
-        return {
-          domain: row.domain,
-          canonicalDomain: entity && entity.domains[0],
-          category: (entity && entity.categories[0]) || 'unknown',
+      await runQueryStream(
+        allObservedDomainsQuery,
+        row => {
+          const prefix = !nbRows++ ? '[' : ','
+          fileWriterStream.write(prefix + JSON.stringify(row))
+          const entity = getEntity(row.domain)
+          tableWriterStream.insert(
+            {
+              domain: row.domain,
+              canonicalDomain: entity && entity.domains[0],
+              category: (entity && entity.categories[0]) || 'unknown',
+            },
+            schema
+          )
+        },
+        () => {
+          fileWriterStream.write(']')
         }
-      })
-
-      const schema = [
-        {name: 'domain', type: 'STRING'},
-        {name: 'canonicalDomain', type: 'STRING'},
-        {name: 'category', type: 'STRING'},
-      ]
-
-      console.log('Creating', dateStringUnderscore, 'table. This may take a while...')
-      await bqClient
-        .dataset('third_party_web')
-        .table(dateStringUnderscore)
-        .insert(rowsForNewTable, {schema, location: 'US'})
-      console.log('Inserted', rowsForNewTable.length, 'rows')
+      )
+        .then(() => {
+          console.log(`Finish query in ${(Date.now() - start) / 1000}s. Wrote ${nbRows} rows.`)
+        })
+        .catch(() => {
+          process.exit(1)
+        })
     },
     deleteFn: async () => {
       const bqClient = new BigQuery()
@@ -155,22 +176,28 @@ async function main() {
   await withExistenceCheck(entityScriptingFilename, {
     checkExistenceFn: () => fs.existsSync(entityScriptingFilename),
     actionFn: async () => {
-      const bqClient = new BigQuery()
-
-      const queryOptions = {
-        query: entityPerPageQuery,
-        location: 'US',
-      }
-
-      console.log('Querying execution per entity...')
-      const [job] = await bqClient.createQueryJob(queryOptions)
-      console.log(`Job ${job.id} started.`)
-
-      // Wait for the query to finish
-      const [rows] = await job.getQueryResults()
-
-      console.log('Wrote', rows.length, 'rows to', entityScriptingFilename)
-      fs.writeFileSync(entityScriptingFilename, JSON.stringify(rows, null, 2))
+      console.log(`Start entity scripting query`)
+
+      const fileWriterStream = fs.createWriteStream(entityScriptingFilename)
+      const start = Date.now()
+      const nbRows = 0
+
+      await runQueryStream(
+        entityPerPageQuery,
+        row => {
+          const prefix = !nbRows++ ? '[' : ','
+          fileWriterStream.write(prefix + JSON.stringify(row))
+        },
+        () => {
+          fileWriterStream.write(']')
+        }
+      )
+        .then(() => {
+          console.log(`Finish query in ${(Date.now() - start) / 1000}s. Wrote ${nbRows} rows.`)
+        })
+        .catch(() => {
+          process.exit(1)
+        })
     },
     deleteFn: () => {},
     exitFn: () => {},

From 1e7efca66252d52def7b23c196d4113b93de8b19 Mon Sep 17 00:00:00 2001
From: Guillaume NICOLAS
Date: Thu, 1 Aug 2024 16:07:20 +0200
Subject: [PATCH 3/8] fix: can't increment a constant

---
 bin/automated-update.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bin/automated-update.js b/bin/automated-update.js
index 56fc1d1..f9bb99f 100644
--- a/bin/automated-update.js
+++ b/bin/automated-update.js
@@ -125,7 +125,7 @@ async function main() {
         .dataset('third_party_web')
         .table(dateStringUnderscore)
       const start = Date.now()
-      const nbRows = 0
+      let nbRows = 0
       const schema = {
         schema: [
           {name: 'domain', type: 'STRING'},

From 8cd138328bf0410e89faa51d62056e46db0dbcfd Mon Sep 17 00:00:00 2001
From: Guillaume NICOLAS
Date: Fri, 2 Aug 2024 10:45:20 +0200
Subject: [PATCH 4/8] feat: use pipes to handle streams

---
 bin/automated-update.js | 163 ++++++++++++++++++++++------------------
 1 file changed, 89 insertions(+), 74 deletions(-)

diff --git a/bin/automated-update.js b/bin/automated-update.js
index f9bb99f..b7b2931 100644
--- a/bin/automated-update.js
+++ b/bin/automated-update.js
@@ -1,10 +1,12 @@
 const fs = require('fs')
 const prompts = require('prompts')
 const childProcess = require('child_process')
-const {getEntity} = require('../lib/')
-
+const util = require('util')
+const {Transform, finished} = require('stream')
 const {BigQuery} = require('@google-cloud/bigquery')
 
+const {getEntity} = require('../lib/')
+
 const HA_REQUESTS_TABLE_REGEX = /`httparchive\.requests\.\w+`/g
 const HA_LH_TABLE_REGEX = /`httparchive\.lighthouse\.\w+`/g
 const LH_3P_TABLE_REGEX = /`lighthouse-infrastructure\.third_party_web\.\w+`/g
@@ -53,23 +55,6 @@ async function withExistenceCheck(name, {checkExistenceFn, actionFn, deleteFn, e
   await actionFn()
 }
 
-function runQueryStream(query, onData, onEnd) {
-  return new Promise((resolve, reject) => {
-    new BigQuery()
-      .createQueryStream({
-        query,
-        location: 'US',
-      })
-      .on('data', onData)
-      .on('end', onEnd)
-      .on('finish', resolve)
-      .on('error', (...params) => {
-        console.error('error in query stream', ...params)
-        reject(...params)
-      })
-  })
-}
-
 async function getTargetDatasetDate() {
   const msInDay = 24 * 60 * 60 * 1000
   const daysIntoCurrentMonth = new Date().getDate()
@@ -95,6 +80,39 @@ async function getTargetDatasetDate() {
   return {dateStringUnderscore, dateStringHypens}
 }
 
+const getQueryResultStream = async query => {
+  const [job] = await new BigQuery().createQueryJob({
+    query,
+    location: 'US',
+  })
+  return job.getQueryResultsStream()
+}
+const resolveOnFinished = streams => {
+  const toFinishedPromise = util.promisify(finished)
+  return Promise.all(streams.map(s => toFinishedPromise(s)))
+}
+const getJSONStringTransformer = rowCounter => {
+  return new Transform({
+    objectMode: true,
+    transform(row, _, callback) {
+      const prefix = rowCounter === undefined ? '' : !rowCounter++ ? '[\n' : ',\n'
+      callback(null, prefix + JSON.stringify(row))
+    },
+  })
+}
+const EntityCanonicalDomainTransformer = new Transform({
+  objectMode: true,
+  transform(row, _, callback) {
+    const entity = getEntity(row.domain)
+    const thirdPartyWebRow = {
+      domain: row.domain,
+      canonicalDomain: entity && entity.domains[0],
+      category: (entity && entity.categories[0]) || 'unknown',
+    }
+    callback(null, thirdPartyWebRow)
+  },
+})
+
 async function main() {
   const {dateStringUnderscore, dateStringHypens} = await getTargetDatasetDate()
@@ -120,46 +138,41 @@ async function main() {
     actionFn: async () => {
       console.log(`Start observed domains query`)
 
-      const fileWriterStream = fs.createWriteStream(observedDomainsFilename)
-      const tableWriterStream = new BigQuery()
+      const start = Date.now()
+
+      const resultsStream = await getQueryResultStream(allObservedDomainsQuery)
+
+      // Observed domain json file pipe
+      let observedDomainsNbRows = 0
+      const observedDomainsFileWriterStream = fs.createWriteStream(observedDomainsFilename)
+      resultsStream
+        // stringify observed domain json (with json array prefix based on row index)
+        .pipe(getJSONStringTransformer(observedDomainsNbRows))
+        // write to observed-domains json file
+        .pipe(observedDomainsFileWriterStream)
+
+      // Observed domain entity mapping table pipe
+      const thirdPartyWebTableWriterStream = new BigQuery()
         .dataset('third_party_web')
         .table(dateStringUnderscore)
-      const start = Date.now()
-      let nbRows = 0
-      const schema = {
-        schema: [
-          {name: 'domain', type: 'STRING'},
-          {name: 'canonicalDomain', type: 'STRING'},
-          {name: 'category', type: 'STRING'},
-        ],
-        location: 'US',
-      }
-
-      await runQueryStream(
-        allObservedDomainsQuery,
-        row => {
-          const prefix = !nbRows++ ? '[' : ','
-          fileWriterStream.write(prefix + JSON.stringify(row))
-          const entity = getEntity(row.domain)
-          tableWriterStream.insert(
-            {
-              domain: row.domain,
-              canonicalDomain: entity && entity.domains[0],
-              category: (entity && entity.categories[0]) || 'unknown',
-            },
-            schema
-          )
-        },
-        () => {
-          fileWriterStream.write(']')
-        }
+        .createWriteStream()
+      resultsStream
+        // map observed domain to entity
+        .pipe(EntityCanonicalDomainTransformer)
+        // stringify json
+        .pipe(getJSONStringTransformer())
+        // write to thrid_party_web table
+        .pipe(thirdPartyWebTableWriterStream)
+
+      // Wait both streams to finish
+      await resolveOnFinished([observedDomainsFileWriterStream, thirdPartyWebTableWriterStream])
+
+      // Close observed domains json array in file
+      fs.appendFileSync(observedDomainsFilename, '\n]')
+
+      console.log(
+        `Finish query in ${(Date.now() - start) / 1000}s. Wrote ${observedDomainsNbRows} rows.`
       )
-        .then(() => {
-          console.log(`Finish query in ${(Date.now() - start) / 1000}s. Wrote ${nbRows} rows.`)
-        })
-        .catch(() => {
-          process.exit(1)
-        })
     },
     deleteFn: async () => {
       const bqClient = new BigQuery()
@@ -178,26 +191,28 @@ async function main() {
     actionFn: async () => {
       console.log(`Start entity scripting query`)
 
-      const fileWriterStream = fs.createWriteStream(entityScriptingFilename)
       const start = Date.now()
-      const nbRows = 0
-
-      await runQueryStream(
-        entityPerPageQuery,
-        row => {
-          const prefix = !nbRows++ ? '[' : ','
-          fileWriterStream.write(prefix + JSON.stringify(row))
-        },
-        () => {
-          fileWriterStream.write(']')
-        }
+
+      const resultsStream = await getQueryResultStream(entityPerPageQuery)
+
+      // Entity scripting json file pipe
+      let entityScriptingNbRows = 0
+      const entityScriptingFileWriterStream = fs.createWriteStream(entityScriptingFilename)
+      resultsStream
+        // stringify entity scripting json (with json array prefix based on row index)
+        .pipe(getJSONStringTransformer(entityScriptingNbRows))
+        // write to entity-scripting json file
+        .pipe(entityScriptingFileWriterStream)
+
+      // Wait stream to finish
+      await resolveOnFinished([entityScriptingFileWriterStream])
+
+      console.log(
+        `Finish query in ${(Date.now() - start) / 1000}s. Wrote ${entityScriptingNbRows} rows.`
       )
-        .then(() => {
-          console.log(`Finish query in ${(Date.now() - start) / 1000}s. Wrote ${nbRows} rows.`)
-        })
-        .catch(() => {
-          process.exit(1)
-        })
+
+      // Close observed domains json array in file
+      fs.appendFileSync(entityScriptingFilename, ']')
     },
     deleteFn: () => {},
     exitFn: () => {},

From 3d75fc9a8321e205171bd31c45d78caf52905fc7 Mon Sep 17 00:00:00 2001
From: Guillaume NICOLAS
Date: Fri, 2 Aug 2024 10:49:44 +0200
Subject: [PATCH 5/8] fix: add table schema

---
 bin/automated-update.js | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/bin/automated-update.js b/bin/automated-update.js
index b7b2931..2c2d8b3 100644
--- a/bin/automated-update.js
+++ b/bin/automated-update.js
@@ -155,7 +155,13 @@ async function main() {
       const thirdPartyWebTableWriterStream = new BigQuery()
         .dataset('third_party_web')
         .table(dateStringUnderscore)
-        .createWriteStream()
+        .createWriteStream({
+          schema: [
+            {name: 'domain', type: 'STRING'},
+            {name: 'canonicalDomain', type: 'STRING'},
+            {name: 'category', type: 'STRING'},
+          ],
+        })
       resultsStream
         // map observed domain to entity
         .pipe(EntityCanonicalDomainTransformer)

From 64d02be8b4108aee62d0f011b86eee09f15bd023 Mon Sep 17 00:00:00 2001
From: Guillaume NICOLAS
Date: Tue, 6 Aug 2024 11:56:20 +0200
Subject: [PATCH 6/8] fix: remove cache for queries

---
 bin/automated-update.js | 1 +
 1 file changed, 1 insertion(+)

diff --git a/bin/automated-update.js b/bin/automated-update.js
index 2c2d8b3..c502343 100644
--- a/bin/automated-update.js
+++ b/bin/automated-update.js
@@ -84,6 +84,7 @@ const getQueryResultStream = async query => {
   const [job] = await new BigQuery().createQueryJob({
     query,
     location: 'US',
+    useQueryCache: false,
   })
   return job.getQueryResultsStream()
 }

From dcc90e0cb4a5ff5d5fd6dd35c54d204a73c0dd67 Mon Sep 17 00:00:00 2001
From: Guillaume NICOLAS
Date: Wed, 7 Aug 2024 16:26:16 +0200
Subject: [PATCH 7/8] refactor: rename project regex

Co-authored-by: Patrick Hulce
---
 bin/automated-update.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bin/automated-update.js b/bin/automated-update.js
index c502343..f8b5787 100644
--- a/bin/automated-update.js
+++ b/bin/automated-update.js
@@ -11,7 +11,7 @@ const HA_REQUESTS_TABLE_REGEX = /`httparchive\.requests\.\w+`/g
 const HA_LH_TABLE_REGEX = /`httparchive\.lighthouse\.\w+`/g
 const LH_3P_TABLE_REGEX = /`lighthouse-infrastructure\.third_party_web\.\w+`/g
 const DATE_UNDERSCORE_REGEX = /\d{4}_\d{2}_\d{2}/g
-const LH_REGEX = /lighthouse-infrastructure/g
+const LH_PROJECT_REGEX = /lighthouse-infrastructure/g
 
 const TABLE_REPLACEMENTS = process.env.USE_SAMPLE_DATA
   ? [

From e8912d0e0112b1eebd21fa8d9a9c27d51d9d7a31 Mon Sep 17 00:00:00 2001
From: Patrick Hulce
Date: Wed, 7 Aug 2024 09:37:55 -0500
Subject: [PATCH 8/8] Update bin/automated-update.js

---
 bin/automated-update.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bin/automated-update.js b/bin/automated-update.js
index f8b5787..6177fdb 100644
--- a/bin/automated-update.js
+++ b/bin/automated-update.js
@@ -23,7 +23,7 @@ const TABLE_REPLACEMENTS = process.env.USE_SAMPLE_DATA
       [process.env.OVERRIDE_HA_LH_TABLE, HA_LH_TABLE_REGEX],
       [process.env.OVERRIDE_HA_REQUESTS_TABLE, HA_REQUESTS_TABLE_REGEX],
       [process.env.OVERRIDE_LH_3P_TABLE, LH_3P_TABLE_REGEX],
-      [process.env.OVERRIDE_LH_PROJECT, LH_REGEX],
+      [process.env.OVERRIDE_LH_PROJECT, LH_PROJECT_REGEX],
     ].filter(([override]) => override)
 
 function getQueryForTable(filename, dateUnderscore) {
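Note on the approach (not part of the patches above): the series replaces the buffered `job.getQueryResults()` call with a streaming pipeline — `createQueryJob()` plus `getQueryResultsStream()` piped through a `Transform` into a file or table write stream — so result sets too large to hold in memory can still be written out. Below is a minimal, self-contained sketch of that pattern. It is illustrative only: the query string and output path are placeholders, it writes newline-delimited JSON rather than the JSON array the script produces, and it assumes Node 16+ (for `stream/promises`) and the same `@google-cloud/bigquery` client the script already uses.

// Illustrative sketch of the streaming pattern adopted in patches 2-6; not part of the PR.
const fs = require('fs')
const {Transform} = require('stream')
const {pipeline} = require('stream/promises')
const {BigQuery} = require('@google-cloud/bigquery')

// Stream query results straight to disk as newline-delimited JSON,
// without buffering the full result set in memory.
async function exportQueryToNdjson(query, outputPath) {
  // Same job options the script uses: US location, query cache disabled (patch 6).
  const [job] = await new BigQuery().createQueryJob({query, location: 'US', useQueryCache: false})

  // Object-mode transform: one result row in, one JSON line out.
  const toJsonLine = new Transform({
    objectMode: true,
    transform(row, _, callback) {
      callback(null, JSON.stringify(row) + '\n')
    },
  })

  // pipeline() wires the streams together and rejects on the first error.
  await pipeline(job.getQueryResultsStream(), toJsonLine, fs.createWriteStream(outputPath))
}

// Hypothetical usage with placeholder values:
// exportQueryToNdjson('SELECT domain FROM `some_project.some_dataset.some_table`', 'rows.ndjson')
//   .catch(err => { console.error(err); process.exit(1) })

Waiting on `stream.finished` (the patches' `resolveOnFinished`) or using `pipeline` is what makes it safe to append the closing `]` only after every row has been flushed to disk, which is why patch 4 awaits the writer streams before calling `fs.appendFileSync`.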