EPH and network issues #214

Open
elgatito opened this issue May 19, 2021 · 6 comments

Comments

@elgatito

elgatito commented May 19, 2021

Expected Behavior

Hello,

We are using eph with a mostly default setup: it waits for events under a global context that is cancelled only when the application exits.
The problem is that after some time the application stops receiving events.
Currently there are only a few events per day, so the gap between them can be 10-15 hours.

I can't see any way to get an error back from eph that would tell us, for example, that it needs to be restarted.

Code:


	if err = eh.cl.blob.CheckContainer(ctx, eh.c.Storage.CheckpointContainer); err != nil {
		eh.error("check CheckpointContainer", err)
		return err
	}

	// create a leaser and checkpointer backed by a storage container
	leaserCheckpointer, err := eventhubsstorage.NewStorageLeaserCheckpointer(
		cred,
		eh.c.Storage.AccountName,
		eh.c.Storage.CheckpointContainer,
		azure.PublicCloud)
	if err != nil {
		eh.error("Create leaserCheckpointer", err)
		return err
	}

	// create Event Processor
	p, err := eph.New(
		ctx,
		eh.c.EventHub.Namespace,
		eh.c.EventHub.IncomingHub,
		eh.tp,
		leaserCheckpointer,
		leaserCheckpointer,
		eph.WithNoBanner(), eph.WithWebSocketConnection())
	if err != nil {
		eh.error("Create EPH", err)
		return err
	}

	_, err = p.RegisterHandler(ctx, func(ctx context.Context, event *eventhubs.Event) error {
		blobEvents := BlobEvents{}
		if err := json.Unmarshal(event.Data, &blobEvents); err != nil {
			eh.error("Unmarshal blob events", err)
			return err
		}

		for _, be := range blobEvents {
			eh.cl.debug("Received event: %s", litter.Sdump(be))
		}

		return nil
	})
	if err != nil {
		eh.error("Register EventHub handler", err)
		return err
	}

	err = p.StartNonBlocking(ctx)
	if err != nil {
		eh.error("Start EPH", err)
		return err
	}

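	// Stop the processor when the application closer fires or ctx is cancelled.
	go func(c context.Context) {
		closer := eh.cl.closer.C()

		select {
		case <-closer:
		case <-c.Done():
		}

		eh.cl.info("Stopping EventHub")

		// c may already be cancelled here, so close with a fresh, bounded
		// context (requires the "time" import; the 10s timeout is arbitrary).
		closeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := p.Close(closeCtx); err != nil {
			eh.error("Close EPH", err)
		}
	}(ctx)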


I have started it with tags=debug and DEBUG_LEVEL=3 and will post more information once I have it.

Actual Behavior

Over time, eph stops receiving events.

Environment

  • OS: RHEL7
  • Go version: 1.16.4
  • Version of Library: 3.3.7
@elgatito
Author

This is interesting: it looks like the token has expired, and the leaser now fails when requesting the blob:


===== RESPONSE ERROR (ServiceCode=AuthenticationFailed) =====
Description=Server failed to authenticate the request. Make sure the value of Authorization header is formed correctly including the signature.
RequestId:***
Time:2021-05-20T05:50:48.4395751Z, Details:
   AuthenticationErrorDetail: Lifetime validation failed. The token is expired.
   Code: AuthenticationFailed
   GET https://***.blob.core.windows.net/***/0?timeout=60
   Authorization: REDACTED
   User-Agent: [Azure-Storage/0.10 (go1.16.3; linux)]
   X-Ms-Client-Request-Id: [***]
   X-Ms-Version: [2019-02-02]
   --------------------------------------------------------------------------------
   RESPONSE Status: 403 Server failed to authenticate the request. Make sure the value of Authorization header is formed correctly including the signature.
   Content-Length: [424]
   Content-Type: [application/xml]
   Date: [Thu, 20 May 2021 05:50:47 GMT]
   Server: [Microsoft-HTTPAPI/2.0]
   X-Ms-Error-Code: [AuthenticationFailed]
   X-Ms-Request-Id: [***]

goroutine 5770 [running]:
github.com/Azure/azure-storage-blob-go/azblob.stack(0xc0005e4570, 0xc00051aa00, 0xc00065e360)
	/app/go/pkg/mod/github.com/!azure/[email protected]/azblob/zc_policy_request_log.go:146 +0x9f
github.com/Azure/azure-storage-blob-go/azblob.NewRequestLogPolicyFactory.func1.1(0x1095fa8, 0xc001c86840, 0xc00051aa00, 0x2, 0x2, 0xc00067cf00, 0xc0004a51e0)
	/app/go/pkg/mod/github.com/!azure/[email protected]/azblob/zc_policy_request_log.go:96 +0x665
github.com/Azure/azure-pipeline-go/pipeline.PolicyFunc.Do(0xc0004e2820, 0x1095fa8, 0xc001c86840, 0xc00051aa00, 0xc0004a52b0, 0xc00067cf00, 0x4db, 0x0)
	/app/go/pkg/mod/github.com/!azure/[email protected]/pipeline/core.go:43 +0x44
github.com/Azure/azure-storage-blob-go/azblob.(*tokenCredential).New.func1(0x1095fa8, 0xc001c86840, 0xc00051aa00, 0x27232a0, 0x1095fa8, 0xc001c86840, 0xc000030fd0)
	/app/go/pkg/mod/github.com/!azure/[email protected]/azblob/zc_credential_token.go:135 +0x212
github.com/Azure/azure-pipeline-go/pipeline.PolicyFunc.Do(0xc001bd0520, 0x1095fa8, 0xc001c86840, 0xc00051aa00, 0xc001c86840, 0xc000030fd0, 0xdf844a819, 0x0)
	/app/go/pkg/mod/github.com/!azure/[email protected]/pipeline/core.go:43 +0x44
github.com/Azure/azure-storage-blob-go/azblob.NewRetryPolicyFactory.func1.1(0x1095f38, 0xc0000c1f80, 0xc00051a900, 0x10, 0xdca3a0, 0x64492d747301, 0xc0004a5080)
	/app/go/pkg/mod/github.com/!azure/[email protected]/azblob/zc_policy_retry.go:204 +0x69d
github.com/Azure/azure-pipeline-go/pipeline.PolicyFunc.Do(0xc0004e2870, 0x1095f38, 0xc0000c1f80, 0xc00051a900, 0xc0004a5138, 0x40db9b, 0xc000030eb0, 0x10)
	/app/go/pkg/mod/github.com/!azure/[email protected]/pipeline/core.go:43 +0x44
github.com/Azure/azure-storage-blob-go/azblob.NewUniqueRequestIDPolicyFactory.func1.1(0x1095f38, 0xc0000c1f80, 0xc00051a900, 0x10, 0xdca3a0, 0x40e201, 0xc0004a5080)
	/app/go/pkg/mod/github.com/!azure/[email protected]/azblob/zc_policy_unique_request_id.go:19 +0xae
github.com/Azure/azure-pipeline-go/pipeline.PolicyFunc.Do(0xc0005ee618, 0x1095f38, 0xc0000c1f80, 0xc00051a900, 0xc0004a5120, 0x24, 0xc00066ab10, 0xc00180b788)
	/app/go/pkg/mod/github.com/!azure/[email protected]/pipeline/core.go:43 +0x44
github.com/Azure/azure-storage-blob-go/azblob.NewTelemetryPolicyFactory.func1.1(0x1095f38, 0xc0000c1f80, 0xc00051a900, 0x1, 0x0, 0x1, 0xc0000bc0d0)
	/app/go/pkg/mod/github.com/!azure/[email protected]/azblob/zc_policy_telemetry.go:34 +0x162
github.com/Azure/azure-pipeline-go/pipeline.PolicyFunc.Do(0xc001f9e7e0, 0x1095f38, 0xc0000c1f80, 0xc00051a900, 0xc001f9e7e0, 0x0, 0xc00180b858, 0x40e2f8)
	/app/go/pkg/mod/github.com/!azure/[email protected]/pipeline/core.go:43 +0x44
github.com/Azure/azure-pipeline-go/pipeline.(*pipeline).Do(0xc000674600, 0x1095f38, 0xc0000c1f80, 0x1084240, 0xc0006784d0, 0xc00051a900, 0x28, 0xc001bc4120, 0x14, 0x0)
	/app/go/pkg/mod/github.com/!azure/[email protected]/pipeline/core.go:129 +0x82
github.com/Azure/azure-storage-blob-go/azblob.blobClient.Download(0xc00066aae0, 0x5, 0x0, 0x0, 0x0, 0xc00066aae8, 0x28, 0xc001bc4120, 0x14, 0x0, ...)
	/app/go/pkg/mod/github.com/!azure/[email protected]/azblob/zz_generated_blob.go:669 +0x405
github.com/Azure/azure-storage-blob-go/azblob.BlobURL.Download(0xc00066aae0, 0x5, 0x0, 0x0, 0x0, 0xc00066aae8, 0x28, 0xc001bc4120, 0x14, 0x0, ...)
	/app/go/pkg/mod/github.com/!azure/[email protected]/azblob/url_blob.go:73 +0x1ea
github.com/Azure/azure-event-hubs-go/v3/storage.(*LeaserCheckpointer).getLease(0xc00033c280, 0x1095f38, 0xc0000c1f80, 0x15c9b00, 0x1, 0x0, 0x0, 0x0)
	/app/go/pkg/mod/github.com/!azure/azure-event-hubs-go/[email protected]/storage/storage.go:631 +0x20a
github.com/Azure/azure-event-hubs-go/v3/storage.(*LeaserCheckpointer).GetLeases.func1(0xc00033c280, 0xc000030db0, 0xc00052ca20, 0x15c9b00, 0x1)
	/app/go/pkg/mod/github.com/!azure/azure-event-hubs-go/[email protected]/storage/storage.go:213 +0x73
created by github.com/Azure/azure-event-hubs-go/v3/storage.(*LeaserCheckpointer).GetLeases
	/app/go/pkg/mod/github.com/!azure/azure-event-hubs-go/[email protected]/storage/storage.go:212 +0x2a7

@elgatito
Author

Okay, it looks like I somehow missed the refresher parameter in the token constructor; with it set, it is now working.
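
For anyone hitting the same expired-token error: as far as I understand it, the storage token credential takes a refresher callback that renews the token and returns how long to wait before the next refresh. The sketch below covers only that scheduling decision, using the standard library. `refreshDelay` and the two-minute margin are hypothetical names and values of mine, not part of azblob or adal.

```go
package main

import (
	"fmt"
	"time"
)

// refreshDelay is a hypothetical helper: given how long the current token
// remains valid, it returns how long to wait before refreshing, keeping a
// safety margin so the token is renewed before it actually expires.
func refreshDelay(timeLeft, margin time.Duration) time.Duration {
	delay := timeLeft - margin
	if delay < time.Second {
		// Already inside the margin (or expired): refresh almost at once,
		// but never return a zero or negative duration.
		return time.Second
	}
	return delay
}

func main() {
	now := time.Now()
	expiresOn := now.Add(time.Hour) // assumed token lifetime for the demo

	fmt.Println(refreshDelay(expiresOn.Sub(now), 2*time.Minute)) // 58m0s
}
```

In a real refresher the `timeLeft` value would come from the expiry of the freshly obtained token.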

I have now run into another issue:

adal: Failed to execute the refresh request. Error = 'Post \"https://login.microsoftonline.com/****/oauth2/token?api-version=1.0\": EOF'

It is returned by this call: token.RefreshWithContext()

The code:

	token, err := getServicePrincipalToken(ctx)
	if err != nil {
		return nil, err
	}

	// Refresh OAuth2 token.
	if err := token.RefreshWithContext(ctx); err != nil {
		return nil, err
	}

As far as I can see, there are no retry attempts for these token requests.
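
Until retries exist in the library, a caller-side wrapper is one possible workaround. This is a minimal sketch, not the library's behaviour: `refreshWithRetry`, `backoffDelay` and `simulateFlakyRefresh` are hypothetical names, the `refresh` callback stands in for a call such as token.RefreshWithContext(ctx), and the attempt count and delays are arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// backoffDelay returns base doubled `attempt` times, capped at max.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// refreshWithRetry calls refresh until it succeeds, the attempts run out,
// or ctx is cancelled while waiting between attempts.
func refreshWithRetry(ctx context.Context, refresh func(context.Context) error,
	maxAttempts int, base, max time.Duration) error {
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if lastErr = refresh(ctx); lastErr == nil {
			return nil
		}
		if attempt == maxAttempts-1 {
			break // no point waiting after the final attempt
		}
		select {
		case <-time.After(backoffDelay(attempt, base, max)):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("refresh failed after %d attempts: %w", maxAttempts, lastErr)
}

// simulateFlakyRefresh runs refreshWithRetry against a fake refresher that
// fails `failures` times before succeeding, and reports the number of calls.
func simulateFlakyRefresh(failures, maxAttempts int) (calls int, err error) {
	refresh := func(context.Context) error {
		calls++
		if calls <= failures {
			return errors.New("EOF") // stand-in for the transient error above
		}
		return nil
	}
	err = refreshWithRetry(context.Background(), refresh, maxAttempts,
		time.Millisecond, 10*time.Millisecond)
	return calls, err
}

func main() {
	calls, err := simulateFlakyRefresh(2, 5)
	fmt.Println(calls, err) // 3 <nil>
}
```

Retrying every error is a simplification; a real wrapper would probably want to retry only transient failures such as the EOF above and give up immediately on authentication errors.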

@catalinaperalta
Member

Hi! Thanks for reaching out. I'm taking a look at the information you provided and will get back to you on where the issue may be.
Adding @jhendrixMSFT who might have more information.

@elgatito
Author

I have tested the refresher across long idle periods and it is working fine.

So this issue is a feature request: add retries to the token getters/refreshers and, if possible, add a method to query the EPH status when it is started as non-blocking, so that failures inside it can be detected.
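
In the meantime, one application-side approximation is to run a periodic probe with the same credentials (for example, the container check the application already performs at startup) and track consecutive failures. This is a sketch under that assumption: `probeTracker` is a hypothetical helper, not an EPH type, and it only observes what the application feeds into it, not the processor's internal state.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// probeTracker is a hypothetical application-side helper that counts
// consecutive failures of a periodic probe.
type probeTracker struct {
	mu       sync.Mutex
	failures int
	lastErr  error
}

// observe records one probe result; a success resets the failure streak.
func (t *probeTracker) observe(err error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if err == nil {
		t.failures, t.lastErr = 0, nil
		return
	}
	t.failures++
	t.lastErr = err
}

// check returns an error once the streak reaches maxFailures, which is the
// signal that the processor should be restarted. maxFailures should be > 0.
func (t *probeTracker) check(maxFailures int) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.failures >= maxFailures {
		return fmt.Errorf("%d consecutive probe failures, last: %w", t.failures, t.lastErr)
	}
	return nil
}

// errAuth stands in for the AuthenticationFailed error from the log above.
var errAuth = errors.New("AuthenticationFailed")

func main() {
	t := &probeTracker{}
	for i := 0; i < 3; i++ {
		t.observe(errAuth) // simulated failed storage probe
	}
	fmt.Println(t.check(3))
}
```

Counting failures rather than measuring idle time matters here, because gaps of 10-15 hours between events are normal for this workload.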

@ryepup

ryepup commented Mar 29, 2022

I'm also interested in getting some health information from an eph to include in Kubernetes health probes.
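
To illustrate what I would wire this into: a probe endpoint only needs a function that reports whether the processor is healthy. The sketch below assumes such a function exists; `healthCheck`, `statusFor`, `healthHandler` and the /healthz path are hypothetical names, and nothing here comes from the eph package.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// healthCheck is a hypothetical hook: it returns nil when the processor is
// considered healthy and an error otherwise.
type healthCheck func() error

// statusFor maps the check result to the HTTP status a probe would see.
func statusFor(check healthCheck) int {
	if err := check(); err != nil {
		return http.StatusServiceUnavailable
	}
	return http.StatusOK
}

// healthHandler exposes the check as an HTTP endpoint for a probe.
func healthHandler(check healthCheck) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		code := statusFor(check)
		w.WriteHeader(code)
		fmt.Fprintln(w, http.StatusText(code))
	})
}

// errStalled is a placeholder for whatever the real check would report.
var errStalled = errors.New("processor stalled")

func main() {
	h := healthHandler(func() error { return errStalled })

	// Exercise the handler in-process instead of opening a port.
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil))
	fmt.Println(rec.Code) // 503
}
```

The missing piece is the check itself, which is why a status method on the processor would help.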

@kwontae

kwontae commented May 23, 2023

> Okay, it looks like I somehow missed the refresher parameter in the token constructor; with it set, it is now working.
>
> I have now run into another issue:
>
> adal: Failed to execute the refresh request. Error = 'Post \"https://login.microsoftonline.com/****/oauth2/token?api-version=1.0\": EOF'
>
> It is returned by this call: token.RefreshWithContext()
>
> The code:
>
> 	token, err := getServicePrincipalToken(ctx)
> 	if err != nil {
> 		return nil, err
> 	}
>
> 	// Refresh OAuth2 token.
> 	if err := token.RefreshWithContext(ctx); err != nil {
> 		return nil, err
> 	}
>
> As far as I can see, there are no retry attempts for these token requests.

Can you elaborate on this?

We are also facing a similar issue with v3.5.0, where we see a drop in the number of events received after a day. We are using the SAS token provider. I'm wondering whether what we are experiencing is related.
