Skip to content

Commit

Permalink
[chore][pkg/stanza] Ensure all start/stop calls in are balanced (open…
Browse files Browse the repository at this point in the history
…-telemetry#28294)

Follows
open-telemetry#28228

This normalizes calls to `Start` and `Stop` across the test suite. 

In some cases, `poll` is called directly in order to trigger behavior
independently of timing. However, we should _either_ use `poll`
directly, or use both `Start` and `Stop` exactly once. In the future, I
expect `poll` will be exported and tested directly as part of an
internal package.
  • Loading branch information
djaglowski authored Oct 24, 2023
1 parent 501ef10 commit cde5da6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 39 deletions.
25 changes: 6 additions & 19 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,8 @@ See this issue for details: https://github.com/census-instrumentation/opencensus
operator, _ := buildTestManager(t, cfg)

_ = openTemp(t, tempDir)
err := operator.Start(testutil.NewUnscopedMockPersister())
require.NoError(t, err)
defer func() {
require.NoError(t, operator.Stop())
}()
require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
require.NoError(t, operator.Stop())
}

// AddFields tests that the `log.file.name` and `log.file.path` fields are included
Expand Down Expand Up @@ -491,9 +488,6 @@ func TestReadNewLogs(t *testing.T) {

// Poll once so we know this isn't a new file
operator.poll(context.Background())
defer func() {
require.NoError(t, operator.Stop())
}()

// Create a new file
temp := openTemp(t, tempDir)
Expand Down Expand Up @@ -1583,28 +1577,25 @@ func TestHeaderPersistance(t *testing.T) {
writeString(t, temp, "#headerField: headerValue\nlog line\n")

persister := testutil.NewUnscopedMockPersister()
require.NoError(t, op1.Start(persister))

require.NoError(t, op1.Start(persister))
waitForTokenWithAttributes(t, emitCalls1, []byte("log line"), map[string]any{
"header_key": "headerField",
"header_value": "headerValue",
attrs.LogFileName: filepath.Base(temp.Name()),
})

require.NoError(t, op1.Stop())

writeString(t, temp, "log line 2\n")

op2, emitCalls2 := buildTestManager(t, cfg)

require.NoError(t, op2.Start(persister))

waitForTokenWithAttributes(t, emitCalls2, []byte("log line 2"), map[string]any{
"header_key": "headerField",
"header_value": "headerValue",
attrs.LogFileName: filepath.Base(temp.Name()),
})

require.NoError(t, op2.Stop())
}

Expand All @@ -1626,12 +1617,10 @@ func TestHeaderPersistanceInHeader(t *testing.T) {
writeString(t, temp, "|headerField1: headerValue1\n")

persister := testutil.NewUnscopedMockPersister()
require.NoError(t, op1.Start(persister))

// The operator will poll at fixed time intervals, but we just want to make sure at least
// one poll operation occurs between now and when we stop.
op1.poll(context.Background())

// Start and stop the operator, ensuring that at least one poll cycle occurs in between
require.NoError(t, op1.Start(persister))
time.Sleep(2 * cfg1.PollInterval)
require.NoError(t, op1.Stop())

writeString(t, temp, "|headerField2: headerValue2\nlog line\n")
Expand All @@ -1643,13 +1632,11 @@ func TestHeaderPersistanceInHeader(t *testing.T) {
op2, emitCalls := buildTestManager(t, cfg2)

require.NoError(t, op2.Start(persister))

waitForTokenWithAttributes(t, emitCalls, []byte("log line"), map[string]any{
"header_value_1": "headerValue1",
"header_value_2": "headerValue2",
attrs.LogFileName: filepath.Base(temp.Name()),
})

require.NoError(t, op2.Stop())
}

Expand Down
22 changes: 2 additions & 20 deletions pkg/stanza/fileconsumer/rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,6 @@ func TestMoveFile(t *testing.T) {
temp1.Close()

operator.poll(context.Background())
defer func() {
require.NoError(t, operator.Stop())
}()

waitForToken(t, emitCalls, []byte("testlog1"))

// Wait until all goroutines are finished before renaming
Expand Down Expand Up @@ -397,10 +393,6 @@ func TestTrackMovedAwayFiles(t *testing.T) {
temp1.Close()

operator.poll(context.Background())
defer func() {
require.NoError(t, operator.Stop())
}()

waitForToken(t, emitCalls, []byte("testlog1"))

// Wait until all goroutines are finished before renaming
Expand Down Expand Up @@ -557,12 +549,7 @@ func TestTruncateThenWrite(t *testing.T) {
writeString(t, temp1, "testlog1\ntestlog2\n")

operator.poll(context.Background())
defer func() {
require.NoError(t, operator.Stop())
}()

waitForToken(t, emitCalls, []byte("testlog1"))
waitForToken(t, emitCalls, []byte("testlog2"))
waitForTokens(t, emitCalls, []byte("testlog1"), []byte("testlog2"))

require.NoError(t, temp1.Truncate(0))
_, err := temp1.Seek(0, 0)
Expand Down Expand Up @@ -594,12 +581,7 @@ func TestCopyTruncateWriteBoth(t *testing.T) {
writeString(t, temp1, "testlog1\ntestlog2\n")

operator.poll(context.Background())
defer func() {
require.NoError(t, operator.Stop())
}()

waitForToken(t, emitCalls, []byte("testlog1"))
waitForToken(t, emitCalls, []byte("testlog2"))
waitForTokens(t, emitCalls, []byte("testlog1"), []byte("testlog2"))
operator.wg.Wait() // wait for all goroutines to finish

// Copy the first file to a new file, and add another log
Expand Down

0 comments on commit cde5da6

Please sign in to comment.