Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add collector to quickstart #2118

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions collector/quickstart/collector/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
http:
enabled: true
address: 0.0.0.0:4195
debug_endpoints: false

metrics:
prometheus: {}

#tracer:
# open_telemetry_collector:
# grpc:
# - address: <host>:4317
# tags:
# application: openmeter-collector
# sampling:
# enabled: false
4 changes: 4 additions & 0 deletions collector/quickstart/collector/resources/dedupe-cache.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cache_resources:
- label: dedupe_cache
memory:
default_ttl: 1h
75 changes: 75 additions & 0 deletions collector/quickstart/collector/streams/input.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
input:
http_server:
address: 0.0.0.0:8889
path: /api/v1/events
sync_response:
status: '${! meta("http_response_status").or("204") }'

pipeline:
processors:
- switch:
- check: meta("Content-Type").lowercase() == "application/cloudevents-batch+json"
processors:
- unarchive:
format: json_array
- check: meta("Content-Type").lowercase() == "application/cloudevents+json"
processors:
- noop: {}
- check: ""
processors:
- log:
level: ERROR
message: 'Unexpected Content-Type: ${!meta("Content-Type")}'
- mapping: |
meta http_response_status = "400"

root = {
"type": "about:blank",
"title": "Bad Request",
"status": 400,
"detail":"request body has an error: header Content-Type has unexpected value \"%s\"".format(meta("Content-Type")),
}
- sync_response: {}
- mapping: "root = deleted()"
- dedupe:
cache: "dedupe_cache"
key: '${! content().hash("xxhash64") }'
drop_on_err: false
- json_schema:
schema_path: "file://./cloudevents.spec.json"
- catch:
- log:
level: ERROR
message: "Schema validation failed due to: ${!error()}"
- mapping: |
meta http_response_status = "400"

root = {
"type": "about:blank",
"title": "Bad Request",
"status": 400,
"detail":"request body has an error: %s".format(error()),
}
- sync_response: {}
- mapping: "root = deleted()"

output:
switch:
cases:
- check: ""
continue: true
output:
broker:
pattern: fan_out
outputs:
- sync_response: {}
processors:
- mapping: root = null
# https://github.com/benthosdev/benthos/discussions/2324
# https://github.com/benthosdev/benthos/issues/1946
- inproc: openmeter

- check: '"${DEBUG_INPUT:false}" == "true"'
output:
stdout:
codec: lines
16 changes: 16 additions & 0 deletions collector/quickstart/collector/streams/output.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
input:
inproc: openmeter

buffer:
sqlite:
path: /var/lib/collector/buffer.sqlite
post_processors:
- split: {}

output:
openmeter:
url: "${OPENMETER_URL:https://openmeter.cloud}"
token: "${OPENMETER_TOKEN:}"
batching:
count: ${BATCH_SIZE:1}
period: ${BATCH_PERIOD:30s}
65 changes: 65 additions & 0 deletions collector/quickstart/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
include:
- ../../quickstart/docker-compose.yaml

services:
collector:
image: ghcr.io/openmeterio/benthos-collector:latest
ports:
- "127.0.0.1:4195:4195"
environment:
OPENMETER_URL: http://openmeter:8888
BATCH_SIZE: 1
command: [
"--config",
"/etc/collector/config.yaml",
"--log.level",
"debug",
"--resources",
"/etc/collector/resources/*.yaml",
"streams",
"--no-api",
"/etc/collector/streams/*.yaml",
]
healthcheck:
test: [ "CMD", "wget", "--spider", "http://collector:4195/ready" ]
interval: 10s
timeout: 5s
retries: 30
volumes:
- type: volume
source: collector_data
target: /var/lib/collector
- type: bind
source: ./collector
target: /etc/collector
depends_on:
openmeter:
condition: service_healthy

seeder:
image: ghcr.io/openmeterio/benthos-collector:latest
environment:
OPENMETER_URL: http://collector:8889
SEEDER_LOG: true
SEEDER_COUNT: 100
command: [
"--config",
"/etc/seeder/config.yaml",
]
ports:
- "127.0.0.1:4196:4196"
healthcheck:
test: [ "CMD", "wget", "--spider", "http://seeder:4196/ready" ]
interval: 10s
timeout: 5s
retries: 30
volumes:
- type: bind
source: ./seeder
target: /etc/seeder
depends_on:
collector:
condition: service_healthy

volumes:
collector_data:
63 changes: 63 additions & 0 deletions collector/quickstart/seeder/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
http:
enabled: true
address: 0.0.0.0:4196
debug_endpoints: false

input:
generate:
count: ${SEEDER_COUNT:0}
interval: "${SEEDER_INTERVAL:50ms}"
# batch_size: 1
mapping: |
let max_subjects = ${SEEDER_MAX_SUBJECTS:10}

let event_type = "request"
let source = "api-gateway"
let methods = ["GET", "POST"]
let paths = ["/", "/about", "/contact", "/pricing", "/docs"]
let regions = ["us-east-1", "us-west-1", "us-east-2", "us-west-2"]
let zoneSuffixes = ["a", "b", "c", "d"]

let subject = "customer-%d".format(random_int(seed: timestamp_unix_nano()) % $max_subjects)
let time = (now().ts_sub_iso8601("P3D").ts_unix() + random_int(min: 60, max: 60 * 60 * 24 * 3)).ts_format()

let method = $methods.index(random_int(seed: timestamp_unix_nano()) % $methods.length())
let path = $paths.index(random_int(seed: timestamp_unix_nano()) % $paths.length())
let region = $regions.index(random_int(seed: timestamp_unix_nano()) % $regions.length())
let zone = "%s%s".format($region, $zoneSuffixes.index(random_int(seed: timestamp_unix_nano()) % $zoneSuffixes.length()))
let duration = random_int(seed: timestamp_unix_nano(), max: 1000)

root = {
"id": uuid_v4(),
"specversion": "1.0",
"type": $event_type,
"source": $source,
"subject": $subject,
"time": $time,
"data": {
"method": $method,
"path": $path,
"region": $region,
"zone": $zone,
"duration_ms": $duration,
},
}

output:
switch:
cases:
- check: ""
continue: true
output:
http_client:
url: ${OPENMETER_URL:http://127.0.0.1:8888}/api/v1/events
verb: POST
headers:
Content-Type: application/cloudevents+json
Authorization: "Bearer ${OPENMETER_TOKEN:}"
max_in_flight: 1

- check: '"${SEEDER_LOG:false}" == "true"'
output:
stdout:
codec: lines
5 changes: 5 additions & 0 deletions quickstart/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
ingest:
kafka:
broker: kafka:9092
brokerAddressFamily: v4
socketKeepAliveEnable: true
topicMetadataRefreshInterval: 10s

aggregation:
clickhouse:
@@ -13,6 +16,8 @@ sink:
kafka:
brokers: kafka:9092
brokerAddressFamily: v4
socketKeepAliveEnable: true
topicMetadataRefreshInterval: 10s
dedupe:
enabled: true
driver: redis
2 changes: 1 addition & 1 deletion quickstart/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ services:
volumes:
- ./config.yaml:/etc/openmeter/config.yaml
healthcheck:
test: ["CMD", "wget", "--spider", "http://openmeter:8888/api/v1/meters/api_requests_total/query"]
test: ["CMD", "wget", "--spider", "http://openmeter:8888/api/v1/debug/metrics"]
interval: 5s
timeout: 3s
retries: 30