-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmaster.go
101 lines (90 loc) · 1.63 KB
/
master.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package pagent
import (
"bufio"
"bytes"
"errors"
"io"
"sync"
"time"
)
type Master struct {
pool map[string]*Worker
mutex sync.Mutex
}
func NewMaster() *Master {
return &Master{
pool: make(map[string]*Worker),
}
}
func (m *Master) GetWorker(id string) *Worker {
m.mutex.Lock()
defer m.mutex.Unlock()
if m.pool == nil {
m.pool = make(map[string]*Worker)
}
if _, ok := m.pool[id]; !ok {
m.pool[id] = NewWorker(id)
m.pool[id].RegMaster(m)
}
return m.pool[id]
}
func (m *Master) RunWorker(id string) error {
m.mutex.Lock()
if m.pool == nil {
m.mutex.Unlock()
return errors.New("m.pool == nil")
}
worker, ok := m.pool[id]
m.mutex.Unlock()
if !ok {
return errors.New("can't find worker id=" + id)
}
worker.Running = true
go func() {
reader := bufio.NewReader(worker.Output())
var buf bytes.Buffer
for {
line, isPrefix, err := reader.ReadLine()
if len(line) > 0 {
buf.Write(line)
//整行
if !isPrefix {
if worker.RunningCall != nil {
worker.RunningCall(id, buf.String())
}
buf.Reset()
}
}
if err == io.EOF {
break
} else if err != nil {
break
}
time.Sleep(time.Millisecond * 20)
}
err := worker.Wait()
if worker.FinishCall != nil {
worker.FinishCall(id, err)
}
worker.Running = false
}()
return nil
}
func (m *Master) DelWorker(id string) error {
m.mutex.Lock()
if worker, ok := m.pool[id]; ok {
m.mutex.Unlock()
worker.Stop()
for {
if !worker.Running {
delete(m.pool, id)
break
}
time.Sleep(20 * time.Millisecond)
}
} else {
m.mutex.Unlock()
return errors.New("can't find worker id=" + id)
}
return nil
}