From 1826d8d3d9be3a5a21600bab5f5cee93a44b2fea Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 3 Jan 2025 15:07:57 -0500 Subject: [PATCH] Fix the strict read proxy Since the `do` command was first in the SQL, the query was always reading *it* as the result, causing no rows to be returned. This change first adds additional testing to ensure rows are returned and then reverse the order of the operations in the strict query --- .../postgres/postgres_shared_test.go | 41 +++++++++++++------ internal/datastore/postgres/strictreader.go | 4 +- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index 340d1ece39..5037c0cb0f 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "math" "math/rand" "strings" "sync" @@ -246,7 +245,6 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { GCInterval(veryLargeGCInterval), WatchBufferLength(50), MigrationPhase(config.migrationPhase), - ReadStrictMode(true), )) t.Run("TestLocking", createMultiDatastoreTest( @@ -338,17 +336,25 @@ func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestF } } -func createReplicaDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) { +func createReplicaDatastoreTest(b testdatastore.RunningEngineForTest, tf multiDatastoreTestFunc, options ...Option) func(*testing.T) { return func(t *testing.T) { ctx := context.Background() + + var replicaDS datastore.Datastore + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { - ds, err := newPostgresDatastore(ctx, uri, 42, options...) + ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID, options...) require.NoError(t, err) + + ds2, err := newPostgresDatastore(ctx, uri, 42, append(options, ReadStrictMode(true))...) + require.NoError(t, err) + replicaDS = ds2 + return ds }) defer ds.Close() - tf(t, ds) + tf(t, ds, replicaDS) } } @@ -1572,24 +1578,33 @@ func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) require.NoError(t, err) } -func StrictReadModeTest(t *testing.T, ds datastore.Datastore) { +func StrictReadModeTest(t *testing.T, primaryDS datastore.Datastore, replicaDS datastore.Datastore) { require := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - lowestRevision, err := ds.HeadRevision(ctx) + // Write some relationships. + _, err := primaryDS.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + rtu := tuple.Touch(tuple.MustParse("resource:123#reader@user:456")) + return rwt.WriteRelationships(ctx, []tuple.RelationshipUpdate{rtu}) + }) + require.NoError(err) + + // Get the HEAD revision. + lowestRevision, err := primaryDS.HeadRevision(ctx) require.NoError(err) // Perform a read at the head revision, which should succeed. - reader := ds.SnapshotReader(lowestRevision) + reader := replicaDS.SnapshotReader(lowestRevision) it, err := reader.QueryRelationships(ctx, datastore.RelationshipsFilter{ OptionalResourceType: "resource", }) require.NoError(err) - _, err = datastore.IteratorToSlice(it) + found, err := datastore.IteratorToSlice(it) require.NoError(err) + require.NotEmpty(found) // Perform a read at a manually constructed revision beyond head, which should fail. badRev := postgresRevision{ @@ -1597,19 +1612,21 @@ func StrictReadModeTest(t *testing.T, ds datastore.Datastore) { // NOTE: the struct defines this value as uint64, but the underlying // revision is defined as an int64, so we run into an overflow issue // if we try and use a big uint64. - xmax: math.MaxInt64, + xmin: 123456789, + xmax: 123456789, }, } - it, err = ds.SnapshotReader(badRev).QueryRelationships(ctx, datastore.RelationshipsFilter{ + it, err = replicaDS.SnapshotReader(badRev).QueryRelationships(ctx, datastore.RelationshipsFilter{ OptionalResourceType: "resource", }) require.NoError(err) - _, err = datastore.IteratorToSlice(it) + found2, err := datastore.IteratorToSlice(it) require.Error(err) require.ErrorContains(err, "is not available on the replica") require.ErrorAs(err, &common.RevisionUnavailableError{}) + require.Nil(found2) } func NullCaveatWatchTest(t *testing.T, ds datastore.Datastore) { diff --git a/internal/datastore/postgres/strictreader.go b/internal/datastore/postgres/strictreader.go index b656e237ca..15bb52ab39 100644 --- a/internal/datastore/postgres/strictreader.go +++ b/internal/datastore/postgres/strictreader.go @@ -60,6 +60,6 @@ func (srqf strictReaderQueryFuncs) addAssertToSQL(sql string) string { // argument error and a message indicating that the xid "is in the future". If the transaction // does exist, but has not yet been committed (or aborted), the call to `pg_xact_status` will return // "in progress". rewriteError will catch these cases and return a RevisionUnavailableError. - assertion := fmt.Sprintf(`do $$ begin assert (select pg_xact_status(%d::text::xid8) != 'in progress'), 'replica missing revision';end;$$;`, srqf.revision.snapshot.xmin-1) - return assertion + sql + assertion := fmt.Sprintf(`; do $$ begin assert (select pg_xact_status(%d::text::xid8) != 'in progress'), 'replica missing revision';end;$$`, srqf.revision.snapshot.xmin-1) + return sql + assertion }