Skip to content

Commit

Permalink
reverted changes made because of MqttTopic
Browse files Browse the repository at this point in the history
  • Loading branch information
NehaSelvan1512 committed Jul 13, 2023
1 parent 68be5dd commit adf1278
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,21 @@
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.Catalog.NamespaceType;
import org.polypheny.db.catalog.entity.CatalogSchema;
import org.polypheny.db.catalog.exceptions.GenericCatalogException;
import org.polypheny.db.catalog.exceptions.NoTablePrimaryKeyException;
import org.polypheny.db.catalog.exceptions.UnknownDatabaseException;
import org.polypheny.db.catalog.exceptions.UnknownSchemaException;
import org.polypheny.db.catalog.exceptions.UnknownUserException;
import org.polypheny.db.iface.Authenticator;
import org.polypheny.db.iface.QueryInterface;
import org.polypheny.db.iface.QueryInterfaceManager;
import org.polypheny.db.information.InformationAction;
import org.polypheny.db.information.InformationGroup;
import org.polypheny.db.information.InformationManager;
import org.polypheny.db.information.InformationPage;
import org.polypheny.db.information.InformationTable;
import org.polypheny.db.transaction.Statement;
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.transaction.TransactionManager;


Expand Down Expand Up @@ -101,8 +108,14 @@ public static class MqttStreamServer extends QueryInterface {

private String namespace;

private long namespaceId;

private NamespaceType namespaceType;

private long databaseId;

private int userId;

private final MonitoringPage monitoringPage;


Expand All @@ -112,12 +125,24 @@ public MqttStreamServer( TransactionManager transactionManager, Authenticator au
this.monitoringPage = new MonitoringPage();
this.broker = settings.get( "broker" );
this.brokerPort = Integer.parseInt( settings.get( "brokerPort" ) );
//TODO: currently: setting user and database with default value from Catalog for getTransaction method -> E schönerer Weg?
// then current databaseid and userid are set.
this.databaseId = Catalog.defaultDatabaseId;
this.userId = Catalog.defaultUserId;
Transaction transaction = getTransaction();
Statement statement = transaction.createStatement();
this.userId = statement.getPrepareContext().getCurrentUserId();
this.databaseId = statement.getPrepareContext().getDatabaseId();

String name = settings.get( "namespace" );
NamespaceType type = NamespaceType.valueOf(settings.get( "namespace type" ));
if ( validateNamespaceName( name, type ) ) {
long namespaceId = getNamespaceId( name, type );
if ( namespaceId != 0 ) {
this.namespace = name;
this.namespaceType = type;
this.namespaceId = namespaceId;
}

}


Expand Down Expand Up @@ -179,41 +204,36 @@ public void shutdown() {
}


private boolean validateNamespaceName( String namespaceName, NamespaceType namespaceType ) {
private long getNamespaceId( String namespaceName, NamespaceType namespaceType ) {
// TODO: Nachrichten an UI schicken falls Namespace name nicht geht
boolean nameCanBeUsed = false;
long namespaceId = 0;

Catalog catalog = Catalog.getInstance();
// TODO: database ID evtl von UI abfragen?
if ( catalog.checkIfExistsSchema( Catalog.defaultDatabaseId, namespaceName ) ) {
if ( catalog.checkIfExistsSchema( this.databaseId, namespaceName ) ) {
CatalogSchema schema = null;
try {
schema = catalog.getSchema( Catalog.defaultDatabaseId, namespaceName );
schema = catalog.getSchema( this.databaseId, namespaceName );
} catch ( UnknownSchemaException e ) {
log.error( "The catalog seems to be corrupt, as it was impossible to retrieve an existing namespace." );
return nameCanBeUsed;
throw new RuntimeException(e);
}
assert schema != null;
if ( schema.namespaceType == namespaceType ) {
nameCanBeUsed = true;
namespaceId = schema.id;
} else {
log.info( "There is already a namespace existing in this database with the given name but of type {}.", schema.getNamespaceType() );
log.info( "Please change the name or the type to {} to use the existing namespace.", schema.getNamespaceType() );
}
} else {
nameCanBeUsed = true;
}
//TODO: rmv
log.info( String.valueOf( nameCanBeUsed ) );
return nameCanBeUsed;
}


public List<String> topicsToList( String topics ) {
List<String> topicsList = new ArrayList<>( List.of( topics.split( "," ) ) );
for ( int i = 0; i < topicsList.size(); i++ ) {
topicsList.set( i, topicsList.get( i ).trim() );
long id = catalog.addNamespace( namespaceName, this.databaseId, this.userId, namespaceType );
try {
catalog.commit();
namespaceId = id;
} catch ( NoTablePrimaryKeyException e ) {
throw new RuntimeException(e);
}
}
return topicsList;
return namespaceId;
}


Expand Down Expand Up @@ -248,14 +268,24 @@ protected void reloadSettings( List<String> updatedSettings ) {
break;

case "namespace":
this.validateNamespaceName( this.getCurrentSettings().get( "namespace" ), NamespaceType.valueOf( this.getCurrentSettings().get( "namespace type" ) ) );
String newNamespaceName = this.getCurrentSettings().get( "namespace" );
long namespaceId1 = this.getNamespaceId( newNamespaceName, this.namespaceType);
if ( namespaceId1 != 0 ) {
this.namespaceId = namespaceId1;
this.namespace = newNamespaceName;
}
break;
case "namespace type":
this.validateNamespaceName( this.getCurrentSettings().get( "namespace" ), NamespaceType.valueOf( this.getCurrentSettings().get( "namespace type" ) ) );
NamespaceType newNamespaceType = NamespaceType.valueOf( this.getCurrentSettings().get( "namespace type" ) );
long namespaceId2 = this.getNamespaceId( this.namespace, newNamespaceType);
if ( namespaceId2 != 0 ) {
this.namespaceId = namespaceId2;
this.namespaceType = newNamespaceType;
}
break;
//TODO: handle change of Database maybe?
}
}

}


Expand Down Expand Up @@ -312,11 +342,30 @@ void processMsg( Mqtt3Publish subMsg ) {
//TODO: attention: return values, not correct, might need a change of type.
String content = StreamProcessing.processMsg( subMsg );
PolyStream stream = new PolyStream( subMsg.getTopic().toString(), getUniqueName(), content, this.namespace, this.namespaceType );
stream.setNamespaceID( this.namespaceId );
StreamCapture streamCapture = new StreamCapture( this.transactionManager, stream );
streamCapture.handleContent();
}


public List<String> topicsToList( String topics ) {
List<String> topicsList = new ArrayList<>( List.of( topics.split( "," ) ) );
for ( int i = 0; i < topicsList.size(); i++ ) {
topicsList.set( i, topicsList.get( i ).trim() );
}
return topicsList;
}


private Transaction getTransaction() {
try {
return transactionManager.startTransaction( this.userId, this.databaseId, false, "MQTT Stream" );
} catch ( UnknownUserException | UnknownDatabaseException | UnknownSchemaException | GenericCatalogException e ) {
throw new RuntimeException( "Error while starting transaction", e );
}
}


@Override
public void languageChange() {

Expand All @@ -335,7 +384,10 @@ private class MonitoringPage {

private final InformationGroup informationGroupTopics;

private final InformationGroup informationGroupMsg;

private final InformationTable topicsTable;
private final InformationAction msgButton;


public MonitoringPage() {
Expand All @@ -356,6 +408,17 @@ public MonitoringPage() {
im.registerInformation( topicsTable );
informationGroupTopics.setRefreshFunction( this::update );

//TODO: rmv button
informationGroupMsg = new InformationGroup( informationPage, "Publish a message" ).setOrder( 2 );
im.addGroup( informationGroupMsg );

msgButton = new InformationAction( informationGroupMsg, "Send a msg", (parameters) -> {
String end = "Msg was published!";
client.publishWith().topic( parameters.get( "topic" ) ).payload( parameters.get( "msg" ).getBytes() ).send();
return end;
}).withParameters( "topic", "msg" );
im.registerInformation( msgButton );

}


Expand Down
Loading

0 comments on commit adf1278

Please sign in to comment.