Skip to content

Commit

Permalink
debugging unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Feb 26, 2024
1 parent da98117 commit 781c4df
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 22 deletions.
5 changes: 2 additions & 3 deletions pkg/reduce/pbq/wal/unaligned/fs/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import (
)

// filesInDir lists all files sorted chronologically in the given directory, except the WIP temp file.
// FIXME(WAL): pass in a filter suffix. (e.g, segment-current, gc-current)
func filesInDir(dirPath string) ([]os.FileInfo, error) {
func filesInDir(dirPath string, filterStr string) ([]os.FileInfo, error) {

dir, err := os.Open(dirPath)
if err != nil {
Expand All @@ -44,7 +43,7 @@ func filesInDir(dirPath string) ([]os.FileInfo, error) {
// ignore the files which has "current" in their file name
// because it will in use by the writer
for i := 0; i < len(files); i++ {
if strings.Contains(files[i].Name(), "current") {
if strings.Contains(files[i].Name(), filterStr) {
continue
}
cfs = append(cfs, files[i])
Expand Down
17 changes: 14 additions & 3 deletions pkg/reduce/pbq/wal/unaligned/fs/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *compactor) Start(ctx context.Context) error {
// before starting the compactor

// get all the GC events files
eventFiles, err := filesInDir(c.gcEventsWALPath)
eventFiles, err := filesInDir(c.gcEventsWALPath, currentWALPrefix)
if err != nil {
return err
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (c *compactor) keepCompacting(ctx context.Context) {
return
case <-compTimer.C:
// get all the events files
eventFiles, _ := filesInDir(c.gcEventsWALPath)
eventFiles, _ := filesInDir(c.gcEventsWALPath, currentEventsFile)
err := c.compact(ctx, eventFiles)
// TODO: retry, if its not ctx or stop signal error
if err != nil {
Expand All @@ -230,6 +230,12 @@ func (c *compactor) compact(ctx context.Context, eventFiles []os.FileInfo) error
return err
}

// log compaction key map
c.log.Infow("Compaction key map")
for k, v := range c.compactKeyMap {
c.log.Infow("Map entry - ", zap.String("key", k), zap.Int64("value", v))
}

// compact the data files based on the compaction key map
err = c.compactDataFiles(ctx)
if err != nil {
Expand Down Expand Up @@ -298,11 +304,16 @@ func (c *compactor) compactDataFiles(ctx context.Context) error {
}

// get all the data files
dataFiles, err := filesInDir(c.dataSegmentWALPath)
dataFiles, err := filesInDir(c.dataSegmentWALPath, currentWALPrefix)
if err != nil {
return err
}

c.log.Infow("Compacting data files", zap.Int("count", len(dataFiles)))
for _, dataFile := range dataFiles {
c.log.Infow("Data file - ", zap.String("file", dataFile.Name()))
}

// iterate over all the data files and compact them
for _, dataFile := range dataFiles {
select {
Expand Down
16 changes: 8 additions & 8 deletions pkg/reduce/pbq/wal/unaligned/fs/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestFilesInDir(t *testing.T) {
assert.NoError(t, err)
}

files, err := filesInDir(dir)
files, err := filesInDir(dir, currentWALPrefix)
assert.NoError(t, err)
assert.Len(t, files, 10)

Expand All @@ -245,7 +245,7 @@ func TestFilesInDir(t *testing.T) {
err = file.Close()
assert.NoError(t, err)

files, err = filesInDir(dir)
files, err = filesInDir(dir, currentWALPrefix)
assert.NoError(t, err)
assert.Len(t, files, 10)

Expand All @@ -255,7 +255,7 @@ func TestFilesInDir(t *testing.T) {
assert.NoError(t, err)
}

files, err = filesInDir(dir)
files, err = filesInDir(dir, currentWALPrefix)
assert.NoError(t, err)
assert.Len(t, files, 0)

Expand All @@ -270,7 +270,7 @@ func TestFilesInDir(t *testing.T) {
err = file.Close()
assert.NoError(t, err)

files, err = filesInDir(dir)
files, err = filesInDir(dir, currentWALPrefix)
assert.NoError(t, err)
assert.Len(t, files, 2)

Expand All @@ -280,7 +280,7 @@ func TestFilesInDir(t *testing.T) {
err = file.Close()
assert.NoError(t, err)

files, err = filesInDir(dir)
files, err = filesInDir(dir, currentWALPrefix)
assert.NoError(t, err)
assert.Len(t, files, 2)

Expand All @@ -291,7 +291,7 @@ func TestFilesInDir(t *testing.T) {
err = os.Remove(filepath.Join(dir, "file-2"))
assert.NoError(t, err)

files, err = filesInDir(dir)
files, err = filesInDir(dir, currentWALPrefix)
assert.NoError(t, err)
assert.Len(t, files, 0)
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestCompactor_ContextClose(t *testing.T) {
assert.NoError(t, err)

cancel()
files, _ := filesInDir(dataDir)
files, _ := filesInDir(dataDir, currentWALPrefix)
for _, file := range files {
println(file.Name())
}
Expand Down Expand Up @@ -399,7 +399,7 @@ func Test_buildCompactionKeyMap(t *testing.T) {
dc: newDecoder(),
}

eFiles, err := filesInDir(eventDir)
eFiles, err := filesInDir(eventDir, currentWALPrefix)
assert.NoError(t, err)

err = c.buildCompactionKeyMap(eFiles)

Check failure on line 405 in pkg/reduce/pbq/wal/unaligned/fs/compactor_test.go

View workflow job for this annotation

GitHub Actions / Lint

ineffectual assignment to err (ineffassign)
Expand Down
2 changes: 1 addition & 1 deletion pkg/reduce/pbq/wal/unaligned/fs/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (ws *fsWAL) CreateWAL(_ context.Context, partitionID partition.ID) (wal.WAL
// DiscoverWALs returns all the WALs present in the storePath
func (ws *fsWAL) DiscoverWALs(_ context.Context) ([]wal.WAL, error) {
partitions := make([]wal.WAL, 0)
files, err := filesInDir(ws.storePath)
files, err := filesInDir(ws.storePath, currentWALPrefix)

if os.IsNotExist(err) || len(files) == 0 {
return partitions, nil
Expand Down
15 changes: 8 additions & 7 deletions pkg/reduce/pbq/wal/unaligned/fs/wal_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ import (
)

const (
currentWALPrefix = "current"
IEEE = 0xedb88320
SegmentPrefix = "segment"
CurrentSegmentName = "current" + "-" + SegmentPrefix
segmentPrefix = "segment"
currentSegmentName = currentWALPrefix + "-" + segmentPrefix
)

// unalignedWAL is an unaligned write-ahead log
Expand Down Expand Up @@ -112,7 +113,7 @@ func NewUnalignedReadWriteWAL(opts ...WALOption) (wal.WAL, error) {
opt(s)
}

s.files, err = filesInDir(s.storeDataPath)
s.files, err = filesInDir(s.storeDataPath, currentWALPrefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -271,7 +272,7 @@ func (s *unalignedWAL) openReadFile() error {

// openFile opens a new data file
func (s *unalignedWAL) openFile() error {
dataFilePath := filepath.Join(s.storeDataPath, CurrentSegmentName)
dataFilePath := filepath.Join(s.storeDataPath, currentSegmentName)
var err error
if s.currDataFp, err = os.OpenFile(dataFilePath, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
return err
Expand Down Expand Up @@ -316,7 +317,7 @@ func (s *unalignedWAL) rotateFile() error {
}

// rename the current data file to the segment file
if err := os.Rename(filepath.Join(s.storeDataPath, CurrentSegmentName), s.segmentFilePath(s.storeDataPath)); err != nil {
if err := os.Rename(filepath.Join(s.storeDataPath, currentSegmentName), s.segmentFilePath(s.storeDataPath)); err != nil {
return err
}

Expand All @@ -326,7 +327,7 @@ func (s *unalignedWAL) rotateFile() error {

// segmentFilePath creates the file path for the segment file located in the storage path.
func (s *unalignedWAL) segmentFilePath(storePath string) string {
return filepath.Join(storePath, SegmentPrefix+"-"+fmt.Sprintf("%d", time.Now().UnixMilli()))
return filepath.Join(storePath, segmentPrefix+"-"+fmt.Sprintf("%d", time.Now().UnixMilli()))
}

// flushAndSync flushes the buffered data to the writer and syncs the file to disk.
Expand Down Expand Up @@ -359,7 +360,7 @@ func (s *unalignedWAL) Close() error {
}

// rename the current data file to the segment file
if err = os.Rename(filepath.Join(s.storeDataPath, CurrentSegmentName), s.segmentFilePath(s.storeDataPath)); err != nil {
if err = os.Rename(filepath.Join(s.storeDataPath, currentSegmentName), s.segmentFilePath(s.storeDataPath)); err != nil {
return err
}

Expand Down

0 comments on commit 781c4df

Please sign in to comment.