Skip to content

Commit

Permalink
feat: Live resource view
Browse files Browse the repository at this point in the history
Signed-off-by: jannfis <[email protected]>
  • Loading branch information
jannfis committed Jan 8, 2025
1 parent d16afb3 commit ad9b02d
Show file tree
Hide file tree
Showing 49 changed files with 3,125 additions and 141 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ setup-e2e2:
.PHONY: start-e2e2
start-e2e2:
hack/dev-env/gen-creds.sh
hack/dev-env/gen-tls.sh
goreman -f hack/dev-env/Procfile.e2e start

.PHONY: test-e2e2
Expand Down
21 changes: 12 additions & 9 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
kubeappproject "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/appproject"
"github.com/argoproj-labs/argocd-agent/internal/event"
"github.com/argoproj-labs/argocd-agent/internal/informer"
"github.com/argoproj-labs/argocd-agent/internal/kube"
"github.com/argoproj-labs/argocd-agent/internal/manager"
"github.com/argoproj-labs/argocd-agent/internal/manager/application"
"github.com/argoproj-labs/argocd-agent/internal/manager/appproject"
Expand All @@ -38,7 +39,6 @@ import (
"k8s.io/apimachinery/pkg/watch"

"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
)

const waitForSyncedDuration = 10 * time.Second
Expand Down Expand Up @@ -66,9 +66,10 @@ type Agent struct {
emitter *event.EventSource
// At present, 'watchLock' is only acquired on calls to 'addAppUpdateToQueue'. This behaviour was added as a short-term attempt to preserve update event ordering. However, this is known to be problematic due to the potential for race conditions, both within itself, and between other event processors like deleteAppCallback.
watchLock sync.RWMutex
version *version.Version

eventWriter *event.EventWriter
version *version.Version
kubeClient *kube.KubernetesClient
}

const defaultQueueName = "default"
Expand All @@ -86,7 +87,7 @@ type AgentOption func(*Agent) error

// NewAgent creates a new agent instance, using the given client interfaces and
// options.
func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace string, opts ...AgentOption) (*Agent, error) {
func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace string, opts ...AgentOption) (*Agent, error) {
a := &Agent{
version: version.New("argocd-agent", "agent"),
}
Expand All @@ -105,6 +106,8 @@ func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace s
return nil, fmt.Errorf("remote not defined")
}

a.kubeClient = client

// Initial state of the agent is disconnected
a.connected.Store(false)

Expand All @@ -127,10 +130,10 @@ func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace s

// appListFunc and watchFunc are anonymous functions for the informer
appListFunc := func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) {
return appclient.ArgoprojV1alpha1().Applications(a.namespace).List(ctx, opts)
return client.ApplicationsClientset.ArgoprojV1alpha1().Applications(a.namespace).List(ctx, opts)
}
appWatchFunc := func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
return appclient.ArgoprojV1alpha1().Applications(a.namespace).Watch(ctx, opts)
return client.ApplicationsClientset.ArgoprojV1alpha1().Applications(a.namespace).Watch(ctx, opts)
}

appInformerOptions := []informer.InformerOption[*v1alpha1.Application]{
Expand Down Expand Up @@ -160,10 +163,10 @@ func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace s
}

projListFunc := func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) {
return appclient.ArgoprojV1alpha1().AppProjects(a.namespace).List(ctx, opts)
return client.ApplicationsClientset.ArgoprojV1alpha1().AppProjects(a.namespace).List(ctx, opts)
}
projWatchFunc := func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
return appclient.ArgoprojV1alpha1().AppProjects(a.namespace).Watch(ctx, opts)
return client.ApplicationsClientset.ArgoprojV1alpha1().AppProjects(a.namespace).Watch(ctx, opts)
}

projInformerOptions := []informer.InformerOption[*v1alpha1.AppProject]{
Expand All @@ -178,7 +181,7 @@ func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace s

// The agent only supports Kubernetes as application backend
a.appManager, err = application.NewApplicationManager(
kubeapp.NewKubernetesBackend(appclient, a.namespace, appInformer, true),
kubeapp.NewKubernetesBackend(client.ApplicationsClientset, a.namespace, appInformer, true),
a.namespace,
application.WithAllowUpsert(allowUpsert),
application.WithRole(manager.ManagerRoleAgent),
Expand All @@ -189,7 +192,7 @@ func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace s
}

a.projectManager, err = appproject.NewAppProjectManager(
kubeappproject.NewKubernetesBackend(appclient, a.namespace, projInformer, true),
kubeappproject.NewKubernetesBackend(client.ApplicationsClientset, a.namespace, projInformer, true),
a.namespace,
appProjectManagerOption...)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,26 @@ import (
"context"
"testing"

fakeappclient "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned/fake"
"github.com/sirupsen/logrus"

"github.com/argoproj-labs/argocd-agent/pkg/client"
"github.com/argoproj-labs/argocd-agent/test/fake/kube"
"github.com/stretchr/testify/require"
)

func newAgent(t *testing.T) *Agent {
t.Helper()
appc := fakeappclient.NewSimpleClientset()
kubec := kube.NewKubernetesFakeClient()
remote, err := client.NewRemote("127.0.0.1", 8080)
require.NoError(t, err)
agent, err := NewAgent(context.TODO(), appc, "argocd", WithRemote(remote))
agent, err := NewAgent(context.TODO(), kubec, "argocd", WithRemote(remote))
require.NoError(t, err)
return agent
}

func Test_NewAgent(t *testing.T) {
appc := fakeappclient.NewSimpleClientset()
agent, err := NewAgent(context.TODO(), appc, "agent", WithRemote(&client.Remote{}))
kubec := kube.NewKubernetesFakeClient()
agent, err := NewAgent(context.TODO(), kubec, "agent", WithRemote(&client.Remote{}))
require.NotNil(t, agent)
require.NoError(t, err)
}
Expand Down
11 changes: 8 additions & 3 deletions agent/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,18 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error
logCtx.Tracef("Queue shutdown in progress")
return nil
}
logCtx.Tracef("Grabbed an item")
logCtx.Trace("Grabbed an item")
if ev == nil {
// TODO: Is this really the right thing to do?
return nil
}

logCtx.WithField("resource_id", event.ResourceID(ev)).WithField("event_id", event.EventID(ev)).Trace("Adding an event to the event writer")
logCtx = logCtx.WithFields(logrus.Fields{
"event_target": ev.DataSchema(),
"event_type": ev.Type(),
"resource_id": event.ResourceID(ev),
"event_id": event.EventID(ev),
})
logCtx.Trace("Adding an event to the event writer")
a.eventWriter.Add(ev)

return nil
Expand Down
110 changes: 108 additions & 2 deletions agent/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,21 @@
package agent

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/argoproj-labs/argocd-agent/internal/backend"
"github.com/argoproj-labs/argocd-agent/internal/event"
"github.com/argoproj-labs/argocd-agent/pkg/types"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)

/*
Expand All @@ -32,20 +39,109 @@ Inbound events are those coming through our gRPC interface, e.g. those that
were received from a server.
*/

const defaultResourceRequestTimeout = 5 * time.Second

func (a *Agent) processIncomingEvent(ev *event.Event) error {
var err error
switch ev.Target() {
case event.TargetApplication:
err = a.processIncomingApplication(ev)
case event.TargetAppProject:
err = a.processIncomingAppProject(ev)
case event.TargetResource:
err = a.processIncomingResourceRequest(ev)
default:
err = fmt.Errorf("unknown event target: %s", ev.Target())
}

return err
}

// processIncomingResourceRequest processes an incoming event that requests
// to retrieve information from the Kubernetes API.
//
// There can be multiple forms of requests. Currently supported are:
//
// - Request for a particular resource, both namespace and cluster scoped
// - Request for a list of resources of a particular kind (e.g. configmaps,
// pods, etc), both namespace and custer scoped
// - Request for a list of available APIs and

func (a *Agent) processIncomingResourceRequest(ev *event.Event) error {
rreq, err := ev.ResourceRequest()
if err != nil {
return err
}
logCtx := log().WithFields(logrus.Fields{
"method": "processIncomingEvents",
"uuid": rreq.UUID,
})
logCtx.Tracef("Start processing %v", rreq)

// TODO(jannfis): The connection to fetch resources should support some
// form of impersonation in the future.

// Create a dynamic kubernetes client and retrieve the resource from the
// cluster.
dynClient, err := dynamic.NewForConfig(a.kubeClient.RestConfig)
if err != nil {
return fmt.Errorf("could not create a dynamic client: %w", err)
}

// Some of GVR may be empty, and that is ok.
gvk := schema.GroupVersionResource{Group: rreq.Group, Version: rreq.Version, Resource: rreq.Resource}
rif := dynClient.Resource(gvk)

ctx, cancel := context.WithTimeout(a.context, defaultResourceRequestTimeout)
defer cancel()

var jsonres []byte
var unres *unstructured.Unstructured
var unlist *unstructured.UnstructuredList
var status error

// If we have a request for a named resource, we fetch that particular
// resource. If the name is empty, we fetch a list of resources instead.
if rreq.Name != "" {
if rreq.Namespace != "" {
unres, err = rif.Namespace(rreq.Namespace).Get(ctx, rreq.Name, v1.GetOptions{})
} else {
unres, err = rif.Get(ctx, rreq.Name, v1.GetOptions{})
}
} else {
if rreq.Namespace != "" {
unlist, err = rif.Namespace(rreq.Namespace).List(ctx, v1.ListOptions{})
} else {
unlist, err = rif.List(ctx, v1.ListOptions{})
}
}
if err != nil {
logCtx.Errorf("could not request resource: %v", err)
status = err
} else {
// Marshal the unstructured resource to JSON for submission
if unres != nil {
jsonres, err = json.Marshal(unres)
} else if unlist != nil {
jsonres, err = json.Marshal(unlist)
}
if err != nil {
return fmt.Errorf("could not marshal resource to json: %w", err)
}
logCtx.Tracef("marshaled resource")
}

q := a.queues.SendQ(a.remote.ClientID())
if q == nil {
logCtx.Error("Remote queue disappeared")
return nil
}
q.Add(a.emitter.NewResourceResponseEvent(rreq.UUID, event.HttpStatusFromError(status), string(jsonres)))
logCtx.Tracef("Emitted resource response")

return nil
}

func (a *Agent) processIncomingApplication(ev *event.Event) error {
logCtx := log().WithFields(logrus.Fields{
"method": "processIncomingEvents",
Expand Down Expand Up @@ -207,7 +303,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
// In modes other than "managed", we don't process new application events
// that are incoming.
if a.mode != types.AgentModeManaged {
logCtx.Trace("Discarding this event, because agent is not in managed mode")
logCtx.Info("Discarding this event, because agent is not in managed mode")
return nil, event.NewEventDiscardedErr("cannot create application: agent is not in managed mode")
}

Expand All @@ -216,7 +312,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
//
// TODO(jannfis): Handle this situation properly instead of throwing an error.
if a.appManager.IsManaged(incoming.QualifiedName()) {
logCtx.Trace("Discarding this event, because application is already managed on this agent")
logCtx.Info("Discarding this event, because application is already managed on this agent")
return nil, event.NewEventDiscardedErr("application %s is already managed", incoming.QualifiedName())
}

Expand All @@ -228,6 +324,11 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
delete(incoming.Annotations, "kubectl.kubernetes.io/last-applied-configuration")
}

// Set target cluster to a sensible value
// TODO(jannfis): Make this actually configurable per agent
incoming.Spec.Destination.Server = ""
incoming.Spec.Destination.Name = "in-cluster"

created, err := a.appManager.Create(a.context, incoming)
if apierrors.IsAlreadyExists(err) {
logCtx.Debug("application already exists")
Expand All @@ -252,6 +353,11 @@ func (a *Agent) updateApplication(incoming *v1alpha1.Application) (*v1alpha1.App
logCtx.Tracef("New resource version: %s", incoming.ResourceVersion)
}

// Set target cluster to a sensible value
// TODO(jannfis): Make this actually configurable per agent
incoming.Spec.Destination.Server = ""
incoming.Spec.Destination.Name = "in-cluster"

logCtx.Infof("Updating application")

var err error
Expand Down
Loading

0 comments on commit ad9b02d

Please sign in to comment.