diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index af0409ef1..f645535ed 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -22,6 +22,7 @@ import ( "os" "sync" + "github.com/containers/nri-plugins/pkg/kubernetes/client" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -32,7 +33,6 @@ import ( nrtapi "github.com/containers/nri-plugins/pkg/agent/nrtapi" "github.com/containers/nri-plugins/pkg/agent/watch" cfgapi "github.com/containers/nri-plugins/pkg/apis/config/v1alpha1" - k8sclient "k8s.io/client-go/kubernetes" logger "github.com/containers/nri-plugins/pkg/log" ) @@ -126,11 +126,10 @@ type Agent struct { kubeConfig string // kubeconfig path configFile string // configuration file to use instead of custom resource - cfgIf ConfigInterface // custom resource access interface - httpCli *http.Client // shared HTTP client - k8sCli *k8sclient.Clientset // kubernetes client - nrtCli *nrtapi.Client // NRT custom resources client - nrtLock sync.Mutex // serialize NRT custom resource updates + cfgIf ConfigInterface // custom resource access interface + k8sCli *client.Client + nrtCli *nrtapi.Client // NRT custom resources client + nrtLock sync.Mutex // serialize NRT custom resource updates notifyFn NotifyFn // config resource change notification callback nodeWatch watch.Interface // kubernetes node watch @@ -267,36 +266,26 @@ func (a *Agent) hasLocalConfig() bool { } func (a *Agent) setupClients() error { + var err error + if a.hasLocalConfig() { return nil } - cfg, err := a.getRESTConfig() + a.k8sCli, err = client.New(client.WithKubeOrInClusterConfig(a.kubeConfig)) if err != nil { return err } - a.httpCli, err = rest.HTTPClientFor(cfg) - if err != nil { - return fmt.Errorf("failed to setup kubernetes HTTP client: %w", err) - } - - a.k8sCli, err = k8sclient.NewForConfigAndClient(cfg, a.httpCli) - if err != nil { - a.cleanupClients() - return fmt.Errorf("failed to setup kubernetes client: %w", err) - } - - restCfg := *cfg - a.nrtCli, err = nrtapi.NewForConfigAndClient(&restCfg, a.httpCli) + a.nrtCli, err = nrtapi.NewForConfigAndClient(a.k8sCli.RestConfig(), a.k8sCli.HttpClient()) if err != nil { a.cleanupClients() return fmt.Errorf("failed to setup NRT client: %w", err) } - restCfg = *cfg - err = a.cfgIf.SetKubeClient(a.httpCli, &restCfg) + err = a.cfgIf.SetKubeClient(a.k8sCli.HttpClient(), a.k8sCli.RestConfig()) if err != nil { + a.cleanupClients() return fmt.Errorf("failed to setup kubernetes config resource client: %w", err) } @@ -304,10 +293,9 @@ func (a *Agent) setupClients() error { } func (a *Agent) cleanupClients() { - if a.httpCli != nil { - a.httpCli.CloseIdleConnections() + if a.k8sCli != nil { + a.k8sCli.Close() } - a.httpCli = nil a.k8sCli = nil a.nrtCli = nil } diff --git a/pkg/kubernetes/client/client.go b/pkg/kubernetes/client/client.go new file mode 100644 index 000000000..0e50a575f --- /dev/null +++ b/pkg/kubernetes/client/client.go @@ -0,0 +1,149 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "net/http" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +// Option is an option that can be applied to a Client. +type Option func(*Client) error + +// Client enacapsulates our Kubernetes client. +type Client struct { + cfg *rest.Config + http *http.Client + *kubernetes.Clientset +} + +// GetConfigForFile returns a REST configuration for the given file. +func GetConfigForFile(kubeConfig string) (*rest.Config, error) { + return clientcmd.BuildConfigFromFlags("", kubeConfig) +} + +// InClusterConfig returns the in-cluster REST configuration. +func InClusterConfig() (*rest.Config, error) { + return rest.InClusterConfig() +} + +// WithKubeConfig returns a Client Option for using the given kubeconfig file. +func WithKubeConfig(file string) Option { + return func(c *Client) error { + cfg, err := GetConfigForFile(file) + if err != nil { + return err + } + return WithRestConfig(cfg)(c) + } +} + +// WithInClusterConfig returns a Client Option for using the in-cluster configuration. +func WithInClusterConfig() Option { + return func(c *Client) error { + cfg, err := rest.InClusterConfig() + if err != nil { + return err + } + return WithRestConfig(cfg)(c) + } +} + +// WithKubeOrInClusterConfig returns a Client Option for using in-cluster configuration +// if a configuration file is not given. +func WithKubeOrInClusterConfig(file string) Option { + if file == "" { + return WithInClusterConfig() + } + return WithKubeConfig(file) +} + +// WithRestConfig returns a Client Option for using the given REST configuration. +func WithRestConfig(cfg *rest.Config) Option { + return func(c *Client) error { + c.cfg = cfg + return nil + } +} + +// WithHttpClient returns a Client Option for using/sharing the given HTTP client. +func WithHttpClient(hc *http.Client) Option { + return func(c *Client) error { + c.http = hc + return nil + } +} + +// New creates a new Client with the given options. +func New(options ...Option) (*Client, error) { + c := &Client{} + + for _, o := range options { + if err := o(c); err != nil { + return nil, err + } + } + + if c.cfg == nil { + if err := WithInClusterConfig()(c); err != nil { + return nil, err + } + } + + if c.http == nil { + hc, err := rest.HTTPClientFor(c.cfg) + if err != nil { + return nil, err + } + c.http = hc + } + + client, err := kubernetes.NewForConfigAndClient(c.cfg, c.http) + if err != nil { + return nil, err + } + c.Clientset = client + + return c, nil +} + +// RestConfig returns a shallow copy of the REST configuration of the Client. +func (c *Client) RestConfig() *rest.Config { + cfg := *c.cfg + return &cfg +} + +// HttpClient returns the HTTP client of the Client. +func (c *Client) HttpClient() *http.Client { + return c.http +} + +// K8sClient returns the K8s Clientset of the Client. +func (c *Client) K8sClient() *kubernetes.Clientset { + return c.Clientset +} + +// Close closes the Client. +func (c *Client) Close() { + if c.http != nil { + c.http.CloseIdleConnections() + } + c.cfg = nil + c.http = nil + c.Clientset = nil +} diff --git a/pkg/kubernetes/watch/file.go b/pkg/kubernetes/watch/file.go new file mode 100644 index 000000000..2c5edd151 --- /dev/null +++ b/pkg/kubernetes/watch/file.go @@ -0,0 +1,178 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watch + +import ( + "errors" + "io/fs" + "os" + "path" + "path/filepath" + "sync" + + "github.com/fsnotify/fsnotify" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8swatch "k8s.io/apimachinery/pkg/watch" +) + +// FileClient takes care of unmarshaling file content to a target object. +type FileClient interface { + Unmarshal([]byte, string) (runtime.Object, error) +} + +type fileWatch struct { + dir string + file string + client FileClient + fsw *fsnotify.Watcher + resultC chan Event + stopOnce sync.Once + stopC chan struct{} + doneC chan struct{} +} + +// File creates a k8s-compatible watch for the given file. Contents of the +// file are unmarshalled into the object of choice by the client. The file +// is monitored for changes and corresponding watch events are generated. +func File(client FileClient, file string) (Interface, error) { + absPath, err := filepath.Abs(file) + if err != nil { + return nil, err + } + + fsw, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + if err = fsw.Add(filepath.Dir(absPath)); err != nil { + return nil, err + } + + fw := &fileWatch{ + dir: filepath.Dir(absPath), + file: filepath.Base(absPath), + client: client, + fsw: fsw, + resultC: make(chan Event, k8swatch.DefaultChanSize), + stopC: make(chan struct{}), + doneC: make(chan struct{}), + } + + obj, err := fw.readObject() + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return nil, err + } + + fw.sendEvent(Added, obj) + + go fw.run() + + return fw, nil +} + +// Stop stops the watch. +func (w *fileWatch) Stop() { + w.stopOnce.Do(func() { + close(w.stopC) + _ = <-w.doneC + w.stopC = nil + }) +} + +// ResultChan returns the channel for receiving events from the watch. +func (w *fileWatch) ResultChan() <-chan Event { + return w.resultC +} + +func (w *fileWatch) run() { + for { + select { + case <-w.stopC: + w.fsw.Close() + close(w.resultC) + close(w.doneC) + return + + case e, ok := <-w.fsw.Events: + log.Debug("%s got event %+v", w.name(), e) + + if !ok { + w.sendEvent( + Error, + &metav1.Status{ + Status: metav1.StatusFailure, + Message: "failed to receive fsnotify event", + }, + ) + close(w.resultC) + close(w.doneC) + return + } + + if path.Base(e.Name) != w.file { + continue + } + + switch { + case (e.Op & (fsnotify.Create | fsnotify.Write)) != 0: + obj, err := w.readObject() + if err != nil { + log.Debug("%s failed to read/unmarshal: %v", w.name(), err) + continue + } + + if (e.Op & fsnotify.Create) != 0 { + w.sendEvent(Added, obj) + } else { + w.sendEvent(Added, obj) + } + + case (e.Op & (fsnotify.Remove | fsnotify.Rename)) != 0: + w.sendEvent(Deleted, nil) + } + } + } +} + +func (w *fileWatch) sendEvent(t EventType, obj runtime.Object) { + select { + case w.resultC <- Event{Type: t, Object: obj}: + default: + log.Warn("failed to deliver fileWatch %v event", t) + } +} + +func (w *fileWatch) readObject() (runtime.Object, error) { + file := path.Join(w.dir, w.file) + data, err := os.ReadFile(file) + if err != nil { + return nil, err + } + + obj, err := w.client.Unmarshal(data, file) + if err != nil { + return nil, err + } + + log.Debug("%s read object %+v", w.name(), obj) + + return obj, nil +} + +func (w *fileWatch) name() string { + return path.Join("filewatch/path:", path.Join(w.dir, w.file)) +} diff --git a/pkg/kubernetes/watch/object.go b/pkg/kubernetes/watch/object.go new file mode 100644 index 000000000..2170481d1 --- /dev/null +++ b/pkg/kubernetes/watch/object.go @@ -0,0 +1,160 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watch + +import ( + "path" + "sync" + "time" + + k8swatch "k8s.io/apimachinery/pkg/watch" +) + +const ( + createDelay = 1 * time.Second +) + +type watch struct { + sync.Mutex + subject string + client ObjectClient + w Interface + resultC chan Event + createC <-chan time.Time + + stopOnce sync.Once + stopC chan struct{} + doneC chan struct{} +} + +// ObjectClient takes care of low-level details of creating a wrapped watch. +type ObjectClient interface { + CreateWatch() (Interface, error) +} + +// Object creates a wrapped watch using the given client. The watch +// is transparently recreated upon expiration and any errors. +func Object(client ObjectClient, subject string) (Interface, error) { + w := &watch{ + subject: subject, + client: client, + resultC: make(chan Event, k8swatch.DefaultChanSize), + stopC: make(chan struct{}), + doneC: make(chan struct{}), + } + + if err := w.create(); err != nil { + return nil, err + } + + go w.run() + + return w, nil +} + +// Stop stops the watch. +func (w *watch) Stop() { + w.stopOnce.Do(func() { + close(w.stopC) + _ = <-w.doneC + w.stop() + w.stopC = nil + }) +} + +// ResultChan returns the channel for receiving events from the watch. +func (w *watch) ResultChan() <-chan Event { + return w.resultC +} + +func (w *watch) eventChan() <-chan Event { + if w.w == nil { + return nil + } + + return w.w.ResultChan() +} + +func (w *watch) run() { + for { + select { + case <-w.stopC: + close(w.resultC) + close(w.doneC) + return + + case e, ok := <-w.eventChan(): + if !ok { + log.Debug("%s failed...", w.name()) + w.stop() + w.scheduleCreate() + continue + } + + if e.Type == Error { + log.Debug("%s failed with an error...", w.name()) + w.stop() + w.scheduleCreate() + continue + } + + select { + case w.resultC <- e: + default: + log.Debug("%s failed to deliver %v event", w.name(), e.Type) + w.stop() + w.scheduleCreate() + } + + case <-w.createC: + w.createC = nil + log.Debug("reopening %s...", w.name()) + if err := w.create(); err != nil { + w.scheduleCreate() + } + } + } +} + +func (w *watch) create() error { + w.stop() + + wif, err := w.client.CreateWatch() + if err != nil { + return err + } + w.w = wif + + return nil +} + +func (w *watch) scheduleCreate() { + if w.createC == nil { + w.createC = time.After(createDelay) + } +} + +func (w *watch) stop() { + if w.w == nil { + return + } + + w.w.Stop() + w.w = nil +} + +func (w *watch) name() string { + return path.Join("watch", w.subject) +} diff --git a/pkg/kubernetes/watch/watch.go b/pkg/kubernetes/watch/watch.go new file mode 100644 index 000000000..095215211 --- /dev/null +++ b/pkg/kubernetes/watch/watch.go @@ -0,0 +1,38 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watch + +import ( + logger "github.com/containers/nri-plugins/pkg/log" + k8swatch "k8s.io/apimachinery/pkg/watch" +) + +type ( + Interface = k8swatch.Interface + EventType = k8swatch.EventType + Event = k8swatch.Event +) + +const ( + Added = k8swatch.Added + Modified = k8swatch.Modified + Deleted = k8swatch.Deleted + Bookmark = k8swatch.Bookmark + Error = k8swatch.Error +) + +var ( + log = logger.Get("watch") +)