Skip to content

Commit

Permalink
feat: use papaparse as our parser (#762)
Browse files Browse the repository at this point in the history
  • Loading branch information
asalem1 authored Jun 13, 2022
1 parent 1d9dacb commit 3f1b857
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 119 deletions.
2 changes: 1 addition & 1 deletion giraffe/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@influxdata/giraffe",
"version": "2.27.1",
"version": "2.28.0",
"main": "dist/index.js",
"module": "dist/index.js",
"license": "MIT",
Expand Down
32 changes: 30 additions & 2 deletions giraffe/src/utils/fromFlux.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,36 @@ describe('fastFromFlux', () => {
const fFlux = fastFromFlux(resp)
expect(fFlux).toEqual(expected)
})
it('parses query with newlines and hashtags', () => {
const CSV = `#group,false,false,true,false
#datatype,string,long,long,string
#default,_result,,,
,result,table,_time_reverse,_value
,,0,-1652887800000000000,"Row 1
#newline #somehashTags https://a_link.com/giraffe"
,,0,-1652887811000000000,Row 2
,,0,-1652888700000000000,"Row 3: 👇👇👇 // emojis
,Mew line!"
,,0,-1652889550000000000,"Row 4:
New line 1!
New line 2!
multiple new lines!`
const {table} = fastFromFlux(CSV)

const expectedColumns = [
`Row 1
#newline #somehashTags https://a_link.com/giraffe`,
'Row 2',
`Row 3: 👇👇👇 // emojis
,Mew line!`,
`Row 4:
New line 1!
New line 2!
multiple new lines!`,
]

expect(table.getColumn('_value')).toStrictEqual(expectedColumns)
})

it('can parse a Flux CSV with mismatched schemas', () => {
const CSV = `#group,false,false,true,true,false,true,true,true,true,true
Expand Down Expand Up @@ -1081,8 +1111,6 @@ describe('fastFromFlux', () => {
expect(() => {
fastFromFlux(CSV)
}).not.toThrow()
const actual = fastFromFlux(CSV)
expect(actual.error).toBeTruthy()
})

it('uses the default annotation to fill in empty values', () => {
Expand Down
231 changes: 116 additions & 115 deletions giraffe/src/utils/fromFlux.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {csvParse, csvParseRows} from 'd3-dsv'
import Papa from 'papaparse'
import {Table, ColumnType, FluxDataType} from '../types'
import {assert} from './assert'
import {newTable} from './newTable'
import {RESULT} from '../constants/columnKeys'
import Papa from 'papaparse'
import {escapeCSVFieldWithSpecialCharacters} from './escapeCSVFieldWithSpecialCharacters'
export interface FromFluxResult {
error?: Error
Expand Down Expand Up @@ -114,7 +114,7 @@ export const fromFlux = (fluxCSV: string): FromFluxResult => {
.substring(currentIndex, fluxCSV.length)
.search(/\n\s*\n#(?=datatype|group|default)/)
if (nextIndex === -1) {
chunks.push([prevIndex, fluxCSV.length])
chunks.push([prevIndex, fluxCSV.length - 1])
currentIndex = -1
break
} else {
Expand All @@ -133,12 +133,19 @@ export const fromFlux = (fluxCSV: string): FromFluxResult => {
let chunk = ''

for (const [start, end] of chunks) {
chunk = fluxCSV.substring(start, end)
/**
* substring doesn't include the index for the end. For example:
*
* 'hello'.substring(0, 1) // 'h'
*
* Given the fact that we want to include the last character of the chunk
* we want to add + 1 to the substring ending
*/
chunk = fluxCSV.substring(start, end + 1)
const parsedChunkData = Papa.parse(chunk).data
const splittedChunk: string[] = parsedChunkData.map(line =>
line.map(escapeCSVFieldWithSpecialCharacters).join(',')
)

const tableTexts = []
const annotationTexts = []

Expand Down Expand Up @@ -269,10 +276,7 @@ export const fromFlux = (fluxCSV: string): FromFluxResult => {

export const fastFromFlux = (fluxCSV: string): FromFluxResult => {
const columns: Columns = {}
const fluxGroupKeyUnion = new Set<string>()
const resultColumnNames = new Set<string>()
let tableLength = 0

try {
/*
A Flux CSV response can contain multiple CSV files each joined by a newline.
Expand Down Expand Up @@ -309,7 +313,7 @@ export const fastFromFlux = (fluxCSV: string): FromFluxResult => {
.substring(curr, fluxCSV.length)
.search(/\n\s*\n#/)
if (nextIndex === -1) {
chunks.push([oldVal, fluxCSV.length])
chunks.push([oldVal, fluxCSV.length - 1])
curr = -1
break
} else {
Expand All @@ -319,120 +323,117 @@ export const fastFromFlux = (fluxCSV: string): FromFluxResult => {
}

// declaring all nested variables here to reduce memory drain
let tableText = ''
let tableData: any = []
let annotationText = ''
let columnType: any = ''
let columnKey = ''
let columnDefault: any = ''

const fluxGroupKeyUnion = new Set<string>()
const resultColumnNames = new Set<string>()
let _end
for (const [start, end] of chunks) {
const splittedChunk = fluxCSV.substring(start, end).split('\n')

const tableTexts = []
const annotationTexts = []

splittedChunk.forEach(line => {
if (line.startsWith('#')) {
annotationTexts.push(line)
} else {
tableTexts.push(line)
}
})

tableText = tableTexts.join('\n').trim()

assert(
!!tableText,
'could not find annotation lines in Flux response; are `annotations` enabled in the Flux query `dialect` option?'
)

// TODO(ariel): csvParse is a slow operation
tableData = csvParse(tableText)

annotationText = annotationTexts.join('\n').trim()

assert(
!!annotationText,
'could not find annotation lines in Flux response; are `annotations` enabled in the Flux query `dialect` option?'
)
const annotationData = parseAnnotations(annotationText, tableData.columns)

for (const columnName of tableData.columns.slice(1)) {
columnType =
TO_COLUMN_TYPE[annotationData.datatypeByColumnName[columnName]]

assert(
!!columnType,
`encountered unknown Flux column type ${annotationData.datatypeByColumnName[columnName]}`
)

columnKey = `${columnName} (${columnType})`

if (!columns[columnKey]) {
columns[columnKey] = {
name: columnName,
type: columnType,
fluxDataType: annotationData.datatypeByColumnName[columnName],
data: [],
} as Column
}

columnDefault = annotationData.defaultByColumnName[columnName]

for (let i = 0; i < tableData.length; i++) {
if (columnName === RESULT) {
if (columnDefault.length) {
resultColumnNames.add(columnDefault)
} else if (tableData[i][columnName].length) {
resultColumnNames.add(tableData[i][columnName])
}
}
const value = tableData[i][columnName] || columnDefault
let result = null

if (value === undefined) {
result = undefined
} else if (value === 'null') {
result = null
} else if (value === 'NaN') {
result = NaN
} else if (columnType === 'boolean' && value === 'true') {
result = true
} else if (columnType === 'boolean' && value === 'false') {
result = false
} else if (columnType === 'string') {
result = value
} else if (columnType === 'time') {
if (/\s/.test(value)) {
result = Date.parse(value.trim())
} else {
result = Date.parse(value)
}
} else if (columnType === 'number') {
if (value === '') {
result = null
} else {
const parsedValue = Number(value)
result = parsedValue === parsedValue ? parsedValue : value
}
_end = end
let annotationMode = true

const parsed = {
group: [],
datatype: [],
default: [],
header: [],
columnKey: [],
}
// we want to move the pointer to the first non-whitespace character at the end of the chunk
while (/\s/.test(fluxCSV[_end]) && _end > start) {
_end--
}
/**
* substring doesn't include the index for the end. For example:
*
* 'hello'.substring(0, 1) // 'h'
*
* Given the fact that we want to include the last character of the chunk
* we want to add + 1 to the substring ending
*/
Papa.parse(fluxCSV.substring(start, _end + 1), {
step: function(results) {
if (results.data[0] === '#group') {
parsed.group = results.data.slice(1)
} else if (results.data[0] === '#datatype') {
parsed.datatype = results.data.slice(1)
} else if (results.data[0] === '#default') {
parsed.default = results.data.slice(1)
} else if (results.data[0][0] !== '#' && annotationMode === true) {
annotationMode = false
parsed.header = results.data.slice(1)
parsed.header.reduce((acc, curr, index) => {
columnKey = `${curr} (${TO_COLUMN_TYPE[parsed.datatype[index]]})`
parsed.columnKey.push(columnKey)
if (!acc[columnKey]) {
acc[columnKey] = {
name: curr,
type: TO_COLUMN_TYPE[parsed.datatype[index]],
fluxDataType: parsed.datatype[index],
data: [],
}
}
if (parsed.group[index] === 'true') {
fluxGroupKeyUnion.add(columnKey)
}
return acc
}, columns)
} else {
result = null
results.data.slice(1).forEach((data, index) => {
const value = data || parsed.default[index]
let result = null

if (value === undefined) {
result = undefined
} else if (value === 'null') {
result = null
} else if (value === 'NaN') {
result = NaN
} else if (
TO_COLUMN_TYPE[parsed.datatype[index]] === 'boolean' &&
value === 'true'
) {
result = true
} else if (
TO_COLUMN_TYPE[parsed.datatype[index]] === 'boolean' &&
value === 'false'
) {
result = false
} else if (TO_COLUMN_TYPE[parsed.datatype[index]] === 'string') {
result = value
} else if (TO_COLUMN_TYPE[parsed.datatype[index]] === 'time') {
if (/\s/.test(value)) {
result = Date.parse(value.trim())
} else {
result = Date.parse(value)
}
} else if (TO_COLUMN_TYPE[parsed.datatype[index]] === 'number') {
if (value === '') {
result = null
} else {
const parsedValue = Number(value)
result = parsedValue === parsedValue ? parsedValue : value
}
} else {
result = null
}

if (columns[parsed.columnKey[index]] !== undefined) {
if (
columns[parsed.columnKey[index]].name === RESULT &&
result
) {
resultColumnNames.add(result)
}
columns[parsed.columnKey[index]].data[tableLength] = result
}
})
tableLength++
}

columns[columnKey].data[tableLength + i] = result
}

if (annotationData.groupKey.includes(columnName)) {
fluxGroupKeyUnion.add(columnKey)
}
}

tableLength += tableData.length
},
})
}

resolveNames(columns, fluxGroupKeyUnion)

const table = Object.entries(columns).reduce(
(table, [key, {name, fluxDataType, type, data}]) => {
data.length = tableLength
Expand Down
2 changes: 1 addition & 1 deletion stories/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@influxdata/giraffe-stories",
"version": "2.27.1",
"version": "2.28.0",
"license": "MIT",
"repository": {
"type": "git",
Expand Down

0 comments on commit 3f1b857

Please sign in to comment.