Skip to content

Commit

Permalink
Delete LookupResources v1, ReachableResources and all helper code
Browse files Browse the repository at this point in the history
SpiceDB no longer dispatches to these methods, so we can remove them
  • Loading branch information
josephschorr committed Jan 10, 2025
1 parent e658772 commit 6eba72c
Show file tree
Hide file tree
Showing 30 changed files with 1,198 additions and 10,294 deletions.
183 changes: 15 additions & 168 deletions internal/dispatch/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@ type Dispatcher struct {
c cache.Cache[keys.DispatchCacheKey, any]
keyHandler keys.Handler

checkTotalCounter prometheus.Counter
checkFromCacheCounter prometheus.Counter
reachableResourcesTotalCounter prometheus.Counter
reachableResourcesFromCacheCounter prometheus.Counter
lookupResourcesTotalCounter prometheus.Counter
lookupResourcesFromCacheCounter prometheus.Counter
lookupSubjectsTotalCounter prometheus.Counter
lookupSubjectsFromCacheCounter prometheus.Counter
checkTotalCounter prometheus.Counter
checkFromCacheCounter prometheus.Counter
lookupResourcesTotalCounter prometheus.Counter
lookupResourcesFromCacheCounter prometheus.Counter
lookupSubjectsTotalCounter prometheus.Counter
lookupSubjectsFromCacheCounter prometheus.Counter
}

func DispatchTestCache(t testing.TB) cache.Cache[keys.DispatchCacheKey, any] {
Expand Down Expand Up @@ -82,17 +80,6 @@ func NewCachingDispatcher(cacheInst cache.Cache[keys.DispatchCacheKey, any], met
Name: "lookup_resources_from_cache_total",
})

reachableResourcesTotalCounter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Subsystem: prometheusSubsystem,
Name: "reachable_resources_total",
})
reachableResourcesFromCacheCounter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Subsystem: prometheusSubsystem,
Name: "reachable_resources_from_cache_total",
})

lookupSubjectsTotalCounter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Subsystem: prometheusSubsystem,
Expand Down Expand Up @@ -121,14 +108,6 @@ func NewCachingDispatcher(cacheInst cache.Cache[keys.DispatchCacheKey, any], met
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
}
err = prometheus.Register(reachableResourcesTotalCounter)
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
}
err = prometheus.Register(reachableResourcesFromCacheCounter)
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
}
err = prometheus.Register(lookupSubjectsTotalCounter)
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
Expand All @@ -144,17 +123,15 @@ func NewCachingDispatcher(cacheInst cache.Cache[keys.DispatchCacheKey, any], met
}

return &Dispatcher{
d: fakeDelegate{},
c: cacheInst,
keyHandler: keyHandler,
checkTotalCounter: checkTotalCounter,
checkFromCacheCounter: checkFromCacheCounter,
reachableResourcesTotalCounter: reachableResourcesTotalCounter,
reachableResourcesFromCacheCounter: reachableResourcesFromCacheCounter,
lookupResourcesTotalCounter: lookupResourcesTotalCounter,
lookupResourcesFromCacheCounter: lookupResourcesFromCacheCounter,
lookupSubjectsTotalCounter: lookupSubjectsTotalCounter,
lookupSubjectsFromCacheCounter: lookupSubjectsFromCacheCounter,
d: fakeDelegate{},
c: cacheInst,
keyHandler: keyHandler,
checkTotalCounter: checkTotalCounter,
checkFromCacheCounter: checkFromCacheCounter,
lookupResourcesTotalCounter: lookupResourcesTotalCounter,
lookupResourcesFromCacheCounter: lookupResourcesFromCacheCounter,
lookupSubjectsTotalCounter: lookupSubjectsTotalCounter,
lookupSubjectsFromCacheCounter: lookupSubjectsFromCacheCounter,
}, nil
}

Expand Down Expand Up @@ -232,69 +209,6 @@ func (cd *Dispatcher) DispatchExpand(ctx context.Context, req *v1.DispatchExpand
return resp, err
}

// DispatchReachableResources implements dispatch.ReachableResources interface.
func (cd *Dispatcher) DispatchReachableResources(req *v1.DispatchReachableResourcesRequest, stream dispatch.ReachableResourcesStream) error {
cd.reachableResourcesTotalCounter.Inc()

requestKey, err := cd.keyHandler.ReachableResourcesCacheKey(stream.Context(), req)
if err != nil {
return err
}

if cachedResultRaw, found := cd.c.Get(requestKey); found {
cd.reachableResourcesFromCacheCounter.Inc()
for _, slice := range cachedResultRaw.([][]byte) {
var response v1.DispatchReachableResourcesResponse
if err := response.UnmarshalVT(slice); err != nil {
return fmt.Errorf("could not publish cached reachable resources result: %w", err)
}
if err := stream.Publish(&response); err != nil {
return fmt.Errorf("could not publish cached reachable resources result: %w", err)
}
}

return nil
}

var (
mu sync.Mutex
toCacheResults [][]byte
)
wrapped := &dispatch.WrappedDispatchStream[*v1.DispatchReachableResourcesResponse]{
Stream: stream,
Ctx: stream.Context(),
Processor: func(result *v1.DispatchReachableResourcesResponse) (*v1.DispatchReachableResourcesResponse, bool, error) {
adjustedResult := result.CloneVT()
adjustedResult.Metadata.CachedDispatchCount = adjustedResult.Metadata.DispatchCount
adjustedResult.Metadata.DispatchCount = 0
adjustedResult.Metadata.DebugInfo = nil

adjustedBytes, err := adjustedResult.MarshalVT()
if err != nil {
return nil, false, err
}

mu.Lock()
toCacheResults = append(toCacheResults, adjustedBytes)
mu.Unlock()

return result, true, nil
},
}

if err := cd.d.DispatchReachableResources(req, wrapped); err != nil {
return err
}

var size int64
for _, slice := range toCacheResults {
size += sliceSize(slice)
}

cd.c.Set(requestKey, toCacheResults, size)
return nil
}

func sliceSize(xs []byte) int64 {
// Slice Header + Slice Contents
return int64(int(unsafe.Sizeof(xs)) + len(xs))
Expand Down Expand Up @@ -364,71 +278,6 @@ func (cd *Dispatcher) DispatchLookupResources2(req *v1.DispatchLookupResources2R
return nil
}

// DispatchLookupResources implements dispatch.LookupResources interface.
func (cd *Dispatcher) DispatchLookupResources(req *v1.DispatchLookupResourcesRequest, stream dispatch.LookupResourcesStream) error {
cd.lookupResourcesTotalCounter.Inc()

requestKey, err := cd.keyHandler.LookupResourcesCacheKey(stream.Context(), req)
if err != nil {
return err
}

if cachedResultRaw, found := cd.c.Get(requestKey); found {
cd.lookupResourcesFromCacheCounter.Inc()
for _, slice := range cachedResultRaw.([][]byte) {
var response v1.DispatchLookupResourcesResponse
if err := response.UnmarshalVT(slice); err != nil {
return err
}
if err := stream.Publish(&response); err != nil {
// don't wrap error with additional context, as it may be a grpc status.Status.
// status.FromError() is unable to unwrap status.Status values, and as a consequence
// the Dispatcher wouldn't properly propagate the gRPC error code
return err
}
}
return nil
}

var (
mu sync.Mutex
toCacheResults [][]byte
)
wrapped := &dispatch.WrappedDispatchStream[*v1.DispatchLookupResourcesResponse]{
Stream: stream,
Ctx: stream.Context(),
Processor: func(result *v1.DispatchLookupResourcesResponse) (*v1.DispatchLookupResourcesResponse, bool, error) {
adjustedResult := result.CloneVT()
adjustedResult.Metadata.CachedDispatchCount = adjustedResult.Metadata.DispatchCount
adjustedResult.Metadata.DispatchCount = 0
adjustedResult.Metadata.DebugInfo = nil

adjustedBytes, err := adjustedResult.MarshalVT()
if err != nil {
return &v1.DispatchLookupResourcesResponse{Metadata: &v1.ResponseMeta{}}, false, err
}

mu.Lock()
toCacheResults = append(toCacheResults, adjustedBytes)
mu.Unlock()

return result, true, nil
},
}

if err := cd.d.DispatchLookupResources(req, wrapped); err != nil {
return err
}

var size int64
for _, slice := range toCacheResults {
size += sliceSize(slice)
}

cd.c.Set(requestKey, toCacheResults, size)
return nil
}

// DispatchLookupSubjects implements dispatch.LookupSubjects interface.
func (cd *Dispatcher) DispatchLookupSubjects(req *v1.DispatchLookupSubjectsRequest, stream dispatch.LookupSubjectsStream) error {
cd.lookupSubjectsTotalCounter.Inc()
Expand Down Expand Up @@ -497,8 +346,6 @@ func (cd *Dispatcher) DispatchLookupSubjects(req *v1.DispatchLookupSubjectsReque
func (cd *Dispatcher) Close() error {
prometheus.Unregister(cd.checkTotalCounter)
prometheus.Unregister(cd.checkFromCacheCounter)
prometheus.Unregister(cd.reachableResourcesTotalCounter)
prometheus.Unregister(cd.reachableResourcesFromCacheCounter)
prometheus.Unregister(cd.lookupResourcesTotalCounter)
prometheus.Unregister(cd.lookupResourcesFromCacheCounter)
prometheus.Unregister(cd.lookupSubjectsFromCacheCounter)
Expand Down
8 changes: 0 additions & 8 deletions internal/dispatch/caching/cachingdispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,6 @@ func (ddm delegateDispatchMock) DispatchExpand(_ context.Context, _ *v1.Dispatch
return &v1.DispatchExpandResponse{}, nil
}

func (ddm delegateDispatchMock) DispatchReachableResources(_ *v1.DispatchReachableResourcesRequest, _ dispatch.ReachableResourcesStream) error {
return nil
}

func (ddm delegateDispatchMock) DispatchLookupResources(_ *v1.DispatchLookupResourcesRequest, _ dispatch.LookupResourcesStream) error {
return nil
}

func (ddm delegateDispatchMock) DispatchLookupResources2(_ *v1.DispatchLookupResources2Request, _ dispatch.LookupResources2Stream) error {
return nil
}
Expand Down
8 changes: 0 additions & 8 deletions internal/dispatch/caching/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ func (fd fakeDelegate) DispatchExpand(_ context.Context, _ *v1.DispatchExpandReq
return &v1.DispatchExpandResponse{}, spiceerrors.MustBugf(errMessage)
}

func (fd fakeDelegate) DispatchReachableResources(_ *v1.DispatchReachableResourcesRequest, _ dispatch.ReachableResourcesStream) error {
return spiceerrors.MustBugf(errMessage)
}

func (fd fakeDelegate) DispatchLookupResources(_ *v1.DispatchLookupResourcesRequest, _ dispatch.LookupResourcesStream) error {
return spiceerrors.MustBugf(errMessage)
}

func (fd fakeDelegate) DispatchLookupResources2(_ *v1.DispatchLookupResources2Request, _ dispatch.LookupResources2Stream) error {
return spiceerrors.MustBugf(errMessage)
}
Expand Down
26 changes: 0 additions & 26 deletions internal/dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type ReadyState struct {
type Dispatcher interface {
Check
Expand
ReachableResources
LookupResources
LookupSubjects
LookupResources2

Expand All @@ -47,30 +45,6 @@ type Expand interface {
DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error)
}

// ReachableResourcesStream is an alias for the stream to which reachable resources will be written.
type ReachableResourcesStream = Stream[*v1.DispatchReachableResourcesResponse]

// ReachableResources interface describes just the methods required to dispatch reachable resources requests.
type ReachableResources interface {
// DispatchReachableResources submits a single reachable resources request, writing its results to the specified stream.
DispatchReachableResources(
req *v1.DispatchReachableResourcesRequest,
stream ReachableResourcesStream,
) error
}

// LookupResourcesStream is an alias for the stream to which found resources will be written.
type LookupResourcesStream = Stream[*v1.DispatchLookupResourcesResponse]

// LookupResources interface describes just the methods required to dispatch LookupResources requests.
type LookupResources interface {
// DispatchLookupResources submits a single lookup request and returns its result.
DispatchLookupResources(
req *v1.DispatchLookupResourcesRequest,
stream LookupResourcesStream,
) error
}

type LookupResources2Stream = Stream[*v1.DispatchLookupResources2Response]

type LookupResources2 interface {
Expand Down
5 changes: 4 additions & 1 deletion internal/dispatch/graph/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
"github.com/authzed/spicedb/pkg/tuple"
)

var ONR = tuple.ONR
var (
ONR = tuple.ONR
RR = tuple.RR
)

func TestSimpleCheck(t *testing.T) {
t.Parallel()
Expand Down
14 changes: 9 additions & 5 deletions internal/dispatch/graph/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/stretchr/testify/require"
)

const veryLargeLimit = 1000000000

func TestDispatchChunking(t *testing.T) {
t.Parallel()
schema := `
Expand Down Expand Up @@ -55,14 +57,16 @@ func TestDispatchChunking(t *testing.T) {
}
})

t.Run("lookup-resources", func(t *testing.T) {
t.Run("lookup-resources2", func(t *testing.T) {
t.Parallel()

for _, tpl := range resources[:1] {
stream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupResourcesResponse](ctx)
err := dispatcher.DispatchLookupResources(&v1.DispatchLookupResourcesRequest{
ObjectRelation: RR(tpl.Resource.ObjectType, "view").ToCoreRR(),
Subject: tuple.CoreONR(tpl.Subject.ObjectType, tpl.Subject.ObjectID, graph.Ellipsis),
stream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupResources2Response](ctx)
err := dispatcher.DispatchLookupResources2(&v1.DispatchLookupResources2Request{
ResourceRelation: RR(tpl.Resource.ObjectType, "view").ToCoreRR(),
SubjectRelation: RR(tpl.Subject.ObjectType, graph.Ellipsis).ToCoreRR(),
SubjectIds: []string{tpl.Subject.ObjectID},
TerminalSubject: tuple.CoreONR(tpl.Subject.ObjectType, tpl.Subject.ObjectID, graph.Ellipsis),
Metadata: &v1.ResolverMeta{
AtRevision: revision.String(),
DepthRemaining: 50,
Expand Down
Loading

0 comments on commit 6eba72c

Please sign in to comment.