Skip to content

Commit

Permalink
Merge pull request #911 from ayeshLK/ext-consolidator-url
Browse files Browse the repository at this point in the history
[KafkaHub] Update KafkaHub deployment configurations
  • Loading branch information
shafreenAnfar authored Jan 2, 2024
2 parents 1b0e8dd + 0f424c9 commit d6b17cb
Show file tree
Hide file tree
Showing 37 changed files with 130 additions and 162 deletions.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
password
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
password
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
password
Binary file not shown.
Binary file not shown.
Binary file not shown.
117 changes: 83 additions & 34 deletions examples/kafka-hub/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
version: '2'
name: 'kafkahub'

services:
lb:
image: 'ayeshalmeida/loadbalancersvc:1.0.0'
container_name: lb
ports:
- '9090:9090'
depends_on:
hub-1:
condition: service_healthy
restart: true
hub-2:
condition: service_healthy
restart: true
network_mode: "host"
# lb:
# image: 'ayeshalmeida/loadbalancersvc:1.0.0'
# container_name: lb
# ports:
# - '9090:9090'
# depends_on:
# hub-1:
# condition: service_healthy
# restart: true
# hub-2:
# condition: service_healthy
# restart: true
# networks:
# - hub_network

hub-1:
image: 'ayeshalmeida/kafkahub:7.0.0'
image: 'ayeshalmeida/kafkahub:8.0.0'
hostname: hub1
container_name: hub-1
ports:
- '9000:9000'
depends_on:
consolidator:
condition: service_healthy
Expand All @@ -28,7 +33,11 @@ services:
HUB_PORT: 9000
SERVER_ID: "hub-1"
# Update following enviornment variable to point to the relevant kafka-cluster
KAFKA_BOOTSTRAP_NODE: "localhost:9094"
KAFKA_BOOTSTRAP_NODE: "broker:9094"
# Update following enviornment variable to point to the consolidator state-snapshot endpoint
STATE_SNAPSHOT_ENDPOINT: "http://consolidator:10001"
# Update following enviornment variable to point to the IdP JWKS endpoint
IDP_JWKS_ENDPOINT: "https://idp:9443/oauth2/jwks"
# Consumer group name uses for `websub-events` consumer
WEBSUB_EVENTS_CONSUMER_GROUP: "websub-events-receiver-hub-1"
# Update following enviornment variable to set the client truststore name
Expand All @@ -41,20 +50,28 @@ services:
KEYSTORE_PASSWORD: "password"
volumes:
# Kafka client truststore file
- ./_resources/brokercerts/client-truststore.jks:/home/ballerina/resources/brokercerts/client-truststore.jks
- ./_resources/secrets/kafka-client/kafka-client.trustStore.jks:/home/ballerina/resources/brokercerts/client-truststore.jks
# Kafka client keystore file
- ./_resources/brokercerts/client-keystore.jks:/home/ballerina/resources/brokercerts/client-keystore.jks
- ./_resources/secrets/kafka-client/kafka-client.keystore.jks:/home/ballerina/resources/brokercerts/client-keystore.jks
# IdP client truststore
- ./_resources/secrets/idp/client-truststore.jks:/home/ballerina/resources/client-truststore.jks
# SSL configurations
- ./_resources/secrets/hub/hub-1.keystore.jks:/home/ballerina/resources/hub.keystore.jks
healthcheck:
test: ["CMD-SHELL", "wget --no-verbose --no-check-certificate --tries=1 --spider 'https://localhost:9000/health' || exit 1"]
test: ["CMD-SHELL", "wget --no-verbose --no-check-certificate --tries=1 --spider 'https://hub1:9000/health' || exit 1"]
interval: 30s
timeout: 10s
start_period: 30s
retries: 10
network_mode: "host"
networks:
- hub_network

hub-2:
image: 'ayeshalmeida/kafkahub:7.0.0'
image: 'ayeshalmeida/kafkahub:8.0.0'
hostname: hub2
container_name: hub-2
ports:
- '9001:9001'
depends_on:
consolidator:
condition: service_healthy
Expand All @@ -65,7 +82,11 @@ services:
HUB_PORT: 9001
SERVER_ID: "hub-2"
# Update following enviornment variable to point to the relevant kafka-cluster
KAFKA_BOOTSTRAP_NODE: "localhost:9094"
KAFKA_BOOTSTRAP_NODE: "broker:9094"
# Update following enviornment variable to point to the consolidator state-snapshot endpoint
STATE_SNAPSHOT_ENDPOINT: "http://consolidator:10001"
# Update following enviornment variable to point to the IdP JWKS endpoint
IDP_JWKS_ENDPOINT: "https://idp:9443/oauth2/jwks"
# Consumer group name uses for `websub-events` consumer
WEBSUB_EVENTS_CONSUMER_GROUP: "websub-events-receiver-hub-2"
# Update following enviornment variable to set the client truststore name
Expand All @@ -78,25 +99,31 @@ services:
KEYSTORE_PASSWORD: "password"
volumes:
# Kafka client truststore file
- ./_resources/brokercerts/client-truststore.jks:/home/ballerina/resources/brokercerts/client-truststore.jks
- ./_resources/secrets/kafka-client/kafka-client.trustStore.jks:/home/ballerina/resources/brokercerts/client-truststore.jks
# Kafka client keystore file
- ./_resources/brokercerts/client-keystore.jks:/home/ballerina/resources/brokercerts/client-keystore.jks
- ./_resources/secrets/kafka-client/kafka-client.keystore.jks:/home/ballerina/resources/brokercerts/client-keystore.jks # IdP client truststore
# IdP client truststore
- ./_resources/secrets/idp/client-truststore.jks:/home/ballerina/resources/client-truststore.jks
# SSL configurations
- ./_resources/secrets/hub/hub-2.keystore.jks:/home/ballerina/resources/hub.keystore.jks
healthcheck:
test: ["CMD-SHELL", "wget --no-verbose --no-check-certificate --tries=1 --spider 'https://localhost:9001/health' || exit 1"]
test: ["CMD-SHELL", "wget --no-verbose --no-check-certificate --tries=1 --spider 'https://hub2:9001/health' || exit 1"]
interval: 30s
timeout: 10s
start_period: 30s
retries: 10
network_mode: "host"
networks:
- hub_network

consolidator:
image: 'ayeshalmeida/consolidator:6.0.0'
hostname: consolidator
container_name: consolidator
ports:
- '10001:10001'
environment:
# Update following enviornment variable to point to the relevant kafka-cluster
KAFKA_BOOTSTRAP_NODE: "localhost:9094"
KAFKA_BOOTSTRAP_NODE: "broker:9094"
# Consumer group name uses for `websub-events` consumer
WEBSUB_EVENTS_CONSUMER_GROUP: "websub-events-receiver-consolidator"
# Consumer group name uses for `websub-events-snapshot` consumer
Expand All @@ -114,23 +141,31 @@ services:
condition: service_started
volumes:
# Kafka client truststore file
- ./_resources/brokercerts/client-truststore.jks:/home/ballerina/resources/brokercerts/client-truststore.jks
- ./_resources/secrets/kafka-client/kafka-client.trustStore.jks:/home/ballerina/resources/brokercerts/client-truststore.jks
# Kafka client keystore file
- ./_resources/brokercerts/client-keystore.jks:/home/ballerina/resources/brokercerts/client-keystore.jks
- ./_resources/secrets/kafka-client/kafka-client.keystore.jks:/home/ballerina/resources/brokercerts/client-keystore.jks
healthcheck:
test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider 'http://localhost:10001/health/readiness' || exit 1"]
test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider 'http://consolidator:10001/health/readiness' || exit 1"]
interval: 30s
timeout: 10s
start_period: 30s
retries: 10
network_mode: "host"
networks:
- hub_network

idp:
image: 'ayeshalmeida/wso2-is:5.11.0.update'
hostname: idp
container_name: idp
ports:
- '9443:9443'
network_mode: "host"
volumes:
# IdP truststore file
- ./_resources/secrets/idp/client-truststore.jks:/opt/wso2is-5.11.0/repository/resources/security/client-truststore.jks
# IdP keystore file
- ./_resources/secrets/idp/idp.keystore.jks:/opt/wso2is-5.11.0/repository/resources/security/wso2carbon.jks
networks:
- hub_network

zookeeper:
image: 'confluentinc/cp-zookeeper:7.5.0'
Expand All @@ -141,20 +176,23 @@ services:
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- hub_network

broker:
image: 'ayeshalmeida/cp-server:7.5.0'
image: 'confluentinc/cp-kafka:7.5.0'
hostname: broker
container_name: kafka-broker
depends_on:
- zookeeper
ports:
- '9094:9094'
- '9092:9092'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: PLAINTEXT
KAFKA_NUM_PARTITIONS: 3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,EX_PLAINTEXT://localhost:9092,EX_SSL://localhost:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,EX_PLAINTEXT://localhost:9092,EX_SSL://broker:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EX_PLAINTEXT:PLAINTEXT,EX_SSL:SSL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 3
Expand All @@ -167,3 +205,14 @@ services:
KAFKA_SSL_KEY_CREDENTIALS: broker_sslkey_creds
KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.broker.truststore.jks
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker_truststore_creds
volumes:
- ./_resources/secrets/kafka-broker/broker_keystore_creds:/etc/kafka/secrets/broker_keystore_creds
- ./_resources/secrets/kafka-broker/broker_sslkey_creds:/etc/kafka/secrets/broker_sslkey_creds
- ./_resources/secrets/kafka-broker/broker_truststore_creds:/etc/kafka/secrets/broker_truststore_creds
- ./_resources/secrets/kafka-broker/broker.keystore.jks:/etc/kafka/secrets/kafka.broker.keystore.jks
- ./_resources/secrets/kafka-broker/broker.trustStore.jks:/etc/kafka/secrets/kafka.broker.truststore.jks
networks:
- hub_network

networks:
hub_network:
2 changes: 1 addition & 1 deletion examples/kafka-hub/hub/Cloud.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[container.image]
repository="ballerina"
name="kafkahub"
tag="7.0.0"
tag="8.0.0"

[[container.copy.files]]
sourceFile="./resources"
Expand Down
2 changes: 1 addition & 1 deletion examples/kafka-hub/hub/hub_state_update.bal
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kafkaHub.connections as conn;
import ballerinax/kafka;

function initializeHubState() returns error? {
http:Client stateSnapshotClient = check new (config:STATE_SNAPSHOT_ENDPOINT);
http:Client stateSnapshotClient = check new (config:STATE_SNAPSHOT_ENDPOINT_URL);
do {
types:SystemStateSnapshot systemStateSnapshot = check stateSnapshotClient->/consolidator/state\-snapshot;
check processWebsubTopicsSnapshotState(systemStateSnapshot.topics);
Expand Down
2 changes: 2 additions & 0 deletions examples/kafka-hub/hub/modules/config/configurations.bal
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public configurable string WEBSUB_EVENTS_TOPIC = "websub-events";
# Consolidator HTTP endpoint to be used to retrieve current state-snapshot
public configurable string STATE_SNAPSHOT_ENDPOINT = "http://localhost:10001";

public final string STATE_SNAPSHOT_ENDPOINT_URL = os:getEnv("STATE_SNAPSHOT_ENDPOINT") == "" ? STATE_SNAPSHOT_ENDPOINT : os:getEnv("STATE_SNAPSHOT_ENDPOINT");

# The interval in which Kafka consumers wait for new messages
public configurable decimal POLLING_INTERVAL = 10;

Expand Down
9 changes: 7 additions & 2 deletions examples/kafka-hub/hub/modules/security/security.bal
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import ballerina/log;
import ballerina/http;
import kafkaHub.config;
import ballerina/jwt;
import ballerina/os;

final http:ListenerJwtAuthHandler handler = new({
issuer: config:OAUTH2_CONFIG.issuer,
issuer: getIdpUrlConfig("TOKEN_ISSUER", config:OAUTH2_CONFIG.issuer),
audience: config:OAUTH2_CONFIG.audience,
signatureConfig: {
jwksConfig: {
url: config:OAUTH2_CONFIG.jwksUrl,
url: getIdpUrlConfig("IDP_JWKS_ENDPOINT", config:OAUTH2_CONFIG.jwksUrl),
clientConfig: {
secureSocket: {
cert: {
Expand All @@ -38,6 +39,10 @@ final http:ListenerJwtAuthHandler handler = new({
scopeKey: "scope"
});

isolated function getIdpUrlConfig(string envVariableName, string defaultValue) returns string {
return os:getEnv(envVariableName) == "" ? defaultValue : os:getEnv(envVariableName);
}

# Checks for authorization for the current request.
#
# + headers - `http:Headers` for the current request
Expand Down
Binary file modified examples/kafka-hub/hub/resources/client-truststore.jks
Binary file not shown.
Binary file added examples/kafka-hub/hub/resources/hub.keystore.jks
Binary file not shown.
4 changes: 2 additions & 2 deletions examples/kafka-hub/hub/start_hub.bal
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public function main() returns error? {
http:Listener httpListener = check new (hubPort,
secureSocket = {
key: {
certFile: "./resources/server.crt",
keyFile: "./resources/server.key"
path: "./resources/hub.keystore.jks",
password: "password"
}
}
);
Expand Down
10 changes: 8 additions & 2 deletions examples/kafka-hub/load_balancer/resources/lb-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@
{
"url": "https://localhost:9000",
"secureSocket": {
"cert": "./resources/server.crt"
"cert": {
"path": "./resources/lb.truststore.jks",
"password": "password"
}
}
},
{
"url": "https://localhost:9001",
"secureSocket": {
"cert": "./resources/server.crt"
"cert": {
"path": "./resources/lb.truststore.jks",
"password": "password"
}
}
}
]
Binary file not shown.
2 changes: 1 addition & 1 deletion examples/kafka-hub/publisher/Cloud.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[container.image]
repository="ballerina"
name="wbsbpublisher"
tag="3.0.0"
tag="4.0.0"

[[container.copy.files]]
sourceFile="./resources"
Expand Down
2 changes: 1 addition & 1 deletion examples/kafka-hub/publisher/Config.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[oauth2Config]
tokenUrl = "https://localhost:9443/oauth2/token"
tokenUrl = "https://idp:9443/oauth2/token"
clientId= "8EsaVTsN64t4sMDhGvBqJoqMi8Ea"
clientSecret = "QC71AIfbBjhgAibpi0mpfIEK_bMa"
trustStore = "./resources/client-truststore.jks"
Expand Down
8 changes: 6 additions & 2 deletions examples/kafka-hub/publisher/main.bal
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ json DEFAULT_PAYLOAD = {

final string topicName = os:getEnv("TOPIC_NAME") == "" ? "priceUpdate" : os:getEnv("TOPIC_NAME");
final json payload = os:getEnv("PAYLOAD") == "" ? DEFAULT_PAYLOAD : check os:getEnv("PAYLOAD").fromJsonString();
final string hubUrl = os:getEnv("HUB_URL") == "https://lb:9090/hub" ? "priceUpdate" : os:getEnv("HUB_URL");

type OAuth2Config record {|
string tokenUrl;
Expand All @@ -40,7 +41,7 @@ type OAuth2Config record {|
configurable OAuth2Config oauth2Config = ?;

public function main() returns error? {
websubhub:PublisherClient websubHubClientEP = check new ("https://localhost:9090/hub",
websubhub:PublisherClient websubHubClientEP = check new (hubUrl,
auth = {
tokenUrl: oauth2Config.tokenUrl,
clientId: oauth2Config.clientId,
Expand All @@ -56,7 +57,10 @@ public function main() returns error? {
}
},
secureSocket = {
cert: "./resources/server.crt"
cert: {
path: "./resources/publisher.truststore.jks",
password: "password"
}
}
);
websubhub:Acknowledgement response = check websubHubClientEP->publishUpdate(topicName, payload);
Expand Down
Binary file modified examples/kafka-hub/publisher/resources/client-truststore.jks
Binary file not shown.
Binary file not shown.
22 changes: 0 additions & 22 deletions examples/kafka-hub/publisher/resources/server.crt

This file was deleted.

Loading

0 comments on commit d6b17cb

Please sign in to comment.