diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index 340d1ece39..5037c0cb0f 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "math" "math/rand" "strings" "sync" @@ -246,7 +245,6 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { GCInterval(veryLargeGCInterval), WatchBufferLength(50), MigrationPhase(config.migrationPhase), - ReadStrictMode(true), )) t.Run("TestLocking", createMultiDatastoreTest( @@ -338,17 +336,25 @@ func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestF } } -func createReplicaDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) { +func createReplicaDatastoreTest(b testdatastore.RunningEngineForTest, tf multiDatastoreTestFunc, options ...Option) func(*testing.T) { return func(t *testing.T) { ctx := context.Background() + + var replicaDS datastore.Datastore + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { - ds, err := newPostgresDatastore(ctx, uri, 42, options...) + ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID, options...) require.NoError(t, err) + + ds2, err := newPostgresDatastore(ctx, uri, 42, append(options, ReadStrictMode(true))...) + require.NoError(t, err) + replicaDS = ds2 + return ds }) defer ds.Close() - tf(t, ds) + tf(t, ds, replicaDS) } } @@ -1572,24 +1578,33 @@ func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) require.NoError(t, err) } -func StrictReadModeTest(t *testing.T, ds datastore.Datastore) { +func StrictReadModeTest(t *testing.T, primaryDS datastore.Datastore, replicaDS datastore.Datastore) { require := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - lowestRevision, err := ds.HeadRevision(ctx) + // Write some relationships. + _, err := primaryDS.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + rtu := tuple.Touch(tuple.MustParse("resource:123#reader@user:456")) + return rwt.WriteRelationships(ctx, []tuple.RelationshipUpdate{rtu}) + }) + require.NoError(err) + + // Get the HEAD revision. + lowestRevision, err := primaryDS.HeadRevision(ctx) require.NoError(err) // Perform a read at the head revision, which should succeed. - reader := ds.SnapshotReader(lowestRevision) + reader := replicaDS.SnapshotReader(lowestRevision) it, err := reader.QueryRelationships(ctx, datastore.RelationshipsFilter{ OptionalResourceType: "resource", }) require.NoError(err) - _, err = datastore.IteratorToSlice(it) + found, err := datastore.IteratorToSlice(it) require.NoError(err) + require.NotEmpty(found) // Perform a read at a manually constructed revision beyond head, which should fail. badRev := postgresRevision{ @@ -1597,19 +1612,21 @@ func StrictReadModeTest(t *testing.T, ds datastore.Datastore) { // NOTE: the struct defines this value as uint64, but the underlying // revision is defined as an int64, so we run into an overflow issue // if we try and use a big uint64. - xmax: math.MaxInt64, + xmin: 123456789, + xmax: 123456789, }, } - it, err = ds.SnapshotReader(badRev).QueryRelationships(ctx, datastore.RelationshipsFilter{ + it, err = replicaDS.SnapshotReader(badRev).QueryRelationships(ctx, datastore.RelationshipsFilter{ OptionalResourceType: "resource", }) require.NoError(err) - _, err = datastore.IteratorToSlice(it) + found2, err := datastore.IteratorToSlice(it) require.Error(err) require.ErrorContains(err, "is not available on the replica") require.ErrorAs(err, &common.RevisionUnavailableError{}) + require.Nil(found2) } func NullCaveatWatchTest(t *testing.T, ds datastore.Datastore) { diff --git a/internal/datastore/postgres/strictreader.go b/internal/datastore/postgres/strictreader.go index b656e237ca..15bb52ab39 100644 --- a/internal/datastore/postgres/strictreader.go +++ b/internal/datastore/postgres/strictreader.go @@ -60,6 +60,6 @@ func (srqf strictReaderQueryFuncs) addAssertToSQL(sql string) string { // argument error and a message indicating that the xid "is in the future". If the transaction // does exist, but has not yet been committed (or aborted), the call to `pg_xact_status` will return // "in progress". rewriteError will catch these cases and return a RevisionUnavailableError. - assertion := fmt.Sprintf(`do $$ begin assert (select pg_xact_status(%d::text::xid8) != 'in progress'), 'replica missing revision';end;$$;`, srqf.revision.snapshot.xmin-1) - return assertion + sql + assertion := fmt.Sprintf(`; do $$ begin assert (select pg_xact_status(%d::text::xid8) != 'in progress'), 'replica missing revision';end;$$`, srqf.revision.snapshot.xmin-1) + return sql + assertion }