From 4bcb4990b363c700e4a03c49bc227d6ffc07acf8 Mon Sep 17 00:00:00 2001
From: Krisztian Gacsal <chrisgacsal@users.noreply.github.com>
Date: Mon, 20 Jan 2025 10:15:44 +0100
Subject: [PATCH] feat: add quickstart for collector

---
 collector/quickstart/collector/config.yaml    |  4 ++
 .../collector/resources/dedupe-cache.yaml     |  4 ++
 .../quickstart/collector/streams/input.yaml   | 71 +++++++++++++++++++
 .../quickstart/collector/streams/output.yaml  | 31 ++++++++
 collector/quickstart/docker-compose.yaml      | 64 +++++++++++++++++
 collector/quickstart/seeder/config.yaml       | 63 ++++++++++++++++
 quickstart/config.yaml                        |  5 ++
 quickstart/docker-compose.yaml                |  2 +-
 8 files changed, 243 insertions(+), 1 deletion(-)
 create mode 100644 collector/quickstart/collector/config.yaml
 create mode 100644 collector/quickstart/collector/resources/dedupe-cache.yaml
 create mode 100644 collector/quickstart/collector/streams/input.yaml
 create mode 100644 collector/quickstart/collector/streams/output.yaml
 create mode 100644 collector/quickstart/docker-compose.yaml
 create mode 100644 collector/quickstart/seeder/config.yaml

diff --git a/collector/quickstart/collector/config.yaml b/collector/quickstart/collector/config.yaml
new file mode 100644
index 000000000..e4f79ab3b
--- /dev/null
+++ b/collector/quickstart/collector/config.yaml
@@ -0,0 +1,4 @@
+http:
+  enabled: true
+  address: 0.0.0.0:4195
+  debug_endpoints: false
diff --git a/collector/quickstart/collector/resources/dedupe-cache.yaml b/collector/quickstart/collector/resources/dedupe-cache.yaml
new file mode 100644
index 000000000..c4a1e505c
--- /dev/null
+++ b/collector/quickstart/collector/resources/dedupe-cache.yaml
@@ -0,0 +1,4 @@
+cache_resources:
+  - label: dedupe_cache
+    memory:
+      default_ttl: 3600s
diff --git a/collector/quickstart/collector/streams/input.yaml b/collector/quickstart/collector/streams/input.yaml
new file mode 100644
index 000000000..c11485a19
--- /dev/null
+++ b/collector/quickstart/collector/streams/input.yaml
@@ -0,0 +1,71 @@
+input:
+  http_server:
+    address: 0.0.0.0:8889
+    path: /api/v1/events
+    sync_response:
+      status: '${! meta("http_response_status").or("204") }'
+
+pipeline:
+  processors:
+    - switch:
+        - check: meta("Content-Type").lowercase() == "application/cloudevents-batch+json"
+          processors:
+            - unarchive:
+                format: json_array
+        - check: meta("Content-Type").lowercase() == "application/cloudevents+json"
+          processors:
+            - noop: {}
+        - check: ""
+          processors:
+            - log:
+                level: ERROR
+                message: 'Unexpected Content-Type: ${!meta("Content-Type")}'
+            - mapping: |
+                meta http_response_status = "400"
+
+                root = {
+                  "type": "about:blank",
+                  "title": "Bad Request",
+                  "status": 400,
+                  "detail":"request body has an error: header Content-Type has unexpected value \"%s\"".format(meta("Content-Type")),
+                }
+            - sync_response: {}
+            - mapping: "root = deleted()"
+    - json_schema:
+        schema_path: "file://./cloudevents.spec.json"
+    - catch:
+        - log:
+            level: ERROR
+            message: "Schema validation failed due to: ${!error()}"
+        - mapping: |
+            meta http_response_status = "400"
+
+            root = {
+              "type": "about:blank",
+              "title": "Bad Request",
+              "status": 400,
+              "detail":"request body has an error: %s".format(error()),
+            }
+        - sync_response: {}
+        - mapping: "root = deleted()"
+
+output:
+  switch:
+    cases:
+      - check: ""
+        continue: true
+        output:
+          broker:
+            pattern: fan_out
+            outputs:
+              - sync_response: {}
+                processors:
+                  - mapping: root = null
+              # https://github.com/benthosdev/benthos/discussions/2324
+              # https://github.com/benthosdev/benthos/issues/1946
+              - inproc: openmeter
+
+      - check: '"${DEBUG_INPUT:false}" == "true"'
+        output:
+          stdout:
+            codec: lines
diff --git a/collector/quickstart/collector/streams/output.yaml b/collector/quickstart/collector/streams/output.yaml
new file mode 100644
index 000000000..492436409
--- /dev/null
+++ b/collector/quickstart/collector/streams/output.yaml
@@ -0,0 +1,31 @@
+input:
+  inproc: openmeter
+
+buffer:
+  sqlite:
+    path: /var/lib/collector/buffer.sqlite
+
+output:
+  retry:
+    max_retries: 0
+    backoff:
+      initial_interval: 500ms
+      max_interval: 3s
+      max_elapsed_time: 0s
+    output:
+      switch:
+        cases:
+          - check: ""
+            continue: true
+            output:
+              openmeter:
+                url: "${OPENMETER_URL:https://openmeter.cloud}"
+                token: "${OPENMETER_TOKEN:}"
+                batching:
+                  count: ${BATCH_SIZE:20}
+                  period: ${BATCH_PERIOD:}
+
+          - check: '"${DEBUG_OUTPUT:false}" == "true"'
+            output:
+              stdout:
+                codec: lines
diff --git a/collector/quickstart/docker-compose.yaml b/collector/quickstart/docker-compose.yaml
new file mode 100644
index 000000000..8ca702445
--- /dev/null
+++ b/collector/quickstart/docker-compose.yaml
@@ -0,0 +1,64 @@
+include:
+  - ../../quickstart/docker-compose.yaml
+
+services:
+  collector:
+    image: ghcr.io/openmeterio/benthos-collector:latest
+    ports:
+      - "127.0.0.1:4195:4195"
+    environment:
+      OPENMETER_URL: http://openmeter:8888
+    command: [
+      "--log.level",
+      "info",
+      "--config",
+      "/etc/collector/config.yaml",
+      "--resources",
+      "/etc/collector/resources/*.yaml",
+      "streams",
+      "--no-api",
+      "/etc/collector/streams/*.yaml",
+    ]
+    healthcheck:
+      test: [ "CMD", "wget", "--spider", "http://collector:4195/ready" ]
+      interval: 10s
+      timeout: 5s
+      retries: 30
+    volumes:
+      - type: volume
+        source: collector_data
+        target: /var/lib/collector
+      - type: bind
+        source: ./collector
+        target: /etc/collector
+    depends_on:
+      openmeter:
+        condition: service_healthy
+
+  seeder:
+    image: ghcr.io/openmeterio/benthos-collector:latest
+    environment:
+      OPENMETER_URL: http://collector:8889
+      SEEDER_LOG: true
+      SEEDER_COUNT: 100
+    command: [
+      "--config",
+      "/etc/seeder/config.yaml",
+    ]
+    ports:
+      - "127.0.0.1:4196:4196"
+    healthcheck:
+      test: [ "CMD", "wget", "--spider", "http://seeder:4196/ready" ]
+      interval: 10s
+      timeout: 5s
+      retries: 30
+    volumes:
+      - type: bind
+        source: ./seeder
+        target: /etc/seeder
+    depends_on:
+      collector:
+        condition: service_healthy
+
+volumes:
+  collector_data:
diff --git a/collector/quickstart/seeder/config.yaml b/collector/quickstart/seeder/config.yaml
new file mode 100644
index 000000000..801f97d45
--- /dev/null
+++ b/collector/quickstart/seeder/config.yaml
@@ -0,0 +1,63 @@
+http:
+  enabled: true
+  address: 0.0.0.0:4196
+  debug_endpoints: false
+
+input:
+  generate:
+    count: ${SEEDER_COUNT:0}
+    interval: "${SEEDER_INTERVAL:50ms}"
+    # batch_size: 1
+    mapping: |
+      let max_subjects = ${SEEDER_MAX_SUBJECTS:10}
+
+      let event_type = "request"
+      let source = "api-gateway"
+      let methods = ["GET", "POST"]
+      let paths = ["/", "/about", "/contact", "/pricing", "/docs"]
+      let regions = ["us-east-1", "us-west-1", "us-east-2", "us-west-2"]
+      let zoneSuffixes = ["a", "b", "c", "d"]
+
+      let subject = "customer-%d".format(random_int(seed: timestamp_unix_nano()) % $max_subjects)
+      let time = (now().ts_sub_iso8601("P3D").ts_unix() + random_int(min: 60, max: 60 * 60 * 24 * 3)).ts_format()
+
+      let method = $methods.index(random_int(seed: timestamp_unix_nano()) % $methods.length())
+      let path = $paths.index(random_int(seed: timestamp_unix_nano()) % $paths.length())
+      let region = $regions.index(random_int(seed: timestamp_unix_nano()) % $regions.length())
+      let zone = "%s%s".format($region, $zoneSuffixes.index(random_int(seed: timestamp_unix_nano()) % $zoneSuffixes.length()))
+      let duration = random_int(seed: timestamp_unix_nano(), max: 1000)
+
+      root = {
+        "id": uuid_v4(),
+        "specversion": "1.0",
+        "type": $event_type,
+        "source": $source,
+        "subject": $subject,
+        "time": $time,
+        "data": {
+          "method": $method,
+          "path": $path,
+          "region": $region,
+          "zone": $zone,
+          "duration_ms": $duration,
+        },
+      }
+
+output:
+  switch:
+    cases:
+      - check: ""
+        continue: true
+        output:
+          http_client:
+            url: ${OPENMETER_URL:http://127.0.0.1:8888}/api/v1/events
+            verb: POST
+            headers:
+              Content-Type: application/cloudevents+json
+              Authorization: "Bearer ${OPENMETER_TOKEN:}"
+            max_in_flight: 1
+
+      - check: '"${SEEDER_LOG:false}" == "true"'
+        output:
+          stdout:
+            codec: lines
diff --git a/quickstart/config.yaml b/quickstart/config.yaml
index ab2a53a00..95dbd4b01 100644
--- a/quickstart/config.yaml
+++ b/quickstart/config.yaml
@@ -1,6 +1,9 @@
 ingest:
   kafka:
     broker: kafka:9092
+    brokerAddressFamily: v4
+    socketKeepAliveEnable: true
+    topicMetadataRefreshInterval: 10s
 
 aggregation:
   clickhouse:
@@ -13,6 +16,8 @@ sink:
   kafka:
     brokers: kafka:9092
     brokerAddressFamily: v4
+    socketKeepAliveEnable: true
+    topicMetadataRefreshInterval: 10s
   dedupe:
     enabled: true
     driver: redis
diff --git a/quickstart/docker-compose.yaml b/quickstart/docker-compose.yaml
index df085f54d..9f3bc6714 100644
--- a/quickstart/docker-compose.yaml
+++ b/quickstart/docker-compose.yaml
@@ -16,7 +16,7 @@ services:
     volumes:
       - ./config.yaml:/etc/openmeter/config.yaml
     healthcheck:
-      test: ["CMD", "wget", "--spider", "http://openmeter:8888/api/v1/meters/api_requests_total/query"]
+      test: ["CMD", "wget", "--spider", "http://openmeter:8888/api/v1/debug/metrics"]
       interval: 5s
       timeout: 3s
       retries: 30