From 1c6bf437f5de242aeeb9e62a9cebab184b9974f2 Mon Sep 17 00:00:00 2001 From: bbrehm Date: Mon, 16 Sep 2024 17:46:14 +0200 Subject: [PATCH] Make diffgraph applier return number of transitive changes (#256) * also fixes a minor bug with node initialization order of transitively added nodes from diffgraphs --- .../scala/flatgraph/DiffGraphApplier.scala | 189 ++++++++++-------- .../src/test/scala/flatgraph/GraphTests.scala | 21 +- 2 files changed, 130 insertions(+), 80 deletions(-) diff --git a/core/src/main/scala/flatgraph/DiffGraphApplier.scala b/core/src/main/scala/flatgraph/DiffGraphApplier.scala index 97713fb8..7c883657 100644 --- a/core/src/main/scala/flatgraph/DiffGraphApplier.scala +++ b/core/src/main/scala/flatgraph/DiffGraphApplier.scala @@ -8,11 +8,20 @@ import flatgraph.misc.SchemaViolationReporter import scala.collection.{Iterator, mutable} object DiffGraphApplier { + + /** Apply a diff to a graph. This returns the number of successfully applied changes. + * + * Since this applies all changes that are reachable from the DiffGraphBuilder, the number of applied changes can be vastly larger than + * the size of the diffgraphBuilder, e.g. if there are newNodes that have properties / contained nodes that are themselves newNodes. + * + * This function destroys the diff, in order to permit fast freeing of memory (in cases where we run close to exhausting the heap with + * the diff, the old graph, the new graph, and the temporaries during diff application). 
+ */ def applyDiff( graph: Graph, diff: DiffGraphBuilder, schemaViolationReporter: SchemaViolationReporter = new SchemaViolationReporter - ): Unit = { + ): Int = { if (graph.isClosed) throw new GraphClosedException(s"graph cannot be modified any longer since it's closed") new DiffGraphApplier(graph, diff, schemaViolationReporter).applyUpdate() } @@ -69,10 +78,13 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, a(pos).append(item) } - private def drainDeferred(): Unit = { + private def drainDeferred(): Int = { + var ndiff = 0 while (deferred.nonEmpty) { + ndiff += 1 deferred.removeHead().countAndVisitProperties(NewNodeInterface) } + ndiff } private def getGNode(node: DNodeOrNode): GNode = { @@ -99,98 +111,116 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, } } - private def splitUpdate(): Unit = { + private def splitUpdate(): Int = { + var ndiff = 0 diff.buffer.foreach { case delNode: DelNode if !AccessHelpers.isDeleted(delNode.node) => + ndiff += 1 AccessHelpers.markDeleted(delNode.node) delNodes.append(delNode.node) + case _ => } - diff.buffer.foreach { - case addNode: DNode => - getGNode(addNode) - case halfEdge: AddUnsafeHalfEdge => - val src = getGNode(halfEdge.src) - val dst = getGNode(halfEdge.dst) - if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) { - Direction.fromOrdinal(halfEdge.inout) match { - case Direction.Incoming => - insert( - newEdges, - new AddEdgeProcessed(dst, src, halfEdge.edgeKind, halfEdge.property), - graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, halfEdge.edgeKind) - ) - case Direction.Outgoing => - insert( - newEdges, - new AddEdgeProcessed(src, dst, halfEdge.edgeKind, halfEdge.property), - graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, halfEdge.edgeKind) - ) + diff.buffer.foreach { item => + item match { + case addNode: DNode => + getGNode(addNode) + case halfEdge: AddUnsafeHalfEdge => + val src = getGNode(halfEdge.src) + 
val dst = getGNode(halfEdge.dst) + if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) { + ndiff += 1 + Direction.fromOrdinal(halfEdge.inout) match { + case Direction.Incoming => + insert( + newEdges, + new AddEdgeProcessed(dst, src, halfEdge.edgeKind, halfEdge.property), + graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, halfEdge.edgeKind) + ) + case Direction.Outgoing => + insert( + newEdges, + new AddEdgeProcessed(src, dst, halfEdge.edgeKind, halfEdge.property), + graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, halfEdge.edgeKind) + ) + } } - } - case newEdge: AddEdgeUnprocessed => - val src = getGNode(newEdge.src) - val dst = getGNode(newEdge.dst) - if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) { + case newEdge: AddEdgeUnprocessed => + val src = getGNode(newEdge.src) + val dst = getGNode(newEdge.dst) + if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) { + ndiff += 1 + insert( + newEdges, + new AddEdgeProcessed(src, dst, newEdge.edgeKind, newEdge.property), + graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, newEdge.edgeKind) + ) + insert( + newEdges, + new AddEdgeProcessed(dst, src, newEdge.edgeKind, newEdge.property), + graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, newEdge.edgeKind) + ) + } else { + // TODO maybe throw exception + } + case setEdgeProperty: SetEdgeProperty + if !AccessHelpers.isDeleted(setEdgeProperty.edge.src) && !AccessHelpers.isDeleted(setEdgeProperty.edge.dst) => + ndiff += 1 + val (outR, inR) = normalizeRepresentation(setEdgeProperty.edge) insert( - newEdges, - new AddEdgeProcessed(src, dst, newEdge.edgeKind, newEdge.property), - graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, newEdge.edgeKind) + setEdgeProperties, + new EdgeRepr(outR.src, outR.dst, outR.edgeKind, outR.subSeq, setEdgeProperty.property), + graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind) ) insert( - newEdges, - new 
AddEdgeProcessed(dst, src, newEdge.edgeKind, newEdge.property), - graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, newEdge.edgeKind) + setEdgeProperties, + new EdgeRepr(inR.src, inR.dst, inR.edgeKind, inR.subSeq, setEdgeProperty.property), + graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind) ) - } else { - // TODO maybe throw exception - } - case setEdgeProperty: SetEdgeProperty - if !AccessHelpers.isDeleted(setEdgeProperty.edge.src) && !AccessHelpers.isDeleted(setEdgeProperty.edge.dst) => - val (outR, inR) = normalizeRepresentation(setEdgeProperty.edge) - insert( - setEdgeProperties, - new EdgeRepr(outR.src, outR.dst, outR.edgeKind, outR.subSeq, setEdgeProperty.property), - graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind) - ) - insert( - setEdgeProperties, - new EdgeRepr(inR.src, inR.dst, inR.edgeKind, inR.subSeq, setEdgeProperty.property), - graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind) - ) - case edgeDeletion: RemoveEdge if !AccessHelpers.isDeleted(edgeDeletion.edge.src) && !AccessHelpers.isDeleted(edgeDeletion.edge.dst) => - /** This is the delEdge case. It is massively annoying. - * - * In order to support edge properties, we need to grab the right edge from e.src->e.dst. If we assume that our graph was built - * normally, i.e. edges were sequentially/batched added without the unsafe unidirectional edges, then our graph has the following - * invariant: The kth edge connecting A->B corresponds to the kth edge connecting B<-A This sucks big time, because edge removal is - * potentially O(N**2). The degenerate behavior occurs when we have ~k edges of the same type starting in src = X or ending in the - * same dst = X. Each deletion then costs us ~k, and if we delete all ~k edges we pay ~ k*k. - * - * But k~N is possible where N is the graph size! 
- */ - val (outR, inR) = normalizeRepresentation(edgeDeletion.edge) - insert(delEdges, outR, graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind)) - insert(delEdges, inR, graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind)) - - case setNodeProperty: SetNodeProperty if !AccessHelpers.isDeleted(setNodeProperty.node) => - val iter = setNodeProperty.property match { - case null => Iterator.empty - case iterable: Iterable[_] => iterable.iterator - case a: Array[_] => a.iterator - case item => Iterator.single(item) - } - insertProperty0(setNodeProperty.node, setNodeProperty.propertyKind, iter) - case delNode: DelNode => - // already processed - assert(AccessHelpers.isDeleted(delNode.node), s"node should have been deleted already but wasn't: ${delNode.node}") + case edgeDeletion: RemoveEdge + if !AccessHelpers.isDeleted(edgeDeletion.edge.src) && !AccessHelpers.isDeleted(edgeDeletion.edge.dst) => + ndiff += 1 + + /** This is the delEdge case. It is massively annoying. + * + * In order to support edge properties, we need to grab the right edge from e.src->e.dst. If we assume that our graph was built + * normally, i.e. edges were sequentially/batched added without the unsafe unidirectional edges, then our graph has the following + * invariant: The kth edge connecting A->B corresponds to the kth edge connecting B<-A This sucks big time, because edge removal + * is potentially O(N**2). The degenerate behavior occurs when we have ~k edges of the same type starting in src = X or ending in + * the same dst = X. Each deletion then costs us ~k, and if we delete all ~k edges we pay ~ k*k. + * + * But k~N is possible where N is the graph size! 
+ */ + val (outR, inR) = normalizeRepresentation(edgeDeletion.edge) + insert(delEdges, outR, graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind)) + insert(delEdges, inR, graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind)) + + case setNodeProperty: SetNodeProperty if !AccessHelpers.isDeleted(setNodeProperty.node) => + ndiff += 1 + val iter = setNodeProperty.property match { + case null => Iterator.empty + case iterable: Iterable[_] => iterable.iterator + case a: Array[_] => a.iterator + case item => Iterator.single(item) + } + insertProperty0(setNodeProperty.node, setNodeProperty.propertyKind, iter) + case delNode: DelNode => + // already processed + assert(AccessHelpers.isDeleted(delNode.node), s"node should have been deleted already but wasn't: ${delNode.node}") + } + // We need to drain the deferred nodes after each single processed update. We cannot wait until we have processed all nodes, + // because that would break the following invariant: + // if you have two diffgraphs with no node deletions, and you apply them separately, you get the same result + // as if you merged the updates and then applied them together. + // The breakage of the invariant would be pretty mild -- the only thing that gets messed up is the order of nodes. 
+ ndiff += drainDeferred() } - drainDeferred() + ndiff } - private[flatgraph] def applyUpdate(): Unit = { - splitUpdate() + private[flatgraph] def applyUpdate(): Int = { + val ndiff = splitUpdate() diff.buffer = null // set edge properties @@ -231,6 +261,7 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, // we can now clear the newnodes newNodes(nodeKind) = null } + ndiff } private def deleteNodes(): Unit = { diff --git a/tests/src/test/scala/flatgraph/GraphTests.scala b/tests/src/test/scala/flatgraph/GraphTests.scala index 6c3241e3..b32e41c8 100644 --- a/tests/src/test/scala/flatgraph/GraphTests.scala +++ b/tests/src/test/scala/flatgraph/GraphTests.scala @@ -11,7 +11,7 @@ import testdomains.generic.nodes.{NewNodeA, NewNodeB, NodeA} import scala.jdk.CollectionConverters.MapHasAsScala -class GraphTests extends AnyWordSpec with MockFactory { +class GraphTestsWithSchema extends AnyWordSpec with MockFactory { "node property: log warning for schema-unconform property usage" in { // unknown node properties often root in deserialising an old storage format, @@ -41,4 +41,23 @@ class GraphTests extends AnyWordSpec with MockFactory { genericDomain.nodeB.head.propertiesMap.asScala shouldBe Map() } + "diffgraph with contained nodes: Produce the correct node order when merged" in { + import testdomains.generic.nodes + val genDomain = GenericDomain.empty + val nodeB_implicit = nodes.NewNodeB().stringOptional("implicit") + val nodeB_explicit = nodes.NewNodeB().stringOptional("explicit") + val nodeA = nodes.NewNodeA().node_b(nodeB_implicit) + DiffGraphApplier.applyDiff(genDomain.graph, GenericDomain.newDiffGraphBuilder.addNode(nodeA).addNode(nodeB_explicit)) + genDomain.nodeB.stringOptional.l shouldBe List("implicit", "explicit") + } + "diffgraph with contained nodes: Produce the correct node order when split" in { + import testdomains.generic.nodes + val genDomain = GenericDomain.empty + val nodeB_implicit = 
nodes.NewNodeB().stringOptional("implicit") + val nodeB_explicit = nodes.NewNodeB().stringOptional("explicit") + val nodeA = nodes.NewNodeA().node_b(nodeB_implicit) + DiffGraphApplier.applyDiff(genDomain.graph, GenericDomain.newDiffGraphBuilder.addNode(nodeA)) + DiffGraphApplier.applyDiff(genDomain.graph, GenericDomain.newDiffGraphBuilder.addNode(nodeB_explicit)) + genDomain.nodeB.stringOptional.l shouldBe List("implicit", "explicit") + } }