forked from pingcap/tidb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmpp.go
218 lines (186 loc) · 7.07 KB
/
mpp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"context"
"strconv"
"strings"
"time"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tipb/go-tipb"
)
// MppVersion is the version of the mpp protocol used to build an mpp plan.
type MppVersion int64

// Known mpp versions, numbered consecutively from zero.
const (
	// MppVersionV0 supports TiFlash version [~, ~]
	MppVersionV0 MppVersion = iota

	// MppVersionV1 supports TiFlash version [v6.6.x, ~]
	MppVersionV1

	// MppVersionV2
	// MppVersionV3

	// mppVersionMax is one past the last version declared above; new
	// versions must be declared before it.
	mppVersionMax
)

const (
	// newestMppVersion is the highest version declared before mppVersionMax.
	newestMppVersion MppVersion = mppVersionMax - 1

	// MppVersionUnspecified means the illegal or unspecified version; it is only used in TiDB.
	MppVersionUnspecified MppVersion = -1

	// MppVersionUnspecifiedName is the name of the UNSPECIFIED mpp version.
	MppVersionUnspecifiedName string = "UNSPECIFIED"
)

// ToInt64 returns the version as a plain int64.
func (v MppVersion) ToInt64() int64 {
	return int64(v)
}

// ToMppVersion parses name into an MppVersion.
//
// The name is matched case-insensitively against MppVersionUnspecifiedName;
// otherwise it must be a base-10 integer in the inclusive range
// [MppVersionUnspecified, newestMppVersion]. The boolean result reports
// whether name was valid; on failure the version is MppVersionUnspecified.
func ToMppVersion(name string) (MppVersion, bool) {
	upper := strings.ToUpper(name)
	if upper == MppVersionUnspecifiedName {
		return MppVersionUnspecified, true
	}

	parsed, err := strconv.ParseInt(upper, 10, 64)
	if err != nil {
		return MppVersionUnspecified, false
	}

	// Reject anything outside the known range. Note that -1 itself is
	// accepted because it equals MppVersionUnspecified.
	version := MppVersion(parsed)
	if version < MppVersionUnspecified || version > newestMppVersion {
		return MppVersionUnspecified, false
	}
	return version, true
}

// GetNewestMppVersion returns the newest mpp version that can be used in an mpp plan.
func GetNewestMppVersion() MppVersion {
	return newestMppVersion
}
// MPPTaskMeta holds the meta information of an mpp task, such as its location.
type MPPTaskMeta interface {
// GetAddress returns the address of the node on which this task should execute.
GetAddress() string
}
// MPPQueryID is the globally unique id of an mpp query.
// It is made up of the three components below.
type MPPQueryID struct {
QueryTs uint64 // timestamp of query execution, used for TiFlash minTSO schedule
LocalQueryID uint64 // unique mpp query id in local tidb memory.
ServerID uint64 // NOTE(review): presumably the id of the TiDB server that issued the query — confirm
}
// MPPTask is the minimum execution unit of an mpp computation job.
type MPPTask struct {
Meta MPPTaskMeta // on which store this task will execute
ID int64 // mppTaskID; ToPB skips the address lookup on Meta when this is -1
StartTs uint64 // sent as TaskMeta.StartTs by ToPB
MppQueryID MPPQueryID // identifies the mpp query; its fields are copied into the pb TaskMeta by ToPB
TableID int64 // physical table id
MppVersion MppVersion // mpp version
PartitionTableIDs []int64 // NOTE(review): presumably the physical ids of the partitions this task reads — confirm
IsDisaggregatedTiFlashStaticPrune bool // NOTE(review): presumably set for static partition pruning under disaggregated TiFlash — confirm
}
// ToPB builds the protobuf TaskMeta describing this task.
//
// The address is read from t.Meta only when the task ID is not -1.
// NOTE(review): presumably ID -1 marks a task that has no store address
// (so Meta may be unset) — confirm against callers.
func (t *MPPTask) ToPB() *mpp.TaskMeta {
	queryID := t.MppQueryID
	pb := &mpp.TaskMeta{
		TaskId:       t.ID,
		StartTs:      t.StartTs,
		MppVersion:   t.MppVersion.ToInt64(),
		QueryTs:      queryID.QueryTs,
		LocalQueryId: queryID.LocalQueryID,
		ServerId:     queryID.ServerID,
	}
	if t.ID == -1 {
		return pb
	}
	pb.Address = t.Meta.GetAddress()
	return pb
}
// MppTaskStates denotes the state of an mpp task.
type MppTaskStates uint8
// The possible task states, numbered from zero in declaration order.
const (
// MppTaskReady means the task is ready
MppTaskReady MppTaskStates = iota
// MppTaskRunning means the task is running
MppTaskRunning
// MppTaskCancelled means the task is cancelled
MppTaskCancelled
// MppTaskDone means the task is done
MppTaskDone
)
// MPPDispatchRequest stands for a single mpp task to be dispatched.
type MPPDispatchRequest struct {
Data []byte // data encodes the dag coprocessor request.
Meta MPPTaskMeta // mpp store is the location of tiflash store.
IsRoot bool // root task returns data to tidb directly.
Timeout uint64 // If task is assigned but doesn't receive a connect request during timeout, the task should be destroyed.
// SchemaVar is for any schema-ful storage (like tiflash) to validate schema correctness if necessary.
SchemaVar int64
StartTs uint64 // NOTE(review): presumably the same start timestamp carried by MPPTask.StartTs — confirm
MppQueryID MPPQueryID // id of the mpp query this request belongs to
ID int64 // identify a single task
State MppTaskStates // current state of the task (one of the MppTask* constants)
}
// MPPClient accepts and processes mpp requests.
type MPPClient interface {
// ConstructMPPTasks schedules tasks for a plan fragment and returns the meta of each scheduled task.
// NOTE(review): the meaning of the time.Duration argument is not visible here — confirm with implementations.
// TODO: This interface will be refined after we support more executors.
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error)
// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID, mppVersion MppVersion, memTracker *memory.Tracker) Response
}
// MPPBuildTasksRequest requests the store allocation for an mpp plan fragment.
// The request does not contain the plan itself, because only the key ranges affect the location assignment.
type MPPBuildTasksRequest struct {
KeyRanges []KeyRange // key ranges used for the location assignment
StartTS uint64 // NOTE(review): presumably the start timestamp of the query — confirm
PartitionIDAndRanges []PartitionIDAndRanges // NOTE(review): presumably per-partition key ranges, used instead of KeyRanges for partitioned tables — confirm
}
// ExchangeCompressionMode is the compression method used in the exchange operator.
type ExchangeCompressionMode int
// The compression modes are numbered from zero in declaration order
// (NONE = 0, Fast = 1, HC = 2, Unspecified = 3).
const (
// ExchangeCompressionModeNONE indicates no compression
ExchangeCompressionModeNONE ExchangeCompressionMode = iota
// ExchangeCompressionModeFast indicates fast compression/decompression speed, compression ratio is lower than HC mode
ExchangeCompressionModeFast
// ExchangeCompressionModeHC indicates high compression (HC) ratio mode
ExchangeCompressionModeHC
// ExchangeCompressionModeUnspecified indicates unspecified compress method, let TiDB choose one
ExchangeCompressionModeUnspecified
// RecommendedExchangeCompressionMode indicates recommended compression mode
RecommendedExchangeCompressionMode ExchangeCompressionMode = ExchangeCompressionModeFast
// exchangeCompressionModeUnspecifiedName is the name reported for, and parsed as, ExchangeCompressionModeUnspecified.
exchangeCompressionModeUnspecifiedName string = "UNSPECIFIED"
)
// Name returns the name of the compression mode: the fixed "unspecified"
// name for ExchangeCompressionModeUnspecified, otherwise the string form of
// the corresponding tipb.CompressionMode.
func (t ExchangeCompressionMode) Name() string {
	if t != ExchangeCompressionModeUnspecified {
		pbMode := t.ToTipbCompressionMode()
		return pbMode.String()
	}
	return exchangeCompressionModeUnspecifiedName
}
// ToExchangeCompressionMode parses name, case-insensitively, into an
// ExchangeCompressionMode.
//
// The name is either the "unspecified" name or a key of
// tipb.CompressionMode_value. The boolean result reports whether name was
// recognised; on failure the mode is ExchangeCompressionModeNONE.
func ToExchangeCompressionMode(name string) (ExchangeCompressionMode, bool) {
	upper := strings.ToUpper(name)
	if upper == exchangeCompressionModeUnspecifiedName {
		return ExchangeCompressionModeUnspecified, true
	}

	// The tipb numeric value is converted directly.
	// NOTE(review): this assumes tipb.CompressionMode numbering matches the
	// constants of this package — confirm.
	if pbValue, found := tipb.CompressionMode_value[upper]; found {
		return ExchangeCompressionMode(pbValue), true
	}
	return ExchangeCompressionModeNONE, false
}
// ToTipbCompressionMode converts the kv.ExchangeCompressionMode into the
// matching tipb.CompressionMode. Any mode other than Fast or HC, including
// ExchangeCompressionModeUnspecified, maps to tipb.CompressionMode_NONE.
func (t ExchangeCompressionMode) ToTipbCompressionMode() tipb.CompressionMode {
	switch t {
	case ExchangeCompressionModeFast:
		return tipb.CompressionMode_FAST
	case ExchangeCompressionModeHC:
		return tipb.CompressionMode_HIGH_COMPRESSION
	default:
		// Covers ExchangeCompressionModeNONE and every unrecognised value.
		return tipb.CompressionMode_NONE
	}
}