Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat graph investigate query time explosion #306

Merged
merged 9 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions deployments/kubehound/graph/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COPY dsl/kubehound/pom.xml /home/app
RUN mvn -f /home/app/pom.xml clean install

# Now build our janusgraph wrapper container with KubeHound customizations
FROM janusgraph/janusgraph:1.0.0
FROM janusgraph/janusgraph:1.1.0
LABEL org.opencontainers.image.source="https://github.com/DataDog/kubehound/"

# Add our initialization script for the database schema to the startup directory
Expand Down Expand Up @@ -59,9 +59,16 @@ ENV gremlinserver.metrics.slf4jReporter.enabled=false
ENV gremlinserver.metrics.graphiteReporter.enabled=false
ENV gremlinserver.metrics.csvReporter.enabled=false

# Add safety net settings to prevent OOM and other issues
ENV janusgraph.query.force-index=false
ENV janusgraph.cluster.max-partitions=512
ENV janusgraph.query.batch=true
ENV janusgraph.query.hard-max-limit=100000
ENV janusgraph.query.smart-limit=true

# Performance tweaks based on: https://www.sailpoint.com/blog/souping-up-the-gremlin/
# gremlinPool will default to Runtime.availableProcessors()
ENV gremlinserver.gremlinPool=0
ENV gremlinserver.gremlinPool=0
# threadPoolWorker should be 2x VCPU (TODO: can we set dynamically?)
ENV gremlinserver.threadPoolWorker=16

Expand Down
3 changes: 2 additions & 1 deletion deployments/kubehound/graph/conf/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
#################

-XX:+UseG1GC
-XX:+UseContainerSupport
-XX:SurvivorRatio=8
-XX:MaxTenuringThreshold=1

-javaagent:"/opt/janusgraph/lib/jmx_prometheus_javaagent-0.18.0.jar"=8099:/opt/janusgraph/lib/exporter-config.yaml
-javaagent:"/opt/janusgraph/lib/jmx_prometheus_javaagent-0.18.0.jar"=8099:/opt/janusgraph/lib/exporter-config.yaml
27 changes: 26 additions & 1 deletion deployments/kubehound/graph/kubehound-db-init.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ mgmt.buildIndex('byServiceEndpoint', Vertex.class).addKey(serviceEndpoint).build
mgmt.buildIndex('byServiceDns', Vertex.class).addKey(serviceDns).buildCompositeIndex();
mgmt.buildIndex('byExposure', Vertex.class).addKey(exposure).buildCompositeIndex();

// Create composite indices for the properties we want to search on
mgmt.buildIndex('byClusterAndRunIDComposite', Vertex.class).addKey(cluster).addKey(runID).buildCompositeIndex();
mgmt.buildIndex('byClassAndRunIDComposite', Vertex.class).addKey(cls).addKey(runID).buildCompositeIndex();
mgmt.buildIndex('byClassAndClusterComposite', Vertex.class).addKey(cls).addKey(cluster).buildCompositeIndex();
mgmt.buildIndex('byClassAndTypeComposite', Vertex.class).addKey(cls).addKey(type).buildCompositeIndex();
mgmt.buildIndex('byClassAndExposureComposite', Vertex.class).addKey(cls).addKey(exposure).buildCompositeIndex();
mgmt.buildIndex('byTypeAndNameComposite', Vertex.class).addKey(type).addKey(name).buildCompositeIndex();
mgmt.buildIndex('byImageAndRunIDComposite', Vertex.class).addKey(image).addKey(runID).buildCompositeIndex();
mgmt.buildIndex('byAppAndRunIDComposite', Vertex.class).addKey(app).addKey(runID).buildCompositeIndex();
mgmt.buildIndex('byNamespaceAndRunIDComposite', Vertex.class).addKey(namespace).addKey(runID).buildCompositeIndex();
Comment on lines +182 to +191
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, those will definitely be super useful.
Just out of curiosity, do you know if it would have a significant impact on insert time? Or does it stay negligible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the in-memory backend, I don't see any relevant/noticeable write performance improvements, except for graph nuking, which now uses the index. The difference is more visible with concrete datastore and search index backends.


mgmt.commit();

Expand All @@ -194,9 +204,24 @@ ManagementSystem.awaitGraphIndexStatus(graph, 'byName').status(SchemaStatus.ENAB
ManagementSystem.awaitGraphIndexStatus(graph, 'byNamespace').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byType').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byCritical').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byPort').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byPortName').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byServiceEndpoint').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byServiceDns').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byExposure').status(SchemaStatus.ENABLED).call();

ManagementSystem.awaitGraphIndexStatus(graph, 'byClusterAndRunIDComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byClassAndRunIDComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byClassAndClusterComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byClassAndTypeComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byClassAndExposureComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byTypeAndNameComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byImageAndRunIDComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byAppAndRunIDComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byNamespaceAndRunIDComposite').status(SchemaStatus.ENABLED).call();

System.out.println("[KUBEHOUND] graph schema and indexes ready");
mgmt.close();

// Close the open connection
:remote close
:remote close
66 changes: 33 additions & 33 deletions deployments/kubehound/ui/LowHangingFruit-ContainerEscape.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@
" .has(\"runID\", graph.variables().get('runID_yourid').get().trim())\n",
" .where(\n",
" repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .limit(1)\n",
" )\n",
" .dedup().by(\"image\")\n",
Expand Down Expand Up @@ -201,11 +201,11 @@
" .has(\"runID\", graph.variables().get('runID_yourid').get().trim())\n",
" .where(\n",
" repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .limit(1)\n",
" )\n",
" .dedup()\n",
Expand Down Expand Up @@ -313,11 +313,11 @@
" .has(\"runID\", graph.variables().get('runID_yourid').get().trim())\n",
" .has(\"app\",graph.variables().get('containerEscape_vulnApp_yourid').get().trim())\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path().by(elementMap())\n",
" .limit(1000)"
]
Expand Down Expand Up @@ -360,11 +360,11 @@
" .has(\"runID\", graph.variables().get('runID_yourid').get().trim())\n",
" .has(\"app\",graph.variables().get('containerEscape_vulnApp_yourid').get().trim())\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path()\n",
" .by(valueMap(\"app\", \"class\",\"critical\").with(WithOptions.tokens,WithOptions.labels))\n",
" .dedup()\n",
Expand Down Expand Up @@ -402,11 +402,11 @@
" .has(\"runID\", graph.variables().get('runID_yourid').get().trim())\n",
" .has(\"app\",graph.variables().get('containerEscape_vulnApp_yourid').get().trim())\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path().by(label())\n",
" .dedup()\n",
" .limit(1000)"
Expand Down Expand Up @@ -438,11 +438,11 @@
" .or().has(\"namespace\", within(graph.variables().get('containerEscape_whiteListedNamespace_yourid').get()))\n",
" )\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path().by(elementMap())\n",
" .limit(1000)"
]
Expand All @@ -469,11 +469,11 @@
" .or().has(\"namespace\", within(graph.variables().get('containerEscape_whiteListedNamespace_yourid').get()))\n",
" )\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path()\n",
" .by(valueMap(\"app\", \"class\",\"critical\").with(WithOptions.tokens,WithOptions.labels))\n",
" .dedup()\n",
Expand Down Expand Up @@ -502,12 +502,12 @@
" .or().has(\"namespace\", within(graph.variables().get('containerEscape_whiteListedNamespace_yourid').get()))\n",
" )\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" .path().by(label())\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path().by(\"class\")\n",
" .dedup()\n",
" .limit(1000)"
]
Expand Down
14 changes: 14 additions & 0 deletions pkg/kubehound/graph/vertex/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package vertex

// Labels holds every label that a vertex in the graph can carry.
var Labels = []string{
	ContainerLabel,
	EndpointLabel,
	IdentityLabel,
	NodeLabel,
	PermissionSetLabel,
	PodLabel,
	VolumeLabel,
}
91 changes: 47 additions & 44 deletions pkg/kubehound/storage/graphdb/janusgraph_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,50 +67,53 @@ func (jgp *JanusGraphProvider) Prepare(ctx context.Context) error {
return nil
}

g := gremlin.Traversal_().WithRemote(jgp.drc)
tx := g.Tx()
defer tx.Close()

for {
// Begin a new transaction.
gtx, err := tx.Begin()
if err != nil {
return err
}

// Retrieve the number of vertices in the graph.
page, err := gtx.V().Count().Next()
if err != nil {
return err
}

// Decode the number of vertices from the page.
count, err := page.GetInt()
if err != nil {
return err
}

// If there are no more vertices to delete, break the loop.
if count == 0 {
break
}

// Delete the vertices in the graph.
err = <-gtx.V().Limit(deleteBatchSize).Drop().Iterate()
if err != nil {
return err
}

// Commit the transaction.
if err := tx.Commit(); err != nil {
return err
}

// Check context for cancellation.
select {
case <-ctx.Done():
return ctx.Err()
default:
// These vertex types are defined in the schema.
Zenithar marked this conversation as resolved.
Show resolved Hide resolved
for _, vertexType := range vertex.Labels {
g := gremlin.Traversal_().WithRemote(jgp.drc)
tx := g.Tx()
defer tx.Close()

for {
// Begin a new transaction.
gtx, err := tx.Begin()
if err != nil {
return err
}

// Retrieve the number of vertices in the graph.
page, err := gtx.V().Has("class", vertexType).Count().Next()
if err != nil {
return err
}

// Decode the number of vertices from the page.
count, err := page.GetInt()
if err != nil {
return err
}

// If there are no more vertices to delete, break the loop.
if count == 0 {
break
}

// Delete the vertices in the graph.
err = <-gtx.V().Has("class", vertexType).Limit(deleteBatchSize).Drop().Iterate()
if err != nil {
return err
}

// Commit the transaction.
if err := tx.Commit(); err != nil {
return err
}

// Check context for cancellation.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
}

Expand Down
5 changes: 2 additions & 3 deletions test/system/graph_dsl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ func (suite *DslTestSuite) TestTraversal_criticalPaths() {
// There are A LOT of paths in the test cluster. Just sample a few
expected := []string{
"path[Endpoint, ENDPOINT_EXPLOIT, Container, IDENTITY_ASSUME, Identity, PERMISSION_DISCOVER, PermissionSet]",
"path[Endpoint, ENDPOINT_EXPLOIT, Container, VOLUME_DISCOVER, Volume, TOKEN_STEAL, Identity, PERMISSION_DISCOVER, PermissionSet]",
"path[Endpoint, ENDPOINT_EXPLOIT, Container, CE_NSENTER, Node, IDENTITY_ASSUME, Identity, PERMISSION_DISCOVER, PermissionSet]",
"path[Endpoint, ENDPOINT_EXPLOIT, Container, CE_MODULE_LOAD, Node, IDENTITY_ASSUME, Identity, PERMISSION_DISCOVER, PermissionSet]",
"path[Endpoint, ENDPOINT_EXPLOIT, Container, CE_PRIV_MOUNT, Node, IDENTITY_ASSUME, Identity, PERMISSION_DISCOVER, PermissionSet]",
Expand All @@ -283,7 +282,7 @@ func (suite *DslTestSuite) TestTraversal_minHopsToCritical() {

serviceHops, err := res.GetInt()
suite.NoError(err)
suite.Equal(serviceHops, 4)
suite.Equal(4, serviceHops)

// Container should have 1 less hop
raw, err = suite.client.Submit("kh.containers().minHopsToCritical(6)")
Expand All @@ -295,7 +294,7 @@ func (suite *DslTestSuite) TestTraversal_minHopsToCritical() {

containerHops, err := res.GetInt()
suite.NoError(err)
suite.Equal(containerHops, serviceHops-1)
suite.Equal(serviceHops-1, containerHops)
}

func (suite *DslTestSuite) TestTraversal_criticalPathsFilter() {
Expand Down
2 changes: 1 addition & 1 deletion test/system/graph_edge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (suite *EdgeTestSuite) TestEdge_IDENTITY_ASSUME_Node() {
func (suite *EdgeTestSuite) TestEdge_POD_ATTACH() {
// Every pod should have a POD_ATTACH incoming from a node
rawCount, err := suite.g.V().
HasLabel("Pod").
Has("class", "Pod").
Count().Next()

suite.NoError(err)
Expand Down
30 changes: 18 additions & 12 deletions test/system/graph_vertex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,20 +356,26 @@ func (suite *VertexTestSuite) TestVertexIdentity() {

func (suite *VertexTestSuite) TestVertexClusterProperty() {
// All vertices should have the cluster property set
results, err := suite.g.V().
Values("cluster").
Dedup().
ToList()

suite.NoError(err)
suite.GreaterOrEqual(len(results), 1)

present := suite.resultsToStringArray(results)
expected := []string{
"kind-kubehound.test.local",
for _, label := range vertex.Labels {
suite.Run(label, func() {
results, err := suite.g.V().
Has("class", label).
Values("cluster").
Dedup().
ToList()

suite.NoError(err)
suite.GreaterOrEqual(len(results), 1)

present := suite.resultsToStringArray(results)
expected := []string{
"kind-kubehound.test.local",
}

suite.Subset(present, expected)
})
}

suite.Subset(present, expected)
}

func (suite *VertexTestSuite) TearDownSuite() {
Expand Down
Loading