Skip to content

Commit

Permalink
refactor common 'k8s' package; up cli mod; docs
Browse files Browse the repository at this point in the history
* bump minor-minor versions: 3.21.1 aistore, 1.8.1 CLI
* cmn/k8s - substantial refactoring (part two)
* docs: add 'rotate logs' example

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 7, 2023
1 parent 518fb18 commit 4ce0e0b
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 140 deletions.
1 change: 1 addition & 0 deletions ais/tgtetl.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (t *target) handleETLGet(w http.ResponseWriter, r *http.Request) {
case apc.ETLHealth:
t.healthETL(w, r, apiItems[0])
case apc.ETLMetrics:
k8s.InitMetricsClient()
t.metricsETL(w, r, apiItems[0])
default:
t.writeErrURL(w, r)
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

// direct
require (
github.com/NVIDIA/aistore v1.3.22-0.20231107182908-95194b4118d8
github.com/NVIDIA/aistore v1.3.22-0.20231107185203-518fb18b7844
github.com/fatih/color v1.15.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.22-0.20231107182908-95194b4118d8 h1:XRR43BhmCk3GwxF0d3oYVNtMlgT0jAwt5uqmMwX2D2c=
github.com/NVIDIA/aistore v1.3.22-0.20231107182908-95194b4118d8/go.mod h1:+iSnZg0ovMaLgaT9fLAs2WmYBP7IfeTW1WYkbKrwP4g=
github.com/NVIDIA/aistore v1.3.22-0.20231107185203-518fb18b7844 h1:VFZz9PVWfVt9YFYuE1ADixHY9manzpDQrBRD6mBCZfs=
github.com/NVIDIA/aistore v1.3.22-0.20231107185203-518fb18b7844/go.mod h1:+iSnZg0ovMaLgaT9fLAs2WmYBP7IfeTW1WYkbKrwP4g=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
182 changes: 49 additions & 133 deletions cmn/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,17 @@ package k8s

import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"

"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
tcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
metrics "k8s.io/metrics/pkg/client/clientset/versioned"
)

type (
Expand All @@ -37,7 +31,6 @@ type (
Node(name string) (*corev1.Node, error)
Logs(podName string) ([]byte, error)
Health(podName string) (string, error)
Metrics(podName string) (cpuCores float64, freeMem int64, err error)
CheckMetricsAvailability() error
}

Expand All @@ -48,20 +41,58 @@ type (
namespace string
err error
}

metricsClient struct {
client *metrics.Clientset
err error
}
)

var (
_clientOnce sync.Once
_metricsClientOnce sync.Once
_defaultK8sClient *defaultClient
_defaultMetricsClient *metricsClient
_defaultK8sClient *defaultClient
)

func _initClient() {
config, err := rest.InClusterConfig()
if err != nil {
_defaultK8sClient = &defaultClient{err: err}
return
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
_defaultK8sClient = &defaultClient{err: err}
return
}
_defaultK8sClient = &defaultClient{
namespace: _namespace(),
client: client,
config: config,
}
}

// Retrieve pod namespace
// See:
// - https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/clientcmd/client_config.go
// - https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
// - https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod.
func _namespace() (namespace string) {
if namespace = os.Getenv("POD_NAMESPACE"); namespace != "" {
return
}
if ns, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
if namespace = strings.TrimSpace(string(ns)); len(namespace) > 0 {
return
}
}
return "default"
}

func GetClient() (Client, error) {
if _defaultK8sClient.err != nil {
return nil, _defaultK8sClient.err
}
return _defaultK8sClient, nil
}

///////////////////
// defaultClient //
///////////////////

func (c *defaultClient) pods() tcorev1.PodInterface {
return c.client.CoreV1().Pods(c.namespace)
}
Expand Down Expand Up @@ -91,7 +122,7 @@ func (c *defaultClient) Delete(entityType, entityName string) (err error) {
case Svc:
err = c.services().Delete(ctx, entityName, *metav1.NewDeleteOptions(0))
default:
debug.Assertf(false, "unknown entity type: %s", entityType)
debug.Assert(false, "unknown entity type", entityType)
}
return
}
Expand Down Expand Up @@ -123,7 +154,7 @@ func (c *defaultClient) CheckExists(entityType, entityName string) (exists bool,
return false, nil
}
default:
debug.Assertf(false, "unknown entity type: %s", entityType)
debug.Assert(false, "unknown entity type", entityType)
}
return true, nil
}
Expand Down Expand Up @@ -165,118 +196,3 @@ func (c *defaultClient) Health(podName string) (string, error) {
}
return string(response.Status.Phase), nil
}

func (*defaultClient) Metrics(podName string) (cpuCores float64, freeMem int64, err error) {
var (
totalCPU, totalMem int64
fracCPU float64
)

mc, err := getMetricsClient()
if err != nil {
return 0, 0, err
}

msgetter := mc.MetricsV1beta1().PodMetricses(metav1.NamespaceDefault)
ms, err := msgetter.Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
if statusErr, ok := err.(*errors.StatusError); ok && statusErr.Status().Reason == metav1.StatusReasonNotFound {
err = cos.NewErrNotFound("metrics for pod %q", podName)
}
return 0, 0, err
}

for _, metric := range ms.Containers {
cpuNanoCores, ok := metric.Usage.Cpu().AsInt64()
if !ok {
cpuNanoCores = metric.Usage.Cpu().AsDec().UnscaledBig().Int64()
}
totalCPU += cpuNanoCores

memInt, ok := metric.Usage.Memory().AsInt64()
if !ok {
memInt = metric.Usage.Memory().AsDec().UnscaledBig().Int64()
}
totalMem += memInt
}

// Kubernetes reports CPU in nanocores.
// https://godoc.org/k8s.io/api/core/v1#ResourceName
fracCPU = float64(totalCPU) / float64(1_000_000_000)
return fracCPU, totalMem, nil
}

func GetClient() (Client, error) {
_clientOnce.Do(_initClient)
if _defaultK8sClient.err != nil {
return nil, _defaultK8sClient.err
}
return _defaultK8sClient, nil
}

func _initClient() {
config, err := rest.InClusterConfig()
if err != nil {
_defaultK8sClient = &defaultClient{err: err}
return
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
_defaultK8sClient = &defaultClient{err: err}
return
}
_defaultK8sClient = &defaultClient{
namespace: _namespace(),
client: client,
config: config,
}
}

func getMetricsClient() (*metrics.Clientset, error) {
_metricsClientOnce.Do(_initMetricsClient)

if _defaultMetricsClient.err != nil {
return nil, _defaultMetricsClient.err
}
cos.Assert(_defaultMetricsClient.client != nil)
return _defaultMetricsClient.client, nil
}

func _initMetricsClient() {
config, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
_defaultMetricsClient = &metricsClient{
err: fmt.Errorf("failed to retrieve metrics client config; err: %v", err),
}
return
}

mc, err := metrics.NewForConfig(config)
if err != nil {
_defaultMetricsClient = &metricsClient{
err: fmt.Errorf("failed to create metrics client; err: %v", err),
}
return
}

_defaultMetricsClient = &metricsClient{
client: mc,
}
}

// Retrieve pod namespace
// See:
// - https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/clientcmd/client_config.go
// - https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
// - https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod.
func _namespace() (namespace string) {
if namespace = os.Getenv("POD_NAMESPACE"); namespace != "" {
return
}
if ns, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
if namespace = strings.TrimSpace(string(ns)); len(namespace) > 0 {
return
}
}
return "default"
}
1 change: 1 addition & 0 deletions cmn/k8s/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func Init() {
nodeName = os.Getenv(k8sNodeNameEnv)
podName = os.Getenv(k8sPodNameEnv)
)
_initClient()
client, err := GetClient()
if err != nil {
nlog.Infof("K8s client nil => non-Kubernetes deployment: (%s: %q, %s: %q)", k8sPodNameEnv, podName, k8sNodeNameEnv, nodeName)
Expand Down
91 changes: 91 additions & 0 deletions cmn/k8s/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Package k8s: initialization, client, and misc. helpers
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
*/
package k8s

import (
"context"
"fmt"
"sync"

"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
metrics "k8s.io/metrics/pkg/client/clientset/versioned"
)

type metricsClient struct {
client *metrics.Clientset
err error
}

var (
once sync.Once
_mc *metricsClient
)

func InitMetricsClient() { once.Do(_initmc) }

func _initmc() {
config, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
_mc = &metricsClient{
err: fmt.Errorf("failed to retrieve metrics client config: %w", err),
}
return
}
mc, err := metrics.NewForConfig(config)
if err != nil {
_mc = &metricsClient{
err: fmt.Errorf("failed to create metrics client: %w", err),
}
return
}
_mc = &metricsClient{
client: mc,
}
}

func Metrics(podName string) (float64 /*cores*/, int64 /*mem*/, error) {
var (
totalCPU, totalMem int64
fracCPU float64
)
if _mc.err != nil {
return 0, 0, _mc.err
}
debug.Assert(_mc.client != nil)

var (
mc = _mc.client
msgetter = mc.MetricsV1beta1().PodMetricses(metav1.NamespaceDefault)
ms, err = msgetter.Get(context.Background(), podName, metav1.GetOptions{})
)
if err != nil {
if statusErr, ok := err.(*errors.StatusError); ok && statusErr.Status().Reason == metav1.StatusReasonNotFound {
err = cos.NewErrNotFound("metrics for pod %q", podName)
}
return 0, 0, err
}

for _, metric := range ms.Containers {
cpuNanoCores, ok := metric.Usage.Cpu().AsInt64()
if !ok {
cpuNanoCores = metric.Usage.Cpu().AsDec().UnscaledBig().Int64()
}
totalCPU += cpuNanoCores

memInt, ok := metric.Usage.Memory().AsInt64()
if !ok {
memInt = metric.Usage.Memory().AsDec().UnscaledBig().Int64()
}
totalMem += memInt
}

// Kubernetes reports CPU in nanocores, see https://godoc.org/k8s.io/api/core/v1#ResourceName
fracCPU = float64(totalCPU) / float64(1_000_000_000)
return fracCPU, totalMem, nil
}
2 changes: 1 addition & 1 deletion cmn/nlog/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Flush(action int) {
)

nlog.mw.Lock()
if nlog.file == nil || nlog.pw.length() == 0 {
if nlog.file == nil || (nlog.pw.length() == 0 && action != ActRotate) {
nlog.mw.Unlock()
continue
}
Expand Down
4 changes: 2 additions & 2 deletions cmn/ver_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import "github.com/NVIDIA/aistore/cmn/jsp"
// `jsp` formats its *signature* and other implementation details.

const (
VersionAIStore = "3.21"
VersionCLI = "1.8"
VersionAIStore = "3.21.1"
VersionCLI = "1.8.1"
VersionLoader = "1.9"
VersionAuthN = "1.0"
)
Expand Down
Loading

0 comments on commit 4ce0e0b

Please sign in to comment.