diff --git a/.changeset/plenty-birds-melt.md b/.changeset/plenty-birds-melt.md new file mode 100644 index 000000000000..38de007a71c5 --- /dev/null +++ b/.changeset/plenty-birds-melt.md @@ -0,0 +1,9 @@ +--- +"wrangler": patch +--- + +Adds the following new option for the `wrangler pipelines create` and `wrangler pipelines update` commands: + +``` +--cors-origins CORS origin allowlist for HTTP endpoint (use * for any origin) [array] +``` diff --git a/.changeset/short-kids-hunt.md b/.changeset/short-kids-hunt.md new file mode 100644 index 000000000000..e1320505b2e9 --- /dev/null +++ b/.changeset/short-kids-hunt.md @@ -0,0 +1,20 @@ +--- +"wrangler": patch +--- + +Rename `wrangler pipelines` flags + +The following flags have been renamed: + +| Previous Name | New Name | | ----------------- | --------------------- | | access-key-id | r2-access-key-id | | secret-access-key | r2-secret-access-key | | transform | transform-worker | | r2 | r2-bucket | | prefix | r2-prefix | | binding | enable-worker-binding | | http | enable-http | | authentication | require-http-auth | | filename | file-template | | filepath | partition-template | diff --git a/packages/wrangler/src/__tests__/pipelines.test.ts b/packages/wrangler/src/__tests__/pipelines.test.ts index 335dc28149f3..557653c67ea5 100644 --- a/packages/wrangler/src/__tests__/pipelines.test.ts +++ b/packages/wrangler/src/__tests__/pipelines.test.ts @@ -1,4 +1,5 @@ import { http, HttpResponse } from "msw"; +import { describe, expect, it } from "vitest"; import { normalizeOutput } from "../../e2e/helpers/normalize"; import { __testSkipDelays } from "../pipelines"; import { endEventLoop } from "./helpers/end-event-loop"; @@ -255,11 +256,11 @@ describe("pipelines", () => { "wrangler pipelines COMMANDS - wrangler pipelines create Create a new pipeline - wrangler pipelines list List current pipelines - wrangler pipelines show Show a pipeline configuration - wrangler pipelines update Update a pipeline - wrangler pipelines delete Delete a pipeline + wrangler pipelines create Create a new Pipeline + wrangler pipelines list List current Pipelines + wrangler pipelines show Show a Pipeline configuration + wrangler pipelines update Update a Pipeline + wrangler pipelines delete Delete a Pipeline GLOBAL FLAGS -c, --config Path to Wrangler configuration file [string] -e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string] -h, --help Show help [boolean] -v, --version Show version number [boolean]" `); }); @@ -278,52 +279,75 @@ describe("pipelines", () => { expect(std.out).toMatchInlineSnapshot(` "wrangler pipelines create - Create a new pipeline + Create a new Pipeline POSITIONALS pipeline The name of the new pipeline [string] [required] + Source settings + --enable-worker-binding Send data from a Worker to a Pipeline using a Binding [boolean] [default: true] + --enable-http Generate an endpoint to ingest data via HTTP [boolean] [default: true] + --require-http-auth Require Cloudflare API Token for HTTPS endpoint authentication [boolean] [default: false] + --cors-origins CORS origin allowlist for HTTP endpoint (use * for any origin) [array] + + Batch hints + --batch-max-mb Maximum batch size in megabytes before flushing [number] + --batch-max-rows Maximum number of rows per batch before flushing [number] + --batch-max-seconds Maximum age of batch in seconds before flushing [number] + + Transformations + --transform-worker Pipeline transform Worker and entrypoint (<worker>.<entrypoint>) [string] + + Destination settings + --r2-bucket Destination R2 bucket name [string] [required] + --r2-access-key-id R2 service Access Key ID for authentication. Leave empty for OAuth confirmation.
[string] + --r2-secret-access-key R2 service Secret Access Key for authentication. Leave empty for OAuth confirmation. [string] + --r2-prefix Prefix for storing files in the destination bucket [string] [default: \\"\\"] + --compression Compression format for output files [string] [choices: \\"none\\", \\"gzip\\", \\"deflate\\"] [default: \\"gzip\\"] + --file-template Template for individual file names (must include \${slug}) [string] + --partition-template Path template for partitioned files in the bucket. If not specified, the default will be used [string] + GLOBAL FLAGS -c, --config Path to Wrangler configuration file [string] -e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string] -h, --help Show help [boolean] - -v, --version Show version number [boolean] - - OPTIONS - --secret-access-key The R2 service token Access Key to write data [string] - --access-key-id The R2 service token Secret Key to write data [string] - --batch-max-mb The approximate maximum size (in megabytes) for each batch before flushing (range: 1 - 100) [number] - --batch-max-rows The approximate maximum number of rows in a batch before flushing (range: 100 - 1000000) [number] - --batch-max-seconds The approximate maximum age (in seconds) of a batch before flushing (range: 1 - 300) [number] - --transform The worker and entrypoint of the PipelineTransform implementation in the format \\"worker.entrypoint\\" - Default: No transformation worker [string] - --compression Sets the compression format of output files - Default: gzip [string] [choices: \\"none\\", \\"gzip\\", \\"deflate\\"] - --prefix Optional base path to store files in the destination bucket - Default: (none) [string] - --filepath The path to store partitioned files in the destination bucket - Default: event_date=\${date}/hr=\${hr} [string] - --filename The name of each unique file in the bucket. Must contain \\"\${slug}\\". File extension is optional - Default: \${slug}\${extension} [string] - --binding Enable Worker binding to this pipeline [boolean] [default: true] - --http Enable HTTPS endpoint to send data to this pipeline [boolean] [default: true] - --authentication Require authentication (Cloudflare API Token) to send data to the HTTPS endpoint [boolean] [default: false] - --r2 Destination R2 bucket name [string] [required]" + -v, --version Show version number [boolean]" `); }); it("should create a pipeline with explicit credentials", async () => { const requests = mockCreateRequest("my-pipeline"); await runWrangler( - "pipelines create my-pipeline --r2 test-bucket --access-key-id my-key --secret-access-key my-secret" + "pipelines create my-pipeline --r2-bucket test-bucket --r2-access-key-id my-key --r2-secret-access-key my-secret" ); expect(requests.count).toEqual(1); + expect(std.out).toMatchInlineSnapshot(` + "πŸŒ€ Creating Pipeline named \\"my-pipeline\\" + βœ… Successfully created Pipeline \\"my-pipeline\\" with id 0001 + πŸŽ‰ You can now send data to your Pipeline! 
+ + To start interacting with this Pipeline from a Worker, open your Worker’s config file and add the following binding configuration: + + { + \\"pipelines\\": [ + { + \\"pipeline\\": \\"my-pipeline\\", + \\"binding\\": \\"PIPELINE\\" + } + ] + } + + Send data to your Pipeline's HTTP endpoint: + + curl \\"foo\\" -d '[{\\"foo\\": \\"bar\\"}]' + " + `); }); it("should fail a missing bucket", async () => { const requests = mockCreateR2TokenFailure("bad-bucket"); await expect( - runWrangler("pipelines create bad-pipeline --r2 bad-bucket") + runWrangler("pipelines create bad-pipeline --r2-bucket bad-bucket") ).rejects.toThrowError(); await endEventLoop(); @@ -338,7 +362,7 @@ describe("pipelines", () => { it("should create a pipeline with auth", async () => { const requests = mockCreateRequest("my-pipeline"); await runWrangler( - "pipelines create my-pipeline --authentication --r2 test-bucket --access-key-id my-key --secret-access-key my-secret" + "pipelines create my-pipeline --require-http-auth --r2-bucket test-bucket --r2-access-key-id my-key --r2-secret-access-key my-secret" ); expect(requests.count).toEqual(1); @@ -352,7 +376,7 @@ describe("pipelines", () => { it("should create a pipeline without http", async () => { const requests = mockCreateRequest("my-pipeline"); await runWrangler( - "pipelines create my-pipeline --http=false --r2 test-bucket --access-key-id my-key --secret-access-key my-secret" + "pipelines create my-pipeline --enable-http=false --r2-bucket test-bucket --r2-access-key-id my-key --r2-secret-access-key my-secret" ); expect(requests.count).toEqual(1); @@ -390,7 +414,7 @@ describe("pipelines", () => { expect(std.err).toMatchInlineSnapshot(`""`); expect(std.out).toMatchInlineSnapshot(` - "Retrieving config for pipeline \\"foo\\". + "Retrieving config for Pipeline \\"foo\\". { \\"id\\": \\"0001\\", \\"version\\": 1, @@ -438,7 +462,7 @@ describe("pipelines", () => { expect(std.err).toMatchInlineSnapshot(`""`); expect(normalizeOutput(std.out)).toMatchInlineSnapshot(` - "Retrieving config for pipeline \\"bad-pipeline\\". + "Retrieving config for Pipeline \\"bad-pipeline\\". X [ERROR] A request to the Cloudflare API (/accounts/some-account-id/pipelines/bad-pipeline) failed. 
Pipeline does not exist [code: 1000] If you think this is a bug, please open an issue at: @@ -475,7 +499,7 @@ describe("pipelines", () => { const updateReq = mockUpdateRequest(update.name, update); await runWrangler( - "pipelines update my-pipeline --r2 new-bucket --access-key-id service-token-id --secret-access-key my-secret-access-key" + "pipelines update my-pipeline --r2-bucket new-bucket --r2-access-key-id service-token-id --r2-secret-access-key my-secret-access-key" ); expect(updateReq.count).toEqual(1); @@ -495,7 +519,7 @@ describe("pipelines", () => { const updateReq = mockUpdateRequest(update.name, update); await runWrangler( - "pipelines update my-pipeline --r2 new-bucket --access-key-id new-key --secret-access-key new-secret" + "pipelines update my-pipeline --r2-bucket new-bucket --r2-access-key-id new-key --r2-secret-access-key new-secret" ); expect(updateReq.count).toEqual(1); @@ -516,7 +540,7 @@ describe("pipelines", () => { const updateReq = mockUpdateRequest(update.name, update); await runWrangler( - "pipelines update my-pipeline --binding=false --http --authentication" + "pipelines update my-pipeline --enable-worker-binding=false --enable-http --require-http-auth" ); expect(updateReq.count).toEqual(1); @@ -527,6 +551,32 @@ describe("pipelines", () => { ); }); + it("should update a pipeline cors headers", async () => { + const pipeline: Pipeline = samplePipeline; + mockShowRequest(pipeline.name, pipeline); + + const update = JSON.parse(JSON.stringify(pipeline)); + update.source = [ + { + type: "http", + format: "json", + authenticated: true, + }, + ]; + const updateReq = mockUpdateRequest(update.name, update); + + await runWrangler( + "pipelines update my-pipeline --enable-worker-binding=false --enable-http --cors-origins http://localhost:8787" + ); + + expect(updateReq.count).toEqual(1); + expect(updateReq.body?.source.length).toEqual(1); + expect(updateReq.body?.source[0].type).toEqual("http"); + expect((updateReq.body?.source[0] as HttpSource).cors?.origins).toEqual([ + "http://localhost:8787", + ]); + }); + it("should fail a missing pipeline", async () => { const requests = mockShowRequest("bad-pipeline", null, 404, { code: 1000, @@ -534,7 +584,7 @@ describe("pipelines", () => { }); await expect( runWrangler( - "pipelines update bad-pipeline --r2 new-bucket --access-key-id new-key --secret-access-key new-secret" + "pipelines update bad-pipeline --r2-bucket new-bucket --r2-access-key-id new-key --r2-secret-access-key new-secret" ) ).rejects.toThrowError(); @@ -558,8 +608,8 @@ describe("pipelines", () => { expect(std.err).toMatchInlineSnapshot(`""`); expect(std.out).toMatchInlineSnapshot(` - "Deleting pipeline foo. - Deleted pipeline foo." + "Deleting Pipeline foo. + Deleted Pipeline foo." `); expect(requests.count).toEqual(1); }); @@ -577,7 +627,7 @@ describe("pipelines", () => { expect(std.err).toMatchInlineSnapshot(`""`); expect(normalizeOutput(std.out)).toMatchInlineSnapshot(` - "Deleting pipeline bad-pipeline. + "Deleting Pipeline bad-pipeline. X [ERROR] A request to the Cloudflare API (/accounts/some-account-id/pipelines/bad-pipeline) failed. 
Pipeline does not exist [code: 1000] If you think this is a bug, please open an issue at: diff --git a/packages/wrangler/src/pipelines/cli/create.ts b/packages/wrangler/src/pipelines/cli/create.ts new file mode 100644 index 000000000000..dbb5e17f31ca --- /dev/null +++ b/packages/wrangler/src/pipelines/cli/create.ts @@ -0,0 +1,313 @@ +import chalk from "chalk"; +import { formatConfigSnippet, readConfig } from "../../config"; +import { FatalError, UserError } from "../../errors"; +import { logger } from "../../logger"; +import { requireAuth } from "../../user"; +import { getValidBindingName } from "../../utils/getValidBindingName"; +import { printWranglerBanner } from "../../wrangler-banner"; +import { createPipeline } from "../client"; +import { + authorizeR2Bucket, + BYTES_PER_MB, + getAccountR2Endpoint, + parseTransform, +} from "../index"; +import { validateCorsOrigins, validateInRange } from "../validate"; +import type { + CommonYargsOptions, + StrictYargsOptionsToInterface, +} from "../../yargs-types"; +import type { BindingSource, HttpSource, PipelineUserConfig } from "../client"; +import type { Argv } from "yargs"; + +export function addCreateOptions(yargs: Argv<CommonYargsOptions>) { + return ( + yargs + .positional("pipeline", { + describe: "The name of the new pipeline", + type: "string", + demandOption: true, + }) + // Sources + .group( + [ + "enable-worker-binding", + "enable-http", + "require-http-auth", + "cors-origins", + ], + `${chalk.bold("Source settings")}` + ) + .option("enable-worker-binding", { + type: "boolean", + describe: "Send data from a Worker to a Pipeline using a Binding", + default: true, + demandOption: false, + }) + .option("enable-http", { + type: "boolean", + describe: "Generate an endpoint to ingest data via HTTP", + default: true, + demandOption: false, + }) + .option("require-http-auth", { + type: "boolean", + describe: + "Require Cloudflare API Token for HTTPS endpoint authentication", + default: false, + demandOption: false, + }) + .option("cors-origins", { + type: "array", + describe: + "CORS origin allowlist for HTTP endpoint (use * for any origin)", + demandOption: false, + coerce: validateCorsOrigins, + }) + + // Batching + .group( + ["batch-max-mb", "batch-max-rows", "batch-max-seconds"], + `${chalk.bold("Batch hints")}` + ) + .option("batch-max-mb", { + type: "number", + describe: "Maximum batch size in megabytes before flushing", + demandOption: false, + coerce: validateInRange("batch-max-mb", 1, 100), + }) + .option("batch-max-rows", { + type: "number", + describe: "Maximum number of rows per batch before flushing", + demandOption: false, + coerce: validateInRange("batch-max-rows", 100, 1000000), + }) + .option("batch-max-seconds", { + type: "number", + describe: "Maximum age of batch in seconds before flushing", + demandOption: false, + coerce: validateInRange("batch-max-seconds", 1, 300), + }) + + // Transform options + .group(["transform-worker"], `${chalk.bold("Transformations")}`) + .option("transform-worker", { + type: "string", + describe: + "Pipeline transform Worker and entrypoint (<worker>.<entrypoint>)", + demandOption: false, + }) + + // Destination options + .group( + [ + "r2-bucket", + "r2-access-key-id", + "r2-secret-access-key", + "r2-prefix", + "compression", + "file-template", + "partition-template", + ], + `${chalk.bold("Destination settings")}` + ) + .option("r2-bucket", { + type: "string", + describe: "Destination R2 bucket name", + demandOption: true, + }) + .option("r2-access-key-id", { + type: "string", + describe: + "R2 service Access Key ID for authentication. Leave empty for OAuth confirmation.",
+ demandOption: false, + }) + .option("r2-secret-access-key", { + type: "string", + describe: + "R2 service Secret Access Key for authentication. Leave empty for OAuth confirmation.", + demandOption: false, + }) + // Require these flags to be provided together + .implies("r2-access-key-id", "r2-secret-access-key") + .implies("r2-secret-access-key", "r2-access-key-id") + .check((argv) => { + if ( + (argv["r2-access-key-id"] && !argv["r2-secret-access-key"]) || + (!argv["r2-access-key-id"] && argv["r2-secret-access-key"]) + ) { + throw new UserError( + "--r2-access-key-id and --r2-secret-access-key must be provided together" + ); + } + return true; + }) + .option("r2-prefix", { + type: "string", + describe: "Prefix for storing files in the destination bucket", + default: "", + demandOption: false, + }) + .option("compression", { + type: "string", + describe: "Compression format for output files", + choices: ["none", "gzip", "deflate"], + default: "gzip", + demandOption: false, + }) + .option("partition-template", { + type: "string", + describe: + "Path template for partitioned files in the bucket. If not specified, the default will be used", + demandOption: false, + }) + .option("file-template", { + type: "string", + describe: "Template for individual file names (must include ${slug})", + demandOption: false, + coerce: (val) => { + if (!val.includes("${slug}")) { + throw new UserError("file-template must contain ${slug}"); + } + return val; + }, + }) + ); +} + +export async function createPipelineHandler( + args: StrictYargsOptionsToInterface<typeof addCreateOptions> +) { + await printWranglerBanner(); + + const config = readConfig(args); + const bucket = args.r2Bucket; + const name = args.pipeline; + const compression = args.compression; + + const batch = { + max_bytes: args.batchMaxMb + ?
args.batchMaxMb * BYTES_PER_MB // convert to bytes for the API + : undefined, + max_duration_s: args.batchMaxSeconds, + max_rows: args.batchMaxRows, + }; + + const accountId = await requireAuth(config); + const pipelineConfig: PipelineUserConfig = { + name: name, + metadata: {}, + source: [], + transforms: [], + destination: { + type: "r2", + format: "json", + compression: { + type: compression, + }, + batch: batch, + path: { + bucket: bucket, + }, + credentials: { + endpoint: getAccountR2Endpoint(accountId), + access_key_id: args.r2AccessKeyId || "", + secret_access_key: args.r2SecretAccessKey || "", + }, + }, + }; + const destination = pipelineConfig.destination; + if ( + !destination.credentials.access_key_id && + !destination.credentials.secret_access_key + ) { + // auto-generate a service token + const auth = await authorizeR2Bucket( + name, + accountId, + pipelineConfig.destination.path.bucket + ); + destination.credentials.access_key_id = auth.accessKeyId; + destination.credentials.secret_access_key = auth.secretAccessKey; + } + + if (!destination.credentials.access_key_id) { + throw new FatalError("Requires an R2 access key ID"); + } + + if (!destination.credentials.secret_access_key) { + throw new FatalError("Requires an R2 secret access key"); + } + + // add binding source (enabled by default) + if (args.enableWorkerBinding) { + pipelineConfig.source.push({ + type: "binding", + format: "json", + } satisfies BindingSource); + } + + // add http source (possibly authenticated), enabled by default + if (args.enableHttp) { + const source: HttpSource = { + type: "http", + format: "json", + authentication: args.requireHttpAuth, + }; + + if (args.corsOrigins && args.corsOrigins.length > 0) { + source.cors = { origins: args.corsOrigins }; + } + pipelineConfig.source.push(source); + } + + if (pipelineConfig.source.length === 0) { + throw new UserError( + "No sources have been enabled. At least one source (HTTP or Worker Binding) must be enabled"
+ ); + } + + if (args.transformWorker) { + pipelineConfig.transforms.push(parseTransform(args.transformWorker)); + } + + if (args.r2Prefix) { + pipelineConfig.destination.path.prefix = args.r2Prefix; + } + if (args.partitionTemplate) { + pipelineConfig.destination.path.filepath = args.partitionTemplate; + } + if (args.fileTemplate) { + pipelineConfig.destination.path.filename = args.fileTemplate; + } + + logger.log(`πŸŒ€ Creating Pipeline named "${name}"`); + const pipeline = await createPipeline(accountId, pipelineConfig); + + logger.log( + `βœ… Successfully created Pipeline "${pipeline.name}" with id ${pipeline.id}` + ); + logger.log("πŸŽ‰ You can now send data to your Pipeline!"); + if (args.enableWorkerBinding) { + logger.log( + `\nTo start interacting with this Pipeline from a Worker, open your Worker’s config file and add the following binding configuration:\n` + ); + logger.log( + formatConfigSnippet( + { + pipelines: [ + { + pipeline: pipeline.name, + binding: getValidBindingName("PIPELINE", "PIPELINE"), + }, + ], + }, + config.configPath + ) + ); + } + if (args.enableHttp) { + logger.log(`\nSend data to your Pipeline's HTTP endpoint:\n`); + logger.log(` curl "${pipeline.endpoint}" -d '[{"foo": "bar"}]'\n`); + } +} diff --git a/packages/wrangler/src/pipelines/cli/delete.ts b/packages/wrangler/src/pipelines/cli/delete.ts new file mode 100644 index 000000000000..e6e186290b90 --- /dev/null +++ b/packages/wrangler/src/pipelines/cli/delete.ts @@ -0,0 +1,35 @@ +import { readConfig } from "../../config"; +import { logger } from "../../logger"; +import { requireAuth } from "../../user"; +import { printWranglerBanner } from "../../wrangler-banner"; +import { deletePipeline } from "../client"; +import { validateName } from "../validate"; +import type { + CommonYargsOptions, + StrictYargsOptionsToInterface, +} from "../../yargs-types"; +import type { Argv } from "yargs"; + +export function addDeleteOptions(yargs: Argv<CommonYargsOptions>) { + return yargs.positional("pipeline", { + type: "string", + describe: "The name of the Pipeline to delete", + demandOption: true, + }); +} + +export async function deletePipelineHandler( + args: StrictYargsOptionsToInterface<typeof addDeleteOptions> +) { + await printWranglerBanner(); + const config = readConfig(args); + const accountId = await requireAuth(config); + const name = args.pipeline; + + validateName("pipeline name", name); + + logger.log(`Deleting Pipeline ${name}.`); + await deletePipeline(accountId, name); + + logger.log(`Deleted Pipeline ${name}.`); +} diff --git a/packages/wrangler/src/pipelines/cli/list.ts b/packages/wrangler/src/pipelines/cli/list.ts new file mode 100644 index 000000000000..7e742587e415 --- /dev/null +++ b/packages/wrangler/src/pipelines/cli/list.ts @@ -0,0 +1,26 @@ +import { readConfig } from "../../config"; +import { logger } from "../../logger"; +import { requireAuth } from "../../user"; +import { printWranglerBanner } from "../../wrangler-banner"; +import { listPipelines } from "../client"; +import type { CommonYargsOptions } from "../../yargs-types"; +import type { ArgumentsCamelCase } from "yargs"; + +export async function listPipelinesHandler( + args: ArgumentsCamelCase<CommonYargsOptions> +) { + await printWranglerBanner(); + const config = readConfig(args); + const accountId = await requireAuth(config); + + // TODO: we should show bindings & transforms if they exist for given ids + const list = await listPipelines(accountId); + + logger.table( + list.map((pipeline) => ({ + name: pipeline.name, + id: pipeline.id,
+ endpoint: pipeline.endpoint, + })) + ); +} diff --git a/packages/wrangler/src/pipelines/cli/show.ts b/packages/wrangler/src/pipelines/cli/show.ts new file mode 100644 index 000000000000..34478292efc0 --- /dev/null +++ b/packages/wrangler/src/pipelines/cli/show.ts @@ -0,0 +1,35 @@ +import { readConfig } from "../../config"; +import { logger } from "../../logger"; +import { requireAuth } from "../../user"; +import { printWranglerBanner } from "../../wrangler-banner"; +import { getPipeline } from "../client"; +import { validateName } from "../validate"; +import type { + CommonYargsOptions, + StrictYargsOptionsToInterface, +} from "../../yargs-types"; +import type { Argv } from "yargs"; + +export function addShowOptions(yargs: Argv<CommonYargsOptions>) { + return yargs.positional("pipeline", { + type: "string", + describe: "The name of the Pipeline to show", + demandOption: true, + }); +} + +export async function showPipelineHandler( + args: StrictYargsOptionsToInterface<typeof addShowOptions> +) { + await printWranglerBanner(); + const config = readConfig(args); + const accountId = await requireAuth(config); + const name = args.pipeline; + + validateName("pipeline name", name); + + logger.log(`Retrieving config for Pipeline "${name}".`); + const pipeline = await getPipeline(accountId, name); + + logger.log(JSON.stringify(pipeline, null, 2)); +} diff --git a/packages/wrangler/src/pipelines/cli/update.ts b/packages/wrangler/src/pipelines/cli/update.ts new file mode 100644 index 000000000000..647b0c8f3b29 --- /dev/null +++ b/packages/wrangler/src/pipelines/cli/update.ts @@ -0,0 +1,304 @@ +import chalk from "chalk"; +import { readConfig } from "../../config"; +import { FatalError, UserError } from "../../errors"; +import { logger } from "../../logger"; +import { requireAuth } from "../../user"; +import { printWranglerBanner } from "../../wrangler-banner"; +import { getPipeline, updatePipeline } from "../client"; +import { + authorizeR2Bucket, + BYTES_PER_MB, + getAccountR2Endpoint, + parseTransform, +} from "../index"; +import { validateCorsOrigins, validateInRange } from "../validate"; +import type { + CommonYargsOptions, + StrictYargsOptionsToInterface, +} from "../../yargs-types"; +import type { HttpSource, Source } from "../client"; +import type { Argv } from "yargs"; + +/** + * Add all the positional and optional flags for the `wrangler pipelines update` command. + * + * @param yargs + */ +export function addUpdateOptions(yargs: Argv<CommonYargsOptions>) { /* These arguments are nearly identical to the options used for creating a pipeline, with some notable differences. In particular, not all options are available for updating, and the default values have been removed; `undefined` is used to determine whether the user provided a flag at all with the intent to change its value. */ + return ( + yargs + .positional("pipeline", { + describe: "The name of the Pipeline to update", + type: "string", + demandOption: true, + }) + .option("r2-bucket", { + type: "string", + describe: "Destination R2 bucket name", + demandOption: false, // Not required for updates.
+ }) + // Sources + .group( + [ + "enable-worker-binding", + "enable-http", + "require-http-auth", + "cors-origins", + ], + `${chalk.bold("Source settings")}` + ) + .option("enable-worker-binding", { + type: "boolean", + describe: "Send data from a Worker to a Pipeline using a Binding", + demandOption: false, + }) + .option("enable-http", { + type: "boolean", + describe: "Generate an endpoint to ingest data via HTTP", + demandOption: false, + }) + .option("require-http-auth", { + type: "boolean", + describe: + "Require Cloudflare API Token for HTTPS endpoint authentication", + demandOption: false, + }) + .option("cors-origins", { + type: "array", + describe: + "CORS origin allowlist for HTTP endpoint (use * for any origin)", + demandOption: false, + coerce: validateCorsOrigins, + }) + + // Batching + .group( + ["batch-max-mb", "batch-max-rows", "batch-max-seconds"], + `${chalk.bold("Batch hints")}` + ) + .option("batch-max-mb", { + type: "number", + describe: "Maximum batch size in megabytes before flushing", + demandOption: false, + coerce: validateInRange("batch-max-mb", 1, 100), + }) + .option("batch-max-rows", { + type: "number", + describe: "Maximum number of rows per batch before flushing", + demandOption: false, + coerce: validateInRange("batch-max-rows", 100, 1000000), + }) + .option("batch-max-seconds", { + type: "number", + describe: "Maximum age of batch in seconds before flushing", + demandOption: false, + coerce: validateInRange("batch-max-seconds", 1, 300), + }) + + // Transform options + .group(["transform-worker"], `${chalk.bold("Transformations")}`) + .option("transform-worker", { + type: "string", + describe: + "Pipeline transform Worker and entrypoint (<worker>.<entrypoint>)", + demandOption: false, + }) + + // Destination options + .group( + [ + "r2-bucket", + "r2-access-key-id", + "r2-secret-access-key", + "r2-prefix", + "compression", + "file-template", + "partition-template", + ], + `${chalk.bold("Destination settings")}` + ) + .option("r2-access-key-id", { + type: "string", + describe: + "R2 service Access Key ID for authentication. Leave empty for OAuth confirmation.", + demandOption: false, + }) + .option("r2-secret-access-key", { + type: "string", + describe: + "R2 service Secret Access Key for authentication. Leave empty for OAuth confirmation.",
+ demandOption: false, + }) + // Require these flags to be provided together + .implies("r2-access-key-id", "r2-secret-access-key") + .implies("r2-secret-access-key", "r2-access-key-id") + .check((argv) => { + if ( + (argv["r2-access-key-id"] && !argv["r2-secret-access-key"]) || + (!argv["r2-access-key-id"] && argv["r2-secret-access-key"]) + ) { + throw new UserError( + "--r2-access-key-id and --r2-secret-access-key must be provided together" + ); + } + return true; + }) + .option("r2-prefix", { + type: "string", + describe: "Prefix for storing files in the destination bucket", + demandOption: false, + }) + .option("compression", { + type: "string", + describe: "Compression format for output files", + choices: ["none", "gzip", "deflate"], + demandOption: false, + }) + .option("partition-template", { + type: "string", + describe: "Path template for partitioned files in the bucket", + demandOption: false, + }) + .option("file-template", { + type: "string", + describe: "Template for individual file names (must include ${slug})", + demandOption: false, + coerce: (val: string) => { + if (!val.includes("${slug}")) { + throw new UserError("file-template must contain ${slug}"); + } + return val; + }, + }) + ); +} + +export async function updatePipelineHandler( + args: StrictYargsOptionsToInterface<typeof addUpdateOptions> +) { + await printWranglerBanner(); + + const name = args.pipeline; + // only the fields set will be updated - other fields will use the existing config + const config = readConfig(args); + const accountId = await requireAuth(config); + + const pipelineConfig = await getPipeline(accountId, name); + + if (args.compression) { + pipelineConfig.destination.compression.type = args.compression; + } + if (args.batchMaxMb) { + pipelineConfig.destination.batch.max_bytes = args.batchMaxMb * BYTES_PER_MB; // convert to bytes for the API + } + if (args.batchMaxSeconds) { + pipelineConfig.destination.batch.max_duration_s = args.batchMaxSeconds; + } + if (args.batchMaxRows) { + pipelineConfig.destination.batch.max_rows = args.batchMaxRows; + } + + const bucket = args.r2Bucket; + const accessKeyId = args.r2AccessKeyId; + const secretAccessKey = args.r2SecretAccessKey; + if (bucket || accessKeyId || secretAccessKey) { + const destination = pipelineConfig.destination; + if (bucket) { + pipelineConfig.destination.path.bucket = bucket; + } + destination.credentials = { + endpoint: getAccountR2Endpoint(accountId), + access_key_id: accessKeyId || "", + secret_access_key: secretAccessKey || "", + }; + if (!accessKeyId && !secretAccessKey) { + const auth = await authorizeR2Bucket( + name, + accountId, + destination.path.bucket + ); + destination.credentials.access_key_id = auth.accessKeyId; + destination.credentials.secret_access_key = auth.secretAccessKey; + } + if (!destination.credentials.access_key_id) { + throw new FatalError("Requires an R2 access key ID"); + } + + if (!destination.credentials.secret_access_key) { + throw new FatalError("Requires an R2 secret access key"); + } + } + + if (args.enableWorkerBinding !== undefined) { + // strip off the old binding source, keeping it to re-add if still enabled + const source = pipelineConfig.source.find( + (s: Source) => s.type === "binding" + ); + pipelineConfig.source = pipelineConfig.source.filter( + (s: Source) => s.type !== "binding" + ); + // add back only if specified + if (args.enableWorkerBinding) { + pipelineConfig.source.push({ + ...source, + type: "binding", + format: "json", + }); + } + } + + if (args.enableHttp !== undefined) { + // strip off the old http source, keeping it to re-add if still enabled + const source = pipelineConfig.source.find((s: Source) => s.type === "http"); + pipelineConfig.source = pipelineConfig.source.filter( + (s: Source) => s.type !== "http" + ); + // add back if specified + if (args.enableHttp) { + const update = { + type: "http", + format: "json", + ...source, + } satisfies HttpSource; + + pipelineConfig.source.push(update); + } + } + + const httpSource = pipelineConfig.source.find( + (s: Source) => s.type === "http" + ); + if (httpSource) { + if (args.requireHttpAuth !== undefined) { + httpSource.authentication = args.requireHttpAuth; + } + if (args.corsOrigins && args.corsOrigins.length > 0) { + httpSource.cors = { origins: args.corsOrigins }; + } + } + + if (args.transformWorker) { + pipelineConfig.transforms.push(parseTransform(args.transformWorker)); + } + + if (args.r2Prefix) { + pipelineConfig.destination.path.prefix = args.r2Prefix; + } + if (args.partitionTemplate) { + pipelineConfig.destination.path.filepath = args.partitionTemplate; + } + if (args.fileTemplate) { + pipelineConfig.destination.path.filename = args.fileTemplate; + } + + logger.log(`πŸŒ€ Updating Pipeline "${name}"`); + const pipeline = await updatePipeline(accountId, name, pipelineConfig); + + logger.log( + `βœ… Successfully updated Pipeline "${pipeline.name}" with ID ${pipeline.id}\n` + ); +} diff --git a/packages/wrangler/src/pipelines/client.ts b/packages/wrangler/src/pipelines/client.ts index 06a7f7e85f3f..0d3413ffb65c 100644 --- a/packages/wrangler/src/pipelines/client.ts +++ b/packages/wrangler/src/pipelines/client.ts @@ -26,6 +26,9 @@ export type HttpSource = { format: string; schema?: string; authentication?: boolean; + cors?: { + origins: ["*"] | string[]; + }; }; export type BindingSource = { type: "binding"; @@ -96,14 +99,7 @@ export function sha256(s: string): string { return createHash("sha256").update(s).digest("hex"); } -export type PermissionGroup = { - id: string; - name: string; - description: string; - scopes: string[]; -}; - -interface S3AccessKey { +export interface S3AccessKey { accessKeyId: string; secretAccessKey: string; } diff --git a/packages/wrangler/src/pipelines/index.ts b/packages/wrangler/src/pipelines/index.ts index 2bfcab088dc8..7070b49c3d0e 100644 --- a/packages/wrangler/src/pipelines/index.ts +++ b/packages/wrangler/src/pipelines/index.ts @@ -1,34 +1,55 @@ +import { setTimeout } from "node:timers/promises"; import { HeadBucketCommand, S3Client } from "@aws-sdk/client-s3"; -import { readConfig } from "../config"; -import { FatalError, UserError } from "../errors"; +import { getCloudflareApiEnvironmentFromEnv } from "../environment-variables/misc-variables"; +import { FatalError } from "../errors"; import { logger } from "../logger"; -import * as metrics from "../metrics"; import { APIError } from "../parse"; -import { requireAuth } from "../user"; -import { retryOnAPIFailure } from "../utils/retry"; -import { printWranglerBanner } from "../wrangler-banner"; -import { - createPipeline, - deletePipeline, - generateR2ServiceToken, - getPipeline, - getR2Bucket, - listPipelines, - updatePipeline, -} from "./client"; -import type { CommonYargsArgv, CommonYargsOptions } from "../yargs-types"; -import type { - BindingSource, - HttpSource, - PipelineUserConfig, - Source, -} from "./client"; -import type { Argv } from "yargs"; +import { addCreateOptions, createPipelineHandler } from "./cli/create"; +import { addDeleteOptions, deletePipelineHandler } from "./cli/delete"; +import { listPipelinesHandler } from "./cli/list"; +import { addShowOptions, showPipelineHandler } from "./cli/show";
+import { addUpdateOptions, updatePipelineHandler } from "./cli/update"; +import { generateR2ServiceToken, getR2Bucket } from "./client"; +import type { CommonYargsArgv } from "../yargs-types"; + +export const BYTES_PER_MB = 1000 * 1000; // flag to skip delays for tests let __testSkipDelaysFlag = false; -async function authorizeR2Bucket( +/** + * Verify that the credentials used by the S3Client can access an R2 bucket by performing the + * HeadBucket operation. It will retry up to 10 times over 10s to handle newly + * created credentials that might not be active yet (propagation can take a few seconds). + * + * @param r2 + * @param bucketName + */ +async function verifyBucketAccess(r2: S3Client, bucketName: string) { + const MAX_ATTEMPTS = 10; + const DELAY_MS = 1000; + + const checkCredentials = async () => { + logger.debug(`Checking if credentials are active`); + await r2.send(new HeadBucketCommand({ Bucket: bucketName })); + }; + + for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) { + try { + logger.debug(`Attempt ${attempt} of ${MAX_ATTEMPTS}`); + await checkCredentials(); + return; + } catch (error) { + logger.debug("HeadBucket request failed", error); + if (attempt === MAX_ATTEMPTS) { + throw error; + } + await setTimeout(DELAY_MS); + } + } +} + +export async function authorizeR2Bucket( pipelineName: string, accountId: string, bucketName: string @@ -52,65 +73,39 @@ async function authorizeR2Bucket( pipelineName ); + // return immediately if running in a test + if (__testSkipDelaysFlag) { + return serviceToken; + } + + const endpoint = getAccountR2Endpoint(accountId); + logger.debug(`Using R2 Endpoint ${endpoint}`); const r2 = new S3Client({ region: "auto", credentials: { accessKeyId: serviceToken.accessKeyId, secretAccessKey: serviceToken.secretAccessKey, }, - endpoint: getAccountR2Endpoint(accountId), + endpoint, }); - // Wait for token to settle/propagate, retry up to 10 times, with 1s waits in-between errors - !__testSkipDelaysFlag && - (await retryOnAPIFailure( - async () => { - try { - await r2.send( - new HeadBucketCommand({ - Bucket: bucketName, - }) - ); - } catch (err) { - if (err instanceof Error && err.name === "401") { - throw new AuthAPIError({ - status: 401, - text: "R2 HeadBucket request failed with status: 401", - }); - } - throw err; - } - }, - 1000, - 10 - )); + // Wait for token to settle/propagate, retry up to 10 times, with 1s waits in-between errors + logger.log(`πŸŒ€ Checking access to R2 bucket "${bucketName}"`); + await verifyBucketAccess(r2, bucketName); return serviceToken; } -/** - * AuthAPIError always retries errors so that - * we always retry auth errors while waiting for an - * API token to propegate and start working. - */ -class AuthAPIError extends APIError { - override isRetryable(): boolean { - return true; +export function getAccountR2Endpoint(accountId: string) { + const env = getCloudflareApiEnvironmentFromEnv(); + if (env === "staging") { + return `https://${accountId}.r2-staging.cloudflarestorage.com`; } -} - -function getAccountR2Endpoint(accountId: string) { return `https://${accountId}.r2.cloudflarestorage.com`; } -function validateName(label: string, name: string) { - if (!name.match(/^[a-zA-Z0-9-]+$/)) { - throw new Error(`Must provide a valid ${label}`); - } -} - // Parse out a transform of the form:
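For reference, a minimal sketch of the Worker-side counterpart to the binding snippet that `wrangler pipelines create` prints above. This assumes the Pipelines beta runtime API, where a pipeline binding exposes a `send()` method accepting an array of JSON-serializable records; the `PipelineBinding` and `Env` interfaces below are illustrative stand-ins, not types shipped by this PR:

```ts
// Illustrative sketch only — assumes the Pipelines beta runtime API
// (binding.send(records)); not code introduced by this PR.
interface PipelineBinding {
	// Accepts a batch of JSON-serializable records, mirroring the
	// '[{"foo": "bar"}]' payload in the curl example above.
	send(records: object[]): Promise<void>;
}

interface Env {
	// Matches the binding name printed by `wrangler pipelines create`.
	PIPELINE: PipelineBinding;
}

export default {
	async fetch(_request: Request, env: Env): Promise<Response> {
		// Worker-binding equivalent of POSTing to the Pipeline's HTTP endpoint.
		await env.PIPELINE.send([{ foo: "bar" }]);
		return new Response("ok");
	},
};
```

Sending through the binding uses the `source` entry that `--enable-worker-binding` configures, so disabling that flag removes this path while the HTTP endpoint (if enabled) keeps accepting data.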