perf: csv import (#746)
* feat: parse CSV in a worker thread

* fix: error when importing multiple columns

* feat: update webpack config for the import worker

* fix: vitest worker file path error

* fix: missing key on Excel import

* feat: use `convertCellValue2DBValue` to convert cell values

* feat: add a workerId to avoid conflicts between concurrent imports

* fix: sqlite e2e error

* feat: compact filter input
caoxing9 authored Jul 24, 2024
1 parent 1dc5216 commit 2c3b888
Showing 12 changed files with 482 additions and 98 deletions.
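At a glance: parsing moves off the main thread into a worker_threads worker that streams parsed chunks back to the service, which inserts the records and acknowledges each chunk. From the diffs below, the message protocol between service and worker looks roughly like this (the type names are mine, for illustration; the code itself passes plain objects):

// Worker -> service (shapes inferred from the handlers below; names are illustrative)
type WorkerToService =
  | { type: 'chunk'; data: Record<string, unknown[][]>; chunkId: string; id: string }
  | { type: 'finished'; id: string }
  | { type: 'error'; data: string; id: string };

// Service -> worker: acknowledges that a chunk's records were inserted
type ServiceToWorker = { type: 'done'; chunkId: string };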
3 changes: 2 additions & 1 deletion apps/nestjs-backend/package.json
@@ -74,12 +74,12 @@
     "@types/oauth2orize": "1.11.5",
     "@types/papaparse": "5.3.14",
     "@types/passport": "1.0.16",
-    "@types/passport-openidconnect": "0.1.3",
     "@types/passport-github2": "1.2.9",
     "@types/passport-google-oauth20": "2.0.14",
     "@types/passport-jwt": "4.0.1",
     "@types/passport-local": "1.0.38",
     "@types/passport-oauth2-client-password": "0.1.5",
+    "@types/passport-openidconnect": "0.1.3",
     "@types/pause": "0.1.3",
     "@types/sharedb": "3.3.10",
     "@types/ws": "8.5.10",
@@ -152,6 +152,7 @@
     "cookie-parser": "1.4.6",
     "cors": "2.8.5",
     "dayjs": "1.11.10",
+    "esbuild": "0.23.0",
     "express": "4.19.1",
     "express-session": "1.18.0",
     "fs-extra": "11.2.0",
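esbuild arrives as a new dependency. The build wiring is not shown in this diff (the commit message mentions a webpack config update for the import worker), but the intent is evidently to compile the new worker entry to the dist/worker/parse.js path the service loads below. A hypothetical sketch of such a step, under that assumption:

// Hypothetical build step (not part of this diff): bundle the worker entry so that
// new Worker(join(process.cwd(), 'dist', 'worker', 'parse.js')) resolves at runtime.
import { buildSync } from 'esbuild';

buildSync({
  entryPoints: ['src/worker/parse.ts'],
  bundle: true,
  platform: 'node',
  outfile: 'dist/worker/parse.js',
});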
apps/nestjs-backend/src/features/import/open-api/import-open-api.service.ts

@@ -1,6 +1,8 @@
+import { join } from 'path';
+import { Worker } from 'worker_threads';
 import { Injectable, Logger, BadRequestException } from '@nestjs/common';
 import type { IFieldRo } from '@teable/core';
-import { FieldType, FieldKeyType } from '@teable/core';
+import { FieldType, FieldKeyType, getRandomString } from '@teable/core';
 import { PrismaService } from '@teable/db-main-prisma';
 import type {
   IAnalyzeRo,
@@ -172,77 +174,106 @@ export class ImportOpenApiService
       sourceColumnMap?: Record<string, number | null>;
     }
   ) {
-    const { skipFirstNLines, sheetKey, notification } = options;
+    const { sheetKey, notification } = options;
     const { columnInfo, fields, sourceColumnMap } = recordsCal;
 
-    importer.parse(
-      {
-        skipFirstNLines,
-        key: sheetKey,
-      },
-      async (result) => {
-        const currentResult = result[sheetKey];
-        // fill data
-        const records = currentResult.map((row) => {
-          const res: { fields: Record<string, unknown> } = {
-            fields: {},
-          };
-          // import new table
-          if (columnInfo) {
-            columnInfo.forEach((col, index) => {
-              const { sourceColumnIndex } = col;
-              // empty row will be return void row value
-              const value = Array.isArray(row) ? row[sourceColumnIndex] : null;
-              res.fields[fields[index].id] = value?.toString();
-            });
-          }
-          // inplace records
-          if (sourceColumnMap) {
-            for (const [key, value] of Object.entries(sourceColumnMap)) {
-              if (value !== null) {
-                const { type } = fields.find((f) => f.id === key) || {};
-                // link value should be string
-                res.fields[key] = type === FieldType.Link ? toString(row[value]) : row[value];
-              }
-            }
-          }
-          return res;
-        });
-        if (records.length === 0) {
-          return;
-        }
-        try {
-          const createFn = columnInfo
-            ? this.recordOpenApiService.createRecordsOnlySql.bind(this.recordOpenApiService)
-            : this.recordOpenApiService.multipleCreateRecords.bind(this.recordOpenApiService);
-          await createFn(table.id, {
-            fieldKeyType: FieldKeyType.Id,
-            typecast: true,
-            records,
-          });
-        } catch (e) {
-          this.logger.error((e as Error)?.message, (e as Error)?.stack);
-          throw e;
-        }
-      },
-      () => {
-        notification &&
-          this.notificationService.sendImportResultNotify({
-            baseId,
-            tableId: table.id,
-            toUserId: userId,
-            message: `🎉 ${table.name} ${sourceColumnMap ? 'inplace' : ''} imported successfully`,
-          });
-      },
-      (error) => {
-        notification &&
-          this.notificationService.sendImportResultNotify({
-            baseId,
-            tableId: table.id,
-            toUserId: userId,
-            message: `❌ ${table.name} import failed: ${error}`,
          });
-      }
-    );
+    const workerId = `worker_${getRandomString(8)}`;
+    const worker = new Worker(join(process.cwd(), 'dist', 'worker', 'parse.js'), {
+      workerData: {
+        config: importer.getConfig(),
+        options: {
+          key: options.sheetKey,
+          notification: options.notification,
+          skipFirstNLines: options.skipFirstNLines,
+        },
+        id: workerId,
+      },
+    });
+
+    worker.on('message', async (result) => {
+      const { type, data, chunkId, id } = result;
+      switch (type) {
+        case 'chunk': {
+          const currentResult = (data as Record<string, unknown[][]>)[sheetKey];
+          // fill data
+          const records = currentResult.map((row) => {
+            const res: { fields: Record<string, unknown> } = {
+              fields: {},
+            };
+            // import new table
+            if (columnInfo) {
+              columnInfo.forEach((col, index) => {
+                const { sourceColumnIndex } = col;
+                // empty row will be return void row value
+                const value = Array.isArray(row) ? row[sourceColumnIndex] : null;
+                res.fields[fields[index].id] = value?.toString();
+              });
+            }
+            // inplace records
+            if (sourceColumnMap) {
+              for (const [key, value] of Object.entries(sourceColumnMap)) {
+                if (value !== null) {
+                  const { type } = fields.find((f) => f.id === key) || {};
+                  // link value should be string
+                  res.fields[key] = type === FieldType.Link ? toString(row[value]) : row[value];
+                }
+              }
+            }
+            return res;
+          });
+          if (records.length === 0) {
+            return;
+          }
+          try {
+            const createFn = columnInfo
+              ? this.recordOpenApiService.createRecordsOnlySql.bind(this.recordOpenApiService)
+              : this.recordOpenApiService.multipleCreateRecords.bind(this.recordOpenApiService);
+            workerId === id &&
+              (await createFn(table.id, {
+                fieldKeyType: FieldKeyType.Id,
+                typecast: true,
+                records,
+              }));
+            worker.postMessage({ type: 'done', chunkId });
+          } catch (e) {
+            this.logger.error((e as Error)?.message, (e as Error)?.stack);
+            throw e;
+          }
+          break;
+        }
+        case 'finished':
+          workerId === id &&
+            notification &&
+            this.notificationService.sendImportResultNotify({
+              baseId,
+              tableId: table.id,
+              toUserId: userId,
+              message: `🎉 ${table.name} ${sourceColumnMap ? 'inplace' : ''} imported successfully`,
+            });
+          break;
+        case 'error':
+          workerId === id &&
+            notification &&
+            this.notificationService.sendImportResultNotify({
+              baseId,
+              tableId: table.id,
+              toUserId: userId,
+              message: `❌ ${table.name} import failed: ${data}`,
+            });
+          break;
+      }
+    });
+    worker.on('error', (e) => {
+      notification &&
+        this.notificationService.sendImportResultNotify({
+          baseId,
+          tableId: table.id,
+          toUserId: userId,
+          message: `❌ ${table.name} import failed: ${e.message}`,
+        });
+    });
+    worker.on('exit', (code) => {
+      this.logger.log(`Worker stopped with exit code ${code}`);
+    });
   }
 }
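Two details in this handler are easy to miss. First, the `workerId === id` check ties each message to the import that spawned this worker; every Worker's message channel is already private, so this reads as a defensive guard against concurrent imports (the "add a workerId" commit). Second, the `{ type: 'done', chunkId }` reply is what unblocks the worker's chunk callback, giving chunk-level backpressure: the worker never parses far ahead of the database inserts.

// Schematic of the chunk handshake (illustration, not source code):
//   worker ── { type: 'chunk', data, chunkId, id } ──▶ service inserts records
//   worker ◀─ { type: 'done', chunkId } ─────────────  service acknowledges
//   worker resumes parsing the next chunk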
36 changes: 31 additions & 5 deletions apps/nestjs-backend/src/features/import/open-api/import.class.ts
@@ -33,10 +33,11 @@ const validateZodSchemaMap: Record<IValidateTypes, ZodType> = {
   [FieldType.SingleLineText]: z.string(),
 };
 
-interface IImportConstructorParams {
+export interface IImportConstructorParams {
   url: string;
   type: SUPPORTEDTYPE;
   maxRowCount?: number;
+  fileName?: string;
 }
 
 interface IParseResult {
@@ -74,6 +75,14 @@ export abstract class Importer {
     ]
   ): Promise<IParseResult>;
 
+  private setFileNameFromHeader(fileName: string) {
+    this.config.fileName = fileName;
+  }
+
+  getConfig() {
+    return this.config;
+  }
+
   async getFile() {
     const { url, type } = this.config;
     const { body: stream, headers } = await fetch(url);
@@ -97,12 +106,29 @@
       );
     }
 
-    return stream;
+    const contentDisposition = headers.get('content-disposition');
+    let fileName = 'Import Table.csv';
+
+    if (contentDisposition) {
+      const fileNameMatch =
+        contentDisposition.match(/filename\*=UTF-8''([^;]+)/) ||
+        contentDisposition.match(/filename="?([^"]+)"?/);
+      if (fileNameMatch) {
+        fileName = fileNameMatch[1];
+      }
+    }
+
+    const finalFileName = fileName.split('.').shift() as string;
+
+    this.setFileNameFromHeader(decodeURIComponent(finalFileName));
+
+    return { stream, fileName: finalFileName };
   }
 
   async genColumns() {
     const supportTypes = Importer.SUPPORTEDTYPE;
     const parseResult = await this.parse();
+    const { fileName, type } = this.config;
     const result: IAnalyzeVo['worksheets'] = {};
 
     for (const [sheetName, cols] of Object.entries(parseResult)) {
@@ -154,7 +180,7 @@
       });
 
       result[sheetName] = {
-        name: sheetName,
+        name: type === SUPPORTEDTYPE.EXCEL ? sheetName : fileName ? fileName : sheetName,
         columns: calculatedColumnHeaders,
       };
     }
@@ -185,7 +211,7 @@ export class CsvImporter extends Importer {
     ]
   ): Promise<unknown> {
     const [options, chunkCb, onFinished, onError] = args;
-    const stream = await this.getFile();
+    const { stream } = await this.getFile();
 
     // chunk parse
     if (options && chunkCb) {
@@ -299,7 +325,7 @@ export class ExcelImporter extends Importer {
     onFinished?: () => void,
     onError?: (errorMsg: string) => void
   ): Promise<unknown> {
-    const fileSteam = await this.getFile();
+    const { stream: fileSteam } = await this.getFile();
 
     const asyncRs = async (stream: NodeJS.ReadableStream): Promise<IParseResult> =>
       new Promise((res, rej) => {
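getFile now also derives a display name from the Content-Disposition header, falling back to 'Import Table.csv', and for CSV imports that name becomes the table name (Excel keeps its sheet names). A worked example of the extraction above, with a sample header value of my own:

// Illustrative input; mirrors the regex + split + decode steps in getFile
const cd = "attachment; filename*=UTF-8''Import%20Table.csv";
const match = cd.match(/filename\*=UTF-8''([^;]+)/) || cd.match(/filename="?([^"]+)"?/);
const raw = match ? match[1] : 'Import Table.csv'; // "Import%20Table.csv"
const base = raw.split('.').shift() as string;     // "Import%20Table"
console.log(decodeURIComponent(base));             // "Import Table"

Note that split('.').shift() keeps only the text before the first dot, so a file named my.data.csv would come out as "my".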
9 changes: 5 additions & 4 deletions apps/nestjs-backend/src/features/record/record.service.ts
@@ -676,18 +676,19 @@
     await this.creditCheck(tableId);
     const dbTableName = await this.getDbTableName(tableId);
     const fields = await this.getFieldsByProjection(tableId);
-    const fieldsMap = fields.reduce(
+    const fieldInstanceMap = fields.reduce(
       (map, curField) => {
-        map[curField.id] = curField.dbFieldName;
+        map[curField.id] = curField;
         return map;
       },
-      {} as Record<string, string>
+      {} as Record<string, IFieldInstance>
     );
 
     const newRecords = records.map((record) => {
      const fieldsValues: Record<string, unknown> = {};
      Object.entries(record.fields).forEach(([fieldId, value]) => {
-        fieldsValues[fieldsMap[fieldId]] = value;
+        const fieldInstance = fieldInstanceMap[fieldId];
+        fieldsValues[fieldInstance.dbFieldName] = fieldInstance.convertCellValue2DBValue(value);
      });
      return {
        __id: generateRecordId(),
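The map now carries whole field instances instead of bare dbFieldName strings, so each cell value passes through the field's own convertCellValue2DBValue before the raw SQL insert. The failure mode this avoids, sketched with assumed values (the exact serialization per field type lives in the field instances):

// A link cell is an object; written raw through SQL it degrades, serialized it survives.
const linkCell = { id: 'recXXX', title: 'Tom' };
const rawInsert = String(linkCell);        // '[object Object]' — lossy
const dbValue = JSON.stringify(linkCell);  // '{"id":"recXXX","title":"Tom"}' — DB-ready text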
41 changes: 41 additions & 0 deletions apps/nestjs-backend/src/worker/parse.ts
@@ -0,0 +1,41 @@
+import { parentPort, workerData } from 'worker_threads';
+import { getRandomString } from '@teable/core';
+import type { IImportConstructorParams } from '../features/import/open-api/import.class';
+import { importerFactory } from '../features/import/open-api/import.class';
+
+const parse = () => {
+  const { config, options, id } = { ...workerData } as {
+    config: IImportConstructorParams;
+    options: {
+      skipFirstNLines: number;
+      key: string;
+    };
+    id: string;
+  };
+  const importer = importerFactory(config.type, config);
+  importer.parse(
+    { ...options },
+    async (chunk) => {
+      return await new Promise((resolve) => {
+        const chunkId = `chunk_${getRandomString(8)}`;
+        parentPort?.postMessage({ type: 'chunk', data: chunk, chunkId, id });
+        parentPort?.on('message', (result) => {
+          const { type, chunkId: tunnelChunkId } = result;
+          if (type === 'done' && tunnelChunkId === chunkId) {
+            resolve();
+          }
+        });
+      });
+    },
+    () => {
+      parentPort?.postMessage({ type: 'finished', id });
+      parentPort?.close();
+    },
+    (error) => {
+      parentPort?.postMessage({ type: 'error', data: error, id });
+      parentPort?.close();
+    }
+  );
+};
+
+parse();
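One subtlety in the chunk callback above: it registers a fresh parentPort.on('message', …) listener for every chunk and never removes it, so listeners accumulate over a long import. A minimal variant that unsubscribes once its chunk is acknowledged (my rewrite, not the committed code):

async (chunk) => {
  await new Promise<void>((resolve) => {
    const chunkId = `chunk_${getRandomString(8)}`;
    const onDone = (result: { type: string; chunkId?: string }) => {
      if (result.type === 'done' && result.chunkId === chunkId) {
        parentPort?.removeListener('message', onDone); // drop the listener once acknowledged
        resolve();
      }
    };
    parentPort?.on('message', onDone);
    parentPort?.postMessage({ type: 'chunk', data: chunk, chunkId, id });
  });
},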
11 changes: 9 additions & 2 deletions apps/nestjs-backend/test/table-import.e2e-spec.ts
@@ -124,7 +124,7 @@ const genTestFiles = async () => {
 
   const {
     data: { presignedUrl },
-  } = await apiNotify(token);
+  } = await apiNotify(token, undefined, 'Import Table.csv');
 
   result[format] = {
     path: tmpPath,
@@ -222,6 +222,8 @@ describe('OpenAPI ImportController (e2e)', () => {
   });
 
   describe('/import/{baseId} OpenAPI ImportController (e2e) (Post)', () => {
+    const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
+
     it.each(testFileFormats.filter((format) => format !== TestFileFormat.TXT))(
       'should create a new Table from %s file',
       async (format) => {
@@ -266,10 +268,15 @@
        name: field.name,
      }));
 
-      await apiGetTableById(baseId, table.data[0].id);
+      await delay(1000);
+
+      const { records } = await apiGetTableById(baseId, table.data[0].id, {
+        includeContent: true,
+      });
 
       bases.push([baseId, id]);
 
+      expect(records?.length).toBe(2);
       expect(createdFields).toEqual(assertHeaders);
     }
   );
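Because record insertion now happens asynchronously in the worker, the test waits a fixed second before reading records back. A fixed delay can flake on slow CI; a polling helper along these lines would be sturdier (a sketch, not part of the commit):

const waitFor = async (cond: () => Promise<boolean>, timeoutMs = 5000, stepMs = 100) => {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await cond()) return;
    await new Promise((r) => setTimeout(r, stepMs));
  }
  throw new Error('condition not met in time');
};

// usage idea:
// await waitFor(async () => {
//   const { records } = await apiGetTableById(baseId, tableId, { includeContent: true });
//   return (records?.length ?? 0) === 2;
// });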