Skip to content

Commit

Permalink
chore(#3207): Remove pipeline element recommender (#3210)
Browse files Browse the repository at this point in the history
* chore(#3207): Remove pipeline element recommender

* Merge model

* Fix layout
  • Loading branch information
dominikriemer authored Sep 3, 2024
1 parent 02b60f7 commit 755ca49
Show file tree
Hide file tree
Showing 23 changed files with 9 additions and 786 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ public class PipelineElementRecommendation {
private String elementId;
private String name;
private String description;
private Float weight;
private Integer count;

public PipelineElementRecommendation() {

Expand Down Expand Up @@ -60,19 +58,4 @@ public void setDescription(String description) {
this.description = description;
}

public Float getWeight() {
return weight;
}

public void setWeight(Float weight) {
this.weight = weight;
}

public Integer getCount() {
return count;
}

public void setCount(Integer count) {
this.count = count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@
public class PipelineElementRecommendationMessage {

private List<PipelineElementRecommendation> possibleElements;
private List<PipelineElementRecommendation> recommendedElements;

private boolean success;

public PipelineElementRecommendationMessage() {
this.possibleElements = new ArrayList<>();
this.recommendedElements = new ArrayList<>();
this.success = true;
}

Expand All @@ -56,15 +54,4 @@ public boolean isSuccess() {
public void setSuccess(boolean success) {
this.success = success;
}

public List<PipelineElementRecommendation> getRecommendedElements() {
return recommendedElements;
}

public void setRecommendedElements(
List<PipelineElementRecommendation> recommendedElements) {
this.recommendedElements = recommendedElements;
}


}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.streampipes.manager.matching.v2.StreamMatch;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataSinkDescription;
Expand All @@ -33,7 +32,6 @@
import org.apache.streampipes.model.pipeline.PipelineElementRecommendationMessage;
import org.apache.streampipes.model.pipeline.PipelineModification;
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.storage.api.INoSqlStorage;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;

Expand Down Expand Up @@ -62,81 +60,13 @@ public ElementRecommender(Pipeline partialPipeline,

public PipelineElementRecommendationMessage findRecommendedElements() throws NoSuitableSepasAvailableException {
AllElementsProvider elementsProvider = new AllElementsProvider(this.pipeline);

String rootNodeId;
try {
rootNodeId = getRootNodeId(elementsProvider);
Optional<SpDataStream> outputStream = getOutputStream(elementsProvider);
outputStream.ifPresent(spDataStream -> validate(spDataStream, getAll()));
} catch (Exception e) {
LOG.warn("Could not find root node or output stream of provided pipeline");
return recommendationMessage;
}

if (recommendationMessage.getPossibleElements().isEmpty()) {
throw new NoSuitableSepasAvailableException();
} else {
recommendationMessage
.setRecommendedElements(calculateWeights(
filterOldElements(getNoSqlStorage()
.getConnectionStorageApi()
.getRecommendedElements(rootNodeId))));
return recommendationMessage;
}
}

private String getRootNodeId(AllElementsProvider elementsProvider) {
NamedStreamPipesEntity pe = elementsProvider.findElement(this.baseRecDomId);
return pe instanceof InvocableStreamPipesEntity ? ((InvocableStreamPipesEntity) pe).getBelongsTo() :
pe.getElementId();
}

private List<PipelineElementRecommendation> filterOldElements(
List<PipelineElementRecommendation> recommendedElements) {
return recommendedElements
.stream()
.filter(r -> getAll()
.stream()
.anyMatch(a -> a.getElementId().equals(r.getElementId())))
.collect(Collectors.toList());
}

private List<PipelineElementRecommendation> calculateWeights(
List<PipelineElementRecommendation> recommendedElements) {
int allConnectionsCount = recommendedElements
.stream()
.mapToInt(PipelineElementRecommendation::getCount)
.sum();

recommendedElements
.forEach(r -> {
r.setWeight(getWeight(r.getCount(), allConnectionsCount));
r.setName(getName(r.getElementId()));
r.setDescription(getDescription(r.getElementId()));
});

return recommendedElements;
}

private String getName(String elementId) {
return filter(elementId).getName();
}

private String getDescription(String elementId) {
return filter(elementId).getDescription();
}

private NamedStreamPipesEntity filter(String elementId) {
List<ConsumableStreamPipesEntity> allElements = getAll();
return allElements
.stream()
.filter(a -> a.getElementId().equals(elementId))
.findFirst()
.get();
}

private Float getWeight(Integer count, Integer allConnectionsCount) {
return ((float) (count)) / allConnectionsCount;
return recommendationMessage;
}

private void validate(SpDataStream offer, List<ConsumableStreamPipesEntity> entities) {
Expand All @@ -157,7 +87,7 @@ private void addPossibleElements(NamedStreamPipesEntity sepa) {

private List<ConsumableStreamPipesEntity> getAllDataProcessors() {
List<String> userObjects = new SpResourceManager().manageDataProcessors().findAllIdsOnly();
return getTripleStore()
return getNoSqlStore()
.getAllDataProcessors()
.stream()
.filter(e -> userObjects.stream().anyMatch(u -> u.equals(e.getAppId())))
Expand All @@ -168,7 +98,7 @@ private List<ConsumableStreamPipesEntity> getAllDataProcessors() {

private List<ConsumableStreamPipesEntity> getAllDataSinks() {
List<String> userObjects = new SpResourceManager().manageDataSinks().findAllIdsOnly();
return getTripleStore()
return getNoSqlStore()
.getAllDataSinks()
.stream()
.filter(e -> userObjects.stream().anyMatch(u -> u.equals(e.getAppId())))
Expand All @@ -183,14 +113,10 @@ private List<ConsumableStreamPipesEntity> getAll() {
return allElements;
}

private IPipelineElementDescriptionStorage getTripleStore() {
private IPipelineElementDescriptionStorage getNoSqlStore() {
return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineElementDescriptionStorage();
}

private INoSqlStorage getNoSqlStorage() {
return StorageDispatcher.INSTANCE.getNoSqlStore();
}

private Optional<SpDataStream> getOutputStream(AllElementsProvider elementsProvider) {

NamedStreamPipesEntity entity = elementsProvider.findElement(this.baseRecDomId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ private void createDatabases() {
// Set up streampipes internal databases
Utils.getCouchDbUserClient();
Utils.getCouchDbPipelineClient();
Utils.getCouchDbConnectionClient();
Utils.getCouchDbNotificationClient();
Utils.getCouchDbPipelineCategoriesClient();

Expand All @@ -72,7 +71,6 @@ private void createDatabases() {

private void createViews() {
addUserView();
addConnectionView();
addNotificationView();
addPipelineView();
}
Expand Down Expand Up @@ -166,30 +164,4 @@ private void addUserView() {
logFailure(PREPARING_USERS_TEXT, e);
}
}

private void addConnectionView() {
try {
DesignDocument connectionDocument = prepareDocument("_design/connection");
Map<String, MapReduce> views = new HashMap<>();

MapReduce frequentFunction = new MapReduce();
frequentFunction.setMap("function(doc) { if(doc.from && doc.to) { emit([doc.from, doc.to] , 1 ); } }");
frequentFunction.setReduce("function (key, values) { return sum(values); }");

views.put("frequent", frequentFunction);

connectionDocument.setViews(views);
Response resp = Utils.getCouchDbConnectionClient().design().synchronizeWithDb(connectionDocument);

if (resp.getError() != null) {
logFailure("Preparing database 'connection'...");
} else {
logSuccess("Preparing database 'connection'...");
}
} catch (Exception e) {
logFailure("Preparing database 'connection'...", e);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.streampipes.manager.data.PipelineGraph;
import org.apache.streampipes.manager.data.PipelineGraphBuilder;
import org.apache.streampipes.manager.matching.ConnectionStorageHandler;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
Expand All @@ -33,7 +32,7 @@

public class PipelineStorageService {

private Pipeline pipeline;
private final Pipeline pipeline;

public PipelineStorageService(Pipeline pipeline) {
this.pipeline = pipeline;
Expand Down Expand Up @@ -63,8 +62,6 @@ private void preparePipeline() {

pipeline.setSepas(sepas);
pipeline.setActions(secs);

new ConnectionStorageHandler(pipeline).storeConnections();
}

private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public boolean hasPermission(Authentication auth, Object o, Object permission) {

private boolean filterRecommendation(Authentication auth, PipelineElementRecommendationMessage message) {
Predicate<PipelineElementRecommendation> isForbidden = r -> !hasPermission(auth, r.getElementId());
message.getRecommendedElements().removeIf(isForbidden);
message.getPossibleElements().removeIf(isForbidden);

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ public interface INoSqlStorage {

IPipelineStorage getPipelineStorageAPI();

IPipelineElementConnectionStorage getConnectionStorageApi();

IUserStorage getUserStorageAPI();

INotificationStorage getNotificationStorageApi();
Expand Down

This file was deleted.

Loading

0 comments on commit 755ca49

Please sign in to comment.