Skip to content

Commit

Permalink
build rc4; fixes
Browse files Browse the repository at this point in the history
* v3.24.rc4
* rewrite `cos.SaveReader` and friends
* universally use `cos.Remove`
* introduce `err-bdir`
* log open/close-ec-streams on both sides

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 11, 2024
1 parent 5eb4677 commit 8fd6845
Show file tree
Hide file tree
Showing 15 changed files with 109 additions and 95 deletions.
8 changes: 4 additions & 4 deletions ais/s3/mpt.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ package s3
import (
"fmt"
"net/http"
"os"
"sort"
"strconv"
"sync"
"time"

"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/core"
Expand Down Expand Up @@ -134,12 +134,12 @@ func CleanupUpload(id, fqn string, aborted bool) (exists bool) {

if !aborted {
if err := storeMptXattr(fqn, mpt); err != nil {
nlog.Warningf("fqn %s, id %s: %v", fqn, id, err)
nlog.Warningln("failed to xattr [", fqn, id, err, "]")
}
}
for _, part := range mpt.parts {
if err := os.Remove(part.FQN); err != nil && !os.IsNotExist(err) {
nlog.Errorln(err)
if err := cos.RemoveFile(part.FQN); err != nil {
nlog.Errorln("failed to remove part [", fqn, id, err, "]")
}
}
return true
Expand Down
56 changes: 20 additions & 36 deletions cmn/cos/ioutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,52 +153,36 @@ func CopyFile(src, dst string, buf []byte, cksumType string) (written int64, cks
return
}

func SaveReaderSafe(tmpfqn, fqn string, reader io.Reader, buf []byte, cksumType string, size int64) (cksum *CksumHash,
err error) {
if cksum, err = SaveReader(tmpfqn, reader, buf, cksumType, size); err != nil {
return
}
if err = Rename(tmpfqn, fqn); err != nil {
os.Remove(tmpfqn)
}
return
}

// Saves the reader directly to `fqn`, checksums if requested
func SaveReader(fqn string, reader io.Reader, buf []byte, cksumType string, size int64) (cksum *CksumHash, err error) {
var (
written int64
file, erc = CreateFile(fqn)
writer = WriterOnly{file} // Hiding `ReadFrom` for `*os.File` introduced in Go1.15.
)
// Saves the `reader` directly to `fqn`, checksums if requested
func SaveReader(fqn string, reader io.Reader, buf []byte, cksumType string, size int64) (*CksumHash, error) {
wfh, erc := CreateFile(fqn)
if erc != nil {
return nil, erc
}
defer func() {
if err != nil {
os.Remove(fqn)
}
}()
cksum, err := _save(fqn, WriterOnly{wfh} /*hide ReadFrom*/, reader, buf, cksumType, size)
erc = wfh.Close()

if size >= 0 {
reader = io.LimitReader(reader, size)
if err == nil && erc != nil {
err = fmt.Errorf("failed to close %s: %w", fqn, erc)
}
written, cksum, err = CopyAndChecksum(writer, reader, buf, cksumType)
erc = file.Close()
if err != nil {
os.Remove(fqn) // cleanup
}
return cksum, err
}

func _save(fqn string, w io.Writer, r io.Reader, buf []byte, cksumType string, size int64) (*CksumHash, error) {
if size >= 0 {
r = io.LimitReader(r, size)
}
written, cksum, err := CopyAndChecksum(w, r, buf, cksumType)
if err != nil {
err = fmt.Errorf("failed to save to %q: %w", fqn, err)
return
return nil, fmt.Errorf("failed to copy-and-checksum to %s: %w", fqn, err)
}
if size >= 0 && written != size {
err = fmt.Errorf("wrong size when saving to %q: expected %d, got %d", fqn, size, written)
return
err = fmt.Errorf("wrong size %s: expected %d, got %d", fqn, size, written) // (unlikely)
}
if erc != nil {
err = fmt.Errorf("failed to close %q: %w", fqn, erc)
return
}
return
return cksum, err
}

// a slightly modified excerpt from https://github.com/golang/go/blob/master/src/io/io.go#L407
Expand Down
2 changes: 1 addition & 1 deletion cmn/ver_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const GitHubHome = "https://github.com/NVIDIA/aistore"
// `jsp` formats its *signature* and other implementation details.

const (
VersionAIStore = "3.24.rc3"
VersionAIStore = "3.24.rc4"
VersionCLI = "1.12"
VersionLoader = "1.11"
VersionAuthN = "1.0"
Expand Down
43 changes: 27 additions & 16 deletions core/ct.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
// Package core provides core metadata and in-cluster API
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package core

import (
"fmt"
"io"
"os"

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/fs"
)
Expand Down Expand Up @@ -44,6 +46,7 @@ func (ct *CT) Mountpath() *fs.Mountpath { return ct.mi }
func (ct *CT) Lsize() int64 { return ct.size }
func (ct *CT) MtimeUnix() int64 { return ct.mtime }
func (ct *CT) Digest() uint64 { return ct.digest }
func (ct *CT) Cname() string { return ct.bck.Cname(ct.objName) }

func (ct *CT) LoadSliceFromFS() error {
debug.Assert(ct.ContentType() == fs.ECSliceType, "unexpected content type: ", ct.ContentType())
Expand Down Expand Up @@ -158,30 +161,38 @@ func (ct *CT) Clone(ctType string) *CT {
}
}

func (ct *CT) Make(toType string, pref ...string /*optional prefix*/) string {
var prefix string
cos.Assert(toType != "")

if len(pref) > 0 {
prefix = pref[0]
}
return fs.CSM.Gen(ct, toType, prefix)
// Make delegates to fs.CSM.Gen for this CT and the given content type
// `toType`, always passing an empty prefix. `toType` must be non-empty
// (checked via debug.Assert).
func (ct *CT) Make(toType string) string {
	const noPrefix = ""

	debug.Assert(toType != "")
	return fs.CSM.Gen(ct, toType, noPrefix)
}

// Save CT to local drives. If workFQN is set, it saves in two steps: first,
// save to workFQN; second, rename workFQN to ct.FQN. If unset, it writes
// directly to ct.FQN
func (ct *CT) Write(reader io.Reader, size int64, workFQN ...string) (err error) {
// save to workFQN; second, rename workFQN to ct.fqn. If unset, it writes
// directly to ct.fqn
func (ct *CT) Write(reader io.Reader, size int64, workFQN string) (err error) {
bdir := ct.mi.MakePathBck(ct.Bucket())
if err := cos.Stat(bdir); err != nil {
return err
if err = cos.Stat(bdir); err != nil {
return &errBdir{ct.Cname(), err}
}
buf, slab := g.pmm.Alloc()
if len(workFQN) == 0 {
if workFQN == "" {
_, err = cos.SaveReader(ct.fqn, reader, buf, cos.ChecksumNone, size)
} else {
_, err = cos.SaveReaderSafe(workFQN[0], ct.fqn, reader, buf, cos.ChecksumNone, size)
_, err = ct.saveAndRename(workFQN, reader, buf, cos.ChecksumNone, size)
}
slab.Free(buf)
return err
}

// saveAndRename stores the content in two steps:
//  1. write `reader` to the temporary `tmpfqn` via cos.SaveReader;
//  2. rename `tmpfqn` to ct.fqn.
//
// If the write fails, the cos.SaveReader results are returned unchanged.
// If the rename fails, the returned error wraps the rename error and the
// temporary file is removed; a failure to remove it is only logged.
// The checksum produced by cos.SaveReader is returned in every case.
func (ct *CT) saveAndRename(tmpfqn string, reader io.Reader, buf []byte, cksumType string, size int64) (*cos.CksumHash, error) {
	cksum, err := cos.SaveReader(tmpfqn, reader, buf, cksumType, size)
	if err != nil {
		return cksum, err
	}

	errRename := cos.Rename(tmpfqn, ct.fqn)
	if errRename == nil {
		return cksum, nil
	}

	err = fmt.Errorf("failed to rename temp to %s: %w", ct.Cname(), errRename)
	// best-effort cleanup: the rename error is what the caller gets back
	if errRemove := cos.RemoveFile(tmpfqn); errRemove != nil {
		nlog.Errorln("nested error:", err, "[ failed to remove temp fqn:", errRemove, "]")
	}
	return cksum, err
}
4 changes: 2 additions & 2 deletions core/lcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (lom *LOM) copy2fqn(dst *LOM, buf []byte) (err error) {
lom.md.copies[lom.FQN], dst.md.copies[lom.FQN] = lom.mi, lom.mi
if err = lom.syncMetaWithCopies(); err != nil {
if _, ok := lom.md.copies[dst.FQN]; !ok {
if errRemove := os.Remove(dst.FQN); errRemove != nil && !os.IsNotExist(errRemove) {
if errRemove := cos.RemoveFile(dst.FQN); errRemove != nil {
nlog.Errorln("nested err:", errRemove)
}
}
Expand All @@ -342,7 +342,7 @@ func (lom *LOM) copy2fqn(dst *LOM, buf []byte) (err error) {
}
err = lom.Persist()
} else if err = dst.Persist(); err != nil {
if errRemove := os.Remove(dst.FQN); errRemove != nil && !os.IsNotExist(errRemove) {
if errRemove := cos.RemoveFile(dst.FQN); errRemove != nil {
nlog.Errorln("nested err:", errRemove)
}
}
Expand Down
16 changes: 14 additions & 2 deletions core/lfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ const (
_apndFlags = os.O_APPEND | os.O_WRONLY
)

// errBdir reports a failed bucket-directory (bdir) check.
//   - cname: name of the object/content the check was performed for
//   - err:   the underlying error returned by the failed check
type errBdir struct {
	cname string
	err   error
}

// Error implements the `error` interface. When the underlying error is a
// "does not exist" condition, the message hints that the bucket itself may be
// gone; otherwise the underlying error is included verbatim.
func (e *errBdir) Error() string {
	if os.IsNotExist(e.err) {
		return e.cname + ": missing bdir (bucket exists?)"
	}
	return fmt.Sprintf("%s: missing bdir [%v]", e.cname, e.err)
}

// Unwrap returns the underlying error so that errors.Is and errors.As
// can inspect the original cause through errBdir.
func (e *errBdir) Unwrap() error { return e.err }

//
// open
//
Expand Down Expand Up @@ -74,7 +86,7 @@ func (lom *LOM) _checkBdir() (err error) {
if err = cos.Stat(bdir); err == nil {
return nil
}
err = fmt.Errorf("%s (bdir %s): %w", lom, bdir, err)
err = &errBdir{lom.Cname(), err}
bmd := T.Bowner().Get()
if _, present := bmd.Get(&lom.bck); present {
err = fmt.Errorf("%w [%v]", syscall.ENOTDIR, err)
Expand Down Expand Up @@ -134,7 +146,7 @@ func (lom *LOM) RenameToMain(wfqn string) error {
func (lom *LOM) RenameFinalize(wfqn string) error {
bdir := lom.mi.MakePathBck(lom.Bucket())
if err := cos.Stat(bdir); err != nil {
return fmt.Errorf("%s(bdir: %s): %w", lom, bdir, err)
return &errBdir{lom.Cname(), err}
}
if err := lom.RenameToMain(wfqn); err != nil {
T.FSHC(err, lom.Mountpath(), wfqn)
Expand Down
2 changes: 1 addition & 1 deletion core/meta/smap.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (d *Snode) ID() string { return d.DaeID }
func (d *Snode) Type() string { return d.DaeType } // enum { apc.Proxy, apc.Target }

func (d *Snode) Name() string { return d.name }
func (d *Snode) String() string { return d.Name() }
func (d *Snode) String() string { return d.name }

func (d *Snode) SetName() {
name := d.StringEx()
Expand Down
10 changes: 5 additions & 5 deletions ec/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (s *slice) free() {
}
}
if s.workFQN != "" {
if err := os.Remove(s.workFQN); err != nil && !os.IsNotExist(err) {
if err := cos.RemoveFile(s.workFQN); err != nil {
nlog.Errorln(err)
}
}
Expand Down Expand Up @@ -465,7 +465,7 @@ func WriteSliceAndMeta(hdr *transport.ObjHdr, args *WriteArgs) error {
if err := ct.Write(args.Reader, hdr.ObjAttrs.Size, tmpFQN); err != nil {
return err
}
if err := ctMeta.Write(bytes.NewReader(args.MD), -1); err != nil {
if err := ctMeta.Write(bytes.NewReader(args.MD), -1, "" /*work fqn*/); err != nil {
return err
}
if _, exists := core.T.Bowner().Get().Get(ctMeta.Bck()); !exists {
Expand Down Expand Up @@ -507,13 +507,13 @@ func WriteReplicaAndMeta(lom *core.LOM, args *WriteArgs) (err error) {
return
}
if rmErr := lom.RemoveMain(); rmErr != nil {
nlog.Errorf("nested error: save replica -> remove replica: %v", rmErr)
nlog.Errorln("nested error: save replica -> remove replica:", rmErr)
}
if rmErr := cos.RemoveFile(ctMeta.FQN()); rmErr != nil {
nlog.Errorf("nested error: save replica -> remove metafile: %v", rmErr)
nlog.Errorln("nested error: save replica -> remove metafile:", rmErr)
}
}()
if err = ctMeta.Write(bytes.NewReader(args.MD), -1); err != nil {
if err = ctMeta.Write(bytes.NewReader(args.MD), -1, "" /*work fqn*/); err != nil {
return
}
if _, exists := core.T.Bowner().Get().Get(ctMeta.Bck()); !exists {
Expand Down
2 changes: 1 addition & 1 deletion ec/getjogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ loop: //nolint:gocritic // keeping label for readability

b := cos.MustMarshal(ctx.meta)
ctMeta := core.NewCTFromLOM(ctx.lom, fs.ECMetaType)
if err := ctMeta.Write(bytes.NewReader(b), -1); err != nil {
if err := ctMeta.Write(bytes.NewReader(b), -1, "" /*work fqn*/); err != nil {
return err
}
if _, exists := core.T.Bowner().Get().Get(ctMeta.Bck()); !exists {
Expand Down
8 changes: 5 additions & 3 deletions ec/getx.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ func newGetXact(bck *cmn.Bck, mgr *Manager) *XactGet {
return xctn
}

func (r *XactGet) DispatchResp(iReq intraReq, hdr *transport.ObjHdr, bck *meta.Bck, reader io.Reader) {
objName, objAttrs := hdr.ObjName, hdr.ObjAttrs
uname := unique(hdr.SID, bck, objName)
func (r *XactGet) dispatchResp(iReq intraReq, hdr *transport.ObjHdr, bck *meta.Bck, reader io.Reader) {
var (
objName, objAttrs = hdr.ObjName, hdr.ObjAttrs
uname = unique(hdr.SID, bck, objName)
)
switch hdr.Opcode {
// It is a response to a slice/replica request made by an object
// restoration process. In this case, there should exist
Expand Down
19 changes: 11 additions & 8 deletions ec/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (mgr *Manager) notifyTerm(core.Notif, error, bool) {

func cbReq(hdr *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
if err != nil {
nlog.Errorf("failed to request %s: %v", hdr.Cname(), err)
nlog.Errorln("failed to request", hdr.Cname(), "err: [", err, "]")
}
}

Expand All @@ -96,7 +96,7 @@ func (mgr *Manager) OpenStreams(withRefc bool) {
if !mgr.bundleEnabled.CAS(false, true) {
return
}
nlog.InfoDepth(1, core.T.String(), "ECM.OpenStreams")
nlog.InfoDepth(1, core.T.String(), "ECM", apc.ActEcOpen)
var (
client = transport.NewIntraDataClient()
config = cmn.GCO.Get()
Expand Down Expand Up @@ -128,7 +128,7 @@ func (mgr *Manager) CloseStreams(justRefc bool) {
if !mgr.bundleEnabled.CAS(true, false) {
return
}
nlog.InfoDepth(1, core.T.String(), "ECM.CloseStreams")
nlog.InfoDepth(1, core.T.String(), "ECM", apc.ActEcClose)
mgr.req().Close(false)
mgr.resp().Close(false)
}
Expand Down Expand Up @@ -201,7 +201,8 @@ func (mgr *Manager) recvRequest(hdr *transport.ObjHdr, objReader io.Reader, err
return err
}
}
mgr.RestoreBckRespXact(bck).DispatchReq(iReq, hdr, bck)
xctn := mgr.RestoreBckRespXact(bck)
xctn.dispatchReq(iReq, hdr, bck)
return nil
}

Expand All @@ -226,19 +227,21 @@ func (mgr *Manager) recvResponse(hdr *transport.ObjHdr, objReader io.Reader, err
return err
}
bck := meta.CloneBck(&hdr.Bck)
if err = bck.Init(core.T.Bowner()); err != nil {
if _, ok := err.(*cmn.ErrRemoteBckNotFound); !ok { // is ais
if err := bck.Init(core.T.Bowner()); err != nil {
if !cmn.IsErrRemoteBckNotFound(err) { // is ais://
nlog.Errorln(err)
return err
}
}
switch hdr.Opcode {
case reqPut:
mgr.RestoreBckRespXact(bck).DispatchResp(iReq, hdr, objReader)
xctn := mgr.RestoreBckRespXact(bck)
xctn.dispatchResp(iReq, hdr, objReader)
case respPut:
// Process the request even if the number of targets is insufficient
// (might've started when we had enough)
mgr.RestoreBckGetXact(bck).DispatchResp(iReq, hdr, bck, objReader)
xctn := mgr.RestoreBckGetXact(bck)
xctn.dispatchResp(iReq, hdr, bck, objReader)
default:
debug.Assertf(false, "unknown EC response action %d", hdr.Opcode)
}
Expand Down
4 changes: 2 additions & 2 deletions ec/putjogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (c *putJogger) encode(req *request, lom *core.LOM) error {
return err
}
metaBuf := bytes.NewReader(meta.NewPack())
if err := ctMeta.Write(metaBuf, -1); err != nil {
if err := ctMeta.Write(metaBuf, -1, "" /*work fqn*/); err != nil {
return err
}
if _, exists := core.T.Bowner().Get().Get(ctMeta.Bck()); !exists {
Expand Down Expand Up @@ -497,7 +497,7 @@ func (c *putJogger) sendSlice(ctx *encodeCtx, data *slice, node *meta.Snode, idx
data.release()
}
if err != nil {
nlog.Errorln("failed to send", hdr.Cname()+": ", err)
nlog.Errorln("failed to send", hdr.Cname(), "[", err, "]")
}
}

Expand Down
Loading

0 comments on commit 8fd6845

Please sign in to comment.