Skip to content

Commit

Permalink
included MqttTopic objects
Browse files Browse the repository at this point in the history
changed the structure of PolyStream. Now it includes the MqttTopic Objekt and the content of a message.
  • Loading branch information
NehaSelvan1512 committed Jul 11, 2023
1 parent 3f8ab3c commit 805a691
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.Extension;
import org.pf4j.Plugin;
Expand Down Expand Up @@ -61,9 +62,9 @@ public void start() {
Map<String, String> mqttDefaultSettings = new HashMap<>();
mqttDefaultSettings.put( "broker", "localhost" );
mqttDefaultSettings.put( "brokerPort", "1883" );
// TODO namespace und type müssen dringend eingegeben werden, so machen wie bei topics?
mqttDefaultSettings.put( "namespace", "public" );
mqttDefaultSettings.put( "namespace type", "RELATIONAL" );
// TODO Bei run mit reset: braucht es diese default settings sonst kommt nullpointer exception.
// mqttDefaultSettings.put( "namespace", "public" );
// mqttDefaultSettings.put( "namespace type", "RELATIONAL" );
QueryInterfaceManager.addInterfaceType( "mqtt", MqttStreamServer.class, mqttDefaultSettings );
}

Expand Down Expand Up @@ -95,11 +96,11 @@ public static class MqttStreamServer extends QueryInterface {

private final int brokerPort;

private ArrayList<String> topics = new ArrayList<String>();
private List<MqttTopic> topics = new ArrayList<MqttTopic>();

private static Mqtt3AsyncClient client;

private String namespace;
private String namespaceName;

private NamespaceType namespaceType;

Expand All @@ -114,8 +115,8 @@ public MqttStreamServer( TransactionManager transactionManager, Authenticator au
this.brokerPort = Integer.parseInt( settings.get( "brokerPort" ) );
String name = settings.get( "namespace" );
NamespaceType type = NamespaceType.valueOf(settings.get( "namespace type" ));
if ( validateNamespaceName( name, type ) ) {
this.namespace = name;
if ( StreamCapture.validateNamespaceName( name, type ) ) {
this.namespaceName = name;
this.namespaceType = type;
}
}
Expand Down Expand Up @@ -145,10 +146,10 @@ public void run() {
} else {
log.info( "{} started and is listening to broker on {}:{}", INTERFACE_NAME, broker, brokerPort );
/**List<String> topicsList = new ArrayList<>( List.of( this.settings.get( "topics" ).split( "," ) ) );
for ( int i = 0; i < topicsList.size(); i++ ) {
for ( int i = 0; i < atopicsList.size(); i++ ) {
topicsList.set( i, topicsList.get(i).trim() );
}**/
subscribe( topicsToList( this.settings.get( "topics" ) ) );
subscribe( topicsStringToList( this.settings.get( "topics" ) ) );
}
}
);
Expand Down Expand Up @@ -179,36 +180,10 @@ public void shutdown() {
}


private boolean validateNamespaceName( String namespaceName, NamespaceType namespaceType ) {
// TODO: Nachrichten an UI schicken falls Namespace name nicht geht
boolean nameCanBeUsed = false;
Catalog catalog = Catalog.getInstance();
// TODO: database ID evtl von UI abfragen?
if ( catalog.checkIfExistsSchema( Catalog.defaultDatabaseId, namespaceName ) ) {
CatalogSchema schema = null;
try {
schema = catalog.getSchema( Catalog.defaultDatabaseId, namespaceName );
} catch ( UnknownSchemaException e ) {
log.error( "The catalog seems to be corrupt, as it was impossible to retrieve an existing namespace." );
return nameCanBeUsed;
}
assert schema != null;
if ( schema.namespaceType == namespaceType ) {
nameCanBeUsed = true;
} else {
log.info( "There is already a namespace existing in this database with the given name but of type {}.", schema.getNamespaceType() );
log.info( "Please change the name or the type to {} to use the existing namespace.", schema.getNamespaceType() );
}
} else {
nameCanBeUsed = true;
}
//TODO: rmv
log.info( String.valueOf( nameCanBeUsed ) );
return nameCanBeUsed;
}


public List<String> topicsToList( String topics ) {

public List<String> topicsStringToList( String topics ) {
List<String> topicsList = new ArrayList<>( List.of( topics.split( "," ) ) );
for ( int i = 0; i < topicsList.size(); i++ ) {
topicsList.set( i, topicsList.get( i ).trim() );
Expand All @@ -222,11 +197,18 @@ protected void reloadSettings( List<String> updatedSettings ) {
for ( String changedSetting : updatedSettings ) {
switch ( changedSetting ) {
case "topics":
List<String> newTopicsList = topicsToList( this.getCurrentSettings().get( "topics" ) );
List<String> newTopicsList = topicsStringToList( this.getCurrentSettings().get( "topics" ) );

List<String> topicsToSub = new ArrayList<>();
for ( String newTopic : newTopicsList ) {
if ( !topics.contains( newTopic ) ) {
boolean containedInTopics = false;
for ( MqttTopic t : topics ) {
if ( ! t.topicName.equals( newTopic ) ){
containedInTopics = true;
break;
}
}
if ( !containedInTopics ) {
topicsToSub.add( newTopic );
}
}
Expand All @@ -235,9 +217,16 @@ protected void reloadSettings( List<String> updatedSettings ) {
subscribe( topicsToSub );
}

List<String> topicsToUnsub = new ArrayList<>();
for ( String oldTopic : topics ) {
if ( !newTopicsList.contains( oldTopic ) ) {
List<MqttTopic> topicsToUnsub = new ArrayList<>();
for ( MqttTopic oldTopic : topics ) {
boolean containedInNewTopicsList = false;
for ( String newTopic : newTopicsList ) {
if ( ! oldTopic.topicName.equals( newTopic ) ){
containedInNewTopicsList = true;
break;
}
}
if ( !containedInNewTopicsList ) {
topicsToUnsub.add( oldTopic );
}
}
Expand All @@ -248,10 +237,34 @@ protected void reloadSettings( List<String> updatedSettings ) {
break;

case "namespace":
this.validateNamespaceName( this.getCurrentSettings().get( "namespace" ), NamespaceType.valueOf( this.getCurrentSettings().get( "namespace type" ) ) );
if ( StreamCapture.validateNamespaceName( this.getCurrentSettings().get( "namespace" ), this.namespaceType ) ) {
this.namespaceName = this.getCurrentSettings().get( "namespace" );
List<MqttTopic> changedTopics = new ArrayList<>();
for ( MqttTopic t : topics ) {
MqttTopic newt = t.setNewNamespace( this.namespaceName, 0, this.namespaceType );
changedTopics.add(newt);
}
this.topics = changedTopics;
log.info( "namespsace name changed" );
} else {
//TODO: rmv
log.info( "new Name not updated in Objekts" );
}
break;
case "namespace type":
this.validateNamespaceName( this.getCurrentSettings().get( "namespace" ), NamespaceType.valueOf( this.getCurrentSettings().get( "namespace type" ) ) );
if ( StreamCapture.validateNamespaceName( this.namespaceName, NamespaceType.valueOf( this.getCurrentSettings().get( "namespace type" ) ) ) ) {
this.namespaceType = NamespaceType.valueOf( this.getCurrentSettings().get( "namespace type" ) );
List<MqttTopic> changedTopics = new ArrayList<>();
for ( MqttTopic t : topics ) {
MqttTopic newt = t.setNewNamespace( this.namespaceName, 0, this.namespaceType );
changedTopics.add(newt);
}
this.topics = changedTopics;
log.info( "namespsace type changed" );
} else {
//TODO: rmv
log.info( "new Name not updated in Opjekts" );
}
break;
}
}
Expand All @@ -267,51 +280,55 @@ void subscribe( List<String> newTopics ) {


/**
* subscribes to one given topic and adds it to the List topics.
* subscribes to given topic and adds it to the List topics.
*
* @param topic the topic the client should subscribe to.
* @param topicName the topic the client should subscribe to.
*/
public void subscribe( String topic ) {
client.subscribeWith().topicFilter( topic ).callback( subMsg -> {
public void subscribe( String topicName ) {
client.subscribeWith().topicFilter( topicName ).callback( subMsg -> {
log.info( "Received message from topic {}.", subMsg.getTopic() );
processMsg( subMsg );
} ).send().whenComplete( ( subAck, throwable ) -> {
if ( throwable != null ) {
log.info( "Subscription was not successfull. Please try again." );
} else {
this.topics.add( topic );
log.info( "Successful subscription to topic:{}.", topic );
MqttTopic newTopic = new MqttTopic( topicName, this.namespaceName, this.namespaceType, Catalog.defaultDatabaseId, Catalog.defaultUserId, this.getQueryInterfaceId() );
this.topics.add( newTopic );
log.info( "Successful subscription to topic:{}.", topicName );
}
} );
//info: no notify() here, because otherwise only the first topic will be subscribed from the method subscribeToAll().

}


public void unsubscribe( List<String> topics ) {
for ( String t : topics ) {
public void unsubscribe( List<MqttTopic> topics ) {
for ( MqttTopic t : topics ) {
unsubscribe( t );
}
}


public void unsubscribe( String topic ) {
client.unsubscribeWith().topicFilter( topic ).send().whenComplete( ( unsub, throwable ) -> {
public void unsubscribe( MqttTopic topic ) {
client.unsubscribeWith().topicFilter( topic.topicName ).send().whenComplete( ( unsub, throwable ) -> {
if ( throwable != null ) {

log.error( String.format( "Topic %s could not be unsubscribed.", topic ) );
log.error( String.format( "Topic %s could not be unsubscribed.", topic.topicName ) );
} else {
this.topics.remove( topic );
log.info( "Unsubscribed from topic:{}.", topic );
log.info( "Unsubscribed from topic:{}.", topic.topicName );
}
} );
}


void processMsg( Mqtt3Publish subMsg ) {
//TODO: attention: return values, not correct, might need a change of type.
String content = StreamProcessing.processMsg( subMsg );
PolyStream stream = new PolyStream( subMsg.getTopic().toString(), getUniqueName(), content, this.namespace, this.namespaceType );

//TODO: get topic from List don't create it like below
MqttTopic topic = new MqttTopic( subMsg.getTopic().toString(), this.namespaceName, this.namespaceType, Catalog.defaultDatabaseId, Catalog.defaultUserId, this.getQueryInterfaceId() );
String content = StreamProcessing.processMsg( subMsg.getPayloadAsBytes().toString(),topic );
PolyStream stream = new PolyStream( topic, content );
StreamCapture streamCapture = new StreamCapture( this.transactionManager, stream );
streamCapture.handleContent();
}
Expand Down Expand Up @@ -350,7 +367,7 @@ public MonitoringPage() {
// table to display topics
topicsTable = new InformationTable(
informationGroupTopics,
List.of( "Topics" )
List.of( "Topics", "Message Count" )
);

im.registerInformation( topicsTable );
Expand All @@ -360,13 +377,17 @@ public MonitoringPage() {


public void update() {
//TODO: Message Count
topicsTable.reset();
if ( topics.isEmpty() ) {
topicsTable.addRow( "No topic subscriptions" );
} else {
//TODO: korrect
/**
for ( String topic : topics ) {
topicsTable.addRow( topic );
}
**/
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2019-2023 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.mqtt;

import java.util.ArrayList;
import java.util.List;
import org.polypheny.db.catalog.Catalog.NamespaceType;
import org.polypheny.db.catalog.entity.CatalogObject;

public class MqttTopic {

// TODO: Überlegen, wie man das alles persistent machen kann
final String topicName;
final String namespaceName;
final long namespaceId;
final NamespaceType namespaceType;
final long databaseId;
final int userId;
final int queryInterfaceId;
long storeID = 0; // the ID of collection/graph/table... the place where info is/should be saved
CatalogObject storage = null;
int msgCount; //TODO: make persistent


public MqttTopic( String topicName, String namespaceName, NamespaceType namespaceType, long databaseId, int userId, int queryInterfaceId ) {
this.topicName = topicName;
this.namespaceName = namespaceName;
this.namespaceId = 0;
this.namespaceType = namespaceType;
this.databaseId = databaseId;
this.userId = userId;
this.queryInterfaceId = queryInterfaceId;
}


public MqttTopic( String topicName, String namespaceName, long namespaceId, NamespaceType namespaceType, long databaseId, int userId, int queryInterfaceId ) {
this.topicName = topicName;
this.namespaceName = namespaceName;
this.namespaceId = namespaceId;
this.namespaceType = namespaceType;
this.databaseId = databaseId;
this.userId = userId;
this.queryInterfaceId = queryInterfaceId;
}


public MqttTopic setDatabaseId( long databaseId ) {
return new MqttTopic( this.topicName, this.namespaceName, this.namespaceId, this.namespaceType, databaseId, this.userId, this.queryInterfaceId );
}


public MqttTopic setUserId( int userId ) {
return new MqttTopic( this.topicName, this.namespaceName, this.namespaceId, this.namespaceType, this.databaseId, userId, this.queryInterfaceId );
}


/**
* @param newId 0 if namespaceId is not known yet.
*/
public MqttTopic setNewNamespace( String newName, long newId, NamespaceType type ) {
return new MqttTopic( this.topicName, newName, newId, type, this.databaseId, this.userId, this.queryInterfaceId );
}


public MqttTopic setNamespaceId( long id ) {
return new MqttTopic( this.topicName, this.namespaceName, id, this.namespaceType, this.databaseId, this.userId, this.queryInterfaceId );
}


public void increaseMsgCount() {
++this.msgCount;
}
public List<String> getRecentMessages() {
//TODO: implement
List<String> msgList = new ArrayList<>();
msgList.add( "msg1" );
msgList.add( "msg2" );
return msgList;
}
}
Loading

0 comments on commit 805a691

Please sign in to comment.