Skip to content

Commit

Permalink
Move graph deletion to datapipe (#517)
Browse files Browse the repository at this point in the history
* chore: move graph deletion logic to datapipe

* fix: go generate

* fix: broken sql

* chore: better logging and datapipe status for purge

* chore: fix tests

* chore: delete bad comment

---------

Co-authored-by: Alyx Holms <[email protected]>
  • Loading branch information
rvazarkar and superlinkx authored Mar 22, 2024
1 parent 05c6832 commit e774e08
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 93 deletions.
44 changes: 3 additions & 41 deletions cmd/api/src/api/v2/database_wipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"strings"

"github.com/gofrs/uuid"
"github.com/specterops/bloodhound/dawgs/graph"
"github.com/specterops/bloodhound/dawgs/ops"
"github.com/specterops/bloodhound/log"
"github.com/specterops/bloodhound/src/api"
"github.com/specterops/bloodhound/src/model"
Expand Down Expand Up @@ -97,11 +95,8 @@ func (s Resources) HandleDatabaseWipe(response http.ResponseWriter, request *htt

// delete graph
if payload.DeleteCollectedGraphData {
if failed := s.deleteCollectedGraphData(request.Context(), auditEntry); failed {
errors = append(errors, "collected graph data")
} else {
kickoffAnalysis = true
}
s.TaskNotifier.RequestDeletion()
s.handleAuditLogForDatabaseWipe(request.Context(), auditEntry, true, "collected graph data")
}

// delete asset group selectors
Expand Down Expand Up @@ -146,39 +141,6 @@ func (s Resources) HandleDatabaseWipe(response http.ResponseWriter, request *htt

}

// deleteCollectedGraphData wipes all collected graph data by deleting every
// node in the graph database, writing a success or failure entry to the
// supplied audit record either way. The return value is true on FAILURE
// (note the inverted sense — callers must treat true as an error).
func (s Resources) deleteCollectedGraphData(ctx context.Context, auditEntry *model.AuditEntry) (failure bool) {
	var nodeIDs []graph.ID

	// First pass: snapshot the IDs of every node inside a read transaction,
	// so the destructive batch below operates on a stable list rather than a
	// cursor it is mutating.
	if err := s.Graph.ReadTransaction(ctx,
		func(tx graph.Transaction) error {
			fetchedNodeIDs, err := ops.FetchNodeIDs(tx.Nodes())

			nodeIDs = append(nodeIDs, fetchedNodeIDs...)
			return err
		},
	); err != nil {
		log.Errorf("%s: %s", "error fetching all nodes", err.Error())
		s.handleAuditLogForDatabaseWipe(ctx, auditEntry, false, "collected graph data")
		return true
	} else if err := s.Graph.BatchOperation(ctx, func(batch graph.Batch) error {
		// Second pass: delete each snapshotted node in one batched operation.
		for _, nodeId := range nodeIDs {
			// deleting a node also deletes all of its edges due to a sql trigger
			if err := batch.DeleteNode(nodeId); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		log.Errorf("%s: %s", "error deleting all nodes", err.Error())
		s.handleAuditLogForDatabaseWipe(ctx, auditEntry, false, "collected graph data")
		return true
	} else {
		// if successful, handle audit log and kick off analysis
		s.handleAuditLogForDatabaseWipe(ctx, auditEntry, true, "collected graph data")
		return false
	}
}

func (s Resources) deleteHighValueSelectors(ctx context.Context, auditEntry *model.AuditEntry, assetGroupIDs []int) (failure bool) {

if err := s.DB.DeleteAssetGroupSelectorsForAssetGroups(ctx, assetGroupIDs); err != nil {
Expand Down Expand Up @@ -218,7 +180,7 @@ func (s Resources) handleAuditLogForDatabaseWipe(ctx context.Context, auditEntry
if success {
auditEntry.Status = model.AuditLogStatusSuccess
auditEntry.Model = model.AuditData{
"delete_successful": msg,
"delete_request_successful": msg,
}
} else {
auditEntry.Status = model.AuditLogStatusFailure
Expand Down
53 changes: 6 additions & 47 deletions cmd/api/src/api/v2/database_wipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,57 +80,17 @@ func TestDatabaseWipe(t *testing.T) {
},
},
{
Name: "failed fetching nodes during attempt to delete collected graph data",
Name: "deletion of collected graph data kicks off tasker",
Input: func(input *apitest.Input) {
apitest.SetHeader(input, headers.ContentType.String(), mediatypes.ApplicationJson.String())
apitest.BodyStruct(input, v2.DatabaseWipe{DeleteCollectedGraphData: true})
},
Setup: func() {
taskerIntent := mockTasker.EXPECT().RequestDeletion().Times(1)
successfulAuditLogIntent := mockDB.EXPECT().AppendAuditLog(gomock.Any(), gomock.Any()).Return(nil).Times(1)
failedFetchNodesToDelete := mockGraph.EXPECT().ReadTransaction(gomock.Any(), gomock.Any()).Return(errors.New("oopsy!")).Times(1)
successfulAuditLogFailure := mockDB.EXPECT().AppendAuditLog(gomock.Any(), gomock.Any()).Return(nil).Times(1)

gomock.InOrder(successfulAuditLogIntent, failedFetchNodesToDelete, successfulAuditLogFailure)
},
Test: func(output apitest.Output) {
apitest.StatusCode(output, http.StatusInternalServerError)
apitest.BodyContains(output, "We encountered an error while deleting collected graph data")
},
},
{
Name: "failed batch operation to delete nodes during attempt to delete collected graph data",
Input: func(input *apitest.Input) {
apitest.SetHeader(input, headers.ContentType.String(), mediatypes.ApplicationJson.String())
apitest.BodyStruct(input, v2.DatabaseWipe{DeleteCollectedGraphData: true})
},
Setup: func() {
successfulAuditLogIntent := mockDB.EXPECT().AppendAuditLog(gomock.Any(), gomock.Any()).Return(nil).Times(1)
successfulFetchNodesToDelete := mockGraph.EXPECT().ReadTransaction(gomock.Any(), gomock.Any()).Return(nil).Times(1)
failedBatchDelete := mockGraph.EXPECT().BatchOperation(gomock.Any(), gomock.Any()).Return(errors.New("oopsy!")).Times(1)
successfulAuditLogFailure := mockDB.EXPECT().AppendAuditLog(gomock.Any(), gomock.Any()).Return(nil).Times(1)

gomock.InOrder(successfulAuditLogIntent, successfulFetchNodesToDelete, failedBatchDelete, successfulAuditLogFailure)

},
Test: func(output apitest.Output) {
apitest.StatusCode(output, http.StatusInternalServerError)
apitest.BodyContains(output, "We encountered an error while deleting collected graph data")
},
},
{
Name: "succesful deletion of collected graph data kicks of analysis",
Input: func(input *apitest.Input) {
apitest.SetHeader(input, headers.ContentType.String(), mediatypes.ApplicationJson.String())
apitest.BodyStruct(input, v2.DatabaseWipe{DeleteCollectedGraphData: true})
},
Setup: func() {
successfulAuditLogIntent := mockDB.EXPECT().AppendAuditLog(gomock.Any(), gomock.Any()).Return(nil).Times(1)
successfulFetchNodesToDelete := mockGraph.EXPECT().ReadTransaction(gomock.Any(), gomock.Any()).Return(nil).Times(1)
successfulBatchDelete := mockGraph.EXPECT().BatchOperation(gomock.Any(), gomock.Any()).Return(nil).Times(1)
successfulAuditLogSuccess := mockDB.EXPECT().AppendAuditLog(gomock.Any(), gomock.Any()).Return(nil).Times(1)
sucsessfulAnalysisKickoff := mockTasker.EXPECT().RequestAnalysis().Times(1)
successfulAuditLogWipe := mockDB.EXPECT().AppendAuditLog(gomock.Any(), gomock.Any()).Return(nil).Times(1)

gomock.InOrder(successfulAuditLogIntent, successfulFetchNodesToDelete, successfulBatchDelete, successfulAuditLogSuccess, sucsessfulAnalysisKickoff)
gomock.InOrder(successfulAuditLogIntent, taskerIntent, successfulAuditLogWipe)

},
Test: func(output apitest.Output) {
Expand Down Expand Up @@ -287,10 +247,9 @@ func TestDatabaseWipe(t *testing.T) {
successfulAuditLogIntent := mockDB.EXPECT().AppendAuditLog(gomock.Any(), gomock.Any()).Return(nil).Times(1)

// collected graph data operations
fetchNodesToDelete := mockGraph.EXPECT().ReadTransaction(gomock.Any(), gomock.Any()).Return(nil).Times(1)
batchDelete := mockGraph.EXPECT().BatchOperation(gomock.Any(), gomock.Any()).Return(nil).Times(1)
taskerIntent := mockTasker.EXPECT().RequestDeletion().Times(1)
nodesDeletedAuditLog := mockDB.EXPECT().AppendAuditLog(gomock.Any(), gomock.Any()).Return(nil).Times(1)
gomock.InOrder(successfulAuditLogIntent, fetchNodesToDelete, batchDelete, nodesDeletedAuditLog)
gomock.InOrder(successfulAuditLogIntent, taskerIntent, nodesDeletedAuditLog)

// high value selector operations
assetGroupSelectorsDelete := mockDB.EXPECT().DeleteAssetGroupSelectorsForAssetGroups(gomock.Any(), gomock.Any()).Return(nil).Times(1)
Expand Down
43 changes: 43 additions & 0 deletions cmd/api/src/daemons/datapipe/datapipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (

// Tasker is the scheduling surface other layers (e.g. the API) use to hand
// work to the datapipe daemon: request a re-analysis pass, request a full
// deletion of collected graph data, or read the pipeline's current status.
type Tasker interface {
	RequestAnalysis()
	RequestDeletion()
	GetStatus() model.DatapipeStatusWrapper
}

Expand All @@ -49,6 +50,7 @@ type Daemon struct {
cache cache.Cache
cfg config.Configuration
analysisRequested *atomic.Bool
deletionRequested *atomic.Bool
tickInterval time.Duration
status model.DatapipeStatusWrapper
ctx context.Context
Expand All @@ -66,6 +68,7 @@ func NewDaemon(ctx context.Context, cfg config.Configuration, connections bootst
cache: cache,
cfg: cfg,
ctx: ctx,
deletionRequested: &atomic.Bool{},
analysisRequested: &atomic.Bool{},
orphanedFileSweeper: NewOrphanFileSweeper(NewOSFileOperations(), cfg.TempDirectory()),
tickInterval: tickInterval,
Expand All @@ -77,9 +80,18 @@ func NewDaemon(ctx context.Context, cfg config.Configuration, connections bootst
}

// RequestAnalysis flags the datapipe to run an analysis pass on its next
// tick. The request is dropped (with a warning) while a deletion is pending,
// since analyzing data that is about to be purged would be wasted work.
func (s *Daemon) RequestAnalysis() {
	if !s.getDeletionRequested() {
		s.setAnalysisRequested(true)
		return
	}

	log.Warnf("Rejecting analysis request as deletion is in progress")
}

// RequestDeletion flags the datapipe to purge collected graph data on its
// next tick. Any pending analysis request is cleared first so that analysis
// does not run against data that is about to be removed (deleteData itself
// re-requests analysis once the purge completes).
func (s *Daemon) RequestDeletion() {
	s.setAnalysisRequested(false)
	s.setDeletionRequested(true)
}

// GetStatus returns the datapipe's current status wrapper.
// NOTE(review): s.status is read here without synchronization while the
// daemon loop updates it — confirm callers tolerate a possibly-stale read.
func (s *Daemon) GetStatus() model.DatapipeStatusWrapper {
	return s.status
}
Expand All @@ -92,6 +104,14 @@ func (s *Daemon) setAnalysisRequested(requested bool) {
s.analysisRequested.Store(requested)
}

// setDeletionRequested atomically records whether a graph-data purge is
// pending; paired with getDeletionRequested for cross-goroutine signaling.
func (s *Daemon) setDeletionRequested(requested bool) {
	s.deletionRequested.Store(requested)
}

// getDeletionRequested atomically reports whether a graph-data purge has
// been requested and not yet completed.
func (s *Daemon) getDeletionRequested() bool {
	return s.deletionRequested.Load()
}

func (s *Daemon) analyze() {
// Ensure that the user-requested analysis switch is flipped back to false. This is done at the beginning of the
// function so that any re-analysis requests are caught while analysis is in-progress.
Expand Down Expand Up @@ -158,6 +178,10 @@ func (s *Daemon) Start(ctx context.Context) {
s.clearOrphanedData()

case <-datapipeLoopTimer.C:
if s.getDeletionRequested() {
s.deleteData()
}

// Ingest all available ingest tasks
s.ingestAvailableTasks()

Expand All @@ -182,6 +206,25 @@ func (s *Daemon) Start(ctx context.Context) {
}
}

// deleteData performs the requested purge: it cancels in-flight file upload
// jobs, clears pending ingest tasks, then deletes all collected graph data.
// Whatever happens, the deferred cleanup returns the pipeline to idle, clears
// the deletion flag, and queues a fresh analysis pass. Errors are logged and
// the purge is abandoned at the first failing step.
func (s *Daemon) deleteData() {
	// Registered first so it runs LAST: restore idle status, drop the
	// deletion flag, and schedule re-analysis of whatever data remains.
	defer func() {
		s.status.Update(model.DatapipeStatusIdle, false)
		s.setDeletionRequested(false)
		s.setAnalysisRequested(true)
	}()
	// Registered second so the duration measurement closes before the
	// status flips back to idle.
	defer log.Measure(log.LevelInfo, "Purge Graph Data Completed")()

	s.status.Update(model.DatapipeStatusPurging, false)
	log.Infof("Begin Purge Graph Data")

	if err := s.db.CancelAllFileUploads(s.ctx); err != nil {
		log.Errorf("Error cancelling jobs during data deletion: %v", err)
		return
	}

	if err := s.db.DeleteAllIngestTasks(s.ctx); err != nil {
		log.Errorf("Error deleting ingest tasks during data deletion: %v", err)
		return
	}

	if err := DeleteCollectedGraphData(s.ctx, s.graphdb); err != nil {
		log.Errorf("Error deleting graph data: %v", err)
	}
}

// Stop satisfies the daemon lifecycle contract. The datapipe has no teardown
// work of its own here (its loop exits via context cancellation), so this is
// a no-op that always succeeds.
func (s *Daemon) Stop(ctx context.Context) error {
	return nil
}
Expand Down
35 changes: 35 additions & 0 deletions cmd/api/src/daemons/datapipe/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package datapipe

import (
	"context"
	"fmt"

	"github.com/specterops/bloodhound/dawgs/graph"
	"github.com/specterops/bloodhound/dawgs/ops"
)

// DeleteCollectedGraphData deletes every node in the given graph database,
// which removes all collected graph data (edges are removed alongside their
// nodes — see the trigger note below). Node IDs are snapshotted in a read
// transaction first so the destructive batch operates on a stable list.
// Returns a wrapped error from whichever phase fails first.
func DeleteCollectedGraphData(ctx context.Context, graphDB graph.Database) error {
	var nodeIDs []graph.ID

	// Phase 1: collect the IDs of all nodes.
	if err := graphDB.ReadTransaction(ctx,
		func(tx graph.Transaction) error {
			fetchedNodeIDs, err := ops.FetchNodeIDs(tx.Nodes())

			nodeIDs = append(nodeIDs, fetchedNodeIDs...)
			return err
		},
	); err != nil {
		return fmt.Errorf("error fetching all nodes: %w", err)
	}

	// Phase 2: delete each snapshotted node in a single batched operation.
	if err := graphDB.BatchOperation(ctx, func(batch graph.Batch) error {
		for _, nodeID := range nodeIDs {
			// deleting a node also deletes all of its edges due to a sql trigger
			if err := batch.DeleteNode(nodeID); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		return fmt.Errorf("error deleting all nodes: %w", err)
	}

	return nil
}
12 changes: 12 additions & 0 deletions cmd/api/src/daemons/datapipe/mocks/tasker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions cmd/api/src/database/file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func (s *BloodhoundDB) GetFileUploadJobsWithStatus(ctx context.Context, status m
return jobs, CheckError(result)
}

// CancelAllFileUploads flips every file upload job that is still in an
// active state (analyzing, running, or ingesting) to the canceled status,
// translating any gorm result error through CheckError.
func (s *BloodhoundDB) CancelAllFileUploads(ctx context.Context) error {
	result := s.db.
		Model(model.FileUploadJob{}).
		WithContext(ctx).
		Where("status in ?", []model.JobStatus{model.JobStatusAnalyzing, model.JobStatusRunning, model.JobStatusIngesting}).
		Update("status", model.JobStatusCanceled)

	return CheckError(result)
}

func (s *BloodhoundDB) GetAllFileUploadJobs(ctx context.Context, skip int, limit int, order string, filter model.SQLFilter) ([]model.FileUploadJob, int, error) {
var (
jobs []model.FileUploadJob
Expand Down Expand Up @@ -91,3 +96,7 @@ func (s *BloodhoundDB) DeleteAllFileUploads(ctx context.Context) error {
s.db.WithContext(ctx).Exec("DELETE FROM file_upload_jobs"),
)
}

// DeleteAllIngestTasks removes every row from the ingest_tasks table via a
// raw SQL statement, translating any gorm error through CheckError.
func (s *BloodhoundDB) DeleteAllIngestTasks(ctx context.Context) error {
	return CheckError(s.db.WithContext(ctx).Exec("DELETE FROM ingest_tasks"))
}
28 changes: 28 additions & 0 deletions cmd/api/src/database/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions cmd/api/src/model/datapipe_status.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
// Copyright 2023 Specter Ops, Inc.
//
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// SPDX-License-Identifier: Apache-2.0

package model
Expand All @@ -24,6 +24,7 @@ const (
DatapipeStatusIdle DatapipeStatus = "idle"
DatapipeStatusIngesting DatapipeStatus = "ingesting"
DatapipeStatusAnalyzing DatapipeStatus = "analyzing"
DatapipeStatusPurging DatapipeStatus = "purging"
)

type DatapipeStatusWrapper struct {
Expand Down
Loading

0 comments on commit e774e08

Please sign in to comment.