-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathrunner.go
145 lines (124 loc) · 3.31 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package obsync
import (
"context"
"fmt"
"github.com/panjf2000/ants/v2"
log "github.com/sirupsen/logrus"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
)
// RunnerConfig describes one synchronisation job: which local directory to
// upload, which files to leave out, how many workers to use and which buckets
// receive the files.
type RunnerConfig struct {
// LocalPath is the local directory to synchronise. NewRunner rejects the
// configuration when this path is not a directory.
LocalPath string `yaml:"path" json:"path"`
// Description is free-form text; it is not read by the code in this file.
Description string `yaml:"description" json:"description"`
// Overrides is forwarded unchanged to NewTask for every upload.
// NOTE(review): presumably controls overwriting existing objects - confirm
// against NewTask.
Overrides bool `yaml:"overrides" json:"overrides"`
// Exclude lists path.Match patterns. SyncDir tests each pattern against the
// base name of a file and skips the file on the first match.
Exclude []string `yaml:"exclude" json:"exclude"`
// Timeout is not referenced by the code in this file.
// NOTE(review): presumably a per-job or per-upload limit - confirm at the
// call site.
Timeout time.Duration `yaml:"timeout" json:"timeout"`
// Threads is the capacity of the worker pool created by NewRunner. It must
// be greater than zero.
Threads uint `yaml:"threads" json:"threads"`
// BucketConfigs lists the upload targets. InvokeAll starts one task per
// entry for every file, and NewRunner checks that each entry's Type is
// known to GetBucketSyncFunc.
BucketConfigs []BucketConfig `yaml:"buckets" json:"buckets"`
}
// Runner synchronises local files to the buckets listed in its configuration.
// Create one with NewRunner and release it with Stop.
type Runner struct {
// config points to the Runner's own copy of the configuration given to
// NewRunner.
config *RunnerConfig
// pool is the ants worker pool on which the per-bucket uploads are run.
pool *ants.Pool
}
// SyncDir walks dir recursively and hands every file that is neither filtered
// by prefixPath nor matched by one of the configured exclude patterns to
// InvokeAll. The key of a file is its path relative to dir.
//
// Problems with a single entry (unreadable file or sub-directory, failed
// upload, malformed exclude pattern) are logged and the walk continues, so one
// bad entry does not abort the whole synchronisation. A non-nil error is
// returned only when dir cannot be made absolute or cannot be walked at all.
func (r *Runner) SyncDir(ctx context.Context, dir string) (err error) {
	dir, err = filepath.Abs(dir)
	if err != nil {
		log.Error(err)
		return err
	}

	return filepath.Walk(dir, func(localPath string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			// info may be nil when Walk reports an error, so it must not be
			// touched on this path.
			if localPath == dir {
				// The root itself is unusable: nothing can be synchronised.
				return fmt.Errorf("walking %s: %w", dir, walkErr)
			}
			log.Error(walkErr)
			return nil
		}

		// prefixPath is defined elsewhere in the package; according to the
		// original comment it filters directories and dot-prefixed files.
		if prefixPath(localPath) {
			return nil
		}
		if info.IsDir() {
			return nil
		}

		// Exclude files by the configured patterns.
		base := filepath.Base(localPath)
		for _, exclude := range r.config.Exclude {
			found, matchErr := path.Match(exclude, base)
			if matchErr != nil {
				// A malformed pattern can never match; report it instead of
				// silently uploading files the user meant to exclude.
				log.Warnf("invalid exclude pattern %q: %v", exclude, matchErr)
				continue
			}
			if found {
				log.Warnf("found exclude %s in %s", exclude, localPath)
				return nil
			}
		}

		// filepath.Rel is used instead of cutting the prefix by hand: the
		// manual slice panicked when dir was a single file and produced a
		// truncated key when dir was the filesystem root.
		key, relErr := filepath.Rel(dir, localPath)
		if relErr != nil {
			log.Error(relErr)
			return nil
		}
		if key == "." {
			// dir itself is a file: use its own name as the key.
			key = base
		}

		log.Infof("local path is %s, and key is %s", localPath, key)
		if invokeErr := r.InvokeAll(ctx, key, localPath); invokeErr != nil {
			log.Error(invokeErr)
		}
		return nil
	})
}
// InvokeAll submits one upload task per configured bucket to the worker pool
// and blocks until every submitted task has finished.
//
// Every failure is logged. The returned error is the first failure observed,
// whether it came from submitting a task to the pool or from running it; nil
// means every bucket succeeded.
func (r *Runner) InvokeAll(ctx context.Context, key, local string) (err error) {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards firstErr, which is written from pool workers
		firstErr error
	)
	record := func(e error) {
		log.Error(e)
		mu.Lock()
		if firstErr == nil {
			firstErr = e
		}
		mu.Unlock()
	}

	for _, c := range r.config.BucketConfigs {
		c := c // per-iteration copy: before Go 1.22 the closure would share one variable
		wg.Add(1)
		submitErr := r.pool.Submit(func() {
			// Deferred so that Wait is released even if the task panics.
			defer wg.Done()
			if invokeErr := r.Invoke(ctx, key, local, c); invokeErr != nil {
				record(invokeErr)
			}
		})
		if submitErr != nil {
			// The task will never run, so balance the Add above; otherwise
			// Wait would block forever.
			wg.Done()
			record(submitErr)
		}
	}

	wg.Wait()
	return firstErr
}
// Invoke uploads the file at local to the single bucket described by c.
// When c.SubDir is set, the key is prefixed with it, joined by the OS path
// separator. A task is then built with NewTask and executed with Put; an error
// from NewTask is logged and returned, otherwise the result of Put is
// returned.
func (r *Runner) Invoke(ctx context.Context, key, local string, c BucketConfig) (err error) {
	if sub := c.SubDir; sub != "" {
		key = fmt.Sprintf("%s%c%s", sub, os.PathSeparator, key)
	}
	log.Debugf("new task key is %s, and local path is %s", key, local)

	overrides := r.config.Overrides
	task, err := NewTask(key, local, overrides, c)
	if err == nil {
		return task.Put(ctx)
	}
	log.Error(err)
	return err
}
// Stop releases the worker pool unless it has already been closed.
// It always returns nil.
func (r *Runner) Stop() error {
	if r.pool.IsClosed() {
		// Already released; nothing left to do.
		return nil
	}
	r.pool.Release()
	return nil
}
// NewRunner creates a Runner for the given configuration.
//
// The configuration is validated before the worker pool is created:
//  1. the local path must exist and be a readable directory,
//  2. the number of threads must be greater than zero,
//  3. the type of every bucket must be registered.
//
// The Runner keeps its own copy of config; the caller should call Stop when
// the Runner is no longer needed.
func NewRunner(config RunnerConfig) (*Runner, error) {
	stat, err := os.Stat(config.LocalPath)
	if err != nil {
		// Keep the underlying cause (not found, permission denied, ...)
		// instead of reporting every failure as "not a directory".
		return nil, fmt.Errorf("stat %s: %w", config.LocalPath, err)
	}
	if !stat.IsDir() {
		return nil, fmt.Errorf("%s is not a directory", config.LocalPath)
	}

	// Threads is unsigned, so the check is done after converting to int: this
	// rejects zero and also values so large that the conversion wraps to a
	// negative pool size.
	size := int(config.Threads)
	if size <= 0 {
		return nil, fmt.Errorf("the number of threads must be greater than zero")
	}

	// check if the bucket type is supported
	for _, bucketConfig := range config.BucketConfigs {
		if _, err := GetBucketSyncFunc(bucketConfig.Type); err != nil {
			return nil, err
		}
	}

	pool, err := ants.NewPool(size)
	if err != nil {
		return nil, err
	}
	return &Runner{
		pool:   pool,
		config: &config,
	}, nil
}