Skip to content

Commit

Permalink
Use CRC32 instead of comparing flows directly
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo Zanini <[email protected]>
  • Loading branch information
ricardozanini committed Nov 12, 2024
1 parent ce73049 commit 6ece147
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 9 deletions.
2 changes: 2 additions & 0 deletions api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ type SonataFlowStatus struct {
// Triggers list of triggers created for the SonataFlow
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="triggers"
Triggers []SonataFlowTriggerRef `json:"triggers,omitempty"`
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="flowRevision"
FlowCRC uint32 `json:"flowCRC,omitempty"`
}

// SonataFlowTriggerRef defines a trigger created for the SonataFlow.
Expand Down
3 changes: 3 additions & 0 deletions bundle/manifests/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10050,6 +10050,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the workflow
type: string
flowCRC:
format: int32
type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10050,6 +10050,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the workflow
type: string
flowCRC:
format: int32
type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ spec:
- description: Endpoint is an externally accessible URL of the workflow
displayName: endpoint
path: endpoint
- displayName: flowRevision
path: flowCRC
- displayName: lastTimeRecoverAttempt
path: lastTimeRecoverAttempt
- description: Platform displays which platform is being used by this workflow
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/profiles/common/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"context"
"fmt"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"

"k8s.io/client-go/rest"

"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/discovery"
Expand Down Expand Up @@ -56,6 +58,10 @@ func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operat
return false, err
}
workflow.Status.ObservedGeneration = workflow.Generation
workflow.Status.FlowCRC, err = utils.Crc32Checksum(workflow.Spec.Flow)
if err != nil {
return false, err
}
services.SetServiceUrlsInWorkflowStatus(pl, workflow)
if workflow.Status.Platform == nil {
workflow.Status.Platform = &operatorapi.SonataFlowPlatformRef{}
Expand Down
23 changes: 16 additions & 7 deletions internal/controller/profiles/preview/states_preview.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ package preview
import (
"context"
"fmt"
"reflect"
"sort"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -209,7 +209,11 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato
return ctrl.Result{}, nil, err
}

if h.isWorkflowChanged(workflow) { // Let's check that the 2 resWorkflowDef definition are different
hasChanged, err := h.isWorkflowChanged(workflow)
if err != nil {
return ctrl.Result{}, nil, err
}
if hasChanged { // Let's check that the 2 resWorkflowDef definition are different
if err = buildManager.MarkToRestart(build); err != nil {
return ctrl.Result{}, nil, err
}
Expand All @@ -236,12 +240,17 @@ func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, workfl
}

// isWorkflowChanged checks whether the contents of .spec.flow of the given workflow has changed.
func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) bool {
serverlessWorkflow := &operatorapi.SonataFlow{}
if err := h.C.Get(context.TODO(), client.ObjectKeyFromObject(workflow), serverlessWorkflow); err != nil {
klog.V(log.E).ErrorS(err, "unable to retrieve SonataFlow definition")
func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) (bool, error) {
// Added this guard for backward compatibility for workflows deployed with a previous operator version, so we won't kick thousands of builds on users' cluster.
// After this reconciliation cycle, the CRC should be updated
if workflow.Status.FlowCRC == 0 {
return false, nil
}
actualCRC, err := utils.Crc32Checksum(workflow.Spec.Flow)
if err != nil {
return false, err
}
return !reflect.DeepEqual(&serverlessWorkflow.Spec.Flow, &workflow.Spec.Flow)
return actualCRC != workflow.Status.FlowCRC, nil
}

func (h *deployWithBuildWorkflowState) cleanupOutdatedRevisions(ctx context.Context, workflow *operatorapi.SonataFlow) error {
Expand Down
12 changes: 10 additions & 2 deletions internal/controller/profiles/preview/states_preview_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package preview
import (
"testing"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"

"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
"github.com/serverlessworkflow/sdk-go/v2/model"
Expand All @@ -29,11 +31,15 @@ import (
func Test_deployWithBuildWorkflowState_isWorkflowChanged(t *testing.T) {
workflow1 := test.GetBaseSonataFlow(t.Name())
workflow2 := test.GetBaseSonataFlow(t.Name())
workflow1.Status.FlowCRC, _ = utils.Crc32Checksum(workflow1.Spec.Flow)
workflow2.Status.FlowCRC, _ = utils.Crc32Checksum(workflow2.Spec.Flow)
deployWithBuildWorkflowState := &deployWithBuildWorkflowState{
StateSupport: &common.StateSupport{C: test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow1).Build()},
}

assert.False(t, deployWithBuildWorkflowState.isWorkflowChanged(workflow2))
hasChanged, err := deployWithBuildWorkflowState.isWorkflowChanged(workflow2)
assert.NoError(t, err)
assert.False(t, hasChanged)

// change workflow2
workflow2.Spec.Flow.Metadata = model.Metadata{
Expand All @@ -42,5 +48,7 @@ func Test_deployWithBuildWorkflowState_isWorkflowChanged(t *testing.T) {
},
}

assert.True(t, deployWithBuildWorkflowState.isWorkflowChanged(workflow2))
hasChanged, err = deployWithBuildWorkflowState.isWorkflowChanged(workflow2)
assert.NoError(t, err)
assert.True(t, hasChanged)
}
3 changes: 3 additions & 0 deletions operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27477,6 +27477,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the workflow
type: string
flowCRC:
format: int32
type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
Expand Down
3 changes: 3 additions & 0 deletions test/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"runtime"
"strings"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"

"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
Expand Down Expand Up @@ -71,6 +73,7 @@ func GetSonataFlow(testFile, namespace string) *operatorapi.SonataFlow {
GetKubernetesResource(testFile, ksw)
klog.V(log.D).InfoS("Successfully read KSW", "ksw", spew.Sprint(ksw))
ksw.Namespace = namespace
ksw.Status.FlowCRC, _ = utils.Crc32Checksum(ksw.Spec.Flow)
return ksw
}

Expand Down
33 changes: 33 additions & 0 deletions utils/crc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package utils

import (
"bytes"
"encoding/gob"
"hash/crc32"
)

func Crc32Checksum(v interface{}) (uint32, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(v); err != nil {
return 0, err
}
return crc32.ChecksumIEEE(buf.Bytes()), nil
}

0 comments on commit 6ece147

Please sign in to comment.