diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml
index 2805448..1609a09 100644
--- a/.github/workflows/publish.yaml
+++ b/.github/workflows/publish.yaml
@@ -13,8 +13,8 @@ jobs:
     runs-on: ubuntu-latest

     permissions:
-      contents: read
-      id-token: write
+      contents: read
+      id-token: write

     steps:
       - uses: actions/checkout@v4
diff --git a/.github/workflows/releaser.yaml b/.github/workflows/releaser.yaml
index 617775d..66bfa86 100644
--- a/.github/workflows/releaser.yaml
+++ b/.github/workflows/releaser.yaml
@@ -9,9 +9,9 @@ on:
     - main

 permissions:
-  contents: write # Necessary for accessing and modifying repository content
-  pull-requests: write # Necessary for interacting with pull requests
-  actions: write # Necessary for triggering other workflows
+  contents: write # Necessary for accessing and modifying repository content
+  pull-requests: write # Necessary for interacting with pull requests
+  actions: write # Necessary for triggering other workflows

 jobs:
   tag-discussion:
@@ -21,8 +21,8 @@ jobs:
       - name: Checkout Code
         uses: actions/checkout@v3
         with:
-          ref: ${{ github.event.pull_request.base.ref }} # Checkout the base branch (target repository)
-          repository: ${{ github.event.pull_request.base.repo.full_name }} # Checkout from the target repo
+          ref: ${{ github.event.pull_request.base.ref }} # Checkout the base branch (target repository)
+          repository: ${{ github.event.pull_request.base.repo.full_name }} # Checkout from the target repo

       - name: Calculate new versions
         id: calculate_versions
@@ -165,15 +165,15 @@ jobs:
           git push origin ${{ steps.determine_version.outputs.new_version }}
       - name: Trigger Release Workflow
         run: |
-          curl -X POST \
-            -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-            -H "Accept: application/vnd.github.everest-preview+json" \
-            https://api.github.com/repos/${{ github.repository }}/actions/workflows/release.yaml/dispatches \
-            -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}'
+          curl -X POST \
+            -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
+            -H "Accept: application/vnd.github.everest-preview+json" \
+            https://api.github.com/repos/${{ github.repository }}/actions/workflows/release.yaml/dispatches \
+            -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}'
       - name: Trigger Publish Workflow
         run: |
-          curl -X POST \
-            -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-            -H "Accept: application/vnd.github.everest-preview+json" \
-            https://api.github.com/repos/${{ github.repository }}/actions/workflows/publish.yaml/dispatches \
-            -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}'
+          curl -X POST \
+            -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
+            -H "Accept: application/vnd.github.everest-preview+json" \
+            https://api.github.com/repos/${{ github.repository }}/actions/workflows/publish.yaml/dispatches \
+            -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}'
diff --git a/deno.json b/deno.json
index df776fb..4c68c7b 100644
--- a/deno.json
+++ b/deno.json
@@ -31,7 +31,7 @@
   },
   "tasks": {
     "check": "deno fmt && deno lint --fix && deno check ./src/actors/mod.ts ./src/actors/hono/middleware.ts",
-    "test": "rm kv;deno test -A --unstable-kv .",
+    "test": "rm kv;deno test -A --unstable-kv --env .",
     "release": "deno run -A jsr:@deco/scripts/release"
   },
   "lock": false,
diff --git a/src/actors/runtime.ts b/src/actors/runtime.ts
index 374d10f..fbcdfbc 100644
--- a/src/actors/runtime.ts
+++ b/src/actors/runtime.ts
@@ -4,6 +4,8 @@ import { ActorState } from "./state.ts";
 import { DenoKvActorStorage } from "./storage/denoKv.ts";
 import { EVENT_STREAM_RESPONSE_HEADER } from "./stream.ts";
 import { isUpgrade, makeWebSocket } from "./util/channels/channel.ts";
+import { S3ActorStorage } from "./storage/s3.ts";
+import type { ActorStorage } from "./storage.ts";

 /**
  * Represents an actor.
@@ -81,6 +83,22 @@
   ) {
   }

+  getActorStorage(actorId: string, actorName: string): ActorStorage {
+    const storage = Deno.env.get("DECO_ACTORS_STORAGE");
+
+    if (storage === "s3") {
+      return new S3ActorStorage({
+        actorId,
+        actorName,
+      });
+    }
+
+    return new DenoKvActorStorage({
+      actorId,
+      actorName,
+    });
+  }
+
   /**
    * Ensures that the actors are initialized for the given actor ID.
    * @param actorId - The ID of the actor.
@@ -91,10 +109,7 @@ export class ActorRuntime {
     }
     this.actorsConstructors.forEach((Actor) => {
       const initialization = Promise.withResolvers<void>();
-      const storage = new DenoKvActorStorage({
-        actorId,
-        actorName: Actor.name,
-      });
+      const storage = this.getActorStorage(actorId, Actor.name);
       const state = new ActorState({
         initialization,
         storage,
diff --git a/src/actors/storage/s3.ts b/src/actors/storage/s3.ts
new file mode 100644
index 0000000..616e0a0
--- /dev/null
+++ b/src/actors/storage/s3.ts
@@ -0,0 +1,380 @@
+import type {
+  ActorStorage,
+  ActorStorageGetOptions,
+  ActorStorageListOptions,
+  ActorStoragePutOptions,
+} from "../storage.ts";
+
+export interface StorageOptions {
+  actorName: string;
+  actorId: string;
+}
+
+export class S3ActorStorage implements ActorStorage {
+  private bucketName: string;
+  private accessKeyId: string;
+  private secretAccessKey: string;
+  private region: string;
+
+  constructor(protected options: StorageOptions) {
+    this.bucketName = Deno.env.get("DECO_ACTORS_S3_BUCKET_NAME")!;
+    this.accessKeyId = Deno.env.get("AWS_ACCESS_KEY_ID")!;
+    this.secretAccessKey = Deno.env.get("AWS_SECRET_ACCESS_KEY")!;
+    this.region = Deno.env.get("AWS_REGION") ?? "us-east-1";
+  }
+
+  // Build the full key based on actor name, id, and provided key
+  buildKey(key: string[]): string {
+    return [this.options.actorName, this.options.actorId, ...key].join("/");
+  }
+
+  // Overloaded get methods
+  async get<T = unknown>(
+    key: string,
+    options?: ActorStorageGetOptions,
+  ): Promise<T>;
+  async get<T = unknown>(
+    key: string[],
+    options?: ActorStorageGetOptions,
+  ): Promise<T>;
+  async get<T = unknown>(
+    keys: string[][],
+    options?: ActorStorageGetOptions,
+  ): Promise<[string[], T][]>;
+  async get<T = unknown>(
+    keyOrKeys: string | string[] | string[][],
+    options?: ActorStorageGetOptions,
+  ): Promise<T | [string[], T][]> {
+    if (Array.isArray(keyOrKeys) && Array.isArray(keyOrKeys[0])) {
+      // keys: string[][]
+      const result: [string[], T][] = [];
+      for (const key of keyOrKeys as string[][]) {
+        const value = await this.get<T>(key, options);
+        if (value !== undefined) {
+          result.push([key, value]);
+        }
+      }
+      return result;
+    } else {
+      // key: string | string[]
+      const keyArray = Array.isArray(keyOrKeys)
+        ? keyOrKeys as string[]
+        : [keyOrKeys];
+      const key = this.buildKey(keyArray);
+      const response = await this.getObject(key);
+      if (response.status === 200) {
+        const data = await response.text();
+        return JSON.parse(data) as T;
+      } else if (response.status === 404) {
+        await response.body?.cancel();
+        return undefined as T;
+      } else {
+        await response.body?.cancel();
+        throw new Error(`Failed to get object: ${response.statusText}`);
+      }
+    }
+  }
+
+  // Overloaded put methods
+  async put<T>(
+    key: string,
+    value: T,
+    options?: ActorStoragePutOptions,
+  ): Promise<void>;
+  async put<T>(
+    key: string[],
+    value: T,
+    options?: ActorStoragePutOptions,
+  ): Promise<void>;
+  async put<T>(
+    entries: [string[], T][],
+    options?: ActorStoragePutOptions,
+  ): Promise<void>;
+  async put<T>(
+    keyOrEntries: string | string[] | [string[], T][],
+    valueOrOptions?: T | ActorStoragePutOptions,
+    _options?: ActorStoragePutOptions,
+  ): Promise<void> {
+    if (Array.isArray(keyOrEntries) && Array.isArray(keyOrEntries[0])) {
+      // entries: [string[], T][]
+      const entries = keyOrEntries as [string[], T][];
+      for (const [keyParts, value] of entries) {
+        const key = this.buildKey(keyParts);
+        const body = JSON.stringify(value);
+        const response = await this.putObject(key, body);
+        if (response.status !== 200) {
+          await response.body?.cancel();
+          throw new Error(
+            `Failed to put object: ${response.statusText}`,
+          );
+        }
+        await response.body?.cancel();
+      }
+    } else {
+      // key: string | string[], value: T
+      const keyArray = Array.isArray(keyOrEntries)
+        ? keyOrEntries as string[]
+        : [keyOrEntries];
+      const value = valueOrOptions as T;
+      const key = this.buildKey(keyArray);
+      const body = JSON.stringify(value);
+      const response = await this.putObject(key, body);
+      if (response.status !== 200) {
+        await response.body?.cancel();
+        throw new Error(`Failed to put object: ${response.statusText}`);
+      }
+      await response.body?.cancel();
+    }
+  }
+
+  // Overloaded delete methods
+  async delete(
+    key: string[],
+    options?: ActorStoragePutOptions,
+  ): Promise<boolean>;
+  async delete(
+    keys: string[][],
+    options?: ActorStoragePutOptions,
+  ): Promise<number>;
+  async delete(
+    keyOrKeys: string[] | string[][],
+    _options?: ActorStoragePutOptions,
+  ): Promise<boolean | number> {
+    if (Array.isArray(keyOrKeys[0])) {
+      // keys: string[][]
+      const keys = keyOrKeys as string[][];
+      let deletedCount = 0;
+      for (const keyParts of keys) {
+        const key = this.buildKey(keyParts);
+        const response = await this.deleteObject(key);
+        if (response.status === 204 || response.status === 200) {
+          deletedCount++;
+        }
+        await response.body?.cancel();
+      }
+      return deletedCount;
+    } else {
+      // key: string[]
+      const keyParts = keyOrKeys as string[];
+      const key = this.buildKey(keyParts);
+      const response = await this.deleteObject(key);
+      await response.body?.cancel();
+      return response.status === 204 || response.status === 200;
+    }
+  }
+
+  // Implement the deleteAll method
+  async deleteAll(_options?: ActorStoragePutOptions): Promise<void> {
+    const prefix = this.buildKey([]);
+    const objects = await this.listObjects(prefix);
+    for (const object of objects) {
+      await this.deleteObject(object.Key);
+    }
+  }
+
+  // Implement the list method
+  async list<T = unknown>(
+    options?: ActorStorageListOptions,
+  ): Promise<[string[], T][]> {
+    const prefix = this.buildKey(options?.prefix ?? []);
+    const objects = await this.listObjects(prefix);
+
+    const result: [string[], T][] = [];
+    for (const object of objects) {
+      const key = object.Key;
+      const keyParts = key.split("/").slice(2); // Remove actorName and actorId
+      const value = await this.get<T>(keyParts);
+      if (value !== undefined) {
+        result.push([keyParts, value]);
+      }
+    }
+
+    return result;
+  }
+
+  // Implement the atomic method
+  atomic(_storage: (st: ActorStorage) => Promise<void>): Promise<void> {
+    throw new Error(
+      "Atomic operations are not supported in S3ActorStorage.",
+    );
+  }
+
+  // Helper method to get an object from S3
+  private async getObject(key: string): Promise<Response> {
+    const method = "GET";
+    const url =
+      `https://${this.bucketName}.s3.${this.region}.amazonaws.com/${key}`;
+    const headers = await this.getSignedHeaders(method, key);
+    return await fetch(url, { method, headers });
+  }
+
+  // Helper method to put an object to S3
+  private async putObject(key: string, body: string): Promise<Response> {
+    const method = "PUT";
+    const url =
+      `https://${this.bucketName}.s3.${this.region}.amazonaws.com/${key}`;
+    const headers = await this.getSignedHeaders(method, key, body);
+    return await fetch(url, { method, headers, body });
+  }
+
+  // Helper method to delete an object from S3
+  private async deleteObject(key: string): Promise<Response> {
+    const method = "DELETE";
+    const url =
+      `https://${this.bucketName}.s3.${this.region}.amazonaws.com/${key}`;
+    const headers = await this.getSignedHeaders(method, key);
+    return await fetch(url, { method, headers });
+  }
+
+  // Helper method to list objects in S3
+  private async listObjects(prefix: string): Promise<{ Key: string }[]> {
+    const method = "GET";
+    const url = `https://${this.bucketName}.s3.${this.region}.amazonaws.com/`;
+    const params = new URLSearchParams({
+      "list-type": "2",
+      "prefix": prefix,
+    });
+    const fullUrl = `${url}?${params.toString()}`;
+    const headers = await this.getSignedHeaders(method, "", "", params);
+    const response = await fetch(fullUrl, { method, headers });
+    if (response.status !== 200) {
+      await response.body?.cancel();
+      throw new Error(`Failed to list objects: ${response.statusText}`);
+    }
+    const text = await response.text();
+    const parser = new DOMParser();
+    const xmlDoc = parser.parseFromString(text, "application/xml");
+    const keys = Array.from(xmlDoc.getElementsByTagName("Contents"))
+      .map((content) => {
+        const keyNode = content.getElementsByTagName("Key")[0];
+        return { Key: keyNode.textContent! };
+      });
+    return keys;
+  }
+
+  // Helper method to generate signed headers for S3 requests
+  private async getSignedHeaders(
+    method: string,
+    key: string,
+    body: string = "",
+    params: URLSearchParams = new URLSearchParams(),
+  ): Promise<Headers> {
+    const service = "s3";
+    const host = `${this.bucketName}.s3.${this.region}.amazonaws.com`;
+    const now = new Date();
+    const amzDate = now.toISOString().replace(/[:-]|\.\d{3}/g, "");
+    const dateStamp = amzDate.slice(0, 8);
+
+    const credentialScope =
+      `${dateStamp}/${this.region}/${service}/aws4_request`;
+    const canonicalUri = `/${key}`;
+    const canonicalQuerystring = params.toString();
+    const payloadHash = await this.hash(body);
+    const canonicalHeaders = `host:${host}\n` +
+      `x-amz-content-sha256:${payloadHash}\n` + `x-amz-date:${amzDate}\n`;
+    const signedHeaders = "host;x-amz-content-sha256;x-amz-date";
+
+    const canonicalRequest = [
+      method,
+      canonicalUri,
+      canonicalQuerystring,
+      canonicalHeaders,
+      signedHeaders,
+      payloadHash,
+    ].join("\n");
+
+    const stringToSign = [
+      "AWS4-HMAC-SHA256",
+      amzDate,
+      credentialScope,
+      await this.hash(canonicalRequest),
+    ].join("\n");
+
+    const signingKey = await this.getSignatureKey(
+      this.secretAccessKey,
+      dateStamp,
+      this.region,
+      service,
+    );
+
+    const signature = await this.hmac(signingKey, stringToSign);
+
+    const authorizationHeader = [
+      `AWS4-HMAC-SHA256 Credential=${this.accessKeyId}/${credentialScope}`,
+      `SignedHeaders=${signedHeaders}`,
+      `Signature=${signature}`,
+    ].join(", ");
+
+    const headers = new Headers();
+    headers.set("x-amz-date", amzDate);
+    headers.set("Authorization", authorizationHeader);
+    headers.set("x-amz-content-sha256", payloadHash);
+    headers.set("Host", host);
+    if (body && method !== "GET" && method !== "DELETE") {
+      headers.set("Content-Type", "application/octet-stream");
+      headers.set("Content-Length", body.length.toString());
+    }
+    return headers;
+  }
+
+  private async hash(stringToHash: string): Promise<string> {
+    const encoder = new TextEncoder();
+    const data = encoder.encode(stringToHash);
+    const hashBuffer = await crypto.subtle.digest("SHA-256", data);
+    return Array.from(new Uint8Array(hashBuffer))
+      .map((b) => b.toString(16).padStart(2, "0"))
+      .join("");
+  }
+
+  private async hmac(
+    key: ArrayBuffer,
+    data: string,
+  ): Promise<string> {
+    const encoder = new TextEncoder();
+    const dataBytes = encoder.encode(data);
+    const signature = await crypto.subtle.sign(
+      "HMAC",
+      await crypto.subtle.importKey(
+        "raw",
+        key,
+        { name: "HMAC", hash: "SHA-256" },
+        false,
+        ["sign"],
+      ),
+      dataBytes,
+    );
+    return Array.from(new Uint8Array(signature))
+      .map((b) => b.toString(16).padStart(2, "0"))
+      .join("");
+  }
+
+  private async getSignatureKey(
+    key: string,
+    dateStamp: string,
+    regionName: string,
+    serviceName: string,
+  ): Promise<ArrayBuffer> {
+    const kDate = await this.hmacDigest(`AWS4${key}`, dateStamp);
+    const kRegion = await this.hmacDigest(kDate, regionName);
+    const kService = await this.hmacDigest(kRegion, serviceName);
+    const kSigning = await this.hmacDigest(kService, "aws4_request");
+    return kSigning;
+  }
+
+  private async hmacDigest(
+    key: string | ArrayBuffer,
+    data: string,
+  ): Promise<ArrayBuffer> {
+    const encoder = new TextEncoder();
+    const keyBytes = typeof key === "string" ? encoder.encode(key) : key;
+    const dataBytes = encoder.encode(data);
+    const cryptoKey = await crypto.subtle.importKey(
+      "raw",
+      keyBytes,
+      { name: "HMAC", hash: "SHA-256" },
+      false,
+      ["sign"],
+    );
+    return await crypto.subtle.sign("HMAC", cryptoKey, dataBytes);
+  }
+}
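Reviewer note: a minimal usage sketch of the new storage path, based only on what this diff adds (the `DECO_ACTORS_STORAGE` switch in `ActorRuntime.getActorStorage`, the environment variables read in the `S3ActorStorage` constructor, and its `<actorName>/<actorId>/<...key>` layout). Bucket name, actor name, and key values below are hypothetical placeholders, not part of the change.

```ts
// Sketch only, not part of the patch: exercises S3ActorStorage directly,
// assuming AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY (and optionally AWS_REGION)
// are already set in the environment. All concrete values are placeholders.
import { S3ActorStorage } from "./src/actors/storage/s3.ts";

Deno.env.set("DECO_ACTORS_STORAGE", "s3"); // makes ActorRuntime pick S3 over Deno KV
Deno.env.set("DECO_ACTORS_S3_BUCKET_NAME", "my-actors-bucket"); // hypothetical bucket

const storage = new S3ActorStorage({ actorName: "Counter", actorId: "actor-1" });

// Values are serialized as JSON under "Counter/actor-1/counters/visits".
await storage.put(["counters", "visits"], { count: 1 });
const value = await storage.get<{ count: number }>(["counters", "visits"]);
console.log(value); // { count: 1 }
await storage.delete(["counters", "visits"]);
```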