Skip to content

Commit

Permalink
Merge pull request #295 from DataDog/zenithar/graphdb_batch_writer_re…
Browse files Browse the repository at this point in the history
…siliency

graphdb batch writer resiliency
  • Loading branch information
Zenithar authored Dec 2, 2024
2 parents 5cabd42 + c60598a commit d8de160
Show file tree
Hide file tree
Showing 14 changed files with 595 additions and 176 deletions.
7 changes: 5 additions & 2 deletions configs/etc/kubehound-reference.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ janusgraph:
# Timeout on requests to the JanusGraph DB instance
connection_timeout: 30s

# Number of worker threads for the JanusGraph writer pool
writer_worker_count: 10

#
# Datadog telemetry configuration
#
Expand Down Expand Up @@ -114,10 +117,10 @@ builder:
# worker_pool_capacity: 100

# # Batch size for edge inserts
# batch_size: 500
# batch_size: 250

# # Small batch size for edge inserts
# batch_size_small: 75
# batch_size_small: 50

# # Cluster impact batch size for edge inserts
# batch_size_cluster_impact: 1
Expand Down
7 changes: 5 additions & 2 deletions configs/etc/kubehound.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,22 @@ janusgraph:
# Timeout on requests to the JanusGraph DB instance
connection_timeout: 30s

# Number of worker threads for the JanusGraph writer pool
writer_worker_count: 10

# Graph builder configuration
builder:
# Vertex builder configuration
vertex:
# Batch size for vertex inserts
batch_size: 500
batch_size: 250

# Edge builder configuration
edge:
worker_pool_size: 2

# Batch size for edge inserts
batch_size: 500
batch_size: 250

# Cluster impact batch size for edge inserts
batch_size_cluster_impact: 10
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package config
const (
DefaultEdgeWorkerPoolSize = 5
DefaultEdgeWorkerPoolCapacity = 100
DefaultEdgeBatchSize = 500
DefaultEdgeBatchSize = 250
DefaultEdgeBatchSizeSmall = DefaultEdgeBatchSize / 5
DefaultEdgeBatchSizeClusterImpact = 10

DefaultVertexBatchSize = 500
DefaultVertexBatchSize = 250
DefaultVertexBatchSizeSmall = DefaultVertexBatchSize / 5

DefaultStopOnError = false
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ func SetDefaultValues(ctx context.Context, v *viper.Viper) {
// Default values for JanusGraph
v.SetDefault(JanusGraphUrl, DefaultJanusGraphUrl)
v.SetDefault(JanusGrapTimeout, DefaultConnectionTimeout)
v.SetDefault(JanusGraphWriterTimeout, defaultJanusGraphWriterTimeout)
v.SetDefault(JanusGraphWriterMaxRetry, defaultJanusGraphWriterMaxRetry)
v.SetDefault(JanusGraphWriterWorkerCount, defaultJanusGraphWriterWorkerCount)

// Profiler values
v.SetDefault(TelemetryProfilerPeriod, DefaultProfilerPeriod)
Expand Down Expand Up @@ -157,6 +160,9 @@ func SetEnvOverrides(ctx context.Context, c *viper.Viper) {

res = multierror.Append(res, c.BindEnv(MongoUrl, "KH_MONGODB_URL"))
res = multierror.Append(res, c.BindEnv(JanusGraphUrl, "KH_JANUSGRAPH_URL"))
res = multierror.Append(res, c.BindEnv(JanusGraphWriterMaxRetry, "KH_JANUSGRAPH_WRITER_MAX_RETRY"))
res = multierror.Append(res, c.BindEnv(JanusGraphWriterTimeout, "KH_JANUSGRAPH_WRITER_TIMEOUT"))
res = multierror.Append(res, c.BindEnv(JanusGraphWriterWorkerCount, "KH_JANUSGRAPH_WRITER_WORKER_COUNT"))

res = multierror.Append(res, c.BindEnv(IngestorAPIEndpoint, "KH_INGESTOR_API_ENDPOINT"))
res = multierror.Append(res, c.BindEnv(IngestorAPIInsecure, "KH_INGESTOR_API_INSECURE"))
Expand All @@ -166,6 +172,11 @@ func SetEnvOverrides(ctx context.Context, c *viper.Viper) {
res = multierror.Append(res, c.BindEnv(IngestorArchiveName, "KH_INGESTOR_ARCHIVE_NAME"))
res = multierror.Append(res, c.BindEnv(IngestorBlobRegion, "KH_INGESTOR_REGION"))

res = multierror.Append(res, c.BindEnv("builder.vertex.batch_size", "KH_BUILDER_VERTEX_BATCH_SIZE"))
res = multierror.Append(res, c.BindEnv("builder.vertex.batch_size_small", "KH_BUILDER_VERTEX_BATCH_SIZE_SMALL"))
res = multierror.Append(res, c.BindEnv("builder.edge.batch_size", "KH_BUILDER_EDGE_BATCH_SIZE"))
res = multierror.Append(res, c.BindEnv("builder.edge.batch_size_small", "KH_BUILDER_EDGE_BATCH_SIZE_SMALL"))

res = multierror.Append(res, c.BindEnv(TelemetryStatsdUrl, "STATSD_URL"))
res = multierror.Append(res, c.BindEnv(TelemetryTracerUrl, "TRACE_AGENT_URL"))

Expand Down
16 changes: 11 additions & 5 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func TestMustLoadConfig(t *testing.T) {
JanusGraph: JanusGraphConfig{
URL: "ws://localhost:8182/gremlin",
ConnectionTimeout: DefaultConnectionTimeout,
WriterTimeout: defaultJanusGraphWriterTimeout,
WriterMaxRetry: defaultJanusGraphWriterMaxRetry,
WriterWorkerCount: defaultJanusGraphWriterWorkerCount,
},
Telemetry: TelemetryConfig{
Statsd: StatsdConfig{
Expand All @@ -64,15 +67,15 @@ func TestMustLoadConfig(t *testing.T) {
},
Builder: BuilderConfig{
Vertex: VertexBuilderConfig{
BatchSize: 500,
BatchSizeSmall: 100,
BatchSize: 250,
BatchSizeSmall: 50,
},
Edge: EdgeBuilderConfig{
LargeClusterOptimizations: DefaultLargeClusterOptimizations,
WorkerPoolSize: 5,
WorkerPoolCapacity: 100,
BatchSize: 500,
BatchSizeSmall: 100,
BatchSize: 250,
BatchSizeSmall: 50,
BatchSizeClusterImpact: 10,
},
},
Expand Down Expand Up @@ -126,6 +129,9 @@ func TestMustLoadConfig(t *testing.T) {
JanusGraph: JanusGraphConfig{
URL: "ws://localhost:8182/gremlin",
ConnectionTimeout: DefaultConnectionTimeout,
WriterTimeout: defaultJanusGraphWriterTimeout,
WriterMaxRetry: defaultJanusGraphWriterMaxRetry,
WriterWorkerCount: defaultJanusGraphWriterWorkerCount,
},
Telemetry: TelemetryConfig{
Statsd: StatsdConfig{
Expand All @@ -139,7 +145,7 @@ func TestMustLoadConfig(t *testing.T) {
Builder: BuilderConfig{
Vertex: VertexBuilderConfig{
BatchSize: 1000,
BatchSizeSmall: 100,
BatchSizeSmall: 50,
},
Edge: EdgeBuilderConfig{
LargeClusterOptimizations: true,
Expand Down
16 changes: 14 additions & 2 deletions pkg/config/janusgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,24 @@ import (
const (
DefaultJanusGraphUrl = "ws://localhost:8182/gremlin"

JanusGraphUrl = "janusgraph.url"
JanusGrapTimeout = "janusgraph.connection_timeout"
defaultJanusGraphWriterTimeout = 60 * time.Second
defaultJanusGraphWriterMaxRetry = 3
defaultJanusGraphWriterWorkerCount = 10

JanusGraphUrl = "janusgraph.url"
JanusGrapTimeout = "janusgraph.connection_timeout"
JanusGraphWriterTimeout = "janusgraph.writer_timeout"
JanusGraphWriterMaxRetry = "janusgraph.writer_max_retry"
JanusGraphWriterWorkerCount = "janusgraph.writer_worker_count"
)

// JanusGraphConfig configures JanusGraph specific parameters.
// Each field carries a mapstructure tag naming its key under the "janusgraph"
// configuration section (for example "janusgraph.url").
type JanusGraphConfig struct {
// URL is the JanusGraph endpoint; the default is DefaultJanusGraphUrl.
URL string `mapstructure:"url"` // JanusGraph specific configuration
// ConnectionTimeout is the timeout on requests to the JanusGraph DB instance;
// the default is DefaultConnectionTimeout.
ConnectionTimeout time.Duration `mapstructure:"connection_timeout"`

// JanusGraph vertex/edge writer configuration

// WriterTimeout defaults to 60 seconds and can be overridden with
// KH_JANUSGRAPH_WRITER_TIMEOUT.
// NOTE(review): presumably bounds a single batch write — confirm against the writer.
WriterTimeout time.Duration `mapstructure:"writer_timeout"`
// WriterMaxRetry defaults to 3 and can be overridden with
// KH_JANUSGRAPH_WRITER_MAX_RETRY.
// NOTE(review): presumably the retry cap for a failed batch write — confirm against the writer.
WriterMaxRetry int `mapstructure:"writer_max_retry"`
// WriterWorkerCount is the number of worker threads for the JanusGraph writer
// pool; it defaults to 10 and can be overridden with
// KH_JANUSGRAPH_WRITER_WORKER_COUNT.
WriterWorkerCount int `mapstructure:"writer_worker_count"`
}
22 changes: 22 additions & 0 deletions pkg/kubehound/storage/graphdb/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package graphdb

import "fmt"

// batchWriterError decorates an underlying error with a flag recording
// whether the failed operation is considered retryable.
type batchWriterError struct {
	err       error // wrapped cause; may be nil
	retryable bool  // true when the error is marked as retryable
}

// Error implements the error interface. The message always reports the
// retryable flag; when a cause is wrapped, its message is appended after
// a colon.
func (e batchWriterError) Error() string {
	msg := fmt.Sprintf("batch writer error (retriable:%v)", e.retryable)
	if e.err != nil {
		msg += ": " + e.err.Error()
	}

	return msg
}

// Unwrap returns the wrapped cause (nil when none was set), which lets
// errors.Is and errors.As look through this wrapper.
func (e batchWriterError) Unwrap() error {
	return e.err
}
Loading

0 comments on commit d8de160

Please sign in to comment.