Added Managed and Queued streaming ingestion types #2

Open · wants to merge 6 commits into master
18 changes: 18 additions & 0 deletions plugins/outputs/azure_data_explorer/README.md
@@ -41,6 +41,12 @@ of logs, metrics and time series data.
## Creates tables and relevant mapping if set to true (default).
## Skips table and mapping creation if set to false; this is useful for running Telegraf with the lowest possible permissions, i.e. the table ingestor role.
# create_tables = true

## Ingestion method to use.
## Available options are
## - managed -- streaming ingestion with fallback to batched ingestion or the "queued" method below
## - queued -- queue up metrics data and process sequentially
# ingestion_type = "queued"
```
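
For illustration, a minimal configuration that opts into managed streaming ingestion might look like the following sketch (the `endpoint_url` and `database` values are placeholders; when `ingestion_type` is omitted the plugin defaults to `queued`):

```toml
[[outputs.azure_data_explorer]]
  ## Placeholder endpoint and database -- replace with your own cluster values
  endpoint_url = "https://mycluster.westeurope.kusto.windows.net"
  database = "telegrafdb"

  ## Use managed streaming ingestion; "queued" is the default
  ingestion_type = "managed"
```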

## Metrics Grouping
@@ -93,6 +99,18 @@ The corresponding table mapping would be like the following:
**Note**: This plugin will automatically create Azure Data Explorer tables and
the corresponding table mappings as per the commands mentioned above.

## Ingestion type

**Note**: [Streaming ingestion](https://aka.ms/AAhlg6s)
has to be enabled on the ADX cluster ([configure the ADX cluster]) when the
`managed` option is used. Refer to the query below to check whether streaming
ingestion is enabled:

```kql
.show database <DB-Name> policy streamingingestion
```
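
If the policy is not yet enabled on the database, it can typically be turned on with a management command along the lines of this sketch (replace `<DB-Name>` with your database; altering database policies requires the appropriate admin permissions):

```kql
.alter database <DB-Name> policy streamingingestion enable
```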

## Authentication

### Supported Authentication Methods
140 changes: 126 additions & 14 deletions plugins/outputs/azure_data_explorer/azure_data_explorer.go
@@ -12,12 +12,14 @@ import (
"time"

"github.com/Azure/azure-kusto-go/kusto"
kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/azure-kusto-go/kusto/unsafe"
"github.com/Azure/go-autorest/autorest/azure/auth"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/json"
@@ -34,10 +36,17 @@ type AzureDataExplorer struct {
MetricsGrouping string `toml:"metrics_grouping_type"`
TableName string `toml:"table_name"`
CreateTables bool `toml:"create_tables"`
IngestionType string `toml:"ingestion_type"`
//Deprecated: use kustoClient (*kusto.Client) and metricIngestors (ingest.Ingestor) instead
client localClient
ingesters map[string]localIngestor
/***/
serializer serializers.Serializer
//Deprecated
createIngestor ingestorFactory
/***/
kustoClient *kusto.Client
metricIngestors map[string]ingest.Ingestor
}

const (
@@ -60,11 +69,14 @@ type ingestorFactory func(localClient, string, string) (localIngestor, error)

const createTableCommand = `.create-merge table ['%s'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`
const createTableMappingCommand = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", "Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", "Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'`
const managedIngestion = "managed"
const queuedIngestion = "queued"

func (*AzureDataExplorer) SampleConfig() string {
return sampleConfig
}

// Initialize the client and the ingestor
func (adx *AzureDataExplorer) Connect() error {
authorizer, err := auth.NewAuthorizerFromEnvironmentWithResource(adx.Endpoint)
if err != nil {
@@ -78,18 +90,45 @@ func (adx *AzureDataExplorer) Connect() error {
if err != nil {
return err
}
adx.kustoClient = client
adx.metricIngestors = make(map[string]ingest.Ingestor)
//Deprecated
adx.client = client
adx.ingesters = make(map[string]localIngestor)
adx.createIngestor = createRealIngestor
/***/

return nil
}

// Clean up and close the ingestor
func (adx *AzureDataExplorer) Close() error {
var errs []error
for _, v := range adx.metricIngestors {
if err := v.Close(); err != nil {
// accumulate errors while closing ingestors
errs = append(errs, err)
}
}
if err := adx.kustoClient.Close(); err != nil {
errs = append(errs, err)
}

adx.kustoClient = nil
adx.metricIngestors = nil

if len(errs) == 0 {
adx.Log.Info("Closed ingestors and client")
return nil
}

//Deprecated
adx.client = nil
adx.ingesters = nil
/***/

// Combine errors into a single object and return the combined error
return kustoerrors.GetCombinedError(errs...)
}

func (adx *AzureDataExplorer) Write(metrics []telegraf.Metric) error {
@@ -151,19 +190,58 @@ func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error
}

func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.FileOption, tableName string, metricsArray []byte) error {
var ingestor localIngestor
var metricIngestor ingest.Ingestor
var err error
if adx.client != nil && adx.createIngestor != nil {
ingestor, err = adx.getIngestor(ctx, tableName)
if err != nil {
return err
}
} else {
metricIngestor, err = adx.getMetricIngestor(ctx, tableName)
if err != nil {
return err
}
}

length := len(metricsArray)
adx.Log.Debugf("Writing %d bytes of metrics to table %q", length, tableName)
reader := bytes.NewReader(metricsArray)
mapping := ingest.IngestionMappingRef(fmt.Sprintf("%s_mapping", tableName), ingest.JSON)
if ingestor != nil {
//Deprecated
if _, err := ingestor.FromReader(ctx, reader, format, mapping); err != nil {
adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err)
}
} else if metricIngestor != nil {
if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil {
adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err)
}
}
return nil
}

func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName string) (ingest.Ingestor, error) {
ingestor := adx.metricIngestors[tableName]

if ingestor == nil {
if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil {
return nil, fmt.Errorf("creating table for %q failed: %v", tableName, err)
}
//create a new ingestor client for the table
tempIngestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType)
if err != nil {
return nil, fmt.Errorf("creating ingestor for %q failed: %v", tableName, err)
}
adx.metricIngestors[tableName] = tempIngestor
adx.Log.Debugf("Ingestor for table %s created", tableName)
ingestor = tempIngestor
}
return ingestor, nil
}

// Deprecated: getMetricIngestor was introduced to use ingest.Ingestor instead of localIngestor
func (adx *AzureDataExplorer) getIngestor(ctx context.Context, tableName string) (localIngestor, error) {
ingestor := adx.ingesters[tableName]

@@ -182,19 +260,33 @@ func (adx *AzureDataExplorer) getIngestor(ctx context.Context, tableName string)
return ingestor, nil
}

/***/

func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error {
if !adx.CreateTables {
adx.Log.Info("skipped table creation")
return nil
}
createStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableCommand, tableName))
if adx.client != nil {
if _, err := adx.client.Mgmt(ctx, adx.Database, createStmt); err != nil {
return err
}
} else if adx.kustoClient != nil {
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createStmt); err != nil {
return err
}
}

createTableMappingstmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableMappingCommand, tableName, tableName))
if adx.client != nil {
if _, err := adx.client.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil {
return err
}
} else if adx.kustoClient != nil {
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil {
return err
}
}

return nil
@@ -219,6 +311,12 @@ func (adx *AzureDataExplorer) Init() error {
return errors.New("Metrics grouping type is not valid")
}

if adx.IngestionType == "" {
adx.IngestionType = queuedIngestion
} else if !(choice.Contains(adx.IngestionType, []string{managedIngestion, queuedIngestion})) {
return fmt.Errorf("unknown ingestion type %q", adx.IngestionType)
}

serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano, "")
if err != nil {
return err
@@ -236,10 +334,24 @@ func init() {
})
}

// Deprecated: use createIngestorByTable, which takes the ingestion type and works with ingest.Ingestor
func createRealIngestor(client localClient, database string, tableName string) (localIngestor, error) {
ingestor, err := ingest.New(client.(*kusto.Client), database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers))
if ingestor != nil {
return ingestor, nil
}
return nil, err
}

// createIngestorByTable creates an ingestor for the given table based on the configured ingestion type
func createIngestorByTable(client *kusto.Client, database string, tableName string, ingestionType string) (ingest.Ingestor, error) {
switch strings.ToLower(ingestionType) {
case managedIngestion:
mi, err := ingest.NewManaged(client, database, tableName)
return mi, err
case queuedIngestion:
qi, err := ingest.New(client, database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers))
return qi, err
}
return nil, fmt.Errorf(`ingestion_type has to be one of %q or %q`, managedIngestion, queuedIngestion)
}