Skip to content

Commit

Permalink
Implement a parallel download on S3
Browse files Browse the repository at this point in the history
Signed-off-by: Bertrand Paquet <[email protected]>
  • Loading branch information
bpaquet committed Dec 17, 2024
1 parent 4ea0679 commit 8cee779
Show file tree
Hide file tree
Showing 3 changed files with 317 additions and 151 deletions.
75 changes: 0 additions & 75 deletions cache/remotecache/s3/readerat.go

This file was deleted.

174 changes: 98 additions & 76 deletions cache/remotecache/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,36 +34,40 @@ import (
)

const (
attrBucket = "bucket"
attrRegion = "region"
attrPrefix = "prefix"
attrManifestsPrefix = "manifests_prefix"
attrBlobsPrefix = "blobs_prefix"
attrName = "name"
attrTouchRefresh = "touch_refresh"
attrEndpointURL = "endpoint_url"
attrAccessKeyID = "access_key_id"
attrSecretAccessKey = "secret_access_key"
attrSessionToken = "session_token"
attrUsePathStyle = "use_path_style"
attrUploadParallelism = "upload_parallelism"
maxCopyObjectSize = 5 * 1024 * 1024 * 1024
attrBucket = "bucket"
attrRegion = "region"
attrPrefix = "prefix"
attrManifestsPrefix = "manifests_prefix"
attrBlobsPrefix = "blobs_prefix"
attrName = "name"
attrTouchRefresh = "touch_refresh"
attrEndpointURL = "endpoint_url"
attrAccessKeyID = "access_key_id"
attrSecretAccessKey = "secret_access_key"
attrSessionToken = "session_token"
attrUsePathStyle = "use_path_style"
attrUploadParallelism = "upload_parallelism"
attrDownloadParallelism = "download_parallelism"
attrDownloadPartSize = "download_part_size"
maxCopyObjectSize = 5 * 1024 * 1024 * 1024
)

type Config struct {
Bucket string
Region string
Prefix string
ManifestsPrefix string
BlobsPrefix string
Names []string
TouchRefresh time.Duration
EndpointURL string
AccessKeyID string
SecretAccessKey string
SessionToken string
UsePathStyle bool
UploadParallelism int
Bucket string
Region string
Prefix string
ManifestsPrefix string
BlobsPrefix string
Names []string
TouchRefresh time.Duration
EndpointURL string
AccessKeyID string
SecretAccessKey string
SessionToken string
UsePathStyle bool
UploadParallelism int
DownloadParallelism int
DownloadPartSize int
}

func getConfig(attrs map[string]string) (Config, error) {
Expand Down Expand Up @@ -141,20 +145,48 @@ func getConfig(attrs map[string]string) (Config, error) {
uploadParallelism = uploadParallelismInt
}

downloadParallism := 4
downloadParallismStr, ok := attrs[attrDownloadParallelism]
if ok {
downloadParallismInt, err := strconv.Atoi(downloadParallismStr)
if err != nil {
return Config{}, errors.Errorf("download_parallelism must be a positive integer")
}
if downloadParallismInt <= 0 {
return Config{}, errors.Errorf("download_parallelism must be a positive integer")
}
downloadParallism = downloadParallismInt
}

downloadPartSize := 5 * 1024 * 1024
downloadPartSizeStr, ok := attrs[attrDownloadPartSize]
if ok {
downloadPartSizeInt, err := strconv.Atoi(downloadPartSizeStr)
if err != nil {
return Config{}, errors.Errorf("download_part_size must be a positive integer")
}
if downloadPartSizeInt <= 0 {
return Config{}, errors.Errorf("download_part_size must be a positive integer")
}
downloadParallism = downloadPartSizeInt
}

return Config{
Bucket: bucket,
Region: region,
Prefix: prefix,
ManifestsPrefix: manifestsPrefix,
BlobsPrefix: blobsPrefix,
Names: names,
TouchRefresh: touchRefresh,
EndpointURL: endpointURL,
AccessKeyID: accessKeyID,
SecretAccessKey: secretAccessKey,
SessionToken: sessionToken,
UsePathStyle: usePathStyle,
UploadParallelism: uploadParallelism,
Bucket: bucket,
Region: region,
Prefix: prefix,
ManifestsPrefix: manifestsPrefix,
BlobsPrefix: blobsPrefix,
Names: names,
TouchRefresh: touchRefresh,
EndpointURL: endpointURL,
AccessKeyID: accessKeyID,
SecretAccessKey: secretAccessKey,
SessionToken: sessionToken,
UsePathStyle: usePathStyle,
UploadParallelism: uploadParallelism,
DownloadParallelism: downloadParallism,
DownloadPartSize: downloadPartSize,
}, nil
}

Expand Down Expand Up @@ -385,22 +417,15 @@ func (i *importer) Resolve(ctx context.Context, _ ocispecs.Descriptor, id string
return solver.NewCacheManager(ctx, id, keysStorage, resultStorage), nil
}

type readerAt struct {
ReaderAtCloser
size int64
}

func (r *readerAt) Size() int64 {
return r.size
}

type s3Client struct {
*s3.Client
*manager.Uploader
bucket string
prefix string
blobsPrefix string
manifestsPrefix string
bucket string
prefix string
blobsPrefix string
manifestsPrefix string
downloadParallelism int
downloadPartSize int
}

func newS3Client(ctx context.Context, config Config) (*s3Client, error) {
Expand All @@ -419,12 +444,14 @@ func newS3Client(ctx context.Context, config Config) (*s3Client, error) {
})

return &s3Client{
Client: client,
Uploader: manager.NewUploader(client),
bucket: config.Bucket,
prefix: config.Prefix,
blobsPrefix: config.BlobsPrefix,
manifestsPrefix: config.ManifestsPrefix,
Client: client,
Uploader: manager.NewUploader(client),
bucket: config.Bucket,
prefix: config.Prefix,
blobsPrefix: config.BlobsPrefix,
manifestsPrefix: config.ManifestsPrefix,
downloadParallelism: config.DownloadParallelism,
downloadPartSize: config.DownloadPartSize,
}, nil
}

Expand Down Expand Up @@ -454,19 +481,6 @@ func (s3Client *s3Client) getManifest(ctx context.Context, key string, config *v
return true, nil
}

func (s3Client *s3Client) getReader(ctx context.Context, key string) (io.ReadCloser, error) {
input := &s3.GetObjectInput{
Bucket: &s3Client.bucket,
Key: &key,
}

output, err := s3Client.GetObject(ctx, input)
if err != nil {
return nil, err
}
return output.Body, nil
}

func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io.Reader) error {
input := &s3.PutObjectInput{
Bucket: &s3Client.bucket,
Expand Down Expand Up @@ -587,10 +601,18 @@ func (s3Client *s3Client) touch(ctx context.Context, key string, size *int64) (e
}

func (s3Client *s3Client) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
readerAtCloser := toReaderAtCloser(func(offset int64) (io.ReadCloser, error) {
return s3Client.getReader(ctx, s3Client.blobKey(desc.Digest))
})
return &readerAt{ReaderAtCloser: readerAtCloser, size: desc.Size}, nil
key := s3Client.blobKey(desc.Digest)
input := &S3DownloaderInput{
Bucket: &s3Client.bucket,
Key: &key,
S3Client: s3Client.Client,
Size: desc.Size,
Parallelism: s3Client.downloadParallelism,
PartSize: s3Client.downloadPartSize,
}
downloader := newDownloader(input)

return downloader.Download(ctx)
}

func (s3Client *s3Client) manifestKey(name string) string {
Expand Down
Loading

0 comments on commit 8cee779

Please sign in to comment.