Skip to content

Commit

Permalink
Merge pull request #2203 from josephschorr/delete-lr1
Browse files Browse the repository at this point in the history
Delete LookupResources v1, ReachableResources and all helper code
  • Loading branch information
josephschorr authored Jan 10, 2025
2 parents 6609142 + 6eba72c commit 6b926cf
Show file tree
Hide file tree
Showing 30 changed files with 1,198 additions and 10,294 deletions.
183 changes: 15 additions & 168 deletions internal/dispatch/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@ type Dispatcher struct {
c cache.Cache[keys.DispatchCacheKey, any]
keyHandler keys.Handler

checkTotalCounter prometheus.Counter
checkFromCacheCounter prometheus.Counter
reachableResourcesTotalCounter prometheus.Counter
reachableResourcesFromCacheCounter prometheus.Counter
lookupResourcesTotalCounter prometheus.Counter
lookupResourcesFromCacheCounter prometheus.Counter
lookupSubjectsTotalCounter prometheus.Counter
lookupSubjectsFromCacheCounter prometheus.Counter
checkTotalCounter prometheus.Counter
checkFromCacheCounter prometheus.Counter
lookupResourcesTotalCounter prometheus.Counter
lookupResourcesFromCacheCounter prometheus.Counter
lookupSubjectsTotalCounter prometheus.Counter
lookupSubjectsFromCacheCounter prometheus.Counter
}

func DispatchTestCache(t testing.TB) cache.Cache[keys.DispatchCacheKey, any] {
Expand Down Expand Up @@ -82,17 +80,6 @@ func NewCachingDispatcher(cacheInst cache.Cache[keys.DispatchCacheKey, any], met
Name: "lookup_resources_from_cache_total",
})

reachableResourcesTotalCounter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Subsystem: prometheusSubsystem,
Name: "reachable_resources_total",
})
reachableResourcesFromCacheCounter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Subsystem: prometheusSubsystem,
Name: "reachable_resources_from_cache_total",
})

lookupSubjectsTotalCounter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Subsystem: prometheusSubsystem,
Expand Down Expand Up @@ -121,14 +108,6 @@ func NewCachingDispatcher(cacheInst cache.Cache[keys.DispatchCacheKey, any], met
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
}
err = prometheus.Register(reachableResourcesTotalCounter)
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
}
err = prometheus.Register(reachableResourcesFromCacheCounter)
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
}
err = prometheus.Register(lookupSubjectsTotalCounter)
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
Expand All @@ -144,17 +123,15 @@ func NewCachingDispatcher(cacheInst cache.Cache[keys.DispatchCacheKey, any], met
}

return &Dispatcher{
d: fakeDelegate{},
c: cacheInst,
keyHandler: keyHandler,
checkTotalCounter: checkTotalCounter,
checkFromCacheCounter: checkFromCacheCounter,
reachableResourcesTotalCounter: reachableResourcesTotalCounter,
reachableResourcesFromCacheCounter: reachableResourcesFromCacheCounter,
lookupResourcesTotalCounter: lookupResourcesTotalCounter,
lookupResourcesFromCacheCounter: lookupResourcesFromCacheCounter,
lookupSubjectsTotalCounter: lookupSubjectsTotalCounter,
lookupSubjectsFromCacheCounter: lookupSubjectsFromCacheCounter,
d: fakeDelegate{},
c: cacheInst,
keyHandler: keyHandler,
checkTotalCounter: checkTotalCounter,
checkFromCacheCounter: checkFromCacheCounter,
lookupResourcesTotalCounter: lookupResourcesTotalCounter,
lookupResourcesFromCacheCounter: lookupResourcesFromCacheCounter,
lookupSubjectsTotalCounter: lookupSubjectsTotalCounter,
lookupSubjectsFromCacheCounter: lookupSubjectsFromCacheCounter,
}, nil
}

Expand Down Expand Up @@ -232,69 +209,6 @@ func (cd *Dispatcher) DispatchExpand(ctx context.Context, req *v1.DispatchExpand
return resp, err
}

// DispatchReachableResources implements dispatch.ReachableResources interface.
func (cd *Dispatcher) DispatchReachableResources(req *v1.DispatchReachableResourcesRequest, stream dispatch.ReachableResourcesStream) error {
cd.reachableResourcesTotalCounter.Inc()

requestKey, err := cd.keyHandler.ReachableResourcesCacheKey(stream.Context(), req)
if err != nil {
return err
}

if cachedResultRaw, found := cd.c.Get(requestKey); found {
cd.reachableResourcesFromCacheCounter.Inc()
for _, slice := range cachedResultRaw.([][]byte) {
var response v1.DispatchReachableResourcesResponse
if err := response.UnmarshalVT(slice); err != nil {
return fmt.Errorf("could not publish cached reachable resources result: %w", err)
}
if err := stream.Publish(&response); err != nil {
return fmt.Errorf("could not publish cached reachable resources result: %w", err)
}
}

return nil
}

var (
mu sync.Mutex
toCacheResults [][]byte
)
wrapped := &dispatch.WrappedDispatchStream[*v1.DispatchReachableResourcesResponse]{
Stream: stream,
Ctx: stream.Context(),
Processor: func(result *v1.DispatchReachableResourcesResponse) (*v1.DispatchReachableResourcesResponse, bool, error) {
adjustedResult := result.CloneVT()
adjustedResult.Metadata.CachedDispatchCount = adjustedResult.Metadata.DispatchCount
adjustedResult.Metadata.DispatchCount = 0
adjustedResult.Metadata.DebugInfo = nil

adjustedBytes, err := adjustedResult.MarshalVT()
if err != nil {
return nil, false, err
}

mu.Lock()
toCacheResults = append(toCacheResults, adjustedBytes)
mu.Unlock()

return result, true, nil
},
}

if err := cd.d.DispatchReachableResources(req, wrapped); err != nil {
return err
}

var size int64
for _, slice := range toCacheResults {
size += sliceSize(slice)
}

cd.c.Set(requestKey, toCacheResults, size)
return nil
}

func sliceSize(xs []byte) int64 {
// Slice Header + Slice Contents
return int64(int(unsafe.Sizeof(xs)) + len(xs))
Expand Down Expand Up @@ -364,71 +278,6 @@ func (cd *Dispatcher) DispatchLookupResources2(req *v1.DispatchLookupResources2R
return nil
}

// DispatchLookupResources implements dispatch.LookupResources interface.
func (cd *Dispatcher) DispatchLookupResources(req *v1.DispatchLookupResourcesRequest, stream dispatch.LookupResourcesStream) error {
cd.lookupResourcesTotalCounter.Inc()

requestKey, err := cd.keyHandler.LookupResourcesCacheKey(stream.Context(), req)
if err != nil {
return err
}

if cachedResultRaw, found := cd.c.Get(requestKey); found {
cd.lookupResourcesFromCacheCounter.Inc()
for _, slice := range cachedResultRaw.([][]byte) {
var response v1.DispatchLookupResourcesResponse
if err := response.UnmarshalVT(slice); err != nil {
return err
}
if err := stream.Publish(&response); err != nil {
// don't wrap error with additional context, as it may be a grpc status.Status.
// status.FromError() is unable to unwrap status.Status values, and as a consequence
// the Dispatcher wouldn't properly propagate the gRPC error code
return err
}
}
return nil
}

var (
mu sync.Mutex
toCacheResults [][]byte
)
wrapped := &dispatch.WrappedDispatchStream[*v1.DispatchLookupResourcesResponse]{
Stream: stream,
Ctx: stream.Context(),
Processor: func(result *v1.DispatchLookupResourcesResponse) (*v1.DispatchLookupResourcesResponse, bool, error) {
adjustedResult := result.CloneVT()
adjustedResult.Metadata.CachedDispatchCount = adjustedResult.Metadata.DispatchCount
adjustedResult.Metadata.DispatchCount = 0
adjustedResult.Metadata.DebugInfo = nil

adjustedBytes, err := adjustedResult.MarshalVT()
if err != nil {
return &v1.DispatchLookupResourcesResponse{Metadata: &v1.ResponseMeta{}}, false, err
}

mu.Lock()
toCacheResults = append(toCacheResults, adjustedBytes)
mu.Unlock()

return result, true, nil
},
}

if err := cd.d.DispatchLookupResources(req, wrapped); err != nil {
return err
}

var size int64
for _, slice := range toCacheResults {
size += sliceSize(slice)
}

cd.c.Set(requestKey, toCacheResults, size)
return nil
}

// DispatchLookupSubjects implements dispatch.LookupSubjects interface.
func (cd *Dispatcher) DispatchLookupSubjects(req *v1.DispatchLookupSubjectsRequest, stream dispatch.LookupSubjectsStream) error {
cd.lookupSubjectsTotalCounter.Inc()
Expand Down Expand Up @@ -497,8 +346,6 @@ func (cd *Dispatcher) DispatchLookupSubjects(req *v1.DispatchLookupSubjectsReque
func (cd *Dispatcher) Close() error {
prometheus.Unregister(cd.checkTotalCounter)
prometheus.Unregister(cd.checkFromCacheCounter)
prometheus.Unregister(cd.reachableResourcesTotalCounter)
prometheus.Unregister(cd.reachableResourcesFromCacheCounter)
prometheus.Unregister(cd.lookupResourcesTotalCounter)
prometheus.Unregister(cd.lookupResourcesFromCacheCounter)
prometheus.Unregister(cd.lookupSubjectsFromCacheCounter)
Expand Down
8 changes: 0 additions & 8 deletions internal/dispatch/caching/cachingdispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,6 @@ func (ddm delegateDispatchMock) DispatchExpand(_ context.Context, _ *v1.Dispatch
return &v1.DispatchExpandResponse{}, nil
}

func (ddm delegateDispatchMock) DispatchReachableResources(_ *v1.DispatchReachableResourcesRequest, _ dispatch.ReachableResourcesStream) error {
return nil
}

func (ddm delegateDispatchMock) DispatchLookupResources(_ *v1.DispatchLookupResourcesRequest, _ dispatch.LookupResourcesStream) error {
return nil
}

func (ddm delegateDispatchMock) DispatchLookupResources2(_ *v1.DispatchLookupResources2Request, _ dispatch.LookupResources2Stream) error {
return nil
}
Expand Down
8 changes: 0 additions & 8 deletions internal/dispatch/caching/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ func (fd fakeDelegate) DispatchExpand(_ context.Context, _ *v1.DispatchExpandReq
return &v1.DispatchExpandResponse{}, spiceerrors.MustBugf(errMessage)
}

func (fd fakeDelegate) DispatchReachableResources(_ *v1.DispatchReachableResourcesRequest, _ dispatch.ReachableResourcesStream) error {
return spiceerrors.MustBugf(errMessage)
}

func (fd fakeDelegate) DispatchLookupResources(_ *v1.DispatchLookupResourcesRequest, _ dispatch.LookupResourcesStream) error {
return spiceerrors.MustBugf(errMessage)
}

func (fd fakeDelegate) DispatchLookupResources2(_ *v1.DispatchLookupResources2Request, _ dispatch.LookupResources2Stream) error {
return spiceerrors.MustBugf(errMessage)
}
Expand Down
26 changes: 0 additions & 26 deletions internal/dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type ReadyState struct {
type Dispatcher interface {
Check
Expand
ReachableResources
LookupResources
LookupSubjects
LookupResources2

Expand All @@ -47,30 +45,6 @@ type Expand interface {
DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error)
}

// ReachableResourcesStream is an alias for the stream to which reachable resources will be written.
type ReachableResourcesStream = Stream[*v1.DispatchReachableResourcesResponse]

// ReachableResources interface describes just the methods required to dispatch reachable resources requests.
type ReachableResources interface {
// DispatchReachableResources submits a single reachable resources request, writing its results to the specified stream.
DispatchReachableResources(
req *v1.DispatchReachableResourcesRequest,
stream ReachableResourcesStream,
) error
}

// LookupResourcesStream is an alias for the stream to which found resources will be written.
type LookupResourcesStream = Stream[*v1.DispatchLookupResourcesResponse]

// LookupResources interface describes just the methods required to dispatch LookupResources requests.
type LookupResources interface {
// DispatchLookupResources submits a single lookup request and returns its result.
DispatchLookupResources(
req *v1.DispatchLookupResourcesRequest,
stream LookupResourcesStream,
) error
}

type LookupResources2Stream = Stream[*v1.DispatchLookupResources2Response]

type LookupResources2 interface {
Expand Down
5 changes: 4 additions & 1 deletion internal/dispatch/graph/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
"github.com/authzed/spicedb/pkg/tuple"
)

var ONR = tuple.ONR
var (
ONR = tuple.ONR
RR = tuple.RR
)

func TestSimpleCheck(t *testing.T) {
t.Parallel()
Expand Down
14 changes: 9 additions & 5 deletions internal/dispatch/graph/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/stretchr/testify/require"
)

const veryLargeLimit = 1000000000

func TestDispatchChunking(t *testing.T) {
t.Parallel()
schema := `
Expand Down Expand Up @@ -55,14 +57,16 @@ func TestDispatchChunking(t *testing.T) {
}
})

t.Run("lookup-resources", func(t *testing.T) {
t.Run("lookup-resources2", func(t *testing.T) {
t.Parallel()

for _, tpl := range resources[:1] {
stream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupResourcesResponse](ctx)
err := dispatcher.DispatchLookupResources(&v1.DispatchLookupResourcesRequest{
ObjectRelation: RR(tpl.Resource.ObjectType, "view").ToCoreRR(),
Subject: tuple.CoreONR(tpl.Subject.ObjectType, tpl.Subject.ObjectID, graph.Ellipsis),
stream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupResources2Response](ctx)
err := dispatcher.DispatchLookupResources2(&v1.DispatchLookupResources2Request{
ResourceRelation: RR(tpl.Resource.ObjectType, "view").ToCoreRR(),
SubjectRelation: RR(tpl.Subject.ObjectType, graph.Ellipsis).ToCoreRR(),
SubjectIds: []string{tpl.Subject.ObjectID},
TerminalSubject: tuple.CoreONR(tpl.Subject.ObjectType, tpl.Subject.ObjectID, graph.Ellipsis),
Metadata: &v1.ResolverMeta{
AtRevision: revision.String(),
DepthRemaining: 50,
Expand Down
Loading

0 comments on commit 6b926cf

Please sign in to comment.