Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
gabivlj committed Oct 22, 2024
1 parent 56f4d96 commit 9b9391c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 8 deletions.
62 changes: 55 additions & 7 deletions src/registry/r2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from "../chunk";
import { InternalError, ManifestError, RangeError, ServerError } from "../errors";
import { SHA256_PREFIX_LEN, getSHA256, hexToDigest } from "../user";
import { readableToBlob, readerToBlob, wrap } from "../utils";
import { errorString, readableToBlob, readerToBlob, wrap } from "../utils";
import { BlobUnknownError, ManifestUnknownError } from "../v2-errors";
import {
CheckLayerResponse,
Expand Down Expand Up @@ -689,13 +689,61 @@ export class R2Registry implements Registry {
} else {
const upload = this.env.REGISTRY.resumeMultipartUpload(uuid, state.uploadId);
// TODO: Handle one last buffer here
await upload.complete(state.parts);
const obj = await this.env.REGISTRY.get(uuid);
const put = this.env.REGISTRY.put(`${namespace}/blobs/${expectedSha}`, obj!.body, {
sha256: (expectedSha as string).slice(SHA256_PREFIX_LEN),
});
const [, errComplete] = await wrap(upload.complete(state.parts));
if (errComplete !== null) {
console.error("Error completing upload:", errorString(errComplete));
return { response: new InternalError() };
}

const [obj, errGetObject] = await wrap(this.env.REGISTRY.get(uuid));
if (errGetObject !== null) {
console.error("getting object", errorString(errGetObject));
return { response: new InternalError() };
}

const blobPath = `${namespace}/blobs/${expectedSha}`;

const hash = (expectedSha as string).slice(SHA256_PREFIX_LEN);
const limitBytes = 1024 * 1024;
if (!obj) throw new Error("unreachable");

if (obj!.size >= limitBytes) {
const stream = new crypto.DigestStream("SHA-256");
const upload = await this.env.REGISTRY.createMultipartUpload(blobPath);
console.warn("Uploading a layer more than", limitBytes, "bytes, consider reducing layer size");
const parts = [];
for (let i = 0, key = 0; i < obj.size; i += limitBytes, key++) {
console.log("uploading", limitBytes, obj.size);
const [stream, s2] = limit(obj.body, Math.min(limitBytes, obj.size - i)).tee();

s2.pipeTo(stream.getWriter(), { preventClose: i + limitBytes < obj.size });

Check failure on line 719 in src/registry/r2.ts

View workflow job for this annotation

GitHub Actions / Unit Tests (18.x)

Property 'getWriter' does not exist on type 'ReadableStream<any>'.

Check failure on line 719 in src/registry/r2.ts

View workflow job for this annotation

GitHub Actions / Unit Tests (20.x)

Property 'getWriter' does not exist on type 'ReadableStream<any>'.
parts.push(await upload.uploadPart(key + 1, stream));
}

console.log("digesttt");
const digest = await stream.digest;
console.log("uploaded", digest);
const hexString = [...new Uint8Array(digest)].map((b) => b.toString(16).padStart(2, "0")).join("");
if ("sha256:" + hexString != hash) {
return {
//fixme
response: new Response("bad " + hexString, { status: 400 }),
};
}

await upload.complete(parts);
} else {
const put = this.env.REGISTRY.put(blobPath, obj!.body, {
sha256: hash,
});

const [, err] = await wrap(put);
if (err !== null) {
console.error("Error copying blob", errorString(err));
return { response: new InternalError() };
}
}

await put;
await this.env.REGISTRY.delete(uuid);
}

Expand Down
1 change: 1 addition & 0 deletions src/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ v2Router.put("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
);

if (err) {
console.error("Error uploading", err);
return new InternalError();
}

Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export function errorString(err: unknown): string {
return "unknown error: " + JSON.stringify(err);
}

export async function wrap<T, E = unknown>(fn: Promise<T>): Promise<[T, null] | [null, E]> {
export async function wrap<T, E = Error>(fn: Promise<T>): Promise<[T, null] | [null, E]> {
return fn.then((data) => [data, null] as [T, null]).catch((err) => [null, err as unknown as E] as [null, E]);
}

Expand Down

0 comments on commit 9b9391c

Please sign in to comment.