-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathreader.go
242 lines (208 loc) · 4.45 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package feedx
import (
"context"
"errors"
"io"
"time"
"github.com/bsm/bfs"
)
// ReaderOptions configure the reader instance.
//
// A nil *ReaderOptions may be passed to NewReader/MultiReader; unset fields
// are filled in per remote (see norm) from that remote's name.
type ReaderOptions struct {
	// Format specifies the format used to decode values from the feed.
	// Default: auto-detected from the remote's name (URL path).
	Format Format

	// Compression specifies the compression type of the feed.
	// Default: auto-detected from the remote's name (URL path).
	Compression Compression
}
// norm fills in any option left unset (nil) by detecting it from path, which
// callers pass as the remote object's name. Explicitly set options are kept.
func (opts *ReaderOptions) norm(path string) {
	if opts.Format == nil {
		opts.Format = DetectFormat(path)
	}

	if opts.Compression == nil {
		opts.Compression = DetectCompression(path)
	}
}
// Reader reads data from a remote feed.
//
// A Reader may span several remotes, which are consumed sequentially as if
// they were concatenated. Each remote is opened lazily when first needed.
type Reader struct {
	// ctx is used to open remotes and to query their modification times.
	ctx context.Context
	// opt holds the user-supplied options (may be nil); it is copied and
	// normalized separately for each remote.
	opt *ReaderOptions
	// remotes are the objects to read, in order.
	remotes []*bfs.Object
	// ownRemotes makes Close also close every remote. NOTE(review): not set
	// by the constructors visible here; presumably set elsewhere in the package.
	ownRemotes bool
	// cur is the stream for remotes[pos]; nil until created by ensureCurrent
	// and reset to nil by nextRemote.
	cur *streamReader
	// pos is the index of the remote currently being read.
	pos int
	// num counts values successfully returned by Decode.
	num int64
}
// NewReader creates a Reader for a single remote. It is equivalent to calling
// MultiReader with a one-element slice. The returned error is always nil; it
// is kept in the signature for API compatibility.
func NewReader(ctx context.Context, remote *bfs.Object, opt *ReaderOptions) (*Reader, error) {
	single := []*bfs.Object{remote}
	reader := MultiReader(ctx, single, opt)
	return reader, nil
}
// MultiReader creates a Reader over several remotes, which are read one after
// another as if concatenated. Nothing is opened until the first Read or
// Decode. Once all remotes are fully read, Read returns io.EOF.
func MultiReader(ctx context.Context, remotes []*bfs.Object, opt *ReaderOptions) *Reader {
	r := new(Reader)
	r.ctx = ctx
	r.opt = opt
	r.remotes = remotes
	return r
}
// Read reads raw bytes (decompressed but not decoded) from the feed, treating
// all remotes as a single concatenated stream.
//
// When a remote is exhausted it is closed and reading moves on to the next
// one. If that happens after n > 0 bytes were read, Read returns n, nil so the
// caller sees the data first. If nothing was read, Read continues directly
// with the next remote instead of returning 0, nil, which io.Reader
// discourages.
//
// At end of feed, Read returns io.EOF (always the io.EOF sentinel itself, so
// callers comparing with == work even if a lower layer wrapped it).
func (r *Reader) Read(p []byte) (int, error) {
	for {
		if !r.ensureCurrent() {
			return 0, io.EOF
		}

		n, err := r.cur.Read(p)
		if !errors.Is(err, io.EOF) {
			return n, err
		}

		more, cerr := r.nextRemote()
		if cerr != nil {
			return n, cerr
		}
		if !more {
			return n, io.EOF
		}
		if n > 0 {
			return n, nil // don't return EOF until all remotes are read
		}
		// n == 0: the remote ended with no data; try the next one.
	}
}
// Decode decodes the next formatted value from the feed into v.
//
// Remotes are consumed in order. When one is exhausted it is closed and
// decoding continues with the next remote. Each successful call increments
// the count reported by NumRead.
//
// At end of feed, Decode returns io.EOF (always the io.EOF sentinel itself).
func (r *Reader) Decode(v interface{}) error {
	// Iterate rather than recurse so that runs of empty remotes do not grow
	// the stack.
	for {
		if !r.ensureCurrent() {
			return io.EOF
		}

		err := r.cur.Decode(v)
		if err == nil {
			r.num++
			return nil
		}
		if !errors.Is(err, io.EOF) {
			return err
		}

		more, cerr := r.nextRemote()
		if cerr != nil {
			return cerr
		}
		if !more {
			return io.EOF
		}
		// Current remote exhausted; start decoding from the next one.
	}
}
// NumRead reports how many values Decode has successfully decoded so far,
// across all remotes. Bytes consumed through Read are not counted.
func (r *Reader) NumRead() int64 { return r.num }
// LastModified returns the most recent modification time across all remotes
// of the feed. The first error encountered while querying a remote is
// returned immediately, together with the zero time.
// NOTE(review): with no remotes this returns the zero timestamp's Time();
// confirm what that maps to.
func (r *Reader) LastModified() (time.Time, error) {
	var newest timestamp
	for i := range r.remotes {
		ts, err := remoteLastModified(r.ctx, r.remotes[i])
		if err != nil {
			return time.Time{}, err
		}
		if newest < ts {
			newest = ts
		}
	}
	return newest.Time(), nil
}
// Close closes the reader: it closes the stream of the remote currently being
// read (if any) and, when the reader owns its remotes, every remote object.
// If several closes fail, the error from the last failing one is returned.
func (r *Reader) Close() error {
	var result error
	if r.cur != nil {
		result = r.cur.Close()
	}

	if !r.ownRemotes {
		return result
	}

	for _, obj := range r.remotes {
		if cerr := obj.Close(); cerr != nil {
			result = cerr
		}
	}
	return result
}
// ensureCurrent makes sure r.cur is a stream for remotes[pos], creating it
// lazily with a per-remote normalized copy of the options. It reports false
// once every remote has been consumed.
func (r *Reader) ensureCurrent() bool {
	if r.pos >= len(r.remotes) {
		return false
	}
	if r.cur != nil {
		return true
	}

	remote := r.remotes[r.pos]

	// Copy the shared options so that per-remote detection in norm does not
	// leak into the caller's struct or into other remotes.
	opts := ReaderOptions{}
	if r.opt != nil {
		opts = *r.opt
	}
	opts.norm(remote.Name())

	r.cur = &streamReader{ctx: r.ctx, remote: remote, opt: opts}
	return true
}
// nextRemote closes the current stream and advances the cursor to the next
// remote. It reports whether another remote remains to be read, or the error
// from closing the current stream.
//
// The cursor is advanced even when Close fails. Close has already been called
// on the stream at that point, and leaving it in r.cur would let later reads
// use it and make Reader.Close close it a second time.
func (r *Reader) nextRemote() (bool, error) {
	err := r.cur.Close()

	// unset current, increment cursor
	r.cur = nil
	r.pos++

	if err != nil {
		return false, err
	}
	return r.pos < len(r.remotes), nil
}
// streamReader reads a single remote object. The remote is opened, wrapped in
// a decompressor, and (for Decode) wrapped in a format decoder, each lazily on
// first use.
type streamReader struct {
	remote *bfs.Object
	opt    ReaderOptions // normalized options for this remote
	ctx    context.Context

	br io.ReadCloser // bfs reader, from remote.Open
	cr io.ReadCloser // compression reader wrapping br
	fd FormatDecoder // decoder over cr; created on the first Decode call
}
// Read reads decompressed bytes from this remote, opening it on first use.
func (r *streamReader) Read(p []byte) (int, error) {
	if openErr := r.ensureOpen(); openErr != nil {
		return 0, openErr
	}
	n, err := r.cr.Read(p)
	return n, err
}
// Decode decodes the next value from this remote into v. On first use it
// opens the remote and creates a decoder for the configured format on top of
// the decompressed stream.
func (r *streamReader) Decode(v interface{}) error {
	if err := r.ensureOpen(); err != nil {
		return err
	}

	if r.fd != nil {
		return r.fd.Decode(v)
	}

	dec, err := r.opt.Format.NewDecoder(r.cr)
	if err != nil {
		return err
	}
	r.fd = dec
	return r.fd.Decode(v)
}
// Close closes the decoder, the decompressor and the underlying bfs reader,
// innermost layer first, skipping any that were never opened. Every layer is
// closed even if an earlier one fails; the last error seen is returned.
func (r *streamReader) Close() error {
	var lastErr error
	record := func(e error) {
		if e != nil {
			lastErr = e
		}
	}

	if r.fd != nil {
		record(r.fd.Close())
	}
	if r.cr != nil {
		record(r.cr.Close())
	}
	if r.br != nil {
		record(r.br.Close())
	}
	return lastErr
}
// ensureOpen lazily opens the remote (r.br) and wraps it in the configured
// decompressor (r.cr). Layers that are already open are left untouched.
func (r *streamReader) ensureOpen() error {
	if r.br == nil {
		raw, err := r.remote.Open(r.ctx)
		if err != nil {
			return err
		}
		r.br = raw
	}

	if r.cr != nil {
		return nil
	}

	decompressed, err := r.opt.Compression.NewReader(r.br)
	if err != nil {
		return err
	}
	r.cr = decompressed
	return nil
}