06.Select 语句概览

六、Select 语句概览

主要内容转载自 TiDB 源码阅读系列文章(六)Select 语句概览

表结构和语句

表结构沿用上篇文章的:

1
2
3
4
5
6
CREATE TABLE t {
id VARCHAR(31),
name VARCHAR(50),
age int,
key id_idx (id)
};

Select 语句只会讲解最简单的情况:全表扫描+过滤,暂时不考虑索引等复杂情况,更复杂的情况会在后续章节中介绍。语句为:

1
SELECT name FROM t WHERE age > 10;

语句处理流程

相比 Insert 的处理流程,Select 的处理流程中有 3 个明显的不同:

  1. 需要经过 Optimize

    Insert 是比较简单语句,在查询计划这块并不能做什么事情(对于 Insert into Select 语句这种,实际上只对 Select 进行优化),而 Select 语句可能会无比复杂,不同的查询计划之间性能天差地别,需要非常仔细的进行优化。

  2. 需要和存储引擎中的计算模块交互

    Insert 语句只涉及对 Key-Value 的 Set 操作,Select 语句可能要查询大量的数据,如果通过 KV 接口操作存储引擎,会过于低效,必须要通过计算下推的方式,将计算逻辑发送到存储节点,就近进行处理。

  3. 需要对客户端返回结果集数据

    Insert 语句只需要返回是否成功以及插入了多少行即可,而 Select 语句需要返回结果集。

本篇文章重点说明这些不同的地方,相同的步骤会尽量化简。

Parsing

Select 语句的语法解析规则,相比 Insert 语句,要复杂很多,大家可以对着 MySQL 文档 看一下具体的解析实现。需要特别注意的是 From 字段,这里可能会非常复杂,其语法定义是递归的。

最终语句被解析成 ast.SelectStmt 结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type SelectStmt struct {
dmlNode
resultSetNode
// SelectStmtOpts wraps around select hints and switches.
*SelectStmtOpts
// Distinct represents whether the select has distinct option.
Distinct bool
// From is the from clause of the query.
From *TableRefsClause
// Where is the where clause in select statement.
Where ExprNode
// Fields is the select expression list.
Fields *FieldList
// GroupBy is the group by expression list.
GroupBy *GroupByClause
// Having is the having condition.
Having *HavingClause
// OrderBy is the ordering expression list.
OrderBy *OrderByClause
// Limit is the limit clause.
Limit *Limit
// LockTp is the lock type
LockTp SelectLockType
// TableHints represents the level Optimizer Hint
TableHints [](#)*TableOptimizerHint
}

对于本文所提到的语句 SELECT name FROM t WHERE age > 10; name 会被解析为 Fields,WHERE age > 10 被解析为 Where 字段,FROM t 被解析为 From 字段。

Planning

planBuilder.buildSelect() 方法中,我们可以看到 ast.SelectStmt 是如何转换成一个 plan 树,最终的结果是一个 LogicalPlan,每一个语法元素都被转换成一个逻辑查询计划单元,例如 WHERE c > 10 会被处理为一个 plan.LogicalSelection 的结构:

1
2
3
4
5
6
if sel.Where != nil {
p = b.buildSelection(p, sel.Where, nil)
if b.err != nil {
return nil
}
}

具体的结构如下:

1
2
3
4
5
6
7
8
// LogicalSelection represents a where or having predicate.
type LogicalSelection struct {
baseLogicalPlan
// Originally the WHERE or ON condition is parsed into a single expression,
// but after we converted to CNF(Conjunctive normal form), it can be
// split into a list of AND conditions.
Conditions []expression.Expression
}

其中最重要的就是这个 Conditions 字段,代表了 Where 语句需要计算的表达式,这个表达式求值结果为 True 的时候,表明这一行符合条件。

其他字段的 AST 转 LogicalPlan 读者可以自行研究一下,经过这个 buildSelect() 函数后,AST 变成一个 Plan 的树状结构树,下一步会在这个结构上进行优化。

Optimizing

回到 plan.Optimize() 函数,Select 语句得到的 Plan 是一个 LogicalPlan,所以 这里 可以进入 doOptimize 这个函数,这个函数比较短,其内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// doOptimize 函数将逻辑计划优化为物理计划,同时返回优化后的逻辑计划、最终的物理计划和最终计划的成本。
// 返回的逻辑计划对于生成公共表表达式 (CTE) 的计划是必要的。
func doOptimize(
ctx context.Context, // 上下文信息
sctx base.PlanContext, // 计划上下文,包含会话变量、统计信息等
flag uint64, // 优化标志,控制优化器的行为
logic base.LogicalPlan, // 待优化的逻辑计划
) (base.LogicalPlan, base.PhysicalPlan, float64, error) {
sessVars := sctx.GetSessionVars() // 获取会话变量

// 根据逻辑计划调整优化标志
flag = adjustOptimizationFlags(flag, logic)

// 对逻辑计划进行逻辑优化,例如谓词下推、列裁剪等
logic, err := logicalOptimize(ctx, flag, logic)
if err != nil {
return nil, nil, 0, err
}

// 检查逻辑计划中是否存在笛卡尔积,如果存在且不允许笛卡尔积,则返回错误
if !AllowCartesianProduct.Load() && existsCartesianProduct(logic) {
return nil, nil, 0, errors.Trace(plannererrors.ErrCartesianProductUnsupported)
}

// 获取强制指定的计划编号,用于选择特定的执行计划
planCounter := base.PlanCounterTp(sessVars.StmtCtx.StmtHints.ForceNthPlan)
if planCounter == 0 {
planCounter = -1 // 默认情况下不强制指定计划编号
}

// 对逻辑计划进行物理优化,选择最佳的物理执行计划,并计算其成本
physical, cost, err := physicalOptimize(logic, &planCounter)
if err != nil {
return nil, nil, 0, err
}

// 对物理计划进行后优化,例如添加内存限制、选择合适的算子实现等
finalPlan := postOptimize(ctx, sctx, physical)

// 如果启用了 CETrace,则完善 CETrace 信息
if sessVars.StmtCtx.EnableOptimizerCETrace {
refineCETrace(sctx)
}

// 如果启用了 OptimizeTrace,则记录最终的物理计划
if sessVars.StmtCtx.EnableOptimizeTrace {
sessVars.StmtCtx.OptimizeTracer.RecordFinalPlan(finalPlan.BuildPlanTrace())
}

// 返回优化后的逻辑计划、最终的物理计划、计划成本和错误信息
return logic, finalPlan, cost, nil
}

大家可以关注两个步骤:logicalOptimize 和 physicalOptimize,分别代表逻辑优化和物理优化,这两种优化的基本概念和区别本文不会描述,请大家自行研究(这个是数据库的基础知识)。下面分别介绍一下这两个函数做了什么事情。

逻辑优化

逻辑优化由一系列优化规则组成,对于这些规则会按顺序不断应用到传入的 LogicalPlan Tree 中,见 logicalOptimize() 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// logicalOptimize 函数对逻辑计划进行逻辑优化,例如谓词下推、列裁剪等。
func logicalOptimize(ctx context.Context, flag uint64, logic base.LogicalPlan) (base.LogicalPlan, error) {
// 如果启用了优化器调试跟踪,则进入调试上下文
if logic.SCtx().GetSessionVars().StmtCtx.EnableOptimizerDebugTrace {
debugtrace.EnterContextCommon(logic.SCtx())
defer debugtrace.LeaveContextCommon(logic.SCtx())
}

// 创建默认的逻辑优化选项
opt := optimizetrace.DefaultLogicalOptimizeOption()

// 获取会话变量
vars := logic.SCtx().GetSessionVars()

// 如果启用了优化跟踪,则创建优化跟踪器
if vars.StmtCtx.EnableOptimizeTrace {
vars.StmtCtx.OptimizeTracer = &tracing.OptimizeTracer{}
tracer := &tracing.LogicalOptimizeTracer{
Steps: make([]*tracing.LogicalRuleOptimizeTracer, 0),
}
opt = opt.WithEnableOptimizeTracer(tracer)
defer func() {
vars.StmtCtx.OptimizeTracer.Logical = tracer
}()
}

var err error
var againRuleList []logicalOptRule // 存储需要再次优化的交互规则

// 遍历逻辑优化规则列表
for i, rule := range optRuleList {
// 规则标志的顺序与规则列表中的顺序相同。
// 我们使用位掩码来记录应该使用哪些优化规则。如果第 i 位为 1,则表示我们应该应用第 i 个优化规则。
if flag&(1<<uint(i)) == 0 || isLogicalRuleDisabled(rule) { // 检查是否启用了该规则,以及该规则是否被禁用
continue
}

// 记录优化规则执行前的逻辑计划
opt.AppendBeforeRuleOptimize(i, rule.name(), logic.BuildPlanTrace)

// 应用优化规则
var planChanged bool
logic, planChanged, err = rule.optimize(ctx, logic, opt)
if err != nil {
return nil, err
}

// 计算需要再次优化的交互规则
interactionRule, ok := optInteractionRuleList[rule]
if planChanged && ok && isLogicalRuleDisabled(interactionRule) {
againRuleList = append(againRuleList, interactionRule)
}
}

// 触发交互规则
for i, rule := range againRuleList {
opt.AppendBeforeRuleOptimize(i, rule.name(), logic.BuildPlanTrace)
logic, _, err = rule.optimize(ctx, logic, opt)
if err != nil {
return nil, err
}
}

// 记录最终的逻辑计划
opt.RecordFinalLogicalPlan(logic.BuildPlanTrace)

return logic, err
}

目前 TiDB 已经支持下列优化规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
var optRuleList = []logicalOptRule{
&gcSubstituter{},
&columnPruner{},
&resultReorder{},
&buildKeySolver{},
&decorrelateSolver{},
&semiJoinRewriter{},
&aggregationEliminator{},
&skewDistinctAggRewriter{},
&projectionEliminator{},
&maxMinEliminator{},
&constantPropagationSolver{},
&convertOuterToInnerJoin{},
&ppdSolver{},
&outerJoinEliminator{},
&partitionProcessor{},
&collectPredicateColumnsPoint{},
&aggregationPushDownSolver{},
&deriveTopNFromWindow{},
&predicateSimplification{},
&pushDownTopNOptimizer{},
&syncWaitStatsLoadPoint{},
&joinReOrderSolver{},
&columnPruner{}, // column pruning again at last, note it will mess up the results of buildKeySolver
&pushDownSequenceSolver{},
&resolveExpand{},
}

这些规则并不会考虑数据的分布,直接无脑的操作 Plan 树,因为大多数规则应用之后,一定会得到更好的 Plan(不过上面有一个规则并不一定会更好,读者可以想一下是哪个)。

这里选一个规则介绍一下,其他优化规则可以自行研究或者是等待后续文章。

columnPruner(列裁剪) 规则,会将不需要的列裁剪掉,考虑这个 SQL: select c from t; 对于 from t 这个全表扫描算子(也可能是索引扫描)来说,只需要对外返回 c 这一列的数据即可,这里就是通过列裁剪这个规则实现,整个 Plan 树从树根到叶子节点递归调用这个规则,每层节点只保留上面节点所需要的列即可。

经过逻辑优化,我们可以得到这样一个查询计划:

logical-select

其中 FROM t 变成了 DataSource 算子,WHERE age > 10 变成了 Selection 算子,这里留一个思考题,SELECT name 中的列选择去哪里了?

物理优化

在物理优化阶段,会考虑数据的分布,决定如何选择物理算子,比如对于 FROM t WHERE age > 10 这个语句,假设在 age 字段上有索引,需要考虑是通过 TableScan + Filter 的方式快还是通过 IndexScan 的方式比较快,这个选择取决于统计信息,也就是 age > 10 这个条件究竟能过滤掉多少数据。

我们看一下 physicalOptimize 这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// physicalOptimize 对逻辑计划进行物理优化,选择最佳的物理执行计划。
func physicalOptimize(logic base.LogicalPlan, planCounter *base.PlanCounterTp) (plan base.PhysicalPlan, cost float64, err error) {
// 如果开启了优化器调试跟踪,则进入调试上下文。
if logic.SCtx().GetSessionVars().StmtCtx.EnableOptimizerDebugTrace {
debugtrace.EnterContextCommon(logic.SCtx())
defer debugtrace.LeaveContextCommon(logic.SCtx())
}
// 递归地推导逻辑计划中每个算子的统计信息。
if _, err := logic.RecursiveDeriveStats(nil); err != nil {
return nil, 0, err
}

// 为逻辑计划准备可能的属性,例如排序顺序、数据分布等。
preparePossibleProperties(logic)

// 定义根节点的物理属性,期望获取所有数据。
prop := &property.PhysicalProperty{
TaskTp: property.RootTaskType,
ExpectedCnt: math.MaxFloat64,
}

// 创建默认的物理优化选项。
opt := optimizetrace.DefaultPhysicalOptimizeOption()
stmtCtx := logic.SCtx().GetSessionVars().StmtCtx
// 如果开启了优化跟踪,则创建跟踪器并记录优化过程。
if stmtCtx.EnableOptimizeTrace {
tracer := &tracing.PhysicalOptimizeTracer{
PhysicalPlanCostDetails: make(map[string]*tracing.PhysicalPlanCostDetail),
Candidates: make(map[int]*tracing.CandidatePlanTrace),
}
opt = opt.WithEnableOptimizeTracer(tracer)
defer func() {
// 捕获 panic,确保在发生 panic 时也能记录跟踪信息。
r := recover()
if r != nil {
panic(r) /* pass panic to upper function to handle */
}
// 如果没有错误,则记录最终的物理计划跟踪信息。
if err == nil {
tracer.RecordFinalPlanTrace(plan.BuildPlanTrace())
stmtCtx.OptimizeTracer.Physical = tracer
}
}()
}

// 重置 TaskMap 的备份时间戳。
logic.SCtx().GetSessionVars().StmtCtx.TaskMapBakTS = 0
// 为逻辑计划找到最佳的执行任务,包括物理计划和执行代价。
t, _, err := logic.FindBestTask(prop, planCounter, opt)
if err != nil {
return nil, 0, err
}
// 如果 planCounter 大于 0,说明 nth_plan() 参数超出范围。
if *planCounter > 0 {
logic.SCtx().GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("The parameter of nth_plan() is out of range"))
}
// 如果没有找到有效的执行任务,则返回错误。
if t.Invalid() {
errMsg := "Can't find a proper physical plan for this query"
// 如果是 TiFlash 分布式部署且不允许 MPP,则添加额外的错误信息。
if config.GetGlobalConfig().DisaggregatedTiFlash && !logic.SCtx().GetSessionVars().IsMPPAllowed() {
errMsg += ": cop and batchCop are not allowed in disaggregated tiflash mode, you should turn on tidb_allow_mpp switch"
}
return nil, 0, plannererrors.ErrInternal.GenWithStackByArgs(errMsg)
}

// 解析物理计划中的索引信息。
if err = t.Plan().ResolveIndices(); err != nil {
return nil, 0, err
}
// 获取物理计划的执行代价。
cost, err = getPlanCost(t.Plan(), property.RootTaskType, optimizetrace.NewDefaultPlanCostOption())
// 返回最佳的物理计划、执行代价和错误信息。
return t.Plan(), cost, err
}

这里的 FindBestTask实现 PhysicalPlan 接口,它将枚举所有可用的索引,并选择代价最低的计划。

返回值是一个叫 task 的结构,而不是物理计划,这里引入一个概念 **Task**,TiDB 的优化器会将 PhysicalPlan 打包成为 Task。Task 的定义在 task_base.go 中,我们看一下注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Task 是 `PhysicalPlanInfo` 的新版本。它存储任务的代价信息。
// 一个任务可能是 CopTask、RootTask、MPPTaskMeta 或 ParallelTask。
type Task interface {
// Count 返回当前任务的行数估计。
Count() float64
// Copy 返回当前任务的浅拷贝,与 p 指向相同的计划。
Copy() Task
// Plan 返回当前任务的物理计划。
Plan() PhysicalPlan
// Invalid 返回当前任务是否无效。
Invalid() bool
// ConvertToRootTask 将当前任务转换为根任务类型。
// 这里我们将返回类型更改为接口以避免导入循环。
// 基本接口定义不应依赖于具体的实现结构。
ConvertToRootTask(ctx PlanContext) Task
// MemoryUsage 返回当前任务的内存使用量估计。
MemoryUsage() int64
//Count(): 返回任务的估计行数,用于优化器评估执行计划的代价。
//Copy(): 创建一个任务的浅拷贝,新的任务与原任务共享底层的物理计划,但可以拥有不同的代价信息或其他属性。
//Plan(): 返回任务对应的物理执行计划 (PhysicalPlan)。
//Invalid(): 判断任务是否有效。当优化器无法生成有效的执行计划时,会返回一个无效的任务。
//ConvertToRootTask(): 将当前任务转换为根任务类型 (RootTask)。根任务通常是整个查询计划的顶层任务。
//MemoryUsage(): 返回任务的估计内存使用量,用于优化器评估执行计划的资源消耗。
}

在 TiDB 中,Task 的定义是能在单个节点上不依赖于和其他节点进行数据交换即可进行的一系列操作,举列两种 Task:

  • CopTask 是需要下推到存储引擎(TiKV)上进行计算的物理计划,每个收到请求的 TiKV 节点都会做相同的操作
  • RootTask 是保留在 TiDB 中进行计算的那部分物理计划

如果了解过 TiDB 的 Explain 结果,那么可以看到每个 Operator 都会标明属于哪种 Task,比如下面这个例子:

explain

整个流程是一个树形动态规划的算法,大家有兴趣可以跟一下相关的代码自行研究或者等待后续的文章。

经过整个优化过程,我们已经得到一个物理查询计划,这个 SELECT name FROM t WHERE age > 10; 语句能够指定出来的查询计划大概是这样子的:

simple-select

读者可能会比较奇怪,为什么只剩下这样一个物理算子?WHERR age > 10 哪里去了?实际上 age > 10 这个过滤条件被合并进了 PhysicalTableScan,因为 age > 10 这个表达式可以下推到 TiKV 上进行计算,所以会把 TableScan 和 Filter 这样两个操作合在一起。哪些表达式会被下推到 TiKV 上的 Coprocessor 模块进行计算呢?对于这个 Query 是在下面 这个地方 进行识别:

1
2
3
4
5
// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (ds *DataSource) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
_, ds.pushedDownConds, predicates = expression.ExpressionsToPB(ds.ctx.GetSessionVars().StmtCtx, predicates, ds.ctx.GetClient())
return predicates, ds
}

expression.ExpressionsToPB 这个方法中,会把能下推 TiKV 上的表达式识别出来(TiKV 还没有实现所有的表达式,特别是内建函数只实现了一部分),放到 DataSource.pushedDownConds 字段中。接下来我们看一下 DataSource 是如何转成 PhysicalTableScan,见 DataSource.convertToTableScan() 方法。这个方法会构建出 PhysicalTableScan,并且调用 addPushDownSelection() 方法,将一个 PhysicalSelection 加到 PhysicalTableScan 之上,一起放进 copTask 中。

这个查询计划是一个非常简单的计划,不过我们可以用这个计划来说明 TiDB 是如何执行查询操作。

Executing

一个查询计划如何变成一个可执行的结构以及如何驱动这个结构执行查询已经在前面的 两篇文章中做了描述,这里不再敷述,这一节重点介绍具体的执行过程以及 TiDB 的分布式执行框架。

Coprocessor 框架

Coprocessor 这个概念是从 HBase 中借鉴而来,简单来说是一段注入在存储引擎中的计算逻辑,等待 SQL 层发来的计算请求(序列化后的物理执行计划),处理本地数据并返回计算结果。在 TiDB 中,计算是以 Region 为单位进行,SQL 层会分析出要处理的数据的 Key Range,再将这些 Key Range 根据 PD 中拿到的 Region 信息划分成若干个 Key Range, 最后将这些请求发往对应的 Region。

SQL 层会将多个 Region 返回的结果进行汇总,再经过所需的 Operator 处理,生成最终的结果集。

DistSQL

请求的分发与汇总会有很多复杂的处理逻辑,比如出错重试、获取路由信息、控制并发度以及结果返回顺序,为了避免这些复杂的逻辑与 SQL 层耦合在一起,TiDB 抽象了一个统一的分布式查询接口,称为 DistSQL API,位于 pkg/distsql/distsql.go 这个包中。

其中最重要的方法是select这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Select 发送一个 DAG 请求,返回 SelectResult。
// 在 kvReq 中,KeyRanges 是必需的,Concurrency/KeepOrder/Desc/IsolationLevel/Priority 是可选的。
// Select 函数是 TiDB 中分布式 SQL 执行引擎的核心函数之一,它负责发送 DAG (Directed Acyclic Graph,有向无环图) 请求到 TiKV 或 TiFlash 集群,并处理返回的结果。DAG 请求包含了执行计划中需要在 TiKV 或 TiFlash 上执行的操作,例如读取数据、过滤数据、排序数据等。
func Select(ctx context.Context, dctx *distsqlctx.DistSQLContext, kvReq *kv.Request, fieldTypes []*types.FieldType) (SelectResult, error) {
// 使用 tracing 记录 Select 操作的开始和结束时间。
r, ctx := tracing.StartRegionEx(ctx, "distsql.Select")
defer r.End()

// 用于测试目的的钩子函数,可以用来检查 Select 请求。
if hook := ctx.Value("CheckSelectRequestHook"); hook != nil {
hook.(func(*kv.Request))(kvReq)
}

// 获取是否启用限速操作。
enabledRateLimitAction := dctx.EnabledRateLimitAction
// 获取原始 SQL 语句。
originalSQL := dctx.OriginalSQL
// 定义一个事件回调函数,用于处理 coprocessor 遇到的锁事件。
eventCb := func(event trxevents.TransactionEvent) {
// 注意:不要假设此回调将在同一个 goroutine 中被调用。
if copMeetLock := event.GetCopMeetLock(); copMeetLock != nil {
logutil.Logger(ctx).Debug("coprocessor encounters lock",
zap.Uint64("startTS", kvReq.StartTs),
zap.Stringer("lock", copMeetLock.LockInfo),
zap.String("stmt", originalSQL))
}
}

// 使用拦截器记录 SQL KV 执行计数器。
ctx = WithSQLKvExecCounterInterceptor(ctx, dctx.KvExecCounter)
// 创建一个 ClientSendOption,用于配置客户端发送请求的选项。
option := &kv.ClientSendOption{
SessionMemTracker: dctx.SessionMemTracker, // 用于跟踪会话内存使用情况。
EnabledRateLimitAction: enabledRateLimitAction, // 是否启用限速操作。
EventCb: eventCb, // 事件回调函数。
EnableCollectExecutionInfo: config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load(), // 是否启用收集执行信息。
}

// 如果存储类型是 TiFlash,则在上下文中设置 TiFlash 配置变量,并配置 TiFlash 副本读取选项。
if kvReq.StoreType == kv.TiFlash {
ctx = SetTiFlashConfVarsInContext(ctx, dctx)
option.TiFlashReplicaRead = dctx.TiFlashReplicaRead
option.AppendWarning = dctx.AppendWarning
}

// 使用客户端发送 DAG 请求。
resp := dctx.Client.Send(ctx, kvReq, dctx.KVVars, option)
// 如果响应为空,则返回错误。
if resp == nil {
return nil, errors.New("client returns nil response")
}

// 设置指标标签。
label := metrics.LblGeneral
if dctx.InRestrictedSQL {
label = metrics.LblInternal
}

// kvReq.MemTracker 用于跟踪和控制 DistSQL 层的内存使用情况;
// 对于 selectResult,我们只是使用为 coprocessor 准备的 kvReq.MemTracker,而不是为了简化而创建一个新的。
// 创建并返回一个 selectResult,用于处理 DAG 请求的响应。
return &selectResult{
label: "dag",
resp: resp,
rowLen: len(fieldTypes),
fieldTypes: fieldTypes,
ctx: dctx,
sqlType: label,
memTracker: kvReq.MemTracker,
storeType: kvReq.StoreType,
paging: kvReq.Paging.Enable,
distSQLConcurrency: kvReq.Concurrency,
}, nil
}

TiKV Client 中的具体逻辑我们暂时跳过,这里只关注 SQL 层拿到了这个 selectResult 后如何读取数据,下面这个结构体是关键。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// SelectResult is an iterator of coprocessor partial results.
type SelectResult interface {
// NextRaw gets the next raw result.
NextRaw(context.Context) ([]byte, error)
// Next reads the data into chunk.
Next(context.Context, *chunk.Chunk) error
// Close closes the iterator.
Close() error
}
type selectResult struct {
label string
resp kv.Response

rowLen int
fieldTypes []*types.FieldType
ctx *dcontext.DistSQLContext

selectResp *tipb.SelectResponse
selectRespSize int64 // record the selectResp.Size() when it is initialized.
respChkIdx int
respChunkDecoder *chunk.Decoder

partialCount int64 // number of partial results.
sqlType string

// copPlanIDs contains all copTasks' planIDs,
// which help to collect copTasks' runtime stats.
copPlanIDs []int
rootPlanID int

storeType kv.StoreType

fetchDuration time.Duration
durationReported bool
memTracker *memory.Tracker

stats *selectResultRuntimeStats
// distSQLConcurrency and paging are only for collecting information, and they don't affect the process of execution.
distSQLConcurrency int
paging bool
}

selectResult 实现了 SelectResult 这个接口,代表了一次查询的所有结果的抽象,计算是以 Region 为单位进行,所以这里全部结果会包含所有涉及到的 Region 的结果。调用 Chunk 方法可以读到一个 Chunk 的数据,通过不断调用 NextChunk 方法,直到 Chunk 的 NumRows 返回 0 就能拿到所有结果。NextChunk 的实现会不断获取每个 Region 返回的 SelectResponse,把结果写入 Chunk。

Root Executor

能推送到 TiKV 上的计算请求目前有 TableScan、IndexScan、Selection、TopN、Limit、PartialAggregation 这样几个,其他更复杂的算子,还是需要在单个 tidb-server 上进行处理。所以整个计算是一个多 tikv-server 并行处理 + 单个 tidb-server 进行汇总的模式。

总结

Select 语句的处理过程中最复杂的地方有两点,一个是查询优化,一个是如何分布式地执行,这两部分后续都会有文章来更进一步介绍。


06.Select 语句概览
https://blog.longpi1.com/2024/11/07/06-Select-语句概览/