diff --git a/ais/tgtetl.go b/ais/tgtetl.go index 3b203bc472f..2da2d74eb0a 100644 --- a/ais/tgtetl.go +++ b/ais/tgtetl.go @@ -118,6 +118,7 @@ func (t *target) handleETLGet(w http.ResponseWriter, r *http.Request) { case apc.ETLHealth: t.healthETL(w, r, apiItems[0]) case apc.ETLMetrics: + k8s.InitMetricsClient() t.metricsETL(w, r, apiItems[0]) default: t.writeErrURL(w, r) diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod index 9e105f83d43..e101f54f43f 100644 --- a/cmd/cli/go.mod +++ b/cmd/cli/go.mod @@ -4,7 +4,7 @@ go 1.21 // direct require ( - github.com/NVIDIA/aistore v1.3.22-0.20231107182908-95194b4118d8 + github.com/NVIDIA/aistore v1.3.22-0.20231107185203-518fb18b7844 github.com/fatih/color v1.15.0 github.com/json-iterator/go v1.1.12 github.com/onsi/ginkgo v1.16.5 diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum index bb451270e5d..49d263e5c60 100644 --- a/cmd/cli/go.sum +++ b/cmd/cli/go.sum @@ -1,7 +1,7 @@ code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/NVIDIA/aistore v1.3.22-0.20231107182908-95194b4118d8 h1:XRR43BhmCk3GwxF0d3oYVNtMlgT0jAwt5uqmMwX2D2c= -github.com/NVIDIA/aistore v1.3.22-0.20231107182908-95194b4118d8/go.mod h1:+iSnZg0ovMaLgaT9fLAs2WmYBP7IfeTW1WYkbKrwP4g= +github.com/NVIDIA/aistore v1.3.22-0.20231107185203-518fb18b7844 h1:VFZz9PVWfVt9YFYuE1ADixHY9manzpDQrBRD6mBCZfs= +github.com/NVIDIA/aistore v1.3.22-0.20231107185203-518fb18b7844/go.mod h1:+iSnZg0ovMaLgaT9fLAs2WmYBP7IfeTW1WYkbKrwP4g= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= diff --git a/cmn/k8s/client.go b/cmn/k8s/client.go index 0fa8eeca343..70ec72bed39 100644 --- a/cmn/k8s/client.go +++ b/cmn/k8s/client.go @@ -6,23 +6,17 @@ package k8s import ( "context" - "fmt" "io" "os" "strings" - "sync" - "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/cmn/debug" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" tcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - metrics "k8s.io/metrics/pkg/client/clientset/versioned" ) type ( @@ -37,7 +31,6 @@ type ( Node(name string) (*corev1.Node, error) Logs(podName string) ([]byte, error) Health(podName string) (string, error) - Metrics(podName string) (cpuCores float64, freeMem int64, err error) CheckMetricsAvailability() error } @@ -48,20 +41,58 @@ type ( namespace string err error } - - metricsClient struct { - client *metrics.Clientset - err error - } ) var ( - _clientOnce sync.Once - _metricsClientOnce sync.Once - _defaultK8sClient *defaultClient - _defaultMetricsClient *metricsClient + _defaultK8sClient *defaultClient ) +func _initClient() { + config, err := rest.InClusterConfig() + if err != nil { + _defaultK8sClient = &defaultClient{err: err} + return + } + client, err := kubernetes.NewForConfig(config) + if err != nil { + _defaultK8sClient = &defaultClient{err: err} + return + } + _defaultK8sClient = &defaultClient{ + namespace: _namespace(), + client: client, + config: config, + } +} + +// Retrieve pod namespace +// See: +// - https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/clientcmd/client_config.go +// - https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ +// - https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod. +func _namespace() (namespace string) { + if namespace = os.Getenv("POD_NAMESPACE"); namespace != "" { + return + } + if ns, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + if namespace = strings.TrimSpace(string(ns)); len(namespace) > 0 { + return + } + } + return "default" +} + +func GetClient() (Client, error) { + if _defaultK8sClient.err != nil { + return nil, _defaultK8sClient.err + } + return _defaultK8sClient, nil +} + +/////////////////// +// defaultClient // +/////////////////// + func (c *defaultClient) pods() tcorev1.PodInterface { return c.client.CoreV1().Pods(c.namespace) } @@ -91,7 +122,7 @@ func (c *defaultClient) Delete(entityType, entityName string) (err error) { case Svc: err = c.services().Delete(ctx, entityName, *metav1.NewDeleteOptions(0)) default: - debug.Assertf(false, "unknown entity type: %s", entityType) + debug.Assert(false, "unknown entity type", entityType) } return } @@ -123,7 +154,7 @@ func (c *defaultClient) CheckExists(entityType, entityName string) (exists bool, return false, nil } default: - debug.Assertf(false, "unknown entity type: %s", entityType) + debug.Assert(false, "unknown entity type", entityType) } return true, nil } @@ -165,118 +196,3 @@ func (c *defaultClient) Health(podName string) (string, error) { } return string(response.Status.Phase), nil } - -func (*defaultClient) Metrics(podName string) (cpuCores float64, freeMem int64, err error) { - var ( - totalCPU, totalMem int64 - fracCPU float64 - ) - - mc, err := getMetricsClient() - if err != nil { - return 0, 0, err - } - - msgetter := mc.MetricsV1beta1().PodMetricses(metav1.NamespaceDefault) - ms, err := msgetter.Get(context.Background(), podName, metav1.GetOptions{}) - if err != nil { - if statusErr, ok := err.(*errors.StatusError); ok && statusErr.Status().Reason == metav1.StatusReasonNotFound { - err = cos.NewErrNotFound("metrics for pod %q", podName) - } - return 0, 0, err - } - - for _, metric := range ms.Containers { - cpuNanoCores, ok := metric.Usage.Cpu().AsInt64() - if !ok { - cpuNanoCores = metric.Usage.Cpu().AsDec().UnscaledBig().Int64() - } - totalCPU += cpuNanoCores - - memInt, ok := metric.Usage.Memory().AsInt64() - if !ok { - memInt = metric.Usage.Memory().AsDec().UnscaledBig().Int64() - } - totalMem += memInt - } - - // Kubernetes reports CPU in nanocores. - // https://godoc.org/k8s.io/api/core/v1#ResourceName - fracCPU = float64(totalCPU) / float64(1_000_000_000) - return fracCPU, totalMem, nil -} - -func GetClient() (Client, error) { - _clientOnce.Do(_initClient) - if _defaultK8sClient.err != nil { - return nil, _defaultK8sClient.err - } - return _defaultK8sClient, nil -} - -func _initClient() { - config, err := rest.InClusterConfig() - if err != nil { - _defaultK8sClient = &defaultClient{err: err} - return - } - client, err := kubernetes.NewForConfig(config) - if err != nil { - _defaultK8sClient = &defaultClient{err: err} - return - } - _defaultK8sClient = &defaultClient{ - namespace: _namespace(), - client: client, - config: config, - } -} - -func getMetricsClient() (*metrics.Clientset, error) { - _metricsClientOnce.Do(_initMetricsClient) - - if _defaultMetricsClient.err != nil { - return nil, _defaultMetricsClient.err - } - cos.Assert(_defaultMetricsClient.client != nil) - return _defaultMetricsClient.client, nil -} - -func _initMetricsClient() { - config, err := clientcmd.BuildConfigFromFlags("", "") - if err != nil { - _defaultMetricsClient = &metricsClient{ - err: fmt.Errorf("failed to retrieve metrics client config; err: %v", err), - } - return - } - - mc, err := metrics.NewForConfig(config) - if err != nil { - _defaultMetricsClient = &metricsClient{ - err: fmt.Errorf("failed to create metrics client; err: %v", err), - } - return - } - - _defaultMetricsClient = &metricsClient{ - client: mc, - } -} - -// Retrieve pod namespace -// See: -// - https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/clientcmd/client_config.go -// - https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ -// - https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod. -func _namespace() (namespace string) { - if namespace = os.Getenv("POD_NAMESPACE"); namespace != "" { - return - } - if ns, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { - if namespace = strings.TrimSpace(string(ns)); len(namespace) > 0 { - return - } - } - return "default" -} diff --git a/cmn/k8s/init.go b/cmn/k8s/init.go index 8a818fc5208..bf018e6217a 100644 --- a/cmn/k8s/init.go +++ b/cmn/k8s/init.go @@ -33,6 +33,7 @@ func Init() { nodeName = os.Getenv(k8sNodeNameEnv) podName = os.Getenv(k8sPodNameEnv) ) + _initClient() client, err := GetClient() if err != nil { nlog.Infof("K8s client nil => non-Kubernetes deployment: (%s: %q, %s: %q)", k8sPodNameEnv, podName, k8sNodeNameEnv, nodeName) diff --git a/cmn/k8s/metrics.go b/cmn/k8s/metrics.go new file mode 100644 index 00000000000..20e69c01890 --- /dev/null +++ b/cmn/k8s/metrics.go @@ -0,0 +1,91 @@ +// Package k8s: initialization, client, and misc. helpers +/* + * Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved. + */ +package k8s + +import ( + "context" + "fmt" + "sync" + + "github.com/NVIDIA/aistore/cmn/cos" + "github.com/NVIDIA/aistore/cmn/debug" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/clientcmd" + metrics "k8s.io/metrics/pkg/client/clientset/versioned" +) + +type metricsClient struct { + client *metrics.Clientset + err error +} + +var ( + once sync.Once + _mc *metricsClient +) + +func InitMetricsClient() { once.Do(_initmc) } + +func _initmc() { + config, err := clientcmd.BuildConfigFromFlags("", "") + if err != nil { + _mc = &metricsClient{ + err: fmt.Errorf("failed to retrieve metrics client config: %w", err), + } + return + } + mc, err := metrics.NewForConfig(config) + if err != nil { + _mc = &metricsClient{ + err: fmt.Errorf("failed to create metrics client: %w", err), + } + return + } + _mc = &metricsClient{ + client: mc, + } +} + +func Metrics(podName string) (float64 /*cores*/, int64 /*mem*/, error) { + var ( + totalCPU, totalMem int64 + fracCPU float64 + ) + if _mc.err != nil { + return 0, 0, _mc.err + } + debug.Assert(_mc.client != nil) + + var ( + mc = _mc.client + msgetter = mc.MetricsV1beta1().PodMetricses(metav1.NamespaceDefault) + ms, err = msgetter.Get(context.Background(), podName, metav1.GetOptions{}) + ) + if err != nil { + if statusErr, ok := err.(*errors.StatusError); ok && statusErr.Status().Reason == metav1.StatusReasonNotFound { + err = cos.NewErrNotFound("metrics for pod %q", podName) + } + return 0, 0, err + } + + for _, metric := range ms.Containers { + cpuNanoCores, ok := metric.Usage.Cpu().AsInt64() + if !ok { + cpuNanoCores = metric.Usage.Cpu().AsDec().UnscaledBig().Int64() + } + totalCPU += cpuNanoCores + + memInt, ok := metric.Usage.Memory().AsInt64() + if !ok { + memInt = metric.Usage.Memory().AsDec().UnscaledBig().Int64() + } + totalMem += memInt + } + + // Kubernetes reports CPU in nanocores, see https://godoc.org/k8s.io/api/core/v1#ResourceName + fracCPU = float64(totalCPU) / float64(1_000_000_000) + return fracCPU, totalMem, nil +} diff --git a/cmn/nlog/api.go b/cmn/nlog/api.go index a0fc9ec886e..ed7c6b202ac 100644 --- a/cmn/nlog/api.go +++ b/cmn/nlog/api.go @@ -49,7 +49,7 @@ func Flush(action int) { ) nlog.mw.Lock() - if nlog.file == nil || nlog.pw.length() == 0 { + if nlog.file == nil || (nlog.pw.length() == 0 && action != ActRotate) { nlog.mw.Unlock() continue } diff --git a/cmn/ver_const.go b/cmn/ver_const.go index ea00fe60cb6..9bfa6d77cd8 100644 --- a/cmn/ver_const.go +++ b/cmn/ver_const.go @@ -24,8 +24,8 @@ import "github.com/NVIDIA/aistore/cmn/jsp" // `jsp` formats its *signature* and other implementation details. const ( - VersionAIStore = "3.21" - VersionCLI = "1.8" + VersionAIStore = "3.21.1" + VersionCLI = "1.8.1" VersionLoader = "1.9" VersionAuthN = "1.0" ) diff --git a/docs/cli/advanced.md b/docs/cli/advanced.md index e1297c7db06..7cd6e3d6780 100644 --- a/docs/cli/advanced.md +++ b/docs/cli/advanced.md @@ -30,6 +30,7 @@ COMMANDS: remove-from-smap immediately remove node from cluster map (advanced usage - potential data loss!) random-node print random node ID (by default, random target) random-mountpath print a random mountpath from a given target + rotate-logs rotate logs ``` AIS CLI features a number of miscellaneous and advanced-usage commands. @@ -99,3 +100,31 @@ BcnQp8083 0.16% 31.12GiB 8m NnPLp8082 0.16% 31.12GiB 8m MvwQp8080[P] 0.19% 31.12GiB 7m50s ``` + + +## Rotate logs: individual nodes or entire cluster + +Usage: `ais advanced rotate-logs [NODE_ID]` + +Example: + +```console +$ ais show log t[kOktEWrTg] + +Started up at 2023/11/07 18:06:22, host u2204, go1.21.1 for linux/amd64 +W 18:06:22.930488 config:1713 load initial global config "/root/.ais1/ais.json" +... +... +``` + +Now, let's go ahead and rotate: + +```console +$ ais advanced rotate-logs t[kOktEWrTg] +t[kOktEWrTg]: rotated logs + +$ ais show log t[kOktEWrTg] +Rotated at 2023/11/07 18:07:31, host u2204, go1.21.1 for linux/amd64 +Node t[kOktEWrTg], Version 3.21.1.69a90d64b, build time 2023-11-07T18:06:19-0500, debug false, CPUs(16, runtime=16) +... +``` diff --git a/ext/etl/transform.go b/ext/etl/transform.go index 6e22a5bfef9..226f37c9d17 100644 --- a/ext/etl/transform.go +++ b/ext/etl/transform.go @@ -390,7 +390,7 @@ func PodMetrics(t cluster.Target, etlName string) (*CPUMemUsed, error) { if err != nil { return nil, err } - cpuUsed, memUsed, err := client.Metrics(c.PodName()) + cpuUsed, memUsed, err := k8s.Metrics(c.PodName()) if err == nil { return &CPUMemUsed{TargetID: t.SID(), CPU: cpuUsed, Mem: memUsed}, nil }