diff --git a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamPlugin.java b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamPlugin.java index 9746bf240a..1c19e3347f 100644 --- a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamPlugin.java +++ b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamPlugin.java @@ -32,14 +32,21 @@ import org.polypheny.db.catalog.Catalog; import org.polypheny.db.catalog.Catalog.NamespaceType; import org.polypheny.db.catalog.entity.CatalogSchema; +import org.polypheny.db.catalog.exceptions.GenericCatalogException; +import org.polypheny.db.catalog.exceptions.NoTablePrimaryKeyException; +import org.polypheny.db.catalog.exceptions.UnknownDatabaseException; import org.polypheny.db.catalog.exceptions.UnknownSchemaException; +import org.polypheny.db.catalog.exceptions.UnknownUserException; import org.polypheny.db.iface.Authenticator; import org.polypheny.db.iface.QueryInterface; import org.polypheny.db.iface.QueryInterfaceManager; +import org.polypheny.db.information.InformationAction; import org.polypheny.db.information.InformationGroup; import org.polypheny.db.information.InformationManager; import org.polypheny.db.information.InformationPage; import org.polypheny.db.information.InformationTable; +import org.polypheny.db.transaction.Statement; +import org.polypheny.db.transaction.Transaction; import org.polypheny.db.transaction.TransactionManager; @@ -101,8 +108,14 @@ public static class MqttStreamServer extends QueryInterface { private String namespace; + private long namespaceId; + private NamespaceType namespaceType; + private long databaseId; + + private int userId; + private final MonitoringPage monitoringPage; @@ -112,12 +125,24 @@ public MqttStreamServer( TransactionManager transactionManager, Authenticator au this.monitoringPage = new MonitoringPage(); this.broker = settings.get( "broker" ); this.brokerPort = Integer.parseInt( settings.get( "brokerPort" ) ); + //TODO: currently: setting user and database with default value from Catalog for getTransaction method -> E schönerer Weg? + // then current databaseid and userid are set. + this.databaseId = Catalog.defaultDatabaseId; + this.userId = Catalog.defaultUserId; + Transaction transaction = getTransaction(); + Statement statement = transaction.createStatement(); + this.userId = statement.getPrepareContext().getCurrentUserId(); + this.databaseId = statement.getPrepareContext().getDatabaseId(); + String name = settings.get( "namespace" ); NamespaceType type = NamespaceType.valueOf(settings.get( "namespace type" )); - if ( validateNamespaceName( name, type ) ) { + long namespaceId = getNamespaceId( name, type ); + if ( namespaceId != 0 ) { this.namespace = name; this.namespaceType = type; + this.namespaceId = namespaceId; } + } @@ -179,41 +204,36 @@ public void shutdown() { } - private boolean validateNamespaceName( String namespaceName, NamespaceType namespaceType ) { + private long getNamespaceId( String namespaceName, NamespaceType namespaceType ) { // TODO: Nachrichten an UI schicken falls Namespace name nicht geht - boolean nameCanBeUsed = false; + long namespaceId = 0; + Catalog catalog = Catalog.getInstance(); - // TODO: database ID evtl von UI abfragen? - if ( catalog.checkIfExistsSchema( Catalog.defaultDatabaseId, namespaceName ) ) { + if ( catalog.checkIfExistsSchema( this.databaseId, namespaceName ) ) { CatalogSchema schema = null; try { - schema = catalog.getSchema( Catalog.defaultDatabaseId, namespaceName ); + schema = catalog.getSchema( this.databaseId, namespaceName ); } catch ( UnknownSchemaException e ) { - log.error( "The catalog seems to be corrupt, as it was impossible to retrieve an existing namespace." ); - return nameCanBeUsed; + throw new RuntimeException(e); } assert schema != null; if ( schema.namespaceType == namespaceType ) { - nameCanBeUsed = true; + namespaceId = schema.id; } else { log.info( "There is already a namespace existing in this database with the given name but of type {}.", schema.getNamespaceType() ); log.info( "Please change the name or the type to {} to use the existing namespace.", schema.getNamespaceType() ); } } else { - nameCanBeUsed = true; - } - //TODO: rmv - log.info( String.valueOf( nameCanBeUsed ) ); - return nameCanBeUsed; - } - - public List topicsToList( String topics ) { - List topicsList = new ArrayList<>( List.of( topics.split( "," ) ) ); - for ( int i = 0; i < topicsList.size(); i++ ) { - topicsList.set( i, topicsList.get( i ).trim() ); + long id = catalog.addNamespace( namespaceName, this.databaseId, this.userId, namespaceType ); + try { + catalog.commit(); + namespaceId = id; + } catch ( NoTablePrimaryKeyException e ) { + throw new RuntimeException(e); + } } - return topicsList; + return namespaceId; } @@ -248,14 +268,24 @@ protected void reloadSettings( List updatedSettings ) { break; case "namespace": - this.validateNamespaceName( this.getCurrentSettings().get( "namespace" ), NamespaceType.valueOf( this.getCurrentSettings().get( "namespace type" ) ) ); + String newNamespaceName = this.getCurrentSettings().get( "namespace" ); + long namespaceId1 = this.getNamespaceId( newNamespaceName, this.namespaceType); + if ( namespaceId1 != 0 ) { + this.namespaceId = namespaceId1; + this.namespace = newNamespaceName; + } break; case "namespace type": - this.validateNamespaceName( this.getCurrentSettings().get( "namespace" ), NamespaceType.valueOf( this.getCurrentSettings().get( "namespace type" ) ) ); + NamespaceType newNamespaceType = NamespaceType.valueOf( this.getCurrentSettings().get( "namespace type" ) ); + long namespaceId2 = this.getNamespaceId( this.namespace, newNamespaceType); + if ( namespaceId2 != 0 ) { + this.namespaceId = namespaceId2; + this.namespaceType = newNamespaceType; + } break; + //TODO: handle change of Database maybe? } } - } @@ -312,11 +342,30 @@ void processMsg( Mqtt3Publish subMsg ) { //TODO: attention: return values, not correct, might need a change of type. String content = StreamProcessing.processMsg( subMsg ); PolyStream stream = new PolyStream( subMsg.getTopic().toString(), getUniqueName(), content, this.namespace, this.namespaceType ); + stream.setNamespaceID( this.namespaceId ); StreamCapture streamCapture = new StreamCapture( this.transactionManager, stream ); streamCapture.handleContent(); } + public List topicsToList( String topics ) { + List topicsList = new ArrayList<>( List.of( topics.split( "," ) ) ); + for ( int i = 0; i < topicsList.size(); i++ ) { + topicsList.set( i, topicsList.get( i ).trim() ); + } + return topicsList; + } + + + private Transaction getTransaction() { + try { + return transactionManager.startTransaction( this.userId, this.databaseId, false, "MQTT Stream" ); + } catch ( UnknownUserException | UnknownDatabaseException | UnknownSchemaException | GenericCatalogException e ) { + throw new RuntimeException( "Error while starting transaction", e ); + } + } + + @Override public void languageChange() { @@ -335,7 +384,10 @@ private class MonitoringPage { private final InformationGroup informationGroupTopics; + private final InformationGroup informationGroupMsg; + private final InformationTable topicsTable; + private final InformationAction msgButton; public MonitoringPage() { @@ -356,6 +408,17 @@ public MonitoringPage() { im.registerInformation( topicsTable ); informationGroupTopics.setRefreshFunction( this::update ); + //TODO: rmv button + informationGroupMsg = new InformationGroup( informationPage, "Publish a message" ).setOrder( 2 ); + im.addGroup( informationGroupMsg ); + + msgButton = new InformationAction( informationGroupMsg, "Send a msg", (parameters) -> { + String end = "Msg was published!"; + client.publishWith().topic( parameters.get( "topic" ) ).payload( parameters.get( "msg" ).getBytes() ).send(); + return end; + }).withParameters( "topic", "msg" ); + im.registerInformation( msgButton ); + } diff --git a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StreamCapture.java b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StreamCapture.java index c5ecae71f5..30edd004a4 100644 --- a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StreamCapture.java +++ b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StreamCapture.java @@ -23,6 +23,7 @@ import org.bson.BsonDocument; import org.bson.BsonInt32; import org.bson.BsonString; +import org.bson.codecs.pojo.annotations.BsonId; import org.polypheny.db.PolyImplementation; import org.polypheny.db.adapter.DataStore; import org.polypheny.db.algebra.AlgNode; @@ -60,24 +61,28 @@ public class StreamCapture { StreamCapture( final TransactionManager transactionManager, PolyStream stream ) { this.transactionManager = transactionManager; this.stream = stream; - Transaction transaction = getTransaction(); - Statement statement = transaction.createStatement(); - this.stream.setUserId( statement.getPrepareContext().getCurrentUserId() ); - this.stream.setDatabaseId( statement.getPrepareContext().getDatabaseId() ); + } public void handleContent() { - //String path = registerTopicFolder(topic); - long storeId = getCollectionID(); - if ( storeId != 0 ) { - stream.setStoreID( storeId ); - boolean saved = saveContent(); - //TODO: gescheite Tests + + if ( stream.getStoreID() == 0 ) { + //TODO: get store id of already existing or create new one. + + //TODO: maybe do this with interface + if ( stream.getNamespaceType() == NamespaceType.DOCUMENT ) { + long newstoreId = getCollectionID( ); + if ( newstoreId != 0 ) { + stream.setStoreID( newstoreId ); + } + } + } + boolean saved = saveContent(); + //TODO: gescheite Tests // Catalog catalog = Catalog.getInstance(); // CatalogSchema schema = null; // schema = catalog.getSchema( stream.getNamespaceID() ); - } } @@ -86,60 +91,22 @@ public void handleContent() { */ long getCollectionID() { Catalog catalog = Catalog.getInstance(); - List schemaList = catalog.getSchemas( this.stream.getDatabaseId(), null ); - - - // check for existing namespace with DOCUMENT NamespaceType: - if ( catalog.checkIfExistsSchema( this.stream.getDatabaseId(), this.stream.getNamespace() ) ) { - - CatalogSchema schema = null; - try { - schema = catalog.getSchema( this.stream.getDatabaseId(), this.stream.getNamespace() ); - } catch ( UnknownSchemaException e ) { - log.error( "The catalog seems to be corrupt, as it was impossible to retrieve an existing namespace." ); - return 0; - } - - assert schema != null; - if ( schema.namespaceType == NamespaceType.DOCUMENT ) { - this.stream.setNamespaceID( schema.id ); - //check for collection with same name //TODO: maybe change the collection name, currently collection name is the topic - List collectionList = catalog.getCollections( schema.id, null ); - for ( CatalogCollection collection : collectionList ) { - if ( collection.name.equals( this.stream.topic ) ) { - int queryInterfaceId = QueryInterfaceManager.getInstance().getQueryInterface( this.stream.getUniqueNameOfInterface() ).getQueryInterfaceId(); - if ( !collection.placements.contains( queryInterfaceId ) ) { - return collection.addPlacement( queryInterfaceId ).id; - } else { - return collection.id; - } - } + //check for collection with same name //TODO: maybe change the collection name, currently collection name is the topic + List collectionList = catalog.getCollections( stream.getNamespaceID(), null ); + for ( CatalogCollection collection : collectionList ) { + if ( collection.name.equals( this.stream.topic ) ) { + int queryInterfaceId = QueryInterfaceManager.getInstance().getQueryInterface( this.stream.getUniqueNameOfInterface() ).getQueryInterfaceId(); + if ( !collection.placements.contains( queryInterfaceId ) ) { + return collection.addPlacement( queryInterfaceId ).id; + } else { + return collection.id; } - return createNewCollection(); - - } else { - this.stream.setNamespaceID( createNewNamespace() ); - return createNewCollection(); } - } else { - this.stream.setNamespaceID( createNewNamespace() ); - return createNewCollection(); } - } - + return createNewCollection(); - private long createNewNamespace() { - Catalog catalog = Catalog.getInstance(); - long namespaceId = catalog.addNamespace( stream.getNamespace(), stream.getDatabaseId(), stream.getUserId(), NamespaceType.DOCUMENT ); - try { - catalog.commit(); - } catch ( NoTablePrimaryKeyException e ) { - log.error( "An error " ); - } - return namespaceId; } - private long createNewCollection() { Catalog catalog = Catalog.getInstance(); @@ -159,10 +126,10 @@ private long createNewCollection() { log.info( "Created Collection with name: {}", this.stream.topic ); transaction.commit(); } catch ( EntityAlreadyExistsException e ) { - log.error( "The generation of the collection was not possible because there is a collaction already existing with this name." ); + log.error( "The generation of the collection was not possible because there is a collection already existing with this name." ); return 0; } catch ( TransactionException e ) { - log.error( "The commit after creating a new Collection could be completed!" ); + log.error( "The commit after creating a new Collection could not be completed!" ); return 0; } //add placement @@ -181,7 +148,7 @@ private long createNewCollection() { // added by Datomo public void insertDocument() { - String collectionName = "users"; + String collectionName = this.stream.topic; Transaction transaction = getTransaction(); Statement statement = transaction.createStatement(); @@ -190,8 +157,8 @@ public void insertDocument() { // we insert document { age: 28, name: "David" } into the collection users BsonDocument document = new BsonDocument(); - document.put( "age", new BsonInt32( 28 ) ); - document.put( "name", new BsonString( "David" ) ); + document.put( "topic", new BsonString( this.stream.topic ) ); + document.put( "content", new BsonString( this.stream.getContent() ) ); AlgNode algNode = builder.docInsert( statement, collectionName, document ).build(); @@ -223,59 +190,9 @@ public void scanDocument() { boolean saveContent() { -/** - //TODO: save Message here -> Polyalgebra - Transaction transaction = getTransaction(); - Statement statement = transaction.createStatement(); - AlgBuilder algBuilder = AlgBuilder.create( statement ); - JavaTypeFactory typeFactory = transaction.getTypeFactory(); - RexBuilder rexBuilder = new RexBuilder( typeFactory ); - - PolyphenyDbCatalogReader catalogReader = statement.getTransaction().getCatalogReader(); - List names = new ArrayList<>(); - //TODO: change naming maybe - names.add( this.stream.topic ); - AlgOptTable table = catalogReader.getCollection( names ); - - // Values - AlgDataType tableRowType = table.getRowType( ); - List tableRows = tableRowType.getFieldList(); - - AlgOptPlanner planner = statement.getQueryProcessor().getPlanner(); - AlgOptCluster cluster = AlgOptCluster.create( planner, rexBuilder ); - - List valueColumnNames = this.valuesColumnNames( insertValueRequest.values ); - List rexValues = this.valuesNode( statement, algBuilder, rexBuilder, insertValueRequest, tableRows, inputStreams ).get( 0 ); - algBuilder.push( LogicalValues.createOneRow( cluster ) ); - algBuilder.project( rexValues, valueColumnNames ); - - // Table Modify - AlgNode algNode = algBuilder.build(); - Modify modify = new LogicalModify( - cluster, - algNode.getTraitSet(), - table, - catalogReader, - algNode, - LogicalModify.Operation.INSERT, - null, - null, - false - ); - - // Wrap {@link AlgNode} into a RelRoot - final AlgDataType rowType = modify.getRowType(); - final List> fields = Pair.zip( ImmutableIntList.identity( rowType.getFieldCount() ), rowType.getFieldNames() ); - final AlgCollation collation = - algNode instanceof Sort - ? ((Sort) algNode).collation - : AlgCollations.EMPTY; - AlgRoot root = new AlgRoot( modify, rowType, Kind.INSERT, fields, collation ); - log.debug( "AlgRoot was built." ); - - Context ctx = statement.getPrepareContext(); - log.info( executeAndTransformPolyAlg( root, statement, ctx ) ); - **/ + if ( this.stream.getNamespaceType() == NamespaceType.DOCUMENT ) { + insertDocument(); + } return true; } @@ -297,11 +214,6 @@ String executeAndTransformPolyAlg( AlgRoot algRoot, Statement statement, final C PolyImplementation result = statement.getQueryProcessor().prepareQuery( algRoot, false ); log.debug( "AlgRoot was prepared." ); - /*final Iterable iterable = result.enumerable( statement.getDataContext() ); - Iterator iterator = iterable.iterator(); - while ( iterator.hasNext() ) { - iterator.next(); - }*/ // todo transform into desired output format List> rows = result.getRows( statement, -1 ); @@ -316,34 +228,6 @@ String executeAndTransformPolyAlg( AlgRoot algRoot, Statement statement, final C } return null; } - //Pair result = restResult.getResult( ctx ); - - //return result.left; - //return null; - } - - - private static String registerTopicFolder( String topic ) { - PolyphenyHomeDirManager homeDirManager = PolyphenyHomeDirManager.getInstance(); - - String path = File.separator + "mqttStreamPlugin" + File.separator + topic.replace( "/", File.separator ); - ; - - File file = null; - if ( !homeDirManager.checkIfExists( path ) ) { - file = homeDirManager.registerNewFolder( path ); - log.info( "New Directory created!" ); - } else { - //TODO: rmv log - log.info( "Directory already exists" ); - } - - return path; - - } - - - public static void main( String[] args ) { } diff --git a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StreamProcessing.java b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StreamProcessing.java index 2d61e3effe..0a30cc16bb 100644 --- a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StreamProcessing.java +++ b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StreamProcessing.java @@ -22,7 +22,7 @@ @Slf4j public class StreamProcessing { - +//TODO: receive all additional info from Wrapper around MqttStream public static String processMsg( Mqtt3Publish subMsg ) { String msg = toString( subMsg );