diff --git a/api/v1alpha08/sonataflow_types.go b/api/v1alpha08/sonataflow_types.go index 01452cc49..3371e5164 100644 --- a/api/v1alpha08/sonataflow_types.go +++ b/api/v1alpha08/sonataflow_types.go @@ -200,6 +200,8 @@ type SonataFlowStatus struct { // Triggers list of triggers created for the SonataFlow //+operator-sdk:csv:customresourcedefinitions:type=status,displayName="triggers" Triggers []SonataFlowTriggerRef `json:"triggers,omitempty"` + //+operator-sdk:csv:customresourcedefinitions:type=status,displayName="flowRevision" + FlowCRC uint32 `json:"flowCRC,omitempty"` } // SonataFlowTriggerRef defines a trigger created for the SonataFlow. diff --git a/bundle/manifests/sonataflow.org_sonataflows.yaml b/bundle/manifests/sonataflow.org_sonataflows.yaml index 911f64e2d..44d568593 100644 --- a/bundle/manifests/sonataflow.org_sonataflows.yaml +++ b/bundle/manifests/sonataflow.org_sonataflows.yaml @@ -10050,6 +10050,9 @@ spec: endpoint: description: Endpoint is an externally accessible URL of the workflow type: string + flowCRC: + format: int32 + type: integer lastTimeRecoverAttempt: format: date-time type: string diff --git a/config/crd/bases/sonataflow.org_sonataflows.yaml b/config/crd/bases/sonataflow.org_sonataflows.yaml index aed84f8c2..36be93c84 100644 --- a/config/crd/bases/sonataflow.org_sonataflows.yaml +++ b/config/crd/bases/sonataflow.org_sonataflows.yaml @@ -10050,6 +10050,9 @@ spec: endpoint: description: Endpoint is an externally accessible URL of the workflow type: string + flowCRC: + format: int32 + type: integer lastTimeRecoverAttempt: format: date-time type: string diff --git a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml index df687131a..0aea8f407 100644 --- a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml @@ -239,6 +239,8 @@ spec: - description: Endpoint is an externally accessible URL of the workflow displayName: endpoint path: endpoint + - displayName: flowRevision + path: flowCRC - displayName: lastTimeRecoverAttempt path: lastTimeRecoverAttempt - description: Platform displays which platform is being used by this workflow diff --git a/internal/controller/profiles/common/reconciler.go b/internal/controller/profiles/common/reconciler.go index dba18439a..82637c4dd 100644 --- a/internal/controller/profiles/common/reconciler.go +++ b/internal/controller/profiles/common/reconciler.go @@ -23,6 +23,8 @@ import ( "context" "fmt" + "github.com/apache/incubator-kie-kogito-serverless-operator/utils" + "k8s.io/client-go/rest" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/discovery" @@ -56,6 +58,10 @@ func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operat return false, err } workflow.Status.ObservedGeneration = workflow.Generation + workflow.Status.FlowCRC, err = utils.Crc32Checksum(workflow.Spec.Flow) + if err != nil { + return false, err + } services.SetServiceUrlsInWorkflowStatus(pl, workflow) if workflow.Status.Platform == nil { workflow.Status.Platform = &operatorapi.SonataFlowPlatformRef{} diff --git a/internal/controller/profiles/preview/profile_preview_test.go b/internal/controller/profiles/preview/profile_preview_test.go index aed417bda..99c1677b2 100644 --- a/internal/controller/profiles/preview/profile_preview_test.go +++ b/internal/controller/profiles/preview/profile_preview_test.go @@ -181,7 +181,7 @@ func Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) { assert.Equal(t, serviceMonitor.Spec.Endpoints[0].Path, "/q/metrics") } -func Test_GenerationAnnotationCheck(t *testing.T) { +func Test_WorkflowChangedCheck(t *testing.T) { // we load a workflow with metadata.generation to 0 workflow := test.GetBaseSonataFlow(t.Name()) platform := test.GetBasePlatformInReadyPhase(t.Name()) @@ -199,15 +199,14 @@ func Test_GenerationAnnotationCheck(t *testing.T) { assert.NotNil(t, result) assert.Len(t, objects, 3) - // then we load a workflow with metadata.generation set to 1 + // then we load the current workflow workflowChanged := &operatorapi.SonataFlow{} err = client.Get(context.TODO(), clientruntime.ObjectKeyFromObject(workflow), workflowChanged) assert.NoError(t, err) - //we set the generation to 1 - workflowChanged.Generation = int64(1) - err = client.Update(context.TODO(), workflowChanged) - assert.NoError(t, err) - // reconcile + //we change something within the flow + workflowChanged.Spec.Flow.AutoRetries = true + + // reconcile -> the one in the k8s DB is different, so there's a change. handler = &deployWithBuildWorkflowState{ StateSupport: fakeReconcilerSupport(client), ensurers: NewObjectEnsurers(&common.StateSupport{C: client}), diff --git a/internal/controller/profiles/preview/states_preview.go b/internal/controller/profiles/preview/states_preview.go index c5480d44d..88031a7ab 100644 --- a/internal/controller/profiles/preview/states_preview.go +++ b/internal/controller/profiles/preview/states_preview.go @@ -24,6 +24,7 @@ import ( "fmt" "sort" + "github.com/apache/incubator-kie-kogito-serverless-operator/utils" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" @@ -40,7 +41,6 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/log" - kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" "github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj" ) @@ -209,7 +209,11 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato return ctrl.Result{}, nil, err } - if h.isWorkflowChanged(workflow) { // Let's check that the 2 resWorkflowDef definition are different + hasChanged, err := h.isWorkflowChanged(workflow) + if err != nil { + return ctrl.Result{}, nil, err + } + if hasChanged { // Let's check that the 2 resWorkflowDef definition are different if err = buildManager.MarkToRestart(build); err != nil { return ctrl.Result{}, nil, err } @@ -235,13 +239,18 @@ func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, workfl return h.cleanupOutdatedRevisions(ctx, workflow) } -// isWorkflowChanged marks the workflow status as unknown to require a new build reconciliation -func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) bool { - generation := kubeutil.GetLastGeneration(workflow.Namespace, workflow.Name, h.C, context.TODO()) - if generation > workflow.Status.ObservedGeneration { - return true +// isWorkflowChanged checks whether the contents of .spec.flow of the given workflow has changed. +func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) (bool, error) { + // Added this guard for backward compatibility for workflows deployed with a previous operator version, so we won't kick thousands of builds on users' cluster. + // After this reconciliation cycle, the CRC should be updated + if workflow.Status.FlowCRC == 0 { + return false, nil } - return false + actualCRC, err := utils.Crc32Checksum(workflow.Spec.Flow) + if err != nil { + return false, err + } + return actualCRC != workflow.Status.FlowCRC, nil } func (h *deployWithBuildWorkflowState) cleanupOutdatedRevisions(ctx context.Context, workflow *operatorapi.SonataFlow) error { diff --git a/internal/controller/profiles/preview/states_preview_test.go b/internal/controller/profiles/preview/states_preview_test.go new file mode 100644 index 000000000..43a26de91 --- /dev/null +++ b/internal/controller/profiles/preview/states_preview_test.go @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package preview + +import ( + "testing" + + "github.com/apache/incubator-kie-kogito-serverless-operator/utils" + + "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common" + "github.com/apache/incubator-kie-kogito-serverless-operator/test" + "github.com/serverlessworkflow/sdk-go/v2/model" + "github.com/stretchr/testify/assert" +) + +func Test_deployWithBuildWorkflowState_isWorkflowChanged(t *testing.T) { + workflow1 := test.GetBaseSonataFlow(t.Name()) + workflow2 := test.GetBaseSonataFlow(t.Name()) + workflow1.Status.FlowCRC, _ = utils.Crc32Checksum(workflow1.Spec.Flow) + workflow2.Status.FlowCRC, _ = utils.Crc32Checksum(workflow2.Spec.Flow) + deployWithBuildWorkflowState := &deployWithBuildWorkflowState{ + StateSupport: &common.StateSupport{C: test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow1).Build()}, + } + + hasChanged, err := deployWithBuildWorkflowState.isWorkflowChanged(workflow2) + assert.NoError(t, err) + assert.False(t, hasChanged) + + // change workflow2 + workflow2.Spec.Flow.Metadata = model.Metadata{ + "string": model.Object{ + StringValue: "test", + }, + } + + hasChanged, err = deployWithBuildWorkflowState.isWorkflowChanged(workflow2) + assert.NoError(t, err) + assert.True(t, hasChanged) +} diff --git a/operator.yaml b/operator.yaml index b45f2fe7e..88fa16823 100644 --- a/operator.yaml +++ b/operator.yaml @@ -27477,6 +27477,9 @@ spec: endpoint: description: Endpoint is an externally accessible URL of the workflow type: string + flowCRC: + format: int32 + type: integer lastTimeRecoverAttempt: format: date-time type: string diff --git a/test/yaml.go b/test/yaml.go index 752f05620..6f2ffa959 100644 --- a/test/yaml.go +++ b/test/yaml.go @@ -27,6 +27,8 @@ import ( "runtime" "strings" + "github.com/apache/incubator-kie-kogito-serverless-operator/utils" + "github.com/apache/incubator-kie-kogito-serverless-operator/api" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/log" @@ -71,6 +73,7 @@ func GetSonataFlow(testFile, namespace string) *operatorapi.SonataFlow { GetKubernetesResource(testFile, ksw) klog.V(log.D).InfoS("Successfully read KSW", "ksw", spew.Sprint(ksw)) ksw.Namespace = namespace + ksw.Status.FlowCRC, _ = utils.Crc32Checksum(ksw.Spec.Flow) return ksw } diff --git a/utils/crc.go b/utils/crc.go new file mode 100644 index 000000000..a262f3ee6 --- /dev/null +++ b/utils/crc.go @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package utils + +import ( + "bytes" + "encoding/gob" + "hash/crc32" +) + +func Crc32Checksum(v interface{}) (uint32, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(v); err != nil { + return 0, err + } + return crc32.ChecksumIEEE(buf.Bytes()), nil +} diff --git a/utils/kubernetes/annotations.go b/utils/kubernetes/annotations.go index 24d9ba4ab..91d4927a8 100644 --- a/utils/kubernetes/annotations.go +++ b/utils/kubernetes/annotations.go @@ -20,33 +20,11 @@ package kubernetes import ( - "context" "strconv" - "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" - - operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" - "github.com/apache/incubator-kie-kogito-serverless-operator/log" ) -func getWorkflow(namespace string, name string, c client.Client, ctx context.Context) *operatorapi.SonataFlow { - serverlessWorkflowType := &operatorapi.SonataFlow{} - serverlessWorkflowType.Namespace = namespace - serverlessWorkflowType.Name = name - serverlessWorkflow := &operatorapi.SonataFlow{} - if err := c.Get(ctx, client.ObjectKeyFromObject(serverlessWorkflowType), serverlessWorkflow); err != nil { - klog.V(log.E).ErrorS(err, "unable to retrieve SonataFlow definition") - } - return serverlessWorkflow -} - -func GetLastGeneration(namespace string, name string, c client.Client, ctx context.Context) int64 { - workflow := getWorkflow(namespace, name, c, ctx) - return workflow.Generation -} - // GetAnnotationAsBool returns the boolean value from the given annotation. // If the annotation is not present or is there an error in the ParseBool conversion, returns false. func GetAnnotationAsBool(object client.Object, key string) bool {