Skip to content

Commit

Permalink
Change the postgres migrations in two ways:
Browse files Browse the repository at this point in the history
1) Make each non-transactional migration step contain only a single migration operation. This ensures that if the operation fails, we don't end up "halfway" into a migration step. In the future, we'll enforce this via testing.
2) Change how concurrent indexes are created. See: https://www.shayon.dev/post/2024/225/stop-relying-on-if-not-exists-for-concurrent-index-creation-in-postgresql/ for why this is important
  • Loading branch information
josephschorr committed Jan 10, 2025
1 parent fabeff1 commit ec03842
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 27 deletions.
7 changes: 7 additions & 0 deletions internal/datastore/postgres/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@ const (
pgSerializationFailure = "40001"
pgTransactionAborted = "25P02"
pgReadOnlyTransaction = "25006"
pgQueryCanceled = "57014"
)

var (
createConflictDetailsRegex = regexp.MustCompile(`^Key (.+)=\(([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)\) already exists`)
createConflictDetailsRegexWithoutCaveat = regexp.MustCompile(`^Key (.+)=\(([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)\) already exists`)
)

// IsQueryCanceledError returns true if the error is a Postgres error indicating a query was canceled.
func IsQueryCanceledError(err error) bool {
var pgerr *pgconn.PgError
return errors.As(err, &pgerr) && pgerr.Code == pgQueryCanceled
}

// IsConstraintFailureError returns true if the error is a Postgres error indicating a constraint
// failure.
func IsConstraintFailureError(err error) bool {
Expand Down
52 changes: 52 additions & 0 deletions internal/datastore/postgres/migrations/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package migrations

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"

"github.com/authzed/spicedb/internal/datastore/postgres/common"
)

const createIndexTemplate = `
CREATE INDEX CONCURRENTLY
%s
%s`

const dropIndexTemplate = `
DROP INDEX CONCURRENTLY IF EXISTS
%s;
`

// createIndexConcurrently creates an index concurrently, dropping the existing index if it exists to ensure
// that indexes are not left in a partially constructed state.
// See: https://www.shayon.dev/post/2024/225/stop-relying-on-if-not-exists-for-concurrent-index-creation-in-postgresql/
func createIndexConcurrently(ctx context.Context, conn *pgx.Conn, indexName, creationClause string) error {
dropIndexSQL := fmt.Sprintf(dropIndexTemplate, indexName)
if _, err := conn.Exec(ctx, dropIndexSQL); err != nil {
if common.IsQueryCanceledError(err) {
return fmt.Errorf(
"timed out while trying to drop index %s before recreating it: %w. This typically indicates that your statement_timeout needs to be increased",
indexName,
err,
)
}

return fmt.Errorf("failed to drop index %s before creating it: %w", indexName, err)
}

createIndexSQL := fmt.Sprintf(createIndexTemplate, indexName, creationClause)
if _, err := conn.Exec(ctx, createIndexSQL); err != nil {
if common.IsQueryCanceledError(err) {
return fmt.Errorf(
"timed out while trying to create index %s: %w. This typically indicates that your statement_timeout needs to be increased",
indexName,
err,
)
}

return fmt.Errorf("failed to create index %s: %w", indexName, err)
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@ package migrations

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
)

const addWatchAPIIndexToRelationTupleTable = `CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_watch_index ON relation_tuple (created_xid);`
const watchAPIIndexToRelationTupleTable = `
ON relation_tuple (created_xid);`

func init() {
if err := DatabaseMigrations.Register("add-watch-api-index-to-relation-tuple-table", "add-metadata-to-transaction-table",
func(ctx context.Context, conn *pgx.Conn) error {
if _, err := conn.Exec(ctx, addWatchAPIIndexToRelationTupleTable); err != nil {
return fmt.Errorf("failed to add watch API index to relation tuple table: %w", err)
}
return nil
return createIndexConcurrently(ctx, conn, "ix_watch_index", watchAPIIndexToRelationTupleTable)
},
noTxMigration); err != nil {
panic("failed to register migration: " + err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,12 @@ const addExpirationColumn = `
ADD COLUMN expiration TIMESTAMPTZ DEFAULT NULL;
`

// Used for cleaning up expired relationships.
const addExpiredRelationshipsIndex = `CREATE INDEX CONCURRENTLY
IF NOT EXISTS ix_relation_tuple_expired
ON relation_tuple (expiration)
WHERE expiration IS NOT NULL;
`

func init() {
if err := DatabaseMigrations.Register("add-expiration-support", "add-watch-api-index-to-relation-tuple-table",
func(ctx context.Context, conn *pgx.Conn) error {
if _, err := conn.Exec(ctx, addExpirationColumn); err != nil {
return fmt.Errorf("failed to add expiration column to relation tuple table: %w", err)
}

if _, err := conn.Exec(ctx, addExpiredRelationshipsIndex); err != nil {
return fmt.Errorf("failed to add expiration column to relation tuple table: %w", err)
}

return nil
},
noTxMigration); err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package migrations

import (
"context"

"github.com/jackc/pgx/v5"
)

// Used for cleaning up expired relationships.
const expiredRelationshipsIndex = `
ON relation_tuple (expiration)
WHERE expiration IS NOT NULL;
`

func init() {
if err := DatabaseMigrations.Register("add-expiration-cleanup-index", "add-expiration-support",
func(ctx context.Context, conn *pgx.Conn) error {
return createIndexConcurrently(ctx, conn, "ix_relation_tuple_expired", expiredRelationshipsIndex)
},
noTxMigration); err != nil {
panic("failed to register migration: " + err.Error())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package migrations

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
)

// addGCIndexForRelationTupleTransaction adds a missing index to relation_tuple_transaction table
// indexForRelationTupleTransaction adds a missing index to relation_tuple_transaction table
// to support garbage collection. This is in support of the query for selecting the most recent
// transaction: `SELECT xid, snapshot FROM relation_tuple_transaction WHERE timestamp < $1 ORDER BY xid DESC LIMIT 1`
//
Expand All @@ -21,17 +20,13 @@ import (
// Planning Time: 0.098 ms
// Execution Time: 5706.192 ms
// (6 rows)
const addGCIndexForRelationTupleTransaction = `CREATE INDEX CONCURRENTLY
IF NOT EXISTS ix_relation_tuple_transaction_xid_desc_timestamp
const indexForRelationTupleTransaction = `
ON relation_tuple_transaction (xid DESC, timestamp);`

func init() {
if err := DatabaseMigrations.Register("add-index-for-transaction-gc", "add-expiration-support",
if err := DatabaseMigrations.Register("add-index-for-transaction-gc", "add-expiration-cleanup-index",
func(ctx context.Context, conn *pgx.Conn) error {
if _, err := conn.Exec(ctx, addGCIndexForRelationTupleTransaction); err != nil {
return fmt.Errorf("failed to add missing GC index: %w", err)
}
return nil
return createIndexConcurrently(ctx, conn, "ix_relation_tuple_transaction_xid_desc_timestamp", indexForRelationTupleTransaction)
},
noTxMigration); err != nil {
panic("failed to register migration: " + err.Error())
Expand Down

0 comments on commit ec03842

Please sign in to comment.