Skip to content

Commit

Permalink
Merge pull request #78 from urviljoshi/kafkahub
Browse files Browse the repository at this point in the history
Kafkahub
  • Loading branch information
urviljoshi authored Aug 24, 2021
2 parents 4ed4a52 + d585a00 commit b6a9d62
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 20 deletions.
4 changes: 2 additions & 2 deletions hub/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "mosip"
name = "hub"
version = "1.2.0"
name = "kafkaHub"
version = "0.1.0"

[build-options]
observabilityIncluded = true
25 changes: 16 additions & 9 deletions hub/hub_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
import ballerina/websubhub;
import ballerina/log;
import ballerina/http;
import hub.security;
import hub.persistence as persist;
import hub.config;
import hub.util;
import kafkaHub.security;
import kafkaHub.persistence as persist;
import kafkaHub.config;
import kafkaHub.util;

websubhub:Service hubService = @websubhub:ServiceConfig {
}
service object {
websubhub:Service hubService = service object {

# Registers a `topic` in the hub.
#
Expand All @@ -34,9 +32,11 @@ service object {
# if topic registration failed or `error` if there is any unexpected error
isolated remote function onRegisterTopic(websubhub:TopicRegistration message, http:Headers headers)
returns websubhub:TopicRegistrationSuccess|websubhub:TopicRegistrationError|error {
if config:SECURITY_ON {
log:printInfo("topic registration");
if config:SECURITY_ON {
check security:authorizePublisher(headers, message.topic);
}
log:printInfo("Running topic registration ", payload = message);
check self.registerTopic(message);
return websubhub:TOPIC_REGISTRATION_SUCCESS;
}
Expand All @@ -47,6 +47,7 @@ service object {
if registeredTopicsCache.hasKey(topicName) {
return error websubhub:TopicRegistrationError("Topic has already registered with the Hub");
}
log:printInfo("Registering topic ");
error? persistingResult = persist:addRegsiteredTopic(message.cloneReadOnly());
if persistingResult is error {
log:printError("Error occurred while persisting the topic-registration ", err = persistingResult.message());
Expand All @@ -65,6 +66,7 @@ service object {
if config:SECURITY_ON {
check security:authorizePublisher(headers, message.topic);
}
log:printInfo("Running topic de-registration ", payload = message);
check self.deregisterTopic(message);
return websubhub:TOPIC_DEREGISTRATION_SUCCESS;
}
Expand All @@ -75,6 +77,7 @@ service object {
if !registeredTopicsCache.hasKey(topicName) {
return error websubhub:TopicDeregistrationError("Topic has not been registered in the Hub");
}
log:printInfo("Deregistering topic");
error? persistingResult = persist:removeRegsiteredTopic(message.cloneReadOnly());
if persistingResult is error {
log:printError("Error occurred while persisting the topic-deregistration ", err = persistingResult.message());
Expand All @@ -93,7 +96,8 @@ service object {
if config:SECURITY_ON {
check security:authorizeSubscriber(headers, message.hubTopic);
}
return websubhub:SUBSCRIPTION_ACCEPTED;
log:printInfo("subscription request received", payload = message);
return websubhub:SUBSCRIPTION_ACCEPTED;
}

# Validates a incomming subscription request.
Expand Down Expand Up @@ -199,6 +203,7 @@ service object {
if config:SECURITY_ON {
check security:authorizePublisher(headers, message.hubTopic);
}
log:printInfo("Running content update ", payload = message);
check self.updateMessage(message);
return websubhub:ACKNOWLEDGEMENT;
}
Expand All @@ -210,6 +215,7 @@ service object {
topicAvailable = registeredTopicsCache.hasKey(topicName);
}
if topicAvailable {
log:printInfo("Updating topic ");
error? errorResponse = persist:addUpdateMessage(topicName, msg);
if errorResponse is websubhub:UpdateMessageError {
return errorResponse;
Expand All @@ -223,3 +229,4 @@ service object {
}
};


2 changes: 1 addition & 1 deletion hub/modules/config/configurations.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// specific language governing permissions and limitations
// under the License.

import hub.util;
import kafkaHub.util;

# Flag to check whether to enable/disable security
public configurable boolean SECURITY_ON = true;
Expand Down
4 changes: 2 additions & 2 deletions hub/modules/connections/connections.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

import ballerinax/kafka;
import ballerina/websubhub;
import hub.config;
import hub.util;
import kafkaHub.config;
import kafkaHub.util;

// Producer which persist the current in-memory state of the Hub
kafka:ProducerConfiguration statePersistConfig = {
Expand Down
4 changes: 2 additions & 2 deletions hub/modules/persistence/persistence.bal
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// under the License.

import ballerina/websubhub;
import hub.config;
import hub.connections as conn;
import kafkaHub.config;
import kafkaHub.connections as conn;

public isolated function addRegsiteredTopic(websubhub:TopicRegistration message) returns error? {
check updateTopicDetails(message, "register");
Expand Down
2 changes: 1 addition & 1 deletion hub/modules/security/security.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import ballerina/http;
import ballerina/regex;
import hub.config;
import kafkaHub.config;

const string SUFFIX_GENERAL = "_GENERAL";
const string SUFFIX_ALL_INDIVIDUAL = "_ALL_INDIVIDUAL";
Expand Down
6 changes: 3 additions & 3 deletions hub/start_hub.bal
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import ballerina/log;
import ballerina/websubhub;
import ballerinax/kafka;
import ballerina/lang.value;
import hub.util;
import hub.connections as conn;
import kafkaHub.util;
import kafkaHub.connections as conn;
import ballerina/mime;
import hub.config;
import kafkaHub.config;

isolated map<websubhub:TopicRegistration> registeredTopicsCache = {};
isolated map<websubhub:VerifiedSubscription> subscribersCache = {};
Expand Down

0 comments on commit b6a9d62

Please sign in to comment.