From b002f7e86f549420638fde57fdf4c1bbf5e9df04 Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Mon, 20 Jan 2025 10:15:44 +0100 Subject: [PATCH] feat: add quickstart for collector --- collector/quickstart/collector/config.yaml | 16 ++++ .../collector/resources/dedupe-cache.yaml | 4 + .../quickstart/collector/streams/input.yaml | 75 +++++++++++++++++++ .../quickstart/collector/streams/output.yaml | 16 ++++ collector/quickstart/docker-compose.yaml | 65 ++++++++++++++++ collector/quickstart/seeder/config.yaml | 63 ++++++++++++++++ quickstart/config.yaml | 5 ++ quickstart/docker-compose.yaml | 2 +- 8 files changed, 245 insertions(+), 1 deletion(-) create mode 100644 collector/quickstart/collector/config.yaml create mode 100644 collector/quickstart/collector/resources/dedupe-cache.yaml create mode 100644 collector/quickstart/collector/streams/input.yaml create mode 100644 collector/quickstart/collector/streams/output.yaml create mode 100644 collector/quickstart/docker-compose.yaml create mode 100644 collector/quickstart/seeder/config.yaml diff --git a/collector/quickstart/collector/config.yaml b/collector/quickstart/collector/config.yaml new file mode 100644 index 000000000..991abf745 --- /dev/null +++ b/collector/quickstart/collector/config.yaml @@ -0,0 +1,16 @@ +http: + enabled: true + address: 0.0.0.0:4195 + debug_endpoints: false + +metrics: + prometheus: {} + +#tracer: +# open_telemetry_collector: +# grpc: +# - address: :4317 +# tags: +# application: openmeter-collector +# sampling: +# enabled: false diff --git a/collector/quickstart/collector/resources/dedupe-cache.yaml b/collector/quickstart/collector/resources/dedupe-cache.yaml new file mode 100644 index 000000000..592001a04 --- /dev/null +++ b/collector/quickstart/collector/resources/dedupe-cache.yaml @@ -0,0 +1,4 @@ +cache_resources: + - label: dedupe_cache + memory: + default_ttl: 1h diff --git a/collector/quickstart/collector/streams/input.yaml b/collector/quickstart/collector/streams/input.yaml new file mode 100644 index 000000000..08b011988 --- /dev/null +++ b/collector/quickstart/collector/streams/input.yaml @@ -0,0 +1,75 @@ +input: + http_server: + address: 0.0.0.0:8889 + path: /api/v1/events + sync_response: + status: '${! meta("http_response_status").or("204") }' + +pipeline: + processors: + - switch: + - check: meta("Content-Type").lowercase() == "application/cloudevents-batch+json" + processors: + - unarchive: + format: json_array + - check: meta("Content-Type").lowercase() == "application/cloudevents+json" + processors: + - noop: {} + - check: "" + processors: + - log: + level: ERROR + message: 'Unexpected Content-Type: ${!meta("Content-Type")}' + - mapping: | + meta http_response_status = "400" + + root = { + "type": "about:blank", + "title": "Bad Request", + "status": 400, + "detail":"request body has an error: header Content-Type has unexpected value \"%s\"".format(meta("Content-Type")), + } + - sync_response: {} + - mapping: "root = deleted()" + - dedupe: + cache: "dedupe_cache" + key: '${! content().hash("xxhash64") }' + drop_on_err: false + - json_schema: + schema_path: "file://./cloudevents.spec.json" + - catch: + - log: + level: ERROR + message: "Schema validation failed due to: ${!error()}" + - mapping: | + meta http_response_status = "400" + + root = { + "type": "about:blank", + "title": "Bad Request", + "status": 400, + "detail":"request body has an error: %s".format(error()), + } + - sync_response: {} + - mapping: "root = deleted()" + +output: + switch: + cases: + - check: "" + continue: true + output: + broker: + pattern: fan_out + outputs: + - sync_response: {} + processors: + - mapping: root = null + # https://github.com/benthosdev/benthos/discussions/2324 + # https://github.com/benthosdev/benthos/issues/1946 + - inproc: openmeter + + - check: '"${DEBUG_INPUT:false}" == "true"' + output: + stdout: + codec: lines diff --git a/collector/quickstart/collector/streams/output.yaml b/collector/quickstart/collector/streams/output.yaml new file mode 100644 index 000000000..d5923e0fd --- /dev/null +++ b/collector/quickstart/collector/streams/output.yaml @@ -0,0 +1,16 @@ +input: + inproc: openmeter + +buffer: + sqlite: + path: /var/lib/collector/buffer.sqlite + post_processors: + - split: {} + +output: + openmeter: + url: "${OPENMETER_URL:https://openmeter.cloud}" + token: "${OPENMETER_TOKEN:}" + batching: + count: ${BATCH_SIZE:1} + period: ${BATCH_PERIOD:30s} diff --git a/collector/quickstart/docker-compose.yaml b/collector/quickstart/docker-compose.yaml new file mode 100644 index 000000000..08bb249d2 --- /dev/null +++ b/collector/quickstart/docker-compose.yaml @@ -0,0 +1,65 @@ +include: + - ../../quickstart/docker-compose.yaml + +services: + collector: + image: ghcr.io/openmeterio/benthos-collector:latest + ports: + - "127.0.0.1:4195:4195" + environment: + OPENMETER_URL: http://openmeter:8888 + BATCH_SIZE: 1 + command: [ + "--config", + "/etc/collector/config.yaml", + "--log.level", + "debug", + "--resources", + "/etc/collector/resources/*.yaml", + "streams", + "--no-api", + "/etc/collector/streams/*.yaml", + ] + healthcheck: + test: [ "CMD", "wget", "--spider", "http://collector:4195/ready" ] + interval: 10s + timeout: 5s + retries: 30 + volumes: + - type: volume + source: collector_data + target: /var/lib/collector + - type: bind + source: ./collector + target: /etc/collector + depends_on: + openmeter: + condition: service_healthy + + seeder: + image: ghcr.io/openmeterio/benthos-collector:latest + environment: + OPENMETER_URL: http://collector:8889 + SEEDER_LOG: true + SEEDER_COUNT: 100 + command: [ + "--config", + "/etc/seeder/config.yaml", + ] + ports: + - "127.0.0.1:4196:4196" + healthcheck: + test: [ "CMD", "wget", "--spider", "http://seeder:4196/ready" ] + interval: 10s + timeout: 5s + retries: 30 + volumes: + - type: bind + source: ./seeder + target: /etc/seeder + depends_on: + collector: + condition: service_healthy + +volumes: + collector_data: diff --git a/collector/quickstart/seeder/config.yaml b/collector/quickstart/seeder/config.yaml new file mode 100644 index 000000000..801f97d45 --- /dev/null +++ b/collector/quickstart/seeder/config.yaml @@ -0,0 +1,63 @@ +http: + enabled: true + address: 0.0.0.0:4196 + debug_endpoints: false + +input: + generate: + count: ${SEEDER_COUNT:0} + interval: "${SEEDER_INTERVAL:50ms}" + # batch_size: 1 + mapping: | + let max_subjects = ${SEEDER_MAX_SUBJECTS:10} + + let event_type = "request" + let source = "api-gateway" + let methods = ["GET", "POST"] + let paths = ["/", "/about", "/contact", "/pricing", "/docs"] + let regions = ["us-east-1", "us-west-1", "us-east-2", "us-west-2"] + let zoneSuffixes = ["a", "b", "c", "d"] + + let subject = "customer-%d".format(random_int(seed: timestamp_unix_nano()) % $max_subjects) + let time = (now().ts_sub_iso8601("P3D").ts_unix() + random_int(min: 60, max: 60 * 60 * 24 * 3)).ts_format() + + let method = $methods.index(random_int(seed: timestamp_unix_nano()) % $methods.length()) + let path = $paths.index(random_int(seed: timestamp_unix_nano()) % $paths.length()) + let region = $regions.index(random_int(seed: timestamp_unix_nano()) % $regions.length()) + let zone = "%s%s".format($region, $zoneSuffixes.index(random_int(seed: timestamp_unix_nano()) % $zoneSuffixes.length())) + let duration = random_int(seed: timestamp_unix_nano(), max: 1000) + + root = { + "id": uuid_v4(), + "specversion": "1.0", + "type": $event_type, + "source": $source, + "subject": $subject, + "time": $time, + "data": { + "method": $method, + "path": $path, + "region": $region, + "zone": $zone, + "duration_ms": $duration, + }, + } + +output: + switch: + cases: + - check: "" + continue: true + output: + http_client: + url: ${OPENMETER_URL:http://127.0.0.1:8888}/api/v1/events + verb: POST + headers: + Content-Type: application/cloudevents+json + Authorization: "Bearer ${OPENMETER_TOKEN:}" + max_in_flight: 1 + + - check: '"${SEEDER_LOG:false}" == "true"' + output: + stdout: + codec: lines diff --git a/quickstart/config.yaml b/quickstart/config.yaml index ab2a53a00..95dbd4b01 100644 --- a/quickstart/config.yaml +++ b/quickstart/config.yaml @@ -1,6 +1,9 @@ ingest: kafka: broker: kafka:9092 + brokerAddressFamily: v4 + socketKeepAliveEnable: true + topicMetadataRefreshInterval: 10s aggregation: clickhouse: @@ -13,6 +16,8 @@ sink: kafka: brokers: kafka:9092 brokerAddressFamily: v4 + socketKeepAliveEnable: true + topicMetadataRefreshInterval: 10s dedupe: enabled: true driver: redis diff --git a/quickstart/docker-compose.yaml b/quickstart/docker-compose.yaml index df085f54d..9f3bc6714 100644 --- a/quickstart/docker-compose.yaml +++ b/quickstart/docker-compose.yaml @@ -16,7 +16,7 @@ services: volumes: - ./config.yaml:/etc/openmeter/config.yaml healthcheck: - test: ["CMD", "wget", "--spider", "http://openmeter:8888/api/v1/meters/api_requests_total/query"] + test: ["CMD", "wget", "--spider", "http://openmeter:8888/api/v1/debug/metrics"] interval: 5s timeout: 3s retries: 30