Skip to content

Commit

Permalink
feat: pgroll support import (#1470)
Browse files Browse the repository at this point in the history
  • Loading branch information
eemmiillyy authored and SferaDev committed May 17, 2024
1 parent 392125e commit 9a81c47
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 22 deletions.
3 changes: 3 additions & 0 deletions cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"ini": "^4.1.2",
"lodash.compact": "^3.0.1",
"lodash.get": "^4.4.2",
"lodash.keyby": "^4.6.0",
"lodash.set": "^4.3.2",
"node-fetch": "^3.3.2",
"open": "^10.1.0",
Expand All @@ -51,6 +52,7 @@
"text-table": "^0.2.0",
"tmp": "^0.2.3",
"tslib": "^2.6.2",
"type-fest": "^4.18.1",
"which": "^4.0.0",
"zod": "^3.23.8"
},
Expand All @@ -59,6 +61,7 @@
"@types/babel__core": "^7.20.5",
"@types/lodash.compact": "^3.0.9",
"@types/lodash.get": "^4.4.9",
"@types/lodash.keyby": "^4.6.9",
"@types/lodash.set": "^4.3.9",
"@types/relaxed-json": "^1.0.4",
"@types/text-table": "^0.2.5",
Expand Down
83 changes: 63 additions & 20 deletions cli/src/commands/import/csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ import { importColumnTypes } from '@xata.io/importer';
import { open, writeFile } from 'fs/promises';
import { BaseCommand } from '../../base.js';
import { enumFlag } from '../../utils/oclif.js';
import { getBranchDetailsWithPgRoll } from '../../migrations/pgroll.js';
import {
getBranchDetailsWithPgRoll,
waitForMigrationToFinish,
xataColumnTypeToPgRollComment
} from '../../migrations/pgroll.js';
import { compareSchemas } from '../../utils/compareSchema.js';
import keyBy from 'lodash.keyby';

const ERROR_CONSOLE_LOG_LIMIT = 200;
const ERROR_LOG_FILE = 'errors.log';
Expand All @@ -23,6 +29,8 @@ const bufferEncodings: BufferEncoding[] = [
'hex'
];

const INTERNAL_COLUMNS_PGROLL = ['xata_id', 'xata_createdat', 'xata_updatedat', 'xata_version'];

export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
static description = 'Import a CSV file';

Expand Down Expand Up @@ -144,12 +152,26 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
if (!parseResults.success) {
throw new Error('Failed to parse CSV file');
}
const batchRows = parseResults.data.map(({ data }) => data);
const batchRows = parseResults.data.map(({ data }) => {
const formattedRow: { [k: string]: any } = {};
const keys = Object.keys(data);
for (const key of keys) {
if (INTERNAL_COLUMNS_PGROLL.includes(key) && key !== 'xata_id') continue;
formattedRow[key] = data[key];
}
return formattedRow;
});

const importResult = await xata.import.importBatch(
{ workspace, region, database, branch },
{ columns: parseResults.columns, table, batchRows }
{
columns: parseResults.columns.filter(
({ name }) => name === 'xata_id' || !INTERNAL_COLUMNS_PGROLL.includes(name)
),
table,
batchRows
}
);

await xata.import.importFiles(
{ database, branch, region, workspace: workspace },
{
Expand Down Expand Up @@ -212,22 +234,39 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
const xata = await this.getXataClient();
const { workspace, region, database, branch } = await this.parseDatabase();
const { schema: existingSchema } = await getBranchDetailsWithPgRoll(xata, { workspace, region, database, branch });
const newSchema = {
tables: [
...existingSchema.tables.filter((t) => t.name !== table),
{ name: table, columns: columns.filter((c) => c.name !== 'id') }
]
};

const { edits } = await xata.api.migrations.compareBranchWithUserSchema({
pathParams: { workspace, region, dbBranchName: `${database}:main` },
body: { schema: newSchema }
});
if (edits.operations.length > 0) {
const destructiveOperations = edits.operations
const { edits } = compareSchemas(
{},
{
tables: {
[table]: {
name: table,
xataCompatible: false,
columns: keyBy(
columns
.filter((c) => !INTERNAL_COLUMNS_PGROLL.includes(c.name as any))
.map((c) => {
return {
name: c.name,
type: c.type,
nullable: c.notNull !== false,
default: c.defaultValue ?? null,
unique: c.unique,
comment: xataColumnTypeToPgRollComment(c)
};
}),
'name'
)
}
}
}
);

if (edits.length > 0) {
const destructiveOperations = edits
.map((op) => {
if (!('removeColumn' in op)) return undefined;
return op.removeColumn.column;
if (!('drop_column' in op)) return undefined;
return op.drop_column.column;
})
.filter((x) => x !== undefined);

Expand Down Expand Up @@ -262,10 +301,14 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
process.exit(1);
}

await xata.api.migrations.applyBranchSchemaEdit({
const { jobID } = await xata.api.migrations.applyMigration({
pathParams: { workspace, region, dbBranchName: `${database}:${branch}` },
body: { edits }
body: {
adaptTables: true,
operations: edits
}
});
await waitForMigrationToFinish(xata.api, workspace, region, database, branch, jobID);
}
}
}
Expand Down
129 changes: 129 additions & 0 deletions cli/src/utils/compareSchema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import { PgRollOperation } from '@xata.io/pgroll';
import { PartialDeep } from 'type-fest';
import { Schemas } from '@xata.io/client';
import { generateLinkReference, tableNameFromLinkComment, xataColumnTypeToPgRoll } from '../migrations/pgroll.js';

export function compareSchemas(
source: PartialDeep<Schemas.BranchSchema>,
target: PartialDeep<Schemas.BranchSchema>
): { edits: PgRollOperation[] } {
const edits: PgRollOperation[] = [];

// Compare tables
const sourceTables = Object.keys(source.tables ?? {});
const targetTables = Object.keys(target.tables ?? {});
const newTables = targetTables.filter((table) => !sourceTables.includes(table));
const deletedTables = sourceTables.filter((table) => !targetTables.includes(table));

// Compare columns
for (const table of sourceTables) {
const sourceColumns = Object.keys(source.tables?.[table]?.columns ?? {});
const targetColumns = Object.keys(target.tables?.[table]?.columns ?? {});
const newColumns = targetColumns.filter((column) => !sourceColumns.includes(column));
const deletedColumns = sourceColumns.filter((column) => !targetColumns.includes(column));

// Add columns
for (const column of newColumns) {
const props = target.tables?.[table]?.columns?.[column] ?? {};
edits.push({
add_column: {
table,
column: {
name: column,
type: xataColumnTypeToPgRoll(props?.type as any),
comment: props?.comment,
nullable: !(props?.nullable === false),
unique: props?.unique,
default: props?.default ?? undefined,
references:
props?.type === 'link' && props?.name
? generateLinkReference({
column: props.name,
table: tableNameFromLinkComment(props?.comment ?? '') ?? ''
})
: undefined
}
}
});
}

// Delete columns
for (const column of deletedColumns) {
edits.push({ drop_column: { table, column } });
}

// Compare column properties
for (const column of targetColumns) {
const sourceProps = source.tables?.[table]?.columns?.[column] ?? {};
const targetProps = target.tables?.[table]?.columns?.[column] ?? {};

if (sourceProps.type !== targetProps.type) {
edits.push({
alter_column: {
table,
column,
type: targetProps.type,
references:
targetProps?.type === 'link' && targetProps?.name
? generateLinkReference({
column: targetProps.name,
table: tableNameFromLinkComment(targetProps?.comment ?? '') ?? ''
})
: undefined
}
});
}

if (sourceProps.nullable !== targetProps.nullable) {
edits.push({ alter_column: { table, column, nullable: targetProps.nullable } });
}

if (sourceProps.unique !== targetProps.unique) {
edits.push({
alter_column: {
table,
column,
unique: {
name: `${table}_${column}_unique`
}
}
});
}
}
}

// Delete tables
for (const table of deletedTables) {
edits.push({ drop_table: { name: table } });
}

// Add new tables
for (const table of newTables) {
const props = target.tables?.[table] ?? {};
edits.push({
create_table: {
name: table,
comment: props.comment,
columns: Object.entries(props.columns ?? {}).map(([name, column]) => {
return {
name,
type: xataColumnTypeToPgRoll(column?.type as any),
comment: column?.comment,
nullable: !(column?.nullable === false),
unique: column?.unique,
default: column?.default ?? undefined,
references:
column?.type === 'link' && column?.name
? generateLinkReference({
column: column?.name,
table: tableNameFromLinkComment(column?.comment ?? '') ?? ''
})
: undefined
};
})
}
});
}

return { edits };
}
28 changes: 26 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 9a81c47

Please sign in to comment.