Skip to content

Commit

Permalink
Make diffgraph applier return number of transitive changes (#256)
Browse files Browse the repository at this point in the history
* also fixes a minor bug with node initialization order of transitively added nodes from diffgraphs
  • Loading branch information
bbrehm authored Sep 16, 2024
1 parent 2e05d59 commit 1c6bf43
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 80 deletions.
189 changes: 110 additions & 79 deletions core/src/main/scala/flatgraph/DiffGraphApplier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,20 @@ import flatgraph.misc.SchemaViolationReporter
import scala.collection.{Iterator, mutable}

object DiffGraphApplier {

/** Apply a diff to a graph. This returns the number of successfully applied changes.
*
* Since this applies all changes that are reachable from the DiffGraphBuilder, the number of applied changes can be vastly larger than
* the size of the diffgraphBuilder, e.g. if there are newNodes that have properties / contained nodes that are themselves newNodes.
*
* This function destroys the diff, in order to permit fast freeing of memory (in cases where we run close to exhausting the heap with
* the diff, the old graph, the new graph, and the temporaries during diff application).
*/
def applyDiff(
graph: Graph,
diff: DiffGraphBuilder,
schemaViolationReporter: SchemaViolationReporter = new SchemaViolationReporter
): Unit = {
): Int = {
if (graph.isClosed) throw new GraphClosedException(s"graph cannot be modified any longer since it's closed")
new DiffGraphApplier(graph, diff, schemaViolationReporter).applyUpdate()
}
Expand Down Expand Up @@ -69,10 +78,13 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder,
a(pos).append(item)
}

private def drainDeferred(): Unit = {
private def drainDeferred(): Int = {
var ndiff = 0
while (deferred.nonEmpty) {
ndiff += 1
deferred.removeHead().countAndVisitProperties(NewNodeInterface)
}
ndiff
}

private def getGNode(node: DNodeOrNode): GNode = {
Expand All @@ -99,98 +111,116 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder,
}
}

private def splitUpdate(): Unit = {
private def splitUpdate(): Int = {
var ndiff = 0
diff.buffer.foreach {
case delNode: DelNode if !AccessHelpers.isDeleted(delNode.node) =>
ndiff += 1
AccessHelpers.markDeleted(delNode.node)
delNodes.append(delNode.node)

case _ =>
}

diff.buffer.foreach {
case addNode: DNode =>
getGNode(addNode)
case halfEdge: AddUnsafeHalfEdge =>
val src = getGNode(halfEdge.src)
val dst = getGNode(halfEdge.dst)
if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) {
Direction.fromOrdinal(halfEdge.inout) match {
case Direction.Incoming =>
insert(
newEdges,
new AddEdgeProcessed(dst, src, halfEdge.edgeKind, halfEdge.property),
graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, halfEdge.edgeKind)
)
case Direction.Outgoing =>
insert(
newEdges,
new AddEdgeProcessed(src, dst, halfEdge.edgeKind, halfEdge.property),
graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, halfEdge.edgeKind)
)
diff.buffer.foreach { item =>
item match {
case addNode: DNode =>
getGNode(addNode)
case halfEdge: AddUnsafeHalfEdge =>
val src = getGNode(halfEdge.src)
val dst = getGNode(halfEdge.dst)
if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) {
ndiff += 1
Direction.fromOrdinal(halfEdge.inout) match {
case Direction.Incoming =>
insert(
newEdges,
new AddEdgeProcessed(dst, src, halfEdge.edgeKind, halfEdge.property),
graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, halfEdge.edgeKind)
)
case Direction.Outgoing =>
insert(
newEdges,
new AddEdgeProcessed(src, dst, halfEdge.edgeKind, halfEdge.property),
graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, halfEdge.edgeKind)
)
}
}
}
case newEdge: AddEdgeUnprocessed =>
val src = getGNode(newEdge.src)
val dst = getGNode(newEdge.dst)
if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) {
case newEdge: AddEdgeUnprocessed =>
val src = getGNode(newEdge.src)
val dst = getGNode(newEdge.dst)
if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) {
ndiff += 1
insert(
newEdges,
new AddEdgeProcessed(src, dst, newEdge.edgeKind, newEdge.property),
graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, newEdge.edgeKind)
)
insert(
newEdges,
new AddEdgeProcessed(dst, src, newEdge.edgeKind, newEdge.property),
graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, newEdge.edgeKind)
)
} else {
// TODO maybe throw exception
}
case setEdgeProperty: SetEdgeProperty
if !AccessHelpers.isDeleted(setEdgeProperty.edge.src) && !AccessHelpers.isDeleted(setEdgeProperty.edge.dst) =>
ndiff += 1
val (outR, inR) = normalizeRepresentation(setEdgeProperty.edge)
insert(
newEdges,
new AddEdgeProcessed(src, dst, newEdge.edgeKind, newEdge.property),
graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, newEdge.edgeKind)
setEdgeProperties,
new EdgeRepr(outR.src, outR.dst, outR.edgeKind, outR.subSeq, setEdgeProperty.property),
graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind)
)
insert(
newEdges,
new AddEdgeProcessed(dst, src, newEdge.edgeKind, newEdge.property),
graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, newEdge.edgeKind)
setEdgeProperties,
new EdgeRepr(inR.src, inR.dst, inR.edgeKind, inR.subSeq, setEdgeProperty.property),
graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind)
)
} else {
// TODO maybe throw exception
}
case setEdgeProperty: SetEdgeProperty
if !AccessHelpers.isDeleted(setEdgeProperty.edge.src) && !AccessHelpers.isDeleted(setEdgeProperty.edge.dst) =>
val (outR, inR) = normalizeRepresentation(setEdgeProperty.edge)
insert(
setEdgeProperties,
new EdgeRepr(outR.src, outR.dst, outR.edgeKind, outR.subSeq, setEdgeProperty.property),
graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind)
)
insert(
setEdgeProperties,
new EdgeRepr(inR.src, inR.dst, inR.edgeKind, inR.subSeq, setEdgeProperty.property),
graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind)
)
case edgeDeletion: RemoveEdge if !AccessHelpers.isDeleted(edgeDeletion.edge.src) && !AccessHelpers.isDeleted(edgeDeletion.edge.dst) =>
/** This is the delEdge case. It is massively annoying.
*
* In order to support edge properties, we need to grab the right edge from e.src->e.dst. If we assume that our graph was built
* normally, i.e. edges were sequentially/batched added without the unsafe unidirectional edges, then our graph has the following
* invariant: The kth edge connecting A->B corresponds to the kth edge connecting B<-A This sucks big time, because edge removal is
* potentially O(N**2). The degenerate behavior occurs when we have ~k edges of the same type starting in src = X or ending in the
* same dst = X. Each deletion then costs us ~k, and if we delete all ~k edges we pay ~ k*k.
*
* But k~N is possible where N is the graph size!
*/
val (outR, inR) = normalizeRepresentation(edgeDeletion.edge)
insert(delEdges, outR, graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind))
insert(delEdges, inR, graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind))

case setNodeProperty: SetNodeProperty if !AccessHelpers.isDeleted(setNodeProperty.node) =>
val iter = setNodeProperty.property match {
case null => Iterator.empty
case iterable: Iterable[_] => iterable.iterator
case a: Array[_] => a.iterator
case item => Iterator.single(item)
}
insertProperty0(setNodeProperty.node, setNodeProperty.propertyKind, iter)
case delNode: DelNode =>
// already processed
assert(AccessHelpers.isDeleted(delNode.node), s"node should have been deleted already but wasn't: ${delNode.node}")
case edgeDeletion: RemoveEdge
if !AccessHelpers.isDeleted(edgeDeletion.edge.src) && !AccessHelpers.isDeleted(edgeDeletion.edge.dst) =>
ndiff += 1

/** This is the delEdge case. It is massively annoying.
*
* In order to support edge properties, we need to grab the right edge from e.src->e.dst. If we assume that our graph was built
* normally, i.e. edges were sequentially/batched added without the unsafe unidirectional edges, then our graph has the following
* invariant: The kth edge connecting A->B corresponds to the kth edge connecting B<-A This sucks big time, because edge removal
* is potentially O(N**2). The degenerate behavior occurs when we have ~k edges of the same type starting in src = X or ending in
* the same dst = X. Each deletion then costs us ~k, and if we delete all ~k edges we pay ~ k*k.
*
* But k~N is possible where N is the graph size!
*/
val (outR, inR) = normalizeRepresentation(edgeDeletion.edge)
insert(delEdges, outR, graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind))
insert(delEdges, inR, graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind))

case setNodeProperty: SetNodeProperty if !AccessHelpers.isDeleted(setNodeProperty.node) =>
ndiff += 1
val iter = setNodeProperty.property match {
case null => Iterator.empty
case iterable: Iterable[_] => iterable.iterator
case a: Array[_] => a.iterator
case item => Iterator.single(item)
}
insertProperty0(setNodeProperty.node, setNodeProperty.propertyKind, iter)
case delNode: DelNode =>
// already processed
assert(AccessHelpers.isDeleted(delNode.node), s"node should have been deleted already but wasn't: ${delNode.node}")
}
// We need to drain the deferred nodes after each single processed update. We cannot wait until we processed all nodes,
// because that would break the following invariant:
// if you have two diffgraphs with no node deletions, and you apply them separately, you get the same result
// as if when you merge the updates and then apply them together
// The breakage of the invariant would be pretty mild -- the only thing that gets messed up is the order of nodes.
ndiff += drainDeferred()
}
drainDeferred()
ndiff
}

private[flatgraph] def applyUpdate(): Unit = {
splitUpdate()
private[flatgraph] def applyUpdate(): Int = {
val ndiff = splitUpdate()
diff.buffer = null

// set edge properties
Expand Down Expand Up @@ -231,6 +261,7 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder,
// we can now clear the newnodes
newNodes(nodeKind) = null
}
ndiff
}

private def deleteNodes(): Unit = {
Expand Down
21 changes: 20 additions & 1 deletion tests/src/test/scala/flatgraph/GraphTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import testdomains.generic.nodes.{NewNodeA, NewNodeB, NodeA}

import scala.jdk.CollectionConverters.MapHasAsScala

class GraphTests extends AnyWordSpec with MockFactory {
class GraphTestsWithSchema extends AnyWordSpec with MockFactory {

"node property: log warning for schema-unconform property usage" in {
// unknown node properties often root in deserialising an old storage format,
Expand Down Expand Up @@ -41,4 +41,23 @@ class GraphTests extends AnyWordSpec with MockFactory {
genericDomain.nodeB.head.propertiesMap.asScala shouldBe Map()
}

"diffgraph with contained nodes: Produce the correct node order when merged" in {
import testdomains.generic.nodes
val genDomain = GenericDomain.empty
val nodeB_implicit = nodes.NewNodeB().stringOptional("implicit")
val nodeB_explicit = nodes.NewNodeB().stringOptional("explicit")
val nodeA = nodes.NewNodeA().node_b(nodeB_implicit)
DiffGraphApplier.applyDiff(genDomain.graph, GenericDomain.newDiffGraphBuilder.addNode(nodeA).addNode(nodeB_explicit))
genDomain.nodeB.stringOptional.l shouldBe List("implicit", "explicit")
}
"diffgraph with contained nodes: Produce the correct node order when split" in {
import testdomains.generic.nodes
val genDomain = GenericDomain.empty
val nodeB_implicit = nodes.NewNodeB().stringOptional("implicit")
val nodeB_explicit = nodes.NewNodeB().stringOptional("explicit")
val nodeA = nodes.NewNodeA().node_b(nodeB_implicit)
DiffGraphApplier.applyDiff(genDomain.graph, GenericDomain.newDiffGraphBuilder.addNode(nodeA))
DiffGraphApplier.applyDiff(genDomain.graph, GenericDomain.newDiffGraphBuilder.addNode(nodeB_explicit))
genDomain.nodeB.stringOptional.l shouldBe List("implicit", "explicit")
}
}

0 comments on commit 1c6bf43

Please sign in to comment.