-
Notifications
You must be signed in to change notification settings - Fork 4
/
concurrent_tx.go
108 lines (86 loc) · 3.04 KB
/
concurrent_tx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package pg
import (
"context"
"sync"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)
// ConcurrentTx wraps a pgx.Tx together with a mutex so that calls made through
// the wrapper's own methods are serialized. It is intended for sharing a single
// transaction between multiple goroutines.
//
// pgx.Tx is embedded, so any pgx.Tx method that ConcurrentTx does not redefine
// is promoted from the embedded value and runs without taking the mutex.
//
// A ConcurrentTx contains a sync.Mutex and therefore must not be copied after
// first use; pass it around as *ConcurrentTx.
type ConcurrentTx struct {
// Embedded transaction that every wrapper method delegates to.
pgx.Tx
// mu is held for the duration of each wrapper method's call into Tx.
mu sync.Mutex
}
// NewConcurrentTx starts a transaction on p via p.Begin and wraps the resulting
// pgx.Tx in a ConcurrentTx, whose mutex serializes calls made through the
// wrapper's methods.
//
// If p.Begin fails, its error is returned unchanged together with a nil
// *ConcurrentTx.
//
// The caller is expected to finish the returned transaction with Commit or
// Rollback.
func NewConcurrentTx(ctx context.Context, p *pgxpool.Pool) (*ConcurrentTx, error) {
	inner, beginErr := p.Begin(ctx)
	if beginErr != nil {
		return nil, beginErr
	}

	wrapped := &ConcurrentTx{Tx: inner}
	return wrapped, nil
}
// Rollback calls Rollback on the wrapped pgx.Tx while holding the mutex, so it
// cannot overlap with any other locked method of this ConcurrentTx. The error
// from the wrapped call is returned as-is.
func (ct *ConcurrentTx) Rollback(ctx context.Context) error {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	err := ct.Tx.Rollback(ctx)
	return err
}
// Commit calls Commit on the wrapped pgx.Tx while holding the mutex, so it
// cannot overlap with any other locked method of this ConcurrentTx. The error
// from the wrapped call is returned as-is.
func (ct *ConcurrentTx) Commit(ctx context.Context) error {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	err := ct.Tx.Commit(ctx)
	return err
}
// QueryRow calls QueryRow on the wrapped pgx.Tx while holding the mutex and
// returns the resulting pgx.Row.
//
// sql is the statement text and args are its positional arguments; both are
// forwarded unchanged.
//
// NOTE(review): the mutex is released as soon as this method returns, so a
// later Scan on the returned Row is not covered by it. Confirm against the pgx
// documentation whether Scan still reads from the connection; if it does,
// callers must Scan before another goroutine uses this transaction.
func (ct *ConcurrentTx) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	return ct.Tx.QueryRow(ctx, sql, args...)
}
// Query calls Query on the wrapped pgx.Tx while holding the mutex and returns
// the resulting pgx.Rows and error unchanged.
//
// sql is the statement text and args are its positional arguments; both are
// forwarded unchanged.
//
// NOTE(review): the mutex is released as soon as this method returns, so
// iterating or closing the returned Rows is not covered by it. Confirm against
// the pgx documentation whether the connection stays in use until the Rows are
// closed; if it does, callers must finish with the Rows before another
// goroutine uses this transaction.
func (ct *ConcurrentTx) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	return ct.Tx.Query(ctx, sql, args...)
}
// Exec calls Exec on the wrapped pgx.Tx while holding the mutex, so it cannot
// overlap with any other locked method of this ConcurrentTx.
//
// sql is the statement text and args are its positional arguments; both are
// forwarded unchanged. The command tag and error from the wrapped call are
// returned as-is.
func (ct *ConcurrentTx) Exec(ctx context.Context, sql string, args ...any) (commandTag pgconn.CommandTag, err error) {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	commandTag, err = ct.Tx.Exec(ctx, sql, args...)
	return commandTag, err
}
// Prepare calls Prepare on the wrapped pgx.Tx while holding the mutex, so it
// cannot overlap with any other locked method of this ConcurrentTx.
//
// name and sql are forwarded unchanged. The statement description and error
// from the wrapped call are returned as-is.
func (ct *ConcurrentTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	desc, err := ct.Tx.Prepare(ctx, name, sql)
	return desc, err
}
// SendBatch calls SendBatch on the wrapped pgx.Tx with batch b while holding
// the mutex, and returns the resulting pgx.BatchResults.
//
// NOTE(review): the mutex is released as soon as this method returns, so
// reading or closing the returned BatchResults is not covered by it. Confirm
// against the pgx documentation whether the connection stays in use until the
// results are closed.
func (ct *ConcurrentTx) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	results := ct.Tx.SendBatch(ctx, b)
	return results
}
// Begin calls Begin on the wrapped pgx.Tx while holding the mutex and wraps the
// transaction it returns in a new ConcurrentTx.
//
// If the wrapped Begin fails, its error is returned unchanged together with a
// nil pgx.Tx.
//
// The returned wrapper has its own, separate mutex: calls made through it are
// serialized with each other but not with calls made through ct.
func (ct *ConcurrentTx) Begin(ctx context.Context) (pgx.Tx, error) {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	nested, beginErr := ct.Tx.Begin(ctx)
	if beginErr != nil {
		// Return a literal nil so the interface value itself is nil.
		return nil, beginErr
	}

	return &ConcurrentTx{Tx: nested}, nil
}