From c8d9a9b145d257c4287a6f8d6bcdaaca294a3759 Mon Sep 17 00:00:00 2001 From: Abhishek-saharn Date: Thu, 24 Oct 2024 19:24:55 +0530 Subject: [PATCH] event hub fixes --- .../microsoft_fabric/microsoft_fabric.go | 10 ++++++- .../microsoft_fabric/microsoft_fabric_eh.go | 28 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 plugins/outputs/microsoft_fabric/microsoft_fabric_eh.go diff --git a/plugins/outputs/microsoft_fabric/microsoft_fabric.go b/plugins/outputs/microsoft_fabric/microsoft_fabric.go index 632424596e5ee..25cd7d52a7df9 100644 --- a/plugins/outputs/microsoft_fabric/microsoft_fabric.go +++ b/plugins/outputs/microsoft_fabric/microsoft_fabric.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf/plugins/outputs" ADX "github.com/influxdata/telegraf/plugins/outputs/azure_data_explorer" EH "github.com/influxdata/telegraf/plugins/outputs/event_hubs" + "github.com/influxdata/telegraf/plugins/serializers/json" ) //go:embed sample.conf @@ -54,8 +55,15 @@ func (m *MicrosoftFabric) Init() error { if strings.HasPrefix(ConnectionString, "Endpoint=sb") { m.Log.Info("Detected EventHouse endpoint, using EventHouse output plugin") + + //Need discussion on it + serializer := &json.Serializer{ + TimestampUnits: config.Duration(time.Nanosecond), + TimestampFormat: time.RFC3339Nano, + } m.EHConf.ConnectionString = ConnectionString m.EHConf.Log = m.Log + m.EHConf.SetSerializer(serializer) m.EHConf.Init() m.FabricSinkService = m.EHConf } else if isKustoEndpoint(strings.ToLower(ConnectionString)) { @@ -73,7 +81,6 @@ func (m *MicrosoftFabric) Init() error { func isKustoEndpoint(endpoint string) bool { prefixes := []string{ - "https://", "data source=", "addr=", "address=", @@ -98,6 +105,7 @@ func init() { CreateTables: true, }, EHConf: &EH.EventHubs{ + Hub: &eventHub{}, Timeout: config.Duration(30 * time.Second), }, } diff --git a/plugins/outputs/microsoft_fabric/microsoft_fabric_eh.go b/plugins/outputs/microsoft_fabric/microsoft_fabric_eh.go new file mode 100644 index 0000000000000..4ab461cd2a049 --- /dev/null +++ b/plugins/outputs/microsoft_fabric/microsoft_fabric_eh.go @@ -0,0 +1,28 @@ +package microsoft_fabric + +import ( + "context" + + eventhub "github.com/Azure/azure-event-hubs-go/v3" +) + +type eventHub struct { + hub *eventhub.Hub +} + +func (e *eventHub) GetHub(connectionString string) error { + hub, err := eventhub.NewHubFromConnectionString(connectionString) + if err != nil { + return err + } + e.hub = hub + return nil +} + +func (e *eventHub) Close(ctx context.Context) error { + return e.hub.Close(ctx) +} + +func (e *eventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error { + return e.hub.SendBatch(ctx, iterator, opts...) +}