-
Notifications
You must be signed in to change notification settings - Fork 638
/
Copy pathmerge.go
355 lines (319 loc) · 9.99 KB
/
merge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
package rosedb
import (
"encoding/binary"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sync/atomic"
"time"
"github.com/rosedblabs/rosedb/v2/index"
"github.com/rosedblabs/wal"
"github.com/valyala/bytebufferpool"
)
const (
// mergeDirSuffixName is appended to the base name of the database directory
// to build the name of the directory that holds the output of a merge.
mergeDirSuffixName = "-merge"
// mergeFinishedBatchID is the batch id stamped on every record rewritten
// by a merge.
mergeFinishedBatchID = 0
)
// Merge compacts the database: it walks every data file, keeps only the
// records that are still valid, and rewrites them into new data files.
//
// Merging may be a very time-consuming operation when the database is large,
// so it is recommended to run it while the database is idle.
//
// When reopenAfterDone is true, the original files are replaced by the merged
// files and the in-memory index is rebuilt once the merge has completed.
// When it is false, the merged files are left in the merge directory.
func (db *DB) Merge(reopenAfterDone bool) error {
	if err := db.doMerge(); err != nil {
		return err
	}
	if !reopenAfterDone {
		return nil
	}

	db.mu.Lock()
	defer db.mu.Unlock()

	// Release the handles on the current files; the result is ignored.
	_ = db.closeFiles()

	// Swap the merged files into the data directory.
	var err error
	if err = loadMergeFiles(db.options.DirPath); err != nil {
		return err
	}

	// Reopen the data files.
	if db.dataFiles, err = db.openWalFiles(); err != nil {
		return err
	}

	// Start from an empty index, then rebuild it from the files on disk.
	db.index = index.NewIndexer()
	return db.loadIndex()
}
// doMerge rewrites every live record of the older segment files into a fresh
// merge directory and, on success, writes a merge-finished marker there that
// records the id of the last segment that was merged.
//
// It returns ErrDBClosed when the database is closed, ErrMergeRunning when
// another merge is already in progress, and nil right away when there are no
// data files. The merged files are not moved into the data directory here.
func (db *DB) doMerge() error {
	db.mu.Lock()
	// check if the database is closed
	if db.closed {
		db.mu.Unlock()
		return ErrDBClosed
	}
	// check if the data files is empty
	if db.dataFiles.IsEmpty() {
		db.mu.Unlock()
		return nil
	}
	// check if the merge operation is running
	if atomic.LoadUint32(&db.mergeRunning) == 1 {
		db.mu.Unlock()
		return ErrMergeRunning
	}
	// set the mergeRunning flag to true
	atomic.StoreUint32(&db.mergeRunning, 1)
	// set the mergeRunning flag to false when the merge operation is completed
	defer atomic.StoreUint32(&db.mergeRunning, 0)

	prevActiveSegId := db.dataFiles.ActiveSegmentID()
	// rotate the write-ahead log, create a new active segment file.
	// so all the older segment files will be merged.
	if err := db.dataFiles.OpenNewActiveSegment(); err != nil {
		db.mu.Unlock()
		return err
	}

	// we can unlock the mutex here, because the write-ahead log files has been rotated,
	// and the new active segment file will be used for the subsequent writes.
	// Our Merge operation will only read from the older segment files.
	db.mu.Unlock()

	// open a merge db to write the data to the new data file.
	// delete the merge directory if it exists and create a new one.
	mergeDB, err := db.openMergeDB()
	if err != nil {
		return err
	}
	defer func() {
		_ = mergeDB.Close()
	}()

	buf := bytebufferpool.Get()
	now := time.Now().UnixNano()
	defer bytebufferpool.Put(buf)

	// iterate all the data files, and write the valid data to the new data file.
	reader := db.dataFiles.NewReaderWithMax(prevActiveSegId)
	for {
		buf.Reset()
		chunk, position, err := reader.Next()
		if err != nil {
			if err == io.EOF {
				break
			}
			return err
		}
		record := decodeLogRecord(chunk)
		// Only handle the normal log record, LogRecordDeleted and LogRecordBatchFinished
		// will be ignored, because they are not valid data.
		// Records whose expiry time has already passed are dropped as well.
		if record.Type != LogRecordNormal || (record.Expire != 0 && record.Expire <= now) {
			continue
		}

		db.mu.RLock()
		indexPos := db.index.Get(record.Key)
		db.mu.RUnlock()
		// Keep the record only if the index still points at this exact chunk;
		// otherwise the key was deleted or overwritten by a newer record.
		if indexPos == nil || !positionEquals(indexPos, position) {
			continue
		}

		// clear the batch id of the record,
		// all data after merge will be valid data, so the batch id should be 0.
		record.BatchId = mergeFinishedBatchID
		// Since the mergeDB will never be used for any read or write operations,
		// it is not necessary to update the index.
		newPosition, err := mergeDB.dataFiles.Write(encodeLogRecord(record, mergeDB.encodeHeader, buf))
		if err != nil {
			return err
		}
		// And now we should write the new position to the write-ahead log,
		// which is so-called HINT FILE in bitcask paper.
		// The HINT FILE will be used to rebuild the index quickly when the database is restarted.
		if _, err = mergeDB.hintFile.Write(encodeHintRecord(record.Key, newPosition)); err != nil {
			return err
		}
	}

	// The merge db is opened with syncing disabled, so flush the rewritten data
	// and the hint file explicitly before the marker below declares them complete.
	if err := mergeDB.dataFiles.Sync(); err != nil {
		return err
	}
	if err := mergeDB.hintFile.Sync(); err != nil {
		return err
	}

	// After rewrite all the data, we should add a file to indicate that the merge operation is completed.
	// So when we restart the database, we can know that the merge is completed if the file exists,
	// otherwise, we will delete the merge directory and redo the merge operation again.
	mergeFinFile, err := mergeDB.openMergeFinishedFile()
	if err != nil {
		return err
	}
	if _, err = mergeFinFile.Write(encodeMergeFinRecord(prevActiveSegId)); err != nil {
		// do not leak the marker file handle on failure
		_ = mergeFinFile.Close()
		return err
	}
	if err = mergeFinFile.Sync(); err != nil {
		_ = mergeFinFile.Close()
		return err
	}
	// close the merge finished file; all done successfully if this returns nil
	return mergeFinFile.Close()
}
// openMergeDB creates the temporary database that receives the merged data.
//
// Any existing merge directory is deleted first. The merge database is opened
// with the same options as db, except that it lives in the merge directory and
// has automatic syncing disabled. A single-segment hint file is opened next to
// it and attached to the returned DB as hintFile.
//
// On error no open database is left behind.
func (db *DB) openMergeDB() (*DB, error) {
	mergePath := mergeDirPath(db.options.DirPath)
	// delete the merge directory if it exists
	if err := os.RemoveAll(mergePath); err != nil {
		return nil, err
	}

	options := db.options
	// we don't need to use the original sync policy,
	// because we can sync the data file manually after the merge operation is completed.
	options.Sync, options.BytesPerSync = false, 0
	options.DirPath = mergePath
	mergeDB, err := Open(options)
	if err != nil {
		return nil, err
	}

	// open the hint files to write the new position of the data.
	hintFile, err := wal.Open(wal.Options{
		DirPath: options.DirPath,
		// we don't need to rotate the hint file, just write all data to a single file.
		SegmentSize:    math.MaxInt64,
		SegmentFileExt: hintFileNameSuffix,
		Sync:           false,
		BytesPerSync:   0,
	})
	if err != nil {
		// The merge db is already open at this point; close it so its files
		// are not leaked when the hint file cannot be opened.
		_ = mergeDB.Close()
		return nil, err
	}
	mergeDB.hintFile = hintFile
	return mergeDB, nil
}
// mergeDirPath returns the path of the merge directory that belongs to dirPath:
// a sibling of dirPath whose name is dirPath's base name followed by
// mergeDirSuffixName.
func mergeDirPath(dirPath string) string {
	parent := filepath.Dir(filepath.Clean(dirPath))
	mergeName := filepath.Base(dirPath) + mergeDirSuffixName
	return filepath.Join(parent, mergeName)
}
// openMergeFinishedFile opens the merge-finished marker as a WAL located in
// db's directory, using the mergeFinNameSuffix extension and no automatic sync.
func (db *DB) openMergeFinishedFile() (*wal.WAL, error) {
	markerOptions := wal.Options{
		DirPath:        db.options.DirPath,
		SegmentSize:    GB,
		SegmentFileExt: mergeFinNameSuffix,
		Sync:           false,
		BytesPerSync:   0,
	}
	return wal.Open(markerOptions)
}
// positionEquals reports whether a and b refer to the same chunk, i.e. they
// share segment id, block number and chunk offset. Both arguments must be non-nil.
func positionEquals(a, b *wal.ChunkPosition) bool {
	if a.SegmentId != b.SegmentId {
		return false
	}
	if a.BlockNumber != b.BlockNumber {
		return false
	}
	return a.ChunkOffset == b.ChunkOffset
}
// loadMergeFiles installs the output of a completed merge: it moves the merged
// segment files, the merge-finished marker and the hint file from the merge
// directory into dirPath, deleting the original segment files they replace.
//
// It returns nil without touching dirPath when there is no merge directory or
// when the merge never completed (no merge-finished marker); in the latter
// case the leftover merge directory is just discarded.
// Once the merge directory has been found it is always removed before
// returning, whether or not an error occurred.
func loadMergeFiles(dirPath string) error {
	// check if there is a merge directory
	mergePath := mergeDirPath(dirPath)
	if _, err := os.Stat(mergePath); err != nil {
		// does not exist, just return.
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}

	// remove the merge directory at last
	defer func() {
		_ = os.RemoveAll(mergePath)
	}()

	// moveFile moves one file from the merge directory into dirPath.
	// A missing source file is skipped, and so is an empty one unless force is set.
	moveFile := func(suffix string, fileId uint32, force bool) error {
		srcFile := wal.SegmentFileName(mergePath, suffix, fileId)
		stat, err := os.Stat(srcFile)
		if err != nil {
			if os.IsNotExist(err) {
				return nil
			}
			return fmt.Errorf("loadMergeFiles: stat %s: %w", srcFile, err)
		}
		if !force && stat.Size() == 0 {
			return nil
		}
		destFile := wal.SegmentFileName(dirPath, suffix, fileId)
		if err := os.Rename(srcFile, destFile); err != nil {
			return fmt.Errorf("loadMergeFiles: move %s to %s: %w", srcFile, destFile, err)
		}
		return nil
	}

	// get the merge finished segment id
	mergeFinSegmentId, err := getMergeFinSegmentId(mergePath)
	if err != nil {
		return err
	}
	// A zero id means the merge never completed. Nothing in the merge directory
	// can be trusted then; in particular its hint file describes merged segments
	// that will not be installed, so it must not replace the existing hint file.
	if mergeFinSegmentId == 0 {
		return nil
	}

	// now we get the merge finished segment id, so all the segment id less than or equal to
	// the merge finished segment id should be moved to the original data directory,
	// and the original data files should be deleted.
	for fileId := uint32(1); fileId <= mergeFinSegmentId; fileId++ {
		destFile := wal.SegmentFileName(dirPath, dataFileNameSuffix, fileId)
		// remove the original data file.
		// If Merge was called several times, some segment files were already
		// deleted by an earlier merge, so a missing file is not an error.
		if err := os.Remove(destFile); err != nil && !os.IsNotExist(err) {
			return err
		}
		// move the merge data file to the original data directory
		if err := moveFile(dataFileNameSuffix, fileId, false); err != nil {
			return err
		}
	}

	// move MERGEFINISHED and HINT files to the original data directory
	// there is only one merge finished file, so the file id is always 1,
	// the same as the hint file.
	if err := moveFile(mergeFinNameSuffix, 1, true); err != nil {
		return err
	}
	return moveFile(hintFileNameSuffix, 1, true)
}
// getMergeFinSegmentId reads the id of the last merged segment from the
// merge-finished marker file (file id 1) found in mergePath.
//
// It returns 0 and a nil error when the marker does not exist or is too short
// to hold a segment id, both of which mean the merge did not complete. Any
// other failure to open or read the marker is returned to the caller.
func getMergeFinSegmentId(mergePath string) (wal.SegmentID, error) {
	const (
		// the first 7 bytes of the file are the chunk header
		chunkHeaderSize = 7
		// only 4 bytes are needed to store the segment id
		segmentIdSize = 4
	)

	// check if the merge operation is completed
	mergeFinFile, err := os.Open(wal.SegmentFileName(mergePath, mergeFinNameSuffix, 1))
	if err != nil {
		// if the merge finished file does not exist, it means that the merge operation is not completed.
		if os.IsNotExist(err) {
			return 0, nil
		}
		// any other error (permissions, I/O) must not be mistaken for an unfinished merge
		return 0, err
	}
	defer func() {
		_ = mergeFinFile.Close()
	}()

	mergeFinBuf := make([]byte, segmentIdSize)
	if _, err := mergeFinFile.ReadAt(mergeFinBuf, chunkHeaderSize); err != nil {
		// a truncated marker means it was never fully written: treat as not completed
		if err == io.EOF {
			return 0, nil
		}
		return 0, err
	}
	return binary.LittleEndian.Uint32(mergeFinBuf), nil
}
// loadIndexFromHintFile fills db.index from the hint file stored in the
// database directory: every hint record is decoded into a key and a chunk
// position, and that pair is put into the index as is.
// The hint file is closed again before returning.
func (db *DB) loadIndexFromHintFile() error {
	hintOptions := wal.Options{
		DirPath: db.options.DirPath,
		// the hint file is never rotated; everything lives in one segment.
		SegmentSize:    math.MaxInt64,
		SegmentFileExt: hintFileNameSuffix,
	}
	hintFile, err := wal.Open(hintOptions)
	if err != nil {
		return err
	}
	defer func() {
		_ = hintFile.Close()
	}()

	// walk over every record of the hint file, with the WAL flagged as being
	// in a startup traversal for the duration of the scan.
	reader := hintFile.NewReader()
	hintFile.SetIsStartupTraversal(true)
	for {
		chunk, _, nextErr := reader.Next()
		if nextErr == io.EOF {
			break
		}
		if nextErr != nil {
			return nextErr
		}

		// Hint records are produced by the merge itself, so they are taken
		// as valid and inserted without any further check.
		key, position := decodeHintRecord(chunk)
		db.index.Put(key, position)
	}
	hintFile.SetIsStartupTraversal(false)
	return nil
}