From 257c5b389209b4217ddf211cada6a6a0549ae27d Mon Sep 17 00:00:00 2001
From: Eric Maslowski
Date: Thu, 4 Jan 2024 11:31:41 -0500
Subject: [PATCH 1/3] (fix) more flexibility in handling returned status codes
 from Cook

(fix) more detailed failure reporting during execution
(fix) increased max transmit size for webdav to 100Gb
---
 server/db/api/AssetVersion.ts   |   4 ++
 server/job/impl/Cook/JobCook.ts | 107 ++++++++++++++++++++++++--------
 2 files changed, 86 insertions(+), 25 deletions(-)

diff --git a/server/db/api/AssetVersion.ts b/server/db/api/AssetVersion.ts
index 610e10c1b..1e2e7b7c6 100644
--- a/server/db/api/AssetVersion.ts
+++ b/server/db/api/AssetVersion.ts
@@ -369,6 +369,10 @@ export class AssetVersion extends DBC.DBObject implements Asse
     }
 
     /** Pass in a value for Retired if you need to seek only asset versions that have or have not been retired */
+    /* Ingested:
+       null  = uploaded, not processed (after bytes transferred)
+       false = uploaded, processed (after transferred and inspected)
+       true  = uploaded, processed, ingested (once ingested in the system) */
     static async fetchFromUserByIngested(idUserCreator: number, Ingested: boolean | null, Retired: boolean | null = null): Promise {
         if (!idUserCreator)
             return null;
diff --git a/server/job/impl/Cook/JobCook.ts b/server/job/impl/Cook/JobCook.ts
index 1d0a38ba6..e2783222c 100644
--- a/server/job/impl/Cook/JobCook.ts
+++ b/server/job/impl/Cook/JobCook.ts
@@ -175,6 +175,7 @@ export abstract class JobCook extends JobPackrat {
     }
 
     async signalCompletion(): Promise {
+        // when the Cook job has completed and returned
         this._complete = true;
         for (const mutex of this._completionMutexes)
             mutex.cancel();
@@ -265,19 +266,28 @@ export abstract class JobCook extends JobPackrat {
         let requestUrl: string = this.CookServerURL() + 'job';
         const jobCookPostBody: JobCookPostBody = new JobCookPostBody(this._configuration, await this.getParameters(), eJobCookPriority.eNormal);
 
+        // submit the Cook job via an axios request, and retry for 'CookRequestRetryCount' times.
+        // todo: there's a condition here that leads to a Cook timeout and repeated attempts even on failure
         while (true) {
             try {
                 LOG.info(`JobCook [${this.name()}] creating job: ${requestUrl} body ${JSON.stringify(jobCookPostBody, H.Helpers.saferStringify)}`, LOG.LS.eJOB);
                 const axiosResponse: AxiosResponse | null = await axios.post(encodeURI(requestUrl), jobCookPostBody);
-                if (axiosResponse?.status === 201)
+                if (axiosResponse?.status === 201) {
+                    LOG.info(`JobCook [${this.name()}] creating job: ${requestUrl} successful post response (${axiosResponse.status}:${axiosResponse.statusText} - ${axiosResponse.data})`,LOG.LS.eJOB);
                     break; // success, continue
-                else {
-                    res.error = `JobCook [${this.name()}] creating job: ${requestUrl} unexpected response ${axiosResponse?.status}`;
-                    LOG.info(res.error, LOG.LS.eJOB);
+                } else {
+                    res.error = `JobCook [${this.name()}] creating job: ${requestUrl} unexpected response (${axiosResponse?.status}:${axiosResponse?.statusText})`;
+                    LOG.error(res.error, LOG.LS.eJOB);
                 }
             } catch (err) {
                 res = this.handleRequestException(err, requestUrl, 'post', jobCookPostBody);
+
+                // log the error at this point
+                res.error = `JobCook [${this.name()}] creating job: ${requestUrl} failed (${err})`;
+                LOG.error(res.error, LOG.LS.eJOB);
+
+                // if we can't retry, return
                 if (res.allowRetry === false)
                     return res;
             }
@@ -290,10 +300,12 @@ export abstract class JobCook extends JobPackrat {
             await H.Helpers.sleep(CookRetryDelay);
         }
 
-        // stage files
+        // wait for all files to be staged
         res = await this.stageFiles();
-        if (!res.success)
+        if (!res.success) {
+            LOG.error(`JobCook [${this.name()}] failed to stage files (${res.error})`,LOG.LS.eJOB);
             return res;
+        }
 
         // Initiate job via PATCH to /clients//jobs//run
         requestCount = 0;
@@ -301,13 +313,19 @@ export abstract class JobCook extends JobPackrat {
         res = { success: false };
         requestUrl = this.CookServerURL() + `clients/${this._configuration.clientId}/jobs/${this._configuration.jobId}/run`;
         while (true) {
             try {
-                LOG.info(`JobCook [${this.name()}] running job: ${requestUrl}`, LOG.LS.eJOB);
+                LOG.info(`JobCook [${this.name()}] starting job: ${requestUrl}`, LOG.LS.eJOB);
                 const axiosResponse = await axios.patch(encodeURI(requestUrl));
-                if (axiosResponse.status === 202)
+                if (axiosResponse.status >= 200 && axiosResponse.status <= 299) {
+                    LOG.info(`JobCook [${this.name()}] starting job: ${requestUrl} successful response (${axiosResponse.status}:${axiosResponse.statusText})`,LOG.LS.eJOB);
                     break; // success, continue
-                res = { success: false, error: `JobCook [${this.name()}] patch ${requestUrl} failed: ${JSON.stringify(axiosResponse)}` };
+                }
+
+                // if we failed, report out
+                res = { success: false, error: `JobCook [${this.name()}] starting job: ${requestUrl} failed (${axiosResponse.status}:${axiosResponse.statusText})` };
+                LOG.error(res.error, LOG.LS.eJOB);
             } catch (err) {
                 res = this.handleRequestException(err, requestUrl, 'patch', jobCookPostBody);
+                LOG.error(res.error, LOG.LS.eJOB);
                 if (res.allowRetry === false)
                     return res;
             }
@@ -317,6 +335,8 @@ export abstract class JobCook extends JobPackrat {
                 res.allowRetry = true; // allow outer level to retry job initiation
                 return res;
             }
+
+            // wait for our next attempt
             await H.Helpers.sleep(CookRetryDelay);
         }
 
@@ -332,6 +352,7 @@ export abstract class JobCook extends JobPackrat {
 
     async cancelJobWorker(): Promise {
         // Cancel job via PATCH to /clients//jobs//cancel
+        // todo: use 'delete' to prevent lingering jobs on the Cook server; first verify that reports have been copied. (DPO3DPKRT-762)
         let requestCount: number = 0;
         let res: H.IOResults = { success: false };
         const requestUrl: string = this.CookServerURL() + `clients/${this._configuration.clientId}/jobs/${this._configuration.jobId}/cancel`;
@@ -339,10 +360,10 @@ export abstract class JobCook extends JobPackrat {
         while (true) {
             try {
                 const axiosResponse = await axios.patch(encodeURI(requestUrl));
-                if (axiosResponse.status !== 200)
-                    res = { success: false, error: `JobCook [${this.name()}] patch ${requestUrl} failed: ${JSON.stringify(axiosResponse)}` };
+                if (axiosResponse.status < 200 || axiosResponse.status > 299)
+                    res = { success: false, error: `JobCook [${this.name()}] patch ${requestUrl} failed: ${axiosResponse.status}:${axiosResponse.statusText}` };
             } catch (error) {
-                res = { success: false, error: `JobCook [${this.name()}] patch ${requestUrl}: ${JSON.stringify(error)}` };
+                res = { success: false, error: `JobCook [${this.name()}] patch ${requestUrl}: ${H.Helpers.JSONStringify(error)}` };
             }
             if (res.success)
                 break;
@@ -366,7 +387,7 @@ export abstract class JobCook extends JobPackrat {
         const requestUrl: string = this.CookServerURL() + `clients/${this._configuration.clientId}/jobs/${this._configuration.jobId}/report`;
         try {
             const axiosResponse = await axios.get(encodeURI(requestUrl));
-            if (axiosResponse.status !== 200) {
+            if (axiosResponse.status < 200 || axiosResponse.status > 299) {
                 // only log errors after first attempt, as job creation may not be complete on Cook server
                 const error: string = JSON.stringify(axiosResponse);
                 if (pollNumber > 1)
@@ -404,9 +425,18 @@ export abstract class JobCook extends JobPackrat {
         const webdavClient: WebDAVClient = createClient(cookEndpoint, {
             authType: AuthType.None,
-            maxBodyLength: 10 * 1024 * 1024 * 1024,
+            maxBodyLength: 100 * 1024 * 1024 * 1024, // 100Gb
             withCredentials: false
         });
+
+        // DEBUG: make sure we have a file to get and its size
+        // TODO: more robust support with alt type
+        // const stat = await webdavClient.stat(destination);
+        // const fileSize = (stat as FileStat).size;
+        // console.log(`>>>> fetchFile file size: ${fileSize} | ${destination}`);
+        // if(fileSize <= 0)
+        //     throw new Error(`destination file doesn't exist or is empty. (${fileSize} bytes | ${destination})`);
+
         const webdavWSOpts: CreateReadStreamOptions = {
             headers: { 'Content-Type': 'application/octet-stream' }
         };
@@ -422,9 +452,14 @@ export abstract class JobCook extends JobPackrat {
     }
 
     protected async stageFiles(): Promise {
+        // this runs on job creation when internal work starts
+        LOG.info(`JobCook.stageFiles is staging ${this._idAssetVersions?.length} asset versions. (${H.Helpers.JSONStringify(this._idAssetVersions)})`,LOG.LS.eJOB);
+
+        // early out if we don't have anything staged
         if (!this._idAssetVersions)
             return { success: true };
 
+        // otherwise cycle through each, getting the read stream and executing
         let resOuter: H.IOResults = { success: true };
         for (const idAssetVersion of this._idAssetVersions) {
             const resInner: H.IOResults = await JobCook._stagingSempaphoreWrite.runExclusive(async (value) => {
@@ -460,6 +495,7 @@ export abstract class JobCook extends JobPackrat {
                     headers: { 'Content-Type': 'application/octet-stream' }
                 };
 
+                // create a write stream for transmitting our data to staging via WebDAV
                 let res: H.IOResultsSized = { success: false, error: 'Not Executed', size: -1 };
                 for (let transmitCount: number = 0; transmitCount < CookWebDAVTransmitRetryCount; transmitCount++) {
                     let WS: Writable | null = null;
@@ -538,6 +574,10 @@ export abstract class JobCook extends JobPackrat {
         const cookServerURL: string = this._configuration.cookServerURLs[this._configuration.cookServerURLIndex];
         if (++this._configuration.cookServerURLIndex >= this._configuration.cookServerURLs.length)
             this._configuration.cookServerURLIndex = 0;
+
+        // additional logging in case notification isn't sent
+        const error = `JobCook.handleCookConnectionFailure: Packrat was unable to connect to ${cookServerURL}`;
+        LOG.error(error,LOG.LS.eJOB);
         LOG.info(`JobCook.handleCookConnectionFailure switching from ${cookServerURL} to ${this._configuration.cookServerURLs[this._configuration.cookServerURLIndex]}`, LOG.LS.eJOB);
 
         // only notify once about a specific server
@@ -545,6 +585,7 @@ export abstract class JobCook extends JobPackrat {
             return;
         JobCook._cookServerFailureNotificationList.add(cookServerURL);
 
+        // see if we should send a notification based on how long it has been since the previous notification
         let sendNotification: boolean = true;
         let timeSinceLastNotification: number = CookFailureNotificationTime + 1;
         const now: Date = new Date();
@@ -557,7 +598,8 @@ export abstract class JobCook extends JobPackrat {
         }
 
         if (sendNotification) {
-            const res: H.IOResults = await Email.Send(undefined, undefined, 'Cook Connection Failure', `Packrat was unable to connect to ${cookServerURL}`, undefined);
+            // const res: H.IOResults = await Email.Send(undefined, undefined, 'Cook Connection Failure', `Packrat was unable to connect to ${cookServerURL}`, undefined);
+            const res: H.IOResults = await Email.Send(undefined, undefined, 'Cook Connection Failure', error, undefined);
             if (!res.success)
                 LOG.error(`JobCook.handleCookConnectionFailure unable to send email notification: ${res.error}`, LOG.LS.eJOB);
         }
@@ -587,7 +629,7 @@ export abstract class JobCook extends JobPackrat {
         let emitLog: boolean = true;
         let error: string;
         if (!pollNumber)
-            error = `JobCook [${this.name()}] ${method} ${requestUrl} body ${JSON.stringify(jobCookPostBody)} failed with error ${message}: ${JSON.stringify(axiosResponse?.data)}`;
+            error = `JobCook [${this.name()}] ${method} ${requestUrl} body ${H.Helpers.JSONStringify(jobCookPostBody)} failed with error ${message}`;
         else {
             emitLog = (pollNumber >= 1);
             error = `JobCook [${this.name()}] polling [${pollNumber}] ${method} ${requestUrl} failed with error ${message}`;
@@ -595,16 +637,31 @@ export abstract class JobCook extends JobPackrat {
         }
 
         const res: CookIOResults = { success: false, allowRetry: true, connectFailure: false, otherCookError: false, error };
 
-        // if we receive a 500 status, log this as an error and avoid retrying
-        switch (status) {
-            case 500:
-                LOG.error(error, LOG.LS.eJOB);
-                res.allowRetry = false;
-                return res;
-            case 400:
-                res.otherCookError = true;
-                break;
+        // if we have a status code with 4xx/5xx
+        if(status) {
+            switch (true) {
+                // catch all 5xx codes and treat as errors
+                case (status>=500 && status<=599): {
+                    LOG.error(error, LOG.LS.eJOB);
+                    res.allowRetry = false;
+                    return res;
+                }
+                // request timed out (408) or too many requests (429)
+                case (status===408 || status===429): {
+                    res.otherCookError = true;
+                } break;
+                // catch remaining 4xx codes, which should be treated as failures
+                case (status>=400 && status<=499): {
+                    LOG.error(error, LOG.LS.eJOB);
+                    res.allowRetry = false;
+                    return res;
+                }
+            }
+        } else {
+            LOG.error('JobCook.handleRequestException - no status response received.',LOG.LS.eJOB);
+            return res;
         }
+
         if (emitLog)
             LOG.info(error, LOG.LS.eJOB);
         return res;

From c488bb1f5fdfe11276f297d7553bf9ac5d85a769 Mon Sep 17 00:00:00 2001
From: Eric Maslowski
Date: Thu, 4 Jan 2024 11:45:13 -0500
Subject: [PATCH 2/3] after develop rebase

From 0a8b15976df88bb6f04c278d8ea1f3ae80e4894b Mon Sep 17 00:00:00 2001
From: Eric Maslowski
Date: Thu, 4 Jan 2024 12:02:06 -0500
Subject: [PATCH 3/3] (new) verify streams are valid before writing files to
 disk

(fix) additional comments and logging for storing assets
---
 server/storage/impl/LocalStorage/LocalStorage.ts |  3 +++
 server/storage/interface/AssetStorageAdapter.ts  | 15 +++++++++++----
 server/utils/parser/svxReader.ts                 |  4 ++++
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/server/storage/impl/LocalStorage/LocalStorage.ts b/server/storage/impl/LocalStorage/LocalStorage.ts
index 11d833743..03d668ad4 100644
--- a/server/storage/impl/LocalStorage/LocalStorage.ts
+++ b/server/storage/impl/LocalStorage/LocalStorage.ts
@@ -228,7 +228,10 @@ export class LocalStorage implements STORE.IStorage {
             };
         }
 
+        // figure out our path on disk for the staging file, and make sure it exists
         const pathOnDisk: string = (inputStream) ? '' : path.join(this.ocflRoot.computeLocationStagingRoot(), storageKeyStaged);
+
+        // if we have data then update the OCFL object
         const PSAR: STORE.PromoteStagedAssetResult = await ocflObjectInitResults.ocflObject.addOrUpdate(pathOnDisk, inputStream, fileName, metadata, opInfo); // moves staged file, or streams file, if present
         if (!PSAR.success)
diff --git a/server/storage/interface/AssetStorageAdapter.ts b/server/storage/interface/AssetStorageAdapter.ts
index c5338ee79..4dc0ba82b 100644
--- a/server/storage/interface/AssetStorageAdapter.ts
+++ b/server/storage/interface/AssetStorageAdapter.ts
@@ -993,8 +993,8 @@ export class AssetStorageAdapter {
         const wsRes: STORE.WriteStreamResult = await storage.writeStream(ISI.FileName);
         if (!wsRes.success || !wsRes.writeStream || !wsRes.storageKey) {
-            const error: string = `AssetStorageAdapter.ingestStreamOrFile Unable to create write stream for ${ISI.FileName}: ${wsRes.error}`;
-            LOG.error(error, LOG.LS.eSTR);
+            const error: string = `unable to create write stream for ${ISI.FileName}: ${wsRes.error}`;
+            LOG.error('AssetStorageAdapter.ingestStreamOrFile ' + error, LOG.LS.eSTR);
             return { success: false, error };
         }
 
@@ -1007,11 +1007,18 @@ export class AssetStorageAdapter {
             ISI.readStream = fs.createReadStream(ISI.localFilePath);
         }
 
+        // make sure both streams are valid
+        if(ISI.readStream.readable===false || wsRes.writeStream.writable===false) {
+            const error: string = `individual streams are not valid. (read: ${ISI.readStream.readable} | write: ${wsRes.writeStream.writable})`;
+            LOG.error('AssetStorageAdapter.ingestStreamOrFile ' + error, LOG.LS.eSTR);
+            return { success: false, error };
+        }
+
         try {
             const wrRes: H.IOResults = await H.Helpers.writeStreamToStream(ISI.readStream, wsRes.writeStream);
             if (!wrRes.success) {
-                const error: string = `AssetStorageAdapter.ingestStreamOrFile Unable to write to stream: ${wrRes.error}`;
-                LOG.error(error, LOG.LS.eSTR);
+                const error: string = `unable to write to stream: ${wrRes.error}`;
+                LOG.error('AssetStorageAdapter.ingestStreamOrFile ' + error, LOG.LS.eSTR);
                 return { success: false, error };
             }
         } finally {
diff --git a/server/utils/parser/svxReader.ts b/server/utils/parser/svxReader.ts
index ce8136c49..ea89fe355 100644
--- a/server/utils/parser/svxReader.ts
+++ b/server/utils/parser/svxReader.ts
@@ -325,6 +325,10 @@ export class SvxReader {
             if (!buffer)
                 return { success: false, error: 'Unable to read stream' };
             const json: string = buffer.toString();
+
+            LOG.info(`SvxReader.loadFromStream read ${buffer.length} bytes.`,LOG.LS.eSYS);
+            // LOG.info(`SvxReader.loadFromStream read json ${json}`,LOG.LS.eSYS);
+
             return this.loadFromJSON(json);
         } catch (err) /* istanbul ignore next */ {
             LOG.error('SvxReader.loadFromStream', LOG.LS.eSYS, err);
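The DEBUG block left commented out in JobCook.fetchFile hints at a pre-flight check on the Cook output before streaming it. Below is a minimal sketch of what that check might look like, assuming the webdav client's stat() API; the verifyCookFile helper name is made up for illustration and is not part of this patch.

```typescript
import { AuthType, createClient, FileStat, WebDAVClient } from 'webdav';

// Sketch only: confirm a Cook output file exists and is non-empty before streaming it.
// 'cookEndpoint' and 'destination' mirror the variables used by JobCook.fetchFile;
// 'verifyCookFile' is a hypothetical helper, not code from the patch.
export async function verifyCookFile(cookEndpoint: string, destination: string): Promise<number> {
    const webdavClient: WebDAVClient = createClient(cookEndpoint, {
        authType: AuthType.None,
        withCredentials: false
    });

    // stat() may return either a FileStat or a detailed response wrapper; unwrap before reading size
    const statResult = await webdavClient.stat(destination);
    const fileStat: FileStat = ('data' in statResult) ? statResult.data : statResult;

    if (fileStat.size <= 0)
        throw new Error(`destination file doesn't exist or is empty (${fileStat.size} bytes | ${destination})`);
    return fileStat.size;
}
```

Failing fast on a missing or zero-byte file would keep the retry loop in fetchFile from repeatedly streaming output that Cook never produced.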