Skip to content

Commit

Permalink
Merge branch 'develop' into bugfix/nil_storage_close
Browse files Browse the repository at this point in the history
  • Loading branch information
xaionaro authored Jun 16, 2023
2 parents 6fbf26a + eca9e1e commit 6efdf11
Showing 1 changed file with 80 additions and 72 deletions.
152 changes: 80 additions & 72 deletions cmds/contest/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,47 +42,55 @@ import (
"github.com/linuxboot/contest/plugins/listeners/httplistener"
)

var (
flagSet *flag.FlagSet
flagDBURI *string
flagListenAddr *string
flagServerID *string
flagProcessTimeout *time.Duration
flagTargetLocker *string
flagInstanceTag *string
flagPauseTimeout *time.Duration
flagResumeJobs *bool
flagTargetLockDuration *time.Duration
type flags struct {
DBURI string
ListenAddr string
ServerID string
ProcessTimeout time.Duration
TargetLocker string
InstanceTag string
PauseTimeout time.Duration
ResumeJobs bool
TargetLockDuration time.Duration
// http logger parameters
flagAdminServerAddr *string
flagHttpLoggerBufferSize *int
flagHttpLoggerMaxBatchSize *int
flagHttpLoggerMaxBatchCount *int
flagHttpLoggerBatchSendFreq *time.Duration
flagHttpLoggerTimeout *time.Duration
logLevel = logger.LevelDebug
)
AdminServerAddr string
HttpLoggerBufferSize int
HttpLoggerMaxBatchSize int
HttpLoggerMaxBatchCount int
HttpLoggerBatchSendFreq time.Duration
HttpLoggerTimeout time.Duration
LogLevel logger.Level
}

func initFlags(cmd string) {
flagSet = flag.NewFlagSet(cmd, flag.ContinueOnError)
flagDBURI = flagSet.String("dbURI", config.DefaultDBURI, "Database URI")
flagListenAddr = flagSet.String("listenAddr", ":8080", "Listen address and port")
flagAdminServerAddr = flagSet.String("adminServerAddr", "", "Addr of the admin server to connect to")
flagHttpLoggerBufferSize = flagSet.Int("loggerBufferSize", loggerhook.DefaultBufferSize, "buffer size for the http logger hook")
flagHttpLoggerMaxBatchSize = flagSet.Int("loggerMaxBatchSize", loggerhook.DefaultMaxBatchSize, "max size (in bytes) of a logs batch to be sent if it reaches/exceeds it")
flagHttpLoggerMaxBatchCount = flagSet.Int("loggerMaxBatchCount", loggerhook.DefaultMaxBatchCount, "max count of logs in a batch")
flagHttpLoggerBatchSendFreq = flagSet.Duration("loggerBatchSendFreq", loggerhook.DefaultBatchSendFreq, "duration that defines the batch sending freq")
flagHttpLoggerTimeout = flagSet.Duration("loggerTimeout", loggerhook.DefaultLogTimeout, "logs send timeout")
flagServerID = flagSet.String("serverID", "", "Set a static server ID, e.g. the host name or another unique identifier. If unset, will use the listener's default")
flagProcessTimeout = flagSet.Duration("processTimeout", api.DefaultEventTimeout, "API request processing timeout")
flagTargetLocker = flagSet.String("targetLocker", "auto", "Target locker implementation to use, \"auto\" follows DBURI setting")
flagInstanceTag = flagSet.String("instanceTag", "", "A tag for this instance. Server will only operate on jobs with this tag and will add this tag to the jobs it creates.")
flagSet.Var(&logLevel, "logLevel", "A log level, possible values: debug, info, warning, error, panic, fatal")
flagPauseTimeout = flagSet.Duration("pauseTimeout", 0, "SIGINT/SIGTERM shutdown timeout (seconds), after which pause will be escalated to cancellaton; -1 - no escalation, 0 - do not pause, cancel immediately")
flagResumeJobs = flagSet.Bool("resumeJobs", false, "Attempt to resume paused jobs")
flagTargetLockDuration = flagSet.Duration("targetLockDuration", config.DefaultTargetLockDuration,
func parseFlags(cmd string, args ...string) (*flags, error) {
f := &flags{
LogLevel: logger.LevelDebug,
}
flagSet := flag.NewFlagSet(cmd, flag.ContinueOnError)
flagSet.StringVar(&f.DBURI, "dbURI", config.DefaultDBURI, "Database URI")
flagSet.StringVar(&f.ListenAddr, "listenAddr", ":8080", "Listen address and port")
flagSet.StringVar(&f.AdminServerAddr, "adminServerAddr", "", "Addr of the admin server to connect to")
flagSet.IntVar(&f.HttpLoggerBufferSize, "loggerBufferSize", loggerhook.DefaultBufferSize, "buffer size for the http logger hook")
flagSet.IntVar(&f.HttpLoggerMaxBatchSize, "loggerMaxBatchSize", loggerhook.DefaultMaxBatchSize, "max size (in bytes) of a logs batch to be sent if it reaches/exceeds it")
flagSet.IntVar(&f.HttpLoggerMaxBatchCount, "loggerMaxBatchCount", loggerhook.DefaultMaxBatchCount, "max count of logs in a batch")
flagSet.DurationVar(&f.HttpLoggerBatchSendFreq, "loggerBatchSendFreq", loggerhook.DefaultBatchSendFreq, "duration that defines the batch sending freq")
flagSet.DurationVar(&f.HttpLoggerTimeout, "loggerTimeout", loggerhook.DefaultLogTimeout, "logs send timeout")
flagSet.StringVar(&f.ServerID, "serverID", "", "Set a static server ID, e.g. the host name or another unique identifier. If unset, will use the listener's default")
flagSet.DurationVar(&f.ProcessTimeout, "processTimeout", api.DefaultEventTimeout, "API request processing timeout")
flagSet.StringVar(&f.TargetLocker, "targetLocker", "auto", "Target locker implementation to use, \"auto\" follows DBURI setting")
flagSet.StringVar(&f.InstanceTag, "instanceTag", "", "A tag for this instance. Server will only operate on jobs with this tag and will add this tag to the jobs it creates.")
flagSet.Var(&f.LogLevel, "logLevel", "A log level, possible values: debug, info, warning, error, panic, fatal")
flagSet.DurationVar(&f.PauseTimeout, "pauseTimeout", 0, "SIGINT/SIGTERM shutdown timeout (seconds), after which pause will be escalated to cancellaton; -1 - no escalation, 0 - do not pause, cancel immediately")
flagSet.BoolVar(&f.ResumeJobs, "resumeJobs", false, "Attempt to resume paused jobs")
flagSet.DurationVar(&f.TargetLockDuration, "targetLockDuration", config.DefaultTargetLockDuration,
"The amount of time target lock is extended by while the job is running. "+
"This is the maximum amount of time a job can stay paused safely.")

if err := flagSet.Parse(args); err != nil {
return nil, err
}

return f, nil
}

var userFunctions = []map[string]interface{}{
Expand Down Expand Up @@ -153,24 +161,24 @@ func registerPlugins(pluginRegistry *pluginregistry.PluginRegistry, pluginConfig

// Main is the main function that executes the ConTest server.
func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.Signal) error {
initFlags(cmd)
if err := flagSet.Parse(args); err != nil {
return err
flags, err := parseFlags(cmd, args...)
if err != nil {
return fmt.Errorf("unable to parse the flags: %w", err)
}

clk := clock.New()

ctx, cancel := context.WithCancel(context.Background())
ctx = logging.WithBelt(ctx, logLevel)
ctx = logging.WithBelt(ctx, flags.LogLevel)

if *flagAdminServerAddr != "" {
if flags.AdminServerAddr != "" {
httpHook, err := loggerhook.NewHttpHook(loggerhook.Config{
Addr: *flagAdminServerAddr,
BufferSize: *flagHttpLoggerBufferSize,
MaxBatchSize: *flagHttpLoggerMaxBatchSize,
MaxBatchCount: *flagHttpLoggerMaxBatchCount,
BatchSendFreq: *flagHttpLoggerBatchSendFreq,
LogTimeout: *flagHttpLoggerTimeout,
Addr: flags.AdminServerAddr,
BufferSize: flags.HttpLoggerBufferSize,
MaxBatchSize: flags.HttpLoggerMaxBatchSize,
MaxBatchCount: flags.HttpLoggerMaxBatchCount,
BatchSendFreq: flags.HttpLoggerBatchSendFreq,
LogTimeout: flags.HttpLoggerTimeout,
})
errmon.ObserveErrorCtx(ctx, err)
if httpHook != nil {
Expand Down Expand Up @@ -203,8 +211,8 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
}()

// primary storage initialization
if *flagDBURI != "" {
primaryDBURI := *flagDBURI
if flags.DBURI != "" {
primaryDBURI := flags.DBURI
log.Infof("Using database URI for primary storage: %s", primaryDBURI)
s, err := rdbms.New(primaryDBURI)
if err != nil {
Expand All @@ -224,7 +232,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.

// replica storage initialization
// pointing to main database for now but can be used to point to replica
replicaDBURI := *flagDBURI
replicaDBURI := flags.DBURI
log.Infof("Using database URI for replica storage: %s", replicaDBURI)
r, err := rdbms.New(replicaDBURI)
if err != nil {
Expand Down Expand Up @@ -261,54 +269,54 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
}

// set Locker engine
if *flagTargetLocker == "auto" {
if *flagDBURI != "" {
*flagTargetLocker = dblocker.Name
if flags.TargetLocker == "auto" {
if flags.DBURI != "" {
flags.TargetLocker = dblocker.Name
} else {
*flagTargetLocker = inmemory.Name
flags.TargetLocker = inmemory.Name
}
log.Infof("Locker engine set to auto, using %s", *flagTargetLocker)
log.Infof("Locker engine set to auto, using %s", flags.TargetLocker)
}
switch *flagTargetLocker {
switch flags.TargetLocker {
case inmemory.Name:
target.SetLocker(inmemory.New(clk))
case dblocker.Name:
if l, err := dblocker.New(*flagDBURI, dblocker.WithClock(clk)); err == nil {
if l, err := dblocker.New(flags.DBURI, dblocker.WithClock(clk)); err == nil {
target.SetLocker(l)
} else {
log.Fatalf("Failed to create locker %q: %v", *flagTargetLocker, err)
log.Fatalf("Failed to create locker %q: %v", flags.TargetLocker, err)
}
default:
log.Fatalf("Invalid target locker name %q", *flagTargetLocker)
log.Fatalf("Invalid target locker name %q", flags.TargetLocker)
}

// spawn JobManager
listener := httplistener.New(*flagListenAddr)
listener := httplistener.New(flags.ListenAddr)

opts := []jobmanager.Option{
jobmanager.APIOption(api.OptionEventTimeout(*flagProcessTimeout)),
jobmanager.APIOption(api.OptionEventTimeout(flags.ProcessTimeout)),
}
if *flagServerID != "" {
opts = append(opts, jobmanager.APIOption(api.OptionServerID(*flagServerID)))
if flags.ServerID != "" {
opts = append(opts, jobmanager.APIOption(api.OptionServerID(flags.ServerID)))
}
if *flagInstanceTag != "" {
opts = append(opts, jobmanager.OptionInstanceTag(*flagInstanceTag))
if flags.InstanceTag != "" {
opts = append(opts, jobmanager.OptionInstanceTag(flags.InstanceTag))
}
if *flagTargetLockDuration != 0 {
opts = append(opts, jobmanager.OptionTargetLockDuration(*flagTargetLockDuration))
if flags.TargetLockDuration != 0 {
opts = append(opts, jobmanager.OptionTargetLockDuration(flags.TargetLockDuration))
}

jm, err := jobmanager.New(listener, pluginRegistry, storageEngineVault, opts...)
if err != nil {
log.Fatalf("%v", err)
}

pauseTimeout := *flagPauseTimeout
pauseTimeout := flags.PauseTimeout

go func() {
intLevel := 0
// cancel immediately if pauseTimeout is zero
if *flagPauseTimeout == 0 {
if flags.PauseTimeout == 0 {
intLevel = 1
}
for {
Expand All @@ -328,7 +336,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
if intLevel == 0 {
log.Infof("Signal %q, pausing jobs", sig)
pause()
if *flagPauseTimeout > 0 {
if flags.PauseTimeout > 0 {
go func() {
select {
case <-ctx.Done():
Expand All @@ -347,7 +355,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
}
}()

err = jm.Run(ctx, *flagResumeJobs)
err = jm.Run(ctx, flags.ResumeJobs)

target.SetLocker(nil)

Expand Down

0 comments on commit 6efdf11

Please sign in to comment.