From 518fb18b7844297106f8baf4c0a5c85bac08d83a Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Tue, 7 Nov 2023 11:27:26 -0500 Subject: [PATCH] rotate logs; dedup api * rotate logs: individual nodes and entire cluster * CLI: `ais advanced rotate-logs` * `api` package: reduce copy/paste, ref Signed-off-by: Alex Aizman --- ais/htrun.go | 4 ++-- ais/proxy.go | 2 ++ ais/prxclu.go | 17 ++++++++++++++--- ais/tgtcp.go | 2 ++ api/apc/actmsg.go | 3 +++ api/cluster.go | 30 +++++++++++++++--------------- api/daemon.go | 33 +++++++++++++++------------------ cmd/aisnode/main.go | 2 +- cmd/aisnodeprofile/main.go | 2 +- cmd/authn/main.go | 4 ++-- cmd/cli/cli/advanced_hdlr.go | 29 +++++++++++++++++++++++++++++ cmd/cli/cli/const.go | 1 + cmd/cli/cli/stats.go | 2 +- cmd/cli/go.mod | 2 +- cmd/cli/go.sum | 4 ++-- cmn/cos/assert.go | 6 +++--- cmn/cos/err.go | 4 ++-- cmn/debug/debug_on.go | 2 +- cmn/nlog/api.go | 22 +++++++++++++--------- stats/common_stats.go | 4 ++-- 20 files changed, 112 insertions(+), 63 deletions(-) diff --git a/ais/htrun.go b/ais/htrun.go index 937d209433a..03edb5e85dc 100644 --- a/ais/htrun.go +++ b/ais/htrun.go @@ -887,8 +887,8 @@ func (h *htrun) bcastAsyncIC(msg *aisMsg) { freeBcArgs(args) } -func (h *htrun) bcastReqGroup(w http.ResponseWriter, r *http.Request, args *bcastArgs, to int) { - args.to = to +func (h *htrun) bcastAllNodes(w http.ResponseWriter, r *http.Request, args *bcastArgs) { + args.to = cluster.AllNodes results := h.bcastGroup(args) for _, res := range results { if res.err != nil { diff --git a/ais/proxy.go b/ais/proxy.go index 329f0a08226..558c3349f0a 100644 --- a/ais/proxy.go +++ b/ais/proxy.go @@ -2695,6 +2695,8 @@ func (p *proxy) httpdaeput(w http.ResponseWriter, r *http.Request) { if err := p.owner.config.resetDaemonConfig(); err != nil { p.writeErr(w, r, err) } + case apc.ActRotateLogs: + nlog.Flush(nlog.ActRotate) case apc.ActResetStats: errorsOnly := msg.Value.(bool) p.statsT.ResetStats(errorsOnly) diff --git a/ais/prxclu.go b/ais/prxclu.go index e6859582709..69cdc5a21b4 100644 --- a/ais/prxclu.go +++ b/ais/prxclu.go @@ -961,6 +961,8 @@ func (p *proxy) cluputJSON(w http.ResponseWriter, r *http.Request) { } case apc.ActResetConfig: p.resetCluCfgPersistent(w, r, msg) + case apc.ActRotateLogs: + p.rotateLogs(w, r, msg) case apc.ActShutdownCluster: args := allocBcArgs() @@ -997,7 +999,7 @@ func (p *proxy) cluputJSON(w http.ResponseWriter, r *http.Request) { p.statsT.ResetStats(errorsOnly) args := allocBcArgs() args.req = cmn.HreqArgs{Method: http.MethodPut, Path: apc.URLPathDae.S, Body: cos.MustMarshal(msg)} - p.bcastReqGroup(w, r, args, cluster.AllNodes) + p.bcastAllNodes(w, r, args) freeBcArgs(args) case apc.ActXactStart: p.xstart(w, r, msg) @@ -1083,7 +1085,16 @@ func (p *proxy) resetCluCfgPersistent(w http.ResponseWriter, r *http.Request, ms args := allocBcArgs() args.req = cmn.HreqArgs{Method: http.MethodPut, Path: apc.URLPathDae.S, Body: body} - p.bcastReqGroup(w, r, args, cluster.AllNodes) + p.bcastAllNodes(w, r, args) + freeBcArgs(args) +} + +func (p *proxy) rotateLogs(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg) { + nlog.Flush(nlog.ActRotate) + body := cos.MustMarshal(msg) + args := allocBcArgs() + args.req = cmn.HreqArgs{Method: http.MethodPut, Path: apc.URLPathDae.S, Body: body} + p.bcastAllNodes(w, r, args) freeBcArgs(args) } @@ -1100,7 +1111,7 @@ func (p *proxy) setCluCfgTransient(w http.ResponseWriter, r *http.Request, toUpd Body: cos.MustMarshal(msg), Query: url.Values{apc.ActTransient: []string{"true"}}, } - p.bcastReqGroup(w, r, args, cluster.AllNodes) + p.bcastAllNodes(w, r, args) freeBcArgs(args) } diff --git a/ais/tgtcp.go b/ais/tgtcp.go index f9525c5c5fc..b0dbdc7fa69 100644 --- a/ais/tgtcp.go +++ b/ais/tgtcp.go @@ -166,6 +166,8 @@ func (t *target) daeputJSON(w http.ResponseWriter, r *http.Request) { if err := t.owner.config.resetDaemonConfig(); err != nil { t.writeErr(w, r, err) } + case apc.ActRotateLogs: + nlog.Flush(nlog.ActRotate) case apc.ActResetStats: errorsOnly := msg.Value.(bool) t.statsT.ResetStats(errorsOnly) diff --git a/api/apc/actmsg.go b/api/apc/actmsg.go index dafa9e3a3fe..9671570c415 100644 --- a/api/apc/actmsg.go +++ b/api/apc/actmsg.go @@ -55,10 +55,13 @@ const ( ActPromote = "promote" ActRenameObject = "rename-obj" + // cp (reverse) ActResetStats = "reset-stats" ActResetConfig = "reset-config" ActSetConfig = "set-config" + ActRotateLogs = "rotate-logs" + ActShutdownCluster = "shutdown" // see also: ActShutdownNode // multi-object (via `ListRange`) diff --git a/api/cluster.go b/api/cluster.go index d0b3a79846e..0d6f393d8cd 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -183,19 +183,6 @@ func GetClusterStats(bp BaseParams) (res stats.Cluster, err error) { return } -func ResetClusterStats(bp BaseParams, errorsOnly bool) (err error) { - bp.Method = http.MethodPut - reqParams := AllocRp() - { - reqParams.BaseParams = bp - reqParams.Path = apc.URLPathClu.S - reqParams.Body = cos.MustMarshal(apc.ActMsg{Action: apc.ActResetStats, Value: errorsOnly}) - } - err = reqParams.DoRequest() - FreeRp(reqParams) - return -} - func GetRemoteAIS(bp BaseParams) (remais cluster.Remotes, err error) { bp.Method = http.MethodGet reqParams := AllocRp() @@ -288,14 +275,27 @@ func SetClusterConfigUsingMsg(bp BaseParams, configToUpdate *cmn.ConfigToSet, tr return err } -// ResetClusterConfig resets the configuration of all nodes to the cluster configuration +// zero out: all metrics _or_ only error counters +func ResetClusterStats(bp BaseParams, errorsOnly bool) (err error) { + return _putCluster(bp, apc.ActMsg{Action: apc.ActResetStats, Value: errorsOnly}) +} + +// all nodes: reset configuration to cluster defaults func ResetClusterConfig(bp BaseParams) error { + return _putCluster(bp, apc.ActMsg{Action: apc.ActResetConfig}) +} + +func RotateClusterLogs(bp BaseParams) error { + return _putCluster(bp, apc.ActMsg{Action: apc.ActRotateLogs}) +} + +func _putCluster(bp BaseParams, msg apc.ActMsg) error { bp.Method = http.MethodPut reqParams := AllocRp() { reqParams.BaseParams = bp reqParams.Path = apc.URLPathClu.S - reqParams.Body = cos.MustMarshal(apc.ActMsg{Action: apc.ActResetConfig}) + reqParams.Body = cos.MustMarshal(msg) reqParams.Header = http.Header{cos.HdrContentType: []string{cos.ContentJSON}} } err := reqParams.DoRequest() diff --git a/api/daemon.go b/api/daemon.go index b6a0f77e939..1c2f1290293 100644 --- a/api/daemon.go +++ b/api/daemon.go @@ -193,21 +193,6 @@ func GetDaemonStats(bp BaseParams, node *meta.Snode) (ds *stats.Node, err error) return ds, err } -// see also: ResetClusterStats -func ResetDaemonStats(bp BaseParams, node *meta.Snode, errorsOnly bool) (err error) { - bp.Method = http.MethodPut - reqParams := AllocRp() - { - reqParams.BaseParams = bp - reqParams.Path = apc.URLPathReverseDae.S - reqParams.Body = cos.MustMarshal(apc.ActMsg{Action: apc.ActResetStats, Value: errorsOnly}) - reqParams.Header = http.Header{apc.HdrNodeID: []string{node.ID()}} - } - err = reqParams.DoRequest() - FreeRp(reqParams) - return -} - func GetDiskStats(bp BaseParams, tid string) (res ios.AllDiskStats, err error) { bp.Method = http.MethodGet reqParams := AllocRp() @@ -289,15 +274,27 @@ func SetDaemonConfig(bp BaseParams, nodeID string, nvs cos.StrKVs, transient ... return err } -// ResetDaemonConfig resets the configuration for a specific node to the cluster configuration. -// TODO: revisit access control +// see also: ResetClusterStats +func ResetDaemonStats(bp BaseParams, node *meta.Snode, errorsOnly bool) error { + return _putDaemon(bp, node.ID(), apc.ActMsg{Action: apc.ActResetStats, Value: errorsOnly}) +} + +// reset node's configuration to cluster defaults func ResetDaemonConfig(bp BaseParams, nodeID string) error { + return _putDaemon(bp, nodeID, apc.ActMsg{Action: apc.ActResetConfig}) +} + +func RotateLogs(bp BaseParams, nodeID string) error { + return _putDaemon(bp, nodeID, apc.ActMsg{Action: apc.ActRotateLogs}) +} + +func _putDaemon(bp BaseParams, nodeID string, msg apc.ActMsg) error { bp.Method = http.MethodPut reqParams := AllocRp() { reqParams.BaseParams = bp reqParams.Path = apc.URLPathReverseDae.S - reqParams.Body = cos.MustMarshal(apc.ActMsg{Action: apc.ActResetConfig}) + reqParams.Body = cos.MustMarshal(msg) reqParams.Header = http.Header{ apc.HdrNodeID: []string{nodeID}, cos.HdrContentType: []string{cos.ContentJSON}, diff --git a/cmd/aisnode/main.go b/cmd/aisnode/main.go index f93f929c163..79293643930 100644 --- a/cmd/aisnode/main.go +++ b/cmd/aisnode/main.go @@ -21,6 +21,6 @@ var ( func main() { debug.Assert(build != "", "missing build") ecode := ais.Run(cmn.VersionAIStore+"."+build, buildtime) - nlog.Flush(true) + nlog.Flush(nlog.ActExit) os.Exit(ecode) } diff --git a/cmd/aisnodeprofile/main.go b/cmd/aisnodeprofile/main.go index 65d796281c2..7099df5d3fa 100644 --- a/cmd/aisnodeprofile/main.go +++ b/cmd/aisnodeprofile/main.go @@ -50,7 +50,7 @@ func run() int { } exitCode := ais.Run(cmn.VersionAIStore+"."+build, buildtime) - nlog.Flush(true) + nlog.Flush(nlog.ActExit) if s := *memProfile; s != "" { *memProfile = s + "." + strconv.Itoa(syscall.Getpid()) diff --git a/cmd/authn/main.go b/cmd/authn/main.go index 3f1f5dd4147..5d1fc66a9ed 100644 --- a/cmd/authn/main.go +++ b/cmd/authn/main.go @@ -39,7 +39,7 @@ func init() { func logFlush() { for { time.Sleep(time.Minute) // TODO: must be configurable - nlog.Flush() + nlog.Flush(nlog.ActNone) } } @@ -99,7 +99,7 @@ func main() { srv := newServer(mgr) err = srv.Run() - nlog.Flush(true) + nlog.Flush(nlog.ActExit) cos.Close(mgr.db) if err != nil { cos.ExitLogf("Server failed: %v", err) diff --git a/cmd/cli/cli/advanced_hdlr.go b/cmd/cli/cli/advanced_hdlr.go index 97e1096f3a4..4344914b4bf 100644 --- a/cmd/cli/cli/advanced_hdlr.go +++ b/cmd/cli/cli/advanced_hdlr.go @@ -50,6 +50,13 @@ var ( Action: randMountpath, BashComplete: suggestTargets, }, + { + Name: cmdRotateLogs, + Usage: "rotate logs", + ArgsUsage: optionalNodeIDArgument, + Action: rotateLogs, + BashComplete: suggestAllNodes, + }, }, } ) @@ -132,3 +139,25 @@ func randMountpath(c *cli.Context) error { } return nil } + +func rotateLogs(c *cli.Context) error { + node, sname, err := arg0Node(c) + if err != nil { + return err + } + // 1. node + if node != nil { + if err := api.RotateLogs(apiBP, node.ID()); err != nil { + return V(err) + } + msg := fmt.Sprintf("%s: rotated logs", sname) + actionDone(c, msg) + return nil + } + // 2. or cluster + if err := api.RotateClusterLogs(apiBP); err != nil { + return V(err) + } + actionDone(c, "cluster: rotated all logs") + return nil +} diff --git a/cmd/cli/cli/const.go b/cmd/cli/cli/const.go index f2a7e58951f..4aec95b4b38 100644 --- a/cmd/cli/cli/const.go +++ b/cmd/cli/cli/const.go @@ -48,6 +48,7 @@ const ( cmdRmSmap = "remove-from-smap" cmdRandNode = "random-node" cmdRandMountpath = "random-mountpath" + cmdRotateLogs = "rotate-logs" ) // - 2nd level subcommands (mostly, verbs) diff --git a/cmd/cli/cli/stats.go b/cmd/cli/cli/stats.go index c13df48d970..c95e9ed66d9 100644 --- a/cmd/cli/cli/stats.go +++ b/cmd/cli/cli/stats.go @@ -75,7 +75,7 @@ func fillNodeStatusMap(c *cli.Context, daeType string) (smap *meta.Smap, tstatus wg.Wait() mmc := strings.Split(cmn.VersionAIStore, versionSepa) - debug.Assert(len(mmc) > 2) + debug.Assert(len(mmc) > 1) ok := checkVersionWarn(c, apc.Target, mmc, tstatusMap) if ok && pstatusMap != nil { _ = checkVersionWarn(c, apc.Proxy, mmc, pstatusMap) diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod index 39512113222..9e105f83d43 100644 --- a/cmd/cli/go.mod +++ b/cmd/cli/go.mod @@ -4,7 +4,7 @@ go 1.21 // direct require ( - github.com/NVIDIA/aistore v1.3.21 + github.com/NVIDIA/aistore v1.3.22-0.20231107182908-95194b4118d8 github.com/fatih/color v1.15.0 github.com/json-iterator/go v1.1.12 github.com/onsi/ginkgo v1.16.5 diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum index de0279905bb..bb451270e5d 100644 --- a/cmd/cli/go.sum +++ b/cmd/cli/go.sum @@ -1,7 +1,7 @@ code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/NVIDIA/aistore v1.3.21 h1:OB0FC6tOD9o+YZIhJHQ6rLx8OYfyDhZemd/v27WivRk= -github.com/NVIDIA/aistore v1.3.21/go.mod h1:+iSnZg0ovMaLgaT9fLAs2WmYBP7IfeTW1WYkbKrwP4g= +github.com/NVIDIA/aistore v1.3.22-0.20231107182908-95194b4118d8 h1:XRR43BhmCk3GwxF0d3oYVNtMlgT0jAwt5uqmMwX2D2c= +github.com/NVIDIA/aistore v1.3.22-0.20231107182908-95194b4118d8/go.mod h1:+iSnZg0ovMaLgaT9fLAs2WmYBP7IfeTW1WYkbKrwP4g= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= diff --git a/cmn/cos/assert.go b/cmn/cos/assert.go index 84a8b1a553d..6eed0ec0507 100644 --- a/cmn/cos/assert.go +++ b/cmn/cos/assert.go @@ -21,7 +21,7 @@ func Assertf(cond bool, f string, a ...any) { func Assert(cond bool) { if !cond { - nlog.Flush(true) + nlog.Flush(nlog.ActExit) panic(assertMsg) } } @@ -29,14 +29,14 @@ func Assert(cond bool) { // NOTE: when using Sprintf and such, `if (!cond) { AssertMsg(false, msg) }` is the preferable usage. func AssertMsg(cond bool, msg string) { if !cond { - nlog.Flush(true) + nlog.Flush(nlog.ActExit) panic(assertMsg + ": " + msg) } } func AssertNoErr(err error) { if err != nil { - nlog.Flush(true) + nlog.Flush(nlog.ActExit) panic(err) } } diff --git a/cmn/cos/err.go b/cmn/cos/err.go index e789ff99638..811a233ff71 100644 --- a/cmn/cos/err.go +++ b/cmn/cos/err.go @@ -189,7 +189,7 @@ func ExitLogf(f string, a ...any) { msg := fmt.Sprintf(fatalPrefix+f, a...) if flag.Parsed() { nlog.ErrorDepth(1, msg+"\n") - nlog.Flush(true) + nlog.Flush(nlog.ActExit) } _exit(msg) } @@ -198,7 +198,7 @@ func ExitLog(a ...any) { msg := fatalPrefix + fmt.Sprint(a...) if flag.Parsed() { nlog.ErrorDepth(1, msg+"\n") - nlog.Flush(true) + nlog.Flush(nlog.ActExit) } _exit(msg) } diff --git a/cmn/debug/debug_on.go b/cmn/debug/debug_on.go index 401b5b4cea2..323384435b7 100644 --- a/cmn/debug/debug_on.go +++ b/cmn/debug/debug_on.go @@ -58,7 +58,7 @@ func _panic(a ...any) { } if flag.Parsed() { nlog.Errorln(buffer.String()) - nlog.Flush(true) + nlog.Flush(nlog.ActExit) } else { fmt.Fprintln(os.Stderr, buffer.String()) } diff --git a/cmn/nlog/api.go b/cmn/nlog/api.go index 0746fb1e881..a0fc9ec886e 100644 --- a/cmn/nlog/api.go +++ b/cmn/nlog/api.go @@ -12,10 +12,14 @@ import ( "github.com/NVIDIA/aistore/cmn/mono" ) -var ( - MaxSize int64 = 4 * 1024 * 1024 +const ( + ActNone = iota + ActExit + ActRotate ) +var MaxSize int64 = 4 * 1024 * 1024 // usually, config.log.max_size + func InitFlags(flset *flag.FlagSet) { flset.BoolVar(&toStderr, "logtostderr", false, "log to standard error instead of files") flset.BoolVar(&alsoToStderr, "alsologtostderr", false, "log to standard error as well as files") @@ -36,11 +40,8 @@ func SetTitle(s string) { title = s } func InfoLogName() string { return sname() + ".INFO" } func ErrLogName() string { return sname() + ".ERROR" } -func Flush(exit ...bool) { - var ( - ex = len(exit) > 0 && exit[0] - now = mono.NanoTime() - ) +func Flush(action int) { + now := mono.NanoTime() for _, sev := range []severity{sevInfo, sevErr} { var ( nlog = nlogs[sev] @@ -52,7 +53,7 @@ func Flush(exit ...bool) { nlog.mw.Unlock() continue } - if ex || nlog.pw.avail() < maxLineSize || nlog.since(now) > 10*time.Second { + if action > ActNone || nlog.pw.avail() < maxLineSize || nlog.since(now) > 10*time.Second { nlog.toFlush = append(nlog.toFlush, nlog.pw) nlog.get() } @@ -62,7 +63,10 @@ func Flush(exit ...bool) { if oob { nlog.flush() } - if ex { + if action == ActRotate { + nlog.rotate(time.Now()) + } + if action == ActExit { nlog.file.Sync() nlog.file.Close() } diff --git a/stats/common_stats.go b/stats/common_stats.go index 0ddd57ce2ad..46b8893e517 100644 --- a/stats/common_stats.go +++ b/stats/common_stats.go @@ -778,7 +778,7 @@ waitStartup: deadline = time.Hour nlog.Infoln(r.Name() + ": standing by...") - nlog.Flush() + nlog.Flush(nlog.ActNone) continue } j += sleep @@ -828,7 +828,7 @@ waitStartup: flushTime = config.Log.FlushTime.D() } if nlog.Since() > flushTime || nlog.OOB() { - nlog.Flush() + nlog.Flush(nlog.ActNone) } now = mono.NanoTime()