Skip to content

Commit

Permalink
chore(lint): reduce complexity.
Browse files Browse the repository at this point in the history
  • Loading branch information
Zenithar committed Nov 28, 2024
1 parent 1240ff0 commit 3b859a4
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 46 deletions.
51 changes: 28 additions & 23 deletions pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,30 +96,10 @@ func (jgv *JanusGraphEdgeWriter) startBackgroundWriter(ctx context.Context) {
err := jgv.batchWrite(ctx, batch.data)
if err != nil {
var e *errBatchWriter
if errors.As(err, &e) && e.retryable {
if errors.As(err, &e) {
// If the error is retryable, retry the write operation with a smaller batch.
if batch.retryCount < jgv.maxRetry {
_ = statsd.Count(ctx, metric.RetryWriterCall, 1, jgv.tags, 1)

// Compute the new batch size.
newBatchSize := len(batch.data) / 2
batch.retryCount++

log.Trace(ctx).Warnf("Retrying write operation with smaller edge batch (n:%d -> %d, r:%d): %v", len(batch.data), newBatchSize, batch.retryCount, e.Unwrap())

// Split the batch into smaller chunks and requeue them.
if len(batch.data[:newBatchSize]) > 0 {
jgv.consumerChan <- batchItem{
data: batch.data[:newBatchSize],
retryCount: batch.retryCount,
}
}
if len(batch.data[newBatchSize:]) > 0 {
jgv.consumerChan <- batchItem{
data: batch.data[newBatchSize:],
retryCount: batch.retryCount,
}
}
if e.retryable && batch.retryCount < jgv.maxRetry {
jgv.retrySplitAndRequeue(ctx, &batch, e)
continue

Check failure on line 103 in pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go

View workflow job for this annotation

GitHub Actions / linter

continue with no blank line before (nlreturn)
}

Expand All @@ -139,6 +119,31 @@ func (jgv *JanusGraphEdgeWriter) startBackgroundWriter(ctx context.Context) {
}()
}

// retrySplitAndRequeue will split the batch into smaller chunks and requeue them for writing.
func (jgv *JanusGraphEdgeWriter) retrySplitAndRequeue(ctx context.Context, batch *batchItem, e *errBatchWriter) {
	_ = statsd.Count(ctx, metric.RetryWriterCall, 1, jgv.tags, 1)

	// Halve the batch and bump its retry counter before requeueing.
	half := len(batch.data) / 2
	batch.retryCount++

	log.Trace(ctx).Warnf("Retrying write operation with smaller edge batch (n:%d -> %d, r:%d): %v", len(batch.data), half, batch.retryCount, e.Unwrap())

	// Requeue each non-empty half as an independent batch item, preserving order.
	for _, chunk := range [][]any{batch.data[:half], batch.data[half:]} {
		if len(chunk) == 0 {
			continue
		}
		jgv.consumerChan <- batchItem{
			data:       chunk,
			retryCount: batch.retryCount,
		}
	}
}

// batchWrite will write a batch of entries into the graph DB and block until the write completes.
// Callers are responsible for doing an Add(1) to the writingInFlight wait group to ensure proper synchronization.
func (jgv *JanusGraphEdgeWriter) batchWrite(ctx context.Context, data []any) error {
Expand Down
51 changes: 28 additions & 23 deletions pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,30 +107,10 @@ func (jgv *JanusGraphVertexWriter) startBackgroundWriter(ctx context.Context) {
err := jgv.batchWrite(ctx, batch.data)
if err != nil {
var e *errBatchWriter
if errors.As(err, &e) && e.retryable {
if errors.As(err, &e) {
// If the error is retryable, retry the write operation with a smaller batch.
if batch.retryCount < jgv.maxRetry {
_ = statsd.Count(ctx, metric.RetryWriterCall, 1, jgv.tags, 1)

// Compute the new batch size.
newBatchSize := len(batch.data) / 2
batch.retryCount++

log.Trace(ctx).Warnf("Retrying write operation with vertex smaller batch (n:%d -> %d, r:%d): %v", len(batch.data), newBatchSize, batch.retryCount, e.Unwrap())

// Split the batch into smaller chunks and requeue them.
if len(batch.data[:newBatchSize]) > 0 {
jgv.consumerChan <- batchItem{
data: batch.data[:newBatchSize],
retryCount: batch.retryCount,
}
}
if len(batch.data[newBatchSize:]) > 0 {
jgv.consumerChan <- batchItem{
data: batch.data[newBatchSize:],
retryCount: batch.retryCount,
}
}
if e.retryable && batch.retryCount < jgv.maxRetry {
jgv.retrySplitAndRequeue(ctx, &batch, e)
continue

Check failure on line 114 in pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go

View workflow job for this annotation

GitHub Actions / linter

continue with no blank line before (nlreturn)
}

Expand All @@ -150,6 +130,31 @@ func (jgv *JanusGraphVertexWriter) startBackgroundWriter(ctx context.Context) {
}()
}

// retrySplitAndRequeue will split the batch into smaller chunks and requeue them for writing.
func (jgv *JanusGraphVertexWriter) retrySplitAndRequeue(ctx context.Context, batch *batchItem, e *errBatchWriter) {
	_ = statsd.Count(ctx, metric.RetryWriterCall, 1, jgv.tags, 1)

	// Halve the batch and bump its retry counter before requeueing.
	half := len(batch.data) / 2
	batch.retryCount++

	log.Trace(ctx).Warnf("Retrying write operation with smaller vertex batch (n:%d -> %d, r:%d): %v", len(batch.data), half, batch.retryCount, e.Unwrap())

	// Requeue each non-empty half as an independent batch item, preserving order.
	for _, chunk := range [][]any{batch.data[:half], batch.data[half:]} {
		if len(chunk) == 0 {
			continue
		}
		jgv.consumerChan <- batchItem{
			data:       chunk,
			retryCount: batch.retryCount,
		}
	}
}

func (jgv *JanusGraphVertexWriter) cacheIds(ctx context.Context, idMap []*gremlin.Result) error {
for _, r := range idMap {
idMap, ok := r.GetInterface().(map[interface{}]interface{})
Expand Down

0 comments on commit 3b859a4

Please sign in to comment.