Skip to content

Commit

Permalink
fixes for sql block pagination and sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
vieiralucas committed Feb 1, 2025
1 parent f9b3bc9 commit cc1eeaa
Show file tree
Hide file tree
Showing 16 changed files with 355 additions and 314 deletions.
21 changes: 17 additions & 4 deletions apps/api/src/python/query/athena.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,16 @@ def briefer_make_athena_query():
df, columns = to_pandas(data)
result = {
"version": 2,
"type": "success",
"rows": json.loads(df.head(250).to_json(orient='records', date_format="iso")),
"columns": get_columns_result(columns),
"rows": json.loads(df.head(50).to_json(orient='records', date_format="iso")),
"count": len(df),
"columns": get_columns_result(columns),
"page": 0,
"pageSize": 50,
"pageCount": int(len(df) // 50 + 1),
}
print(json.dumps(result, ensure_ascii=False, default=str))
Expand Down Expand Up @@ -364,11 +370,18 @@ def briefer_make_athena_query():
return
result = {
"version": 2,
"type": "success",
"rows": json.loads(df.head(250).to_json(orient='records', date_format="iso")),
"columns": get_columns_result(columns),
"rows": json.loads(df.head(50).to_json(orient='records', date_format="iso")),
"count": len(df),
"durationMs": query_status.get("QueryExecution", {}).get("Statistics", {}).get("TotalExecutionTimeInMillis", None),
"page": 0,
"pageSize": 50,
"pageCount": int(len(df) // 50 + 1),
"queryDurationMs": query_status.get("QueryExecution", {}).get("Statistics", {}).get("TotalExecutionTimeInMillis", None),
}
print(json.dumps(result, ensure_ascii=False, default=str))
Expand Down
38 changes: 28 additions & 10 deletions apps/api/src/python/query/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,16 @@ def _briefer_make_bq_query():
print(json.dumps({"type": "log", "message": f"rows count {query_result.total_rows}"}))
if query_result.total_rows == 0:
result = {
"version": 2,
"type": "success",
"rows": [],
"columns": [],
"count": 0
"rows": [],
"count": 0,
"page": 0,
"pageSize": 50,
"pageCount": 1,
}
df = query_result.to_dataframe()
convert_columns(df, columns_by_type)
Expand Down Expand Up @@ -176,10 +182,10 @@ def _briefer_make_bq_query():
rows_count += len(chunk)
chunks.append(chunk)
if len(initial_rows) < 250:
if len(initial_rows) < 50:
df = pd.concat(chunks, ignore_index=True)
convert_columns(df, columns_by_type)
initial_rows = json.loads(df.head(250).to_json(orient='records', date_format="iso"))
initial_rows = json.loads(df.head(50).to_json(orient='records', date_format="iso"))
# convert all values to string to make sure we preserve the python values
# when displaying this data in the browser
Expand All @@ -193,10 +199,16 @@ def _briefer_make_bq_query():
now = time.time()
if now - last_emitted_at > 1:
result = {
"version": 2,
"type": "success",
"rows": initial_rows,
"columns": columns,
"count": rows_count
"rows": initial_rows,
"count": rows_count,
"page": 0,
"pageSize": 50,
"pageCount": int(rows_count // 50 + 1),
}
print(json.dumps({"type": "log", "message": f"Emitting {rows_count} rows"}))
print(json.dumps(result, default=str))
Expand All @@ -213,8 +225,8 @@ def _briefer_make_bq_query():
print(json.dumps(result, default=str))
return None
if len(initial_rows) < 250:
initial_rows = json.loads(df.head(250).to_json(orient='records', date_format="iso"))
if len(initial_rows) < 50:
initial_rows = json.loads(df.head(50).to_json(orient='records', date_format="iso"))
# convert all values to string to make sure we preserve the python values
# when displaying this data in the browser
Expand All @@ -224,10 +236,16 @@ def _briefer_make_bq_query():
columns = get_columns(df)
result = {
"version": 2,
"type": "success",
"rows": initial_rows,
"columns": columns,
"count": rows_count
"rows": initial_rows,
"count": rows_count,
"page": 0,
"pageSize": 50,
"pageCount": int(rows_count // 50 + 1),
}
df = pd.concat(chunks, ignore_index=True)
Expand Down
20 changes: 16 additions & 4 deletions apps/api/src/python/query/duckdb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,22 @@ def _briefer_make_duckdb_query():
query = duckdb.query(${JSON.stringify(renderedQuery)})
if query == None:
result = {
"version": 2,
"type": "success",
"rows": [],
"columns": [],
"count": 0
"rows": [],
"count": 0,
"page": 0,
"pageSize": 50,
"pageCount": 1,
}
print(json.dumps(result, ensure_ascii=False, default=str))
return
df = query.df()
rows = json.loads(df.head(250).to_json(orient='records', date_format='iso'))
rows = json.loads(df.head(50).to_json(orient='records', date_format='iso'))
# convert all values to string to make sure we preserve the python values
# when displaying this data in the browser
Expand All @@ -75,10 +81,16 @@ def _briefer_make_duckdb_query():
except:
pass
result = {
"version": 2,
"type": "success",
"rows": rows,
"columns": columns,
"rows": rows,
"count": len(df)
"page": 0,
"pageSize": 50,
"pageCount": int(len(df) // 50 + 1),
}
print(json.dumps(result, ensure_ascii=False, default=str))
df.to_parquet(parquet_file_path, compression='gzip', index=False)
Expand Down
25 changes: 22 additions & 3 deletions apps/api/src/python/query/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { DataSource } from '@briefer/database'
import { executeCode } from '../index.js'
import {
AbortErrorRunQueryResult,
DataFrame,
DataFrameColumn,
DataFrameStringColumn,
PythonErrorRunQueryResult,
RunQueryResult,
SQLQueryConfiguration,
SuccessRunQueryResult,
SuccessRunQueryResultV2,
SyntaxErrorRunQueryResult,
TableSort,
jsonString,
} from '@briefer/types'
Expand Down Expand Up @@ -341,6 +345,12 @@ del _briefer_read_query`
return [resultPromise, abortFunction]
}

export type ReadDataFramePageResult =
| Omit<SuccessRunQueryResultV2, 'queryDurationMs'>
| SyntaxErrorRunQueryResult
| AbortErrorRunQueryResult
| PythonErrorRunQueryResult

export async function readDataframePage(
workspaceId: string,
sessionId: string,
Expand All @@ -349,7 +359,7 @@ export async function readDataframePage(
page: number,
pageSize: number,
sort: TableSort | null
): Promise<RunQueryResult | null> {
): Promise<ReadDataFramePageResult> {
const code = `import json
sort_config = json.loads(${JSON.stringify(JSON.stringify(sort))})
Expand Down Expand Up @@ -386,14 +396,19 @@ if "${dataframeName}" in globals():
columns = [{"name": col, "type": dtype.name} for col, dtype in ${dataframeName}.dtypes.items()]
result = {
"version": 2,
"type": "success",
"rows": rows,
"count": len(${dataframeName}),
"columns": columns
"columns": columns,
"page": ${page},
"pageSize": ${pageSize},
"pageCount": int(len(${dataframeName}) / ${pageSize} + 1),
}
print(json.dumps(result))`

let result: RunQueryResult | null = null
let result: ReadDataFramePageResult | null = null
let error: Error | null = null
await (
await executeCode(
Expand Down Expand Up @@ -441,6 +456,10 @@ if "${dataframeName}" in globals():
throw error
}

if (!result) {
throw new Error('No result')
}

return result
}

Expand Down
23 changes: 18 additions & 5 deletions apps/api/src/python/query/sqlalchemy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def briefer_make_sqlalchemy_query():
chunk = rename_duplicates(chunk)
df = convert_df(pd.concat([df, chunk], ignore_index=True))
if rows is None:
rows = json.loads(df.head(250).to_json(orient='records', date_format="iso"))
rows = json.loads(df.head(50).to_json(orient='records', date_format="iso"))
# convert all values to string to make sure we preserve the python values
# when displaying this data in the browser
Expand Down Expand Up @@ -183,10 +183,16 @@ def briefer_make_sqlalchemy_query():
now = time.time()
if now - last_emitted_at > 1:
result = {
"version": 2,
"type": "success",
"rows": rows,
"columns": columns,
"count": count
"rows": rows,
"count": count,
"page": 0,
"pageSize": 50,
"pageCount": int(len(df) // 50 + 1),
}
print(json.dumps(result, ensure_ascii=False, default=str))
last_emitted_at = now
Expand Down Expand Up @@ -238,11 +244,18 @@ def briefer_make_sqlalchemy_query():
return
result = {
"version": 2,
"type": "success",
"rows": rows,
"columns": columns,
"rows": rows,
"count": count,
"durationMs": duration_ms,
"page": 0,
"pageSize": 50,
"pageCount": int(len(df) // 50 + 1),
"queryDurationMs": duration_ms,
}
print(json.dumps(result, ensure_ascii=False, default=str))
queue.put(None)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,73 +1,8 @@
import { z } from 'zod'

import { getParam } from '../../../../../../../utils/express.js'
import { Response, Request, Router } from 'express'
import { Router } from 'express'
import csvRouter from './csv.js'
import { readDataframePage } from '../../../../../../../python/query/index.js'
import { getJupyterManager } from '../../../../../../../jupyter/index.js'
import { TableSort } from '@briefer/types'

const queryRouter = Router({ mergeParams: true })

export async function getQueryHandler(req: Request, res: Response) {
const workspaceId = getParam(req, 'workspaceId')
const documentId = getParam(req, 'documentId')

const payload = z
.object({
dataframeName: z.string(),
page: z.preprocess(
(a) => parseInt(z.string().parse(a), 10),
z.number().nonnegative()
),
pageSize: z.preprocess(
(a) => parseInt(z.string().parse(a), 10),
z.number().nonnegative()
),
sortColumn: TableSort.shape.column.optional().nullable(),
sortOrder: TableSort.shape.order.optional().nullable(),
})
.safeParse(req.query)

if (!payload.success) {
res.status(400).end()
return
}

const data = payload.data

const pageSize = Math.min(data.pageSize, 250)
const queryId = getParam(req, 'queryId')

try {
await getJupyterManager().ensureRunning(workspaceId)
const result = await readDataframePage(
workspaceId,
documentId,
queryId,
data.dataframeName,
data.page,
pageSize,
data.sortColumn && data.sortOrder
? { column: data.sortColumn, order: data.sortOrder }
: null
)
if (!result) {
res.status(404).end()
return
}

res.json(result)
} catch (err) {
req.log.error(
{ err, workspaceId, documentId, queryId },
'Error while executing query'
)
res.status(500).json({ error: 'unexpected' })
}
}

queryRouter.get('/', getQueryHandler)
queryRouter.use('/csv', csvRouter)

export default queryRouter
Loading

0 comments on commit cc1eeaa

Please sign in to comment.