From b21c76b4ec4bdb929a22865d15d12c51d5dda0a5 Mon Sep 17 00:00:00 2001 From: Cole MacKenzie Date: Tue, 14 Jan 2025 13:25:15 -0800 Subject: [PATCH 01/13] Refactor pipelines commands into separate files, rename some options **Pipelines is currently in closed beta, renaming these without providing aliases _should_ have no impact** Moving each of the subcommand handlers and options for each subcommand into its own file. This also renames many of the parameters to be more specific. The following parameters have been renamed: | Previous Name | New Name | | ---- | ---- | | access-key-id | r2-access-key-id | | secret-access-key | r2-secret-access-key | | transform | transform-worker | | r2 | r2-bucket | | prefix | r2-prefix | | binding | enable-worker-binding | | http | enable-http | | authentication | require-http-auth | | filename | file-template | | filepath | partition-template | Adds the following new option for `create` and `update` commands: ``` --cors-origins CORS origin allowlist for HTTP endpoint (use * for any origin) [array] ``` Closes https://jira.cfdata.org/browse/PIPE-160. --- .changeset/short-kids-hunt.md | 26 + .../wrangler/src/__tests__/pipelines.test.ts | 62 +-- packages/wrangler/src/pipelines/cli/create.ts | 321 +++++++++++ packages/wrangler/src/pipelines/cli/delete.ts | 38 ++ packages/wrangler/src/pipelines/cli/list.ts | 28 + packages/wrangler/src/pipelines/cli/show.ts | 39 ++ packages/wrangler/src/pipelines/cli/update.ts | 300 +++++++++++ packages/wrangler/src/pipelines/client.ts | 3 + packages/wrangler/src/pipelines/index.ts | 497 +----------------- packages/wrangler/src/pipelines/validate.ts | 36 ++ 10 files changed, 850 insertions(+), 500 deletions(-) create mode 100644 .changeset/short-kids-hunt.md create mode 100644 packages/wrangler/src/pipelines/cli/create.ts create mode 100644 packages/wrangler/src/pipelines/cli/delete.ts create mode 100644 packages/wrangler/src/pipelines/cli/list.ts create mode 100644 packages/wrangler/src/pipelines/cli/show.ts create mode 100644 packages/wrangler/src/pipelines/cli/update.ts create mode 100644 packages/wrangler/src/pipelines/validate.ts diff --git a/.changeset/short-kids-hunt.md b/.changeset/short-kids-hunt.md new file mode 100644 index 000000000000..ae86ae3c0882 --- /dev/null +++ b/.changeset/short-kids-hunt.md @@ -0,0 +1,26 @@ +--- +"wrangler": minor +--- + +Rename wrangler pipelines flags, add `--cors-origins` flag + +The following parameters have been renamed: + +| Previous Name | New Name | +| ----------------- | --------------------- | +| access-key-id | r2-access-key-id | +| secret-access-key | r2-secret-access-key | +| transform | transform-worker | +| r2 | r2-bucket | +| prefix | r2-prefix | +| binding | enable-worker-binding | +| http | enable-http | +| authentication | require-http-auth | +| filename | file-template | +| filepath | partition-template | + +Adds the following new option for `create` and `update` commands: + +``` +--cors-origins CORS origin allowlist for HTTP endpoint (use * for any origin) [array] +``` diff --git a/packages/wrangler/src/__tests__/pipelines.test.ts b/packages/wrangler/src/__tests__/pipelines.test.ts index 335dc28149f3..f1ab7d05c8a9 100644 --- a/packages/wrangler/src/__tests__/pipelines.test.ts +++ b/packages/wrangler/src/__tests__/pipelines.test.ts @@ -283,39 +283,41 @@ describe("pipelines", () => { POSITIONALS pipeline The name of the new pipeline [string] [required] + Source settings: + --enable-worker-binding Send data from a Worker to a Pipeline using a Binding [boolean] [default: true] + 
--enable-http Generate an endpoint to ingest data via HTTP [boolean] [default: true] + --require-http-auth Require Cloudflare API Token for HTTPS endpoint authentication [boolean] [default: false] + --cors-origins CORS origin allowlist for HTTP endpoint (use * for any origin) [array] + + Batch hints: + --batch-max-mb Maximum batch size in megabytes before flushing [number] + --batch-max-rows Maximum number of rows per batch before flushing [number] + --batch-max-seconds Maximum age of batch in seconds before flushing [number] + + Transformations: + --transform-worker PipelineTransform worker and entrypoint (<worker>.<entrypoint>) [string] + + Destination settings: + --r2-bucket Destination R2 bucket name [string] [required] + --r2-access-key-id R2 service Access Key ID for authentication. Leave empty for OAuth confirmation. [string] + --r2-secret-access-key R2 service Secret Access Key for authentication. Leave empty for OAuth confirmation. [string] + --r2-prefix Prefix for storing files in the destination bucket [string] [default: \\"\\"] + --compression Compression format for output files [string] [choices: \\"none\\", \\"gzip\\", \\"deflate\\"] [default: \\"gzip\\"] + --file-template Template for individual file names (must include \${slug}) [string] [default: \\"\${slug}\${extension}\\"] + --partition-template Path template for partitioned files in the bucket [string] [default: \\"event_date=\${date}/hr=\${hr}\\"] + GLOBAL FLAGS -c, --config Path to Wrangler configuration file [string] -e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string] -h, --help Show help [boolean] - -v, --version Show version number [boolean] - - OPTIONS - --secret-access-key The R2 service token Access Key to write data [string] - --access-key-id The R2 service token Secret Key to write data [string] - --batch-max-mb The approximate maximum size (in megabytes) for each batch before flushing (range: 1 - 100) [number] - --batch-max-rows The approximate maximum number of rows in a batch before flushing (range: 100 - 1000000) [number] - --batch-max-seconds The approximate maximum age (in seconds) of a batch before flushing (range: 1 - 300) [number] - --transform The worker and entrypoint of the PipelineTransform implementation in the format \\"worker.entrypoint\\" - Default: No transformation worker [string] - --compression Sets the compression format of output files - Default: gzip [string] [choices: \\"none\\", \\"gzip\\", \\"deflate\\"] - --prefix Optional base path to store files in the destination bucket - Default: (none) [string] - --filepath The path to store partitioned files in the destination bucket - Default: event_date=\${date}/hr=\${hr} [string] - --filename The name of each unique file in the bucket. Must contain \\"\${slug}\\". 
File extension is optional - Default: \${slug}\${extension} [string] - --binding Enable Worker binding to this pipeline [boolean] [default: true] - --http Enable HTTPS endpoint to send data to this pipeline [boolean] [default: true] - --authentication Require authentication (Cloudflare API Token) to send data to the HTTPS endpoint [boolean] [default: false] - --r2 Destination R2 bucket name [string] [required]" + -v, --version Show version number [boolean]" `); }); it("should create a pipeline with explicit credentials", async () => { const requests = mockCreateRequest("my-pipeline"); await runWrangler( - "pipelines create my-pipeline --r2 test-bucket --access-key-id my-key --secret-access-key my-secret" + "pipelines create my-pipeline --r2-bucket test-bucket --r2-access-key-id my-key --r2-secret-access-key my-secret" ); expect(requests.count).toEqual(1); }); @@ -323,7 +325,7 @@ describe("pipelines", () => { it("should fail a missing bucket", async () => { const requests = mockCreateR2TokenFailure("bad-bucket"); await expect( - runWrangler("pipelines create bad-pipeline --r2 bad-bucket") + runWrangler("pipelines create bad-pipeline --r2-bucket bad-bucket") ).rejects.toThrowError(); await endEventLoop(); @@ -338,7 +340,7 @@ describe("pipelines", () => { it("should create a pipeline with auth", async () => { const requests = mockCreateRequest("my-pipeline"); await runWrangler( - "pipelines create my-pipeline --authentication --r2 test-bucket --access-key-id my-key --secret-access-key my-secret" + "pipelines create my-pipeline --require-http-auth --r2-bucket test-bucket --r2-access-key-id my-key --r2-secret-access-key my-secret" ); expect(requests.count).toEqual(1); @@ -352,7 +354,7 @@ describe("pipelines", () => { it("should create a pipeline without http", async () => { const requests = mockCreateRequest("my-pipeline"); await runWrangler( - "pipelines create my-pipeline --http=false --r2 test-bucket --access-key-id my-key --secret-access-key my-secret" + "pipelines create my-pipeline --enable-http=false --r2-bucket test-bucket --r2-access-key-id my-key --r2-secret-access-key my-secret" ); expect(requests.count).toEqual(1); @@ -475,7 +477,7 @@ describe("pipelines", () => { const updateReq = mockUpdateRequest(update.name, update); await runWrangler( - "pipelines update my-pipeline --r2 new-bucket --access-key-id service-token-id --secret-access-key my-secret-access-key" + "pipelines update my-pipeline --r2-bucket new-bucket --r2-access-key-id service-token-id --r2-secret-access-key my-secret-access-key" ); expect(updateReq.count).toEqual(1); @@ -495,7 +497,7 @@ describe("pipelines", () => { const updateReq = mockUpdateRequest(update.name, update); await runWrangler( - "pipelines update my-pipeline --r2 new-bucket --access-key-id new-key --secret-access-key new-secret" + "pipelines update my-pipeline --r2-bucket new-bucket --r2-access-key-id new-key --r2-secret-access-key new-secret" ); expect(updateReq.count).toEqual(1); @@ -516,7 +518,7 @@ describe("pipelines", () => { const updateReq = mockUpdateRequest(update.name, update); await runWrangler( - "pipelines update my-pipeline --binding=false --http --authentication" + "pipelines update my-pipeline --enable-worker-binding=false --enable-http --require-http-auth" ); expect(updateReq.count).toEqual(1); @@ -534,7 +536,7 @@ describe("pipelines", () => { }); await expect( runWrangler( - "pipelines update bad-pipeline --r2 new-bucket --access-key-id new-key --secret-access-key new-secret" + "pipelines update bad-pipeline --r2-bucket new-bucket 
--r2-access-key-id new-key --r2-secret-access-key new-secret" ) ).rejects.toThrowError(); diff --git a/packages/wrangler/src/pipelines/cli/create.ts b/packages/wrangler/src/pipelines/cli/create.ts new file mode 100644 index 000000000000..05259bfd4b70 --- /dev/null +++ b/packages/wrangler/src/pipelines/cli/create.ts @@ -0,0 +1,321 @@ +import dedent from "ts-dedent"; +import { formatConfigSnippet, readConfig } from "../../config"; +import { FatalError, UserError } from "../../errors"; +import { logger } from "../../logger"; +import * as metrics from "../../metrics"; +import { requireAuth } from "../../user"; +import { getValidBindingName } from "../../utils/getValidBindingName"; +import { printWranglerBanner } from "../../wrangler-banner"; +import { createPipeline } from "../client"; +import { + authorizeR2Bucket, + BYTES_PER_MB, + getAccountR2Endpoint, + parseTransform, +} from "../index"; +import { validateCorsOrigins, validateInRange } from "../validate"; +import type { + CommonYargsOptions, + StrictYargsOptionsToInterface, +} from "../../yargs-types"; +import type { BindingSource, HttpSource, PipelineUserConfig } from "../client"; +import type { Argv } from "yargs"; + +export function addCreateOptions(yargs: Argv<CommonYargsOptions>) { + return ( + yargs + .positional("pipeline", { + describe: "The name of the new pipeline", + type: "string", + demandOption: true, + }) + // Sources + .group( + [ + "enable-worker-binding", + "enable-http", + "require-http-auth", + "cors-origins", + ], + "Source settings:" + ) + .option("enable-worker-binding", { + type: "boolean", + describe: "Send data from a Worker to a Pipeline using a Binding", + default: true, + demandOption: false, + }) + .option("enable-http", { + type: "boolean", + describe: "Generate an endpoint to ingest data via HTTP", + default: true, + demandOption: false, + }) + .option("require-http-auth", { + type: "boolean", + describe: + "Require Cloudflare API Token for HTTPS endpoint authentication", + default: false, + demandOption: false, + }) + .option("cors-origins", { + type: "array", + describe: + "CORS origin allowlist for HTTP endpoint (use * for any origin)", + demandOption: false, + coerce: validateCorsOrigins, + }) + + // Batching + .group( + ["batch-max-mb", "batch-max-rows", "batch-max-seconds"], + "Batch hints:" + ) + .option("batch-max-mb", { + type: "number", + describe: "Maximum batch size in megabytes before flushing", + demandOption: false, + coerce: validateInRange("batch-max-mb", 1, 100), + }) + .option("batch-max-rows", { + type: "number", + describe: "Maximum number of rows per batch before flushing", + demandOption: false, + coerce: validateInRange("batch-max-rows", 100, 1000000), + }) + .option("batch-max-seconds", { + type: "number", + describe: "Maximum age of batch in seconds before flushing", + demandOption: false, + coerce: validateInRange("batch-max-seconds", 1, 300), + }) + + // Transform options + .group(["transform-worker"], "Transformations:") + .option("transform-worker", { + type: "string", + describe: + "PipelineTransform worker and entrypoint (<worker>.<entrypoint>)", + demandOption: false, + }) + + // Destination options + .group( + [ + "r2-bucket", + "r2-access-key-id", + "r2-secret-access-key", + "r2-prefix", + "compression", + "file-template", + "partition-template", + ], + "Destination settings:" + ) + .option("r2-bucket", { + type: "string", + describe: "Destination R2 bucket name", + demandOption: true, + }) + .option("r2-access-key-id", { + type: "string", + describe: + "R2 service Access Key ID for authentication. 
Leave empty for OAuth confirmation.", + demandOption: false, + }) + .option("r2-secret-access-key", { + type: "string", + describe: + "R2 service Secret Access Key for authentication. Leave empty for OAuth confirmation.", + demandOption: false, + }) + // Require these flags to be provided together + .implies("r2-access-key-id", "r2-secret-access-key") + .implies("r2-secret-access-key", "r2-access-key-id") + .check((argv) => { + if ( + (argv["r2-access-key-id"] && !argv["r2-secret-access-key"]) || + (!argv["r2-access-key-id"] && argv["r2-secret-access-key"]) + ) { + throw new Error( + "--r2-access-key-id and --r2-secret-access-key must be provided together" + ); + } + return true; + }) + .option("r2-prefix", { + type: "string", + describe: "Prefix for storing files in the destination bucket", + default: "", + demandOption: false, + }) + .option("compression", { + type: "string", + describe: "Compression format for output files", + choices: ["none", "gzip", "deflate"], + default: "gzip", + demandOption: false, + }) + .option("partition-template", { + type: "string", + describe: "Path template for partitioned files in the bucket", + default: "event_date=${date}/hr=${hr}", + demandOption: false, + }) + .option("file-template", { + type: "string", + describe: "Template for individual file names (must include ${slug})", + default: "${slug}${extension}", + demandOption: false, + coerce: (val: string) => { + if (!val.includes("${slug}")) { + throw new Error("filename must contain ${slug}"); + } + return val; + }, + }) + ); +} + +export async function createPipelineHandler( + args: StrictYargsOptionsToInterface<typeof addCreateOptions> +) { + await printWranglerBanner(); + + const config = readConfig(args); + const bucket = args.r2Bucket; + const name = args.pipeline; + const compression = args.compression; + + const batch = { + max_bytes: args.batchMaxMb + ? 
args.batchMaxMb * BYTES_PER_MB // convert to bytes for the API + : undefined, + max_duration_s: args.batchMaxSeconds, + max_rows: args.batchMaxRows, + }; + + const accountId = await requireAuth(config); + const pipelineConfig: PipelineUserConfig = { + name: name, + metadata: {}, + source: [], + transforms: [], + destination: { + type: "r2", + format: "json", + compression: { + type: compression, + }, + batch: batch, + path: { + bucket: bucket, + }, + credentials: { + endpoint: getAccountR2Endpoint(accountId), + access_key_id: args.r2AccessKeyId || "", + secret_access_key: args.r2SecretAccessKey || "", + }, + }, + }; + const destination = pipelineConfig.destination; + if ( + !destination.credentials.access_key_id && + !destination.credentials.secret_access_key + ) { + // auto-generate a service token + const auth = await authorizeR2Bucket( + name, + accountId, + pipelineConfig.destination.path.bucket + ); + destination.credentials.access_key_id = auth.accessKeyId; + destination.credentials.secret_access_key = auth.secretAccessKey; + } + + if (!destination.credentials.access_key_id) { + throw new FatalError("Requires a r2 access key id"); + } + + if (!destination.credentials.secret_access_key) { + throw new FatalError("Requires a r2 secret access key"); + } + + // add binding source (default to add) + if (args.enableWorkerBinding) { + pipelineConfig.source.push({ + type: "binding", + format: "json", + } satisfies BindingSource); + } + + // add http source (possibly authenticated), default to add + if (args.enableHttp) { + const source: HttpSource = { + type: "http", + format: "json", + authentication: args.requireHttpAuth, + }; + + if (args.corsOrigins && args.corsOrigins.length > 0) { + source.cors = { origins: args.corsOrigins }; + } + pipelineConfig.source.push(source); + } + + if (pipelineConfig.source.length === 0) { + throw new UserError( + "No sources have been enabled. 
At least one source (HTTP or Worker Binding) should be enabled" + ); + } + + if (args.transformWorker) { + pipelineConfig.transforms.push(parseTransform(args.transformWorker)); + } + + if (args.r2Prefix) { + pipelineConfig.destination.path.prefix = args.r2Prefix; + } + if (args.partitionTemplate) { + pipelineConfig.destination.path.filepath = args.partitionTemplate; + } + if (args.fileTemplate) { + pipelineConfig.destination.path.filename = args.fileTemplate; + } + + logger.log(`🌀 Creating pipeline named "${name}"`); + const pipeline = await createPipeline(accountId, pipelineConfig); + metrics.sendMetricsEvent("create pipeline", { + sendMetrics: config.send_metrics, + }); + + logger.log( + `✅ Successfully created pipeline "${pipeline.name}" with id ${pipeline.id}` + ); + logger.log("🎉 You can now send data to your pipeline!"); + if (args.enableWorkerBinding) { + logger.log(dedent` + + To start interacting with this Pipeline from a Worker, open your Worker's config file and add the following binding configuration: + + ${formatConfigSnippet( + { + pipelines: [ + { + pipeline: pipeline.name, + binding: getValidBindingName("PIPELINE", "PIPELINE"), + }, + ], + }, + config.configPath + )} + `); + } + if (args.enableHttp) { + logger.log(dedent` + Send data to your pipeline's HTTP endpoint: + + curl "${pipeline.endpoint}" -d '[{"foo": "bar"}]' + `); + } +} diff --git a/packages/wrangler/src/pipelines/cli/delete.ts b/packages/wrangler/src/pipelines/cli/delete.ts new file mode 100644 index 000000000000..f86cdb08e063 --- /dev/null +++ b/packages/wrangler/src/pipelines/cli/delete.ts @@ -0,0 +1,38 @@ +import { readConfig } from "../../config"; +import { logger } from "../../logger"; +import * as metrics from "../../metrics"; +import { requireAuth } from "../../user"; +import { printWranglerBanner } from "../../wrangler-banner"; +import { deletePipeline } from "../client"; +import { validateName } from "../validate"; +import type { + CommonYargsOptions, + StrictYargsOptionsToInterface, +} from "../../yargs-types"; +import type { Argv } from "yargs"; + +export function addDeleteOptions(yargs: Argv<CommonYargsOptions>) { + return yargs.positional("pipeline", { + type: "string", + describe: "The name of the pipeline to delete", + demandOption: true, + }); +} + +export async function deletePipelineHandler( + args: StrictYargsOptionsToInterface<typeof addDeleteOptions> +) { + await printWranglerBanner(); + const config = readConfig(args); + const accountId = await requireAuth(config); + const name = args.pipeline; + + validateName("pipeline name", name); + + logger.log(`Deleting pipeline ${name}.`); + await deletePipeline(accountId, name); + logger.log(`Deleted pipeline ${name}.`); + metrics.sendMetricsEvent("delete pipeline", { + sendMetrics: config.send_metrics, + }); +} diff --git a/packages/wrangler/src/pipelines/cli/list.ts b/packages/wrangler/src/pipelines/cli/list.ts new file mode 100644 index 000000000000..a0bab35a7ac3 --- /dev/null +++ b/packages/wrangler/src/pipelines/cli/list.ts @@ -0,0 +1,28 @@ +import { readConfig } from "../../config"; +import { logger } from "../../logger"; +import * as metrics from "../../metrics"; +import { requireAuth } from "../../user"; +import { listPipelines } from "../client"; +import type { CommonYargsOptions } from "../../yargs-types"; +import type { ArgumentsCamelCase } from "yargs"; + +export async function listPipelinesHandler( + args: ArgumentsCamelCase<CommonYargsOptions> +) { + const config = readConfig(args); + const accountId = await requireAuth(config); + + // TODO: we should show bindings & transforms if they exist 
for given ids + const list = await listPipelines(accountId); + metrics.sendMetricsEvent("list pipelines", { + sendMetrics: config.send_metrics, + }); + + logger.table( + list.map((pipeline) => ({ + name: pipeline.name, + id: pipeline.id, + endpoint: pipeline.endpoint, + })) + ); +} diff --git a/packages/wrangler/src/pipelines/cli/show.ts b/packages/wrangler/src/pipelines/cli/show.ts new file mode 100644 index 000000000000..76d72d1f0465 --- /dev/null +++ b/packages/wrangler/src/pipelines/cli/show.ts @@ -0,0 +1,39 @@ +import { readConfig } from "../../config"; +import { logger } from "../../logger"; +import * as metrics from "../../metrics"; +import { requireAuth } from "../../user"; +import { printWranglerBanner } from "../../wrangler-banner"; +import { getPipeline } from "../client"; +import { validateName } from "../validate"; +import type { + CommonYargsOptions, + StrictYargsOptionsToInterface, +} from "../../yargs-types"; +import type { Argv } from "yargs"; + +export function addShowOptions(yargs: Argv<CommonYargsOptions>) { + return yargs.positional("pipeline", { + type: "string", + describe: "The name of the pipeline to show", + demandOption: true, + }); +} + +export async function showPipelineHandler( + args: StrictYargsOptionsToInterface<typeof addShowOptions> +) { + await printWranglerBanner(); + const config = readConfig(args); + const accountId = await requireAuth(config); + const name = args.pipeline; + + validateName("pipeline name", name); + + logger.log(`Retrieving config for pipeline "${name}".`); + const pipeline = await getPipeline(accountId, name); + metrics.sendMetricsEvent("show pipeline", { + sendMetrics: config.send_metrics, + }); + + logger.log(JSON.stringify(pipeline, null, 2)); +} diff --git a/packages/wrangler/src/pipelines/cli/update.ts b/packages/wrangler/src/pipelines/cli/update.ts new file mode 100644 index 000000000000..561d2c6e0576 --- /dev/null +++ b/packages/wrangler/src/pipelines/cli/update.ts @@ -0,0 +1,300 @@ +import { readConfig } from "../../config"; +import { FatalError } from "../../errors"; +import { logger } from "../../logger"; +import * as metrics from "../../metrics"; +import { requireAuth } from "../../user"; +import { printWranglerBanner } from "../../wrangler-banner"; +import { getPipeline, updatePipeline } from "../client"; +import { + authorizeR2Bucket, + BYTES_PER_MB, + getAccountR2Endpoint, + parseTransform, +} from "../index"; +import { validateCorsOrigins, validateInRange } from "../validate"; +import type { + CommonYargsOptions, + StrictYargsOptionsToInterface, +} from "../../yargs-types"; +import type { HttpSource, Source } from "../client"; +import type { Argv } from "yargs"; + +/** + * Add all the positional and optional flags for the `wrangler pipelines update` command. + * + * @param yargs + */ +export function addUpdateOptions(yargs: Argv<CommonYargsOptions>) { + /* These arguments are nearly identical to the options used for creating a pipeline, with some notable differences. + Particularly, not all options are available for updating, and the default values have been removed. In this case, + `undefined` is used to determine if the user provided that flag at all with the intent to change the value. + */ + return ( + yargs + .positional("pipeline", { + describe: "The name of the pipeline to update", + type: "string", + demandOption: true, + }) + .option("r2-bucket", { + type: "string", + describe: "Destination R2 bucket name", + demandOption: false, // Not required for updates. 
+ }) + // Sources + .group( + [ + "enable-worker-binding", + "enable-http", + "require-http-auth", + "cors-origins", + ], + "Source settings:" + ) + .option("enable-worker-binding", { + type: "boolean", + describe: "Send data from a Worker to a Pipeline using a Binding", + demandOption: false, + }) + .option("enable-http", { + type: "boolean", + describe: "Generate an endpoint to ingest data via HTTP", + demandOption: false, + }) + .option("require-http-auth", { + type: "boolean", + describe: + "Require Cloudflare API Token for HTTPS endpoint authentication", + demandOption: false, + }) + .option("cors-origins", { + type: "array", + describe: + "CORS origin allowlist for HTTP endpoint (use * for any origin)", + demandOption: false, + coerce: validateCorsOrigins, + }) + + // Batching + .group( + ["batch-max-mb", "batch-max-rows", "batch-max-seconds"], + "Batch definition:" + ) + .option("batch-max-mb", { + type: "number", + describe: "Maximum batch size in megabytes before flushing", + demandOption: false, + coerce: validateInRange("batch-max-mb", 1, 100), + }) + .option("batch-max-rows", { + type: "number", + describe: "Maximum number of rows per batch before flushing", + demandOption: false, + coerce: validateInRange("batch-max-rows", 100, 1000000), + }) + .option("batch-max-seconds", { + type: "number", + describe: "Maximum age of batch in seconds before flushing", + demandOption: false, + coerce: validateInRange("batch-max-seconds", 1, 300), + }) + + // Transform options + .group(["transform-worker"], "Transformations:") + .option("transform-worker", { + type: "string", + describe: + "PipelineTransform worker and entrypoint (<worker>.<entrypoint>)", + default: undefined, + demandOption: false, + }) + + // Destination options + .group( + [ + "r2-bucket", + "r2-access-key-id", + "r2-secret-access-key", + "r2-prefix", + "compression", + "file-template", + "partition-template", + ], + "Destination settings:" + ) + .option("r2-access-key-id", { + type: "string", + describe: + "R2 service Access Key ID for authentication. Leave empty for OAuth confirmation.", + demandOption: false, + }) + .option("r2-secret-access-key", { + type: "string", + describe: + "R2 service Secret Access Key for authentication. 
Leave empty for OAuth confirmation.", + demandOption: false, + }) + // Require these flags to be provided together + .implies("r2-access-key-id", "r2-secret-access-key") + .implies("r2-secret-access-key", "r2-access-key-id") + .check((argv) => { + if ( + (argv["r2-access-key-id"] && !argv["r2-secret-access-key"]) || + (!argv["r2-access-key-id"] && argv["r2-secret-access-key"]) + ) { + throw new Error( + "--r2-access-key-id and --r2-secret-access-key must be provided together" + ); + } + return true; + }) + .option("r2-prefix", { + type: "string", + describe: "Prefix for storing files in the destination bucket", + demandOption: false, + }) + .option("compression", { + type: "string", + describe: "Compression format for output files", + choices: ["none", "gzip", "deflate"], + demandOption: false, + }) + .option("partition-template", { + type: "string", + describe: "Path template for partitioned files in the bucket", + demandOption: false, + }) + .option("file-template", { + type: "string", + describe: "Template for individual file names (must include ${slug})", + demandOption: false, + coerce: (val: string) => { + if (!val.includes("${slug}")) { + throw new Error("filename must contain ${slug}"); + } + return val; + }, + }) + ); +} + +export async function updatePipelineHandler( + args: StrictYargsOptionsToInterface<typeof addUpdateOptions> +) { + await printWranglerBanner(); + + const name = args.pipeline; + // only the fields set will be updated - other fields will use the existing config + const config = readConfig(args); + const accountId = await requireAuth(config); + + const pipelineConfig = await getPipeline(accountId, name); + + if (args.compression) { + pipelineConfig.destination.compression.type = args.compression; + } + if (args.batchMaxMb) { + pipelineConfig.destination.batch.max_bytes = args.batchMaxMb * BYTES_PER_MB; // convert to bytes for the API + } + if (args.batchMaxSeconds) { + pipelineConfig.destination.batch.max_duration_s = args.batchMaxSeconds; + } + if (args.batchMaxRows) { + pipelineConfig.destination.batch.max_rows = args.batchMaxRows; + } + + const bucket = args.r2Bucket; + const accessKeyId = args.r2AccessKeyId; + const secretAccessKey = args.r2SecretAccessKey; + if (bucket || accessKeyId || secretAccessKey) { + const destination = pipelineConfig.destination; + if (bucket) { + pipelineConfig.destination.path.bucket = bucket; + } + destination.credentials = { + endpoint: getAccountR2Endpoint(accountId), + access_key_id: accessKeyId || "", + secret_access_key: secretAccessKey || "", + }; + if (!accessKeyId && !secretAccessKey) { + const auth = await authorizeR2Bucket( + name, + accountId, + destination.path.bucket + ); + destination.credentials.access_key_id = auth.accessKeyId; + destination.credentials.secret_access_key = auth.secretAccessKey; + } + if (!destination.credentials.access_key_id) { + throw new FatalError("Requires a r2 access key id"); + } + + if (!destination.credentials.secret_access_key) { + throw new FatalError("Requires a r2 secret access key"); + } + } + + if (args.enableWorkerBinding !== undefined) { + // strip off old source & keep if necessary + const source = pipelineConfig.source.find( + (s: Source) => s.type === "binding" + ); + pipelineConfig.source = pipelineConfig.source.filter( + (s: Source) => s.type !== "binding" + ); + // add back only if specified + if (args.enableWorkerBinding) { + pipelineConfig.source.push({ + ...source, + type: "binding", + format: "json", + }); + } + } + + if (args.enableHttp !== undefined) { + // strip off old source & keep if 
necessary + const source = pipelineConfig.source.find((s: Source) => s.type === "http"); + pipelineConfig.source = pipelineConfig.source.filter( + (s: Source) => s.type !== "http" + ); + // add back if specified + if (args.enableHttp) { + pipelineConfig.source.push({ + type: "http", + format: "json", + ...source, + authentication: + args.requireHttpAuth !== undefined + ? // if auth specified, use it + args.requireHttpAuth + : // if auth not specified, use previous value or default(false) + source?.authentication, + } satisfies HttpSource); + } + } + + if (args.transformWorker) { + pipelineConfig.transforms.push(parseTransform(args.transformWorker)); + } + + if (args.r2Prefix) { + pipelineConfig.destination.path.prefix = args.r2Prefix; + } + if (args.partitionTemplate) { + pipelineConfig.destination.path.filepath = args.partitionTemplate; + } + if (args.fileTemplate) { + pipelineConfig.destination.path.filename = args.fileTemplate; + } + + logger.log(`🌀 Updating pipeline "${name}"`); + const pipeline = await updatePipeline(accountId, name, pipelineConfig); + metrics.sendMetricsEvent("update pipeline", { + sendMetrics: config.send_metrics, + }); + + logger.log( + `✅ Successfully updated pipeline "${pipeline.name}" with ID ${pipeline.id}\n` + ); +} diff --git a/packages/wrangler/src/pipelines/client.ts b/packages/wrangler/src/pipelines/client.ts index 06a7f7e85f3f..ceb5411dcf01 100644 --- a/packages/wrangler/src/pipelines/client.ts +++ b/packages/wrangler/src/pipelines/client.ts @@ -26,6 +26,9 @@ export type HttpSource = { format: string; schema?: string; authentication?: boolean; + cors?: { + origins: ["*"] | string[]; + }; }; export type BindingSource = { type: "binding"; diff --git a/packages/wrangler/src/pipelines/index.ts b/packages/wrangler/src/pipelines/index.ts index 2bfcab088dc8..1398f8723ecf 100644 --- a/packages/wrangler/src/pipelines/index.ts +++ b/packages/wrangler/src/pipelines/index.ts @@ -1,34 +1,22 @@ import { HeadBucketCommand, S3Client } from "@aws-sdk/client-s3"; -import { readConfig } from "../config"; -import { FatalError, UserError } from "../errors"; +import { FatalError } from "../errors"; import { logger } from "../logger"; -import * as metrics from "../metrics"; import { APIError } from "../parse"; -import { requireAuth } from "../user"; import { retryOnAPIFailure } from "../utils/retry"; -import { printWranglerBanner } from "../wrangler-banner"; -import { - createPipeline, - deletePipeline, - generateR2ServiceToken, - getPipeline, - getR2Bucket, - listPipelines, - updatePipeline, -} from "./client"; -import type { CommonYargsArgv, CommonYargsOptions } from "../yargs-types"; -import type { - BindingSource, - HttpSource, - PipelineUserConfig, - Source, -} from "./client"; -import type { Argv } from "yargs"; +import { addCreateOptions, createPipelineHandler } from "./cli/create"; +import { addDeleteOptions, deletePipelineHandler } from "./cli/delete"; +import { listPipelinesHandler } from "./cli/list"; +import { addShowOptions, showPipelineHandler } from "./cli/show"; +import { addUpdateOptions, updatePipelineHandler } from "./cli/update"; +import { generateR2ServiceToken, getR2Bucket } from "./client"; +import type { CommonYargsArgv } from "../yargs-types"; + +export const BYTES_PER_MB = 1000 * 1000; // flag to skip delays for tests let __testSkipDelaysFlag = false; -async function authorizeR2Bucket( +export async function authorizeR2Bucket( pipelineName: string, accountId: string, bucketName: string @@ -65,21 +53,11 @@ 
!__testSkipDelaysFlag && (await retryOnAPIFailure( async () => { - try { - await r2.send( - new HeadBucketCommand({ - Bucket: bucketName, - }) - ); - } catch (err) { - if (err instanceof Error && err.name === "401") { - throw new AuthAPIError({ - status: 401, - text: "R2 HeadBucket request failed with status: 401", - }); - } - throw err; - } + await r2.send( + new HeadBucketCommand({ + Bucket: bucketName, + }) + ); }, 1000, 10 @@ -88,29 +66,12 @@ async function authorizeR2Bucket( return serviceToken; } -/** - * AuthAPIError always retries errors so that - * we always retry auth errors while waiting for an - * API token to propegate and start working. - */ -class AuthAPIError extends APIError { - override isRetryable(): boolean { - return true; - } -} - -function getAccountR2Endpoint(accountId: string) { +export function getAccountR2Endpoint(accountId: string) { return `https://${accountId}.r2.cloudflarestorage.com`; } -function validateName(label: string, name: string) { - if (!name.match(/^[a-zA-Z0-9-]+$/)) { - throw new Error(`Must provide a valid ${label}`); - } -} - // Parse out a transform of the form:
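
As a usage sketch of the renamed flags (illustrative only: the pipeline and bucket names are placeholders, and the `$R2_ACCESS_KEY_ID`/`$R2_SECRET_ACCESS_KEY` variables stand in for real credentials):

```
# before (old flag names)
wrangler pipelines create my-pipeline --r2 my-bucket --access-key-id $R2_ACCESS_KEY_ID --secret-access-key $R2_SECRET_ACCESS_KEY --authentication

# after (renamed flags)
wrangler pipelines create my-pipeline --r2-bucket my-bucket --r2-access-key-id $R2_ACCESS_KEY_ID --r2-secret-access-key $R2_SECRET_ACCESS_KEY --require-http-auth

# new: restrict the HTTP endpoint to specific CORS origins (use * for any origin)
wrangler pipelines update my-pipeline --cors-origins https://example.com
```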