diff --git a/relay-server/elasticsearch/adapter.go b/relay-server/elasticsearch/adapter.go index 8fc3058..e2983ab 100644 --- a/relay-server/elasticsearch/adapter.go +++ b/relay-server/elasticsearch/adapter.go @@ -38,7 +38,7 @@ type ElasticsearchClient struct { // NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL // and kubearmor LogClient with endpoint. It has a retry mechanism for certain HTTP status codes and a backoff function for retry delays. // It then creates a new NewBulkIndexer with the esClient -func NewElasticsearchClient(esURL string) (*ElasticsearchClient, error) { +func NewElasticsearchClient(esURL string, esUser string, esPassword string) (*ElasticsearchClient, error) { retryBackoff := backoff.NewExponentialBackOff() cfg := elasticsearch.Config{ Addresses: []string{esURL}, @@ -56,6 +56,11 @@ func NewElasticsearchClient(esURL string) (*ElasticsearchClient, error) { MaxRetries: 5, } + if len(esUser) != 0 && len(esPassword) != 0 { + cfg.Username = esUser + cfg.Password = esPassword + } + esClient, err := elasticsearch.NewClient(cfg) if err != nil { return nil, fmt.Errorf("failed to create Elasticsearch client: %v", err) @@ -115,7 +120,7 @@ func (ecl *ElasticsearchClient) SendAlertToBuffer(alert *pb.Alert) { // and starting goroutines to consume messages from the alert channel and bulk index them. // The method starts a goroutine for each stream and waits for messages to be received. // Additional goroutines consume alert from the alert channel and bulk index them. -func (ecl *ElasticsearchClient) Start() error { +func (ecl *ElasticsearchClient) Start(AlertsIndex string) error { start = time.Now() ecl.ctx, ecl.cancel = context.WithCancel(context.Background()) var wg sync.WaitGroup @@ -126,7 +131,7 @@ func (ecl *ElasticsearchClient) Start() error { for { select { case alert := <-ecl.alertCh: - ecl.bulkIndex(alert, "alert") + ecl.bulkIndex(alert, AlertsIndex) case <-ecl.ctx.Done(): return } diff --git a/relay-server/main.go b/relay-server/main.go index bafad21..11a39f1 100644 --- a/relay-server/main.go +++ b/relay-server/main.go @@ -57,6 +57,12 @@ func main() { //get env enableEsDashboards := os.Getenv("ENABLE_DASHBOARDS") esUrl := os.Getenv("ES_URL") + esUser := os.Getenv("ES_USERNAME") + esPassword := os.Getenv("ES_PASSWORD") + esAlertsIndex := os.Getenv("ES_ALERTS_INDEX") + if esAlertsIndex == "" { + esAlertsIndex = "kubearmor-alerts" + } endPoint := os.Getenv("KUBEARMOR_SERVICE") if endPoint == "" { endPoint = "localhost:32767" @@ -84,13 +90,13 @@ func main() { // check and start an elasticsearch client if enableEsDashboards == "true" { - esCl, err := elasticsearch.NewElasticsearchClient(esUrl) + esCl, err := elasticsearch.NewElasticsearchClient(esUrl, esUser, esPassword) if err != nil { kg.Warnf("Failed to start a Elasticsearch Client") return } relayServer.ELKClient = esCl - go relayServer.ELKClient.Start() + go relayServer.ELKClient.Start(esAlertsIndex) defer relayServer.ELKClient.Stop() }