From 12a1cbf55baf5dba0d936a78b9860d3f22a3b025 Mon Sep 17 00:00:00 2001 From: MegaByte875 Date: Fri, 23 Feb 2024 14:14:21 +0800 Subject: [PATCH] support ssl flags reset (#449) --- config/samples/nebularestore-s3.yaml | 4 +-- config/samples/restore-pod.yaml | 4 +-- pkg/controller/component/graphd_cluster.go | 2 +- pkg/controller/component/helper.go | 16 +++++++-- pkg/controller/component/metad_cluster.go | 34 +------------------ pkg/controller/component/storaged_cluster.go | 24 ++++++++----- .../nebulacluster/nebula_cluster_control.go | 33 ++++++++++++++++++ 7 files changed, 68 insertions(+), 49 deletions(-) diff --git a/config/samples/nebularestore-s3.yaml b/config/samples/nebularestore-s3.yaml index 95837e0d..1c23257a 100644 --- a/config/samples/nebularestore-s3.yaml +++ b/config/samples/nebularestore-s3.yaml @@ -4,8 +4,8 @@ metadata: name: aws-s3-secret type: Opaque data: - access-key: - secret-key: + access_key: + secret_key: --- apiVersion: apps.nebula-graph.io/v1alpha1 kind: NebulaRestore diff --git a/config/samples/restore-pod.yaml b/config/samples/restore-pod.yaml index ba169b66..30ab07ae 100644 --- a/config/samples/restore-pod.yaml +++ b/config/samples/restore-pod.yaml @@ -4,8 +4,8 @@ metadata: name: aws-s3-secret type: Opaque data: - access-key: - secret-key: + access_key: + secret_key: --- apiVersion: v1 kind: Pod diff --git a/pkg/controller/component/graphd_cluster.go b/pkg/controller/component/graphd_cluster.go index 77773962..b46125a9 100644 --- a/pkg/controller/component/graphd_cluster.go +++ b/pkg/controller/component/graphd_cluster.go @@ -192,7 +192,7 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error { return err } endpoints := nc.GetGraphdEndpoints(v1alpha1.GraphdPortNameHTTP) - if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations()); err != nil { + if err := updateDynamicFlags(endpoints, oldWorkload.GetAnnotations(), newWorkload.GetAnnotations()); err != nil { return fmt.Errorf("update graphd cluster %s dynamic flags failed: %v", newWorkload.GetName(), err) } } diff --git a/pkg/controller/component/helper.go b/pkg/controller/component/helper.go index ce9e9089..16dcceae 100644 --- a/pkg/controller/component/helper.go +++ b/pkg/controller/component/helper.go @@ -224,7 +224,7 @@ func staticOrStartupFlags(config map[string]string) map[string]string { return static } -func updateDynamicFlags(endpoints []string, newAnnotations map[string]string) error { +func updateDynamicFlags(endpoints []string, oldAnnotations, newAnnotations map[string]string) error { newFlags := make(map[string]string) newFlagsVal, ok := newAnnotations[annotation.AnnLastAppliedDynamicFlagsKey] if ok { @@ -232,9 +232,21 @@ func updateDynamicFlags(endpoints []string, newAnnotations map[string]string) er return err } } - if len(newFlags) == 0 { + oldFlags := make(map[string]string) + oldFlagsVal, ok := oldAnnotations[annotation.AnnLastAppliedDynamicFlagsKey] + if ok { + if err := json.Unmarshal([]byte(oldFlagsVal), &oldFlags); err != nil { + return err + } + } + if len(newFlags) == 0 && len(oldFlags) == 0 { return nil } + _, removed := maputil.IntersectionDifference(oldFlags, newFlags) + if len(removed) > 0 { + maputil.ResetMap(removed, v1alpha1.DynamicFlags) + newFlags = maputil.MergeStringMaps(true, newFlags, removed) + } klog.V(1).Infof("dynamic flags: %v", newFlags) str, err := codec.Encode(newFlags) if err != nil { diff --git a/pkg/controller/component/metad_cluster.go b/pkg/controller/component/metad_cluster.go index d9ad09d6..26438286 100644 --- a/pkg/controller/component/metad_cluster.go +++ b/pkg/controller/component/metad_cluster.go @@ -25,11 +25,9 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/klog/v2" - "github.com/vesoft-inc/nebula-go/v3/nebula/meta" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" "github.com/vesoft-inc/nebula-operator/apis/pkg/annotation" "github.com/vesoft-inc/nebula-operator/pkg/kube" - "github.com/vesoft-inc/nebula-operator/pkg/nebula" "github.com/vesoft-inc/nebula-operator/pkg/util/discovery" utilerrors "github.com/vesoft-inc/nebula-operator/pkg/util/errors" "github.com/vesoft-inc/nebula-operator/pkg/util/extender" @@ -183,12 +181,8 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error { } if equal && nc.MetadComponent().IsReady() { - if err := c.setVersion(nc); err != nil { - return err - } - endpoints := nc.GetMetadEndpoints(v1alpha1.MetadPortNameHTTP) - if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations()); err != nil { + if err := updateDynamicFlags(endpoints, oldWorkload.GetAnnotations(), newWorkload.GetAnnotations()); err != nil { return fmt.Errorf("update metad cluster %s dynamic flags failed: %v", newWorkload.GetName(), err) } } @@ -228,32 +222,6 @@ func (c *metadCluster) syncMetadPVC(nc *v1alpha1.NebulaCluster) error { return syncPVC(nc.MetadComponent(), c.clientSet.PVC()) } -func (c *metadCluster) setVersion(nc *v1alpha1.NebulaCluster) error { - options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) - if err != nil { - return err - } - endpoints := []string{nc.GetMetadThriftConnAddress()} - metaClient, err := nebula.NewMetaClient(endpoints, options...) - if err != nil { - return err - } - defer func() { - _ = metaClient.Disconnect() - }() - - hosts, err := metaClient.ListHosts(meta.ListHostType_META) - if err != nil { - return err - } - for _, host := range hosts { - version := host.Version - nc.Status.Version = string(version) - break - } - return nil -} - type FakeMetadCluster struct { err error } diff --git a/pkg/controller/component/storaged_cluster.go b/pkg/controller/component/storaged_cluster.go index 577d95e6..f7bc35cf 100644 --- a/pkg/controller/component/storaged_cluster.go +++ b/pkg/controller/component/storaged_cluster.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "k8s.io/utils/pointer" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" "github.com/vesoft-inc/nebula-operator/apis/pkg/annotation" @@ -251,14 +252,19 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error oVal, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartPodOrdinal] if ok { ordinal, err := strconv.Atoi(oVal) - if err != nil { - return err - } - if err := c.updateManager.RestartPod(nc, int32(ordinal)); err != nil { - return err - } - if err := c.updateManager.Balance(nc); err != nil { - return err + if err == nil { + if ordinal >= 0 && ordinal < int(pointer.Int32Deref(nc.Spec.Storaged.Replicas, 0)) { + if err := c.updateManager.RestartPod(nc, int32(ordinal)); err != nil { + return err + } + if err := c.updateManager.Balance(nc); err != nil { + return err + } + } else { + klog.Errorf("restart pod with invalid ordinal %s, ignored", oVal) + } + } else { + klog.Errorf("convert pod ordinal failed: %v", err) } } @@ -268,7 +274,7 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error if equal && nc.StoragedComponent().IsReady() { endpoints := nc.GetStoragedEndpoints(v1alpha1.StoragedPortNameHTTP) - if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations()); err != nil { + if err := updateDynamicFlags(endpoints, oldWorkload.GetAnnotations(), newWorkload.GetAnnotations()); err != nil { return fmt.Errorf("update storaged cluster %s dynamic flags failed: %v", newWorkload.GetName(), err) } diff --git a/pkg/controller/nebulacluster/nebula_cluster_control.go b/pkg/controller/nebulacluster/nebula_cluster_control.go index 056253ef..b0e86525 100644 --- a/pkg/controller/nebulacluster/nebula_cluster_control.go +++ b/pkg/controller/nebulacluster/nebula_cluster_control.go @@ -19,6 +19,8 @@ package nebulacluster import ( "context" + "github.com/vesoft-inc/nebula-go/v3/nebula/meta" + "github.com/vesoft-inc/nebula-operator/pkg/nebula" apiequality "k8s.io/apimachinery/pkg/api/equality" errorutils "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" @@ -162,6 +164,11 @@ func (c *defaultNebulaClusterControl) updateNebulaCluster(nc *v1alpha1.NebulaClu return err } + if err := setClusterVersion(nc); err != nil { + klog.Errorf("set cluster version failed: %v", err) + return err + } + if err := c.exporter.Reconcile(nc); err != nil { klog.Errorf("reconcile exporter failed: %v", err) return err @@ -185,6 +192,32 @@ func (c *defaultNebulaClusterControl) updateNebulaCluster(nc *v1alpha1.NebulaClu return nil } +func setClusterVersion(nc *v1alpha1.NebulaCluster) error { + options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) + if err != nil { + return err + } + endpoints := []string{nc.GetMetadThriftConnAddress()} + metaClient, err := nebula.NewMetaClient(endpoints, options...) + if err != nil { + return err + } + defer func() { + _ = metaClient.Disconnect() + }() + + hosts, err := metaClient.ListHosts(meta.ListHostType_META) + if err != nil { + return err + } + for _, host := range hosts { + version := host.Version + nc.Status.Version = string(version) + break + } + return nil +} + type FakeClusterControl struct { err error }