Skip to content

Commit

Permalink
perf: add task context map
Browse files Browse the repository at this point in the history
  • Loading branch information
LeeEirc committed Jan 15, 2025
1 parent 136114a commit 7f7cd1a
Showing 1 changed file with 40 additions and 8 deletions.
48 changes: 40 additions & 8 deletions cmd/common/beat_service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
"encoding/json"
"sync"
"time"
Expand All @@ -17,7 +18,7 @@ func NewBeatService(apiClient *service.JMService) *BeatService {
return &BeatService{
sessMap: make(map[string]*SessionToken),
apiClient: apiClient,
taskChan: make(chan *model.TerminalTask, 5),
taskChans: make(map[string]*TaskChanContext),
}
}

Expand All @@ -27,12 +28,18 @@ type SessionToken struct {
invalid bool
}

type TaskChanContext struct {
Id string
Ctx context.Context
TaskCh chan *model.TerminalTask
}

type BeatService struct {
sessMap map[string]*SessionToken

apiClient *service.JMService

taskChan chan *model.TerminalTask
taskChans map[string]*TaskChanContext

sync.Mutex
}
Expand Down Expand Up @@ -147,8 +154,16 @@ func (b *BeatService) RemoveSessionId(sid string) {
delete(b.sessMap, sid)
}

func (b *BeatService) GetTerminalTaskChan() <-chan *model.TerminalTask {
return b.taskChan
func (b *BeatService) GetTerminalTaskChan(ctx context.Context) <-chan *model.TerminalTask {
b.Lock()
defer b.Unlock()
taskCtx := &TaskChanContext{
Id: common.UUID(),
Ctx: ctx,
TaskCh: make(chan *model.TerminalTask, 5),
}
b.taskChans[taskCtx.Id] = taskCtx
return taskCtx.TaskCh
}

func (b *BeatService) FinishTask(taskId string) error {
Expand Down Expand Up @@ -178,10 +193,27 @@ func (b *BeatService) KeepCheckTokens() {
}

func (b *BeatService) sendTask(task *model.TerminalTask) {
select {
case b.taskChan <- task:
default:
logger.Errorf("Discard task %v", task)
b.Lock()
defer b.Unlock()
taskChans := make([]string, 0, len(b.taskChans))
for i := range b.taskChans {
taskChanCtx := b.taskChans[i]
select {
case <-taskChanCtx.Ctx.Done():
close(taskChanCtx.TaskCh)
taskChans = append(taskChans, taskChanCtx.Id)
continue
default:

}
select {
case taskChanCtx.TaskCh <- task:
default:
logger.Errorf("Task channel is full, discard task %+v", task)
}
}
for _, id := range taskChans {
delete(b.taskChans, id)
}
}

Expand Down

0 comments on commit 7f7cd1a

Please sign in to comment.