diff --git a/pkg/cmd/dump.go b/pkg/cmd/dump.go
index dd9367936..a42ef7459 100644
--- a/pkg/cmd/dump.go
+++ b/pkg/cmd/dump.go
@@ -61,6 +61,6 @@ func InitRemoteIngestCmd(cmd *cobra.Command, standalone bool) {
 func InitLocalIngestCmd(cmd *cobra.Command) {
 	cmd.PersistentFlags().String("cluster", "", "Cluster name to ingest (e.g.: my-cluster-1)")
-	viper.BindPFlag(config.IngestorClusterName, cmd.Flags().Lookup("cluster")) //nolint: errcheck
-	cmd.MarkFlagRequired("cluster") //nolint: errcheck
+	viper.BindPFlag(config.IngestorClusterName, cmd.PersistentFlags().Lookup("cluster")) //nolint: errcheck
+	cmd.MarkFlagRequired("cluster") //nolint: errcheck
 }
diff --git a/pkg/collector/collector.go b/pkg/collector/collector.go
index a1d0989ab..ecf9bd370 100644
--- a/pkg/collector/collector.go
+++ b/pkg/collector/collector.go
@@ -3,6 +3,7 @@ package collector
 import (
 	"context"
 	"fmt"
+	"time"
 	"github.com/DataDog/KubeHound/pkg/config"
 	"github.com/DataDog/KubeHound/pkg/globals/types"
@@ -77,6 +78,11 @@ type EndpointIngestor interface {
 	Complete(context.Context) error
 }
+// MetadataIngestor defines the interface that allows an ingestor to compute metrics and metadata from a collector.
+type MetadataIngestor interface {
+	DumpMetadata(context.Context, Metadata) error
+}
+
 //go:generate mockery --name CollectorClient --output mockcollector --case underscore --filename collector_client.go --with-expecter
 type CollectorClient interface { //nolint: interfacebloat
 	services.Dependency
@@ -84,8 +90,8 @@ type CollectorClient interface { //nolint: interfacebloat
 	// ClusterInfo returns the target cluster information for the current run.
 	ClusterInfo(ctx context.Context) (*config.ClusterInfo, error)
-	// Tags return the tags for the current run.
-	Tags(ctx context.Context) []string
+	// ComputeMetadata computes the run metrics, gathers all the metadata and dumps it through ingestor.DumpMetadata.
+	ComputeMetadata(ctx context.Context, ingestor MetadataIngestor) error
 	// StreamNodes will iterate through all NodeType objects collected by the collector and invoke the ingestor.IngestNode method on each.
 	// Once all the NodeType objects have been exhausted the ingestor.Complete method will be invoked to signal the end of the stream.
@@ -142,8 +148,8 @@ type collectorTags struct {
 	baseTags    []string
 }
-func newCollectorTags() *collectorTags {
-	return &collectorTags{
+func newCollectorTags() collectorTags {
+	return collectorTags{
 		pod:         tag.GetBaseTagsWith(tag.Collector(FileCollectorName), tag.Entity(tag.EntityPods)),
 		role:        tag.GetBaseTagsWith(tag.Collector(FileCollectorName), tag.Entity(tag.EntityRoles)),
 		rolebinding: tag.GetBaseTagsWith(tag.Collector(FileCollectorName), tag.Entity(tag.EntityRolebindings)),
@@ -154,3 +160,16 @@ func newCollectorTags() *collectorTags {
 		baseTags:    tag.GetBaseTags(),
 	}
 }
+
+type Metrics struct {
+	DumpTime             time.Time     `json:"dump_time"`
+	RunDuration          time.Duration `json:"run_duration"`
+	TotalWaitTime        time.Duration `json:"total_wait_time"`
+	ThrottlingPercentage float64       `json:"throttling_percentage"`
+}
+
+type Metadata struct {
+	RunID       string  `json:"run_id"`
+	ClusterName string  `json:"cluster"`
+	Metrics     Metrics `json:"metrics"`
+}
diff --git a/pkg/collector/file.go b/pkg/collector/file.go
index 56f4f541a..9ad853bb7 100644
--- a/pkg/collector/file.go
+++ b/pkg/collector/file.go
@@ -45,6 +45,7 @@ const (
 	PodPath          = "pods.json"
 	RolesPath        = "roles.rbac.authorization.k8s.io.json"
 	RoleBindingsPath = "rolebindings.rbac.authorization.k8s.io.json"
+	MetadataPath     = "metadata.json"
 )
 const (
@@ -55,7 +56,7 @@ type FileCollector struct {
 	cfg  *config.FileCollectorConfig
 	log  *log.KubehoundLogger
-	tags *collectorTags
+	tags collectorTags
 }
 // NewFileCollector creates a new instance of the file collector from the provided application config.
@@ -78,8 +79,8 @@ func NewFileCollector(ctx context.Context, cfg *config.KubehoundConfig) (Collect
 	}, nil
 }
-// TODO: remove this after all PR
-func (c *FileCollector) Tags(ctx context.Context) []string {
+// ComputeMetadata is a no-op for the file collector, as the dumped files are expected to already contain the gathered metadata.
+func (c *FileCollector) ComputeMetadata(ctx context.Context, ingestor MetadataIngestor) error { return nil } diff --git a/pkg/collector/k8s_api.go b/pkg/collector/k8s_api.go index 7cf13fe0a..2a995d459 100644 --- a/pkg/collector/k8s_api.go +++ b/pkg/collector/k8s_api.go @@ -40,6 +40,8 @@ type k8sAPICollector struct { startTime time.Time mu *sync.Mutex isStreaming bool + clusterName string + runID string } const ( @@ -114,19 +116,36 @@ func NewK8sAPICollector(ctx context.Context, cfg *config.KubehoundConfig) (Colle } return &k8sAPICollector{ - cfg: cfg.Collector.Live, - clientset: clientset, - log: l, - rl: ratelimit.New(cfg.Collector.Live.RateLimitPerSecond), // per second - tags: *newCollectorTags(), - waitTime: map[string]time.Duration{}, - startTime: time.Now(), - mu: &sync.Mutex{}, + cfg: cfg.Collector.Live, + clientset: clientset, + log: l, + rl: ratelimit.New(cfg.Collector.Live.RateLimitPerSecond), // per second + tags: newCollectorTags(), + waitTime: map[string]time.Duration{}, + startTime: time.Now(), + mu: &sync.Mutex{}, + clusterName: clusterName, + runID: cfg.Dynamic.RunID.String(), }, nil } -// TODO: remove this after all PR -func (c *k8sAPICollector) Tags(ctx context.Context) []string { +func (c *k8sAPICollector) ComputeMetadata(ctx context.Context, ingestor MetadataIngestor) error { + metrics, err := c.computeMetrics(ctx) + if err != nil { + return fmt.Errorf("error computing metrics: %w", err) + } + + metadata := Metadata{ + ClusterName: c.clusterName, + RunID: c.runID, + Metrics: metrics, + } + + err = ingestor.DumpMetadata(ctx, metadata) + if err != nil { + return fmt.Errorf("ingesting metadata: %w", err) + } + return nil } @@ -186,7 +205,7 @@ func (c *k8sAPICollector) ClusterInfo(ctx context.Context) (*config.ClusterInfo, } // Generate metrics for k8sAPI collector -func (c *k8sAPICollector) computeMetrics(_ context.Context) error { +func (c *k8sAPICollector) computeMetrics(_ context.Context) (Metrics, error) { var errMetric error var runTotalWaitTime time.Duration for _, wait := range c.waitTime { @@ -213,16 +232,18 @@ func (c *k8sAPICollector) computeMetrics(_ context.Context) error { } c.log.Infof("Stats for the run time duration: %s / wait: %s / throttling: %f%%", runDuration, runTotalWaitTime, 100*runThrottlingPercentage) //nolint:gomnd - return errMetric + // SaveMetadata + metadata := Metrics{ + DumpTime: time.Now(), + RunDuration: runDuration, + TotalWaitTime: runTotalWaitTime, + ThrottlingPercentage: runThrottlingPercentage, + } + + return metadata, errMetric } func (c *k8sAPICollector) Close(ctx context.Context) error { - err := c.computeMetrics(ctx) - if err != nil { - // We don't want to return an error here as it is just metrics and won't affect the collection of data - c.log.Errorf("Error computing metrics: %s", err) - } - return nil } diff --git a/pkg/collector/mockcollector/collector_client.go b/pkg/collector/mockcollector/collector_client.go index 1ec959e21..8190b48ad 100644 --- a/pkg/collector/mockcollector/collector_client.go +++ b/pkg/collector/mockcollector/collector_client.go @@ -120,6 +120,49 @@ func (_c *CollectorClient_ClusterInfo_Call) RunAndReturn(run func(context.Contex return _c } +// ComputeMetadata provides a mock function with given fields: ctx, ingestor +func (_m *CollectorClient) ComputeMetadata(ctx context.Context, ingestor collector.MetadataIngestor) error { + ret := _m.Called(ctx, ingestor) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, collector.MetadataIngestor) error); ok { + r0 = rf(ctx, ingestor) + } else { + r0 
= ret.Error(0) + } + + return r0 +} + +// CollectorClient_ComputeMetadata_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ComputeMetadata' +type CollectorClient_ComputeMetadata_Call struct { + *mock.Call +} + +// ComputeMetadata is a helper method to define mock.On call +// - ctx context.Context +// - ingestor collector.MetadataIngestor +func (_e *CollectorClient_Expecter) ComputeMetadata(ctx interface{}, ingestor interface{}) *CollectorClient_ComputeMetadata_Call { + return &CollectorClient_ComputeMetadata_Call{Call: _e.mock.On("ComputeMetadata", ctx, ingestor)} +} + +func (_c *CollectorClient_ComputeMetadata_Call) Run(run func(ctx context.Context, ingestor collector.MetadataIngestor)) *CollectorClient_ComputeMetadata_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(collector.MetadataIngestor)) + }) + return _c +} + +func (_c *CollectorClient_ComputeMetadata_Call) Return(_a0 error) *CollectorClient_ComputeMetadata_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *CollectorClient_ComputeMetadata_Call) RunAndReturn(run func(context.Context, collector.MetadataIngestor) error) *CollectorClient_ComputeMetadata_Call { + _c.Call.Return(run) + return _c +} + // HealthCheck provides a mock function with given fields: ctx func (_m *CollectorClient) HealthCheck(ctx context.Context) (bool, error) { ret := _m.Called(ctx) @@ -514,50 +557,6 @@ func (_c *CollectorClient_StreamRoles_Call) RunAndReturn(run func(context.Contex return _c } -// Tags provides a mock function with given fields: ctx -func (_m *CollectorClient) Tags(ctx context.Context) []string { - ret := _m.Called(ctx) - - var r0 []string - if rf, ok := ret.Get(0).(func(context.Context) []string); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]string) - } - } - - return r0 -} - -// CollectorClient_Tags_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Tags' -type CollectorClient_Tags_Call struct { - *mock.Call -} - -// Tags is a helper method to define mock.On call -// - ctx context.Context -func (_e *CollectorClient_Expecter) Tags(ctx interface{}) *CollectorClient_Tags_Call { - return &CollectorClient_Tags_Call{Call: _e.mock.On("Tags", ctx)} -} - -func (_c *CollectorClient_Tags_Call) Run(run func(ctx context.Context)) *CollectorClient_Tags_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *CollectorClient_Tags_Call) Return(_a0 []string) *CollectorClient_Tags_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *CollectorClient_Tags_Call) RunAndReturn(run func(context.Context) []string) *CollectorClient_Tags_Call { - _c.Call.Return(run) - return _c -} - type mockConstructorTestingTNewCollectorClient interface { mock.TestingT Cleanup(func()) diff --git a/pkg/dump/ingestor.go b/pkg/dump/ingestor.go index 6765c9086..78d9cd99a 100644 --- a/pkg/dump/ingestor.go +++ b/pkg/dump/ingestor.go @@ -2,7 +2,10 @@ package dump import ( "context" + "encoding/json" "fmt" + "os" + "path/filepath" "github.com/DataDog/KubeHound/pkg/collector" "github.com/DataDog/KubeHound/pkg/config" @@ -50,6 +53,22 @@ func getClusterName(ctx context.Context, collector collector.CollectorClient) (s return cluster.Name, nil } +func (d *DumpIngestor) Metadata() (collector.Metadata, error) { + path := filepath.Join(d.writer.OutputPath(), collector.MetadataPath) + data, err := os.ReadFile(path) + if err != nil { + return collector.Metadata{}, err + } + + md 
:= collector.Metadata{} + err = json.Unmarshal(data, &md) + if err != nil { + return collector.Metadata{}, err + } + + return md, nil +} + func (d *DumpIngestor) OutputPath() string { return d.writer.OutputPath() } @@ -71,7 +90,7 @@ func (d *DumpIngestor) DumpK8sObjects(ctx context.Context) error { return fmt.Errorf("run pipeline ingestor: %w", err) } - return pipeline.Wait(ctx) + return pipeline.WaitAndClose(ctx) } // Close() is invoked by the collector to close all handlers used to dump k8s objects. diff --git a/pkg/dump/pipeline/metadata.go b/pkg/dump/pipeline/metadata.go new file mode 100644 index 000000000..10929995d --- /dev/null +++ b/pkg/dump/pipeline/metadata.go @@ -0,0 +1,28 @@ +package pipeline + +import ( + "context" + + "github.com/DataDog/KubeHound/pkg/collector" + "github.com/DataDog/KubeHound/pkg/dump/writer" +) + +type MetadataIngestor struct { + buffer map[string]collector.Metadata + writer writer.DumperWriter +} + +func NewMetadataIngestor(ctx context.Context, dumpWriter writer.DumperWriter) *MetadataIngestor { + return &MetadataIngestor{ + buffer: make(map[string]collector.Metadata), + writer: dumpWriter, + } +} + +func (d *MetadataIngestor) DumpMetadata(ctx context.Context, metadata collector.Metadata) error { + data := make(map[string]*collector.Metadata) + data[collector.MetadataPath] = &metadata + + return dumpObj[*collector.Metadata](ctx, data, d.writer) + +} diff --git a/pkg/dump/pipeline/pipeline.go b/pkg/dump/pipeline/pipeline.go index d42d83790..091df25dd 100644 --- a/pkg/dump/pipeline/pipeline.go +++ b/pkg/dump/pipeline/pipeline.go @@ -77,15 +77,30 @@ func dumpIngestorSequence(collector collector.CollectorClient, writer writer.Dum } } +// dumpIngestorClosingSequence returns the pipeline sequence for closing the dumping sequence (this pipeline is single-threaded) +func dumpIngestorClosingSequence(collector collector.CollectorClient, writer writer.DumperWriter) []DumpIngestorPipeline { + return []DumpIngestorPipeline{ + { + operationName: span.DumperMetadata, + entity: "Metadata", + streamFunc: func(ctx context.Context) error { + return collector.ComputeMetadata(ctx, NewMetadataIngestor(ctx, writer)) + }, + }, + } +} + // PipelineDumpIngestor is a parallelized pipeline based ingestor implementation. 
type PipelineDumpIngestor struct { - sequence []DumpIngestorPipeline - wp worker.WorkerPool - WorkerNumber int + sequence []DumpIngestorPipeline + closingSequence []DumpIngestorPipeline + wp worker.WorkerPool + WorkerNumber int } func NewPipelineDumpIngestor(ctx context.Context, collector collector.CollectorClient, writer writer.DumperWriter) (context.Context, *PipelineDumpIngestor, error) { sequence := dumpIngestorSequence(collector, writer) + cleanupSequence := dumpIngestorClosingSequence(collector, writer) // Getting the number of workers from the writer to setup multi-threading if possible workerNumber := writer.WorkerNumber() @@ -113,9 +128,10 @@ func NewPipelineDumpIngestor(ctx context.Context, collector collector.CollectorC } return ctx, &PipelineDumpIngestor{ - wp: wp, - sequence: sequence, - WorkerNumber: workerNumber, + wp: wp, + sequence: sequence, + closingSequence: cleanupSequence, + WorkerNumber: workerNumber, }, nil } @@ -125,7 +141,7 @@ func (p *PipelineDumpIngestor) Run(ctx context.Context) error { for _, v := range p.sequence { v := v p.wp.Submit(func() error { - _, errDump := dumpK8sObjs(ctx, v.operationName, v.entity, v.streamFunc) + errDump := dumpK8sObjs(ctx, v.operationName, v.entity, v.streamFunc) if errDump != nil { err = errors.Join(err, errDump) } @@ -137,12 +153,25 @@ func (p *PipelineDumpIngestor) Run(ctx context.Context) error { return err } -func (p *PipelineDumpIngestor) Wait(ctx context.Context) error { - return p.wp.WaitForComplete() +func (p *PipelineDumpIngestor) WaitAndClose(ctx context.Context) error { + err := p.wp.WaitForComplete() + if err != nil { + return fmt.Errorf("wait for complete: %w", err) + } + + for _, v := range p.closingSequence { + v := v + errDump := dumpK8sObjs(ctx, v.operationName, v.entity, v.streamFunc) + if errDump != nil { + err = errors.Join(err, errDump) + } + } + + return err } // Static wrapper to dump k8s object dynamically (streams Kubernetes objects to the collector writer). 
-func dumpK8sObjs(ctx context.Context, operationName string, entity string, streamFunc StreamFunc) (context.Context, error) { +func dumpK8sObjs(ctx context.Context, operationName string, entity string, streamFunc StreamFunc) error { log.I.Infof("Dumping %s", entity) span, ctx := tracer.StartSpanFromContext(ctx, operationName, tracer.Measured()) span.SetTag(tag.EntityTag, entity) @@ -151,5 +180,5 @@ func dumpK8sObjs(ctx context.Context, operationName string, entity string, strea err = streamFunc(ctx) log.I.Infof("Dumping %s done", entity) - return ctx, err + return err } diff --git a/pkg/dump/pipeline/pipeline_faker.go b/pkg/dump/pipeline/pipeline_faker.go index a39059fba..732de76f3 100644 --- a/pkg/dump/pipeline/pipeline_faker.go +++ b/pkg/dump/pipeline/pipeline_faker.go @@ -74,6 +74,8 @@ func PipelineLiveTest(ctx context.Context, t *testing.T, workerNum int) (*mockwr mDumpWriter.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() } + mDumpWriter.EXPECT().Write(mock.Anything, mock.Anything, collector.MetadataPath).Return(nil).Once() + return mDumpWriter, collectorClient } diff --git a/pkg/dump/pipeline/pipeline_test.go b/pkg/dump/pipeline/pipeline_test.go index 5708ec8c4..4a0fdb11e 100644 --- a/pkg/dump/pipeline/pipeline_test.go +++ b/pkg/dump/pipeline/pipeline_test.go @@ -29,6 +29,20 @@ func newFakeDumpIngestorPipeline(ctx context.Context, t *testing.T, mockCollecto } +func closingSequence(ctx context.Context, t *testing.T, mDumpWriter *mockwriter.DumperWriter, mCollectorClient *mockcollector.CollectorClient) (*mockwriter.DumperWriter, *mockcollector.CollectorClient) { + t.Helper() + + closingSequence := dumpIngestorClosingSequence(mCollectorClient, mDumpWriter) + for _, step := range closingSequence { + switch step.entity { //nolint: gocritic + case "Metadata": + mCollectorClient.EXPECT().ComputeMetadata(mock.Anything, NewMetadataIngestor(ctx, mDumpWriter)).Return(nil).Once() + } + } + + return mDumpWriter, mCollectorClient +} + func TestPipelineDumpIngestor_Run(t *testing.T) { t.Parallel() ctx := context.Background() @@ -65,6 +79,8 @@ func TestPipelineDumpIngestor_Run(t *testing.T) { } } + mDumpWriter, mCollectorClient = closingSequence(ctx, t, mDumpWriter, mCollectorClient) + return mDumpWriter, mCollectorClient } @@ -99,6 +115,8 @@ func TestPipelineDumpIngestor_Run(t *testing.T) { } } + mDumpWriter, mCollectorClient = closingSequence(ctx, t, mDumpWriter, mCollectorClient) + return mDumpWriter, mCollectorClient } @@ -151,7 +169,7 @@ func TestPipelineDumpIngestor_Run(t *testing.T) { t.Errorf("PipelineDumpIngestor.Run() error = %v, wantErr %v", err, tt.wantErr) } - if err := pipeline.Wait(ctx); (err != nil) != tt.wantErr { + if err := pipeline.WaitAndClose(ctx); (err != nil) != tt.wantErr { t.Errorf("PipelineDumpIngestor.Wait() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/pkg/dump/result.go b/pkg/dump/result.go index 3c0955983..c936cd9c8 100644 --- a/pkg/dump/result.go +++ b/pkg/dump/result.go @@ -1,16 +1,20 @@ package dump import ( + "context" + "encoding/json" "fmt" + "os" "path" "regexp" + + "github.com/DataDog/KubeHound/pkg/collector" ) type DumpResult struct { - clusterName string - RunID string - isDir bool - extension string + isDir bool + extension string + Metadata collector.Metadata } const ( @@ -18,17 +22,17 @@ const ( DumpResultRunIDRegex = `([a-z0-9]{26})` DumpResultExtensionRegex = `\.?([a-z0-9\.]+)?` DumpResultPrefix = "kubehound_" - DumpResultFilenameRegex = DumpResultPrefix + DumpResultClusterNameRegex + "_" + 
DumpResultRunIDRegex + DumpResultExtensionRegex - DumpResultPathRegex = DumpResultClusterNameRegex + "/" + DumpResultFilenameRegex DumpResultTarWriterExtension = "tar.gz" ) func NewDumpResult(clusterName, runID string, isCompressed bool) (*DumpResult, error) { dumpResult := &DumpResult{ - clusterName: clusterName, - RunID: runID, - isDir: true, + Metadata: collector.Metadata{ + ClusterName: clusterName, + RunID: runID, + }, + isDir: true, } if isCompressed { dumpResult.Compressed() @@ -44,18 +48,18 @@ func NewDumpResult(clusterName, runID string, isCompressed bool) (*DumpResult, e func (i *DumpResult) Validate() error { re := regexp.MustCompile(DumpResultClusterNameRegex) - if !re.MatchString(i.clusterName) { - return fmt.Errorf("Invalid clustername: %q", i.clusterName) + if !re.MatchString(i.Metadata.ClusterName) { + return fmt.Errorf("Invalid clustername: %q", i.Metadata.ClusterName) } - matches := re.FindStringSubmatch(i.clusterName) - if len(matches) == 2 && matches[1] != i.clusterName { - return fmt.Errorf("Invalid clustername: %q", i.clusterName) + matches := re.FindStringSubmatch(i.Metadata.ClusterName) + if len(matches) == 2 && matches[1] != i.Metadata.ClusterName { + return fmt.Errorf("Invalid clustername: %q", i.Metadata.ClusterName) } re = regexp.MustCompile(DumpResultRunIDRegex) - if !re.MatchString(i.RunID) { - return fmt.Errorf("Invalid runID: %q", i.RunID) + if !re.MatchString(i.Metadata.RunID) { + return fmt.Errorf("Invalid runID: %q", i.Metadata.RunID) } return nil @@ -70,11 +74,11 @@ func (i *DumpResult) Compressed() { func (i *DumpResult) GetFullPath() string { filename := i.GetFilename() - return path.Join(i.clusterName, filename) + return path.Join(i.Metadata.ClusterName, filename) } func (i *DumpResult) GetFilename() string { - filename := fmt.Sprintf("%s%s_%s", DumpResultPrefix, i.clusterName, i.RunID) + filename := fmt.Sprintf("%s%s_%s", DumpResultPrefix, i.Metadata.ClusterName, i.Metadata.RunID) if i.isDir { return filename } @@ -82,32 +86,22 @@ func (i *DumpResult) GetFilename() string { return fmt.Sprintf("%s.%s", filename, i.extension) } -func ParsePath(path string) (*DumpResult, error) { - // ./<clusterName>/kubehound_<clusterName>_<run_id>[.tar.gz] - // re := regexp.MustCompile(`([a-z0-9\.\-_]+)/kubehound_([a-z0-9\.-_]+)_([a-z0-9]{26})\.?([a-z0-9\.]+)?`) - re := regexp.MustCompile(DumpResultPathRegex) - if !re.MatchString(path) { - return nil, fmt.Errorf("Invalid path provided: %q", path) - } +func ParseMetadata(ctx context.Context, metadataFilePath string) (collector.Metadata, error) { + var metadata collector.Metadata - matches := re.FindStringSubmatch(path) - // The cluster name should match (parent dir and in the filename) - if matches[1] != matches[2] { - return nil, fmt.Errorf("Cluster name does not match in the path provided: %q", path) + bytes, err := os.ReadFile(metadataFilePath) + if err != nil { + return metadata, fmt.Errorf("read file %s: %w", metadataFilePath, err) } - clusterName := matches[1] - runID := matches[3] - extension := matches[4] - - isCompressed := false - if extension != "" { - isCompressed = true + if len(bytes) == 0 { + return metadata, nil } - result, err := NewDumpResult(clusterName, runID, isCompressed) + + err = json.Unmarshal(bytes, &metadata) if err != nil { - return nil, err + return metadata, fmt.Errorf("unmarshalling %T in %s: %w", metadata, metadataFilePath, err) } - return result, nil + return metadata, nil } diff --git a/pkg/dump/result_test.go b/pkg/dump/result_test.go index e084711d5..20e2009ed 100644 --- 
a/pkg/dump/result_test.go +++ b/pkg/dump/result_test.go @@ -5,6 +5,8 @@ import ( "path" "reflect" "testing" + + "github.com/DataDog/KubeHound/pkg/collector" ) const ( @@ -16,85 +18,6 @@ const ( nonValidRunID = "01j2qs8TH6yarr5hkafysekn0j" ) -func TestParsePath(t *testing.T) { - t.Parallel() - type args struct { - path string - } - tests := []struct { - name string - args args - want *DumpResult - wantErr bool - }{ - { - name: "valid path with no compression", - args: args{ - path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j", - }, - want: &DumpResult{ - clusterName: validClusterName, - RunID: validRunID, - isDir: true, - extension: "", - }, - wantErr: false, - }, - { - name: "valid path with compressed data", - args: args{ - path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", - }, - want: &DumpResult{ - clusterName: validClusterName, - RunID: validRunID, - isDir: false, - extension: "tar.gz", - }, - wantErr: false, - }, - { - name: "invalid path", - args: args{ - path: "/tmp/cluster1.k8s.local/cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j", - }, - want: nil, - wantErr: true, - }, - { - name: "not matching clustername ", - args: args{ - path: "/tmp/cluster1.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j", - }, - want: nil, - wantErr: true, - }, - { - name: "invalid runID", - args: args{ - path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8TH6yarr5hkafysekn0j", - }, - want: nil, - wantErr: true, - }, - } - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - got, err := ParsePath(tt.args.path) - if (err != nil) != tt.wantErr { - t.Errorf("ParsePath() error = %v, wantErr %v", err, tt.wantErr) - - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("ParsePath() = %v, want %v", got, tt.want) - } - }) - } -} - func TestDumpResult_GetFilename(t *testing.T) { t.Parallel() @@ -135,10 +58,12 @@ func TestDumpResult_GetFilename(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() i := &DumpResult{ - clusterName: tt.fields.ClusterName, - RunID: tt.fields.RunID, - isDir: tt.fields.IsDir, - extension: tt.fields.Extension, + Metadata: collector.Metadata{ + ClusterName: tt.fields.ClusterName, + RunID: tt.fields.RunID, + }, + isDir: tt.fields.IsDir, + extension: tt.fields.Extension, } if got := i.GetFilename(); got != tt.want { t.Errorf("DumpResult.GetFilename() = %v, want %v", got, tt.want) @@ -187,10 +112,12 @@ func TestDumpResult_GetFullPath(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() i := &DumpResult{ - clusterName: tt.fields.ClusterName, - RunID: tt.fields.RunID, - isDir: tt.fields.IsDir, - extension: tt.fields.Extension, + Metadata: collector.Metadata{ + ClusterName: tt.fields.ClusterName, + RunID: tt.fields.RunID, + }, + isDir: tt.fields.IsDir, + extension: tt.fields.Extension, } if got := i.GetFullPath(); got != tt.want { t.Errorf("DumpResult.GetFullPath() = %v, want %v", got, tt.want) @@ -221,9 +148,11 @@ func TestNewDumpResult(t *testing.T) { isCompressed: false, }, want: &DumpResult{ - clusterName: validClusterName, - RunID: validRunID, - isDir: true, + Metadata: collector.Metadata{ + ClusterName: validClusterName, + RunID: validRunID, + }, + isDir: true, }, wantErr: false, }, diff --git a/pkg/dump/writer/mockwriter/writer.go b/pkg/dump/writer/mockwriter/writer.go index 08f9836e9..951369f5a 100644 --- a/pkg/dump/writer/mockwriter/writer.go +++ b/pkg/dump/writer/mockwriter/writer.go @@ -1,4 +1,4 @@ -// 
Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.43.0. DO NOT EDIT. package mocks @@ -25,6 +25,10 @@ func (_m *DumperWriter) EXPECT() *DumperWriter_Expecter { func (_m *DumperWriter) Close(_a0 context.Context) error { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for Close") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(_a0) @@ -67,6 +71,10 @@ func (_c *DumperWriter_Close_Call) RunAndReturn(run func(context.Context) error) func (_m *DumperWriter) Flush(_a0 context.Context) error { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for Flush") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(_a0) @@ -109,6 +117,10 @@ func (_c *DumperWriter_Flush_Call) RunAndReturn(run func(context.Context) error) func (_m *DumperWriter) OutputPath() string { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for OutputPath") + } + var r0 string if rf, ok := ret.Get(0).(func() string); ok { r0 = rf() @@ -150,6 +162,10 @@ func (_c *DumperWriter_OutputPath_Call) RunAndReturn(run func() string) *DumperW func (_m *DumperWriter) WorkerNumber() int { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for WorkerNumber") + } + var r0 int if rf, ok := ret.Get(0).(func() int); ok { r0 = rf() @@ -191,6 +207,10 @@ func (_c *DumperWriter_WorkerNumber_Call) RunAndReturn(run func() int) *DumperWr func (_m *DumperWriter) Write(_a0 context.Context, _a1 []byte, _a2 string) error { ret := _m.Called(_a0, _a1, _a2) + if len(ret) == 0 { + panic("no return value specified for Write") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, []byte, string) error); ok { r0 = rf(_a0, _a1, _a2) @@ -231,13 +251,58 @@ func (_c *DumperWriter_Write_Call) RunAndReturn(run func(context.Context, []byte return _c } -type mockConstructorTestingTNewDumperWriter interface { - mock.TestingT - Cleanup(func()) +// WriteMetadata provides a mock function with given fields: _a0 +func (_m *DumperWriter) WriteMetadata(_a0 context.Context) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for WriteMetadata") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DumperWriter_WriteMetadata_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteMetadata' +type DumperWriter_WriteMetadata_Call struct { + *mock.Call +} + +// WriteMetadata is a helper method to define mock.On call +// - _a0 context.Context +func (_e *DumperWriter_Expecter) WriteMetadata(_a0 interface{}) *DumperWriter_WriteMetadata_Call { + return &DumperWriter_WriteMetadata_Call{Call: _e.mock.On("WriteMetadata", _a0)} +} + +func (_c *DumperWriter_WriteMetadata_Call) Run(run func(_a0 context.Context)) *DumperWriter_WriteMetadata_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *DumperWriter_WriteMetadata_Call) Return(_a0 error) *DumperWriter_WriteMetadata_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *DumperWriter_WriteMetadata_Call) RunAndReturn(run func(context.Context) error) *DumperWriter_WriteMetadata_Call { + _c.Call.Return(run) + return _c } // NewDumperWriter creates a new instance of DumperWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
-func NewDumperWriter(t mockConstructorTestingTNewDumperWriter) *DumperWriter { +// The first argument is typically a *testing.T value. +func NewDumperWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *DumperWriter { mock := &DumperWriter{} mock.Mock.Test(t) diff --git a/pkg/ingestor/api/api.go b/pkg/ingestor/api/api.go index d59a180a3..2207648cf 100644 --- a/pkg/ingestor/api/api.go +++ b/pkg/ingestor/api/api.go @@ -30,7 +30,7 @@ import ( ) type API interface { - Ingest(ctx context.Context, clusterName string, runID string) error + Ingest(ctx context.Context, path string) error Notify(ctx context.Context, clusterName string, runID string) error } @@ -83,22 +83,14 @@ func (g *IngestorAPI) RehydrateLatest(ctx context.Context) ([]*grpc.IngestedClus latestDumpIngestTime := latestDump.ModTime latestDumpKey := latestDump.Key - dumpResult, err := dump.ParsePath(latestDumpKey) - if err != nil { - errRet = errors.Join(errRet, fmt.Errorf("parsing dump path %s: %w", latestDumpKey, err)) - - continue - } - runID := dumpResult.RunID - - clusterErr := g.Ingest(ctx, clusterName, runID) + clusterErr := g.Ingest(ctx, latestDumpKey) if clusterErr != nil { - errRet = errors.Join(errRet, fmt.Errorf("ingesting cluster %s/%s: %w", clusterName, runID, clusterErr)) + errRet = errors.Join(errRet, fmt.Errorf("ingesting cluster %s: %w", latestDumpKey, clusterErr)) } - log.I.Infof("Rehydrated cluster: %s, date: %s, run_id: %s", clusterName, latestDumpIngestTime.Format("01-02-2006 15:04:05"), runID) + log.I.Infof("Rehydrated cluster: %s, date: %s, key: %s", clusterName, latestDumpIngestTime.Format("01-02-2006 15:04:05"), latestDumpKey) ingestedCluster := &grpc.IngestedCluster{ ClusterName: clusterName, - RunId: runID, + Key: latestDumpKey, Date: timestamppb.New(latestDumpIngestTime), } res = append(res, ingestedCluster) @@ -109,33 +101,11 @@ func (g *IngestorAPI) RehydrateLatest(ctx context.Context) ([]*grpc.IngestedClus return res, errRet } -func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string) error { - events.PushEvent( - fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID), - fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID), - []string{ - tag.IngestionRunID(runID), - }, - ) +func (g *IngestorAPI) Ingest(_ context.Context, path string) error { // Settings global variables for the run in the context to propagate them to the spans runCtx := context.Background() - runCtx = context.WithValue(runCtx, span.ContextLogFieldClusterName, clusterName) - runCtx = context.WithValue(runCtx, span.ContextLogFieldRunID, runID) - - spanJob, runCtx := span.SpanIngestRunFromContext(runCtx, span.IngestorStartJob) - var err error - defer func() { spanJob.Finish(tracer.WithError(err)) }() - alreadyIngested, err := g.isAlreadyIngestedInGraph(runCtx, clusterName, runID) //nolint: contextcheck - if err != nil { - return err - } - - if alreadyIngested { - return fmt.Errorf("%w [%s:%s]", ErrAlreadyIngested, clusterName, runID) - } - - archivePath, err := g.puller.Pull(runCtx, clusterName, runID) //nolint: contextcheck + archivePath, err := g.puller.Pull(runCtx, path) //nolint: contextcheck if err != nil { return err } @@ -149,6 +119,14 @@ func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string return err } + metadataFilePath := filepath.Join(filepath.Dir(archivePath), collector.MetadataPath) + md, err := dump.ParseMetadata(runCtx, metadataFilePath) //nolint: contextcheck + if err != nil { + return err + } + clusterName := md.ClusterName + runID 
:= md.RunID + runCfg := g.Cfg runCfg.Collector = config.CollectorConfig{ Type: config.CollectorTypeFile, @@ -158,6 +136,29 @@ func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string }, } + events.PushEvent( + fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID), + fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID), + []string{ + tag.IngestionRunID(runID), + }, + ) + + runCtx = context.WithValue(runCtx, span.ContextLogFieldClusterName, clusterName) + runCtx = context.WithValue(runCtx, span.ContextLogFieldRunID, runID) + + spanJob, runCtx := span.SpanIngestRunFromContext(runCtx, span.IngestorStartJob) + defer func() { spanJob.Finish(tracer.WithError(err)) }() + + alreadyIngested, err := g.isAlreadyIngestedInGraph(runCtx, clusterName, runID) //nolint: contextcheck + if err != nil { + return err + } + + if alreadyIngested { + return fmt.Errorf("%w [%s:%s]", ErrAlreadyIngested, clusterName, runID) + } + // We need to flush the cache to prevent warnings/errors when overwriting elements in cache from the previous ingestion // This avoid conflicts from previous ingestion (there is no need to reuse the cache from a previous ingestion) log.I.Info("Preparing cache provider") diff --git a/pkg/ingestor/api/api_test.go b/pkg/ingestor/api/api_test.go index b127bc7ae..c14f97225 100644 --- a/pkg/ingestor/api/api_test.go +++ b/pkg/ingestor/api/api_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/DataDog/KubeHound/pkg/config" + "github.com/DataDog/KubeHound/pkg/dump" mocksNotifier "github.com/DataDog/KubeHound/pkg/ingestor/notifier/mocks" mocksPuller "github.com/DataDog/KubeHound/pkg/ingestor/puller/mocks" "github.com/DataDog/KubeHound/pkg/kubehound/providers" @@ -150,7 +151,17 @@ func TestIngestorAPI_Ingest(t *testing.T) { g := NewIngestorAPI(tt.fields.cfg, mockedPuller, mockedNotifier, mockedProvider) noPreviousScan(mt, g) tt.mock(mockedPuller, mockedNotifier, mockedCache, mockedStoreDB, mockedGraphDB) - if err := g.Ingest(context.TODO(), tt.args.clusterName, tt.args.runID); (err != nil) != tt.wantErr { + + // Construct dump result path + dumpResult, err := dump.NewDumpResult(tt.args.clusterName, tt.args.runID, true) + if err != nil { + t.Errorf("dump.NewDumpResult() error = %v, wantErr %v", err, tt.wantErr) + + return + } + dumpResultPath := dumpResult.GetFullPath() + + if err := g.Ingest(context.TODO(), dumpResultPath); (err != nil) != tt.wantErr { t.Errorf("IngestorAPI.Ingest() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/pkg/ingestor/api/grpc/grpc.go b/pkg/ingestor/api/grpc/grpc.go index 0584a983c..caed4317e 100644 --- a/pkg/ingestor/api/grpc/grpc.go +++ b/pkg/ingestor/api/grpc/grpc.go @@ -4,6 +4,7 @@ import ( "context" "net" + "github.com/DataDog/KubeHound/pkg/dump" "github.com/DataDog/KubeHound/pkg/ingestor/api" pb "github.com/DataDog/KubeHound/pkg/ingestor/api/grpc/pb" "github.com/DataDog/KubeHound/pkg/telemetry/log" @@ -32,7 +33,14 @@ type server struct { // Ingest is just a GRPC wrapper around the Ingest method from the API package func (s *server) Ingest(ctx context.Context, in *pb.IngestRequest) (*pb.IngestResponse, error) { - err := s.api.Ingest(ctx, in.GetClusterName(), in.GetRunId()) + // Rebuilding the path for the dump archive file + dumpResult, err := dump.NewDumpResult(in.GetClusterName(), in.GetRunId(), true) + if err != nil { + return nil, err + } + key := dumpResult.GetFullPath() + + err = s.api.Ingest(ctx, key) if err != nil { log.I.Errorf("Ingest failed: %v", err) diff --git 
a/pkg/ingestor/api/grpc/pb/api.pb.go b/pkg/ingestor/api/grpc/pb/api.pb.go index 446a1174d..e6443197a 100644 --- a/pkg/ingestor/api/grpc/pb/api.pb.go +++ b/pkg/ingestor/api/grpc/pb/api.pb.go @@ -158,7 +158,7 @@ type IngestedCluster struct { unknownFields protoimpl.UnknownFields ClusterName string `protobuf:"bytes,1,opt,name=cluster_name,json=clusterName,proto3" json:"cluster_name,omitempty"` - RunId string `protobuf:"bytes,2,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"` + Key string `protobuf:"bytes,2,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"` Date *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=date,proto3" json:"date,omitempty"` } @@ -203,7 +203,7 @@ func (x *IngestedCluster) GetClusterName() string { func (x *IngestedCluster) GetRunId() string { if x != nil { - return x.RunId + return x.Key } return "" } diff --git a/pkg/ingestor/api/mocks/mock_api.go b/pkg/ingestor/api/mocks/mock_api.go new file mode 100644 index 000000000..7f5715935 --- /dev/null +++ b/pkg/ingestor/api/mocks/mock_api.go @@ -0,0 +1,179 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// API is an autogenerated mock type for the API type +type API struct { + mock.Mock +} + +type API_Expecter struct { + mock *mock.Mock +} + +func (_m *API) EXPECT() *API_Expecter { + return &API_Expecter{mock: &_m.Mock} +} + +// Ingest provides a mock function with given fields: ctx, clusterName, runID +func (_m *API) Ingest(ctx context.Context, clusterName string, runID string) error { + ret := _m.Called(ctx, clusterName, runID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, clusterName, runID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// API_Ingest_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ingest' +type API_Ingest_Call struct { + *mock.Call +} + +// Ingest is a helper method to define mock.On call +// - ctx context.Context +// - clusterName string +// - runID string +func (_e *API_Expecter) Ingest(ctx interface{}, clusterName interface{}, runID interface{}) *API_Ingest_Call { + return &API_Ingest_Call{Call: _e.mock.On("Ingest", ctx, clusterName, runID)} +} + +func (_c *API_Ingest_Call) Run(run func(ctx context.Context, clusterName string, runID string)) *API_Ingest_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *API_Ingest_Call) Return(_a0 error) *API_Ingest_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *API_Ingest_Call) RunAndReturn(run func(context.Context, string, string) error) *API_Ingest_Call { + _c.Call.Return(run) + return _c +} + +// Notify provides a mock function with given fields: ctx, clusterName, runID +func (_m *API) Notify(ctx context.Context, clusterName string, runID string) error { + ret := _m.Called(ctx, clusterName, runID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, clusterName, runID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// API_Notify_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Notify' +type API_Notify_Call struct { + *mock.Call +} + +// Notify is a helper method to define mock.On call +// - ctx context.Context +// - clusterName string +// - runID string +func (_e *API_Expecter) Notify(ctx interface{}, clusterName 
interface{}, runID interface{}) *API_Notify_Call { + return &API_Notify_Call{Call: _e.mock.On("Notify", ctx, clusterName, runID)} +} + +func (_c *API_Notify_Call) Run(run func(ctx context.Context, clusterName string, runID string)) *API_Notify_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *API_Notify_Call) Return(_a0 error) *API_Notify_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *API_Notify_Call) RunAndReturn(run func(context.Context, string, string) error) *API_Notify_Call { + _c.Call.Return(run) + return _c +} + +// isAlreadyIngestedInGraph provides a mock function with given fields: ctx, clusterName, runID +func (_m *API) isAlreadyIngestedInGraph(ctx context.Context, clusterName string, runID string) (bool, error) { + ret := _m.Called(ctx, clusterName, runID) + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (bool, error)); ok { + return rf(ctx, clusterName, runID) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string) bool); ok { + r0 = rf(ctx, clusterName, runID) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, clusterName, runID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// API_isAlreadyIngestedInGraph_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'isAlreadyIngestedInGraph' +type API_isAlreadyIngestedInGraph_Call struct { + *mock.Call +} + +// isAlreadyIngestedInGraph is a helper method to define mock.On call +// - ctx context.Context +// - clusterName string +// - runID string +func (_e *API_Expecter) isAlreadyIngestedInGraph(ctx interface{}, clusterName interface{}, runID interface{}) *API_isAlreadyIngestedInGraph_Call { + return &API_isAlreadyIngestedInGraph_Call{Call: _e.mock.On("isAlreadyIngestedInGraph", ctx, clusterName, runID)} +} + +func (_c *API_isAlreadyIngestedInGraph_Call) Run(run func(ctx context.Context, clusterName string, runID string)) *API_isAlreadyIngestedInGraph_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *API_isAlreadyIngestedInGraph_Call) Return(_a0 bool, _a1 error) *API_isAlreadyIngestedInGraph_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *API_isAlreadyIngestedInGraph_Call) RunAndReturn(run func(context.Context, string, string) (bool, error)) *API_isAlreadyIngestedInGraph_Call { + _c.Call.Return(run) + return _c +} + +type mockConstructorTestingTNewAPI interface { + mock.TestingT + Cleanup(func()) +} + +// NewAPI creates a new instance of API. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
+func NewAPI(t mockConstructorTestingTNewAPI) *API { + mock := &API{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/ingestor/puller/blob/blob.go b/pkg/ingestor/puller/blob/blob.go index f87a3c9c5..5a11cd1c6 100644 --- a/pkg/ingestor/puller/blob/blob.go +++ b/pkg/ingestor/puller/blob/blob.go @@ -172,17 +172,12 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName } // Pull pulls the data from the blob store (e.g: s3) and returns the path of the folder containing the archive -func (bs *BlobStore) Pull(outer context.Context, clusterName string, runID string) (string, error) { - log.I.Infof("Pulling data from blob store bucket %s, %s, %s", bs.bucketName, clusterName, runID) +func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) { + log.I.Infof("Pulling data from blob store bucket %s, %s", bs.bucketName, key) spanPull, ctx := span.SpanIngestRunFromContext(outer, span.IngestorBlobPull) var err error defer func() { spanPull.Finish(tracer.WithError(err)) }() - dumpResult, err := dump.NewDumpResult(clusterName, runID, true) - if err != nil { - return "", err - } - key := dumpResult.GetFullPath() log.I.Infof("Opening bucket: %s", bs.bucketName) b, err := bs.openBucket(ctx) if err != nil { diff --git a/pkg/ingestor/puller/blob/blob_test.go b/pkg/ingestor/puller/blob/blob_test.go index c41e170ec..6e1305dba 100644 --- a/pkg/ingestor/puller/blob/blob_test.go +++ b/pkg/ingestor/puller/blob/blob_test.go @@ -289,7 +289,14 @@ func TestBlobStore_Pull(t *testing.T) { cfg: tt.fields.cfg, region: tt.fields.region, } - got, err := bs.Pull(ctx, tt.args.clusterName, tt.args.runID) + dumpResult, err := dump.NewDumpResult(tt.args.clusterName, tt.args.runID, true) + if err != nil { + t.Errorf("dump.NewDumpResult() error = %v, wantErr %v", err, tt.wantErr) + + return + } + key := dumpResult.GetFullPath() + got, err := bs.Pull(ctx, key) if (err != nil) != tt.wantErr { t.Errorf("BlobStore.Pull() error = %v, wantErr %v", err, tt.wantErr) diff --git a/pkg/ingestor/puller/mocks/mock_puller.go b/pkg/ingestor/puller/mocks/mock_puller.go index 6599ffa29..8692bbaa1 100644 --- a/pkg/ingestor/puller/mocks/mock_puller.go +++ b/pkg/ingestor/puller/mocks/mock_puller.go @@ -164,23 +164,23 @@ func (_c *DataPuller_ListFiles_Call) RunAndReturn(run func(context.Context, stri return _c } -// Pull provides a mock function with given fields: ctx, clusterName, runID -func (_m *DataPuller) Pull(ctx context.Context, clusterName string, runID string) (string, error) { - ret := _m.Called(ctx, clusterName, runID) +// Pull provides a mock function with given fields: ctx, path +func (_m *DataPuller) Pull(ctx context.Context, path string) (string, error) { + ret := _m.Called(ctx, path) var r0 string var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) (string, error)); ok { - return rf(ctx, clusterName, runID) + if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { + return rf(ctx, path) } - if rf, ok := ret.Get(0).(func(context.Context, string, string) string); ok { - r0 = rf(ctx, clusterName, runID) + if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { + r0 = rf(ctx, path) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { - r1 = rf(ctx, clusterName, runID) + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, path) } else { r1 = ret.Error(1) } @@ -195,15 +195,14 @@ type 
DataPuller_Pull_Call struct { // Pull is a helper method to define mock.On call // - ctx context.Context -// - clusterName string -// - runID string -func (_e *DataPuller_Expecter) Pull(ctx interface{}, clusterName interface{}, runID interface{}) *DataPuller_Pull_Call { - return &DataPuller_Pull_Call{Call: _e.mock.On("Pull", ctx, clusterName, runID)} +// - path string +func (_e *DataPuller_Expecter) Pull(ctx interface{}, path interface{}) *DataPuller_Pull_Call { + return &DataPuller_Pull_Call{Call: _e.mock.On("Pull", ctx, path)} } -func (_c *DataPuller_Pull_Call) Run(run func(ctx context.Context, clusterName string, runID string)) *DataPuller_Pull_Call { +func (_c *DataPuller_Pull_Call) Run(run func(ctx context.Context, path string)) *DataPuller_Pull_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) + run(args[0].(context.Context), args[1].(string)) }) return _c } @@ -213,7 +212,7 @@ func (_c *DataPuller_Pull_Call) Return(_a0 string, _a1 error) *DataPuller_Pull_C return _c } -func (_c *DataPuller_Pull_Call) RunAndReturn(run func(context.Context, string, string) (string, error)) *DataPuller_Pull_Call { +func (_c *DataPuller_Pull_Call) RunAndReturn(run func(context.Context, string) (string, error)) *DataPuller_Pull_Call { _c.Call.Return(run) return _c } diff --git a/pkg/ingestor/puller/puller.go b/pkg/ingestor/puller/puller.go index f95216b9a..7f30073c1 100644 --- a/pkg/ingestor/puller/puller.go +++ b/pkg/ingestor/puller/puller.go @@ -17,7 +17,7 @@ import ( //go:generate mockery --name DataPuller --output mocks --case underscore --filename mock_puller.go --with-expecter type DataPuller interface { - Pull(ctx context.Context, clusterName string, runID string) (string, error) + Pull(ctx context.Context, path string) (string, error) Extract(ctx context.Context, archivePath string) error Close(ctx context.Context, basePath string) error ListFiles(ctx context.Context, prefix string, recursive bool) ([]*ListObject, error) diff --git a/pkg/kubehound/core/core_grpc_client.go b/pkg/kubehound/core/core_grpc_client.go index 89f3f2da3..1156de033 100644 --- a/pkg/kubehound/core/core_grpc_client.go +++ b/pkg/kubehound/core/core_grpc_client.go @@ -69,7 +69,7 @@ func CoreClientGRPCRehydrateLatest(ingestorConfig config.IngestorConfig) error { } for _, res := range results.IngestedCluster { - log.I.Infof("Rehydrated cluster: %s, date: %s, run_id: %s", res.ClusterName, res.Date.AsTime().Format("01-02-2006 15:04:05"), res.RunId) + log.I.Infof("Rehydrated cluster: %s, date: %s, run_id: %s", res.ClusterName, res.Date.AsTime().Format("01-02-2006 15:04:05"), res.Key) } return nil diff --git a/pkg/kubehound/core/core_ingest_local.go b/pkg/kubehound/core/core_ingest_local.go index 5f69982ea..f2165ebb7 100644 --- a/pkg/kubehound/core/core_ingest_local.go +++ b/pkg/kubehound/core/core_ingest_local.go @@ -4,16 +4,18 @@ import ( "context" "fmt" "os" + "path/filepath" + "github.com/DataDog/KubeHound/pkg/collector" "github.com/DataDog/KubeHound/pkg/config" + "github.com/DataDog/KubeHound/pkg/dump" "github.com/DataDog/KubeHound/pkg/ingestor/puller" ) func CoreLocalIngest(ctx context.Context, khCfg *config.KubehoundConfig, resultPath string) error { // Using the collector config to ingest the data khCfg.Collector.Type = config.CollectorTypeFile - // Applying the clusterName from the dynamic config (from CLI or env var) to the collector config - khCfg.Collector.File.ClusterName = khCfg.Dynamic.ClusterName + // Treating by default as data not compressed 
(directory of the results) khCfg.Collector.File.Directory = resultPath @@ -22,7 +24,7 @@ func CoreLocalIngest(ctx context.Context, khCfg *config.KubehoundConfig, resultP if err != nil { return err } - + metadataFilePath := filepath.Join(resultPath, collector.MetadataPath) if compress { tmpDir, err := os.MkdirTemp("/tmp/", "kh-local-ingest-*") if err != nil { @@ -35,7 +37,14 @@ func CoreLocalIngest(ctx context.Context, khCfg *config.KubehoundConfig, resultP if err != nil { return err } + metadataFilePath = filepath.Join(tmpDir, collector.MetadataPath) + } + // Getting the metadata from the metadata file + md, err := dump.ParseMetadata(ctx, metadataFilePath) + if err != nil { + return err } + khCfg.Collector.File.ClusterName = md.ClusterName return CoreLive(ctx, khCfg) } diff --git a/pkg/telemetry/span/spans.go b/pkg/telemetry/span/spans.go index 538654e7c..25725da82 100644 --- a/pkg/telemetry/span/spans.go +++ b/pkg/telemetry/span/spans.go @@ -48,6 +48,7 @@ const ( DumperClusterRoles = "kubehound.dumper.clusterroles" DumperRoleBindings = "kubehound.dumper.rolebindings" DumperClusterRoleBindings = "kubehound.dumper.clusterrolebindings" + DumperMetadata = "kubehound.dumper.metadata" DumperReadFile = "kubehound.dumper.readFile" DumperS3Push = "kubehound.dumper.s3_push"
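For reference, a minimal sketch of how the metadata.json file introduced in this patch might be consumed once a dump directory has been extracted. The /tmp path below is illustrative only; dump.ParseMetadata and collector.MetadataPath are the helpers added above.

package main

import (
	"context"
	"fmt"
	"path/filepath"

	"github.com/DataDog/KubeHound/pkg/collector"
	"github.com/DataDog/KubeHound/pkg/dump"
)

func main() {
	ctx := context.Background()

	// Illustrative path to an already-extracted dump directory; the file name
	// comes from the new collector.MetadataPath constant ("metadata.json").
	metadataFilePath := filepath.Join("/tmp/kubehound_dump", collector.MetadataPath)

	// ParseMetadata reads the metadata written by the closing pipeline sequence:
	// the run ID, the cluster name and the collector run metrics.
	md, err := dump.ParseMetadata(ctx, metadataFilePath)
	if err != nil {
		panic(err)
	}

	fmt.Printf("cluster=%s run_id=%s duration=%s throttling=%.2f%%\n",
		md.ClusterName, md.RunID, md.Metrics.RunDuration, 100*md.Metrics.ThrottlingPercentage)
}

Given the struct tags in collector.go, the file content looks like {"run_id":"01j2qs8th6yarr5hkafysekn0j","cluster":"cluster1.k8s.local","metrics":{...}}; run_duration and total_wait_time are serialized as integer nanoseconds, since time.Duration has no custom JSON marshaller.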