Skip to content

Commit

Permalink
Merge pull request #81 from mosip/kafkahub
Browse files Browse the repository at this point in the history
Kafkahub
  • Loading branch information
urviljoshi authored Aug 31, 2021
2 parents cc94503 + 2e59acf commit e7926de
Show file tree
Hide file tree
Showing 85 changed files with 1,371 additions and 3,128 deletions.
40 changes: 28 additions & 12 deletions .github/workflows/push_trigger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ on:
- develop
- 1.*
- websub_scalling
- kafkahub
workflow_dispatch:

jobs:
Expand All @@ -14,6 +15,7 @@ jobs:
env:
NAMESPACE: ${{ secrets. dev_namespace_docker_hub }}
SERVICE_NAME: websub-service
CONSOLIDATOR_SERVICE_NAME: consolidator-websub-service

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
Expand All @@ -32,28 +34,42 @@ jobs:
- name: Install ballerina
run: |
wget -q --show-progress https://dist.ballerina.io/downloads/1.2.13/ballerina-linux-installer-x64-1.2.13.deb
sudo dpkg -i ballerina-linux-installer-x64-1.2.13.deb
- name: Ballerina Build
wget -q --show-progress https://dist.ballerina.io/downloads/swan-lake-beta2/ballerina-linux-installer-x64-swan-lake-beta2.deb -O ballerina-linux-installer-x64.deb
sudo dpkg -i ballerina-linux-installer-x64.deb
- name: Hub-service build
run: |
bal build ./hub
- name: Consolidator service build
run: |
ballerina build --all
bal build ./consolidator
- name: Build image
- name: Build hub image
run: |
docker build . --file Dockerfile --tag ${{ env.SERVICE_NAME }}
docker build ./hub --file ./hub/Dockerfile --tag ${{ env.SERVICE_NAME }}
- name: Build consolidator image
run: |
docker build ./consolidator --file ./consolidator/Dockerfile --tag ${{ env.CONSOLIDATOR_SERVICE_NAME }}
- name: Log into registry
run: echo "${{ secrets.release_docker_hub }}" | docker login -u ${{ secrets.actor_docker_hub }} --password-stdin

- name: Push image
- name: Push images
run: |
IMAGE_ID=$NAMESPACE/$SERVICE_NAME
HUB_IMAGE_ID=$NAMESPACE/$SERVICE_NAME
CONSOLIDATOR_IMAGE_ID=$NAMESPACE/$CONSOLIDATOR_SERVICE_NAME
# Change all uppercase to lowercase
IMAGE_ID=$(echo $IMAGE_ID | tr '[A-Z]' '[a-z]')
HUB_IMAGE_ID=$(echo $HUB_IMAGE_ID | tr '[A-Z]' '[a-z]')
CONSOLIDATOR_IMAGE_ID=$(echo $CONSOLIDATOR_IMAGE_ID | tr '[A-Z]' '[a-z]')
VERSION=$BRANCH_NAME
echo "push version $VERSION"
echo IMAGE_ID=$IMAGE_ID
echo HUB_IMAGE_ID=$HUB_IMAGE_ID
echo CONSOLIDATOR_IMAGE_ID=$CONSOLIDATOR_IMAGE_ID
echo VERSION=$VERSION
docker tag $SERVICE_NAME $IMAGE_ID:$VERSION
docker push $IMAGE_ID:$VERSION
docker tag $SERVICE_NAME $HUB_IMAGE_ID:$VERSION
docker push $HUB_IMAGE_ID:$VERSION
docker tag $CONSOLIDATOR_SERVICE_NAME $CONSOLIDATOR_IMAGE_ID:$VERSION
docker push $CONSOLIDATOR_IMAGE_ID:$VERSION
14 changes: 0 additions & 14 deletions Ballerina.toml

This file was deleted.

9 changes: 0 additions & 9 deletions Dockerfile

This file was deleted.

7 changes: 7 additions & 0 deletions consolidator/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[package]
org = "mosip"
name = "consolidatorService"
version = "0.1.0"

[build-options]
observabilityIncluded = true
10 changes: 10 additions & 0 deletions consolidator/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM openjdk:11
ARG consolidator_config_url
ARG ballerina_download_url=https://dist.ballerina.io/downloads/swan-lake-beta2/ballerina-linux-installer-x64-swan-lake-beta2.deb
ENV consolidator_config_file_url_env=${consolidator_config_url}
COPY ./target/bin/*.jar consolidator.jar
RUN wget -q --show-progress ${ballerina_download_url} -O ballerina-linux-installer-x64.deb
RUN dpkg -i ballerina-linux-installer-x64.deb
#TODO Link to be parameterized instead of hardcoding
CMD wget -q --show-progress "${consolidator_config_file_url_env}" -O Config.toml;\
bal run ./consolidator.jar ;\
120 changes: 120 additions & 0 deletions consolidator/consolidator_service.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2021, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerinax/kafka;
import ballerina/websubhub;
import ballerina/lang.value;
import ballerina/log;
import consolidatorService.config;
import consolidatorService.util;
import consolidatorService.connections as conn;
import consolidatorService.persistence as persist;

isolated function startConsolidator() returns error? {
do {
while true {
kafka:ConsumerRecord[] records = check conn:websubEventConsumer->poll(config:POLLING_INTERVAL);
if records.length() > 0 {
kafka:ConsumerRecord lastRecord = records.pop();
string lastPersistedData = check string:fromBytes(lastRecord.value);
log:printInfo("websub event received in consolidator",payload=lastPersistedData);
error? result = processPersistedData(lastPersistedData);
if result is error {
log:printError("Error occurred while processing received event ", 'error = result);
}
}
}
} on fail var e {
_ = check conn:websubEventConsumer->close(config:GRACEFUL_CLOSE_PERIOD);
return e;
}
}

isolated function processPersistedData(string persistedData) returns error? {
json payload = check value:fromJsonString(persistedData);
string hubMode = check payload.hubMode;
match hubMode {
"register" => {
check processTopicRegistration(payload);
}
"deregister" => {
check processTopicDeregistration(payload);
}
"subscribe" => {
check processSubscription(payload);
}
"unsubscribe" => {
check processUnsubscription(payload);
}
_ => {
return error(string `Error occurred while deserializing subscriber events with invalid hubMode [${hubMode}]`);
}
}
}

isolated function processTopicRegistration(json payload) returns error? {
websubhub:TopicRegistration registration = check retrieveTopicRegistration(payload);
string topicName = util:sanitizeTopicName(registration.topic);
lock {
// add the topic if topic-registration event received
registeredTopicsCache[topicName] = registration.cloneReadOnly();
_ = check persist:persistTopicRegistrations(registeredTopicsCache);
}
}

isolated function retrieveTopicRegistration(json payload) returns websubhub:TopicRegistration|error {
string topic = check payload.topic;
return {
topic: topic
};
}

isolated function processTopicDeregistration(json payload) returns error? {
websubhub:TopicDeregistration deregistration = check retrieveTopicDeregistration(payload);
string topicName = util:sanitizeTopicName(deregistration.topic);
lock {
// remove the topic if topic-deregistration event received
_ = registeredTopicsCache.removeIfHasKey(topicName);
_ = check persist:persistTopicRegistrations(registeredTopicsCache);
}
}

isolated function retrieveTopicDeregistration(json payload) returns websubhub:TopicDeregistration|error {
string topic = check payload.topic;
return {
topic: topic
};
}

isolated function processSubscription(json payload) returns error? {
websubhub:VerifiedSubscription subscription = check payload.cloneWithType(websubhub:VerifiedSubscription);
string groupName = util:generateGroupName(subscription.hubTopic, subscription.hubCallback);
lock {
// add the subscriber if subscription event received
subscribersCache[groupName] = subscription.cloneReadOnly();
_ = check persist:persistSubscriptions(subscribersCache);
}
}

isolated function processUnsubscription(json payload) returns error? {
websubhub:VerifiedUnsubscription unsubscription = check payload.cloneWithType(websubhub:VerifiedUnsubscription);
string groupName = util:generateGroupName(unsubscription.hubTopic, unsubscription.hubCallback);
lock {
// remove the subscriber if the unsubscription event received
_ = subscribersCache.removeIfHasKey(groupName);
_ = check persist:persistSubscriptions(subscribersCache);
}
}
128 changes: 128 additions & 0 deletions consolidator/init_consolidator.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) 2021, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerinax/kafka;
import ballerina/websubhub;
import ballerina/lang.value;
import ballerina/log;
import consolidatorService.config;
import consolidatorService.util;
import consolidatorService.connections as conn;

isolated map<websubhub:TopicRegistration> registeredTopicsCache = {};
isolated map<websubhub:VerifiedSubscription> subscribersCache = {};

public function main() returns error? {
// Initialize consolidator-service state
check syncRegsisteredTopicsCache();
_ = check conn:consolidatedTopicsConsumer->close(config:GRACEFUL_CLOSE_PERIOD);
check syncSubscribersCache();
_ = check conn:consolidatedSubscriberConsumer->close(config:GRACEFUL_CLOSE_PERIOD);
log:printInfo("Starting Event Consolidator Service");

// start the consolidator-service
check startConsolidator();
}

isolated function syncRegsisteredTopicsCache() returns error? {
do {
websubhub:TopicRegistration[]|error? persistedTopics = getPersistedTopics();
if persistedTopics is websubhub:TopicRegistration[] {
refreshTopicCache(persistedTopics);
}
} on fail var e {
_ = check conn:consolidatedTopicsConsumer->close(config:GRACEFUL_CLOSE_PERIOD);
return e;
}
}

isolated function getPersistedTopics() returns websubhub:TopicRegistration[]|error? {
kafka:ConsumerRecord[] records = check conn:consolidatedTopicsConsumer->poll(config:POLLING_INTERVAL);
if records.length() > 0 {
kafka:ConsumerRecord lastRecord = records.pop();
string|error lastPersistedData = string:fromBytes(lastRecord.value);
if lastPersistedData is string {
return deSerializeTopicsMessage(lastPersistedData);
} else {
log:printError("Error occurred while retrieving topic-details ", err = lastPersistedData.message());
return lastPersistedData;
}
}
}

isolated function deSerializeTopicsMessage(string lastPersistedData) returns websubhub:TopicRegistration[]|error {
websubhub:TopicRegistration[] currentTopics = [];
json[] payload = <json[]> check value:fromJsonString(lastPersistedData);
foreach var data in payload {
websubhub:TopicRegistration topic = check data.cloneWithType(websubhub:TopicRegistration);
currentTopics.push(topic);
}
return currentTopics;
}

isolated function refreshTopicCache(websubhub:TopicRegistration[] persistedTopics) {
foreach var topic in persistedTopics.cloneReadOnly() {
string topicName = util:sanitizeTopicName(topic.topic);
lock {
registeredTopicsCache[topicName] = topic.cloneReadOnly();
}
}
}

isolated function syncSubscribersCache() returns error? {
do {
websubhub:VerifiedSubscription[]|error? persistedSubscribers = getPersistedSubscribers();
if persistedSubscribers is websubhub:VerifiedSubscription[] {
refreshSubscribersCache(persistedSubscribers);
}
} on fail var e {
_ = check conn:consolidatedSubscriberConsumer->close(config:GRACEFUL_CLOSE_PERIOD);
return e;
}
}

isolated function getPersistedSubscribers() returns websubhub:VerifiedSubscription[]|error? {
kafka:ConsumerRecord[] records = check conn:consolidatedSubscriberConsumer->poll(config:POLLING_INTERVAL);
if records.length() > 0 {
kafka:ConsumerRecord lastRecord = records.pop();
string|error lastPersistedData = string:fromBytes(lastRecord.value);
if lastPersistedData is string {
return deSerializeSubscribersMessage(lastPersistedData);
} else {
log:printError("Error occurred while retrieving consolidated-subscriber-details ", err = lastPersistedData.message());
return lastPersistedData;
}
}
}

isolated function deSerializeSubscribersMessage(string lastPersistedData) returns websubhub:VerifiedSubscription[]|error {
websubhub:VerifiedSubscription[] currentSubscriptions = [];
json[] payload = <json[]> check value:fromJsonString(lastPersistedData);
foreach var data in payload {
websubhub:VerifiedSubscription subscription = check data.cloneWithType(websubhub:VerifiedSubscription);
currentSubscriptions.push(subscription);
}
return currentSubscriptions;
}

isolated function refreshSubscribersCache(websubhub:VerifiedSubscription[] persistedSubscribers) {
foreach var subscriber in persistedSubscribers {
string groupName = util:generateGroupName(subscriber.hubTopic, subscriber.hubCallback);
lock {
subscribersCache[groupName] = subscriber.cloneReadOnly();
}
}
}
Loading

0 comments on commit e7926de

Please sign in to comment.