diff --git a/src/registry/r2.ts b/src/registry/r2.ts index ef09b01..63e2f01 100644 --- a/src/registry/r2.ts +++ b/src/registry/r2.ts @@ -11,7 +11,7 @@ import { } from "../chunk"; import { InternalError, ManifestError, RangeError, ServerError } from "../errors"; import { SHA256_PREFIX_LEN, getSHA256, hexToDigest } from "../user"; -import { readableToBlob, readerToBlob, wrap } from "../utils"; +import { errorString, readableToBlob, readerToBlob, wrap } from "../utils"; import { BlobUnknownError, ManifestUnknownError } from "../v2-errors"; import { CheckLayerResponse, @@ -689,13 +689,61 @@ export class R2Registry implements Registry { } else { const upload = this.env.REGISTRY.resumeMultipartUpload(uuid, state.uploadId); // TODO: Handle one last buffer here - await upload.complete(state.parts); - const obj = await this.env.REGISTRY.get(uuid); - const put = this.env.REGISTRY.put(`${namespace}/blobs/${expectedSha}`, obj!.body, { - sha256: (expectedSha as string).slice(SHA256_PREFIX_LEN), - }); + const [, errComplete] = await wrap(upload.complete(state.parts)); + if (errComplete !== null) { + console.error("Error completing upload:", errorString(errComplete)); + return { response: new InternalError() }; + } + + const [obj, errGetObject] = await wrap(this.env.REGISTRY.get(uuid)); + if (errGetObject !== null) { + console.error("getting object", errorString(errGetObject)); + return { response: new InternalError() }; + } + + const blobPath = `${namespace}/blobs/${expectedSha}`; + + const hash = (expectedSha as string).slice(SHA256_PREFIX_LEN); + const limitBytes = 1024 * 1024; + if (!obj) throw new Error("unreachable"); + + if (obj!.size >= limitBytes) { + const stream = new crypto.DigestStream("SHA-256"); + const upload = await this.env.REGISTRY.createMultipartUpload(blobPath); + console.warn("Uploading a layer more than", limitBytes, "bytes, consider reducing layer size"); + const parts = []; + for (let i = 0, key = 0; i < obj.size; i += limitBytes, key++) { + console.log("uploading", limitBytes, obj.size); + const [stream, s2] = limit(obj.body, Math.min(limitBytes, obj.size - i)).tee(); + + s2.pipeTo(stream.getWriter(), { preventClose: i + limitBytes < obj.size }); + parts.push(await upload.uploadPart(key + 1, stream)); + } + + console.log("digesttt"); + const digest = await stream.digest; + console.log("uploaded", digest); + const hexString = [...new Uint8Array(digest)].map((b) => b.toString(16).padStart(2, "0")).join(""); + if ("sha256:" + hexString != hash) { + return { + //fixme + response: new Response("bad " + hexString, { status: 400 }), + }; + } + + await upload.complete(parts); + } else { + const put = this.env.REGISTRY.put(blobPath, obj!.body, { + sha256: hash, + }); + + const [, err] = await wrap(put); + if (err !== null) { + console.error("Error copying blob", errorString(err)); + return { response: new InternalError() }; + } + } - await put; await this.env.REGISTRY.delete(uuid); } diff --git a/src/router.ts b/src/router.ts index 0a92340..3566540 100644 --- a/src/router.ts +++ b/src/router.ts @@ -450,6 +450,7 @@ v2Router.put("/:name+/blobs/uploads/:uuid", async (req, env: Env) => { ); if (err) { + console.error("Error uploading", err); return new InternalError(); } diff --git a/src/utils.ts b/src/utils.ts index 2b3e198..c13c338 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -60,7 +60,7 @@ export function errorString(err: unknown): string { return "unknown error: " + JSON.stringify(err); } -export async function wrap(fn: Promise): Promise<[T, null] | [null, E]> { +export async function wrap(fn: Promise): Promise<[T, null] | [null, E]> { return fn.then((data) => [data, null] as [T, null]).catch((err) => [null, err as unknown as E] as [null, E]); }