refactor: use query stream for automatic data update script #224

Merged
merged 8 commits on Aug 7, 2024
Changes from 6 commits
143 changes: 97 additions & 46 deletions bin/automated-update.js
@@ -1,14 +1,17 @@
const fs = require('fs')
const prompts = require('prompts')
const childProcess = require('child_process')

const util = require('util')
const {Transform, finished} = require('stream')
const {BigQuery} = require('@google-cloud/bigquery')

const {getEntity} = require('../lib/')

const HA_REQUESTS_TABLE_REGEX = /`httparchive\.requests\.\w+`/g
const HA_LH_TABLE_REGEX = /`httparchive\.lighthouse\.\w+`/g
const LH_3P_TABLE_REGEX = /`lighthouse-infrastructure\.third_party_web\.\w+`/g
const DATE_UNDERSCORE_REGEX = /\d{4}_\d{2}_\d{2}/g
const LH_REGEX = /lighthouse-infrastructure/g

const TABLE_REPLACEMENTS = process.env.USE_SAMPLE_DATA
  ? [
@@ -20,6 +23,7 @@ const TABLE_REPLACEMENTS = process.env.USE_SAMPLE_DATA
      [process.env.OVERRIDE_HA_LH_TABLE, HA_LH_TABLE_REGEX],
      [process.env.OVERRIDE_HA_REQUESTS_TABLE, HA_REQUESTS_TABLE_REGEX],
      [process.env.OVERRIDE_LH_3P_TABLE, LH_3P_TABLE_REGEX],
      [process.env.OVERRIDE_LH_PROJECT, LH_REGEX],
    ].filter(([override]) => override)

function getQueryForTable(filename, dateUnderscore) {
@@ -76,6 +80,40 @@ async function getTargetDatasetDate() {
  return {dateStringUnderscore, dateStringHypens}
}

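// Create a BigQuery query job and return a readable object-mode stream of
// result rows, so rows can be consumed without buffering them all in memory.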
const getQueryResultStream = async query => {
  const [job] = await new BigQuery().createQueryJob({
    query,
    location: 'US',
    useQueryCache: false,
  })
  return job.getQueryResultsStream()
}
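// Promisify stream.finished and resolve once all of the given streams have ended.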
const resolveOnFinished = streams => {
  const toFinishedPromise = util.promisify(finished)
  return Promise.all(streams.map(s => toFinishedPromise(s)))
}
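// Transform stream that stringifies each row. When a rowCounter object
// ({count: 0}) is passed, rows are prefixed so the output forms a JSON array
// ('[\n' before the first row, ',\n' before the rest); an object is used
// rather than a plain number so the caller can read the final row count,
// since a number argument would be copied by value.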
const getJSONStringTransformer = rowCounter => {
  return new Transform({
    objectMode: true,
    transform(row, _, callback) {
      const prefix = rowCounter === undefined ? '' : !rowCounter.count++ ? '[\n' : ',\n'
      callback(null, prefix + JSON.stringify(row))
    },
  })
}
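// Transform stream that maps an observed domain row to its third-party-web
// entity (canonical domain plus category, defaulting to 'unknown').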
const EntityCanonicalDomainTransformer = new Transform({
  objectMode: true,
  transform(row, _, callback) {
    const entity = getEntity(row.domain)
    const thirdPartyWebRow = {
      domain: row.domain,
      canonicalDomain: entity && entity.domains[0],
      category: (entity && entity.categories[0]) || 'unknown',
    }
    callback(null, thirdPartyWebRow)
  },
})

async function main() {
  const {dateStringUnderscore, dateStringHypens} = await getTargetDatasetDate()

@@ -99,44 +137,49 @@ async function main() {
  await withExistenceCheck(observedDomainsFilename, {
    checkExistenceFn: () => fs.existsSync(observedDomainsFilename),
    actionFn: async () => {
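      // Stream query results into the local json file and the BigQuery table
      // at the same time, instead of buffering every row in memory first.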
      console.log(`Start observed domains query`)

      const start = Date.now()

      const resultsStream = await getQueryResultStream(allObservedDomainsQuery)

      // Observed domains json file pipe
      const observedDomainsNbRows = {count: 0}
      const observedDomainsFileWriterStream = fs.createWriteStream(observedDomainsFilename)
      resultsStream
        // stringify observed domain json (with a json array prefix based on row index)
        .pipe(getJSONStringTransformer(observedDomainsNbRows))
        // write to the observed-domains json file
        .pipe(observedDomainsFileWriterStream)

      // Observed domain entity mapping table pipe
      const thirdPartyWebTableWriterStream = new BigQuery()
        .dataset('third_party_web')
        .table(dateStringUnderscore)
        .createWriteStream({
          schema: [
            {name: 'domain', type: 'STRING'},
            {name: 'canonicalDomain', type: 'STRING'},
            {name: 'category', type: 'STRING'},
          ],
        })
      resultsStream
        // map observed domain to entity
        .pipe(EntityCanonicalDomainTransformer)
        // stringify json
        .pipe(getJSONStringTransformer())
        // write to the third_party_web table
        .pipe(thirdPartyWebTableWriterStream)

      // Wait for both streams to finish
      await resolveOnFinished([observedDomainsFileWriterStream, thirdPartyWebTableWriterStream])

      // Close the observed-domains json array in the file
      fs.appendFileSync(observedDomainsFilename, '\n]')

      console.log(
        `Finished query in ${(Date.now() - start) / 1000}s. Wrote ${observedDomainsNbRows.count} rows.`
      )
    },
    deleteFn: async () => {
      const bqClient = new BigQuery()
@@ -153,22 +196,30 @@ async function main() {
  await withExistenceCheck(entityScriptingFilename, {
    checkExistenceFn: () => fs.existsSync(entityScriptingFilename),
    actionFn: async () => {
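      // Stream entity-scripting rows straight into the output json file.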
      console.log(`Start entity scripting query`)

      const start = Date.now()

      const resultsStream = await getQueryResultStream(entityPerPageQuery)

      // Entity scripting json file pipe
      const entityScriptingNbRows = {count: 0}
      const entityScriptingFileWriterStream = fs.createWriteStream(entityScriptingFilename)
      resultsStream
        // stringify entity scripting json (with a json array prefix based on row index)
        .pipe(getJSONStringTransformer(entityScriptingNbRows))
        // write to the entity-scripting json file
        .pipe(entityScriptingFileWriterStream)

      // Wait for the stream to finish
      await resolveOnFinished([entityScriptingFileWriterStream])

      console.log(
        `Finished query in ${(Date.now() - start) / 1000}s. Wrote ${entityScriptingNbRows.count} rows.`
      )

      // Close the entity-scripting json array in the file
      fs.appendFileSync(entityScriptingFilename, ']')
    },
    deleteFn: () => {},
    exitFn: () => {},