Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make diffgraph applier return number of transitive changes #256

Merged
merged 7 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 103 additions & 80 deletions core/src/main/scala/flatgraph/DiffGraphApplier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object DiffGraphApplier {
graph: Graph,
diff: DiffGraphBuilder,
schemaViolationReporter: SchemaViolationReporter = new SchemaViolationReporter
): Unit = {
): Int = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our future selves are demanding to add some comments like /** returns the number of modifications that were successfully applied to the graph */ to methods like this, which only return an Int that could mean anything or nothing.

if (graph.isClosed) throw new GraphClosedException(s"graph cannot be modified any longer since it's closed")
new DiffGraphApplier(graph, diff, schemaViolationReporter).applyUpdate()
}
Expand Down Expand Up @@ -69,10 +69,13 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder,
a(pos).append(item)
}

private def drainDeferred(): Unit = {
private def drainDeferred(): Int = {
var ndiff = 0
while (deferred.nonEmpty) {
ndiff += 1
deferred.removeHead().countAndVisitProperties(NewNodeInterface)
}
ndiff
}

private def getGNode(node: DNodeOrNode): GNode = {
Expand All @@ -99,98 +102,116 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder,
}
}

private def splitUpdate(): Unit = {
private def splitUpdate(): Int = {
var ndiff = 0
diff.buffer.foreach {
case delNode: DelNode if !AccessHelpers.isDeleted(delNode.node) =>
ndiff += 1
AccessHelpers.markDeleted(delNode.node)
delNodes.append(delNode.node)

case _ =>
}

diff.buffer.foreach {
case addNode: DNode =>
getGNode(addNode)
case halfEdge: AddUnsafeHalfEdge =>
val src = getGNode(halfEdge.src)
val dst = getGNode(halfEdge.dst)
if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) {
Direction.fromOrdinal(halfEdge.inout) match {
case Direction.Incoming =>
insert(
newEdges,
new AddEdgeProcessed(dst, src, halfEdge.edgeKind, halfEdge.property),
graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, halfEdge.edgeKind)
)
case Direction.Outgoing =>
insert(
newEdges,
new AddEdgeProcessed(src, dst, halfEdge.edgeKind, halfEdge.property),
graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, halfEdge.edgeKind)
)
diff.buffer.foreach { item =>
item match {
case addNode: DNode =>
getGNode(addNode)
case halfEdge: AddUnsafeHalfEdge =>
val src = getGNode(halfEdge.src)
val dst = getGNode(halfEdge.dst)
if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) {
ndiff += 1
Direction.fromOrdinal(halfEdge.inout) match {
case Direction.Incoming =>
insert(
newEdges,
new AddEdgeProcessed(dst, src, halfEdge.edgeKind, halfEdge.property),
graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, halfEdge.edgeKind)
)
case Direction.Outgoing =>
insert(
newEdges,
new AddEdgeProcessed(src, dst, halfEdge.edgeKind, halfEdge.property),
graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, halfEdge.edgeKind)
)
}
}
}
case newEdge: AddEdgeUnprocessed =>
val src = getGNode(newEdge.src)
val dst = getGNode(newEdge.dst)
if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) {
case newEdge: AddEdgeUnprocessed =>
val src = getGNode(newEdge.src)
val dst = getGNode(newEdge.dst)
if (!AccessHelpers.isDeleted(src) && !AccessHelpers.isDeleted(dst)) {
ndiff += 1
insert(
newEdges,
new AddEdgeProcessed(src, dst, newEdge.edgeKind, newEdge.property),
graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, newEdge.edgeKind)
)
insert(
newEdges,
new AddEdgeProcessed(dst, src, newEdge.edgeKind, newEdge.property),
graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, newEdge.edgeKind)
)
} else {
// TODO maybe throw exception
}
case setEdgeProperty: SetEdgeProperty
if !AccessHelpers.isDeleted(setEdgeProperty.edge.src) && !AccessHelpers.isDeleted(setEdgeProperty.edge.dst) =>
ndiff += 1
val (outR, inR) = normalizeRepresentation(setEdgeProperty.edge)
insert(
newEdges,
new AddEdgeProcessed(src, dst, newEdge.edgeKind, newEdge.property),
graph.schema.neighborOffsetArrayIndex(src.nodeKind, Outgoing, newEdge.edgeKind)
setEdgeProperties,
new EdgeRepr(outR.src, outR.dst, outR.edgeKind, outR.subSeq, setEdgeProperty.property),
graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind)
)
insert(
newEdges,
new AddEdgeProcessed(dst, src, newEdge.edgeKind, newEdge.property),
graph.schema.neighborOffsetArrayIndex(dst.nodeKind, Incoming, newEdge.edgeKind)
setEdgeProperties,
new EdgeRepr(inR.src, inR.dst, inR.edgeKind, inR.subSeq, setEdgeProperty.property),
graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind)
)
} else {
// TODO maybe throw exception
}
case setEdgeProperty: SetEdgeProperty
if !AccessHelpers.isDeleted(setEdgeProperty.edge.src) && !AccessHelpers.isDeleted(setEdgeProperty.edge.dst) =>
val (outR, inR) = normalizeRepresentation(setEdgeProperty.edge)
insert(
setEdgeProperties,
new EdgeRepr(outR.src, outR.dst, outR.edgeKind, outR.subSeq, setEdgeProperty.property),
graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind)
)
insert(
setEdgeProperties,
new EdgeRepr(inR.src, inR.dst, inR.edgeKind, inR.subSeq, setEdgeProperty.property),
graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind)
)
case edgeDeletion: RemoveEdge if !AccessHelpers.isDeleted(edgeDeletion.edge.src) && !AccessHelpers.isDeleted(edgeDeletion.edge.dst) =>
/** This is the delEdge case. It is massively annoying.
*
* In order to support edge properties, we need to grab the right edge from e.src->e.dst. If we assume that our graph was built
* normally, i.e. edges were sequentially/batched added without the unsafe unidirectional edges, then our graph has the following
* invariant: The kth edge connecting A->B corresponds to the kth edge connecting B<-A This sucks big time, because edge removal is
* potentially O(N**2). The degenerate behavior occurs when we have ~k edges of the same type starting in src = X or ending in the
* same dst = X. Each deletion then costs us ~k, and if we delete all ~k edges we pay ~ k*k.
*
* But k~N is possible where N is the graph size!
*/
val (outR, inR) = normalizeRepresentation(edgeDeletion.edge)
insert(delEdges, outR, graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind))
insert(delEdges, inR, graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind))

case setNodeProperty: SetNodeProperty if !AccessHelpers.isDeleted(setNodeProperty.node) =>
val iter = setNodeProperty.property match {
case null => Iterator.empty
case iterable: Iterable[_] => iterable.iterator
case a: Array[_] => a.iterator
case item => Iterator.single(item)
}
insertProperty0(setNodeProperty.node, setNodeProperty.propertyKind, iter)
case delNode: DelNode =>
// already processed
assert(AccessHelpers.isDeleted(delNode.node), s"node should have been deleted already but wasn't: ${delNode.node}")
case edgeDeletion: RemoveEdge
if !AccessHelpers.isDeleted(edgeDeletion.edge.src) && !AccessHelpers.isDeleted(edgeDeletion.edge.dst) =>
ndiff += 1

/** This is the delEdge case. It is massively annoying.
*
* In order to support edge properties, we need to grab the right edge from e.src->e.dst. If we assume that our graph was built
* normally, i.e. edges were sequentially/batched added without the unsafe unidirectional edges, then our graph has the following
* invariant: The kth edge connecting A->B corresponds to the kth edge connecting B<-A This sucks big time, because edge removal
* is potentially O(N**2). The degenerate behavior occurs when we have ~k edges of the same type starting in src = X or ending in
* the same dst = X. Each deletion then costs us ~k, and if we delete all ~k edges we pay ~ k*k.
*
* But k~N is possible where N is the graph size!
*/
val (outR, inR) = normalizeRepresentation(edgeDeletion.edge)
insert(delEdges, outR, graph.schema.neighborOffsetArrayIndex(outR.src.nodeKind, Outgoing, outR.edgeKind))
insert(delEdges, inR, graph.schema.neighborOffsetArrayIndex(inR.src.nodeKind, Incoming, inR.edgeKind))

case setNodeProperty: SetNodeProperty if !AccessHelpers.isDeleted(setNodeProperty.node) =>
ndiff += 1
val iter = setNodeProperty.property match {
case null => Iterator.empty
case iterable: Iterable[_] => iterable.iterator
case a: Array[_] => a.iterator
case item => Iterator.single(item)
}
insertProperty0(setNodeProperty.node, setNodeProperty.propertyKind, iter)
case delNode: DelNode =>
// already processed
assert(AccessHelpers.isDeleted(delNode.node), s"node should have been deleted already but wasn't: ${delNode.node}")
}
// We need to drain the deferred nodes after each single processed update. We cannot wait until we processed all nodes,
// because that would break the following invariant:
// if you have two diffgraphs with no node deletions, and you apply them separately, you get the same result
// as if when you merge the updates and then apply them together
// The breakage of the invariant would be pretty mild -- the only thing that gets messed up is the order of nodes.
ndiff += drainDeferred()
}
drainDeferred()
ndiff
}

private[flatgraph] def applyUpdate(): Unit = {
splitUpdate()
private[flatgraph] def applyUpdate(): Int = {
val ndiff = splitUpdate()
diff.buffer = null

// set edge properties
Expand Down Expand Up @@ -231,6 +252,7 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder,
// we can now clear the newnodes
newNodes(nodeKind) = null
}
ndiff
}

private def deleteNodes(): Unit = {
Expand Down Expand Up @@ -488,12 +510,13 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder,
var idx = 0
while (idx < deletionSeqIndexEnd - deletionSeqIndexStart) {
if (deletion != null && idx == deletion.subSeq - 1) {
deletionCounter += 1
assert(
deletion.dst == oldNeighbors(deletionSeqIndexStart + idx),
s"deletion.dst was supposed to be `${oldNeighbors(deletionSeqIndexStart + idx)}`, but instead is ${deletion.dst}"
)
deletionCounter += 1
deletion = if (deletionCounter < deletions.size) deletions(deletionCounter) else null
if (deletion != null && deletion.src.seq() != deletionSeq) deletion = null
} else {
newNeighbors(deletionSeqIndexStart + idx - deletionCounter) = oldNeighbors(deletionSeqIndexStart + idx)
if (oldProperty != null)
Expand Down
38 changes: 38 additions & 0 deletions core/src/test/scala/flatgraph/GraphTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,44 @@ class GraphTests extends AnyWordSpec with Matchers {
debugDump(g) shouldBe expectation
}

"permit edge deletions with unfortunate ordering" in {
val g = new Graph(schema)
val v0 = new GenericDNode(0)
val v1 = new GenericDNode(0)

DiffGraphApplier.applyDiff(
g,
new DiffGraphBuilder(schema)
.addNode(v0)
.addNode(v1)
._addEdge(v0, v0, 0)
._addEdge(v0, v0, 0)
._addEdge(v1, v1, 0)
._addEdge(v1, v1, 0)
)
debugDump(g) shouldBe """#Node numbers (kindId, nnodes) (0: 2), total 2
|Node kind 0. (eid, nEdgesOut, nEdgesIn): (0, 4 [dense], 4 [dense]),
| V0_0 [0] -> V0_0, V0_0
| V0_0 [0] <- V0_0, V0_0
| V0_1 [0] -> V0_1, V0_1
| V0_1 [0] <- V0_1, V0_1
|""".stripMargin
DiffGraphApplier.applyDiff(
g,
new DiffGraphBuilder(schema)
.removeEdge(Accessors.getEdgesOut(v0.storedRef.get, 0)(0))
.removeEdge(Accessors.getEdgesOut(v1.storedRef.get, 0)(1))
)
debugDump(g) shouldBe """#Node numbers (kindId, nnodes) (0: 2), total 2
|Node kind 0. (eid, nEdgesOut, nEdgesIn): (0, 2 [dense], 2 [dense]),
| V0_0 [0] -> V0_0
| V0_0 [0] <- V0_0
| V0_1 [0] -> V0_1
| V0_1 [0] <- V0_1
|""".stripMargin

}

"permit node deletion" in {
var g = mkGraph()
debugDump(g) shouldBe
Expand Down
21 changes: 20 additions & 1 deletion tests/src/test/scala/flatgraph/GraphTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import testdomains.generic.nodes.{NewNodeA, NewNodeB, NodeA}

import scala.jdk.CollectionConverters.MapHasAsScala

class GraphTests extends AnyWordSpec with MockFactory {
class GraphTestsWithSchema extends AnyWordSpec with MockFactory {

"node property: log warning for schema-unconform property usage" in {
// unknown node properties often root in deserialising an old storage format,
Expand Down Expand Up @@ -41,4 +41,23 @@ class GraphTests extends AnyWordSpec with MockFactory {
genericDomain.nodeB.head.propertiesMap.asScala shouldBe Map()
}

"diffgraph with contained nodes: Produce the correct node order when merged" in {
import testdomains.generic.nodes
val genDomain = GenericDomain.empty
val nodeB_implicit = nodes.NewNodeB().stringOptional("implicit")
val nodeB_explicit = nodes.NewNodeB().stringOptional("explicit")
val nodeA = nodes.NewNodeA().node_b(nodeB_implicit)
DiffGraphApplier.applyDiff(genDomain.graph, GenericDomain.newDiffGraphBuilder.addNode(nodeA).addNode(nodeB_explicit))
genDomain.nodeB.stringOptional.l shouldBe List("implicit", "explicit")
}
"diffgraph with contained nodes: Produce the correct node order when split" in {
import testdomains.generic.nodes
val genDomain = GenericDomain.empty
val nodeB_implicit = nodes.NewNodeB().stringOptional("implicit")
val nodeB_explicit = nodes.NewNodeB().stringOptional("explicit")
val nodeA = nodes.NewNodeA().node_b(nodeB_implicit)
DiffGraphApplier.applyDiff(genDomain.graph, GenericDomain.newDiffGraphBuilder.addNode(nodeA))
DiffGraphApplier.applyDiff(genDomain.graph, GenericDomain.newDiffGraphBuilder.addNode(nodeB_explicit))
genDomain.nodeB.stringOptional.l shouldBe List("implicit", "explicit")
}
}
Loading