feat: add exclude reaction type upon finding missing extensions
* refactor structure and logic of `MissExtReact`
* implement `exclude` action that removes any incomplete sample that doesn't contain all required extensions. It also removes all unnecessary extension files

Signed-off-by: Tony Chen <[email protected]>
Nahemah1022 committed Jul 29, 2024
1 parent 21c1cd6 commit 2d4b78b
Showing 5 changed files with 286 additions and 139 deletions.
29 changes: 16 additions & 13 deletions cmd/ishard/README.md
@@ -24,27 +24,31 @@ To give a quick example, `a/b/c/toyota.jpeg` and `a/b/c/toyota.json` from an ori

## CLI Parameters

- `-sample_key_pattern`: The pattern used to substitute source file names to sample keys. This ensures that objects with the same sample key are always merged into the same output shard.
- `-sample_key_pattern`: The pattern used to substitute source file names to sample keys. This ensures that files with the same sample key are always merged into the same output shard.
- `-sample_key_pattern="base_filename"`: The default option. Extracts and uses only the base filename as the sample key to merge. Removes all directory paths and extensions.
- `-sample_key_pattern="full_name"`: Performs no substitution, using the entire file name without extension as the sample key.
- `-sample_key_pattern="collapse_all_dir"`: Removes all '/' characters from the file name, using the resulting string as the sample key.
- `-sample_key_pattern="custom_regex"`: Applies a custom regex pattern to substitute the file names to sample keys for your specific requirements.
- `-max_shard_size`: Maximum size of each output shard. Default is `1MiB`. Accepts the following _units_ formats:
- IEC format, e.g.: KiB, MiB, GiB
- SI format, e.g.: KB, MB, GB
- IEC format, e.g.: 4KiB, 16MiB, 2GiB
- SI format, e.g.: 4KB, 16MB, 2GB
- raw format (in bytes), e.g.: 1024000
- `-src_bck`: The source bucket name or URI.
- `-dst_bck`: The destination bucket name or URI.
- `-shard_template`: The template used for generating output shards. Accepts Bash, Fmt, or At formats.
- `-shard_template="prefix-{0000..4096..8}-suffix"`: generate output shards `prefix-0000-suffix`, `prefix-0008-suffix`, `prefix-00016-suffix`, and so on.
- `-shard_template="prefix-%06d-suffix"`: generate output shards `prefix-000000-suffix`, `prefix-000001-suffix`, `prefix-000002-suffix`, and so on.
- `-shard_template="prefix-@00001-gap-@100-suffix"`: generate output shards `prefix-00001-gap-001-suffix`, `prefix-00001-gap-002-suffix`, and so on.
- `-shard_template="prefix-{0000..4096..8}-suffix"`: Generate output shards `prefix-0000-suffix`, `prefix-0008-suffix`, `prefix-0016-suffix`, and so on.
- `-shard_template="prefix-%06d-suffix"`: Generate output shards `prefix-000000-suffix`, `prefix-000001-suffix`, `prefix-000002-suffix`, and so on.
- `-shard_template="prefix-@00001-gap-@100-suffix"`: Generate output shards `prefix-00001-gap-001-suffix`, `prefix-00001-gap-002-suffix`, and so on.
- `-ext`: The extension used for generating output shards. Supports `.tar`, `.tgz`, `.tar.gz`, `.zip`, and `.tar.lz4` formats.
- `-sample_exts`: A comma-separated list of extensions that should exists in the dataset. Also see `missing_extension_action`.
- `-missing_extension_action`: Action to take when an extension is missing at any sample: `abort` | `warn` | `ignore`, if `sample_exts` is set.
- `-sample_exts`: A comma-separated list of required extensions for all samples in the dataset. See `-missing_extension_action` for handling missing extensions.
- `-missing_extension_action`: Specifies the action to take when an expected extension is missing from a sample. Options are: `abort` | `warn` | `ignore` | `exclude`.
- `-missing_extension_action="ignore"`: Do nothing when an expected extension is missing.
- `-missing_extension_action="warn"`: Print a warning if a sample is missing an expected extension or contains an unspecified one.
- `-missing_extension_action="abort"`: Stop the process if a sample is missing an expected extension or contains an unspecified one.
- `-missing_extension_action="exclude"`: Exclude any incomplete records and remove unnecessary extensions.
- `-collapse`: If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size.
- `-progress`: If true, display the progress of processing objects in the source bucket.
- `-dry_run`: If set, only shows the layout of resulting output shards without actually executing archive jobs. Use 'show_keys' to include sample keys.
- `-dry_run`: If set, only shows the layout of resulting output shards without actually executing archive jobs. Use `-dry_run="show_keys"` to include sample keys.
- `-sort`: Specifies the sorting algorithm for files within shards. Also see [dSort](/docs/dsort.md)
- `-sort="alpha:inc"`: Sorts the items in alphanumeric order in ascending (increasing) order.
- `-sort="alpha:dec"`: Sorts the items in alphanumeric order in descending (decreasing) order.
@@ -390,12 +394,11 @@ go test -v -short -tags=debug -run=TestIshardMaxShardSize
- [X] configurable record key, extensions
- [X] upon missing extension in a record: (abort | warn | ignore)
- [X] dry run
- [ ] version 0.9 (github checksum, git cmd)
- [ ] go install
- [ ] debug build
- [X] allow user to specify source directories to include/exclude (achieved by prefix option)
- [ ] logging (timestamp, nlog)
- [X] logging (timestamp, nlog)
- [ ] Large list of objects, need to swap MEM temporary
- [ ] Dry run with Dsort
- [X] Long stress tests
- [X] Dsort integration
- [ ] Dry run with Dsort
@@ -404,6 +407,6 @@ go test -v -short -tags=debug -run=TestIshardMaxShardSize
- [X] progress bar (later)
- [X] polling for completion of archive xactions (necessary to show the progress)
- [ ] substitute the original file name
- [ ] multi-worker archive xact
- [X] multi-worker archive xact
- [ ] integration into aistore (later)
- [ ] E2E testing from CLI
79 changes: 55 additions & 24 deletions cmd/ishard/ishard/config/config.go
@@ -27,8 +27,7 @@ type (
Ext string
ShardTemplate string
SampleKeyPattern SampleKeyPattern
SampleExtensions []string
MissingExtAction string
MissingExtAction *MissExtReact
Collapse bool
}
DryRunFlag struct {
@@ -54,12 +53,19 @@ const (

var DefaultConfig = Config{
ClusterConfig: ClusterConfig{URL: "http://" + defaultClusterIPv4 + ":" + defaultProxyPort},
IshardConfig: IshardConfig{MaxShardSize: 102400, Ext: ".tar", ShardTemplate: "shard-%d", Collapse: false, SampleKeyPattern: BaseFileNamePattern, MissingExtAction: "ignore"},
SrcBck: cmn.Bck{Name: "src_bck", Provider: apc.AIS},
DstBck: cmn.Bck{Name: "dst_bck", Provider: apc.AIS},
Progress: false,
DryRunFlag: DryRunFlag{IsSet: false},
SortFlag: SortFlag{IsSet: false},
IshardConfig: IshardConfig{
MaxShardSize: 102400,
Ext: ".tar",
ShardTemplate: "shard-%d",
Collapse: false,
SampleKeyPattern: BaseFileNamePattern,
MissingExtAction: nil,
},
SrcBck: cmn.Bck{Name: "src_bck", Provider: apc.AIS},
DstBck: cmn.Bck{Name: "dst_bck", Provider: apc.AIS},
Progress: false,
DryRunFlag: DryRunFlag{IsSet: false},
SortFlag: SortFlag{IsSet: false},
}

////////////////////////
@@ -156,24 +162,44 @@ func Load() (*Config, error) {
func parseCliParams(cfg *Config) {
flag.StringVar(&cfg.SrcBck.Name, "src_bck", "", "Source bucket name or URI.")
flag.StringVar(&cfg.DstBck.Name, "dst_bck", "", "Destination bucket name or URI.")
flag.StringVar(&cfg.ShardTemplate, "shard_template", "shard-%d", "Template used for generating output shards. Accepts Bash (prefix{0001..0010}suffix), Fmt (prefix-%06d-suffix), or At (prefix-@00001-gap-@100-suffix) templates")
flag.StringVar(&cfg.Ext, "ext", ".tar", "Extension used for generating output shards.")
flag.StringVar(&cfg.MissingExtAction, "missing_extension_action", "ignore", "Action to take when an extension is missing: abort | warn | ignore")
flag.BoolVar(&cfg.Collapse, "collapse", false, "If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size.")
flag.BoolVar(&cfg.Progress, "progress", false, "If true, display the progress of processing objects in the source bucket.")
flag.Var(&cfg.DryRunFlag, "dry_run", "If set, only shows the layout of resulting output shards without actually executing archive jobs. Use 'show_keys' to include sample keys.")
flag.Var(&cfg.SortFlag, "sort", "sorting algorithm (e.g., alpha:inc, alpha:dec, shuffle, shuffle:seed)")
flag.StringVar(&cfg.ShardTemplate, "shard_template", "shard-%06d", "The template used for generating output shards. Default is `\"shard-%06d\"`. Accepts Bash, Fmt, or At formats.\n"+
" -shard_template=\"prefix-{0000..4096..8}-suffix\": Generate output shards prefix-0000-suffix, prefix-0008-suffix, prefix-0016-suffix, and so on.\n"+
" -shard_template=\"prefix-%06d-suffix\": Generate output shards prefix-000000-suffix, prefix-000001-suffix, prefix-000002-suffix, and so on.\n"+
" -shard_template=\"prefix-@00001-gap-@100-suffix\": Generate output shards prefix-00001-gap-001-suffix, prefix-00001-gap-002-suffix, and so on.")

flag.StringVar(&cfg.Ext, "ext", ".tar", "Extension used for generating output shards. Default is `\".tar\"`. Options are \".tar\" | \".tgz\" | \".tar.gz\" | \".zip\" | \".tar.lz4\" formats.")
flag.BoolVar(&cfg.Collapse, "collapse", false, "If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size. Default is `false`.")
flag.BoolVar(&cfg.Progress, "progress", false, "If true, display the progress of processing objects in the source bucket. Default is `false`.")
flag.Var(&cfg.DryRunFlag, "dry_run", "If set, only shows the layout of resulting output shards without actually executing archive jobs. Use -dry_run=\"show_keys\" to include sample keys.")
flag.Var(&cfg.SortFlag, "sort", "If set, sorting algorithm will be performed on files within shards\n"+
" -sort=\"alpha:inc\": Sorts the items in alphanumeric order in ascending (increasing) order.\n"+
" -sort=\"shuffle:124123\": Randomly shuffles the items using the specified seed 124123 for reproducibility. If the seed cannot be parsed as an integer, the flag is rejected.")

var (
err error
maxShardSizeStr string
sampleExts string
sampleKeyPatternStr string
missingExtActStr string
)

flag.StringVar(&maxShardSizeStr, "max_shard_size", "1MiB", "Maximum size of each output shard. Accepts IEC, SI, and raw formats.")
flag.StringVar(&sampleExts, "sample_exts", "", "Comma-separated list of extensions that should exists in the dataset.")
flag.StringVar(&sampleKeyPatternStr, "sample_key_pattern", "", "The regex pattern used to transform object names in the source bucket to sample keys. This ensures that objects with the same sample key are always sharded into the same output shard.")
flag.StringVar(&maxShardSizeStr, "max_shard_size", "1MiB", "Maximum size of each output shard. Default is `\"1MiB\"`. Accepts the following units formats:\n"+
" - IEC format, e.g.: 4KiB, 16MiB, 2GiB\n"+
" - SI format, e.g.: 4KB, 16MB, 2GB\n"+
" - raw format (in bytes), e.g.: 1024000")
flag.StringVar(&sampleExts, "sample_exts", "", "Comma-separated list of extensions that should exist in the dataset, e.g. -sample_exts=\".JPEG,.xml,.json\". See -missing_extension_action for handling missing extensions.")
flag.StringVar(&sampleKeyPatternStr, "sample_key_pattern", "", "The pattern used to substitute source file names to sample keys. Default is `\"base_filename\"`. Options are \"base_file_name\" | \"full_name\" | \"collapse_all_dir\" | \"any other valid regex\"\n"+
"This ensures that files with the same sample key are always sharded into the same output shard.\n"+
" -sample_key_pattern=\"base_filename\": The default option. Extracts and uses only the base filename as the sample key to merge. Removes all directory paths and extensions.\n"+
" -sample_key_pattern=\"full_name\": Performs no substitution, using the entire file name without extension as the sample key.\n"+
" -sample_key_pattern=\"collapse_all_dir\": Removes all '/' characters from the file name, using the resulting string as the sample key.\n"+
" -sample_key_pattern=\".*/([^/]+)/[^/]+$\": Applies a custom regex pattern to substitute the file names to their last level of directory names.")

flag.StringVar(&missingExtActStr, "missing_extension_action", "ignore", "Specifies the action to take when an expected extension is missing from a sample. Default is `\"ignore\"`. Options are: \"abort\" | \"warn\" | \"ignore\" | \"exclude\".\n"+
" -missing_extension_action=\"ignore\": Do nothing when an expected extension is missing.\n"+
" -missing_extension_action=\"warn\": Print a warning if a sample is missing an expected extension or contains an unspecified one.\n"+
" -missing_extension_action=\"abort\": Stop the process if a sample is missing an expected extension or contains an unspecified one.\n"+
" -missing_extension_action=\"exclude\": Exclude any incomplete records and remove unnecessary extensions.")

flag.Parse()

@@ -183,13 +209,18 @@ func parseCliParams(cfg *Config) {
os.Exit(1)
}

if _, ok := MissingExtActMap[cfg.MissingExtAction]; !ok {
fmt.Fprintf(os.Stderr, "Invalid action: %s. Accepted values are: abort, warn, ignore\n", cfg.MissingExtAction)
var reactions = []string{"ignore", "warn", "abort", "exclude"}
if !cos.StringInSlice(missingExtActStr, reactions) {
fmt.Printf("Invalid action: %s. Accepted values are: abort, warn, ignore, exclude\n", missingExtActStr)
flag.Usage()
os.Exit(1)
}
if sampleExts != "" {
cfg.SampleExtensions = strings.Split(sampleExts, ",")

cfg.MissingExtAction, err = NewMissExtReact(missingExtActStr, strings.Split(sampleExts, ","))
if err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
}

var commonPatterns = map[string]SampleKeyPattern{
@@ -199,12 +230,12 @@ func parseCliParams(cfg *Config) {
}

if sampleKeyPatternStr == "" {
fmt.Println("`sample_key_pattern` is not specified, use `base_file_name` as sample key by default.")
fmt.Println("\"sample_key_pattern\" is not specified, use \"base_file_name\" as sample key by default.")
cfg.SampleKeyPattern = BaseFileNamePattern
} else if pattern, ok := commonPatterns[sampleKeyPatternStr]; ok {
cfg.SampleKeyPattern = pattern
} else {
fmt.Printf("`sample_key_pattern` %s is not built-in (`base_file_name` | `full_name` | `collapse_all_dir`), compiled as custom regex\n", sampleKeyPatternStr)
fmt.Printf("\"sample_key_pattern\" %s is not built-in (\"base_file_name\" | \"full_name\" | \"collapse_all_dir\"), compiled as custom regex\n", sampleKeyPatternStr)
if _, err := regexp.Compile(sampleKeyPatternStr); err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
Expand Down
106 changes: 91 additions & 15 deletions cmd/ishard/ishard/config/sample.go
@@ -7,7 +7,9 @@ package config
import (
"fmt"

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/ext/dsort/shard"
)

type SampleKeyPattern struct {
@@ -22,25 +24,99 @@ var (
CollapseAllDirPattern = SampleKeyPattern{Regex: `/`, CaptureGroup: ""}
)

// Action types and functions
func abortAction(missingExt string) error {
return fmt.Errorf("missing extension: %s. Aborting process", missingExt)
// MissExtReact contains the set of expected extensions for each sample, and corresponding reaction
type MissExtReact struct {
Name string
extSet cos.StrSet

// Action to take on the given Records, returns the potentially updated Records and any error encountered
React func(*shard.Records) (*shard.Records, error)
}

func NewMissExtReact(name string, sampleExts []string) (*MissExtReact, error) {
if len(sampleExts) == 0 {
return nil, fmt.Errorf("invalid extensions, should have at least one specified extension")
}
mer := &MissExtReact{
Name: name,
extSet: cos.NewStrSet(sampleExts...),
}

switch name {
case "ignore":
mer.React = mer.ignore
case "warn":
mer.React = mer.warn
case "abort":
mer.React = mer.abort
case "exclude":
mer.React = mer.exclude
default:
debug.Assert(false)
return nil, fmt.Errorf("invalid action: %s. Accepted values are: abort, warn, ignore, exclude", name)
}

return mer, nil
}

func (mer *MissExtReact) ignore(recs *shard.Records) (*shard.Records, error) {
return nil, nil
}

func warnAction(missingExt string) error {
fmt.Printf("Warning: missing extension: %s\n", missingExt)
return nil
func (mer *MissExtReact) warn(recs *shard.Records) (*shard.Records, error) {
for _, record := range recs.All() {
extra, missing := difference(mer.extSet, record.Objects)
for ext := range extra {
fmt.Printf("[Warning] sample %s contains extension %s, not specified in `sample_exts` config\n", record.Name, ext)
}
for ext := range missing {
fmt.Printf("[Warning] extension %s not found in sample %s\n", ext, record.Name)
}
}

return nil, nil
}

func ignoreAction(missingExt string) error {
fmt.Printf("Ignoring missing extension: %s\n", missingExt)
return nil
func (mer *MissExtReact) abort(recs *shard.Records) (*shard.Records, error) {
for _, record := range recs.All() {
extra, missing := difference(mer.extSet, record.Objects)
for ext := range extra {
return nil, fmt.Errorf("sample %s contains extension %s, not specified in `sample_exts` config", record.Name, ext)
}
for ext := range missing {
return nil, fmt.Errorf("missing extension: extension %s not found in sample %s", ext, record.Name)
}
}

return nil, nil
}

type MissingExtFunc func(missingExt string) error
func (mer *MissExtReact) exclude(recs *shard.Records) (*shard.Records, error) {
filteredRecs := shard.NewRecords(16)

for _, record := range recs.All() {
extra, missing := difference(mer.extSet, record.Objects)
for ext := range extra {
recs.DeleteDup(record.Name, ext)
}
if len(missing) == 0 {
filteredRecs.Insert(record)
}
}

return filteredRecs, nil
}

var MissingExtActMap = map[string]MissingExtFunc{
cmn.AbortReaction: abortAction,
cmn.WarnReaction: warnAction,
cmn.IgnoreReaction: ignoreAction,
// difference finds the differences between two sets: `want` and `have`.
// returns `extra` (extensions in `have` but not in `want`) and `missing` (extensions in `want` but not in `have`).
func difference(want cos.StrSet, have []*shard.RecordObj) (extra cos.StrSet, missing cos.StrSet) {
missing = want.Clone()
extra = cos.NewStrSet()
for _, obj := range have {
if !missing.Contains(obj.Extension) {
extra.Add(obj.Extension)
}
missing.Delete(obj.Extension)
}
return
}