From 1010c95637f34391e80bd2754080e2e0db4a8207 Mon Sep 17 00:00:00 2001 From: Kevin Burke Date: Mon, 26 Feb 2024 14:14:50 -0800 Subject: [PATCH] all: replace segmentio/events with segmentio/log The latter uses the new log/slog standard in Go. While we are at it replace deprecated uses of io/ioutil. --- go.mod | 3 +- go.sum | 20 +-------- ldb_reader.go | 12 ++--- ldb_rotating_reader.go | 17 +++---- pkg/changelog/changelog_writer.go | 4 +- pkg/cmd/ctlstore-cli/cmd/utils.go | 4 +- pkg/cmd/ctlstore-cli/cmd/writer_limits.go | 4 +- pkg/cmd/ctlstore-mutator/main.go | 10 ++--- pkg/cmd/ctlstore/main.go | 52 +++++++++++++--------- pkg/event/changelog.go | 28 ++++++------ pkg/event/changelog_test.go | 1 - pkg/event/entry.go | 3 +- pkg/event/fake_log_writer.go | 8 ++-- pkg/executive/db_executive.go | 34 +++++++------- pkg/executive/db_limiter.go | 14 +++--- pkg/executive/db_limiter_test.go | 3 +- pkg/executive/executive.go | 2 +- pkg/executive/executive_endpoint.go | 18 ++++---- pkg/executive/executive_service.go | 16 +++---- pkg/executive/table_sizer.go | 14 +++--- pkg/executive/test_executive.go | 11 +++-- pkg/globalstats/stats.go | 4 +- pkg/heartbeat/heartbeat.go | 20 ++++----- pkg/ldb/ldbs.go | 5 +-- pkg/ldbwriter/changelog_callback.go | 8 ++-- pkg/ldbwriter/ldb_callback_writer.go | 4 +- pkg/ldbwriter/ldb_writer.go | 30 +++++++------ pkg/ldbwriter/ldb_writer_with_changelog.go | 10 ++--- pkg/ledger/ledger_monitor.go | 18 ++++---- pkg/ledger/ledger_monitor_test.go | 1 - pkg/reflector/download.go | 4 +- pkg/reflector/reflector.go | 46 ++++++++++--------- pkg/reflector/reflector_ctl.go | 8 ++-- pkg/reflector/reflector_test.go | 22 ++++----- pkg/reflector/shovel.go | 19 ++++---- pkg/reflector/wal_monitor.go | 16 +++---- pkg/supervisor/archived_snapshot.go | 22 ++++----- pkg/supervisor/gzip_pipe_test.go | 12 ++--- pkg/supervisor/s3_snapshot_test.go | 4 +- pkg/supervisor/supervisor.go | 18 ++++---- pkg/supervisor/supervisor_test.go | 5 +-- pkg/tests/tests.go | 3 +- pkg/unsafe/unsafe_test.go | 1 + pkg/utils/interface_slice.go | 9 ++-- pkg/version/version_go1_12.go | 1 + tools.go | 1 + 46 files changed, 281 insertions(+), 288 deletions(-) diff --git a/go.mod b/go.mod index 2b14b3cc..76a26aa1 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/segmentio/ctlstore -go 1.20 +go 1.21 require ( github.com/AlekSi/pointer v1.0.0 @@ -21,6 +21,7 @@ require ( github.com/segmentio/errors-go v1.0.0 github.com/segmentio/events/v2 v2.3.2 github.com/segmentio/go-sqlite3 v1.14.22-segment + github.com/segmentio/log v0.7.0 github.com/segmentio/stats/v4 v4.6.2 github.com/stretchr/testify v1.8.1 golang.org/x/sync v0.3.0 diff --git a/go.sum b/go.sum index 79f335cb..d4cf2dff 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ github.com/AlekSi/pointer v1.0.0 h1:KWCWzsvFxNLcmM5XmiqHsGTTsuwZMsLFwWF9Y+//bNE= github.com/AlekSi/pointer v1.0.0/go.mod h1:1kjywbfcPFCmncIxtk6fIEub6LKrfMz3gc5QKVOSOA8= -github.com/PuerkitoBio/goquery v1.8.1/go.mod h1:Q8ICL1kNUJ2sXGoAhPGUdYDJvgQgHzJsnnd3H7Ho5jQ= -github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA= github.com/aws/aws-sdk-go v1.37.8 h1:9kywcbuz6vQuTf+FD+U7FshafrHzmqUCjgAEiLuIJ8U= github.com/aws/aws-sdk-go v1.37.8/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go-v2 v1.21.0 h1:gMT0IW+03wtYJhRqTVYn0wLzwdnK9sRMcxmtfGzRdJc= @@ -113,10 +111,10 @@ github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e h1:uO75wNGioszj github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M= github.com/segmentio/go-snakecase v1.1.0 h1:ZJO4SNKKV0MjGOv0LHnixxN5FYv1JKBnVXEuBpwcbQI= github.com/segmentio/go-snakecase v1.1.0/go.mod h1:jk1miR5MS7Na32PZUykG89Arm+1BUSYhuGR6b7+hJto= -github.com/segmentio/go-sqlite3 v1.12.0 h1:NG3Hdja6V/dDz1uYCapBJCobL+nQLwn6b8Z28DLOU0s= -github.com/segmentio/go-sqlite3 v1.12.0/go.mod h1:ARXycbQZSoCAgThy5syFIL2aXbrKF3tE1DEHfOkxh1g= github.com/segmentio/go-sqlite3 v1.14.22-segment h1:CMV8jocJ3GqK5ALeFatw5lXANJpusyGAbLMmRWGb61I= github.com/segmentio/go-sqlite3 v1.14.22-segment/go.mod h1:XD2URsGK8aqqwao9zj/8f/OuOEiFWP45GguwGM906mc= +github.com/segmentio/log v0.7.0 h1:wPGvUjPi4icXYfy7sk57MvN6X13S/T/sCT39wFN4EfU= +github.com/segmentio/log v0.7.0/go.mod h1:lrlHKGKcvVm4/d7hL6rfuPipZqL20ifZL5+zFtUxx3s= github.com/segmentio/objconv v1.0.1 h1:QjfLzwriJj40JibCV3MGSEiAoXixbp4ybhwfTB8RXOM= github.com/segmentio/objconv v1.0.1/go.mod h1:auayaH5k3137Cl4SoXTgrzQcuQDmvuVtZgS0fb1Ahys= github.com/segmentio/stats/v4 v4.6.2 h1:++YfKPTOPTZxE1DvavnpeBvB3hlDIm7IM+ULFzbCxCU= @@ -131,11 +129,9 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -148,16 +144,12 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -171,22 +163,14 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/ldb_reader.go b/ldb_reader.go index ef70e411..7df9ec1e 100644 --- a/ldb_reader.go +++ b/ldb_reader.go @@ -12,7 +12,7 @@ import ( "time" "github.com/segmentio/errors-go" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" "github.com/segmentio/ctlstore/pkg/errs" @@ -600,7 +600,7 @@ func (reader *LDBReader) watchForLDBs(ctx context.Context, dirPath string, last case <-ticker.C: fsLast, err := lookupLastLDBSync(dirPath) if err != nil { - events.Log("failed checking for last LDB sync: %{error}+v", err) + log.EventLog("failed checking for last LDB sync: %{error}+v", err) errs.Incr("check-last-ldb-sync") continue } @@ -609,12 +609,12 @@ func (reader *LDBReader) watchForLDBs(ctx context.Context, dirPath string, last if fsLast <= last { continue } - events.Log("found new LDB (%d > %d), switching...", fsLast, last) + log.EventLog("found new LDB (%d > %d), switching...", fsLast, last) last = fsLast err = reader.switchLDB(dirPath, last) if err != nil { - events.Log("failed switching to new LDB: %{error}+v", err) + log.EventLog("failed switching to new LDB: %{error}+v", err) errs.Incr("switch-ldb") } } @@ -677,13 +677,13 @@ func lookupLastLDBSync(dirPath string) (int64, error) { fields := strings.Split(localPath, "/") if len(fields) != 2 || fields[1] != ldb.DefaultLDBFilename { - events.Log("ignoring unexpected file in LDB path (%+v)", fields) + log.EventLog("ignoring unexpected file in LDB path (%+v)", fields) errs.Incr("unexpected-local-file") return nil } timestamp, err := strconv.ParseInt(fields[0], 10, 64) if err != nil { - events.Log("ignoring file with invalid timestamp in LDB path (%+v)", fields) + log.EventLog("ignoring file with invalid timestamp in LDB path (%+v)", fields) errs.Incr("invalid-timestamp-local-file") return nil } diff --git a/ldb_rotating_reader.go b/ldb_rotating_reader.go index d32e3a45..bb18b0d1 100644 --- a/ldb_rotating_reader.go +++ b/ldb_rotating_reader.go @@ -4,15 +4,16 @@ import ( "context" "errors" "fmt" - "github.com/segmentio/ctlstore/pkg/errs" - "github.com/segmentio/ctlstore/pkg/globalstats" - "github.com/segmentio/ctlstore/pkg/ldb" - "github.com/segmentio/events/v2" - "github.com/segmentio/stats/v4" "path" "strconv" "sync/atomic" "time" + + "github.com/segmentio/ctlstore/pkg/errs" + "github.com/segmentio/ctlstore/pkg/globalstats" + "github.com/segmentio/ctlstore/pkg/ldb" + "github.com/segmentio/log" + "github.com/segmentio/stats/v4" ) // LDBRotatingReader reads data from multiple LDBs on a rotating schedule. @@ -82,7 +83,7 @@ func rotatingReader(minutesPerRotation RotationPeriod, ldbPaths ...string) (*LDB } var r LDBRotatingReader for _, p := range ldbPaths { - events.Log("Opening ldb %s for reading", p) + log.EventLog("Opening ldb %s for reading", p) reader, err := newLDBReader(p) if err != nil { return nil, err @@ -139,14 +140,14 @@ func (r *LDBRotatingReader) rotate(ctx context.Context) { globalstats.Set("rotating_reader.active", next) err := r.dbs[last].Close() if err != nil { - events.Log("failed to close LDBReader for %s on rotation: %{error}v", r.dbs[last].path, err) + log.EventLog("failed to close LDBReader for %s on rotation: %{error}v", r.dbs[last].path, err) errs.Incr("rotating_reader.closing_ldbreader", stats.T("id", strconv.Itoa(int(last)))) return } reader, err := newLDBReader(r.dbs[last].path) if err != nil { - events.Log("failed to open LDBReader for %s on rotation: %{error}v", r.dbs[last].path, err) + log.EventLog("failed to open LDBReader for %s on rotation: %{error}v", r.dbs[last].path, err) errs.Incr("rotating_reader.opening_ldbreader", stats.T("id", strconv.Itoa(int(last))), stats.T("path", path.Base(r.dbs[last].path))) diff --git a/pkg/changelog/changelog_writer.go b/pkg/changelog/changelog_writer.go index e3c2fba4..b759041e 100644 --- a/pkg/changelog/changelog_writer.go +++ b/pkg/changelog/changelog_writer.go @@ -4,7 +4,7 @@ import ( "encoding/json" "github.com/pkg/errors" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" ) type ( @@ -45,7 +45,7 @@ func (w *ChangelogWriter) WriteChange(e ChangelogEntry) error { return errors.Wrap(err, "error marshalling json") } - events.Debug("changelogWriter.WriteChange: %{family}s.%{table}s => %{key}v", + log.EventDebug("changelogWriter.WriteChange: %{family}s.%{table}s => %{key}v", e.Family, e.Table, e.Key) return w.WriteLine.WriteLine(string(bytes)) diff --git a/pkg/cmd/ctlstore-cli/cmd/utils.go b/pkg/cmd/ctlstore-cli/cmd/utils.go index 8b54fa1d..ca0fe14a 100644 --- a/pkg/cmd/ctlstore-cli/cmd/utils.go +++ b/pkg/cmd/ctlstore-cli/cmd/utils.go @@ -3,7 +3,7 @@ package cmd import ( "bytes" "fmt" - "io/ioutil" + "io" "net/http" "net/url" "os" @@ -25,7 +25,7 @@ var ( func bailResponse(response *http.Response, msg string, args ...interface{}) { msg = fmt.Sprintf(msg, args...) // ok to ignore error here - b, _ := ioutil.ReadAll(response.Body) + b, _ := io.ReadAll(response.Body) respMsg := fmt.Sprintf("server returned [%d]: %s", response.StatusCode, b) fmt.Fprintln(os.Stderr, fmt.Sprintf("%s: %s", msg, respMsg)) os.Exit(1) diff --git a/pkg/cmd/ctlstore-cli/cmd/writer_limits.go b/pkg/cmd/ctlstore-cli/cmd/writer_limits.go index 5cf767fc..76d5d89d 100644 --- a/pkg/cmd/ctlstore-cli/cmd/writer_limits.go +++ b/pkg/cmd/ctlstore-cli/cmd/writer_limits.go @@ -4,7 +4,7 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "os" "text/tabwriter" @@ -37,7 +37,7 @@ var cliWriterLimits = &cli.CommandSet{ bailResponse(resp, "could not read limits") } var wrl limits.WriterRateLimits - b, err := ioutil.ReadAll(resp.Body) + b, err := io.ReadAll(resp.Body) if err != nil { bail("could not read response: %s", err) } diff --git a/pkg/cmd/ctlstore-mutator/main.go b/pkg/cmd/ctlstore-mutator/main.go index 252a523b..beb2d831 100644 --- a/pkg/cmd/ctlstore-mutator/main.go +++ b/pkg/cmd/ctlstore-mutator/main.go @@ -9,7 +9,7 @@ import ( "encoding/hex" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "strings" "time" @@ -111,7 +111,7 @@ func main() { } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - b, _ = ioutil.ReadAll(resp.Body) + b, _ = io.ReadAll(resp.Body) return fmt.Errorf("could not make mutation request: %d: %s", resp.StatusCode, b) } return nil @@ -137,7 +137,7 @@ func setup(cfg config, url string) error { } defer res.Body.Close() if res.StatusCode != http.StatusOK { - b, _ := ioutil.ReadAll(res.Body) + b, _ := io.ReadAll(res.Body) return fmt.Errorf("could register writer: %v: %s", res.StatusCode, b) } @@ -154,7 +154,7 @@ func setup(cfg config, url string) error { } defer res.Body.Close() if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusConflict { - b, _ := ioutil.ReadAll(res.Body) + b, _ := io.ReadAll(res.Body) return fmt.Errorf("could not make family request: %v: %s", res.StatusCode, b) } @@ -184,7 +184,7 @@ func setup(cfg config, url string) error { } defer res.Body.Close() if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusConflict { - b, _ := ioutil.ReadAll(res.Body) + b, _ := io.ReadAll(res.Body) return fmt.Errorf("could not make table request: %v: %s", res.StatusCode, b) } diff --git a/pkg/cmd/ctlstore/main.go b/pkg/cmd/ctlstore/main.go index 31bbb99a..3a18efea 100644 --- a/pkg/cmd/ctlstore/main.go +++ b/pkg/cmd/ctlstore/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "log/slog" "net/http" "os" "path" @@ -17,7 +18,8 @@ import ( "github.com/segmentio/conf" "github.com/segmentio/errors-go" "github.com/segmentio/events/v2" - _ "github.com/segmentio/events/v2/sigevents" + "github.com/segmentio/log" + _ "github.com/segmentio/log/sigusrdebug" "github.com/segmentio/stats/v4" "github.com/segmentio/stats/v4/datadog" "github.com/segmentio/stats/v4/procstats" @@ -177,7 +179,7 @@ func main() { ctx, cancel := events.WithSignals(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() - events.DefaultLogger.EnableDebug = false + log.SetDebug(false) switch cmd, args := conf.LoadWith(nil, ld); cmd { case "version": @@ -204,8 +206,8 @@ func main() { } func enableDebug() { - events.DefaultLogger.EnableDebug = true - events.DefaultLogger.EnableSource = true + log.SetDebug(true) + log.SetSource(true) DebugEnabled = true } @@ -238,7 +240,7 @@ func configureDogstatsd(ctx context.Context, opts dogstatsdOpts) (dd *datadog.Cl }) stats.Register(dd) - events.Log("Setup dogstatsd with addr:%{addr}s, buffersize:%{buffersize}d, prefix:%{pfx}s, version:%{version}s", + log.EventLog("Setup dogstatsd with addr:%{addr}s, buffersize:%{buffersize}d, prefix:%{pfx}s, version:%{version}s", config.Address, config.BufferSize, opts.statsPrefix, ctlstore.Version) } @@ -330,7 +332,7 @@ func supervisor(ctx context.Context, args []string) { return nil }() if err != nil && !errs.IsCanceled(err) { - events.Log("Fatal Supervisor error: %{error}+v", err) + log.EventLog("Fatal Supervisor error: %{error}+v", err) errs.IncrDefault(stats.T("op", "startup")) } } @@ -362,7 +364,7 @@ func heartbeat(ctx context.Context, args []string) { Table: cliCfg.TableName, }) if err != nil { - events.Log("Fatal error starting heartbeat: %+v", err) + log.EventLog("Fatal error starting heartbeat: %+v", err) errs.IncrDefault(stats.T("op", "startup")) return } @@ -385,7 +387,7 @@ func executive(ctx context.Context, args []string) { loadConfig(&cliCfg, "executive", args) - events.Log("running with max/warn: %v %v", cliCfg.MaxTableSize, cliCfg.WarnTableSize) + log.EventLog("running with max/warn: %v %v", cliCfg.MaxTableSize, cliCfg.WarnTableSize) if cliCfg.Debug { enableDebug() @@ -414,7 +416,7 @@ func executive(ctx context.Context, args []string) { }) if err != nil { errs.IncrDefault(stats.T("op", "startup")) - events.Log("Fatal error starting Executive: %{error}+v", err) + log.EventLog("Fatal error starting Executive: %{error}+v", err) return } defer executive.Close() @@ -423,7 +425,7 @@ func executive(ctx context.Context, args []string) { if errors.Cause(err) != context.Canceled { errs.IncrDefault(stats.T("op", "service shutdown")) } - events.Log("executive quit: %v", err) + log.EventLog("executive quit: %v", err) } } @@ -444,7 +446,7 @@ func sidecar(ctx context.Context, args []string) { } sidecar, err := newSidecar(config) if err != nil { - events.Log("Fatal error starting sidecar: %{error}+v", err) + log.EventLog("Fatal error starting sidecar: %{error}+v", err) errs.IncrDefault(stats.T("op", "startup")) return } @@ -465,10 +467,10 @@ func reflector(ctx context.Context, args []string) { http.Handle("/metrics", promHandler) go func() { - events.Log("Serving Prometheus metrics on %s", cliCfg.MetricsBind) + log.EventLog("Serving Prometheus metrics on %s", cliCfg.MetricsBind) err := http.ListenAndServe(cliCfg.MetricsBind, nil) if err != nil { - events.Log("Failed to served Prometheus metrics: %s", err) + log.EventLog("Failed to served Prometheus metrics: %s", err) } }() } @@ -480,7 +482,7 @@ func reflector(ctx context.Context, args []string) { defer teardown() reflector, err := newReflector(cliCfg, false, 0) if err != nil { - events.Log("Fatal error starting Reflector: %{error}+v", err) + log.EventLog("Fatal error starting Reflector: %{error}+v", err) errs.IncrDefault(stats.T("op", "startup")) return } @@ -506,10 +508,10 @@ func multiReflector(ctx context.Context, args []string) { http.Handle("/metrics", promHandler) go func() { - events.Log("Serving Prometheus metrics on %s", cliCfg.MetricsBind) + log.EventLog("Serving Prometheus metrics on %s", cliCfg.MetricsBind) err := http.ListenAndServe(cliCfg.MetricsBind, nil) if err != nil { - events.Log("Failed to served Prometheus metrics: %s", err) + log.EventLog("Failed to served Prometheus metrics: %s", err) } }() } @@ -529,7 +531,7 @@ func multiReflector(ctx context.Context, args []string) { x := cliCfg x.LDBPath = p if i > 0 { - events.Log("changelog only created for 1st ldb path: %{path}, skipping #%{num}d", cliCfg.MultiReflector.LDBPaths[0], i+1) + log.EventLog("changelog only created for 1st ldb path: %{path}, skipping #%{num}d", cliCfg.MultiReflector.LDBPaths[0], i+1) x.ChangelogPath = "" x.ChangelogSize = 0 @@ -538,7 +540,7 @@ func multiReflector(ctx context.Context, args []string) { defer wg.Done() r, err := newReflector(x, false, idx) if err != nil { - events.Log("Fatal error starting Reflector: %{error}+v", err) + log.EventLog("Fatal error starting Reflector: %{error}+v", err) errs.IncrDefault(stats.T("op", "startup"), stats.T("path", p)) errChan <- err return @@ -565,7 +567,7 @@ func multiReflector(ctx context.Context, args []string) { err := grp.Wait() if err != nil { - events.Log("reflectors ended in error %{error}v", err) + log.EventLog("reflectors ended in error %{error}v", err) errs.Incr("multi.shutdown", stats.T("err", reflect.ValueOf(err).Type().String())) return } @@ -629,11 +631,17 @@ func newSidecar(config sidecarConfig) (*sidecarpkg.Sidecar, error) { func newReflector(cliCfg reflectorCliConfig, isSupervisor bool, i int) (*reflectorpkg.Reflector, error) { if cliCfg.LedgerHealth.Disable { - events.Log("DEPRECATION NOTICE: use --disable-ecs-behavior instead of --disable to control this ledger monitor behavior") + log.EventLog("DEPRECATION NOTICE: use --disable-ecs-behavior instead of --disable to control this ledger monitor behavior") } id := fmt.Sprintf("%s-%d", path.Base(cliCfg.LDBPath), i) - l := events.NewLogger(events.DefaultHandler).With(events.Args{{"id", id}}) - l.EnableDebug = cliCfg.Debug + var level slog.Level + if cliCfg.Debug { + level = slog.LevelDebug + } else { + level = slog.LevelInfo + } + handler := log.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: level}) + l := log.New(handler).With("id", id) return reflectorpkg.ReflectorFromConfig(reflectorpkg.ReflectorConfig{ LDBPath: cliCfg.LDBPath, ChangelogPath: cliCfg.ChangelogPath, diff --git a/pkg/event/changelog.go b/pkg/event/changelog.go index eca611f8..10ac62b0 100644 --- a/pkg/event/changelog.go +++ b/pkg/event/changelog.go @@ -13,7 +13,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/segmentio/ctlstore/pkg/errs" "github.com/segmentio/errors-go" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" ) @@ -69,7 +69,7 @@ func (c *fileChangelog) start(ctx context.Context) error { select { case <-ctx.Done(): if err := watcher.Close(); err != nil { - events.Log("Could not close watcher: %{err}s", err) + log.EventLog("Could not close watcher: %{err}s", err) } } }() @@ -88,7 +88,7 @@ func (c *fileChangelog) start(ctx context.Context) error { for { select { case err := <-watcher.Errors: - events.Log("FS err: %{err}s", err) + log.EventLog("FS err: %{err}s", err) select { case fsErrCh <- err: case <-ctx.Done(): @@ -140,10 +140,10 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event defer func() { if err := f.Close(); err != nil { errs.Incr("changelog-errors", stats.T("op", "close file")) - events.Log("Could not close changelog file: %{error}s", err) + log.EventLog("Could not close changelog file: %{error}s", err) } }() - events.Debug("Opening changelog...") + log.EventDebug("Opening changelog...") br := bufio.NewReaderSize(f, 60*1024) @@ -198,11 +198,11 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event } select { case <-time.After(time.Second): - events.Debug("Manually checking log") + log.EventDebug("Manually checking log") continue case err := <-fsErrCh: if err := readEvents(); err != io.EOF { - events.Log("could not consume rest of file: %{error}s", err) + log.EventLog("could not consume rest of file: %{error}s", err) } return errors.Wrap(err, "watcher error") case event := <-fsNotifyCh: @@ -210,16 +210,16 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event case fsnotify.Write: continue case fsnotify.Create: - events.Debug("New changelog created. Consuming the rest of current one...") + log.EventDebug("New changelog created. Consuming the rest of current one...") err := readEvents() if err != io.EOF { return errors.Wrap(err, "consume rest of changelog") } - events.Debug("Restarting reader") + log.EventDebug("Restarting reader") return nil } case <-ctx.Done(): - events.Debug("Changelog context finished. Exiting.") + log.EventDebug("Changelog context finished. Exiting.") return ctx.Err() } } @@ -230,12 +230,12 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event case errs.IsCanceled(err): return case os.IsNotExist(errors.Cause(err)): - events.Log("Changelog file does not exist, rechecking...") + log.EventLog("Changelog file does not exist, rechecking...") select { case <-fsNotifyCh: - events.Log("Changelog notified") + log.EventLog("Changelog notified") case <-time.After(time.Second): - events.Log("Manually checking") + log.EventLog("Manually checking") } default: errs.Incr("changelog-errors", stats.T("op", "open file")) @@ -261,7 +261,7 @@ func (c *fileChangelog) validate() error { switch { case err == nil: case os.IsNotExist(err): - events.Log("changelog does not exist. waiting 1s for rotation") + log.EventLog("changelog does not exist. waiting 1s for rotation") time.Sleep(time.Second) _, err = os.Stat(c.path) switch { diff --git a/pkg/event/changelog_test.go b/pkg/event/changelog_test.go index 3e813caa..7d7cbdf6 100644 --- a/pkg/event/changelog_test.go +++ b/pkg/event/changelog_test.go @@ -13,7 +13,6 @@ import ( "time" "github.com/segmentio/ctlstore/pkg/tests" - _ "github.com/segmentio/events/v2/text" "github.com/stretchr/testify/require" ) diff --git a/pkg/event/entry.go b/pkg/event/entry.go index e175153d..f777102d 100644 --- a/pkg/event/entry.go +++ b/pkg/event/entry.go @@ -2,7 +2,8 @@ package event // entry represents a single row in the changelog // e.g. -// {"seq":1,"family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} +// +// {"seq":1,"family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} type entry struct { Seq int64 `json:"seq"` Family string `json:"family"` diff --git a/pkg/event/fake_log_writer.go b/pkg/event/fake_log_writer.go index 0f39505b..a237548a 100644 --- a/pkg/event/fake_log_writer.go +++ b/pkg/event/fake_log_writer.go @@ -9,7 +9,7 @@ import ( "time" "github.com/segmentio/errors-go" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" ) type fakeLogWriter struct { @@ -30,7 +30,7 @@ func (w *fakeLogWriter) writeN(ctx context.Context, n int) error { return errors.Wrap(err, "create file") } defer func() { - events.Debug("Done writing %{num}d events", n) + log.EventDebug("Done writing %{num}d events", n) if err == nil { err = f.Close() } @@ -71,7 +71,7 @@ func (w *fakeLogWriter) writeN(ctx context.Context, n int) error { } // fmt.Println(info.Size(), w.rotateAfterBytes) if info.Size() > int64(w.rotateAfterBytes) { - events.Log("Rotation required (file size is %{bytes}d seq=%{seq}d)", info.Size(), atomic.LoadInt64(&w.seq)) + log.EventLog("Rotation required (file size is %{bytes}d seq=%{seq}d)", info.Size(), atomic.LoadInt64(&w.seq)) doRotate = true } } @@ -80,7 +80,7 @@ func (w *fakeLogWriter) writeN(ctx context.Context, n int) error { } if doRotate { - events.Debug("Rotating log file..") + log.EventDebug("Rotating log file..") these = 0 if err := f.Close(); err != nil { return errors.Wrap(err, "close during rotation") diff --git a/pkg/executive/db_executive.go b/pkg/executive/db_executive.go index fc2bbf4f..41a941da 100644 --- a/pkg/executive/db_executive.go +++ b/pkg/executive/db_executive.go @@ -15,8 +15,8 @@ import ( "github.com/segmentio/ctlstore/pkg/scanfunc" "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/ctlstore/pkg/sqlgen" - "github.com/segmentio/events/v2" "github.com/segmentio/go-sqlite3" + "github.com/segmentio/log" ) const dmlLedgerTableName = "ctlstore_dml_ledger" @@ -184,8 +184,8 @@ func (e *dbExecutive) CreateTable(familyName string, tableName string, fieldName return err } - events.Debug("[CreateTable %{tableName}s] ctldb DDL: %{ddl}s", tableName, ddl) - events.Debug("[CreateTable %{tableName}s] log DDL: %{ddl}s", tableName, logDDL) + log.EventDebug("[CreateTable %{tableName}s] ctldb DDL: %{ddl}s", tableName, ddl) + log.EventDebug("[CreateTable %{tableName}s] log DDL: %{ddl}s", tableName, logDDL) tx, err := e.DB.BeginTx(ctx, nil) if err != nil { @@ -223,7 +223,7 @@ func (e *dbExecutive) CreateTable(familyName string, tableName string, fieldName return err } - events.Log("Successfully created new table `%{tableName}s` at seq %{seq}v", tableName, seq) + log.EventLog("Successfully created new table `%{tableName}s` at seq %{seq}v", tableName, seq) return nil } @@ -303,8 +303,8 @@ func (e *dbExecutive) AddFields(familyName string, tableName string, fieldNames if err != nil { return err } - events.Debug("[CreateTable %{tableName}s] ctldb DDL: %{ddl}s", tableName, ddl) - events.Debug("[CreateTable %{tableName}s] log DDL: %{ddl}s", tableName, logDDL) + log.EventDebug("[CreateTable %{tableName}s] ctldb DDL: %{ddl}s", tableName, ddl) + log.EventDebug("[CreateTable %{tableName}s] log DDL: %{ddl}s", tableName, logDDL) // create a func here to make rollback semantics a bit easier err = func() error { tx, err := e.DB.BeginTx(ctx, nil) @@ -345,7 +345,7 @@ func (e *dbExecutive) AddFields(familyName string, tableName string, fieldNames if err != nil { return errors.Wrap(err, "commit tx") } - events.Log("Successfully created new field `%{fieldName}s %{fieldType}v` on table %{tableName}s at seq %{seq}v", fieldName, fieldType, tableName, seq) + log.EventLog("Successfully created new field `%{fieldName}s %{fieldType}v` on table %{tableName}s at seq %{seq}v", fieldName, fieldType, tableName, seq) return nil }() if err != nil { @@ -588,7 +588,7 @@ func (e *dbExecutive) Mutate( // Execute the actual DML write _, err = tx.ExecContext(ctx, dmlSQL) if err != nil { - events.Log("dml exec error, Request: %{req}+v SQL: %{sql}s", req, dmlSQL) + log.EventLog("dml exec error, Request: %{req}+v SQL: %{sql}s", req, dmlSQL) return errors.Wrap(err, "dml exec error") } @@ -611,7 +611,7 @@ func (e *dbExecutive) Mutate( return errors.Wrap(err, "commit failed") } - events.Debug( + log.EventDebug( "Mutate success on family %{familyName}s "+ "applied %{mutationCount}d mutations "+ "at seq %{lastSeq}d "+ @@ -910,7 +910,7 @@ func (e *dbExecutive) UpdateTableSizeLimit(limit limits.TableSizeLimit) error { func (e *dbExecutive) DeleteTableSizeLimit(ft schema.FamilyTable) error { ctx, cancel := e.ctx() defer cancel() - events.Log("deleting from max table sizes where f=%v and t=%v", ft.Family, ft.Table) + log.EventLog("deleting from max table sizes where f=%v and t=%v", ft.Family, ft.Table) resp, err := e.DB.ExecContext(ctx, "delete from max_table_sizes where family_name=? and table_name=?", ft.Family, ft.Table) if err != nil { @@ -1043,8 +1043,8 @@ func (e *dbExecutive) DropTable(table schema.FamilyTable) error { logDDL := dmlLogTbl.DropTableDDL() - events.Debug("[DropTable %{tableName}s] ctldb DDL: %{ddl}s", table, ddl) - events.Debug("[DropTable %{tableName}s] log DDL: %{ddl}s", table, logDDL) + log.EventDebug("[DropTable %{tableName}s] ctldb DDL: %{ddl}s", table, ddl) + log.EventDebug("[DropTable %{tableName}s] log DDL: %{ddl}s", table, logDDL) tx, err := e.DB.BeginTx(ctx, nil) if err != nil { @@ -1078,7 +1078,7 @@ func (e *dbExecutive) DropTable(table schema.FamilyTable) error { return errors.Wrap(err, "error committing transaction") } - events.Log("Successfully dropped `%{tableName}s` at seq %{seq}v", table.String(), seq) + log.EventLog("Successfully dropped `%{tableName}s` at seq %{seq}v", table.String(), seq) return nil } @@ -1115,8 +1115,8 @@ func (e *dbExecutive) ClearTable(table schema.FamilyTable) error { logDDL := dmlLogTbl.ClearTableDDL() - events.Debug("[ClearTable %{tableName}s] ctldb DDL: %{ddl}s", table, ddl) - events.Debug("[ClearTable %{tableName}s] log DDL: %{ddl}s", table, logDDL) + log.EventDebug("[ClearTable %{tableName}s] ctldb DDL: %{ddl}s", table, ddl) + log.EventDebug("[ClearTable %{tableName}s] log DDL: %{ddl}s", table, logDDL) tx, err := e.DB.BeginTx(ctx, nil) if err != nil { @@ -1150,7 +1150,7 @@ func (e *dbExecutive) ClearTable(table schema.FamilyTable) error { return errors.Wrap(err, "error committing transaction") } - events.Log("Successfully deleted all rows from `%{tableName}s` at seq %{seq}v", table.String(), seq) + log.EventLog("Successfully deleted all rows from `%{tableName}s` at seq %{seq}v", table.String(), seq) return nil } @@ -1159,7 +1159,7 @@ func (e *dbExecutive) ReadFamilyTableNames(family schema.FamilyName) (tables []s ctx, cancel := e.ctx() defer cancel() - events.Debug("reading family table names where f=%s", family) + log.EventDebug("reading family table names where f=%s", family) rows, err := e.DB.QueryContext(ctx, fmt.Sprintf(`select table_name from information_schema.tables where table_name like '%s___%%'`, family.String())) if err != nil { return nil, errors.Wrap(err, "error reading family table names") diff --git a/pkg/executive/db_limiter.go b/pkg/executive/db_limiter.go index 4fcb1cff..f68572d8 100644 --- a/pkg/executive/db_limiter.go +++ b/pkg/executive/db_limiter.go @@ -11,7 +11,7 @@ import ( "github.com/segmentio/ctlstore/pkg/limits" "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/ctlstore/pkg/utils" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" ) @@ -114,7 +114,7 @@ func (l *dbLimiter) checkWriterRates(ctx context.Context, tx *sql.Tx, lr limiter } writerLimit := l.limitForWriter(lr.writerName) allowed := amount <= writerLimit - events.Debug("limiter: writer:%v writerLimit:%v amount:%v allowed:%v", lr.writerName, writerLimit, amount, allowed) + log.Debug("limiter", "writer", lr.writerName, "writerLimit", writerLimit, "amount", amount, "allowed", allowed) return allowed, nil } @@ -136,7 +136,7 @@ func (l *dbLimiter) checkTableSizes(ctx context.Context, lr limiterRequest) erro // start initializes the db limiter and spawns necessary goroutines func (l *dbLimiter) start(ctx context.Context) error { - events.Log("Starting the db limiter") + log.EventLog("Starting the db limiter") if err := l.tableSizer.start(ctx); err != nil { return errors.Wrap(err, "could not start sizer") } @@ -151,14 +151,14 @@ func (l *dbLimiter) start(ctx context.Context) error { // after we've done one refreshWriterLimits successfully, we'll do the rest async go utils.CtxLoop(ctx, defaultRefreshPeriod, func() { if err := l.refreshWriterLimits(ctx); err != nil { - events.Log("could not update limits: %{error}s", err) + log.EventLog("could not update limits: %{error}s", err) instrumentUpdateErr(err) } }) // also, periodically try to clean up the writer_usage table. go utils.CtxLoop(ctx, defaultDeleteUsagePeriod, func() { if err := l.deleteOldUsageData(ctx); err != nil { - events.Log("could not collect garbage %{error}s", err) + log.EventLog("could not collect garbage %{error}s", err) errs.IncrDefault(stats.Tag{Name: "op", Value: "limiter-collect-garbage"}) } }) @@ -178,7 +178,7 @@ func (l *dbLimiter) deleteOldUsageData(ctx context.Context) error { return errors.Wrap(err, "could not get rows affected after deleting from writer_usage") } if rows > 0 { - events.Log("deleted %{rows}d rows from the writer_usage table", rows) + log.EventLog("deleted %{rows}d rows from the writer_usage table", rows) } stats.Add("writer-usage-rows-deleted", rows) return nil @@ -206,7 +206,7 @@ func (l *dbLimiter) refreshWriterLimits(ctx context.Context) error { if err != nil { return errors.Wrap(err, "adjust found rate limit") } - events.Debug("adjusted %v limit from %v/%v to %v/%v", writerName, maxRowsPerMinute, time.Minute, adjustedRate, l.defaultWriterLimit.Period) + log.EventDebug("adjusted %v limit from %v/%v to %v/%v", writerName, maxRowsPerMinute, time.Minute, adjustedRate, l.defaultWriterLimit.Period) writerLimits[writerName] = adjustedRate } if err := rows.Err(); err != nil { diff --git a/pkg/executive/db_limiter_test.go b/pkg/executive/db_limiter_test.go index 2af65075..882c1cef 100644 --- a/pkg/executive/db_limiter_test.go +++ b/pkg/executive/db_limiter_test.go @@ -8,7 +8,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "net/http/httptest" "sync/atomic" @@ -66,7 +65,7 @@ func testDBLimiter(t *testing.T, dbType string) { resp := w.Result() defer resp.Body.Close() if expectedCode != resp.StatusCode { - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) require.Failf(t, "request failed", "Expected %d, got %d: %s", expectedCode, resp.StatusCode, b) } } diff --git a/pkg/executive/executive.go b/pkg/executive/executive.go index db6826fd..1903c167 100644 --- a/pkg/executive/executive.go +++ b/pkg/executive/executive.go @@ -78,7 +78,7 @@ func newMutationRequest(famName schema.FamilyName, req ExecutiveMutationRequest) } // Returns the request Values as a slice in the order specified by the -//fieldOrder param. An error will be returned if a field is missing. +// fieldOrder param. An error will be returned if a field is missing. func (r *mutationRequest) valuesByOrder(fieldOrder []schema.FieldName) ([]interface{}, error) { values := []interface{}{} for _, fn := range fieldOrder { diff --git a/pkg/executive/executive_endpoint.go b/pkg/executive/executive_endpoint.go index f0090fe0..7a4cd77b 100644 --- a/pkg/executive/executive_endpoint.go +++ b/pkg/executive/executive_endpoint.go @@ -2,7 +2,7 @@ package executive import ( "encoding/json" - "io/ioutil" + "io" "net/http" "strconv" "time" @@ -10,11 +10,11 @@ import ( "github.com/segmentio/ctlstore/pkg/errs" "github.com/segmentio/ctlstore/pkg/limits" "github.com/segmentio/ctlstore/pkg/schema" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" "github.com/gorilla/mux" "github.com/pkg/errors" - "github.com/segmentio/events/v2" ) // ExecutiveEndpoint is an HTTP 'wrapper' for ExecutiveInterface @@ -42,7 +42,7 @@ func (ee *ExecutiveEndpoint) handleFamilyRoute(w http.ResponseWriter, r *http.Re } func (ee *ExecutiveEndpoint) handleTablesRoute(w http.ResponseWriter, r *http.Request) { - rawBody, err := ioutil.ReadAll(r.Body) + rawBody, err := io.ReadAll(r.Body) if err != nil { writeErrorResponse(err, w) return @@ -68,7 +68,7 @@ func (ee *ExecutiveEndpoint) handleTableRoute(w http.ResponseWriter, r *http.Req familyName := vars["familyName"] tableName := vars["tableName"] - rawBody, err := ioutil.ReadAll(r.Body) + rawBody, err := io.ReadAll(r.Body) if err != nil { writeErrorResponse(err, w) return @@ -142,7 +142,7 @@ func (ee *ExecutiveEndpoint) handleCookieRoute(w http.ResponseWriter, r *http.Re return } if r.Method == "POST" { - rawBody, err := ioutil.ReadAll(r.Body) + rawBody, err := io.ReadAll(r.Body) if err != nil { writeErrorResponse(err, w) return @@ -206,7 +206,7 @@ func (ee *ExecutiveEndpoint) handleWritersRoute(w http.ResponseWriter, r *http.R vars := mux.Vars(r) if r.Method == "POST" { - rawBody, err := ioutil.ReadAll(r.Body) + rawBody, err := io.ReadAll(r.Body) if err != nil { writeErrorResponse(err, w) return @@ -251,7 +251,7 @@ func (ee *ExecutiveEndpoint) handleMutationsRoute(w http.ResponseWriter, r *http } `json:"mutations"` }{} - rawBody, err := ioutil.ReadAll(r.Body) + rawBody, err := io.ReadAll(r.Body) if err != nil { writeErrorResponse(err, w) return @@ -317,7 +317,7 @@ func (ee *ExecutiveEndpoint) handleStatusRoute(w http.ResponseWriter, r *http.Re err := ee.HealthChecker.HealthCheck() if err != nil { w.WriteHeader(http.StatusInternalServerError) - events.Log("Health check failure: %{error}+v", err) + log.EventLog("Health check failure: %{error}+v", err) return } w.WriteHeader(http.StatusOK) @@ -599,7 +599,7 @@ func writeErrorResponse(e error, w http.ResponseWriter) { w.WriteHeader(status) _, _ = w.Write([]byte(resBody)) - events.Log("Error Status %{status}v, Reason: %{reason}v, Internal Error: %{error}+v", + log.EventLog("Error Status %{status}v, Reason: %{reason}v, Internal Error: %{error}+v", status, resBody, e.Error()) return diff --git a/pkg/executive/executive_service.go b/pkg/executive/executive_service.go index e54c4b9f..fe122447 100644 --- a/pkg/executive/executive_service.go +++ b/pkg/executive/executive_service.go @@ -16,7 +16,7 @@ import ( "github.com/segmentio/ctlstore/pkg/errs" "github.com/segmentio/ctlstore/pkg/limits" "github.com/segmentio/ctlstore/pkg/utils" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" ) @@ -78,7 +78,7 @@ func (s *executiveService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } defer ep.Close() - events.Debug("Request: %{request}+v", cR) + log.EventDebug("Request: %{request}+v", cR) ep.Handler().ServeHTTP(w, cR) } @@ -99,21 +99,21 @@ func (s *executiveService) Start(ctx context.Context, bind string) error { h := &http.Server{Addr: bind, Handler: s} go func() { - events.Log("Listening on %{addr}s...", bind) + log.EventLog("Listening on %{addr}s...", bind) if err := h.ListenAndServe(); err != nil && err != http.ErrServerClosed { - events.Log("Error listening: %{error}+v", err) + log.EventLog("Error listening: %{error}+v", err) } else { - events.Log("Server stopped.") + log.EventLog("Server stopped.") } }() <-stop - events.Log("Shutting down the server...") + log.EventLog("Shutting down the server...") sctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := h.Shutdown(sctx); err != nil { - events.Log("Shutdown error: %{error}+v", err) + log.EventLog("Shutdown error: %{error}+v", err) } return nil @@ -130,7 +130,7 @@ func (s *executiveService) instrumentLedgerRowCount(ctx context.Context) { row := s.ctldb.QueryRowContext(ctx, "select table_rows from information_schema.tables where table_name=?", dmlLedgerTableName) var rowCount int64 if err := row.Scan(&rowCount); err != nil { - events.Log("Could not scan ledger row count: %s", err) + log.EventLog("Could not scan ledger row count: %s", err) errs.IncrDefault(stats.Tag{Name: "op", Value: "instrument-ledger-row-count"}) return } diff --git a/pkg/executive/table_sizer.go b/pkg/executive/table_sizer.go index a787ee91..1aff79fa 100644 --- a/pkg/executive/table_sizer.go +++ b/pkg/executive/table_sizer.go @@ -12,7 +12,7 @@ import ( "github.com/segmentio/ctlstore/pkg/limits" "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/ctlstore/pkg/utils" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" ) @@ -41,7 +41,7 @@ func newTableSizer(ctldb *sql.DB, dbType string, defaultTableLimit limits.SizeLi // the table sizer does not work for sqlite3 databases. enabled := dbType != "sqlite3" if !enabled { - events.Log("Table sizer is disabled due to dbType=%s", dbType) + log.EventLog("Table sizer is disabled due to dbType=%s", dbType) } return &tableSizer{ enabled: enabled, @@ -68,7 +68,7 @@ func (s *tableSizer) tableOK(ft schema.FamilyTable) (found bool, err error) { if !tableFound { // we don't know about it yet. this is normal when a new table has been created // and the sizer has not yet refreshed its table size map. - events.Debug("received table size check about unknown table '%s'", ft) + log.EventDebug("received table size check about unknown table '%s'", ft) errs.Incr("table-sizer-unknown-table", ft.Tag()) return false, nil } @@ -95,10 +95,10 @@ func (s *tableSizer) tableOK(ft schema.FamilyTable) (found bool, err error) { // poll period. func (s *tableSizer) start(ctx context.Context) error { if !s.enabled { - events.Log("Table sizer not starting b/c it is disabled") + log.EventLog("Table sizer not starting b/c it is disabled") return nil } - events.Log("starting table sizer with a period of %v", s.pollPeriod) + log.EventLog("starting table sizer with a period of %v", s.pollPeriod) doRefresh := func() error { err := s.refresh(ctx) if err != nil { @@ -111,7 +111,7 @@ func (s *tableSizer) start(ctx context.Context) error { } go utils.CtxLoop(ctx, s.pollPeriod, func() { if err := doRefresh(); err != nil { - events.Log("could not refresh table sizer: %{err}v", err) + log.EventLog("could not refresh table sizer: %{err}v", err) } }) return nil @@ -189,7 +189,7 @@ func (s *tableSizer) getSizes(ctx context.Context) (map[schema.FamilyTable]int64 } stats.Set("table-sizes", amount, stats.T("family", ft.Family), stats.T("table", ft.Table)) res[ft] = amount - events.Debug("table sizer: %v=%v", name, amount) + log.EventDebug("table sizer: %v=%v", name, amount) } return res, rows.Err() } diff --git a/pkg/executive/test_executive.go b/pkg/executive/test_executive.go index 08f53f63..3d32667c 100644 --- a/pkg/executive/test_executive.go +++ b/pkg/executive/test_executive.go @@ -3,7 +3,6 @@ package executive import ( "context" "database/sql" - "io/ioutil" "net" "net/http" "os" @@ -17,7 +16,7 @@ import ( "github.com/segmentio/ctlstore/pkg/limits" "github.com/segmentio/ctlstore/pkg/sqlgen" "github.com/segmentio/ctlstore/pkg/units" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" ) type TestExecutiveService struct { @@ -28,7 +27,7 @@ type TestExecutiveService struct { } func NewTestExecutiveService(bindTo string) (*TestExecutiveService, error) { - tmpDir, err := ioutil.TempDir("", "") + tmpDir, err := os.MkdirTemp("", "") if err != nil { return nil, err } @@ -58,7 +57,7 @@ func NewTestExecutiveService(bindTo string) (*TestExecutiveService, error) { go func() { listener, err := net.Listen("tcp", bindTo) if err != nil { - events.Log("Error listening: %{error}+v", err) + log.EventLog("Error listening: %{error}+v", err) started.Done() return } @@ -68,7 +67,7 @@ func NewTestExecutiveService(bindTo string) (*TestExecutiveService, error) { started.Done() if err := svc.h.Serve(listener); err != nil && err != http.ErrServerClosed { - events.Log("Error serving: %{error}+v", err) + log.EventLog("Error serving: %{error}+v", err) } }() @@ -89,7 +88,7 @@ func (s *TestExecutiveService) shutdown() { defer cancel() if err := s.h.Shutdown(sctx); err != nil { - events.Log("Shutdown error: %{error}+v", err) + log.EventLog("Shutdown error: %{error}+v", err) } } diff --git a/pkg/globalstats/stats.go b/pkg/globalstats/stats.go index 7a895572..4b5e2bdf 100644 --- a/pkg/globalstats/stats.go +++ b/pkg/globalstats/stats.go @@ -11,7 +11,7 @@ import ( "github.com/segmentio/ctlstore/pkg/version" "github.com/segmentio/errors-go" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" ) @@ -142,7 +142,7 @@ func Initialize(ctx context.Context, cfg Config) { // Validate that all config values are valid. if cfg.FlushEvery < 0 { - events.Log("Could not initialize ctlstore global stats: %{error}s", errors.New("flush rate must be a positive duration")) + log.EventLog("Could not initialize ctlstore global stats: %{error}s", errors.New("flush rate must be a positive duration")) return } diff --git a/pkg/heartbeat/heartbeat.go b/pkg/heartbeat/heartbeat.go index fc5c929e..56ccdc4b 100644 --- a/pkg/heartbeat/heartbeat.go +++ b/pkg/heartbeat/heartbeat.go @@ -2,7 +2,7 @@ package heartbeat import ( "context" - "io/ioutil" + "io" "net/http" "strings" "time" @@ -10,7 +10,7 @@ import ( "github.com/pkg/errors" "github.com/segmentio/ctlstore/pkg/errs" "github.com/segmentio/ctlstore/pkg/utils" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" ) type ( @@ -56,8 +56,8 @@ func HeartbeatFromConfig(config HeartbeatConfig) (*Heartbeat, error) { } func (h *Heartbeat) Start(ctx context.Context) { - events.Log("Heartbeat starting") - defer events.Log("Heartbeat stopped") + log.EventLog("Heartbeat starting") + defer log.EventLog("Heartbeat stopped") utils.CtxFireLoop(ctx, h.interval, func() { h.pulse(ctx) }) } @@ -98,14 +98,14 @@ func (h *Heartbeat) pulse(ctx context.Context) { } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return errors.Errorf("could not make mutation request: %d: %s", resp.StatusCode, b) } - events.Log("Heartbeat: %v", heartbeat) + log.EventLog("Heartbeat: %v", heartbeat) return nil }() if err != nil { - events.Log("Heartbeat failed: %s", err) + log.EventLog("Heartbeat failed: %s", err) errs.Incr("heartbeat-errors") } } @@ -121,7 +121,7 @@ func (h *Heartbeat) init() error { } defer res.Body.Close() if res.StatusCode != http.StatusOK { - b, _ := ioutil.ReadAll(res.Body) + b, _ := io.ReadAll(res.Body) return errors.Errorf("could not register writer: %d: %s", res.StatusCode, b) } @@ -137,7 +137,7 @@ func (h *Heartbeat) init() error { } defer res.Body.Close() if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusConflict { - b, _ := ioutil.ReadAll(res.Body) + b, _ := io.ReadAll(res.Body) return errors.Errorf("could not make family request: %v: %s", res.StatusCode, b) } @@ -161,7 +161,7 @@ func (h *Heartbeat) init() error { } defer res.Body.Close() if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusConflict { - b, _ := ioutil.ReadAll(res.Body) + b, _ := io.ReadAll(res.Body) return errors.Errorf("could not make table request: %v: %s", res.StatusCode, b) } diff --git a/pkg/ldb/ldbs.go b/pkg/ldb/ldbs.go index 9fca941d..c13724d9 100644 --- a/pkg/ldb/ldbs.go +++ b/pkg/ldb/ldbs.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "io/ioutil" "os" "path/filepath" "sync/atomic" @@ -45,7 +44,7 @@ var ( var testTmpSeq int64 = 0 func LDBForTestWithPath(t testing.TB) (res *sql.DB, teardown func(), path string) { - tmpDir, err := ioutil.TempDir("", "ldb-for-test") + tmpDir, err := os.MkdirTemp("", "ldb-for-test") if err != nil { t.Fatal(err) } @@ -94,7 +93,7 @@ func EnsureLdbInitialized(ctx context.Context, db *sql.DB) error { } func NewLDBTmpPath(t *testing.T) (string, func()) { - path, err := ioutil.TempDir("", "ldb-tmp-path") + path, err := os.MkdirTemp("", "ldb-tmp-path") if err != nil { t.Fatal(err) } diff --git a/pkg/ldbwriter/changelog_callback.go b/pkg/ldbwriter/changelog_callback.go index be6d58fb..a9e55157 100644 --- a/pkg/ldbwriter/changelog_callback.go +++ b/pkg/ldbwriter/changelog_callback.go @@ -6,7 +6,7 @@ import ( "github.com/segmentio/ctlstore/pkg/changelog" "github.com/segmentio/ctlstore/pkg/schema" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" ) type ChangelogCallback struct { @@ -20,7 +20,7 @@ func (c *ChangelogCallback) LDBWritten(ctx context.Context, data LDBWriteMetadat if err != nil { // This is expected because it'll capture tables like ctlstore_dml_ledger, // which aren't tables this cares about. - events.Debug("Skipped logging change to %{tableName}s, can't decode table: %{error}v", + log.EventDebug("Skipped logging change to %{tableName}s, can't decode table: %{error}v", change.TableName, err) continue @@ -28,7 +28,7 @@ func (c *ChangelogCallback) LDBWritten(ctx context.Context, data LDBWriteMetadat keys, err := change.ExtractKeys(data.DB) if err != nil { - events.Log("Skipped logging change to %{tableName}, can't extract keys: %{error}v", + log.EventLog("Skipped logging change to %{tableName}, can't extract keys: %{error}v", change.TableName, err) continue @@ -43,7 +43,7 @@ func (c *ChangelogCallback) LDBWritten(ctx context.Context, data LDBWriteMetadat Key: key, }) if err != nil { - events.Log("Skipped logging change to %{family}s.%{table}s:%{key}v: %{err}v", + log.EventLog("Skipped logging change to %{family}s.%{table}s:%{key}v: %{err}v", fam, tbl, key, err) continue } diff --git a/pkg/ldbwriter/ldb_callback_writer.go b/pkg/ldbwriter/ldb_callback_writer.go index ad1a4148..35dd3693 100644 --- a/pkg/ldbwriter/ldb_callback_writer.go +++ b/pkg/ldbwriter/ldb_callback_writer.go @@ -6,7 +6,7 @@ import ( "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/ctlstore/pkg/sqlite" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" ) // CallbackWriter is an LDBWriter that delegates to another @@ -25,7 +25,7 @@ func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema } changes := w.ChangeBuffer.Pop() for _, callback := range w.Callbacks { - events.Debug("Writing DML callback for %{cb}T", callback) + log.EventDebug("Writing DML callback for %{cb}T", callback) callback.LDBWritten(ctx, LDBWriteMetadata{ DB: w.DB, Statement: statement, diff --git a/pkg/ldbwriter/ldb_writer.go b/pkg/ldbwriter/ldb_writer.go index a3dfe84f..b999babf 100644 --- a/pkg/ldbwriter/ldb_writer.go +++ b/pkg/ldbwriter/ldb_writer.go @@ -4,8 +4,10 @@ import ( "context" "database/sql" "fmt" + "log/slog" + "github.com/pkg/errors" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" "github.com/segmentio/ctlstore/pkg/errs" @@ -38,7 +40,7 @@ type SqlLdbWriter struct { Db *sql.DB LedgerTx *sql.Tx // uniquely identify this SqlWriter - Logger *events.Logger + Logger *slog.Logger ID string } @@ -75,7 +77,7 @@ func (w *SqlLdbWriter) ApplyDMLStatement(_ context.Context, statement schema.DML return errors.New("invariant violation") } w.LedgerTx = tx - logger.Debug("Begin TX at %{sequence}v", statement.Sequence) + logger.Debug("Begin TX", "sequence", statement.Sequence) } // Update the last update table. This will allow the ldb reader @@ -146,13 +148,13 @@ func (w *SqlLdbWriter) ApplyDMLStatement(_ context.Context, statement schema.DML if err != nil { tx.Rollback() errs.Incr("sql_ldb_writer.ledgerTx.commit.error", stats.T("id", w.ID)) - logger.Log("Failed to commit Tx at seq %{seq}s: %{error}+v", - statement.Sequence, - err) + logger.Error(fmt.Sprintf("Failed to commit Tx at seq %v: %+v", statement.Sequence, err), + "sequence", statement.Sequence, + "err", err) return errors.Wrap(err, "commit multi-statement dml tx error") } stats.Incr("sql_ldb_writer.ledgerTx.commit.success", stats.T("id", w.ID)) - logger.Debug("Committed TX at %{sequence}v", statement.Sequence) + logger.Debug("Committed TX", "sequence", statement.Sequence) w.LedgerTx = nil return nil } @@ -167,9 +169,9 @@ func (w *SqlLdbWriter) ApplyDMLStatement(_ context.Context, statement schema.DML stats.Incr("sql_ldb_writer.exec.success", stats.T("id", w.ID)) - logger.Debug("Applying DML[%{sequence}d]: '%{statement}s'", - statement.Sequence, - statement.Statement) + logger.Debug(fmt.Sprintf("Applying DML[%d]: '%s'", statement.Sequence, statement.Statement), + "sequence", statement.Sequence, + "stmt", statement.Statement) // Commit if not inside a ledger transaction, since that would be // a single statement transaction. @@ -228,7 +230,7 @@ var ( func (w *SqlLdbWriter) Checkpoint(checkpointingType CheckpointType) (*PragmaWALResult, error) { res, err := w.Db.Query(fmt.Sprintf("PRAGMA wal_checkpoint(%s)", string(checkpointingType))) if err != nil { - w.logger().Log("error in checkpointing, %{error}", err) + w.logger().Error("error in checkpointing", "err", err) errs.Incr("sql_ldb_writer.wal_checkpoint.query.error", stats.T("id", w.ID)) return nil, err } @@ -238,7 +240,7 @@ func (w *SqlLdbWriter) Checkpoint(checkpointingType CheckpointType) (*PragmaWALR if res.Next() { err := res.Scan(&p.Busy, &p.Log, &p.Checkpointed) if err != nil { - w.logger().Log("error in scanning checkpointing, %{error}") + w.logger().Error("error in scanning checkpointing", "err", err) errs.Incr("sql_ldb_writer.wal_checkpoint.scan.error", stats.T("id", w.ID)) return nil, err } @@ -247,9 +249,9 @@ func (w *SqlLdbWriter) Checkpoint(checkpointingType CheckpointType) (*PragmaWALR return &p, nil } -func (w *SqlLdbWriter) logger() *events.Logger { +func (w *SqlLdbWriter) logger() *slog.Logger { if w.Logger == nil { - w.Logger = events.DefaultLogger + w.Logger = log.Default() } return w.Logger } diff --git a/pkg/ldbwriter/ldb_writer_with_changelog.go b/pkg/ldbwriter/ldb_writer_with_changelog.go index 6f3bc9e5..9e638d20 100644 --- a/pkg/ldbwriter/ldb_writer_with_changelog.go +++ b/pkg/ldbwriter/ldb_writer_with_changelog.go @@ -8,7 +8,7 @@ import ( "github.com/segmentio/ctlstore/pkg/changelog" "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/ctlstore/pkg/sqlite" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" ) type LDBWriterWithChangelog struct { @@ -19,7 +19,6 @@ type LDBWriterWithChangelog struct { Seq int64 } -// // NOTE: How does the changelog work? // // This is sort of the crux of how the changelog comes together. The Reflector @@ -33,7 +32,6 @@ type LDBWriterWithChangelog struct { // This is pretty complex, but after enumerating about 8 different options, it // ended up actually being the most simple. Other options involved not-so-great // options like parsing SQL or maintaining triggers on every table. -// func (w *LDBWriterWithChangelog) ApplyDMLStatement(ctx context.Context, statement schema.DMLStatement) error { err := w.LdbWriter.ApplyDMLStatement(ctx, statement) if err != nil { @@ -45,7 +43,7 @@ func (w *LDBWriterWithChangelog) ApplyDMLStatement(ctx context.Context, statemen if err != nil { // This is expected because it'll capture tables like ctlstore_dml_ledger, // which aren't tables this cares about. - events.Debug("Skipped logging change to %{tableName}s, can't decode table: %{error}v", + log.EventDebug("Skipped logging change to %{tableName}s, can't decode table: %{error}v", change.TableName, err) continue @@ -53,7 +51,7 @@ func (w *LDBWriterWithChangelog) ApplyDMLStatement(ctx context.Context, statemen keys, err := change.ExtractKeys(w.DB) if err != nil { - events.Log("Skipped logging change to %{tableName}, can't extract keys: %{error}v", + log.EventLog("Skipped logging change to %{tableName}, can't extract keys: %{error}v", change.TableName, err) continue @@ -68,7 +66,7 @@ func (w *LDBWriterWithChangelog) ApplyDMLStatement(ctx context.Context, statemen Key: key, }) if err != nil { - events.Log("Skipped logging change to %{family}s.%{table}s:%{key}v: %{err}v", + log.EventLog("Skipped logging change to %{family}s.%{table}s:%{key}v: %{err}v", fam, tbl, key, err) continue } diff --git a/pkg/ledger/ledger_monitor.go b/pkg/ledger/ledger_monitor.go index 4354609d..4af9f087 100644 --- a/pkg/ledger/ledger_monitor.go +++ b/pkg/ledger/ledger_monitor.go @@ -4,7 +4,7 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "time" @@ -13,7 +13,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ecs" "github.com/segmentio/errors-go" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" "github.com/segmentio/ctlstore/pkg/errs" @@ -68,8 +68,8 @@ func NewLedgerMonitor(cfg HealthConfig, llf latencyFunc, opts ...MonitorOpt) (*M } func (m *Monitor) Start(ctx context.Context) { - events.Log("Ledger monitor starting") - defer events.Log("Ledger monitor stopped") + log.EventLog("Ledger monitor starting") + defer log.EventLog("Ledger monitor stopped") var health *bool // pointer for tri-state logic temporaryErrorLimit := 3 utils.CtxFireLoopTicker(ctx, m.tickerFunc(), func() { @@ -114,18 +114,18 @@ func (m *Monitor) Start(ctx context.Context) { case errors.Is("temporary", err) && temporaryErrorLimit > 0: // don't increment error metric for a temporary error temporaryErrorLimit-- - events.Log("Temporary monitor ledger latency error: %s", err) + log.EventLog("Temporary monitor ledger latency error: %s", err) stats.Incr("ledger-monitor-temporary-errors") default: // this is an error that must be instrumented - events.Log("Could not monitor ledger latency: %s", err) + log.EventLog("Could not monitor ledger latency: %s", err) errs.IncrDefault(stats.Tag{Name: "op", Value: "monitor-ledger-latency"}) } }) } func (m *Monitor) setHealthAttribute(ctx context.Context, attrValue string) error { - events.Log("Setting ECS instance attribute: %s=%s", m.cfg.AttributeName, attrValue) + log.EventLog("Setting ECS instance attribute: %s=%s", m.cfg.AttributeName, attrValue) ecsMeta, err := m.getECSMetadata(ctx) if err != nil { return errors.Wrap(err, "get ecs metadata") @@ -134,7 +134,7 @@ func (m *Monitor) setHealthAttribute(ctx context.Context, attrValue string) erro if err != nil { return errors.Wrap(err, "build cluster ARN") } - events.Log("Putting attribute name=%{attName}v value=%{attValue}v targetID=%{targetID}v targetType=%{targetType}v", + log.EventLog("Putting attribute name=%{attName}v value=%{attValue}v targetID=%{targetID}v targetType=%{targetType}v", m.cfg.AttributeName, attrValue, ecsMeta.ContainerInstanceArn, ecsContainerInstanceTargetType) client := m.getECSClient() _, err = client.PutAttributes(&ecs.PutAttributesInput{ @@ -193,7 +193,7 @@ func (m *Monitor) getECSMetadata(ctx context.Context) (meta EcsMetadata, err err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return errors.Errorf("could not get ecs metadata: [%d]: %s", resp.StatusCode, b) } if err = json.NewDecoder(resp.Body).Decode(&meta); err != nil { diff --git a/pkg/ledger/ledger_monitor_test.go b/pkg/ledger/ledger_monitor_test.go index 0eb861d9..5a87ec28 100644 --- a/pkg/ledger/ledger_monitor_test.go +++ b/pkg/ledger/ledger_monitor_test.go @@ -11,7 +11,6 @@ import ( "github.com/pkg/errors" "github.com/segmentio/ctlstore/pkg/ledger" "github.com/segmentio/ctlstore/pkg/ledger/fakes" - _ "github.com/segmentio/events/v2/log" "github.com/stretchr/testify/require" ) diff --git a/pkg/reflector/download.go b/pkg/reflector/download.go index 7e7f958f..c1a972c9 100644 --- a/pkg/reflector/download.go +++ b/pkg/reflector/download.go @@ -13,7 +13,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/segmentio/errors-go" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" "github.com/segmentio/ctlstore/pkg/errs" @@ -69,7 +69,7 @@ func (d *S3Downloader) DownloadTo(w io.Writer) (n int64, err error) { return n, errors.Wrap(err, "copy from s3 to writer") } if compressedSize != nil { - events.Log("LDB inflated %d -> %d bytes", *compressedSize, n) + log.EventLog("LDB inflated %d -> %d bytes", *compressedSize, n) } return diff --git a/pkg/reflector/reflector.go b/pkg/reflector/reflector.go index 71fdaaaa..d001a98d 100644 --- a/pkg/reflector/reflector.go +++ b/pkg/reflector/reflector.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "io" + "log/slog" "net/url" "os" "strings" @@ -26,7 +27,7 @@ import ( "github.com/segmentio/ctlstore/pkg/sqlite" "github.com/segmentio/errors-go" "github.com/segmentio/events/v2" - _ "github.com/segmentio/events/v2/log" // lets events actually log + "github.com/segmentio/log" "github.com/segmentio/stats/v4" ) @@ -37,7 +38,7 @@ import ( type Reflector struct { shovel func() (*shovel, error) ldb *sql.DB - logger *events.Logger + logger *slog.Logger upstreamdb *sql.DB ledgerMonitor *ledger.Monitor walMonitor starter @@ -76,7 +77,7 @@ type ReflectorConfig struct { DoMonitorWAL bool // optional BusyTimeoutMS int // optional ID string - Logger *events.Logger + Logger *slog.Logger } type DownloadMetric struct { @@ -106,13 +107,13 @@ var driverNameSequence int64 // ReflectorFromConfig instantiates a Reflector instance using the // configuration specified by a ReflectorConfig instance func ReflectorFromConfig(config ReflectorConfig) (*Reflector, error) { - events.Log("Config: %{config}s", config.Printable()) + log.EventLog("Config: %{config}s", config.Printable()) if config.BootstrapURL != "" { if _, err := os.Stat(config.LDBPath); err != nil { switch { case os.IsNotExist(err): - events.Log("LDB File %{file}s doesn't exist, beginning bootstrap...", config.LDBPath) + log.EventLog("LDB File %{file}s doesn't exist, beginning bootstrap...", config.LDBPath) err = bootstrapLDB(ldbBootstrapConfig{ url: config.BootstrapURL, path: config.LDBPath, @@ -126,7 +127,7 @@ func ReflectorFromConfig(config ReflectorConfig) (*Reflector, error) { return nil, err } } else { - events.Log("LDB File %{file}s exists, skipping bootstrap.", config.LDBPath) + log.EventLog("LDB File %{file}s exists, skipping bootstrap.", config.LDBPath) } } @@ -185,14 +186,14 @@ func ReflectorFromConfig(config ReflectorConfig) (*Reflector, error) { return nil, errors.Wrap(err, "find max seq from ledger") } - events.Log("Max known ledger sequence: %{seq}d", maxKnownSeq) + log.EventLog("Max known ledger sequence: %{seq}d", maxKnownSeq) path := "/var/spool/ctlstore/metrics.json" err = emitMetricFromFile(path) if err != nil { - events.Log("Failed to emit metric from file", err) + log.EventLog("Failed to emit metric from file", err) } - events.Log("Successfully emitted metric from file") + log.EventLog("Successfully emitted metric from file") // TODO: check Upstream fields stop := make(chan struct{}) @@ -226,7 +227,7 @@ func ReflectorFromConfig(config ReflectorConfig) (*Reflector, error) { ldbWriteCallbacks = append(ldbWriteCallbacks, &ldbwriter.ChangelogCallback{ ChangelogWriter: clw, }) - events.Log("Writing changelog to %{path}s", config.ChangelogPath) + log.EventLog("Writing changelog to %{path}s", config.ChangelogPath) } if config.LDBWriteCallback != nil { @@ -245,7 +246,7 @@ func ReflectorFromConfig(config ReflectorConfig) (*Reflector, error) { } lastSeq, err := ldb.FetchSeqFromLdb(context.TODO(), ldbDB) - events.Log("Latest seq from %s: %d", config.ID, lastSeq.Int()) + log.EventLog("Latest seq from %s: %d", config.ID, lastSeq.Int()) if err != nil { return nil, fmt.Errorf("Error when fetching last sequence from LDB: %v", err) } @@ -345,7 +346,7 @@ func emitMetricFromFile(path string) error { func (r *Reflector) Start(ctx context.Context) error { - r.logger.Log("Starting Reflector.") + r.logger.Info("Starting Reflector.") go r.ledgerMonitor.Start(ctx) go r.walMonitor.Start(ctx) for { @@ -355,15 +356,16 @@ func (r *Reflector) Start(ctx context.Context) error { return errors.Wrap(err, "build shovel") } defer shovel.Close() - r.logger.Log("Shoveling...") + r.logger.Info("Shoveling...") stats.Incr("reflector.shovel_start") err = shovel.Start(ctx) return errors.Wrap(err, "shovel") }() switch { case errs.IsCanceled(err): // this is normal + // true if we got a SIGTERM case events.IsTermination(errors.Cause(err)): // this is normal - r.logger.Log("Reflector received termination signal") + r.logger.Info("Reflector received termination signal") case err != nil: switch { case errors.Is("SkippedSequence", err): @@ -373,7 +375,7 @@ func (r *Reflector) Start(ctx context.Context) error { default: errs.Incr("reflector.shovel_error") } - r.logger.Log("Error encountered during shoveling: %{error}+v", err) + r.logger.Error("Error encountered during shoveling", "err", err) } select { case <-r.stop: @@ -393,7 +395,7 @@ func (r *Reflector) Stop() { func (r *Reflector) Close() error { var err error - r.logger.Log("Close() reflector") + r.logger.Info("Close() reflector") err = r.ldb.Close() if err != nil { @@ -425,7 +427,7 @@ func bootstrapLDB(cfg ldbBootstrapConfig) error { shortURL = shortURL[:256] } - events.Log("Bootstrap: %{url}s (region:%{region}q) to %{path}s", shortURL, cfg.region, cfg.path) + log.EventLog("Bootstrap: %{url}s (region:%{region}q) to %{path}s", shortURL, cfg.region, cfg.path) parsed, err := url.Parse(cfg.url) if err != nil { @@ -490,21 +492,21 @@ func bootstrapLDB(cfg ldbBootstrapConfig) error { if err != nil { return err } - events.Log("Bootstrap: Downloaded %{bytes}d bytes", bytes) + log.EventLog("Bootstrap: Downloaded %{bytes}d bytes", bytes) return nil case errors.Is(errs.ErrTypeTemporary, err): incrError("temporary") - events.Log("Temporary error trying to download snapshot: %{error}s", err) + log.EventLog("Temporary error trying to download snapshot: %{error}s", err) delay := cfg.retryDelay if delay == 0 { delay = time.Second } - events.Log("Retrying in %{delay}s", delay) + log.EventLog("Retrying in %{delay}s", delay) time.Sleep(delay) case errors.Is(errs.ErrTypePermanent, err): incrError("permanent") - events.Log("Could not download snapshot: %{error}s", err) - events.Log("Starting with a new LDB") + log.EventLog("Could not download snapshot: %{error}s", err) + log.EventLog("Starting with a new LDB") return nil default: incrError("generic") diff --git a/pkg/reflector/reflector_ctl.go b/pkg/reflector/reflector_ctl.go index bfb35817..d65b5117 100644 --- a/pkg/reflector/reflector_ctl.go +++ b/pkg/reflector/reflector_ctl.go @@ -8,7 +8,7 @@ import ( "github.com/pkg/errors" "github.com/segmentio/ctlstore/pkg/errs" "github.com/segmentio/ctlstore/pkg/utils" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" ) @@ -68,7 +68,7 @@ func (r *ReflectorCtl) Start(ctx context.Context) { Value: "start", }) }() - events.Log("Starting reflector") + log.EventLog("Starting reflector") r.assertNotClosed() r.initLifecycle(ctx) result := make(chan error) @@ -98,7 +98,7 @@ func (r *ReflectorCtl) Stop(ctx context.Context) { Value: "stop", }) }() - events.Log("Stopping reflector") + log.EventLog("Stopping reflector") r.assertNotClosed() r.initLifecycle(ctx) result := make(chan error) @@ -138,7 +138,7 @@ func (r *ReflectorCtl) lifecycle(appCtx context.Context) { select { case <-appCtx.Done(): // the application is quitting - events.Log("reflectorCtl stopping due to context err=%v", appCtx.Err()) + log.EventLog("reflectorCtl stopping due to context err=%v", appCtx.Err()) return case msg := <-r.messages: // another goroutine has asked for the reflector to either diff --git a/pkg/reflector/reflector_test.go b/pkg/reflector/reflector_test.go index 9d8fda93..2e1b3882 100644 --- a/pkg/reflector/reflector_test.go +++ b/pkg/reflector/reflector_test.go @@ -4,26 +4,26 @@ import ( "context" "database/sql" "encoding/base64" - "github.com/stretchr/testify/assert" - "io/ioutil" "os" "path/filepath" "sync/atomic" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/google/go-cmp/cmp" "github.com/segmentio/ctlstore/pkg/ctldb" "github.com/segmentio/ctlstore/pkg/ldb" "github.com/segmentio/ctlstore/pkg/ldbwriter" "github.com/segmentio/ctlstore/pkg/ledger" "github.com/segmentio/errors-go" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/stretchr/testify/require" ) func TestShovelSequenceReset(t *testing.T) { - tmpPath, err := ioutil.TempDir("", "") + tmpPath, err := os.MkdirTemp("", "") require.NoError(t, err) defer os.RemoveAll(tmpPath) @@ -44,7 +44,7 @@ func TestShovelSequenceReset(t *testing.T) { defer ldbDB.Close() ldb.EnsureLdbInitialized(context.TODO(), ldbDB) - emptyLdbContents, err := ioutil.ReadFile(emptyLdbPath) + emptyLdbContents, err := os.ReadFile(emptyLdbPath) require.NoError(t, err) encodedEmpty := base64.URLEncoding.EncodeToString(emptyLdbContents) @@ -66,7 +66,7 @@ func TestShovelSequenceReset(t *testing.T) { LedgerHealth: ledger.HealthConfig{ DisableECSBehavior: true, }, - Logger: events.DefaultLogger, + Logger: log.Default(), } reflector, err := ReflectorFromConfig(cfg) require.NoError(t, err) @@ -116,7 +116,7 @@ func TestShovelSequenceReset(t *testing.T) { } func TestReflector(t *testing.T) { - tmpPath, err := ioutil.TempDir("", "") + tmpPath, err := os.MkdirTemp("", "") if err != nil { t.Fatalf("Encountered unexpected error creating temp path, %v", err) } @@ -133,7 +133,7 @@ func TestReflector(t *testing.T) { defer ldbDB.Close() ldb.EnsureLdbInitialized(context.TODO(), ldbDB) - emptyLdbContents, err := ioutil.ReadFile(emptyLdbPath) + emptyLdbContents, err := os.ReadFile(emptyLdbPath) require.NoError(t, err) encodedEmpty := base64.URLEncoding.EncodeToString(emptyLdbContents) @@ -156,7 +156,7 @@ func TestReflector(t *testing.T) { DisableECSBehavior: true, PollInterval: 10 * time.Second, }, - Logger: events.DefaultLogger, + Logger: log.Default(), } ctx, cancel := context.WithCancel(context.Background()) @@ -195,7 +195,7 @@ func TestReflector(t *testing.T) { go func() { close(waitCh) reflector.Start(ctx) - events.Log("Reflector terminated") + log.EventLog("Reflector terminated") atomic.AddInt64(&isTerminated, 1) }() <-waitCh @@ -205,7 +205,7 @@ func TestReflector(t *testing.T) { cancel() } - clBytes, err := ioutil.ReadFile(changelogPath) + clBytes, err := os.ReadFile(changelogPath) require.NoError(t, err) expectChangelog := "{\"seq\":1,\"family\":\"family1\",\"table\":\"table1234\",\"key\":[{\"name\":\"field1\",\"type\":\"INTEGER\",\"value\":1234}]}\n" diff --git a/pkg/reflector/shovel.go b/pkg/reflector/shovel.go index a2b8d628..19bf3b3c 100644 --- a/pkg/reflector/shovel.go +++ b/pkg/reflector/shovel.go @@ -3,13 +3,14 @@ package reflector import ( "context" "io" + "log/slog" "time" "github.com/segmentio/ctlstore/pkg/errs" "github.com/segmentio/ctlstore/pkg/ldbwriter" "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/errors-go" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" ) @@ -23,7 +24,7 @@ type shovel struct { abortOnSeqSkip bool maxSeqOnStartup int64 stop chan struct{} - log *events.Logger + log *slog.Logger } func (s *shovel) Start(ctx context.Context) error { @@ -45,7 +46,7 @@ func (s *shovel) Start(ctx context.Context) error { // early exit here if the shovel should be stopped select { case <-s.stop: - s.logger().Log("Shovel stopping normally") + s.logger().Info("Shovel stopping normally") return nil default: } @@ -80,7 +81,7 @@ func (s *shovel) Start(ctx context.Context) error { // pollSleep := jitr.Jitter(s.pollInterval, s.jitterCoefficient) - s.logger().Debug("Poll sleep %{sleepTime}s", pollSleep) + s.logger().Debug("Poll sleep", "time", pollSleep) select { case <-ctx.Done(): @@ -92,12 +93,12 @@ func (s *shovel) Start(ctx context.Context) error { continue } - s.logger().Debug("Shovel applying %{statement}v", st) + s.logger().Debug("Shovel applying statement", "stmt", st) if lastSeq != 0 { if st.Sequence > lastSeq+1 && st.Sequence.Int() > s.maxSeqOnStartup { stats.Incr("shovel.skipped_sequence") - s.logger().Log("shovel skip sequence from:%{fromSeq}d to:%{toSeq}d", lastSeq, st.Sequence) + s.logger().Info("shovel skip sequence", "from", lastSeq, "to", st.Sequence) if s.abortOnSeqSkip { // Mitigation for a bug that we haven't found yet @@ -134,15 +135,15 @@ func (s *shovel) Close() error { for _, closer := range s.closers { err := closer.Close() if err != nil { - s.logger().Log("shovel encountered error during close: %{error}s", err) + s.logger().Error("shovel encountered error during close", "err", err) } } return nil } -func (s *shovel) logger() *events.Logger { +func (s *shovel) logger() *slog.Logger { if s.log == nil { - s.log = events.DefaultLogger + s.log = log.Default() } return s.log } diff --git a/pkg/reflector/wal_monitor.go b/pkg/reflector/wal_monitor.go index dfa27b62..4011ce9c 100644 --- a/pkg/reflector/wal_monitor.go +++ b/pkg/reflector/wal_monitor.go @@ -6,7 +6,7 @@ import ( "path" "time" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" "github.com/segmentio/ctlstore/pkg/errs" @@ -65,10 +65,10 @@ func NewMonitor(cfg MonitorConfig, checkpointTester checkpointTesterFunc, opts . // Start runs the wal file size check and sqlite checkpoint check on PollInterval's cadence // this method blocks func (m *WALMonitor) Start(ctx context.Context) { - events.Log("WAL monitor starting") - defer events.Log("WAL monitor stopped") + log.EventLog("WAL monitor starting") + defer log.EventLog("WAL monitor stopped") if m.walPath == "" { - events.Log("Not monitoring the WAL because its path is not set") + log.EventLog("Not monitoring the WAL because its path is not set") return } loopCtx, cancel := context.WithCancel(ctx) @@ -84,11 +84,11 @@ func (m *WALMonitor) Start(ctx context.Context) { } size, err := fn(m.walPath) if err != nil { - events.Log("error retrieving wal stat, %s", err) + log.EventLog("error retrieving wal stat, %s", err) failedInARow++ if failedInARow >= m.consecutiveMaxErrors { // cancel to prevent log spamming - events.Log("canceling WAL size monitoring due to consistent error, %s", err) + log.EventLog("canceling WAL size monitoring due to consistent error, %s", err) errs.Incr("reflector.wal_monitor.persistent_stat_error") cancel() } @@ -105,11 +105,11 @@ func (m *WALMonitor) Start(ctx context.Context) { res, err := m.cpTesterFunc() if err != nil { - events.Log("error checking wal's checkpoint status, %s", err) + log.EventLog("error checking wal's checkpoint status, %s", err) failedInARow++ if failedInARow >= m.consecutiveMaxErrors { // cancel to prevent log spamming - events.Log("canceling WAL checkpoint monitoring due to consistent error, %s", err) + log.EventLog("canceling WAL checkpoint monitoring due to consistent error, %s", err) errs.Incr("reflector.wal_monitor.persistent_checkpoint_error") cancel() } diff --git a/pkg/supervisor/archived_snapshot.go b/pkg/supervisor/archived_snapshot.go index bd5ace73..0827a32a 100644 --- a/pkg/supervisor/archived_snapshot.go +++ b/pkg/supervisor/archived_snapshot.go @@ -16,7 +16,7 @@ import ( "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/pkg/errors" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" "github.com/segmentio/ctlstore/pkg/utils" @@ -83,11 +83,11 @@ func (c *s3Snapshot) Upload(ctx context.Context, path string) error { var gpr *gzipCompressionReader if strings.HasSuffix(key, ".gz") { - events.Log("Compressing s3 payload with GZIP") + log.EventLog("Compressing s3 payload with GZIP") gpr = newGZIPCompressionReader(reader) reader = gpr } - events.Log("Uploading %{file}s (%d bytes) to %{bucket}s/%{key}s", path, size, c.Bucket, key) + log.EventLog("Uploading %{file}s (%d bytes) to %{bucket}s/%{key}s", path, size, c.Bucket, key) start := time.Now() if err = c.sendToS3(ctx, key, c.Bucket, reader, cs); err != nil { @@ -95,13 +95,13 @@ func (c *s3Snapshot) Upload(ctx context.Context, path string) error { } stats.Observe("ldb-upload-time", time.Since(start), stats.T("compressed", isCompressed(gpr))) - events.Log("Successfully uploaded %{file}s to %{bucket}s/%{key}s", path, c.Bucket, key) + log.EventLog("Successfully uploaded %{file}s to %{bucket}s/%{key}s", path, c.Bucket, key) if gpr != nil { stats.Set("ldb-size-bytes-compressed", gpr.bytesRead) if size > 0 { ratio := 1 - (float64(gpr.bytesRead) / float64(size)) stats.Set("s3-compression-ratio", ratio) - events.Log("Compression reduced %d -> %d bytes (%0.2f %%)", size, gpr.bytesRead, ratio*100) + log.EventLog("Compression reduced %d -> %d bytes (%0.2f %%)", size, gpr.bytesRead, ratio*100) } } return nil @@ -116,11 +116,11 @@ func getChecksum(path string) (string, error) { h := sha1.New() if _, err := io.Copy(h, f); err != nil { - events.Log("failed to generate sha1 of snapshot", err) + log.EventLog("failed to generate sha1 of snapshot", err) } cs := base64.StdEncoding.EncodeToString(h.Sum(nil)) - events.Log("base64 encoding of sha1: %s", cs) + log.EventLog("base64 encoding of sha1: %s", cs) return cs, nil } @@ -164,9 +164,9 @@ func (c *s3Snapshot) sendToS3(ctx context.Context, key string, bucket string, bo }, }) if err == nil { - events.Log("Wrote to S3 location: %s", output.Location) + log.EventLog("Wrote to S3 location: %s", output.Location) } else { - events.Log("Couldn't upload s3 snapshot to %v:%v. Here's why: %v\n", + log.EventLog("Couldn't upload s3 snapshot to %v:%v. Here's why: %v\n", bucket, key, err) } return errors.Wrap(err, "upload with context") @@ -193,10 +193,10 @@ func archivedSnapshotFromURL(URL string) (archivedSnapshot, error) { } switch parsed.Scheme { case "s3": - events.Log("Using s3 destination for snapshots bucket=%v", parsed.Host) + log.EventLog("Using s3 destination for snapshots bucket=%v", parsed.Host) return &s3Snapshot{Bucket: parsed.Host, Key: parsed.Path}, nil case "file": - events.Log("Using local FS destination for snapshots file=%v", parsed.Path) + log.EventLog("Using local FS destination for snapshots file=%v", parsed.Path) return &localSnapshot{parsed.Path}, nil default: return nil, errors.Errorf("Unknown scheme %s", parsed.Scheme) diff --git a/pkg/supervisor/gzip_pipe_test.go b/pkg/supervisor/gzip_pipe_test.go index f1cf8121..ebfb4b15 100644 --- a/pkg/supervisor/gzip_pipe_test.go +++ b/pkg/supervisor/gzip_pipe_test.go @@ -16,12 +16,12 @@ func TestGZIPPipeReader(t *testing.T) { input := "hello world" var reader io.Reader = strings.NewReader(input) reader = newGZIPCompressionReader(reader) - deflated, err := ioutil.ReadAll(reader) + deflated, err := io.ReadAll(reader) require.NoError(t, err) reader, err = gzip.NewReader(bytes.NewReader(deflated)) require.NoError(t, err) - inflated, err := ioutil.ReadAll(reader) + inflated, err := io.ReadAll(reader) require.NoError(t, err) require.EqualValues(t, input, string(inflated)) } @@ -58,7 +58,7 @@ func TestGZIPPipeReaderErr(t *testing.T) { closeErr: test.closeErr, } reader := newGZIPCompressionReader(fake) - _, err := ioutil.ReadAll(reader) + _, err := io.ReadAll(reader) if test.expected == nil { require.NoError(t, err) } else { @@ -80,7 +80,7 @@ func TestIOPipes(t *testing.T) { // verify that the entire payload is read uncompressed reader = bytes.NewReader(data) - deflated, err := ioutil.ReadAll(reader) + deflated, err := io.ReadAll(reader) require.NoError(t, err) require.Equal(t, bufSize, len(deflated)) @@ -101,13 +101,13 @@ func TestIOPipes(t *testing.T) { }()) }() - deflated, err = ioutil.ReadAll(pr) + deflated, err = io.ReadAll(pr) require.NoError(t, err) require.True(t, len(deflated) < bufSize, "source=%d res=%d", bufSize, len(deflated)) reader, err = gzip.NewReader(bytes.NewReader(deflated)) require.NoError(t, err) - inflated, err := ioutil.ReadAll(reader) + inflated, err := io.ReadAll(reader) require.NoError(t, err) require.EqualValues(t, data, inflated) } diff --git a/pkg/supervisor/s3_snapshot_test.go b/pkg/supervisor/s3_snapshot_test.go index 1edfe81e..1e2f1d82 100644 --- a/pkg/supervisor/s3_snapshot_test.go +++ b/pkg/supervisor/s3_snapshot_test.go @@ -55,7 +55,7 @@ func TestS3SnapshotCompression(t *testing.T) { s3snap.sendToS3Func = func(ctx context.Context, key string, bucket string, body io.Reader) (err error) { sent.key = key sent.bucket = bucket - sent.bytes, err = ioutil.ReadAll(body) + sent.bytes, err = io.ReadAll(body) return } file, err := ioutil.TempFile("", test.name) @@ -72,7 +72,7 @@ func TestS3SnapshotCompression(t *testing.T) { if test.compression { r, err := gzip.NewReader(bytes.NewReader(sent.bytes)) require.NoError(t, err) - b, err := ioutil.ReadAll(r) + b, err := io.ReadAll(r) require.NoError(t, err) require.Equal(t, test.payload, string(b)) } else { diff --git a/pkg/supervisor/supervisor.go b/pkg/supervisor/supervisor.go index 05f1e013..dce5a75e 100644 --- a/pkg/supervisor/supervisor.go +++ b/pkg/supervisor/supervisor.go @@ -10,7 +10,7 @@ import ( "github.com/pkg/errors" "github.com/segmentio/ctlstore/pkg/reflector" - "github.com/segmentio/events/v2" + "github.com/segmentio/log" "github.com/segmentio/stats/v4" ) @@ -59,7 +59,7 @@ func SupervisorFromConfig(config SupervisorConfig) (Supervisor, error) { } func (s *supervisor) snapshot(ctx context.Context) error { - events.Log("Taking a snapshot") + log.EventLog("Taking a snapshot") s.reflectorCtl.Stop(ctx) defer s.reflectorCtl.Start(ctx) if err := s.checkpointLDB(); err != nil { @@ -110,13 +110,13 @@ func (s *supervisor) checkpointLDB() error { if err != nil { return errors.Wrap(err, "locking database") } - events.Log("Acquired write lock on %{srcDb}s", s.LDBPath) + log.EventLog("Acquired write lock on %{srcDb}s", s.LDBPath) _, err = conn.ExecContext(ctx, "COMMIT;") if err != nil { return errors.Wrap(err, "commit") } - events.Log("Released write lock on %{srcDb}s", s.LDBPath) - events.Log("Checkpointed WAL on %{srcDb}s", s.LDBPath) + log.EventLog("Released write lock on %{srcDb}s", s.LDBPath) + log.EventLog("Checkpointed WAL on %{srcDb}s", s.LDBPath) return nil } @@ -126,9 +126,9 @@ func (s *supervisor) incrementSnapshotErrorMetric(value int) { func (s *supervisor) Start(ctx context.Context) { s.incrementSnapshotErrorMetric(0) // initialize the metric since it's sparse - events.Log("Starting supervisor") + log.EventLog("Starting supervisor") s.reflectorCtl.Start(ctx) - defer events.Log("Stopped Supervisor") + defer log.EventLog("Stopped Supervisor") sleepDur := s.SleepDuration for { // Wait for the reflector to make changes to its LDB before stopping it. Sometimes @@ -153,14 +153,14 @@ func (s *supervisor) Start(ctx context.Context) { // reset to default sleepDur = s.SleepDuration case <-ctx.Done(): - events.Log("Supervisor exiting because context done (err=%v)", ctx.Err()) + log.EventLog("Supervisor exiting because context done (err=%v)", ctx.Err()) // Outer context is done, aborting everything return } err := s.snapshot(ctx) if err != nil && errors.Cause(err) != context.Canceled { s.incrementSnapshotErrorMetric(1) - events.Log("Error taking snapshot: %{error}+v", err) + log.EventLog("Error taking snapshot: %{error}+v", err) // Use a shorter sleep duration for faster retries sleepDur = s.BreatheDuration } diff --git a/pkg/supervisor/supervisor_test.go b/pkg/supervisor/supervisor_test.go index 1d85cf54..9c0a7145 100644 --- a/pkg/supervisor/supervisor_test.go +++ b/pkg/supervisor/supervisor_test.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "io/ioutil" "os" "path/filepath" "testing" @@ -35,7 +34,7 @@ func TestSupervisorParsingSnapshotURL(t *testing.T) { } func TestSupervisor(t *testing.T) { - tmpPath, err := ioutil.TempDir("", "") + tmpPath, err := os.MkdirTemp("", "") require.NoError(t, err) defer os.RemoveAll(tmpPath) @@ -166,7 +165,7 @@ func TestSupervisorSnapshotReflectorCtl(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - tmpPath, err := ioutil.TempDir("", "") + tmpPath, err := os.MkdirTemp("", "") require.NoError(t, err) defer os.RemoveAll(tmpPath) diff --git a/pkg/tests/tests.go b/pkg/tests/tests.go index 663cd997..e7721b93 100644 --- a/pkg/tests/tests.go +++ b/pkg/tests/tests.go @@ -3,7 +3,6 @@ package tests import ( "context" "database/sql" - "io/ioutil" "os" "path/filepath" "testing" @@ -15,7 +14,7 @@ import ( ) func WithTmpDir(t testing.TB) (dir string, teardown func()) { - tmpDir, err := ioutil.TempDir("", "") + tmpDir, err := os.MkdirTemp("", "") if err != nil { t.Fatal(err) } diff --git a/pkg/unsafe/unsafe_test.go b/pkg/unsafe/unsafe_test.go index fc2d8ac7..a81bd436 100644 --- a/pkg/unsafe/unsafe_test.go +++ b/pkg/unsafe/unsafe_test.go @@ -1,3 +1,4 @@ +//go:build !race // +build !race package unsafe diff --git a/pkg/utils/interface_slice.go b/pkg/utils/interface_slice.go index 8a8c4752..fcafb61e 100644 --- a/pkg/utils/interface_slice.go +++ b/pkg/utils/interface_slice.go @@ -6,14 +6,13 @@ import "reflect" // // - Returns empty slice if no args are passed // -// - For a single argument which is of a slice type, the slice -// is converted and returned. +// - For a single argument which is of a slice type, the slice +// is converted and returned. // -// - For a single argument which is not a slice type, the value is -// returned within a single-element slice. +// - For a single argument which is not a slice type, the value is +// returned within a single-element slice. // // - For multiple arguments, returns a slice with all the args -// func InterfaceSlice(any ...interface{}) []interface{} { if len(any) == 0 { return []interface{}{} diff --git a/pkg/version/version_go1_12.go b/pkg/version/version_go1_12.go index 329d9156..3058439e 100644 --- a/pkg/version/version_go1_12.go +++ b/pkg/version/version_go1_12.go @@ -1,3 +1,4 @@ +//go:build go1.12 // +build go1.12 package version diff --git a/tools.go b/tools.go index 689ef896..97576b08 100644 --- a/tools.go +++ b/tools.go @@ -1,3 +1,4 @@ +//go:build tools // +build tools package ctlstore