diff --git a/bin/automated-update.js b/bin/automated-update.js
index d46cc51..6177fdb 100644
--- a/bin/automated-update.js
+++ b/bin/automated-update.js
@@ -1,14 +1,17 @@
 const fs = require('fs')
 const prompts = require('prompts')
 const childProcess = require('child_process')
-const {getEntity} = require('../lib/')
-
+const util = require('util')
+const {Transform, finished} = require('stream')
 const {BigQuery} = require('@google-cloud/bigquery')
+const {getEntity} = require('../lib/')
+
 const HA_REQUESTS_TABLE_REGEX = /`httparchive\.requests\.\w+`/g
 const HA_LH_TABLE_REGEX = /`httparchive\.lighthouse\.\w+`/g
 const LH_3P_TABLE_REGEX = /`lighthouse-infrastructure\.third_party_web\.\w+`/g
 const DATE_UNDERSCORE_REGEX = /\d{4}_\d{2}_\d{2}/g
+const LH_PROJECT_REGEX = /lighthouse-infrastructure/g
 
 const TABLE_REPLACEMENTS = process.env.USE_SAMPLE_DATA
   ? [
@@ -20,6 +23,7 @@ const TABLE_REPLACEMENTS = process.env.USE_SAMPLE_DATA
       [process.env.OVERRIDE_HA_LH_TABLE, HA_LH_TABLE_REGEX],
       [process.env.OVERRIDE_HA_REQUESTS_TABLE, HA_REQUESTS_TABLE_REGEX],
       [process.env.OVERRIDE_LH_3P_TABLE, LH_3P_TABLE_REGEX],
+      [process.env.OVERRIDE_LH_PROJECT, LH_PROJECT_REGEX],
     ].filter(([override]) => override)
 
 function getQueryForTable(filename, dateUnderscore) {
@@ -76,6 +80,40 @@ async function getTargetDatasetDate() {
   return {dateStringUnderscore, dateStringHypens}
 }
 
+const getQueryResultStream = async query => {
+  const [job] = await new BigQuery().createQueryJob({
+    query,
+    location: 'US',
+    useQueryCache: false,
+  })
+  return job.getQueryResultsStream()
+}
+const resolveOnFinished = streams => {
+  const toFinishedPromise = util.promisify(finished)
+  return Promise.all(streams.map(s => toFinishedPromise(s)))
+}
+const getJSONStringTransformer = rowCounter => {
+  return new Transform({
+    objectMode: true,
+    transform(row, _, callback) {
+      const prefix = rowCounter === undefined ? '' : !rowCounter.count++ ? '[\n' : ',\n'
+      callback(null, prefix + JSON.stringify(row))
+    },
+  })
+}
+const EntityCanonicalDomainTransformer = new Transform({
+  objectMode: true,
+  transform(row, _, callback) {
+    const entity = getEntity(row.domain)
+    const thirdPartyWebRow = {
+      domain: row.domain,
+      canonicalDomain: entity && entity.domains[0],
+      category: (entity && entity.categories[0]) || 'unknown',
+    }
+    callback(null, thirdPartyWebRow)
+  },
+})
+
 async function main() {
   const {dateStringUnderscore, dateStringHypens} = await getTargetDatasetDate()
 
@@ -99,44 +137,49 @@ async function main() {
   await withExistenceCheck(observedDomainsFilename, {
     checkExistenceFn: () => fs.existsSync(observedDomainsFilename),
     actionFn: async () => {
-      const bqClient = new BigQuery()
-
-      const queryOptions = {
-        query: allObservedDomainsQuery,
-        location: 'US',
-      }
-
-      const [job] = await bqClient.createQueryJob(queryOptions)
-      console.log(`Job ${job.id} started.`)
-
-      // Wait for the query to finish
-      const [rows] = await job.getQueryResults()
+      console.log(`Starting observed domains query`)
 
-      console.log('Wrote', rows.length, 'rows to', observedDomainsFilename)
-      fs.writeFileSync(observedDomainsFilename, JSON.stringify(rows))
+      const start = Date.now()
 
-      const rowsForNewTable = rows.map(row => {
-        const entity = getEntity(row.domain)
+      const resultsStream = await getQueryResultStream(allObservedDomainsQuery)
 
-        return {
-          domain: row.domain,
-          canonicalDomain: entity && entity.domains[0],
-          category: (entity && entity.categories[0]) || 'unknown',
-        }
-      })
+      // Observed domains JSON file pipe
+      const observedDomainsRowCounter = {count: 0}
+      const observedDomainsFileWriterStream = fs.createWriteStream(observedDomainsFilename)
+      resultsStream
+        // stringify observed domain rows as JSON (array prefix based on row index)
+        .pipe(getJSONStringTransformer(observedDomainsRowCounter))
+        // write to the observed-domains JSON file
+        .pipe(observedDomainsFileWriterStream)
 
-      const schema = [
-        {name: 'domain', type: 'STRING'},
-        {name: 'canonicalDomain', type: 'STRING'},
-        {name: 'category', type: 'STRING'},
-      ]
-
-      console.log('Creating', dateStringUnderscore, 'table. This may take a while...')
-      await bqClient
+      // Observed domain entity mapping table pipe
+      const thirdPartyWebTableWriterStream = new BigQuery()
         .dataset('third_party_web')
         .table(dateStringUnderscore)
-        .insert(rowsForNewTable, {schema, location: 'US'})
-      console.log('Inserted', rowsForNewTable.length, 'rows')
+        .createWriteStream({
+          schema: [
+            {name: 'domain', type: 'STRING'},
+            {name: 'canonicalDomain', type: 'STRING'},
+            {name: 'category', type: 'STRING'},
+          ],
+        })
+      resultsStream
+        // map each observed domain to its entity
+        .pipe(EntityCanonicalDomainTransformer)
+        // stringify JSON
+        .pipe(getJSONStringTransformer())
+        // write to the third_party_web table
+        .pipe(thirdPartyWebTableWriterStream)
+
+      // Wait for both streams to finish
+      await resolveOnFinished([observedDomainsFileWriterStream, thirdPartyWebTableWriterStream])
+
+      // Close the observed domains JSON array in the file
+      fs.appendFileSync(observedDomainsFilename, '\n]')
+
+      console.log(
+        `Finished query in ${(Date.now() - start) / 1000}s. Wrote ${observedDomainsRowCounter.count} rows.`
+      )
     },
     deleteFn: async () => {
      const bqClient = new BigQuery()
@@ -153,22 +196,30 @@ async function main() {
   await withExistenceCheck(entityScriptingFilename, {
     checkExistenceFn: () => fs.existsSync(entityScriptingFilename),
     actionFn: async () => {
-      const bqClient = new BigQuery()
+      console.log(`Starting entity scripting query`)
+
+      const start = Date.now()
+
+      const resultsStream = await getQueryResultStream(entityPerPageQuery)
 
-      const queryOptions = {
-        query: entityPerPageQuery,
-        location: 'US',
-      }
+      // Entity scripting JSON file pipe
+      const entityScriptingRowCounter = {count: 0}
+      const entityScriptingFileWriterStream = fs.createWriteStream(entityScriptingFilename)
+      resultsStream
+        // stringify entity scripting rows as JSON (array prefix based on row index)
+        .pipe(getJSONStringTransformer(entityScriptingRowCounter))
+        // write to the entity-scripting JSON file
+        .pipe(entityScriptingFileWriterStream)
 
-      console.log('Querying execution per entity...')
-      const [job] = await bqClient.createQueryJob(queryOptions)
-      console.log(`Job ${job.id} started.`)
+      // Wait for the stream to finish
+      await resolveOnFinished([entityScriptingFileWriterStream])
 
-      // Wait for the query to finish
-      const [rows] = await job.getQueryResults()
+      console.log(
+        `Finished query in ${(Date.now() - start) / 1000}s. Wrote ${entityScriptingRowCounter.count} rows.`
+      )
 
-      console.log('Wrote', rows.length, 'rows to', entityScriptingFilename)
-      fs.writeFileSync(entityScriptingFilename, JSON.stringify(rows, null, 2))
+      // Close the entity scripting JSON array in the file
+      fs.appendFileSync(entityScriptingFilename, ']')
     },
     deleteFn: () => {},
     exitFn: () => {},
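
Note on the row counters: getJSONStringTransformer receives a shared {count: 0} object rather than a bare number because JavaScript passes numbers by value; incrementing a numeric parameter inside the Transform would leave the caller's variable at 0 by the time the final row count is logged.

The sketch below is a minimal, self-contained illustration of the fan-out pattern the diff introduces: one object-mode readable piped into two destinations, with util.promisify(stream.finished) used to wait for both before closing the JSON array. Readable.from stands in for BigQuery's job.getQueryResultsStream(), and the plain Writable and the observed-domains.sample.json filename are hypothetical stand-ins for the table write stream and the real output file.

const fs = require('fs')
const util = require('util')
const {Readable, Writable, Transform, finished} = require('stream')

async function demo() {
  // Stand-in for job.getQueryResultsStream(): an object-mode stream of rows.
  const results = Readable.from([{domain: 'a.example'}, {domain: 'b.example'}])

  // Destination 1: a JSON array file, using the same prefix trick as
  // getJSONStringTransformer (first row opens the array, later rows add commas).
  const rowCounter = {count: 0}
  const file = fs.createWriteStream('observed-domains.sample.json')
  results
    .pipe(
      new Transform({
        objectMode: true,
        transform(row, _, callback) {
          const prefix = !rowCounter.count++ ? '[\n' : ',\n'
          callback(null, prefix + JSON.stringify(row))
        },
      })
    )
    .pipe(file)

  // Destination 2: any writable; stands in for the BigQuery table write stream.
  const table = new Writable({
    objectMode: true,
    write(row, _, callback) {
      console.log('would insert', row.domain)
      callback()
    },
  })
  results.pipe(table)

  // pipe() fans out to every attached destination, but each one finishes on
  // its own, so wait on all of them before appending the closing bracket.
  const toFinished = util.promisify(finished)
  await Promise.all([toFinished(file), toFinished(table)])
  fs.appendFileSync('observed-domains.sample.json', '\n]')
  console.log(`wrote ${rowCounter.count} rows`)
}

demo().catch(console.error)

Backpressure still applies in the fan-out: the readable pauses whenever the slowest destination's buffer is full, which is what lets the script stream the full query result to disk and to BigQuery without buffering every row in memory, the point of replacing getQueryResults() with getQueryResultsStream().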