diff --git a/packages/browser/src/utils/crc32.ts b/packages/browser/src/utils/crc32.ts index 85f2082f..87c397cf 100644 --- a/packages/browser/src/utils/crc32.ts +++ b/packages/browser/src/utils/crc32.ts @@ -1,6 +1,6 @@ /* eslint-disable no-bitwise */ -import { MB } from './helper' +const MB = 1024 ** 2 /** * 以下 class 实现参考 diff --git a/packages/common/src/api/index.ts b/packages/common/src/api/index.ts new file mode 100644 index 00000000..efe716bb --- /dev/null +++ b/packages/common/src/api/index.ts @@ -0,0 +1,259 @@ +import { urlSafeBase64Encode } from '../helper/base64' +import { HttpAbort, HttpClient, HttpHeader, OnHttpProgress } from '../types/http' +import { InnerTokenProvider, TokenProvider } from '../types/token' +import { Result, isErrorResult } from '../types/types' +import { HostProvider } from '../upload/common/host' + +interface BasicParams { + abort?: HttpAbort + onProgress?: OnHttpProgress +} + +interface BasicWithAuthParams extends BasicParams { + token: string +} + +interface DirectUploadParams extends BasicWithAuthParams { + +} + +interface InitMultipartUploadParams extends BasicWithAuthParams { + key?: string + bucket: string +} + +interface UploadPartParams extends BasicWithAuthParams { + bucket: string + uploadId: string + part: IBlob + partIndex: number + md5?: string + key?: string +} + +interface ListMultipartUploadPartsParams extends BasicWithAuthParams { + uploadId: string + bucket: string + key?: string +} + +interface ListMultipartUploadPartsResponse { + uploadId: string + expireAt: number + partNumberMarker: number + parts: Array<{ + Etag: string + Size: number + PutTime: number + PartNumber: number + }> +} + +interface CompleteMultipartUploadParams extends BasicWithAuthParams { + uploadId: string + md5?: string + key?: string +} + +interface AbortMultipartUploadParams extends BasicWithAuthParams { + uploadId: string + key?: string +} + +interface UploadChunkData { + etag: string + md5: string +} + +interface InitPartsData { + /** 该文件的上传 id, 后续该文件其他各个块的上传,已上传块的废弃,已上传块的合成文件,都需要该 id */ + uploadId: string + /** uploadId 的过期时间 */ + expireAt: number +} + +type UploadCompleteData = T + +interface GetChunkRequestPathParams extends BasicWithAuthParams { + key?: string +} + +interface DirectUploadParams extends BasicWithAuthParams { + file: IFile + key?: string + + crc32?: string + meta?: string[] + accept?: string + fileName?: string + custom_name?: string + custom_value?: string +} + +export class UploadApis { + constructor( + /** http 请求客户端;通过实现不同的 HttpClient 来实现多环境支持 */ + private httpClient: HttpClient, + /** 上传 host 池;提供了获取和管理上传 host 的能力 */ + private hostProvider: HostProvider, + /** token 获取器;api 会自动通过该对象获取 token */ + private tokenProvider: InnerTokenProvider + ) {} + + private generateAuthHeaders(token: string): HttpHeader { + const auth = 'UpToken ' + token + return { Authorization: auth } + } + + private async getBaseRequestPath(params: GetChunkRequestPathParams): Promise> { + const tokenResult = await this.tokenProvider.getUploadToken(params) + if (isErrorResult(tokenResult)) return tokenResult + const hostResult = await this.hostProvider.getUploadHost(tokenResult.result) + if (isErrorResult(hostResult)) return hostResult + const uploadHostUrl = hostResult.result.getUrl() + + const realKey = params.key != null ? urlSafeBase64Encode(params.key) : '~' + const url = `${uploadHostUrl}/buckets/${tokenResult.result.bucket}/objects/${realKey}/uploads` + return { result: url } + } + + async initMultipartUpload(params: InitMultipartUploadParams) { + const requestPathResult = await this.getBaseRequestPath(params) + if (isErrorResult(requestPathResult)) return requestPathResult + + const headers = this.generateAuthHeaders(params.token) + headers['content-type'] = 'application/json' + return this.httpClient.post(requestPathResult.result, { + headers, + abort: params.abort, + onProgress: params.onProgress + }) + } + + async uploadPart(params: UploadPartParams) { + const requestPathResult = await this.getBaseRequestPath(params) + if (isErrorResult(requestPathResult)) return requestPathResult + + const url = `${requestPathResult.result}/${params.uploadId}/${params.partIndex}` + const headers = this.generateAuthHeaders(params.token) + headers['content-type'] = 'application/json' + if (params.md5) headers['Content-MD5'] = params.md5 + return this.httpClient.put(url, { + onProgress: params.onProgress, + abort: params.abort, + body: params.part, + headers + }) + } + + async listMultipartUploadParts(params: ListMultipartUploadPartsParams) { + const requestPathResult = await this.getBaseRequestPath(params) + if (isErrorResult(requestPathResult)) return requestPathResult + + const url = `${requestPathResult.result}/${params.uploadId}` + const headers = this.generateAuthHeaders(params.token) + headers['content-type'] = 'application/json' + return this.httpClient.get(url, { + headers, + abort: params.abort, + onProgress: params.onProgress + }) + } + + async completeMultipartUpload(params: CompleteMultipartUploadParams) { + const requestPathResult = await this.getBaseRequestPath(params) + if (isErrorResult(requestPathResult)) return requestPathResult + + const url = `${requestPathResult.result}/${params.uploadId}` + const headers = this.generateAuthHeaders(params.token) + headers['content-type'] = 'application/json' + return this.httpClient.post>(url, { + headers, + abort: params.abort, + onProgress: params.onProgress + }) + } + + async abortMultipartUpload(params: AbortMultipartUploadParams) { + const requestPathResult = await this.getBaseRequestPath(params) + if (isErrorResult(requestPathResult)) return requestPathResult + + const url = `${requestPathResult.result}/${params.uploadId}` + const headers = this.generateAuthHeaders(params.token) + headers['content-type'] = 'application/json' + return this.httpClient.delete(url, { + headers, + abort: params.abort, + onProgress: params.onProgress + }) + } + + async directUpload(params: DirectUploadParams) { + const tokenResult = await this.tokenProvider.getUploadToken(params) + if (isErrorResult(tokenResult)) return tokenResult + const hostResult = await this.hostProvider.getUploadHost(tokenResult.result) + if (isErrorResult(hostResult)) return hostResult + const uploadHostUrl = hostResult.result.getUrl() + + return this.httpClient.post>(uploadHostUrl, { + abort: params.abort, + onProgress: params.onProgress, + headers: { 'content-type': 'multipart/form-data' }, + body: { + resource_key: params.key, + upload_token: params.token, + fileBinaryData: params.file, + custom_name: params.custom_name, + custom_value: params.custom_value, + crc32: params.crc32, + accept: params.accept, + fileName: params.fileName, + 'x-qn-meta': params.meta + } + }) + } +} + +interface GetHostConfigParams { + serverUrl: string + accessKey: string + bucket: string +} + +interface HostConfig { + hosts: Array<{ + region: string + ttl: number + up: { + domains: string[] + old: string[] + } + io: { + domains: string[] + old: string[] + } + io_src: { + domains: string[] + } + s3: { + region_alias: string + domains: string[] + } + }> +} + +export class ConfigApis { + constructor( + /** http 请求客户端;通过实现不同的 HttpClient 来实现多环境支持 */ + private httpClient: HttpClient + ) {} + + /** 从服务中心获取接口服务地址 */ + async getHostConfig(params: GetHostConfigParams) { + /** 从配置中心获取上传服务地址 */ + const query = `ak=${encodeURIComponent(params.accessKey)}&bucket=${encodeURIComponent(params.bucket)}` + // TODO: 支持设置,私有云自动获取上传地址 + const url = `${params.serverUrl}/v4/query?${query}` + return this.httpClient.get(url) + } +} diff --git a/packages/common/src/helper/base64/index.test.ts b/packages/common/src/helper/base64/index.test.ts new file mode 100644 index 00000000..e0ef6386 --- /dev/null +++ b/packages/common/src/helper/base64/index.test.ts @@ -0,0 +1,31 @@ +import * as base64 from '.' + +// 测试用例来自以下地址 +// https://github.com/LinusU/encode-utf8/blob/bd6c09b1c67baafc51853b1bea0e80bfe1e69ed0/test.js +const testCases = [ + ['正', '5q2j'], + ['𝌆', '8J2Mhg'], + ['💩', '8J-SqQ'], + ['Hello, World!', 'SGVsbG8sIFdvcmxkIQ'], + ['🐵 🙈 🙉 🙊', '8J-QtSDwn5mIIPCfmYkg8J-Zig'], + ['åß∂ƒ©˙∆˚¬…æ', 'w6XDn-KIgsaSwqnLmeKIhsuawqzigKbDpg'], + ['사회과학원 어학연구소', '7IKs7ZqM6rO87ZWZ7JuQIOyWtO2VmeyXsOq1rOyGjA'], + ['゚・✿ヾ╲(。◕‿◕。)╱✿・゚', '776f772l4py_44O-4pWyKO-9oeKXleKAv-KXle-9oSnilbHinL_vvaXvvp8'], + ['Powerلُلُصّبُلُلصّبُررً ॣ ॣh ॣ ॣ冗', 'UG93ZXLZhNmP2YTZj9i12ZHYqNmP2YTZj9mE2LXZkdio2Y_Ysdix2Ysg4KWjIOClo2gg4KWjIOClo-WGlw'], + ['𝕿𝖍𝖊 𝖖𝖚𝖎𝖈𝖐 𝖇𝖗𝖔𝖜𝖓 𝖋𝖔𝖝 𝖏𝖚𝖒𝖕𝖘 𝖔𝖛𝖊𝖗 𝖙𝖍𝖊 𝖑𝖆𝖟𝖞 𝖉𝖔𝖌', '8J2Vv_Cdlo3wnZaKIPCdlpbwnZaa8J2WjvCdlojwnZaQIPCdlofwnZaX8J2WlPCdlpzwnZaTIPCdlovwnZaU8J2WnSDwnZaP8J2WmvCdlpLwnZaV8J2WmCDwnZaU8J2Wm_CdlorwnZaXIPCdlpnwnZaN8J2WiiDwnZaR8J2WhvCdlp_wnZaeIPCdlonwnZaU8J2WjA'] +] + +describe('test base64', () => { + test('urlSafeBase64Encode', () => { + for (const [input, expected] of testCases) { + const actual = base64.urlSafeBase64Encode(input) + expect(actual).toMatch(expected) + } + }) + test('urlSafeBase64Decode', () => { + for (const [expected, input] of testCases) { + const actual = base64.urlSafeBase64Decode(input) + expect(actual).toMatch(expected) + } + }) +}) diff --git a/packages/common/src/helper/base64/index.ts b/packages/common/src/helper/base64/index.ts new file mode 100644 index 00000000..4bf529d9 --- /dev/null +++ b/packages/common/src/helper/base64/index.ts @@ -0,0 +1,274 @@ +/* eslint-disable */ + +// https://github.com/locutusjs/locutus/blob/master/src/php/xml/utf8_encode.js +function utf8Encode(argString: string) { + // http://kevin.vanzonneveld.net + // + original by: Webtoolkit.info (http://www.webtoolkit.info/) + // + improved by: Kevin van Zonneveld (http://kevin.vanzonneveld.net) + // + improved by: sowberry + // + tweaked by: Jack + // + bugfixed by: Onno Marsman + // + improved by: Yves Sucaet + // + bugfixed by: Onno Marsman + // + bugfixed by: Ulrich + // + bugfixed by: Rafal Kukawski + // + improved by: kirilloid + // + bugfixed by: kirilloid + // * example 1: this.utf8Encode('Kevin van Zonneveld') + // * returns 1: 'Kevin van Zonneveld' + + if (argString === null || typeof argString === 'undefined') { + return '' + } + + let string = argString + '' // .replace(/\r\n/g, '\n').replace(/\r/g, '\n') + let utftext = '', + start, + end, + stringl = 0 + + start = end = 0 + stringl = string.length + for (let n = 0; n < stringl; n++) { + let c1 = string.charCodeAt(n) + let enc = null + + if (c1 < 128) { + end++ + } else if (c1 > 127 && c1 < 2048) { + enc = String.fromCharCode((c1 >> 6) | 192, (c1 & 63) | 128) + } else if ((c1 & 0xf800 ^ 0xd800) > 0) { + enc = String.fromCharCode( + (c1 >> 12) | 224, + ((c1 >> 6) & 63) | 128, + (c1 & 63) | 128 + ) + } else { + // surrogate pairs + if ((c1 & 0xfc00 ^ 0xd800) > 0) { + throw new RangeError('Unmatched trail surrogate at ' + n) + } + let c2 = string.charCodeAt(++n) + if ((c2 & 0xfc00 ^ 0xdc00) > 0) { + throw new RangeError('Unmatched lead surrogate at ' + (n - 1)) + } + c1 = ((c1 & 0x3ff) << 10) + (c2 & 0x3ff) + 0x10000 + enc = String.fromCharCode( + (c1 >> 18) | 240, + ((c1 >> 12) & 63) | 128, + ((c1 >> 6) & 63) | 128, + (c1 & 63) | 128 + ) + } + if (enc !== null) { + if (end > start) { + utftext += string.slice(start, end) + } + utftext += enc + start = end = n + 1 + } + } + + if (end > start) { + utftext += string.slice(start, stringl) + } + + return utftext +} + +// https://github.com/locutusjs/locutus/blob/master/src/php/xml/utf8_decode.js +function utf8Decode(strData: string) { + // eslint-disable-line camelcase + // discuss at: https://locutus.io/php/utf8_decode/ + // original by: Webtoolkit.info (https://www.webtoolkit.info/) + // input by: Aman Gupta + // input by: Brett Zamir (https://brett-zamir.me) + // improved by: Kevin van Zonneveld (https://kvz.io) + // improved by: Norman "zEh" Fuchs + // bugfixed by: hitwork + // bugfixed by: Onno Marsman (https://twitter.com/onnomarsman) + // bugfixed by: Kevin van Zonneveld (https://kvz.io) + // bugfixed by: kirilloid + // bugfixed by: w35l3y (https://www.wesley.eti.br) + // example 1: utf8_decode('Kevin van Zonneveld') + // returns 1: 'Kevin van Zonneveld' + + const tmpArr = [] + let i = 0 + let c1 = 0 + let seqlen = 0 + + strData += '' + + while (i < strData.length) { + c1 = strData.charCodeAt(i) & 0xFF + seqlen = 0 + + // https://en.wikipedia.org/wiki/UTF-8#Codepage_layout + if (c1 <= 0xBF) { + c1 = (c1 & 0x7F) + seqlen = 1 + } else if (c1 <= 0xDF) { + c1 = (c1 & 0x1F) + seqlen = 2 + } else if (c1 <= 0xEF) { + c1 = (c1 & 0x0F) + seqlen = 3 + } else { + c1 = (c1 & 0x07) + seqlen = 4 + } + + for (let ai = 1; ai < seqlen; ++ai) { + c1 = ((c1 << 0x06) | (strData.charCodeAt(ai + i) & 0x3F)) + } + + if (seqlen === 4) { + c1 -= 0x10000 + tmpArr.push(String.fromCharCode(0xD800 | ((c1 >> 10) & 0x3FF))) + tmpArr.push(String.fromCharCode(0xDC00 | (c1 & 0x3FF))) + } else { + tmpArr.push(String.fromCharCode(c1)) + } + + i += seqlen + } + + return tmpArr.join('') +} + +function base64Encode(data: string) { + // http://kevin.vanzonneveld.net + // + original by: Tyler Akins (http://rumkin.com) + // + improved by: Bayron Guevara + // + improved by: Thunder.m + // + improved by: Kevin van Zonneveld (http://kevin.vanzonneveld.net) + // + bugfixed by: Pellentesque Malesuada + // + improved by: Kevin van Zonneveld (http://kevin.vanzonneveld.net) + // - depends on: this.utf8Encode + // * example 1: this.base64Encode('Kevin van Zonneveld') + // * returns 1: 'S2V2aW4gdmFuIFpvbm5ldmVsZA==' + // mozilla has this native + // - but breaks in 2.0.0.12! + // if (typeof this.window['atob'] == 'function') { + // return atob(data) + // } + let b64 = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=' + let o1, + o2, + o3, + h1, + h2, + h3, + h4, + bits, + i = 0, + ac = 0, + enc = '', + tmp_arr = [] + + if (!data) { + return data + } + + data = utf8Encode(data + '') + + do { + // pack three octets into four hexets + o1 = data.charCodeAt(i++) + o2 = data.charCodeAt(i++) + o3 = data.charCodeAt(i++) + + bits = (o1 << 16) | (o2 << 8) | o3 + + h1 = (bits >> 18) & 0x3f + h2 = (bits >> 12) & 0x3f + h3 = (bits >> 6) & 0x3f + h4 = bits & 0x3f + + // use hexets to index into b64, and append result to encoded string + tmp_arr[ac++] = + b64.charAt(h1) + b64.charAt(h2) + b64.charAt(h3) + b64.charAt(h4) + } while (i < data.length) + + enc = tmp_arr.join('') + + switch (data.length % 3) { + case 1: + enc = enc.slice(0, -2) + '==' + break + case 2: + enc = enc.slice(0, -1) + '=' + break + } + + return enc +} + +function base64Decode(data: string) { + // http://kevin.vanzonneveld.net + // + original by: Tyler Akins (http://rumkin.com) + // + improved by: Thunder.m + // + input by: Aman Gupta + // + improved by: Kevin van Zonneveld (http://kevin.vanzonneveld.net) + // + bugfixed by: Onno Marsman + // + bugfixed by: Pellentesque Malesuada + // + improved by: Kevin van Zonneveld (http://kevin.vanzonneveld.net) + // + input by: Brett Zamir (http://brett-zamir.me) + // + bugfixed by: Kevin van Zonneveld (http://kevin.vanzonneveld.net) + // * example 1: base64_decode('S2V2aW4gdmFuIFpvbm5ldmVsZA==') + // * returns 1: 'Kevin van Zonneveld' + // mozilla has this native + // - but breaks in 2.0.0.12! + // if (typeof this.window['atob'] == 'function') { + // return atob(data) + // } + let b64 = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=' + let o1, o2, o3, h1, h2, h3, h4, bits, i = 0, + ac = 0, + dec = '', + tmp_arr = [] + + if (!data) { + return data + } + + data += '' + + do { // unpack four hexets into three octets using index points in b64 + h1 = b64.indexOf(data.charAt(i++)) + h2 = b64.indexOf(data.charAt(i++)) + h3 = b64.indexOf(data.charAt(i++)) + h4 = b64.indexOf(data.charAt(i++)) + + bits = h1 << 18 | h2 << 12 | h3 << 6 | h4 + + o1 = bits >> 16 & 0xff + o2 = bits >> 8 & 0xff + o3 = bits & 0xff + + if (h3 === 64) { + tmp_arr[ac++] = String.fromCharCode(o1) + } else if (h4 === 64) { + tmp_arr[ac++] = String.fromCharCode(o1, o2) + } else { + tmp_arr[ac++] = String.fromCharCode(o1, o2, o3) + } + } while (i < data.length) + + dec = tmp_arr.join('') + + return utf8Decode(dec) +} + +export function urlSafeBase64Encode(v: string) { + v = base64Encode(v) + + // 参考 https://tools.ietf.org/html/rfc4648#section-5 + return v.replace(/\//g, '_').replace(/\+/g, '-') +} + +export function urlSafeBase64Decode(v: string) { + v = v.replace(/_/g, '/').replace(/-/g, '+') + return base64Decode(v) +} diff --git a/packages/common/src/helper/crc32/index.test.ts b/packages/common/src/helper/crc32/index.test.ts new file mode 100644 index 00000000..cb78f08a --- /dev/null +++ b/packages/common/src/helper/crc32/index.test.ts @@ -0,0 +1,25 @@ +import { CRC32 } from './index' +import { MB } from './helper' + +function mockFile(size = 4, name = 'mock.jpg', type = 'image/jpg'): File { + if (size >= 1024) throw new Error('the size is set too large.') + + const blob = new Blob(['1'.repeat(size * MB)], { type }) + return new File([blob], name) +} + +describe('test crc32', () => { + test('file', async () => { + const crc32One = new CRC32() + await expect(crc32One.file(mockFile(0))).resolves.toEqual(0) + + const crc32Two = new CRC32() + await expect(crc32Two.file(mockFile(0.5))).resolves.toEqual(1610895105) + + const crc32Three = new CRC32() + await expect(crc32Three.file(mockFile(1))).resolves.toEqual(3172987001) + + const crc32Four = new CRC32() + await expect(crc32Four.file(mockFile(2))).resolves.toEqual(847982614) + }) +}) diff --git a/packages/common/src/helper/crc32/index.ts b/packages/common/src/helper/crc32/index.ts new file mode 100644 index 00000000..85f2082f --- /dev/null +++ b/packages/common/src/helper/crc32/index.ts @@ -0,0 +1,90 @@ +/* eslint-disable no-bitwise */ + +import { MB } from './helper' + +/** + * 以下 class 实现参考 + * https://github.com/Stuk/jszip/blob/d4702a70834bd953d4c2d0bc155fad795076631a/lib/crc32.js + * 该实现主要针对大文件优化、对计算的值进行了 `>>> 0` 运算(为与服务端保持一致) + */ +export class CRC32 { + private crc = -1 + private table = this.makeTable() + + private makeTable() { + const table = new Array() + for (let i = 0; i < 256; i++) { + let t = i + for (let j = 0; j < 8; j++) { + if (t & 1) { + // IEEE 标准 + t = (t >>> 1) ^ 0xEDB88320 + } else { + t >>>= 1 + } + } + table[i] = t + } + + return table + } + + private append(data: Uint8Array) { + let crc = this.crc + for (let offset = 0; offset < data.byteLength; offset++) { + crc = (crc >>> 8) ^ this.table[(crc ^ data[offset]) & 0xFF] + } + this.crc = crc + } + + private compute() { + return (this.crc ^ -1) >>> 0 + } + + private async readAsUint8Array(file: File | Blob): Promise { + if (typeof file.arrayBuffer === 'function') { + return new Uint8Array(await file.arrayBuffer()) + } + + return new Promise((resolve, reject) => { + const reader = new FileReader() + reader.onload = () => { + if (reader.result == null) { + reject() + return + } + + if (typeof reader.result === 'string') { + reject() + return + } + + resolve(new Uint8Array(reader.result)) + } + reader.readAsArrayBuffer(file) + }) + } + + async file(file: File): Promise { + if (file.size <= MB) { + this.append(await this.readAsUint8Array(file)) + return this.compute() + } + + const count = Math.ceil(file.size / MB) + for (let index = 0; index < count; index++) { + const start = index * MB + const end = index === (count - 1) ? file.size : start + MB + // eslint-disable-next-line no-await-in-loop + const chuck = await this.readAsUint8Array(file.slice(start, end)) + this.append(new Uint8Array(chuck)) + } + + return this.compute() + } + + static file(file: File): Promise { + const crc = new CRC32() + return crc.file(file) + } +} diff --git a/packages/common/src/helper/queue/index.ts b/packages/common/src/helper/queue/index.ts new file mode 100644 index 00000000..5efb96a2 --- /dev/null +++ b/packages/common/src/helper/queue/index.ts @@ -0,0 +1,151 @@ +import { IError } from '../../types/error' +import { Result } from '../../types/types' + +export type OnError = (error: IError) => void +export type OnTaskProgress

= (progress: P) => void +export type OnProgress = (states: Array>) => void +export type OnComplete = (states: Array>) => void +export type TaskStatus = 'waiting' | 'processing' | 'cancel' | 'error' | 'success' + +export interface Task { + cancel(): Promise + process(progress: OnTaskProgress

): Promise> +} + +export interface TaskProgress {} + +export interface TaskState { + status: TaskStatus + result?: Result + progress?: P +} + +export class TaskQueue, R = unknown, P extends TaskProgress = TaskProgress> { + private tasks: T[] = [] + private taskStates = new Map>() + + private concurrent = 3 + private processing = false + private errorListeners: OnError[] = [] + private progressListeners: Array> = [] + private completeListeners: Array> = [] + + private handleError(error: IError) { + for (let index = 0; index < this.errorListeners.length; index++) { + const errorListener = this.errorListeners[index] + errorListener(error) + } + } + + private handleProgress() { + for (let index = 0; index < this.progressListeners.length; index++) { + const progressListener = this.progressListeners[index] + progressListener([...this.taskStates.values()]) + } + } + + private handleComplete() { + for (let index = 0; index < this.completeListeners.length; index++) { + const completeListener = this.completeListeners[index] + completeListener([...this.taskStates.values()]) + } + } + + onProgress(listener: OnProgress) { + this.progressListeners.push(listener) + } + onComplete(listener: OnComplete) { + this.completeListeners.push(listener) + } + onError(listener: OnError) { + this.errorListeners.push(listener) + } + + private async process(task?: T) { + if (this.concurrent === 0) return + if (!this.processing) return + this.processing = true + + if (task != null) { + const taskState = this.taskStates.get(task)! + + const handleProcess = (process: P) => { + taskState.status = 'processing' + taskState.progress = process + this.handleProgress() + } + + const handleResult = async (result: Result) => { + taskState.result = result + // 发生错误立即停止全部任务 + if ('error' in result) { + taskState.status = 'error' + await this.cancel() + this.handleError(result.error) + } + } + + this.concurrent -= 1 + const result = await task.process(handleProcess) + handleResult(result) + this.concurrent += 1 + this.process() + return + } + + // 获取任务并处理 + const taskEntries = [...this.taskStates.entries()] + .filter(([_, state]) => state.status === 'waiting') + + // 根据当前剩余的 concurrent 取出任务并执行 + const pendingTasks = taskEntries.slice(0, this.concurrent) + for (let index = 0; index < pendingTasks.length; index++) { + const [pending] = pendingTasks[index] + this.process(pending) + } + + // 如果任务都已经处理完成,则处理 complete 事件 + const hasComplete = [...this.taskStates.values()] + .every(item => item.status === 'success') + + if (hasComplete) { + this.processing = false + this.handleComplete() + } + } + + enqueue(task: T) { + if (this.taskStates.has(task) || this.tasks.includes(task)) { + throw new Error('Duplicate tasks cannot be added') + } + + this.tasks.push(task) + this.taskStates.set(task, { + status: 'waiting' + }) + } + + start() { + if (!this.processing) { + this.concurrent = 3 + this.process() + } + } + + async cancel() { + this.processing = false + const promises: Array> = [] + + // 取出所有的处理中的任务 + const processingTasks = [...this.taskStates.entries()] + .filter(([_, state]) => state.status === 'processing') + + // 调用 abort 停止任务并观察 promise 状态 + for (let index = 0; index < processingTasks.length; index++) { + const [processingTask] = processingTasks[index] + promises.push(processingTask.cancel()) + } + + await Promise.all(promises) + } +} diff --git a/packages/common/src/types/error.ts b/packages/common/src/types/error.ts new file mode 100644 index 00000000..8b0b7a6d --- /dev/null +++ b/packages/common/src/types/error.ts @@ -0,0 +1 @@ +export interface IError {} diff --git a/packages/common/src/types/file.ts b/packages/common/src/types/file.ts new file mode 100644 index 00000000..0ac41b03 --- /dev/null +++ b/packages/common/src/types/file.ts @@ -0,0 +1,9 @@ +interface IBlob { +} + +interface IFile { + path(): string + name(): string + size(): number + slice(offset: number, size: number): IBlob +} diff --git a/packages/common/src/types/host.ts b/packages/common/src/types/host.ts new file mode 100644 index 00000000..7f811aad --- /dev/null +++ b/packages/common/src/types/host.ts @@ -0,0 +1,5 @@ +import { Result } from './types' + +export interface HostProvider { + getUploadHost(): Promise> +} diff --git a/packages/common/src/types/http.ts b/packages/common/src/types/http.ts new file mode 100644 index 00000000..04c29bb1 --- /dev/null +++ b/packages/common/src/types/http.ts @@ -0,0 +1,30 @@ +import type { Result } from './types' + +export type HttpProgress = { + total: number + loaded: number +} + +export interface HttpAbort { + abort(): void + onAbort(callback: () => void): void +} + +export type HttpRequestBody = unknown +export type HttpProtocol = 'HTTP' | 'HTTPS' +export type HttpHeader = Record +export type OnHttpProgress = (progress: HttpProgress) => void + +export interface HttpClientOptions { + abort?: HttpAbort + onProgress?: OnHttpProgress + headers?: HttpHeader + body?: HttpRequestBody +} + +export interface HttpClient { + get(url: string, options?: HttpClientOptions): Promise> + put(url: string, options?: HttpClientOptions): Promise> + post(url: string, options?: HttpClientOptions): Promise> + delete(url: string, options?: HttpClientOptions): Promise> +} diff --git a/packages/common/src/types/token.ts b/packages/common/src/types/token.ts new file mode 100644 index 00000000..632953a4 --- /dev/null +++ b/packages/common/src/types/token.ts @@ -0,0 +1,21 @@ +import { Result } from './types' + +interface GetUploadTokenParams { + key?: string +} + +export interface TokenProvider { + getUploadToken(params: GetUploadTokenParams): Promise +} + +interface Token { + bucket: string + accessKey: string + signature: string + expiration: number +} + +// 和上面的区别在于内部会将 string 转换成 token 对象便于内部消费 +export interface InnerTokenProvider { + getUploadToken(params: GetUploadTokenParams): Promise> +} diff --git a/packages/common/src/types/types.ts b/packages/common/src/types/types.ts new file mode 100644 index 00000000..49fe7576 --- /dev/null +++ b/packages/common/src/types/types.ts @@ -0,0 +1,8 @@ +import { IError } from './error' + +export type ErrorResult = { error: IError } +export type Result = { result: R } | ErrorResult + +export function isErrorResult(result: Result): result is ErrorResult { + return !!(result && 'error' in result && result.error) +} diff --git a/packages/common/src/upload/common/host/index.ts b/packages/common/src/upload/common/host/index.ts new file mode 100644 index 00000000..9197eeb8 --- /dev/null +++ b/packages/common/src/upload/common/host/index.ts @@ -0,0 +1,146 @@ +import { ConfigApis, UploadApis } from '../../../api' +import { HttpProtocol } from '../../../types/http' +import { Result, isErrorResult } from '../../../types/types' + +/** + * @description 解冻时间,key 是 host,value 为解冻时间 + */ +const unfreezeTimeMap = new Map() + +export class Host { + constructor( + private host: string, + private protocol: HttpProtocol + ) {} + + /** + * @description 当前 host 是否为冻结状态 + */ + isFrozen() { + const currentTime = new Date().getTime() + const unfreezeTime = unfreezeTimeMap.get(this.host) + return unfreezeTime != null && unfreezeTime >= currentTime + } + + /** + * @param {number} time 单位秒,默认 20s + * @description 冻结该 host 对象,该 host 将在指定时间内不可用 + */ + freeze(time = 20) { + const unfreezeTime = new Date().getTime() + (time * 1000) + unfreezeTimeMap.set(this.host, unfreezeTime) + } + + /** + * @description 解冻该 host + */ + unfreeze() { + unfreezeTimeMap.delete(this.host) + } + + /** + * @description 获取当前 host 的完整 url + */ + getUrl() { + return `${this.protocol}://${this.host}` + } + + /** + * @description 获取解冻时间 + */ + getUnfreezeTime() { + return unfreezeTimeMap.get(this.host) + } +} + +interface GetUploadHostParams { + accessKey: string + bucket: string +} + +export class HostProvider { + /** + * @description 缓存的 host 表,以 bucket 和 accessKey 作为 key + */ + private cachedHostsMap = new Map() + + /** + * @param {string[]} initHosts + * @description 如果在构造时传入 initHosts,则该 host 池始终使用传入的 initHosts 做为可用的数据 + */ + constructor( + private serverUrl: string, + private configApis: ConfigApis, + private protocol: HttpProtocol + ) {} + + /** + * @description 注册可用 host + */ + private register(accessKey: string, bucketName: string, hosts: string[]): void { + this.cachedHostsMap.set( + `${accessKey}@${bucketName}`, + hosts.map(host => new Host(host, this.protocol)) + ) + } + + /** + * @description 刷新最新的 host 数据,如果用户在构造时该类时传入了 host 或者已经存在缓存则不会发起请求 + */ + private async refresh(accessKey: string, bucketName: string): Promise> { + const cachedHostList = this.cachedHostsMap.get(`${accessKey}@${bucketName}`) || [] + if (cachedHostList.length > 0) return { result: false } + + const configResult = await this.configApis.getHostConfig({ + accessKey, + bucket: bucketName, + serverUrl: this.serverUrl + }) + + if (isErrorResult(configResult)) { + return configResult + } + + const hostConfigs = configResult.result.hosts + + if (hostConfigs && hostConfigs.length > 0) { + // 取第一个区域也就是当前空间所在区域的上传地址 + // 暂时不用其他区域上传地址是是因为不同区域必须从头上传(第一个分片) + const hostConfig = hostConfigs[0] + this.register(accessKey, bucketName, [ + // 严格依照优先级 + ...hostConfig.up.domains, + ...hostConfig.up.old + ]) + } + + return { result: true } + } + + /** + * @description 获取一个可用的上传 Host,排除已冻结的 + */ + public async getUploadHost(params: GetUploadHostParams): Promise> { + const { accessKey, bucket } = params + + const refreshResult = await this.refresh(accessKey, bucket) + if (isErrorResult(refreshResult)) return refreshResult + + const cachedHostList = this.cachedHostsMap.get(`${accessKey}@${bucket}`) || [] + + if (cachedHostList.length === 0) { + // 没有可用的 host + return { error: 'TODO' } + } + + const availableHostList = cachedHostList.filter(host => !host.isFrozen()) + if (availableHostList.length > 0) return { result: availableHostList[0] } + + // 无可用的,去取离解冻最近的 host + const priorityQueue = cachedHostList + .slice() + .sort((hostA, hostB) => (hostA.getUnfreezeTime() || 0) - (hostB.getUnfreezeTime() || 0)) + + return { result: priorityQueue[0] } + } +} diff --git a/packages/common/src/upload/common/token/index.ts b/packages/common/src/upload/common/token/index.ts new file mode 100644 index 00000000..f791a36d --- /dev/null +++ b/packages/common/src/upload/common/token/index.ts @@ -0,0 +1,14 @@ +import { Result } from '../../../types/types' + +interface Token { + bucket: string + accessKey: string + signature: string + expiration: number +} + +export class TokenProvider implements TokenProvider { + getUploadToken(): Promise> { + return {} as any + } +} diff --git a/packages/common/src/upload/direct/index.ts b/packages/common/src/upload/direct/index.ts new file mode 100644 index 00000000..d72d03df --- /dev/null +++ b/packages/common/src/upload/direct/index.ts @@ -0,0 +1,28 @@ +import { ConfigApis, UploadApis } from '../../api' +import { OnTaskProgress, Task, TaskQueue } from '../../helper/queue' +import { Result } from '../../types/types' +import { HostProvider } from '../common/host' +import { TokenProvider } from '../common/token' + +import { UploadTaskCreator } from '../types' + +class DirectUploadTask implements Task { + cancel(): Promise { + throw new Error('Method not implemented.') + } + process(progress: OnTaskProgress<{}>): Promise> { + throw new Error('Method not implemented.') + } +} + +export const createDirectUploadTask: UploadTaskCreator = (file, config) => { + const tokenProvider = new TokenProvider() + const protocol = config.protocol || 'HTTPS' + const configApis = new ConfigApis(config.httpClient) + const hosProvider = new HostProvider(config.serverUrl, configApis, protocol) + const uploadApis = new UploadApis(config.httpClient, hosProvider, tokenProvider) + + uploadApis.directUpload(file) + + const taskQueue = new TaskQueue() +} diff --git a/packages/common/src/upload/index.tsx b/packages/common/src/upload/index.tsx new file mode 100644 index 00000000..7bc4a2b2 --- /dev/null +++ b/packages/common/src/upload/index.tsx @@ -0,0 +1,2 @@ +export { createDirectUploadTask } from './direct' +export { createMultipartUploadTask } from './multipart' diff --git a/packages/common/src/upload/multipart/index.ts b/packages/common/src/upload/multipart/index.ts new file mode 100644 index 00000000..1a9d6fb6 --- /dev/null +++ b/packages/common/src/upload/multipart/index.ts @@ -0,0 +1,3 @@ +import { UploadTaskCreator } from '../types' + +export const createMultipartUploadTask: UploadTaskCreator = (file, config) => ({} as any) diff --git a/packages/common/src/upload/types.ts b/packages/common/src/upload/types.ts new file mode 100644 index 00000000..d4e7fc13 --- /dev/null +++ b/packages/common/src/upload/types.ts @@ -0,0 +1,30 @@ +import { IError } from '../types/error' +import { HttpClient, HttpProtocol } from '../types/http' + +interface TokenProvider {} + +export interface UploadConfig { + serverUrl: string + httpClient: HttpClient + tokenProvider: TokenProvider + protocol?: HttpProtocol +} + +export interface Progress { + total: number + loaded: number +} + +export type OnError = (error: IError) => void +export type OnProgress = (progress: Progress) => void +export type OnComplete = (result: R) => void + +export interface UploadTask { + onProgress(fn: OnProgress): void + onComplete(fn: OnComplete): void + onError(fn: OnError): void + cancel(): Promise + start(): void +} + +export type UploadTaskCreator = (file: IFile, config: UploadConfig) => UploadTask