Skip to content

Commit

Permalink
Fixes weaveworks#3864, Fix pod topology for jobs when cronJobs not ex…
Browse files Browse the repository at this point in the history
…ist.
  • Loading branch information
shabicheng committed May 30, 2021
1 parent 5373b5e commit aadeeb0
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 13 deletions.
22 changes: 11 additions & 11 deletions probe/kubernetes/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,18 +661,18 @@ func (r *Reporter) podTopology(services []Service, deployments []Deployment, dae
report.MakeCronJobNodeID(cronJob.UID()),
))
}
for _, job := range jobs {
selector, err := job.Selector()
if err != nil {
return pods, err
}
selectors = append(selectors, match(
job.Namespace(),
selector,
report.Job,
report.MakeJobNodeID(job.UID()),
))
}
for _, job := range jobs {
selector, err := job.Selector()
if err != nil {
return pods, err
}
selectors = append(selectors, match(
job.Namespace(),
selector,
report.Job,
report.MakeJobNodeID(job.UID()),
))
}

err := r.client.WalkPods(func(p Pod) error {
Expand Down
55 changes: 53 additions & 2 deletions probe/kubernetes/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"testing"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
k8smeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"

"github.com/bmizerany/assert"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
Expand All @@ -26,6 +28,7 @@ var (
nodeName = "nodename"
pod1UID = "a1b2c3d4e5"
pod2UID = "f6g7h8i9j0"
job1UID = "abcdef1234"
serviceUID = "service1234"
podTypeMeta = metav1.TypeMeta{
Kind: "Pod",
Expand Down Expand Up @@ -72,6 +75,25 @@ var (
NodeName: nodeName,
},
}
apiJob1 = batchv1.Job{
TypeMeta: metav1.TypeMeta{
Kind: "Job",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "pong",
UID: types.UID(job1UID),
Namespace: "ping",
CreationTimestamp: metav1.Now(),
Labels: map[string]string{"ponger": "true"},
},
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"ponger": "true"},
},
},
Status: batchv1.JobStatus{},
}
apiService1 = apiv1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
Expand Down Expand Up @@ -102,20 +124,23 @@ var (
}
pod1 = kubernetes.NewPod(&apiPod1)
pod2 = kubernetes.NewPod(&apiPod2)
job1 = kubernetes.NewJob(&apiJob1)
service1 = kubernetes.NewService(&apiService1)
)

func newMockClient() *mockClient {
return &mockClient{
pods: []kubernetes.Pod{pod1, pod2},
services: []kubernetes.Service{service1},
jobs: []kubernetes.Job{job1},
logs: map[string]io.ReadCloser{},
}
}

type mockClient struct {
pods []kubernetes.Pod
services []kubernetes.Service
jobs []kubernetes.Job
deployments []kubernetes.Deployment
logs map[string]io.ReadCloser
}
Expand Down Expand Up @@ -173,6 +198,11 @@ func (c *mockClient) WalkVolumeSnapshotData(f func(kubernetes.VolumeSnapshotData
return nil
}
func (c *mockClient) WalkJobs(f func(kubernetes.Job) error) error {
for _, job := range c.jobs {
if err := f(job); err != nil {
return err
}
}
return nil
}
func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {}
Expand Down Expand Up @@ -229,6 +259,7 @@ func (c mockPipeClient) PipeClose(appID, id string) error {
func TestReporter(t *testing.T) {
pod1ID := report.MakePodNodeID(pod1UID)
pod2ID := report.MakePodNodeID(pod2UID)
job1ID := report.MakeJobNodeID(job1UID)
serviceID := report.MakeServiceNodeID(serviceUID)
hr := controls.NewDefaultHandlerRegistry()
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "probe-id", "foo", nil, hr, nodeName).Report()
Expand All @@ -237,14 +268,15 @@ func TestReporter(t *testing.T) {
for _, pod := range []struct {
id string
parentService string
parentJob string
latest map[string]string
}{
{pod1ID, serviceID, map[string]string{
{pod1ID, serviceID, job1ID, map[string]string{
kubernetes.Name: "pong-a",
kubernetes.Namespace: "ping",
kubernetes.Created: pod1.Created(),
}},
{pod2ID, serviceID, map[string]string{
{pod2ID, serviceID, job1ID, map[string]string{
kubernetes.Name: "pong-b",
kubernetes.Namespace: "ping",
kubernetes.Created: pod2.Created(),
Expand All @@ -259,12 +291,31 @@ func TestReporter(t *testing.T) {
t.Errorf("Expected pod %s to have parent service %q, got %q", pod.id, pod.parentService, parents)
}

if parents, ok := node.Parents.Lookup(report.Job); !ok || !parents.Contains(pod.parentJob) {
t.Errorf("Expected pod %s to have parent service %q, got %q", pod.id, pod.parentJob, parents)
}

for k, want := range pod.latest {
if have, ok := node.Latest.Lookup(k); !ok || have != want {
t.Errorf("Expected pod %s latest %q: %q, got %q", pod.id, k, want, have)
}
}
}
// Reporter should have added a job
{
node, ok := rpt.Job.Nodes[job1ID]
assert.Equal(t, ok, true)
jobLatest := map[string]string{
kubernetes.Name: "pong",
kubernetes.Namespace: "ping",
kubernetes.Created: job1.Created(),
}
for k, want := range jobLatest {
hava, ok := node.Latest.Lookup(k)
assert.Equal(t, ok, true)
assert.Equal(t, want, hava)
}
}

// Reporter should have added a service
{
Expand Down

0 comments on commit aadeeb0

Please sign in to comment.