-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnfsmon.go
158 lines (141 loc) · 4.08 KB
/
nfsmon.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Package nfsmon provides simple monitoring for nfs mounts. When a stale mount
// is detected, the remountFunc is called.
package nfsmon
import (
"context"
"sync"
"syscall"
"time"
)
// Mount describes one mount to watch.
type Mount struct {
	// Server is the server the mount is mounted from.
	Server string
	// ServerPath is the location of the mount on the server.
	ServerPath string
	// DestPath is the path to mount/monitor. WatchMount and UnwatchMount
	// identify a mount by this field alone.
	DestPath string
	// MountOpts holds the mount options to use when mounting.
	MountOpts string
	// ExtraData carries extra data relevant to the remount function.
	ExtraData interface{}
	// todo: maybe instead of ExtraData, define a remountFunc per mount.
}
var (
	// remountFunc is what Watch calls for a mount whose check error satisfies
	// the error condition. Access is guarded by remountFuncTex.
	remountFunc    func(m Mount) error
	remountFuncTex = new(sync.RWMutex)

	// errCondition decides, given the error from checking a mount, whether
	// Watch should call remountFunc. While it is nil the default
	// (errConditionFunc, stale nfs mount) is used. Guarded by errConditionTex.
	errCondition    func(error) bool
	errConditionTex = new(sync.RWMutex)

	// mounts is the list of mounts to watch. Guarded by mTex.
	mounts = make([]Mount, 0)
	mTex   = new(sync.RWMutex)
)
// WatchMount registers m for watching. Mounts are keyed by DestPath: if a
// mount with the same DestPath is already registered it is replaced by m,
// otherwise m is appended to the list.
func WatchMount(m Mount) {
	mTex.Lock()
	defer mTex.Unlock()

	// Look for an existing entry with the same destination path.
	found := -1
	for idx := 0; idx < len(mounts); idx++ {
		if mounts[idx].DestPath == m.DestPath {
			found = idx
			break
		}
	}

	if found < 0 {
		mounts = append(mounts, m)
		return
	}
	mounts[found] = m
}
// UnwatchMount removes the mount whose DestPath equals m.DestPath from the
// list of mounts to watch. Only DestPath is compared; the other fields of m
// are ignored. It does nothing when no such mount is registered.
func UnwatchMount(m Mount) {
	mTex.Lock()
	defer mTex.Unlock()

	for i := range mounts {
		if mounts[i].DestPath != m.DestPath {
			continue
		}
		// Build a fresh slice instead of shifting elements in place. Any
		// slice header previously copied from mounts (for example a snapshot
		// taken under the read lock) keeps seeing consistent contents, and
		// the removed entry is not left behind in the old backing array's
		// tail where it would keep its strings and ExtraData alive.
		remaining := make([]Mount, 0, len(mounts)-1)
		remaining = append(remaining, mounts[:i]...)
		remaining = append(remaining, mounts[i+1:]...)
		mounts = remaining
		return
	}
}
// SetErrConditionFunc installs f as the error condition, holding the write
// lock while doing so. Watch passes the error from checking a mount to the
// error condition and calls the remount function when it returns true.
// Passing nil restores the default, which returns true if a stale nfs mount
// is detected.
func SetErrConditionFunc(f func(error) bool) {
	errConditionTex.Lock()
	errCondition = f
	errConditionTex.Unlock()
}
// getErrConditionFunc returns the error condition set via
// SetErrConditionFunc, or the default errConditionFunc when none is set.
func getErrConditionFunc() func(error) bool {
	errConditionTex.RLock()
	configured := errCondition
	errConditionTex.RUnlock()

	if configured != nil {
		return configured
	}
	return errConditionFunc
}
// SetRemountFunc installs f as the remount function, holding the write lock
// while doing so. While the remount function is nil, Watch skips mounts that
// would otherwise be remounted.
func SetRemountFunc(f func(Mount) error) {
	remountFuncTex.Lock()
	remountFunc = f
	remountFuncTex.Unlock()
}
// getRemountFunc returns the remount function set via SetRemountFunc, read
// under the read lock. The result is nil when none has been set.
func getRemountFunc() func(Mount) error {
	remountFuncTex.RLock()
	fn := remountFunc
	remountFuncTex.RUnlock()
	return fn
}
// errConditionFunc is the default error condition. It reports whether err is
// exactly syscall.ESTALE, i.e. a stale nfs mount; every other error, and nil,
// yields false.
func errConditionFunc(err error) bool {
	switch err {
	case syscall.ESTALE:
		return true
	default:
		return false
	}
}
// WatchCfg holds the settings of the Watch function. Watch starts from its
// defaults (NumRetries 3, WatchFreq 30s) and lets each functional option
// modify the struct.
type WatchCfg struct {
	NumRetries int           // NumRetries is how many times to try the remount; at least one attempt is always made.
	WatchFreq  time.Duration // WatchFreq is how frequently the mount destination paths are checked.
	// todo: add RemountBackoff - time to sleep before retrying the mount
}
// Watch watches the configured mounts and calls the remount function if the
// error condition is true. It blocks until ctx is cancelled.
//
// Every WatchFreq it runs statfs on the DestPath of each watched mount. When
// that fails with an error accepted by the error condition (see
// SetErrConditionFunc) the remount function (see SetRemountFunc) is called,
// up to NumRetries times with a one second pause between attempts.
//
// Configured via functional options. For default config (NumRetries 3,
// WatchFreq 30s), run with Watch(ctx). A non-positive WatchFreq disables
// polling: Watch then only waits for ctx to be cancelled.
func Watch(ctx context.Context, opts ...func(*WatchCfg)) {
	cfg := WatchCfg{
		NumRetries: 3,
		WatchFreq:  30 * time.Second,
	}
	// set config options (functional parameters)
	for _, opt := range opts {
		opt(&cfg)
	}

	if cfg.WatchFreq <= 0 {
		// A timer with a non-positive duration fires immediately, which would
		// turn the loop below into a busy loop. Never poll in that case.
		<-ctx.Done()
		return
	}

	// One reusable timer instead of a new ticker per iteration. It is re-armed
	// only after a pass has finished, so the delay between two passes is
	// always a full WatchFreq no matter how long the remounts took.
	timer := time.NewTimer(cfg.WatchFreq)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			checkMounts(ctx, cfg.NumRetries)
			timer.Reset(cfg.WatchFreq)
		}
	}
}

// checkMounts runs one pass over the watched mounts and triggers a remount
// for every mount whose statfs error satisfies the error condition.
func checkMounts(ctx context.Context, numRetries int) {
	// Copy the elements while holding the lock. Copying only the slice header
	// would share the backing array with WatchMount/UnwatchMount, which may
	// modify it while this pass is still iterating.
	mTex.RLock()
	snapshot := append([]Mount(nil), mounts...)
	mTex.RUnlock()

	for _, m := range snapshot {
		if ctx.Err() != nil {
			return
		}
		var buf syscall.Statfs_t
		err := syscall.Statfs(m.DestPath, &buf)
		if err == nil || !getErrConditionFunc()(err) {
			continue
		}
		remountWithRetry(ctx, m, numRetries)
	}
}

// remountWithRetry calls the remount function for m until it succeeds, the
// attempt budget is used up, no remount function is configured, or ctx is
// cancelled. At least one attempt is made even when numRetries is below 1.
func remountWithRetry(ctx context.Context, m Mount, numRetries int) {
	for attempt := 1; ; attempt++ {
		// Fetch the function once per attempt and call that same value, so a
		// concurrent SetRemountFunc(nil) cannot slip in between the nil check
		// and the call.
		remount := getRemountFunc()
		if remount == nil {
			return
		}
		// consumers can add timeout in remount func
		if err := remount(m); err == nil {
			return
		}
		if attempt >= numRetries {
			return
		}
		// Pause before the next attempt, but give up right away on cancel.
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second):
		}
	}
}