Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EH: Add qconf resolve host group #23

Merged
merged 2 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/simulator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=
Expand Down
5 changes: 4 additions & 1 deletion examples/testexample/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ require (
google.golang.org/protobuf v1.35.1
)

require go.uber.org/multierr v1.10.0 // indirect
require (
github.com/goccy/go-json v0.10.3 // indirect
go.uber.org/multierr v1.10.0 // indirect
)
2 changes: 2 additions & 0 deletions examples/testexample/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=
Expand Down
78 changes: 69 additions & 9 deletions examples/testexample/testexample.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,34 @@ import (
var qacctClient qacct.QAcct
var qstatClient qstat.QStat

var newlyFinishedJobs <-chan qacct.JobDetail

var log *zap.Logger

func init() {
var err error
log, _ = zap.NewProduction()

qstatClient, err = qstat.NewCommandLineQstat(qstat.CommandLineQStatConfig{})
if err != nil {
log.Fatal("Failed to initialize qstat client", zap.String("error",
err.Error()))
}

qacctClient, err = qacct.NewCommandLineQAcct(qacct.CommandLineQAcctConfig{})
if err != nil {
log.Fatal("Failed to initialize qacct client", zap.String("error",
err.Error()))
}
qstatClient, err = qstat.NewCommandLineQstat(qstat.CommandLineQStatConfig{})

// watch for newly finished jobs
newlyFinishedJobs, err = qacct.WatchFile(context.Background(),
qacct.GetDefaultQacctFile(), 1024)
if err != nil {
log.Fatal("Failed to initialize qstat client", zap.String("error",
err.Error()))
log.Fatal("Failed to initialize job watcher",
zap.String("error", err.Error()))
}

}

func main() {
Expand All @@ -48,7 +61,7 @@ func run(ctx context.Context) {
log.Info("Context cancelled, stopping ClusterScheduler")
return
default:
finishedJobs, err := GetFinishedJobs()
finishedJobs, err := GetFinishedJobsWithWatcher()
if err != nil {
log.Error("Error getting finished jobs", zap.String("error",
err.Error()))
Expand Down Expand Up @@ -107,10 +120,47 @@ type SimpleJob struct {
MasterNode string `json:"master_node"`
}

// GetFinishedJobsWithWatcher collects the finished jobs that the accounting
// file watcher has delivered on newlyFinishedJobs and converts them into
// SimpleJob records.
//
// It keeps receiving until no new job arrives for 100ms and then returns the
// batch collected so far (possibly empty). If the watcher channel has been
// closed, the jobs gathered before the close are returned; if there are none,
// an error is returned because no further jobs can arrive on a closed channel.
func GetFinishedJobsWithWatcher() ([]*SimpleJob, error) {
	jobs := []*SimpleJob{}

	for {
		// get the next job, or stop after 0.1s if there is no new job
		select {
		case fjob, ok := <-newlyFinishedJobs:
			if !ok {
				// A closed channel yields zero values forever; without this
				// check the loop would never reach the timeout branch.
				if len(jobs) > 0 {
					return jobs, nil
				}
				return nil, fmt.Errorf("finished job watcher stopped: channel closed")
			}

			exitCode := fmt.Sprintf("%d", fjob.ExitStatus)
			state := "failed"
			if exitCode == "0" {
				state = "done"
			}

			jobs = append(jobs, &SimpleJob{
				// ignore job arrays for now
				JobId:      fmt.Sprintf("%d", fjob.JobNumber),
				Cluster:    fjob.QName,
				JobName:    fjob.JobName,
				Partition:  fjob.GrantedPE,
				Account:    fjob.Account,
				User:       fjob.Owner,
				State:      state,
				ExitCode:   exitCode,
				Submit:     parseTimestampInt64(fjob.SubmitTime),
				Start:      parseTimestampInt64(fjob.StartTime),
				End:        parseTimestampInt64(fjob.EndTime),
				MasterNode: fjob.HostName,
			})
		case <-time.After(100 * time.Millisecond):
			return jobs, nil
		}
	}
}

func GetFinishedJobs() ([]*SimpleJob, error) {
// Use qacct NativeSpecification to get finished jobs
qacctOutput, err := qacctClient.NativeSpecification([]string{"-j", "*"})
if err != nil {
// no jobs found or the command failed
return nil, fmt.Errorf("error running qacct command: %v", err)
}

Expand All @@ -137,9 +187,9 @@ func GetFinishedJobs() ([]*SimpleJob, error) {
User: job.Owner,
State: state,
ExitCode: fmt.Sprintf("%d", job.ExitStatus),
Submit: parseTimestamp(job.QSubTime),
Start: parseTimestamp(job.StartTime),
End: parseTimestamp(job.EndTime),
Submit: parseTimestampInt64(job.SubmitTime),
Start: parseTimestampInt64(job.StartTime),
End: parseTimestampInt64(job.EndTime),
MasterNode: job.HostName,
}
}
Expand All @@ -150,7 +200,8 @@ func GetRunningJobs() ([]*SimpleJob, error) {

qstatOverview, err := qstatClient.NativeSpecification([]string{"-g", "t"})
if err != nil {
return nil, fmt.Errorf("error running qstat command: %v", err)
// no jobs running
return nil, nil
}
jobsByTask, err := qstat.ParseGroupByTask(qstatOverview)
if err != nil {
Expand Down Expand Up @@ -193,7 +244,8 @@ func GetRunningJobs() ([]*SimpleJob, error) {
// get running jobs
qstatOutput, err := qstatClient.NativeSpecification([]string{"-j", "*"})
if err != nil {
return nil, fmt.Errorf("error running qstat command: %v", err)
// no jobs running; qstat -j * found 0 jobs (TODO)
return nil, nil
}

jobs, err := qstat.ParseSchedulerJobInfo(qstatOutput)
Expand Down Expand Up @@ -242,6 +294,14 @@ func SendJobs(ctx context.Context, jobs []*SimpleJob) (int, error) {
return len(jobs), nil
}

// parseTimestampInt64 converts ts, a Unix time expressed in microseconds
// since the epoch, into a protobuf Timestamp.
func parseTimestampInt64(ts int64) *timestamppb.Timestamp {
	// time.UnixMicro splits the value into whole seconds and the remaining
	// microseconds (scaled to nanoseconds), exactly like the manual arithmetic.
	return timestamppb.New(time.UnixMicro(ts))
}

// 2024-10-24 09:49:59.911136
func parseTimestamp(s string) *timestamppb.Timestamp {
loc, err := time.LoadLocation("Local")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/hpc-gridware/go-clusterscheduler
go 1.22.4

require (
github.com/goccy/go-json v0.10.3
github.com/onsi/ginkgo/v2 v2.19.1
github.com/onsi/gomega v1.34.1
go.opentelemetry.io/contrib/bridges/otelslog v0.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=
Expand Down
90 changes: 90 additions & 0 deletions pkg/qacct/v9.0/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package qacct

import (
"bufio"
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"time"

"github.com/goccy/go-json"
)

// GetDefaultQacctFile returns the path to the default accounting file,
// <SGE_ROOT>/<SGE_CELL>/common/accounting.jsonl, based on the SGE_ROOT and
// SGE_CELL environment variables.
//
// When SGE_CELL is unset or empty, the conventional cell name "default" is
// used so that the result still points into a cell directory. SGE_ROOT is
// used as is; if it is empty the returned path is relative.
func GetDefaultQacctFile() string {
	sgeCell := os.Getenv("SGE_CELL")
	if sgeCell == "" {
		sgeCell = "default"
	}
	return filepath.Join(os.Getenv("SGE_ROOT"), sgeCell, "common", "accounting.jsonl")
}

// WatchFile returns a channel that emits all JobDetail objects from the
// accounting file (one JSON document per line). It continues to emit
// JobDetail objects as new lines are appended to the file.
//
// If path is empty, GetDefaultQacctFile() is used. The channel is buffered
// with the given buffer size. An error is returned only when the file cannot
// be opened.
//
// A background goroutine reads the file and owns the channel: it closes the
// channel and the file when ctx is cancelled or when reading fails. Lines
// that cannot be parsed are logged and skipped; blank lines are skipped
// silently. A line is only parsed once its terminating newline has been
// written, so a record that is still being appended is never dropped or
// split.
//
// NOTE(review): truncation or rotation of the accounting file is not
// detected; the watcher keeps reading from its current position.
func WatchFile(ctx context.Context, path string, bufferSize int) (<-chan JobDetail, error) {
	if path == "" {
		path = GetDefaultQacctFile()
	}

	file, err := os.OpenFile(path, os.O_RDONLY, 0)
	if err != nil {
		return nil, fmt.Errorf("failed to open file: %w", err)
	}

	jobDetailsChan := make(chan JobDetail, bufferSize)

	go func() {
		defer file.Close()
		defer close(jobDetailsChan)

		// bufio.Reader (unlike bufio.Scanner) has no maximum line length and
		// can be read again after io.EOF once the file has grown.
		reader := bufio.NewReader(file)

		// pending holds the beginning of a line whose newline has not been
		// written yet.
		var pending []byte

		for {
			if ctx.Err() != nil {
				return
			}

			chunk, err := reader.ReadBytes('\n')
			if err == io.EOF {
				// Keep whatever was read of an incomplete line and wait a
				// little before checking for new data.
				pending = append(pending, chunk...)
				select {
				case <-ctx.Done():
					return
				case <-time.After(1 * time.Second):
				}
				continue
			}
			if err != nil {
				log.Printf("JSONL parsing error: %v", err)
				return
			}

			line := chunk
			if len(pending) > 0 {
				line = append(pending, chunk...)
				pending = nil
			}

			// strip the line terminator ("\n" or "\r\n")
			line = line[:len(line)-1]
			if n := len(line); n > 0 && line[n-1] == '\r' {
				line = line[:n-1]
			}
			if len(line) == 0 {
				continue
			}

			// TODO parsing can be done in parallel
			var job JobDetail
			if err := json.Unmarshal(line, &job); err != nil {
				log.Printf("failed to unmarshal line: %v", err)
				continue
			}

			// Do not block forever on a consumer that has gone away.
			select {
			case jobDetailsChan <- job:
			case <-ctx.Done():
				return
			}
		}
	}()

	return jobDetailsChan, nil
}
66 changes: 66 additions & 0 deletions pkg/qacct/v9.0/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package qacct_test

import (
"context"
"fmt"
"log"
"slices"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

qacct "github.com/hpc-gridware/go-clusterscheduler/pkg/qacct/v9.0"
qsub "github.com/hpc-gridware/go-clusterscheduler/pkg/qsub/v9.0"
)

// Specs for the accounting file watcher. The second spec submits real jobs
// through qsub and reads the default accounting file, so it needs a working
// cluster installation.
var _ = Describe("File", func() {

	Context("WatchFile", func() {

		It("returns an error when the file does not exist", func() {
			_, err := qacct.WatchFile(context.Background(),
				"nonexistentfile.txt", 10)
			Expect(err).To(HaveOccurred())
		})

		It("returns a channel that emits JobDetail objects for 10 jobs", func() {
			const numJobs = 10

			qs, err := qsub.NewCommandLineQSub(qsub.CommandLineQSubConfig{})
			Expect(err).NotTo(HaveOccurred())

			jobIDs := make([]int, numJobs)
			for i := 0; i < numJobs; i++ {
				jobID, _, err := qs.Submit(context.Background(), qsub.JobOptions{
					Command:     "echo",
					CommandArgs: []string{fmt.Sprintf("job %d", i+1)},
					Binary:      qsub.ToPtr(true),
				})
				Expect(err).NotTo(HaveOccurred())
				log.Printf("jobID: %d", jobID)
				jobIDs[i] = int(jobID)
			}

			// Cancel the watcher when the spec ends.
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

			// A buffered channel lets the watcher run ahead between polls, so
			// a long accounting history can be drained within the timeout.
			jobDetailsChan, err := qacct.WatchFile(ctx,
				qacct.GetDefaultQacctFile(), 64)
			Expect(err).NotTo(HaveOccurred())
			Expect(jobDetailsChan).NotTo(BeNil())

			receivedJobs := make(map[int]bool, numJobs)
			Eventually(func() bool {
				// Drain everything that is currently available instead of
				// consuming a single record per poll.
				for {
					select {
					case jd, ok := <-jobDetailsChan:
						if !ok {
							// watcher stopped; nothing more will arrive
							return len(receivedJobs) == numJobs
						}
						log.Printf("job: %+v", jd.JobNumber)
						// check if jobID is in the jobIDs list
						if slices.Contains(jobIDs, int(jd.JobNumber)) {
							Expect(jd.SubmitCommandLine).To(ContainSubstring("echo 'job"))
							Expect(jd.JobUsage.Usage.Memory).To(BeNumerically(">=", 0))
							receivedJobs[int(jd.JobNumber)] = true
						}
					default:
						return len(receivedJobs) == numJobs
					}
				}
			}, "10s").Should(BeTrue())
		})
	})
})
Loading
Loading