From 7b2d3536086775825bcbd7b3ae7e46800e5ca3d8 Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Fri, 29 Nov 2024 15:58:25 -0800 Subject: [PATCH 1/7] solver: return errors hit during cache export There were two lines that checked if err != nil and then returned a nil error. This seems likely to be a typo, especially since there is support for ignoring errors during cache export, but on a completely different level of abstraction (in llbsolver). There were bugs causing errors during cache export (fixed in subsequent commits) which ended up getting silently dropped, causing cache exports to go missing mysteriously. Now errors are returned and only ignored if cache export errors are configured to be ignored. Signed-off-by: Erik Sipsma --- solver/exporter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/solver/exporter.go b/solver/exporter.go index 5f040f75d03b..a9be316c55b8 100644 --- a/solver/exporter.go +++ b/solver/exporter.go @@ -172,7 +172,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach for _, dep := range deps { recs, err := dep.CacheKey.Exporter.ExportTo(ctx, t, opt) if err != nil { - return nil, nil + return nil, err } for _, r := range recs { srcs[i] = append(srcs[i], expr{r: r, selector: dep.Selector}) @@ -184,7 +184,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach for _, de := range e.edge.secondaryExporters { recs, err := de.cacheKey.CacheKey.Exporter.ExportTo(ctx, t, opt) if err != nil { - return nil, nil + return nil, err } for _, r := range recs { srcs[de.index] = append(srcs[de.index], expr{r: r, selector: de.cacheKey.Selector}) From b37519523d1429e882e400da8cecdd4edc330c2e Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Fri, 29 Nov 2024 15:57:03 -0800 Subject: [PATCH 2/7] solver: fix handling of lazy blobs in cache export Before this change, lazy blobs were handled during cache export by providing descHandlers from the ref being exported in llbsolver. However, this didn't handle some max cache export cases that involve the use of read-write mounts. Specifically, if you exported cache for a ref from a read-write mount in an ExecOp, the ref's descHandlers didn't include handlers for any refs from the rootfs of the ExecOp. If any of those rootfs refs involved lazy blobs, an error about lazy blobs would be hit during cache export. It's possible for the rootfs to have lazy blobs in a few different ways, but the one tested in the integ test added here involves two images with layers that get deduped by chainID (i.e. they uncompress to the same layer but have different compressions). Image layer refs that find an existing ref w/ the same chainID will get a snapshot for free but stay lazy in terms of their blobs, thus making it possible for an exec to run on top of them while still being considered lazy. The fix here puts the CacheOptGetter logic in the cache export code directly so that it can use the solver's information on dependencies to find all possible descHandlers, including those for the rootfs in the read-write mount case.
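The following standalone Go sketch (not part of the patch series; layer content and variable names are made up for illustration) shows the chainID dedup described above. A layer's diffID is the digest of its uncompressed tar, so a gzip blob and a zstd blob of the same content have different blob digests but share a diffID, and therefore a chainID; that is why the snapshot can be reused while the blob itself stays lazy. The zstd side is approximated here by hashing the raw bytes directly.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"

	digest "github.com/opencontainers/go-digest"
)

func main() {
	// Stand-in for an uncompressed layer tar (illustrative content only).
	layer := []byte("pretend this is an uncompressed layer tar")

	// The registry blob digest depends on the compression used...
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write(layer)
	zw.Close()
	gzipBlobDigest := digest.FromBytes(buf.Bytes())
	rawBlobDigest := digest.FromBytes(layer) // standing in for a zstd encoding

	// ...but the diffID is computed over the uncompressed stream, so every
	// encoding of the same layer shares it.
	diffID := digest.FromBytes(layer)

	// For a single layer the chainID equals the diffID; for deeper chains the
	// OCI image spec defines chainID(n) = digest(chainID(n-1) + " " + diffID(n)).
	chainID := diffID

	fmt.Println("gzip blob digest:", gzipBlobDigest)
	fmt.Println("raw blob digest: ", rawBlobDigest)
	fmt.Println("shared diffID / chainID:", diffID, chainID)
}
```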
Signed-off-by: Erik Sipsma --- client/client_test.go | 142 +++++++++++++++++++++++++++++++++ solver/exporter.go | 7 ++ solver/llbsolver/provenance.go | 9 --- solver/llbsolver/solver.go | 17 ---- 4 files changed, 149 insertions(+), 26 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index e7a5886f8eec..018e680b8c5a 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -222,6 +222,7 @@ var allTests = []func(t *testing.T, sb integration.Sandbox){ testLayerLimitOnMounts, testFrontendVerifyPlatforms, testRunValidExitCodes, + testSameChainIDWithLazyBlobs, } func TestIntegration(t *testing.T) { @@ -10895,3 +10896,144 @@ func testRunValidExitCodes(t *testing.T, sb integration.Sandbox) { require.Error(t, err) require.ErrorContains(t, err, "exit code: 0") } + +func testSameChainIDWithLazyBlobs(t *testing.T, sb integration.Sandbox) { + workers.CheckFeatureCompat(t, sb, + workers.FeatureCacheExport, + workers.FeatureCacheImport, + workers.FeatureCacheBackendRegistry, + ) + + c, err := New(sb.Context(), sb.Address()) + require.NoError(t, err) + defer c.Close() + + registry, err := sb.NewRegistry() + if errors.Is(err, integration.ErrRequirements) { + t.Skip(err.Error()) + } + require.NoError(t, err) + + // push the base busybox image, ensuring it uses gzip + + def, err := llb.Image("busybox:latest"). + Marshal(sb.Context()) + require.NoError(t, err) + busyboxGzipRef := registry + "/buildkit/busyboxgzip:latest" + _, err = c.Solve(sb.Context(), def, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterImage, + Attrs: map[string]string{ + "name": busyboxGzipRef, + "push": "true", + "compression": "gzip", + "force-compression": "true", + }, + }, + }, + }, nil) + require.NoError(t, err) + + // push the base busybox image plus an extra layer, ensuring it uses zstd + // the extra layer allows us to avoid edge merge later + def, err = llb.Image("busybox:latest"). + Run(llb.Shlex(`touch /foo`)).Root(). + Marshal(sb.Context()) + require.NoError(t, err) + busyboxZstdRef := registry + "/buildkit/busyboxzstd:latest" + _, err = c.Solve(sb.Context(), def, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterImage, + Attrs: map[string]string{ + "name": busyboxZstdRef, + "push": "true", + "compression": "zstd", + "force-compression": "true", + }, + }, + }, + }, nil) + require.NoError(t, err) + + ensurePruneAll(t, c, sb) + + // create non-lazy cache refs for the zstd image + def, err = llb.Image(busyboxZstdRef). + Run(llb.Shlex(`true`)).Root(). + Marshal(sb.Context()) + require.NoError(t, err) + _, err = c.Solve(sb.Context(), def, SolveOpt{}, nil) + require.NoError(t, err) + + // Create lazy cache refs for the gzip layers that will be deduped by chainID with + // the zstd layers made in the previous solve. + // Put a random file in the rootfs, run a cache invalidation step and then copy + // the random file to a r/w mnt. + def, err = llb.Image(busyboxGzipRef). + Run(llb.Shlex(`sh -c "cat /dev/urandom | head -c 100 > /rand"`)).Root(). + Run(llb.Shlex(`echo `+identity.NewID())).Root(). + Run(llb.Shlex(`cp /rand /mnt/rand`)).AddMount("/mnt", llb.Scratch()). 
+ Marshal(sb.Context()) + require.NoError(t, err) + + outDir := t.TempDir() + _, err = c.Solve(sb.Context(), def, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterLocal, + OutputDir: outDir, + }, + }, + CacheExports: []CacheOptionsEntry{ + { + Type: "registry", + Attrs: map[string]string{ + "ref": registry + "/buildkit/idc:latest", + "mode": "max", + }, + }, + }, + }, nil) + require.NoError(t, err) + + rand1, err := os.ReadFile(filepath.Join(outDir, "rand")) + require.NoError(t, err) + + ensurePruneAll(t, c, sb) + + // Run the same steps as before but with a different cache invalidation step in the middle + // The random file should still be cached from earlier and thus the output should be the same + def, err = llb.Image(busyboxGzipRef). + Run(llb.Shlex(`sh -c "cat /dev/urandom | head -c 100 > /rand"`)).Root(). + Run(llb.Shlex(`echo `+identity.NewID())).Root(). + Run(llb.Shlex(`cp /rand /mnt/rand`)).AddMount("/mnt", llb.Scratch()). + Marshal(sb.Context()) + require.NoError(t, err) + + outDir = t.TempDir() + _, err = c.Solve(sb.Context(), def, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterLocal, + OutputDir: outDir, + }, + }, + CacheImports: []CacheOptionsEntry{ + { + Type: "registry", + Attrs: map[string]string{ + "ref": registry + "/buildkit/idc:latest", + "mode": "max", + }, + }, + }, + }, nil) + require.NoError(t, err) + + rand2, err := os.ReadFile(filepath.Join(outDir, "rand")) + require.NoError(t, err) + + require.Equal(t, string(rand1), string(rand2)) +} diff --git a/solver/exporter.go b/solver/exporter.go index a9be316c55b8..28f8b277f8b0 100644 --- a/solver/exporter.go +++ b/solver/exporter.go @@ -117,6 +117,13 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach return nil, err } + if e.edge != nil { + op, ok := e.edge.op.(*sharedOp) + if ok && op != nil && op.st != nil { + ctx = withAncestorCacheOpts(ctx, op.st) + } + } + remotes, err := cm.results.LoadRemotes(ctx, res, opt.CompressionOpt, opt.Session) if err != nil { return nil, err diff --git a/solver/llbsolver/provenance.go b/solver/llbsolver/provenance.go index ec4f437e4281..049c01f5a583 100644 --- a/solver/llbsolver/provenance.go +++ b/solver/llbsolver/provenance.go @@ -398,18 +398,9 @@ func NewProvenanceCreator(ctx context.Context, cp *provenance.Capture, res solve return nil, err } - wref, ok := r.Sys().(*worker.WorkerRef) - if !ok { - return nil, errors.Errorf("invalid worker ref %T", r.Sys()) - } - addLayers = func() error { e := newCacheExporter() - if wref.ImmutableRef != nil { - ctx = withDescHandlerCacheOpts(ctx, wref.ImmutableRef) - } - if _, err := r.CacheKeys()[0].Exporter.ExportTo(ctx, e, solver.CacheExportOpt{ ResolveRemotes: resolveRemotes, Mode: solver.CacheExportModeRemoteOnly, diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index 15a1f0911372..e8f3d8c4ff8a 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -691,8 +691,6 @@ func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j * err = inBuilderContext(ctx, j, exp.Exporter.Name(), id, func(ctx context.Context, _ session.Group) error { prepareDone := progress.OneOff(ctx, "preparing build cache for export") if err := result.EachRef(cached, inp, func(res solver.CachedResult, ref cache.ImmutableRef) error { - ctx = withDescHandlerCacheOpts(ctx, ref) - // Configure compression compressionConfig := exp.Config().Compression @@ -998,7 +996,6 @@ func inlineCache(ctx context.Context, ie inlineCacheExporter, res solver.CachedR digests = append(digests, 
desc.Digest) } - ctx = withDescHandlerCacheOpts(ctx, workerRef.ImmutableRef) refCfg := cacheconfig.RefConfig{Compression: compressionopt} if _, err := res.CacheKeys()[0].Exporter.ExportTo(ctx, ie, solver.CacheExportOpt{ ResolveRemotes: workerRefResolver(refCfg, true, g), // load as many compression blobs as possible @@ -1011,20 +1008,6 @@ func inlineCache(ctx context.Context, ie inlineCacheExporter, res solver.CachedR return ie.ExportForLayers(ctx, digests) } -func withDescHandlerCacheOpts(ctx context.Context, ref cache.ImmutableRef) context.Context { - return solver.WithCacheOptGetter(ctx, func(includeAncestors bool, keys ...interface{}) map[interface{}]interface{} { - vals := make(map[interface{}]interface{}) - for _, k := range keys { - if key, ok := k.(cache.DescHandlerKey); ok { - if handler := ref.DescHandler(digest.Digest(key)); handler != nil { - vals[k] = handler - } - } - } - return vals - }) -} - func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error { if err := s.history.Status(ctx, id, statusChan); err != nil { if !errors.Is(err, os.ErrNotExist) { From 30e17433e342b7237a462a0b92f21c53285e06a9 Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Fri, 29 Nov 2024 18:15:06 -0800 Subject: [PATCH 3/7] mountmanager: handle lazy blob ref loading in cache mounts Before this change, if a cache mount had base layers from a ref and those layers were lazy, you could hit missing blob errors when trying to reload an existing mutable ref for the cache mount. It's possible to have lazy refs in the base layers when blobs get deduped by chainID. The fix is just to handle the lazy blob error and reload with descHandlers set. Signed-off-by: Erik Sipsma --- client/client_test.go | 104 ++++++++++++++++++++++++++++++- solver/llbsolver/mounts/mount.go | 14 ++++- 2 files changed, 114 insertions(+), 4 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 018e680b8c5a..464c59ccbf0b 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -222,7 +222,8 @@ var allTests = []func(t *testing.T, sb integration.Sandbox){ testLayerLimitOnMounts, testFrontendVerifyPlatforms, testRunValidExitCodes, - testSameChainIDWithLazyBlobs, + testSameChainIDWithLazyBlobsCacheExport, + testSameChainIDWithLazyBlobsCacheMountBase, } func TestIntegration(t *testing.T) { @@ -10897,7 +10898,7 @@ func testRunValidExitCodes(t *testing.T, sb integration.Sandbox) { require.ErrorContains(t, err, "exit code: 0") } -func testSameChainIDWithLazyBlobs(t *testing.T, sb integration.Sandbox) { +func testSameChainIDWithLazyBlobsCacheExport(t *testing.T, sb integration.Sandbox) { workers.CheckFeatureCompat(t, sb, workers.FeatureCacheExport, workers.FeatureCacheImport, @@ -10936,7 +10937,7 @@ func testSameChainIDWithLazyBlobs(t *testing.T, sb integration.Sandbox) { require.NoError(t, err) // push the base busybox image plus an extra layer, ensuring it uses zstd - // the extra layer allows us to avoid edge merge later + // the extra layer allows us to avoid edge-merge/cache-load later def, err = llb.Image("busybox:latest"). Run(llb.Shlex(`touch /foo`)).Root(). 
Marshal(sb.Context()) @@ -11037,3 +11038,100 @@ func testSameChainIDWithLazyBlobs(t *testing.T, sb integration.Sandbox) { require.Equal(t, string(rand1), string(rand2)) } + +func testSameChainIDWithLazyBlobsCacheMountBase(t *testing.T, sb integration.Sandbox) { + workers.CheckFeatureCompat(t, sb, + workers.FeatureCacheExport, + workers.FeatureCacheImport, + workers.FeatureCacheBackendRegistry, + ) + + c, err := New(sb.Context(), sb.Address()) + require.NoError(t, err) + defer c.Close() + + registry, err := sb.NewRegistry() + if errors.Is(err, integration.ErrRequirements) { + t.Skip(err.Error()) + } + require.NoError(t, err) + + // push the base busybox image, ensuring it uses gzip + + def, err := llb.Image("busybox:latest"). + Marshal(sb.Context()) + require.NoError(t, err) + busyboxGzipRef := registry + "/buildkit/busyboxgzip:latest" + _, err = c.Solve(sb.Context(), def, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterImage, + Attrs: map[string]string{ + "name": busyboxGzipRef, + "push": "true", + "compression": "gzip", + "force-compression": "true", + }, + }, + }, + }, nil) + require.NoError(t, err) + + // push the base busybox image plus an extra layer, ensuring it uses zstd + // the extra layer allows us to avoid edge-merge/cache-load later + def, err = llb.Image("busybox:latest"). + Run(llb.Shlex(`touch /foo`)).Root(). + Marshal(sb.Context()) + require.NoError(t, err) + busyboxZstdRef := registry + "/buildkit/busyboxzstd:latest" + _, err = c.Solve(sb.Context(), def, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterImage, + Attrs: map[string]string{ + "name": busyboxZstdRef, + "push": "true", + "compression": "zstd", + "force-compression": "true", + }, + }, + }, + }, nil) + require.NoError(t, err) + + ensurePruneAll(t, c, sb) + + // create non-lazy cache refs for the zstd image + def, err = llb.Image(busyboxZstdRef). + Run(llb.Shlex(`true`)).Root(). 
+ Marshal(sb.Context()) + require.NoError(t, err) + _, err = c.Solve(sb.Context(), def, SolveOpt{}, nil) + require.NoError(t, err) + + // use the gzip image as a cache mount base, the cache ref will be deduped by + // chainID with the zstd layers made in the previous solve + def, err = llb.Image(busyboxZstdRef).Run( + llb.Shlex(`touch /mnt/bar`), + llb.AddMount("/mnt", + llb.Image(busyboxGzipRef), + llb.AsPersistentCacheDir("idc", llb.CacheMountShared), + ), + ).Root().Marshal(sb.Context()) + require.NoError(t, err) + _, err = c.Solve(sb.Context(), def, SolveOpt{}, nil) + require.NoError(t, err) + + // try to re-use the cache mount from before, ensure we successfully get + // the same one writen to in previous step + def, err = llb.Image(busyboxZstdRef).Run( + llb.Shlex(`stat /mnt/bar`), + llb.AddMount("/mnt", + llb.Image(busyboxGzipRef), + llb.AsPersistentCacheDir("idc", llb.CacheMountShared), + ), + ).Root().Marshal(sb.Context()) + require.NoError(t, err) + _, err = c.Solve(sb.Context(), def, SolveOpt{}, nil) + require.NoError(t, err) +} diff --git a/solver/llbsolver/mounts/mount.go b/solver/llbsolver/mounts/mount.go index c95fe4966b61..bb15c8093ff8 100644 --- a/solver/llbsolver/mounts/mount.go +++ b/solver/llbsolver/mounts/mount.go @@ -24,6 +24,7 @@ import ( "github.com/moby/buildkit/util/grpcerrors" "github.com/moby/locker" "github.com/moby/sys/userns" + digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "google.golang.org/grpc/codes" ) @@ -123,7 +124,18 @@ func (g *cacheRefGetter) getRefCacheDirNoCache(ctx context.Context, key string, } locked := false for _, si := range sis { - if mRef, err := g.cm.GetMutable(ctx, si.ID()); err == nil { + mRef, err := g.cm.GetMutable(ctx, si.ID()) + var needsRemoteProviders cache.NeedsRemoteProviderError + if errors.As(err, &needsRemoteProviders) && ref != nil { + descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler)) + for _, dgst := range needsRemoteProviders { + if handler := ref.DescHandler(dgst); handler != nil { + descHandlers[dgst] = handler + } + } + mRef, err = g.cm.GetMutable(ctx, si.ID(), descHandlers) + } + if err == nil { bklog.G(ctx).Debugf("reusing ref for cache dir %q: %s", id, mRef.ID()) return mRef, nil } else if errors.Is(err, cache.ErrLocked) { From 8f16a380c008e8b903fa7342a6f77dd7cb8899dc Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Thu, 5 Dec 2024 15:46:01 -0800 Subject: [PATCH 4/7] solver: allow LoadCache to set CacheOpts For lazy remote cache cases, we figure out the descriptor handlers to use during loading of cache rather than during a CacheMap operation. In order to make those descHandlers available as CacheOpts we need to plumb them through to the shared op and allow withAncestorCacheOpts to check those in addition to the CacheOpts from CacheMaps. This allows loading of lazy refs during cache export when there are refs resolved with cache imports. 
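To make the plumbing above concrete, here is a simplified, self-contained sketch of the two-level lookup that the reworked withAncestorCacheOpts performs. This is not BuildKit code; the types and key strings are stand-ins. For each requested key, opts recorded by the CacheMap operation are consulted first, then opts captured when the result was loaded from cache.

```go
package main

import "fmt"

// opts is a simplified stand-in for solver.CacheOpts (opaque keys to values).
type opts map[any]any

// lookup checks CacheMap opts first, then falls back to opts captured at
// cache-load time, mirroring the reworked withAncestorCacheOpts.
func lookup(keys []any, cacheMapOpts, loadCacheOpts opts) map[any]any {
	found := make(map[any]any)
	for _, k := range keys {
		if v, ok := cacheMapOpts[k]; ok {
			found[k] = v
			continue
		}
		if v, ok := loadCacheOpts[k]; ok {
			found[k] = v
		}
	}
	return found
}

func main() {
	cacheMapOpts := opts{"descHandler:sha256:aaa": "handler-a"}
	// Only discoverable after the result is loaded from a cache import.
	loadCacheOpts := opts{"descHandler:sha256:bbb": "handler-b"}

	keys := []any{"descHandler:sha256:aaa", "descHandler:sha256:bbb", "descHandler:sha256:ccc"}
	fmt.Println(lookup(keys, cacheMapOpts, loadCacheOpts))
}
```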
Signed-off-by: Erik Sipsma --- cache/manager.go | 4 +- cache/opts.go | 13 +++++- cache/refs.go | 15 +++++++ client/client_test.go | 4 +- solver/cacheopts.go | 76 +++++++++++++++++++------------- solver/exporter.go | 2 +- solver/jobs.go | 19 +++++--- solver/llbsolver/mounts/mount.go | 6 +-- solver/scheduler_test.go | 1 + solver/types.go | 1 + worker/base/worker.go | 4 +- worker/result.go | 10 +++++ 12 files changed, 107 insertions(+), 48 deletions(-) diff --git a/cache/manager.go b/cache/manager.go index c1b3d52ec1d6..c818d6e53083 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -143,7 +143,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor, descHandlers := descHandlersOf(opts...) if desc.Digest != "" && (descHandlers == nil || descHandlers[desc.Digest] == nil) { if _, err := cm.ContentStore.Info(ctx, desc.Digest); errors.Is(err, cerrdefs.ErrNotFound) { - return nil, NeedsRemoteProviderError([]digest.Digest{desc.Digest}) + return nil, NeedsRemoteProviderError([]DigestDescriptionPair{{Digest: desc.Digest}}) } else if err != nil { return nil, err } @@ -396,7 +396,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt if isLazy, err := cr.isLazy(ctx); err != nil { return err } else if isLazy && dhs[blob] == nil { - missing = append(missing, blob) + missing = append(missing, DigestDescriptionPair{Digest: blob, Description: cr.GetDescription()}) } return nil }); err != nil { diff --git a/cache/opts.go b/cache/opts.go index 1f1db6ca6105..df94083357b7 100644 --- a/cache/opts.go +++ b/cache/opts.go @@ -30,10 +30,19 @@ func descHandlersOf(opts ...RefOption) DescHandlers { type DescHandlerKey digest.Digest -type NeedsRemoteProviderError []digest.Digest //nolint:errname +type NeedsRemoteProviderError []DigestDescriptionPair //nolint:errname + +type DigestDescriptionPair struct { + Digest digest.Digest + Description string +} + +func (d DigestDescriptionPair) String() string { + return fmt.Sprintf("%s: %s", d.Digest, d.Description) +} func (m NeedsRemoteProviderError) Error() string { - return fmt.Sprintf("missing descriptor handlers for lazy blobs %+v", []digest.Digest(m)) + return fmt.Sprintf("missing descriptor handlers for lazy blobs %+v", []DigestDescriptionPair(m)) } type Unlazy session.Group diff --git a/cache/refs.go b/cache/refs.go index 0cf30737abc0..235fa0c66c44 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -49,6 +49,7 @@ type Ref interface { RefMetadata Release(context.Context) error IdentityMapping() *idtools.IdentityMapping + DescHandlers() DescHandlers DescHandler(digest.Digest) *DescHandler } @@ -612,6 +613,13 @@ func (sr *immutableRef) LayerChain() RefList { return l } +func (sr *immutableRef) DescHandlers() DescHandlers { + // clone to prevent mutation of internal state + dhs := make(DescHandlers) + maps.Copy(dhs, sr.descHandlers) + return dhs +} + func (sr *immutableRef) DescHandler(dgst digest.Digest) *DescHandler { return sr.descHandlers[dgst] } @@ -640,6 +648,13 @@ func (sr *mutableRef) traceLogFields() logrus.Fields { return m } +func (sr *mutableRef) DescHandlers() DescHandlers { + // clone to prevent mutation of internal state + dhs := make(DescHandlers) + maps.Copy(dhs, sr.descHandlers) + return dhs +} + func (sr *mutableRef) DescHandler(dgst digest.Digest) *DescHandler { return sr.descHandlers[dgst] } diff --git a/client/client_test.go b/client/client_test.go index 464c59ccbf0b..0c1f76060894 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -10899,6 +10899,7 @@ func 
testRunValidExitCodes(t *testing.T, sb integration.Sandbox) { } func testSameChainIDWithLazyBlobsCacheExport(t *testing.T, sb integration.Sandbox) { + requiresLinux(t) workers.CheckFeatureCompat(t, sb, workers.FeatureCacheExport, workers.FeatureCacheImport, @@ -11040,6 +11041,7 @@ func testSameChainIDWithLazyBlobsCacheExport(t *testing.T, sb integration.Sandbo } func testSameChainIDWithLazyBlobsCacheMountBase(t *testing.T, sb integration.Sandbox) { + requiresLinux(t) workers.CheckFeatureCompat(t, sb, workers.FeatureCacheExport, workers.FeatureCacheImport, @@ -11123,7 +11125,7 @@ func testSameChainIDWithLazyBlobsCacheMountBase(t *testing.T, sb integration.San require.NoError(t, err) // try to re-use the cache mount from before, ensure we successfully get - // the same one writen to in previous step + // the same one written to in previous step def, err = llb.Image(busyboxZstdRef).Run( llb.Shlex(`stat /mnt/bar`), llb.AddMount("/mnt", diff --git a/solver/cacheopts.go b/solver/cacheopts.go index 4b661471ed82..8fd3c17d27c3 100644 --- a/solver/cacheopts.go +++ b/solver/cacheopts.go @@ -28,69 +28,85 @@ func WithCacheOptGetter(ctx context.Context, getter func(includeAncestors bool, return context.WithValue(ctx, cacheOptGetterKey{}, getter) } -func withAncestorCacheOpts(ctx context.Context, start *state) context.Context { +func withAncestorCacheOpts(ctx context.Context, start *sharedOp) context.Context { return WithCacheOptGetter(ctx, func(includeAncestors bool, keys ...interface{}) map[interface{}]interface{} { keySet := make(map[interface{}]struct{}) for _, k := range keys { keySet[k] = struct{}{} } values := make(map[interface{}]interface{}) - walkAncestors(ctx, start, func(st *state) bool { - if st.clientVertex.Error != "" { + walkAncestors(ctx, start, func(op *sharedOp) bool { + if op.st.clientVertex.Error != "" { // don't use values from cancelled or otherwise error'd vertexes return false } - for _, res := range st.op.cacheRes { - if res.Opts == nil { - continue + + for k := range keySet { + var v any + var ok bool + + // check opts set from CacheMap operation + for _, res := range op.cacheRes { + if res.Opts == nil { + continue + } + v, ok = res.Opts[k] + if ok { + break + } } - for k := range keySet { - if v, ok := res.Opts[k]; ok { - values[k] = v - delete(keySet, k) - if len(keySet) == 0 { - return true - } + + // check opts set during cache load + if !ok && op.loadCacheOpts != nil { + v, ok = op.loadCacheOpts[k] + } + + if ok { + values[k] = v + delete(keySet, k) + if len(keySet) == 0 { + return true } } } - return !includeAncestors // stop after the first state unless includeAncestors is true + + return !includeAncestors // stop after the first op unless includeAncestors is true }) return values }) } -func walkAncestors(ctx context.Context, start *state, f func(*state) bool) { - stack := [][]*state{{start}} +func walkAncestors(ctx context.Context, start *sharedOp, f func(*sharedOp) bool) { + stack := [][]*sharedOp{{start}} cache := make(map[digest.Digest]struct{}) for len(stack) > 0 { - sts := stack[len(stack)-1] - if len(sts) == 0 { + ops := stack[len(stack)-1] + if len(ops) == 0 { stack = stack[:len(stack)-1] continue } - st := sts[len(sts)-1] - stack[len(stack)-1] = sts[:len(sts)-1] - if st == nil { + op := ops[len(ops)-1] + stack[len(stack)-1] = ops[:len(ops)-1] + if op == nil { continue } - if _, ok := cache[st.origDigest]; ok { + if _, ok := cache[op.st.origDigest]; ok { continue } - cache[st.origDigest] = struct{}{} - if shouldStop := f(st); shouldStop { + 
cache[op.st.origDigest] = struct{}{} + if shouldStop := f(op); shouldStop { return } - stack = append(stack, []*state{}) - for _, parentDgst := range st.clientVertex.Inputs { - st.solver.mu.RLock() - parent := st.solver.actives[parentDgst] - st.solver.mu.RUnlock() + stack = append(stack, []*sharedOp{}) + for _, parentDgst := range op.st.clientVertex.Inputs { + op.st.solver.mu.RLock() + parent := op.st.solver.actives[parentDgst] + op.st.solver.mu.RUnlock() if parent == nil { bklog.G(ctx).Warnf("parent %q not found in active job list during cache opt search", parentDgst) continue } - stack[len(stack)-1] = append(stack[len(stack)-1], parent) + stack[len(stack)-1] = append(stack[len(stack)-1], parent.op) } } } diff --git a/solver/exporter.go b/solver/exporter.go index 28f8b277f8b0..b3776dd9687c 100644 --- a/solver/exporter.go +++ b/solver/exporter.go @@ -120,7 +120,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach if e.edge != nil { op, ok := e.edge.op.(*sharedOp) if ok && op != nil && op.st != nil { - ctx = withAncestorCacheOpts(ctx, op.st) + ctx = withAncestorCacheOpts(ctx, op) } } diff --git a/solver/jobs.go b/solver/jobs.go index d8cce3abdcf7..fed10d692dd6 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -869,6 +869,8 @@ type sharedOp struct { cacheDone bool cacheErr error + loadCacheOpts CacheOpts + slowMu sync.Mutex slowCacheRes map[Index]digest.Digest slowCacheErr map[Index]error @@ -879,18 +881,18 @@ func (s *sharedOp) IgnoreCache() bool { } func (s *sharedOp) Cache() CacheManager { - return &cacheWithCacheOpts{s.st.combinedCacheManager(), s.st} + return &cacheWithCacheOpts{s.st.combinedCacheManager(), s} } type cacheWithCacheOpts struct { CacheManager - st *state + op *sharedOp } func (c cacheWithCacheOpts) Records(ctx context.Context, ck *CacheKey) ([]*CacheRecord, error) { // Allow Records accessing to cache opts through ctx. This enable to use remote provider // during checking the cache existence. - return c.CacheManager.Records(withAncestorCacheOpts(ctx, c.st), ck) + return c.CacheManager.Records(withAncestorCacheOpts(ctx, c.op), ck) } func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, error) { @@ -901,9 +903,12 @@ func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, err // no cache hit. start evaluating the node span, ctx := tracing.StartSpan(ctx, "load cache: "+s.st.vtx.Name(), trace.WithAttributes(attribute.String("vertex", s.st.vtx.Digest().String()))) notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, true) - res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s.st), rec) + res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s), rec) tracing.FinishWithError(span, err) notifyCompleted(err, true) + if err == nil { + s.loadCacheOpts = res.CacheOpts() + } return res, err } @@ -952,7 +957,7 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessF if s.st.mspan.Span != nil { ctx = trace.ContextWithSpan(ctx, s.st.mspan) } - key, err = f(withAncestorCacheOpts(ctx, s.st), res, s.st) + key, err = f(withAncestorCacheOpts(ctx, s), res, s.st) } if err != nil { select { @@ -1008,7 +1013,7 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp, if s.st.mspan.Span != nil { ctx = trace.ContextWithSpan(ctx, s.st.mspan) } - ctx = withAncestorCacheOpts(ctx, s.st) + ctx = withAncestorCacheOpts(ctx, s) if len(s.st.vtx.Inputs()) == 0 { // no cache hit. 
start evaluating the node span, ctx := tracing.StartSpan(ctx, "cache request: "+s.st.vtx.Name(), trace.WithAttributes(attribute.String("vertex", s.st.vtx.Digest().String()))) @@ -1087,7 +1092,7 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, if s.st.mspan.Span != nil { ctx = trace.ContextWithSpan(ctx, s.st.mspan) } - ctx = withAncestorCacheOpts(ctx, s.st) + ctx = withAncestorCacheOpts(ctx, s) // no cache hit. start evaluating the node span, ctx := tracing.StartSpan(ctx, s.st.vtx.Name(), trace.WithAttributes(attribute.String("vertex", s.st.vtx.Digest().String()))) diff --git a/solver/llbsolver/mounts/mount.go b/solver/llbsolver/mounts/mount.go index bb15c8093ff8..a0c5cae97b35 100644 --- a/solver/llbsolver/mounts/mount.go +++ b/solver/llbsolver/mounts/mount.go @@ -128,9 +128,9 @@ func (g *cacheRefGetter) getRefCacheDirNoCache(ctx context.Context, key string, var needsRemoteProviders cache.NeedsRemoteProviderError if errors.As(err, &needsRemoteProviders) && ref != nil { descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler)) - for _, dgst := range needsRemoteProviders { - if handler := ref.DescHandler(dgst); handler != nil { - descHandlers[dgst] = handler + for _, dgstDescPair := range needsRemoteProviders { + if handler := ref.DescHandler(dgstDescPair.Digest); handler != nil { + descHandlers[dgstDescPair.Digest] = handler } } mRef, err = g.cm.GetMutable(ctx, si.ID(), descHandlers) diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go index 2d6f4c6357f9..6f082eef65cf 100644 --- a/solver/scheduler_test.go +++ b/solver/scheduler_test.go @@ -3947,6 +3947,7 @@ func (r *dummyResult) ID() string { return r.id } func (r *dummyResult) Release(context.Context) error { return nil } func (r *dummyResult) Sys() interface{} { return r } func (r *dummyResult) Clone() Result { return r } +func (r *dummyResult) CacheOpts() CacheOpts { return nil } func testOpResolver(v Vertex, b Builder) (Op, error) { if op, ok := v.Sys().(Op); ok { diff --git a/solver/types.go b/solver/types.go index 56a53d0fd912..16b99e3d03eb 100644 --- a/solver/types.go +++ b/solver/types.go @@ -64,6 +64,7 @@ type Result interface { Release(context.Context) error Sys() interface{} Clone() Result + CacheOpts() CacheOpts } // CachedResult is a result connected with its cache key diff --git a/worker/base/worker.go b/worker/base/worker.go index 3f3a77c8be71..2581b7c5c8f0 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -300,8 +300,8 @@ func (w *Worker) LoadRef(ctx context.Context, id string, hidden bool) (cache.Imm if errors.As(err, &needsRemoteProviders) { if optGetter := solver.CacheOptGetterOf(ctx); optGetter != nil { var keys []interface{} - for _, dgst := range needsRemoteProviders { - keys = append(keys, cache.DescHandlerKey(dgst)) + for _, dgstDescPair := range needsRemoteProviders { + keys = append(keys, cache.DescHandlerKey(dgstDescPair.Digest)) } descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler)) for k, v := range optGetter(true, keys...) 
{ diff --git a/worker/result.go b/worker/result.go index 26054cf8c206..5242ba5df02f 100644 --- a/worker/result.go +++ b/worker/result.go @@ -33,6 +33,16 @@ func (wr *WorkerRef) Release(ctx context.Context) error { return wr.ImmutableRef.Release(ctx) } +func (wr *WorkerRef) CacheOpts() solver.CacheOpts { + opts := solver.CacheOpts{} + if wr.ImmutableRef != nil { + for k, v := range wr.ImmutableRef.DescHandlers() { + opts[cache.DescHandlerKey(k)] = v + } + } + return opts +} + // GetRemotes method abstracts ImmutableRef's GetRemotes to allow a Worker to override. // This is needed for moby integration. // Use this method instead of calling ImmutableRef.GetRemotes() directly. From fc4ecadab03a2d9308f7ecf6430f07d7d66ca06d Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Thu, 5 Dec 2024 16:00:29 -0800 Subject: [PATCH 5/7] remove redundant activeOp interface Signed-off-by: Erik Sipsma --- solver/edge.go | 4 ++-- solver/exporter.go | 3 +-- solver/jobs.go | 9 --------- 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/solver/edge.go b/solver/edge.go index 27aa54ec0bda..da7de7d7858d 100644 --- a/solver/edge.go +++ b/solver/edge.go @@ -24,7 +24,7 @@ func (t edgeStatusType) String() string { return []string{"initial", "cache-fast", "cache-slow", "complete"}[t] } -func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge { +func newEdge(ed Edge, op *sharedOp, index *edgeIndex) *edge { e := &edge{ edge: ed, op: op, @@ -40,7 +40,7 @@ func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge { type edge struct { edge Edge - op activeOp + op *sharedOp edgeState depRequests map[pipeReceiver]*dep diff --git a/solver/exporter.go b/solver/exporter.go index b3776dd9687c..d03f33ca2dd5 100644 --- a/solver/exporter.go +++ b/solver/exporter.go @@ -118,8 +118,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach } if e.edge != nil { - op, ok := e.edge.op.(*sharedOp) - if ok && op != nil && op.st != nil { + if op := e.edge.op; op != nil && op.st != nil { ctx = withAncestorCacheOpts(ctx, op) } } diff --git a/solver/jobs.go b/solver/jobs.go index fed10d692dd6..5aa559c6371e 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -825,15 +825,6 @@ type cacheMapResp struct { complete bool } -type activeOp interface { - CacheMap(context.Context, int) (*cacheMapResp, error) - LoadCache(ctx context.Context, rec *CacheRecord) (Result, error) - Exec(ctx context.Context, inputs []Result) (outputs []Result, exporters []ExportableCacheKey, err error) - IgnoreCache() bool - Cache() CacheManager - CalcSlowCache(context.Context, Index, PreprocessFunc, ResultBasedCacheFunc, Result) (digest.Digest, error) -} - func newSharedOp(resolver ResolveOpFunc, st *state) *sharedOp { so := &sharedOp{ resolver: resolver, From 867e8fd85ea43da509ce3d3bc56f841d63720f4d Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Thu, 5 Dec 2024 16:35:33 -0800 Subject: [PATCH 6/7] provide DescHandlers to GetMutable always Signed-off-by: Erik Sipsma --- solver/llbsolver/mounts/mount.go | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/solver/llbsolver/mounts/mount.go b/solver/llbsolver/mounts/mount.go index a0c5cae97b35..b58073651a03 100644 --- a/solver/llbsolver/mounts/mount.go +++ b/solver/llbsolver/mounts/mount.go @@ -24,7 +24,6 @@ import ( "github.com/moby/buildkit/util/grpcerrors" "github.com/moby/locker" "github.com/moby/sys/userns" - digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "google.golang.org/grpc/codes" ) @@ -124,17 +123,11 @@ func (g *cacheRefGetter) 
getRefCacheDirNoCache(ctx context.Context, key string, } locked := false for _, si := range sis { - mRef, err := g.cm.GetMutable(ctx, si.ID()) - var needsRemoteProviders cache.NeedsRemoteProviderError - if errors.As(err, &needsRemoteProviders) && ref != nil { - descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler)) - for _, dgstDescPair := range needsRemoteProviders { - if handler := ref.DescHandler(dgstDescPair.Digest); handler != nil { - descHandlers[dgstDescPair.Digest] = handler - } - } - mRef, err = g.cm.GetMutable(ctx, si.ID(), descHandlers) + var opts []cache.RefOption + if ref != nil { + opts = append(opts, ref.DescHandlers()) } + mRef, err := g.cm.GetMutable(ctx, si.ID(), opts...) if err == nil { bklog.G(ctx).Debugf("reusing ref for cache dir %q: %s", id, mRef.ID()) return mRef, nil From c00304fd39b2fa444cd67bad3f3ddf05c363644f Mon Sep 17 00:00:00 2001 From: Kohei Tokunaga Date: Fri, 13 Dec 2024 23:05:37 +0900 Subject: [PATCH 7/7] Call callback only for unlazied contents Signed-off-by: Kohei Tokunaga --- cache/refs.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cache/refs.go b/cache/refs.go index 235fa0c66c44..ad5edf5c6b16 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -848,6 +848,11 @@ func getBlobWithCompression(ctx context.Context, cs content.Store, desc ocispecs } func walkBlob(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, f func(ocispecs.Descriptor) bool) error { + if _, err := cs.Info(ctx, desc.Digest); errors.Is(err, cerrdefs.ErrNotFound) { + return nil // this blob doesn't exist in the content store. Don't call the callback. + } else if err != nil { + return err + } if !f(desc) { return nil }
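For reference, a minimal sketch of the guard the final commit adds to walkBlob, shown in isolation. The store type and the not-found error below are simplified stand-ins, not containerd's content.Store or errdefs.ErrNotFound: the walk invokes its callback only for blobs whose content is actually present locally, so lazy blobs are skipped instead of surfacing a not-found error.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for containerd's errdefs.ErrNotFound.
var errNotFound = errors.New("content not found")

// store is a stand-in content store: digest -> whether the blob exists locally.
type store map[string]bool

func (s store) info(dgst string) error {
	if !s[dgst] {
		return errNotFound
	}
	return nil
}

// walk calls f only for digests whose blob content is present ("unlazied");
// lazy blobs are skipped rather than returned as errors to the caller.
func walk(s store, digests []string, f func(string)) error {
	for _, dgst := range digests {
		if err := s.info(dgst); errors.Is(err, errNotFound) {
			continue // lazy blob: skip, don't call the callback
		} else if err != nil {
			return err
		}
		f(dgst)
	}
	return nil
}

func main() {
	s := store{"sha256:aaa": true, "sha256:bbb": false} // bbb is still lazy
	_ = walk(s, []string{"sha256:aaa", "sha256:bbb"}, func(d string) {
		fmt.Println("exporting", d)
	})
}
```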