forked from pingcap/tidb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtxn.go
216 lines (194 loc) · 6.51 KB
/
txn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"sync"
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/logutil"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)
const (
// TimeToPrintLongTimeInternalTxn is the duration if the internal transaction lasts more than it,
// TiDB prints a log message.
TimeToPrintLongTimeInternalTxn = time.Minute * 5
)
var globalInnerTxnTsBox = innerTxnStartTsBox{
innerTSLock: sync.Mutex{},
innerTxnStartTsMap: make(map[uint64]struct{}, 256),
}
type innerTxnStartTsBox struct {
innerTSLock sync.Mutex
innerTxnStartTsMap map[uint64]struct{}
}
func (ib *innerTxnStartTsBox) storeInnerTxnTS(startTS uint64) {
ib.innerTSLock.Lock()
ib.innerTxnStartTsMap[startTS] = struct{}{}
ib.innerTSLock.Unlock()
}
func (ib *innerTxnStartTsBox) deleteInnerTxnTS(startTS uint64) {
ib.innerTSLock.Lock()
delete(ib.innerTxnStartTsMap, startTS)
ib.innerTSLock.Unlock()
}
// GetMinInnerTxnStartTS get the min StartTS between startTSLowerLimit and curMinStartTS in globalInnerTxnTsBox.
func GetMinInnerTxnStartTS(now time.Time, startTSLowerLimit uint64,
curMinStartTS uint64) uint64 {
return globalInnerTxnTsBox.getMinStartTS(now, startTSLowerLimit, curMinStartTS)
}
func (ib *innerTxnStartTsBox) getMinStartTS(now time.Time, startTSLowerLimit uint64,
curMinStartTS uint64) uint64 {
minStartTS := curMinStartTS
ib.innerTSLock.Lock()
for innerTS := range ib.innerTxnStartTsMap {
PrintLongTimeInternalTxn(now, innerTS, true)
if innerTS > startTSLowerLimit && innerTS < minStartTS {
minStartTS = innerTS
}
}
ib.innerTSLock.Unlock()
return minStartTS
}
// PrintLongTimeInternalTxn print the internal transaction information.
// runByFunction true means the transaction is run by `RunInNewTxn`,
//
// false means the transaction is run by internal session.
func PrintLongTimeInternalTxn(now time.Time, startTS uint64, runByFunction bool) {
if startTS > 0 {
innerTxnStartTime := oracle.GetTimeFromTS(startTS)
if now.Sub(innerTxnStartTime) > TimeToPrintLongTimeInternalTxn {
callerName := "internal session"
if runByFunction {
callerName = "RunInNewTxn"
}
infoHeader := fmt.Sprintf("An internal transaction running by %s lasts long time", callerName)
logutil.BgLogger().Info(infoHeader,
zap.Duration("time", now.Sub(innerTxnStartTime)), zap.Uint64("startTS", startTS),
zap.Time("start time", innerTxnStartTime))
}
}
}
// RunInNewTxn will run the f in a new transaction environment, should be used by inner txn only.
func RunInNewTxn(ctx context.Context, store Storage, retryable bool, f func(ctx context.Context, txn Transaction) error) error {
var (
err error
originalTxnTS uint64
txn Transaction
)
defer func() {
globalInnerTxnTsBox.deleteInnerTxnTS(originalTxnTS)
}()
for i := uint(0); i < maxRetryCnt; i++ {
txn, err = store.Begin()
if err != nil {
logutil.BgLogger().Error("RunInNewTxn", zap.Error(err))
return err
}
setRequestSourceForInnerTxn(ctx, txn)
// originalTxnTS is used to trace the original transaction when the function is retryable.
if i == 0 {
originalTxnTS = txn.StartTS()
globalInnerTxnTsBox.storeInnerTxnTS(originalTxnTS)
}
err = f(ctx, txn)
if err != nil {
err1 := txn.Rollback()
terror.Log(err1)
if retryable && IsTxnRetryableError(err) {
logutil.BgLogger().Warn("RunInNewTxn",
zap.Uint64("retry txn", txn.StartTS()),
zap.Uint64("original txn", originalTxnTS),
zap.Error(err))
continue
}
return err
}
failpoint.Inject("mockCommitErrorInNewTxn", func(val failpoint.Value) {
if v := val.(string); len(v) > 0 {
switch v {
case "retry_once":
//nolint:noloopclosure
if i == 0 {
err = ErrTxnRetryable
}
case "no_retry":
failpoint.Return(errors.New("mock commit error"))
}
}
})
if err == nil {
err = txn.Commit(ctx)
if err == nil {
break
}
}
if retryable && IsTxnRetryableError(err) {
logutil.BgLogger().Warn("RunInNewTxn",
zap.Uint64("retry txn", txn.StartTS()),
zap.Uint64("original txn", originalTxnTS),
zap.Error(err))
BackOff(i)
continue
}
return err
}
return err
}
var (
// maxRetryCnt represents maximum retry times in RunInNewTxn.
maxRetryCnt uint = 100
// retryBackOffBase is the initial duration, in microsecond, a failed transaction stays dormancy before it retries
retryBackOffBase = 1
// retryBackOffCap is the max amount of duration, in microsecond, a failed transaction stays dormancy before it retries
retryBackOffCap = 100
)
// BackOff Implements exponential backoff with full jitter.
// Returns real back off time in microsecond.
// See http://www.awsarchitectureblog.com/2015/03/backoff.html.
func BackOff(attempts uint) int {
upper := int(math.Min(float64(retryBackOffCap), float64(retryBackOffBase)*math.Pow(2.0, float64(attempts))))
sleep := time.Duration(rand.Intn(upper)) * time.Millisecond // #nosec G404
time.Sleep(sleep)
return int(sleep)
}
func setRequestSourceForInnerTxn(ctx context.Context, txn Transaction) {
if source := ctx.Value(RequestSourceKey); source != nil {
requestSource := source.(RequestSource)
if requestSource.RequestSourceType != "" {
if !requestSource.RequestSourceInternal {
logutil.Logger(ctx).Warn("`RunInNewTxn` should be used by inner txn only")
}
txn.SetOption(RequestSourceInternal, requestSource.RequestSourceInternal)
txn.SetOption(RequestSourceType, requestSource.RequestSourceType)
return
}
}
// panic in test mode in case there are requests without source in the future.
// log warnings in production mode.
if intest.InTest {
panic("unexpected no source type context, if you see this error, " +
"the `RequestSourceTypeKey` is missing in your context")
} else {
logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, " +
"the `RequestSourceTypeKey` is missing in the context")
}
}