Skip to content

Commit

Permalink
Merge pull request #79 from urviljoshi/kafkahub
Browse files Browse the repository at this point in the history
consolidator to consolidator service
  • Loading branch information
urviljoshi authored Aug 24, 2021
2 parents b6a9d62 + e350f2d commit fd51f76
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 13 deletions.
4 changes: 2 additions & 2 deletions consolidator/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "mosip"
name = "consolidator"
version = "1.2.0"
name = "consolidatorService"
version = "0.1.0"

[build-options]
observabilityIncluded = true
9 changes: 5 additions & 4 deletions consolidator/consolidator_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import ballerinax/kafka;
import ballerina/websubhub;
import ballerina/lang.value;
import ballerina/log;
import consolidator.config;
import consolidator.util;
import consolidator.connections as conn;
import consolidator.persistence as persist;
import consolidatorService.config;
import consolidatorService.util;
import consolidatorService.connections as conn;
import consolidatorService.persistence as persist;

isolated function startConsolidator() returns error? {
do {
Expand All @@ -30,6 +30,7 @@ isolated function startConsolidator() returns error? {
if records.length() > 0 {
kafka:ConsumerRecord lastRecord = records.pop();
string lastPersistedData = check string:fromBytes(lastRecord.value);
log:printInfo("websub event received in consolidator",payload=lastPersistedData);
error? result = processPersistedData(lastPersistedData);
if result is error {
log:printError("Error occurred while processing received event ", 'error = result);
Expand Down
6 changes: 3 additions & 3 deletions consolidator/init_consolidator.bal
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import ballerinax/kafka;
import ballerina/websubhub;
import ballerina/lang.value;
import ballerina/log;
import consolidator.config;
import consolidator.util;
import consolidator.connections as conn;
import consolidatorService.config;
import consolidatorService.util;
import consolidatorService.connections as conn;

isolated map<websubhub:TopicRegistration> registeredTopicsCache = {};
isolated map<websubhub:VerifiedSubscription> subscribersCache = {};
Expand Down
2 changes: 1 addition & 1 deletion consolidator/modules/config/configurations.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// specific language governing permissions and limitations
// under the License.

import consolidator.util;
import consolidatorService.util;

# IP and Port of the Kafka bootstrap node
public configurable string KAFKA_BOOTSTRAP_NODE = "localhost:9092";
Expand Down
2 changes: 1 addition & 1 deletion consolidator/modules/connections/connections.bal
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// under the License.

import ballerinax/kafka;
import consolidator.config;
import consolidatorService.config;

// Producer which persist the current consolidated in-memory state of the system
kafka:ProducerConfiguration statePersistConfig = {
Expand Down
4 changes: 2 additions & 2 deletions consolidator/modules/persistence/persistence.bal
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// under the License.

import ballerina/websubhub;
import consolidator.config;
import consolidator.connections as conn;
import consolidatorService.config;
import consolidatorService.connections as conn;

public isolated function persistTopicRegistrations(map<websubhub:TopicRegistration> registeredTopicsCache) returns error? {
websubhub:TopicRegistration[] availableTopics = [];
Expand Down

0 comments on commit fd51f76

Please sign in to comment.