diff --git a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/DeleteAllCmsEntries.ts b/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/DeleteAllCmsEntries.ts
deleted file mode 100644
index 7d05ac4e85b..00000000000
--- a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/DeleteAllCmsEntries.ts
+++ /dev/null
@@ -1,47 +0,0 @@
-import { Logger } from "@webiny/logger";
-import { SegmentProcessor } from "./SegmentProcessor";
-
-interface DeleteAllCmsEntriesParams {
-    ddbTable: string;
-    ddbEsTable: string;
-    totalSegments: number;
-    logger: Logger;
-}
-
-export class DeleteAllCmsEntries {
-    private readonly ddbTable: string;
-    private readonly ddbEsTable: string;
-    private readonly totalSegments: number;
-    private readonly logger: Logger;
-
-    constructor(params: DeleteAllCmsEntriesParams) {
-        this.ddbTable = params.ddbTable;
-        this.ddbEsTable = params.ddbEsTable;
-        this.totalSegments = params.totalSegments;
-        this.logger = params.logger;
-    }
-
-    async execute() {
-        const scanProcessesPromises = [];
-
-        const start = Date.now();
-        const getDuration = () => {
-            return (Date.now() - start) / 1000;
-        };
-
-        for (let segmentIndex = 0; segmentIndex < this.totalSegments; segmentIndex++) {
-            const segmentProcessor = new SegmentProcessor({
-                segmentIndex,
-                totalSegments: this.totalSegments,
-                ddbTable: this.ddbTable,
-                ddbEsTable: this.ddbEsTable
-            });
-
-            scanProcessesPromises.push(segmentProcessor.execute());
-        }
-
-        await Promise.all(scanProcessesPromises);
-
-        this.logger.trace(`All CMS entries have been deleted. Took: ${getDuration()}s`);
-    }
-}
diff --git a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/SegmentProcessor.ts b/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/SegmentProcessor.ts
deleted file mode 100644
index 783a32c886f..00000000000
--- a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/SegmentProcessor.ts
+++ /dev/null
@@ -1,44 +0,0 @@
-import execa from "execa";
-import path from "path";
-
-interface SegmentProcessorParams {
-    ddbTable: string;
-    ddbEsTable: string;
-    segmentIndex: number;
-    totalSegments: number;
-}
-
-export class SegmentProcessor {
-    private readonly ddbTable: string;
-    private readonly ddbEsTable: string;
-    private readonly segmentIndex: number;
-    private readonly totalSegments: number;
-
-    constructor(params: SegmentProcessorParams) {
-        this.ddbTable = params.ddbTable;
-        this.ddbEsTable = params.ddbEsTable;
-        this.segmentIndex = params.segmentIndex;
-        this.totalSegments = params.totalSegments;
-    }
-
-    async execute() {
-        return execa(
-            "node",
-            [
-                path.join(__dirname, "worker"),
-                "--ddbTable",
-                this.ddbTable,
-                "--ddbEsTable",
-                this.ddbEsTable,
-                "--segmentIndex",
-                String(this.segmentIndex),
-                "--totalSegments",
-                String(this.totalSegments)
-            ],
-            {
-                stdio: "inherit",
-                env: process.env
-            }
-        );
-    }
-}
diff --git a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/bin.ts b/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/bin.ts
deleted file mode 100644
index 344ec0adba3..00000000000
--- a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/bin.ts
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/usr/bin/env node
-import yargs from "yargs/yargs";
-import { hideBin } from "yargs/helpers";
-import { DeleteAllCmsEntries } from "./DeleteAllCmsEntries";
-import { createPinoLogger, getLogLevel } from "@webiny/logger";
-import pinoPretty from "pino-pretty";
-
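-// CLI entry point: parses the DynamoDB table names and the number of scan
-// segments, then hands off to DeleteAllCmsEntries, which spawns one worker
-// process per segment.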
-const argv = yargs(hideBin(process.argv))
-    .options({
-        ddbTable: { type: "string", demandOption: true },
-        ddbEsTable: { type: "string", demandOption: true },
-        segments: { type: "number", demandOption: true }
-    })
-    .parseSync();
-
-(async () => {
-    const logger = createPinoLogger(
-        {
-            level: getLogLevel(process.env.MIGRATIONS_LOG_LEVEL, "trace")
-        },
-        pinoPretty({ ignore: "pid,hostname" })
-    );
-
-    const deleteAllCmsEntries = new DeleteAllCmsEntries({
-        totalSegments: argv.segments,
-        ddbTable: argv.ddbTable,
-        ddbEsTable: argv.ddbEsTable,
-        logger
-    });
-
-    await deleteAllCmsEntries.execute();
-})();
diff --git a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/worker.ts b/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/worker.ts
deleted file mode 100644
index 06815f6681e..00000000000
--- a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/deleteAllCmsEntries/worker.ts
+++ /dev/null
@@ -1,196 +0,0 @@
-import { executeWithRetry } from "@webiny/utils";
-import { createPinoLogger, getLogLevel } from "@webiny/logger";
-import { createTable } from "@webiny/data-migration";
-import { getDocumentClient } from "@webiny/aws-sdk/client-dynamodb";
-import yargs from "yargs/yargs";
-import { hideBin } from "yargs/helpers";
-import { CmsEntry } from "~/migrations/5.39.0/001/types";
-import {
-    createDdbEntryEntity,
-    createDdbEsEntryEntity
-} from "~/migrations/5.39.0/001/entities/createEntryEntity";
-import { batchWriteAll, BatchWriteItem, ddbScanWithCallback } from "~/utils";
-import pinoPretty from "pino-pretty";
-
-const argv = yargs(hideBin(process.argv))
-    .options({
-        ddbTable: { type: "string", demandOption: true },
-        ddbEsTable: { type: "string", demandOption: true },
-        segmentIndex: { type: "number", demandOption: true },
-        totalSegments: { type: "number", demandOption: true }
-    })
-    .parseSync();
-
-interface LastEvaluatedKeyObject {
-    PK: string;
-    SK: string;
-    GSI1_PK: string;
-    GSI1_SK: string;
-}
-
-type LastEvaluatedKey = LastEvaluatedKeyObject | true | null;
-
-interface MigrationStatus {
-    lastEvaluatedKey: LastEvaluatedKey;
-    iterationsCount: number;
-    recordsScanned: number;
-    recordsUpdated: number;
-    recordsSkipped: number;
-}
-
-const createInitialStatus = (): MigrationStatus => {
-    return {
-        lastEvaluatedKey: null,
-        iterationsCount: 0,
-        recordsScanned: 0,
-        recordsUpdated: 0,
-        recordsSkipped: 0
-    };
-};
-
-(async () => {
-    const logger = createPinoLogger(
-        {
-            level: getLogLevel(process.env.MIGRATIONS_LOG_LEVEL, "trace"),
-            msgPrefix: `[segment #${argv.segmentIndex}] `
-        },
-        pinoPretty({ ignore: "pid,hostname" })
-    );
-
-    const documentClient = getDocumentClient();
-
-    const primaryTable = createTable({
-        name: argv.ddbTable,
-        documentClient
-    });
-    const dynamoToEsTable = createTable({
-        name: argv.ddbEsTable,
-        documentClient
-    });
-
-    const ddbEntryEntity = createDdbEntryEntity(primaryTable);
-    const ddbEsEntryEntity = createDdbEsEntryEntity(dynamoToEsTable);
-
-    const status = createInitialStatus();
-
-    await ddbScanWithCallback<CmsEntry>(
-        {
-            entity: ddbEntryEntity,
-            options: {
-                segment: argv.segmentIndex,
-                segments: argv.totalSegments,
-                filters: [
-                    {
-                        attr: "_et",
-                        eq: "CmsEntries"
-                    }
-                ],
-                startKey: status.lastEvaluatedKey || undefined,
-                limit: 100
-            }
-        },
-        async result => {
-            status.iterationsCount++;
-            status.recordsScanned += result.items.length;
-
-            logger.trace(`Reading ${result.items.length} record(s)...`);
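-
-            // Deletions are collected per item first and then flushed in batches
-            // via batchWriteAll further below.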
-            const ddbItemsToBatchDelete: BatchWriteItem[] = [];
-            const ddbEsItemsToBatchDelete: BatchWriteItem[] = [];
-            const ddbEsItemsToPutIntoBatchDelete: Record<string, { PK: string; SK: string }> = {};
-
-            for (const item of result.items) {
-                ddbItemsToBatchDelete.push(ddbEntryEntity.deleteBatch(item));
-
-                /**
-                 * Prepare the deletion of the DynamoDB Elasticsearch part of the records.
-                 */
-
-                const ddbEsLatestRecordKey = `${item.entryId}:L`;
-                if (ddbEsItemsToPutIntoBatchDelete[ddbEsLatestRecordKey]) {
-                    continue;
-                }
-
-                ddbEsItemsToPutIntoBatchDelete[ddbEsLatestRecordKey] = {
-                    PK: item.PK,
-                    SK: "L"
-                };
-
-                const ddbEsPublishedRecordKey = `${item.entryId}:P`;
-                if (item.status === "published" || !!item.locked) {
-                    ddbEsItemsToPutIntoBatchDelete[ddbEsPublishedRecordKey] = {
-                        PK: item.PK,
-                        SK: "P"
-                    };
-                }
-            }
-
-            if (Object.keys(ddbEsItemsToPutIntoBatchDelete).length > 0) {
-                Object.values(ddbEsItemsToPutIntoBatchDelete).forEach(item => {
-                    ddbEsItemsToBatchDelete.push(ddbEsEntryEntity.deleteBatch(item));
-                });
-            }
-
-            if (ddbItemsToBatchDelete.length) {
-                // Delete records from the primary DynamoDB table.
-                const execute = () => {
-                    return batchWriteAll({
-                        table: ddbEntryEntity.table,
-                        items: ddbItemsToBatchDelete
-                    });
-                };
-
-                logger.trace(
-                    `Deleting ${ddbItemsToBatchDelete.length} record(s) in primary DynamoDB table...`
-                );
-                await executeWithRetry(execute, {
-                    onFailedAttempt: error => {
-                        logger.warn(
-                            `Batch delete attempt #${error.attemptNumber} failed: ${error.message}`
-                        );
-                    }
-                });
-
-                if (ddbEsItemsToBatchDelete.length) {
-                    logger.trace(
-                        `Deleting ${ddbEsItemsToBatchDelete.length} record(s) in DDB-ES DynamoDB table...`
-                    );
-
-                    // Delete records from the DDB-ES DynamoDB table.
-                    const executeDdbEs = () => {
-                        return batchWriteAll({
-                            table: ddbEsEntryEntity.table,
-                            items: ddbEsItemsToBatchDelete
-                        });
-                    };
-
-                    await executeWithRetry(executeDdbEs, {
-                        onFailedAttempt: error => {
-                            logger.warn(
-                                `[DDB-ES Table] Batch delete attempt #${error.attemptNumber} failed: ${error.message}`
-                            );
-                        }
-                    });
-                }
-
-                status.recordsUpdated += ddbItemsToBatchDelete.length;
-            }
-
-            // Update checkpoint after every batch.
-            let lastEvaluatedKey: LastEvaluatedKey = true;
-            if (result.lastEvaluatedKey) {
-                lastEvaluatedKey = result.lastEvaluatedKey as unknown as LastEvaluatedKeyObject;
-            }
-
-            status.lastEvaluatedKey = lastEvaluatedKey;
-
-            if (lastEvaluatedKey === true) {
-                return false;
-            }
-
-            // Continue further scanning.
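-            // Returning true makes ddbScanWithCallback fetch the next page of this
-            // segment's scan; returning false above stops once DynamoDB reports no
-            // more keys for the segment.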
-            return true;
-        }
-    );
-
-    logger.trace({ status }, "Segment processing completed.");
-})();
diff --git a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/MetaFieldsMigration.ts b/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/MetaFieldsMigration.ts
deleted file mode 100644
index 4bd0641972a..00000000000
--- a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/MetaFieldsMigration.ts
+++ /dev/null
@@ -1,211 +0,0 @@
-import { Logger } from "@webiny/logger";
-import { SegmentProcessor } from "./SegmentProcessor";
-import {
-    disableElasticsearchIndexing,
-    esListIndexes,
-    fetchOriginalElasticsearchSettings,
-    restoreOriginalElasticsearchSettings
-} from "~/utils";
-import { createElasticsearchClient } from "@webiny/api-elasticsearch";
-import { createWaitUntilHealthy } from "@webiny/api-elasticsearch/utils/waitUntilHealthy";
-import { DEFAULT_ES_HEALTH_CHECKS_PARAMS, EsHealthChecksParams } from "../../utils";
-import path from "path";
-import os from "os";
-import fs from "fs";
-import glob from "fast-glob";
-
-export interface MetaFieldsMigrationParams {
-    ddbTable: string;
-    ddbEsTable: string;
-    esEndpoint: string;
-    totalSegments: number;
-    logger: Logger;
-
-    // Elasticsearch health check options.
-    esHealthChecks?: Partial<EsHealthChecksParams>;
-}
-
-export class MetaFieldsMigration {
-    private readonly runId: string;
-    private readonly ddbTable: string;
-    private readonly ddbEsTable: string;
-    private readonly esEndpoint: string;
-    private readonly totalSegments: number;
-    private readonly logger: Logger;
-
-    private readonly esHealthChecks: EsHealthChecksParams;
-
-    constructor(params: MetaFieldsMigrationParams) {
-        this.runId = String(new Date().getTime());
-        this.ddbTable = params.ddbTable;
-        this.ddbEsTable = params.ddbEsTable;
-        this.esEndpoint = params.esEndpoint;
-        this.totalSegments = params.totalSegments;
-        this.logger = params.logger;
-        this.esHealthChecks = {
-            ...DEFAULT_ES_HEALTH_CHECKS_PARAMS,
-            ...params.esHealthChecks
-        };
-    }
-
-    async execute() {
-        const scanProcessesPromises = [];
-
-        const start = Date.now();
-        const getDuration = () => {
-            return (Date.now() - start) / 1000;
-        };
-
-        this.logger.info("Starting 5.39.6-001 meta fields data migration...");
-        this.logger.info(
-            {
-                ddbTable: this.ddbTable,
-                ddbEsTable: this.ddbEsTable,
-                esEndpoint: this.esEndpoint,
-                totalSegments: this.totalSegments,
-                esHealthChecks: this.esHealthChecks
-            },
-            "Received the following parameters:"
-        );
-
-        const elasticsearchClient = createElasticsearchClient({
-            endpoint: `https://${this.esEndpoint}`
-        });
-
-        this.logger.info("Checking Elasticsearch health status...");
-        const waitUntilHealthy = createWaitUntilHealthy(elasticsearchClient, this.esHealthChecks);
-        await waitUntilHealthy.wait();
-        this.logger.info("Elasticsearch is healthy.");
-
-        const indexes = await esListIndexes({ elasticsearchClient, match: "-headless-cms-" });
-        const indexSettings: Record<string, any> = {};
-        for (const indexName of indexes) {
-            this.logger.info(`Disabling indexing for Elasticsearch index "${indexName}"...`);
-            indexSettings[indexName] = await fetchOriginalElasticsearchSettings({
-                elasticsearchClient,
-                index: indexName,
-                logger: this.logger
-            });
-
-            await disableElasticsearchIndexing({
-                elasticsearchClient,
-                index: indexName,
-                logger: this.logger
-            });
-        }
-
-        this.logger.info("Proceeding with the migration...");
-
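-        // One worker process per segment; all segments run in parallel and are
-        // awaited together below.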
-        for (let segmentIndex = 0; segmentIndex < this.totalSegments; segmentIndex++) {
-            const segmentProcessor = new SegmentProcessor({
-                segmentIndex,
-                runId: this.runId,
-                totalSegments: this.totalSegments,
-                ddbTable: this.ddbTable,
-                ddbEsTable: this.ddbEsTable,
-                esEndpoint: this.esEndpoint,
-                esHealthChecks: this.esHealthChecks
-            });
-
-            scanProcessesPromises.push(segmentProcessor.execute());
-        }
-
-        await Promise.all(scanProcessesPromises);
-
-        this.logger.info("Restoring original Elasticsearch settings...");
-        await restoreOriginalElasticsearchSettings({
-            elasticsearchClient,
-            indexSettings,
-            logger: this.logger
-        });
-
-        const duration = getDuration();
-        this.logger.info(`5.39.6-001 migration completed in ${duration}s, here are the results...`);
-
-        // Wait for 1 second.
-        await new Promise(resolve => setTimeout(resolve, 1000));
-
-        this.logger.info(
-            {
-                totalSegments: this.totalSegments,
-                esHealthChecks: this.esHealthChecks
-            },
-            "The migration was performed with the following parameters:"
-        );
-
-        // Pick up all log files and print a summary of the migration.
-        const logFilePaths = await glob(
-            path.join(
-                os.tmpdir(),
-                `webiny-5-39-6-meta-fields-data-migration-log-${this.runId}-*.log`
-            )
-        );
-
-        const migrationStats = {
-            iterationsCount: 0,
-            avgIterationDuration: 0,
-            recordsScanned: 0,
-            avgRecordsScannedPerIteration: 0,
-            recordsScannedPerSecond: 0,
-            recordsUpdated: 0,
-            recordsSkipped: 0,
-            esHealthChecks: {
-                timeSpentWaiting: 0,
-                checksCount: 0,
-                unhealthyReasons: {} as Record<string, number>
-            }
-        };
-
-        for (const logFilePath of logFilePaths) {
-            const logFileContent = fs.readFileSync(logFilePath, "utf-8");
-            const logFile = JSON.parse(logFileContent);
-
-            migrationStats.iterationsCount += logFile.iterationsCount;
-            migrationStats.recordsScanned += logFile.recordsScanned;
-            migrationStats.recordsUpdated += logFile.recordsUpdated;
-            migrationStats.recordsSkipped += logFile.recordsSkipped;
-
-            migrationStats.esHealthChecks.timeSpentWaiting +=
-                logFile.esHealthChecks.timeSpentWaiting;
-            migrationStats.esHealthChecks.checksCount += logFile.esHealthChecks.checksCount;
-
-            for (const unhealthyReasonType in logFile.esHealthChecks.unhealthyReasons) {
-                if (!logFile.esHealthChecks.unhealthyReasons.hasOwnProperty(unhealthyReasonType)) {
-                    continue;
-                }
-
-                const hasCount =
-                    unhealthyReasonType in migrationStats.esHealthChecks.unhealthyReasons;
-                if (hasCount) {
-                    migrationStats.esHealthChecks.unhealthyReasons[unhealthyReasonType] +=
-                        logFile.esHealthChecks.unhealthyReasons[unhealthyReasonType];
-                } else {
-                    migrationStats.esHealthChecks.unhealthyReasons[unhealthyReasonType] =
-                        logFile.esHealthChecks.unhealthyReasons[unhealthyReasonType];
-                }
-            }
-        }
-
-        migrationStats.avgIterationDuration = duration / migrationStats.iterationsCount;
-
-        migrationStats.avgRecordsScannedPerIteration =
-            migrationStats.recordsScanned / migrationStats.iterationsCount;
-
-        migrationStats.recordsScannedPerSecond = migrationStats.recordsScanned / duration;
-
-        this.logger.info(
-            migrationStats,
-            `Migration summary (based on ${logFilePaths.length} generated logs):`
-        );
-
-        const logFilePath = path.join(
-            os.tmpdir(),
-            `webiny-5-39-6-meta-fields-data-migration-log-${this.runId}.log`
-        );
-
-        // Save the aggregated migration summary to a file.
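-        // The summary file lives in os.tmpdir() and carries the run ID in its
-        // name, so logs from separate runs do not overwrite each other.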
-        fs.writeFileSync(logFilePath, JSON.stringify(migrationStats, null, 2));
-        this.logger.trace(`Migration summary saved to "${logFilePath}".`);
-    }
-}
diff --git a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/SegmentProcessor.ts b/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/SegmentProcessor.ts
deleted file mode 100644
index 7199d35cf3b..00000000000
--- a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/SegmentProcessor.ts
+++ /dev/null
@@ -1,70 +0,0 @@
-import execa from "execa";
-import path from "path";
-import { EsHealthChecksParams } from "~/migrations/5.39.6/001/ddb-es/utils";
-
-interface SegmentProcessorParams {
-    runId: string;
-    ddbTable: string;
-    ddbEsTable: string;
-    esEndpoint: string;
-    segmentIndex: number;
-    totalSegments: number;
-    esHealthChecks: EsHealthChecksParams;
-}
-
-export class SegmentProcessor {
-    private readonly runId: string;
-    private readonly ddbTable: string;
-    private readonly ddbEsTable: string;
-    private readonly esEndpoint: string;
-    private readonly segmentIndex: number;
-    private readonly totalSegments: number;
-    private readonly esHealthChecks: EsHealthChecksParams;
-
-    constructor(params: SegmentProcessorParams) {
-        this.runId = params.runId;
-        this.ddbTable = params.ddbTable;
-        this.ddbEsTable = params.ddbEsTable;
-        this.esEndpoint = params.esEndpoint;
-        this.segmentIndex = params.segmentIndex;
-        this.totalSegments = params.totalSegments;
-        this.esHealthChecks = params.esHealthChecks;
-    }
-
-    execute() {
-        return execa(
-            "node",
-            [
-                path.join(__dirname, "worker"),
-                "--runId",
-                this.runId,
-                "--ddbTable",
-                this.ddbTable,
-                "--ddbEsTable",
-                this.ddbEsTable,
-                "--esEndpoint",
-                this.esEndpoint,
-                "--segmentIndex",
-                String(this.segmentIndex),
-                "--totalSegments",
-                String(this.totalSegments),
-
-                // Elasticsearch health check options.
-                "--esHealthMinClusterHealthStatus",
-                this.esHealthChecks.minClusterHealthStatus,
-                "--esHealthMaxProcessorPercent",
-                String(this.esHealthChecks.maxProcessorPercent),
-                "--esHealthMaxRamPercent",
-                String(this.esHealthChecks.maxRamPercent),
-                "--esHealthMaxWaitingTime",
-                String(this.esHealthChecks.maxWaitingTime),
-                "--esHealthWaitingTimeStep",
-                String(this.esHealthChecks.waitingTimeStep)
-            ],
-            {
-                stdio: "inherit",
-                env: process.env
-            }
-        );
-    }
-}
diff --git a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/bin.ts b/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/bin.ts
deleted file mode 100644
index f70fc2efe4d..00000000000
--- a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/bin.ts
+++ /dev/null
@@ -1,78 +0,0 @@
-#!/usr/bin/env node
-import yargs from "yargs/yargs";
-import { hideBin } from "yargs/helpers";
-import { MetaFieldsMigration } from "./MetaFieldsMigration";
-import { createPinoLogger, getLogLevel } from "@webiny/logger";
-import pinoPretty from "pino-pretty";
-import {
-    DEFAULT_ES_HEALTH_CHECKS_PARAMS,
-    EsHealthChecksParams
-} from "~/migrations/5.39.6/001/ddb-es/utils";
-
-const argv = yargs(hideBin(process.argv))
-    .options({
-        ddbTable: { type: "string", demandOption: true },
-        ddbEsTable: { type: "string", demandOption: true },
-        esEndpoint: { type: "string", demandOption: true },
-        segments: { type: "number", demandOption: true },
-
-        // Elasticsearch health check options.
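-        // Every health-check flag below is optional and falls back to its
-        // DEFAULT_ES_HEALTH_CHECKS_PARAMS value.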
-        esHealthMinClusterHealthStatus: {
-            type: "string",
-            demandOption: false,
-            default: DEFAULT_ES_HEALTH_CHECKS_PARAMS.minClusterHealthStatus,
-            description: `Minimum cluster health status to wait for before proceeding with the migration.`
-        },
-        esHealthMaxProcessorPercent: {
-            type: "number",
-            demandOption: false,
-            default: DEFAULT_ES_HEALTH_CHECKS_PARAMS.maxProcessorPercent,
-            description: `Maximum CPU usage percentage the cluster may report before the migration proceeds.`
-        },
-        esHealthMaxRamPercent: {
-            type: "number",
-            demandOption: false,
-            default: DEFAULT_ES_HEALTH_CHECKS_PARAMS.maxRamPercent,
-            description: `Maximum RAM usage percentage the cluster may report before the migration proceeds.`
-        },
-        esHealthMaxWaitingTime: {
-            type: "number",
-            demandOption: false,
-            default: DEFAULT_ES_HEALTH_CHECKS_PARAMS.maxWaitingTime,
-            description: `Maximum time (seconds) to wait for a healthy cluster before proceeding with the migration.`
-        },
-        esHealthWaitingTimeStep: {
-            type: "number",
-            demandOption: false,
-            default: DEFAULT_ES_HEALTH_CHECKS_PARAMS.waitingTimeStep,
-            description: `Time step (seconds) to wait before checking Elasticsearch health status again.`
-        }
-    })
-    .parseSync();
-
-(async () => {
-    const logger = createPinoLogger(
-        {
-            level: getLogLevel(process.env.MIGRATIONS_LOG_LEVEL, "trace")
-        },
-        pinoPretty({ ignore: "pid,hostname" })
-    );
-
-    const migration = new MetaFieldsMigration({
-        totalSegments: argv.segments,
-        ddbTable: argv.ddbTable,
-        ddbEsTable: argv.ddbEsTable,
-        esEndpoint: argv.esEndpoint,
-        esHealthChecks: {
-            minClusterHealthStatus:
-                argv.esHealthMinClusterHealthStatus as EsHealthChecksParams["minClusterHealthStatus"],
-            maxProcessorPercent: argv.esHealthMaxProcessorPercent,
-            maxRamPercent: argv.esHealthMaxRamPercent,
-            maxWaitingTime: argv.esHealthMaxWaitingTime,
-            waitingTimeStep: argv.esHealthWaitingTimeStep
-        },
-        logger
-    });
-
-    await migration.execute();
-})();
diff --git a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/worker.ts b/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/worker.ts
deleted file mode 100644
index f736b1ed531..00000000000
--- a/packages/migrations/src/migrations/5.39.6/001/ddb-es/temp/revertMetaFieldsMigration/worker.ts
+++ /dev/null
@@ -1,331 +0,0 @@
-import { executeWithRetry } from "@webiny/utils";
-import { createPinoLogger, getLogLevel } from "@webiny/logger";
-import { createTable } from "@webiny/data-migration";
-import { getDocumentClient } from "@webiny/aws-sdk/client-dynamodb";
-import { createElasticsearchClient } from "@webiny/api-elasticsearch";
-import yargs from "yargs/yargs";
-import { hideBin } from "yargs/helpers";
-import { isMigratedEntry } from "~/migrations/5.39.0/001/utils/isMigratedEntry";
-import { getDecompressedData } from "~/migrations/5.39.0/001/utils/getDecompressedData";
-import { getCompressedData } from "~/migrations/5.39.0/001/utils/getCompressedData";
-import { CmsEntry } from "~/migrations/5.39.0/001/types";
-import {
-    createDdbEntryEntity,
-    createDdbEsEntryEntity
-} from "~/migrations/5.39.0/001/entities/createEntryEntity";
-import {
-    batchReadAll,
-    BatchReadItem,
-    batchWriteAll,
-    BatchWriteItem,
-    ddbScanWithCallback
-} from "~/utils";
-import { createWaitUntilHealthy } from "@webiny/api-elasticsearch/utils/waitUntilHealthy";
-import pinoPretty from "pino-pretty";
-import { EsHealthChecksParams } from "~/migrations/5.39.6/001/ddb-es/utils";
-import path from "path";
-import os from "os";
-import fs from "fs";
-
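-// This worker receives all of its coordinates, including the Elasticsearch
-// health-check thresholds, as CLI flags passed by SegmentProcessor.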
-const argv = yargs(hideBin(process.argv))
-    .options({
-        runId: { type: "string", demandOption: true },
-        ddbTable: { type: "string", demandOption: true },
-        ddbEsTable: { type: "string", demandOption: true },
-        esEndpoint: { type: "string", demandOption: true },
-        segmentIndex: { type: "number", demandOption: true },
-        totalSegments: { type: "number", demandOption: true },
-
-        // Elasticsearch health check options.
-        esHealthMinClusterHealthStatus: { type: "string", demandOption: true },
-        esHealthMaxProcessorPercent: { type: "number", demandOption: true },
-        esHealthMaxRamPercent: { type: "number", demandOption: true },
-        esHealthMaxWaitingTime: { type: "number", demandOption: true },
-        esHealthWaitingTimeStep: { type: "number", demandOption: true }
-    })
-    .parseSync();
-
-interface LastEvaluatedKeyObject {
-    PK: string;
-    SK: string;
-    GSI1_PK: string;
-    GSI1_SK: string;
-}
-
-type LastEvaluatedKey = LastEvaluatedKeyObject | true | null;
-
-interface MigrationStatus {
-    lastEvaluatedKey: LastEvaluatedKey;
-    stats: {
-        iterationsCount: number;
-        recordsScanned: number;
-        recordsUpdated: number;
-        recordsSkipped: number;
-        esHealthChecks: {
-            timeSpentWaiting: number;
-            checksCount: number;
-            unhealthyReasons: {
-                [key: string]: number;
-            };
-        };
-    };
-}
-
-interface DynamoDbElasticsearchRecord {
-    PK: string;
-    SK: string;
-    data: string;
-}
-
-const createInitialStatus = (): MigrationStatus => {
-    return {
-        lastEvaluatedKey: null,
-        stats: {
-            iterationsCount: 0,
-            recordsScanned: 0,
-            recordsUpdated: 0,
-            recordsSkipped: 0,
-            esHealthChecks: {
-                timeSpentWaiting: 0,
-                checksCount: 0,
-                unhealthyReasons: {}
-            }
-        }
-    };
-};
-
-(async () => {
-    const logger = createPinoLogger(
-        {
-            level: getLogLevel(process.env.MIGRATIONS_LOG_LEVEL, "trace"),
-            msgPrefix: `[segment #${argv.segmentIndex}] `
-        },
-        pinoPretty({ ignore: "pid,hostname" })
-    );
-
-    const documentClient = getDocumentClient();
-    const elasticsearchClient = createElasticsearchClient({
-        endpoint: `https://${argv.esEndpoint}`
-    });
-
-    const primaryTable = createTable({
-        name: argv.ddbTable,
-        documentClient
-    });
-    const dynamoToEsTable = createTable({
-        name: argv.ddbEsTable,
-        documentClient
-    });
-
-    const ddbEntryEntity = createDdbEntryEntity(primaryTable);
-    const ddbEsEntryEntity = createDdbEsEntryEntity(dynamoToEsTable);
-
-    const status = createInitialStatus();
-
-    const waitUntilHealthy = createWaitUntilHealthy(elasticsearchClient, {
-        minClusterHealthStatus:
-            argv.esHealthMinClusterHealthStatus as EsHealthChecksParams["minClusterHealthStatus"],
-        maxProcessorPercent: argv.esHealthMaxProcessorPercent,
-        maxRamPercent: argv.esHealthMaxRamPercent,
-        maxWaitingTime: argv.esHealthMaxWaitingTime,
-        waitingTimeStep: argv.esHealthWaitingTimeStep
-    });
-
-    await ddbScanWithCallback<CmsEntry>(
-        {
-            entity: ddbEntryEntity,
-            options: {
-                segment: argv.segmentIndex,
-                segments: argv.totalSegments,
-                filters: [
-                    {
-                        attr: "_et",
-                        eq: "CmsEntries"
-                    }
-                ],
-                startKey: status.lastEvaluatedKey || undefined,
-                limit: 100
-            }
-        },
-        async result => {
-            status.stats.iterationsCount++;
-            status.stats.recordsScanned += result.items.length;
-
-            if (status.stats.iterationsCount % 5 === 0) {
-                // We log every 5th iteration.
-                logger.trace(
-                    `[iteration #${status.stats.iterationsCount}] Reading ${result.items.length} record(s)...`
-                );
-            }
-
-            const ddbItemsToBatchWrite: BatchWriteItem[] = [];
-            const ddbEsItemsToBatchWrite: BatchWriteItem[] = [];
-            const ddbEsItemsToBatchRead: Record<string, BatchReadItem> = {};
-
-            // Update records in primary DynamoDB table. Also do preparations for
-            // subsequent updates on DDB-ES DynamoDB table, and in Elasticsearch.
-            for (const item of result.items) {
-                if (!isMigratedEntry(item)) {
-                    status.stats.recordsSkipped++;
-                    continue;
-                }
-
-                // @ts-expect-error
-                delete item.revisionCreatedOn;
-
-                ddbItemsToBatchWrite.push(ddbEntryEntity.putBatch(item));
-
-                /**
-                 * Prepare the loading of DynamoDB Elasticsearch part of the records.
-                 */
-
-                const ddbEsLatestRecordKey = `${item.entryId}:L`;
-                if (ddbEsItemsToBatchRead[ddbEsLatestRecordKey]) {
-                    continue;
-                }
-
-                ddbEsItemsToBatchRead[ddbEsLatestRecordKey] = ddbEsEntryEntity.getBatch({
-                    PK: item.PK,
-                    SK: "L"
-                });
-
-                const ddbEsPublishedRecordKey = `${item.entryId}:P`;
-                if (item.status === "published" || !!item.locked) {
-                    ddbEsItemsToBatchRead[ddbEsPublishedRecordKey] = ddbEsEntryEntity.getBatch({
-                        PK: item.PK,
-                        SK: "P"
-                    });
-                }
-            }
-
-            if (Object.keys(ddbEsItemsToBatchRead).length > 0) {
-                /**
-                 * Get all the records from DynamoDB Elasticsearch.
-                 */
-                const ddbEsRecords = await batchReadAll<DynamoDbElasticsearchRecord>({
-                    table: ddbEsEntryEntity.table,
-                    items: Object.values(ddbEsItemsToBatchRead)
-                });
-
-                for (const ddbEsRecord of ddbEsRecords) {
-                    const decompressedData = await getDecompressedData<CmsEntry>(ddbEsRecord.data);
-                    if (!decompressedData) {
-                        logger.trace(
-                            `[DDB-ES Table] Skipping record "${ddbEsRecord.PK}" as it is not a valid CMS entry...`
-                        );
-                        continue;
-                    }
-
-                    if (!isMigratedEntry(decompressedData)) {
-                        status.stats.recordsSkipped++;
-                        continue;
-                    }
-
-                    // @ts-expect-error
-                    delete decompressedData.revisionCreatedOn;
-
-                    const compressedData = await getCompressedData(decompressedData);
-
-                    ddbEsItemsToBatchWrite.push(
-                        ddbEsEntryEntity.putBatch({
-                            ...ddbEsRecord,
-                            data: compressedData
-                        })
-                    );
-                }
-            }
-
-            if (ddbItemsToBatchWrite.length) {
-                // Store data in primary DynamoDB table.
-                const execute = () => {
-                    return batchWriteAll({
-                        table: ddbEntryEntity.table,
-                        items: ddbItemsToBatchWrite
-                    });
-                };
-
-                logger.trace(
-                    `Storing ${ddbItemsToBatchWrite.length} record(s) in primary DynamoDB table...`
-                );
-                await executeWithRetry(execute, {
-                    onFailedAttempt: error => {
-                        logger.warn(
-                            `Batch write attempt #${error.attemptNumber} failed: ${error.message}`
-                        );
-                    }
-                });
-
-                if (ddbEsItemsToBatchWrite.length) {
-                    logger.trace(
-                        `Storing ${ddbEsItemsToBatchWrite.length} record(s) in DDB-ES DynamoDB table...`
-                    );
-
-                    const results = await waitUntilHealthy.wait({
-                        async onUnhealthy(params) {
-                            const shouldWaitReason = params.waitingReason.name;
-
-                            logger.warn(
-                                `Cluster is unhealthy (${shouldWaitReason}). Waiting for the cluster to become healthy...`,
-                                params
-                            );
-
-                            if (status.stats.esHealthChecks.unhealthyReasons[shouldWaitReason]) {
-                                status.stats.esHealthChecks.unhealthyReasons[shouldWaitReason]++;
-                            } else {
-                                status.stats.esHealthChecks.unhealthyReasons[shouldWaitReason] = 1;
-                            }
-                        }
-                    });
-
-                    status.stats.esHealthChecks.checksCount++;
-                    status.stats.esHealthChecks.timeSpentWaiting += results.runningTime;
-
-                    // Store data in DDB-ES DynamoDB table.
-                    const executeDdbEs = () => {
-                        return batchWriteAll({
-                            table: ddbEsEntryEntity.table,
-                            items: ddbEsItemsToBatchWrite
-                        });
-                    };
-
-                    await executeWithRetry(executeDdbEs, {
-                        onFailedAttempt: error => {
-                            logger.warn(
-                                `[DDB-ES Table] Batch write attempt #${error.attemptNumber} failed: ${error.message}`
-                            );
-                        }
-                    });
-                }
-
-                status.stats.recordsUpdated += ddbItemsToBatchWrite.length;
-            }
-
-            // Update checkpoint after every batch.
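-            // `true` acts as a sentinel meaning the segment's scan is complete;
-            // otherwise the raw LastEvaluatedKey object is stored on the status.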
-            let lastEvaluatedKey: LastEvaluatedKey = true;
-            if (result.lastEvaluatedKey) {
-                lastEvaluatedKey = result.lastEvaluatedKey as unknown as LastEvaluatedKeyObject;
-            }
-
-            status.lastEvaluatedKey = lastEvaluatedKey;
-
-            if (lastEvaluatedKey === true) {
-                return false;
-            }
-
-            // Continue further scanning.
-            return true;
-        }
-    );
-
-    // Store status in tmp file.
-    logger.trace({ status }, "Segment processing completed. Saving status to tmp file...");
-    const logFilePath = path.join(
-        os.tmpdir(),
-        `webiny-5-39-6-meta-fields-data-migration-log-${argv.runId}-${argv.segmentIndex}.log`
-    );
-
-    // Save segment processing stats to a file.
-    fs.writeFileSync(logFilePath, JSON.stringify(status.stats, null, 2));
-
-    logger.trace(`Segment processing stats saved to ${logFilePath}.`);
-})();