04.TiDB Insert 语句执行流程

四、TiDB Insert 语句执行流程

主要内容转载自 TiDB 源码阅读系列文章(四)Insert 语句概览

本文为 TiDB 源码阅读系列文章的第四篇。上一篇文章简单介绍了整体流程,无论什么语句,大体上是在这个框架下运行,DDL 语句也不例外。

本篇文章会以 Insert 语句为例进行讲解,帮助读者理解前一篇文章。

表结构

这里先给一个表结构,下面介绍的 SQL 语句都是在这个表上的操作。

1
2
3
4
5
6
CREATE TABLE t (
id VARCHAR(31),
name VARCHAR(50),
age int,
key id_idx (id)
);

Insert 语句

INSERT INTO t VALUES ("pingcap001", "pingcap", 3); 以这条语句为例,解释 Insert 是如何运行的。

语句处理流程

首先大家回忆一下上一篇文章介绍的框架,一条 SQL 语句经过协议层、Parser、Plan、Executor 这样几个模块处理后,变成可执行的结构,再通过 Next() 来驱动语句的真正执行。对于框架,每类语句都差不多;对于每个核心步骤,每个语句会有自己的处理逻辑。

语法解析

先看 Parser,对于 Insert 语句的解析逻辑,可以看到这条语句会被解析成下面这个结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// InsertStmt is a statement to insert new rows into an existing table.
// See https://dev.mysql.com/doc/refman/5.7/en/insert.html
type InsertStmt struct {
dmlNode
IsReplace bool
IgnoreErr bool
Table *TableRefsClause
Columns [](#)*ColumnName
Lists [](#)[](#)ExprNode
Setlist [](#)*Assignment
Priority mysql.PriorityEnum
OnDuplicate [](#)*Assignment
Select ResultSetNode
}

这里提到的语句比较简单,只会涉及 Table 以及 Lists 这两个字段,也就是向哪个表插入哪些数据。其中 Lists 是一个二维数组,数组中的每一行对应于一行数据,这个语句只包含一行数据。有了 AST 之后,需要对其进行一系列处理,预处理、合法性验证、权限检查这些暂时跳过(每个语句的处理逻辑都差不多),我们看一下针对 Insert 语句的处理逻辑。

查询计划

接下来是将 AST 转成 Plan 结构,这个操作是在 planBuilder.buildInsert() 中完成。对于这个简单的语句,主要涉及两个部分:

  • 补全 Schema 信息

    包括 Database/Table/Column 信息,这个语句没有指定向哪些列插入数据,所以会使用所有的列。

  • 处理 Lists 中的数据

    这里会处理一遍所有的 Value,将 ast.ExprNode 转换成 expression.Expression,也就是纳入了我们的表达式框架,后面会在这个框架下求值。大多数情况下,这里的 Value 都是常量,也就是 expression.Constant。

如果 Insert 语句比较复杂,比如要插入的数据来自于一个 Select,或者是 OnDuplicateUpdate 这种情况,还会做更多的处理,这里暂时不再深入描述,读者可以执行看 buildInsert() 中其他的代码。

现在 ast.InsertStmt 已经被转换成为 plan.Insert 结构,对于 Insert 语句并没有什么可以优化的地方,plan.Insert 这个结构只实现了 Plan 这个接口,所以在下面这个判断中,不会走进 Optimize 流程:

1
2
3
if logic, ok := p.(LogicalPlan); ok {
return doOptimize(builder.optFlag, logic)
}

其他比较简单的语句也不会进入 doOptimize,比如 Show 这种语句,下一篇文章会讲解 Select 语句,会涉及到 doOptimize 函数。

执行

拿到 plan.Insert 这个结构后,查询计划就算制定完成。最后我们看一下 Insert 是如何执行的。

首先 plan.Insert 在这里被转成 executor.InsertExec 结构,后续的执行都由这个结构进行。执行入口是 Next 方法,第一步是要对待插入数据的每行进行表达式求值,具体的可以看 getRows 这个函数,拿到数据后就进入最重要的逻辑— InsertExec.exec() 这个函数,这个函数有点长,不过只考虑我们文章中讲述的这条 SQL 的话,可以把代码简化成下面这段逻辑:

1
2
3
for _, row := range rows {
h, err := e.Table.AddRecord(e.ctx, row, false)
}

接下来我们看一下 AddRecord 这个函数是如何将一行数据写入存储引擎中。要理解这段代码,需要了解一下 TiDB 是如何将 SQL 的数据映射为 Key-Value,可以先读一下我们之前写的一些文章,比如三篇文章了解 TiDB 技术内幕 - 说计算。这里假设读者已经了解了这一点背景知识,那么一定会知道这里需要将 Row 和 Index 的 Key-Value 构造出来的,写入存储引擎。

构造 Index 数据的代码在 addIndices() 函数中,会调用 index.Create() 这个方法:

1
2
3
4
5
6
7
8
9
构造 Index Key:
func (c *index) GenIndexKey(sc *stmtctx.StatementContext, indexedValues [](#)types.Datum, h int64, buf [](#)byte) (key [](#)byte, distinct bool, err error) {
......
key = c.getIndexKeyBuf(buf, len(c.prefix)+len(indexedValues)*9+9)
key = append(key, [](#)byte(c.prefix)...)
key, err = codec.EncodeKey(sc, key, indexedValues...)
if !distinct && err == nil {
key, err = codec.EncodeKey(sc, key, types.NewDatum(h))
}
1
2
3
4
5
6
7
8
9
10
11
12
构造 Index Value:
func (c *index) Create(ctx context.Context, rm kv.RetrieverMutator, indexedValues [](#)types.Datum, h int64) (int64, error) {
if !distinct {
// non-unique index doesn't need store value, write a '0' to reduce space
err = rm.Set(key, [](#)byte'0')
return 0, errors.Trace(err)
}
......
if skipCheck {
err = rm.Set(key, encodeHandle(h))
return 0, errors.Trace(err)
}

构造 Row 数据的代码比较简单,就在 tables.AddRecord 函数中:

1
2
构造 Row Key: 
key := t.RecordKey(recordID)
1
2
构造 Row Value:
writeBufs.RowValBuf, err = tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues)

构造完成后,调用类似下面这段代码即可将 Key-Value 写到当前事务的缓存中:

1
2
3
if err = txn.Set(key, value); err != nil {
return 0, errors.Trace(err)
}

在事务的提交过程中,即可将这些 Key-Value 提交到存储引擎中。

核心流程代码

1
addRecordWithAutoIDHint  -》 e.Table.AddRecord -》addIndices &&  tablecodec.EncodeRow等

1.addRecordWithAutoIDHint 位置:pkg/executor/insert_common.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 在InsertValues对象中添加记录,同时考虑自动ID的预留数量。
//
// 如果不启用即时约束检查(ConstraintCheckInPlace),则假设键不存在。
//
// 根据reserveAutoIDCount的值,选择性地传递WithReserveAutoIDHint选项给AddRecord。
//
// 如果添加记录成功:
// - 更新受影响的行数。
// - 如果lastInsertID不为零,设置最后插入的ID。
// - 如果不在批处理模式下,执行外键检查。
func (e *InsertValues) addRecordWithAutoIDHint(
ctx context.Context,
row []types.Datum,
reserveAutoIDCount int,
) (err error) {
// 获取当前会话变量
vars := e.Ctx().GetSessionVars()

// 如果不启用即时约束检查,假设键不存在
if !vars.ConstraintCheckInPlace {
vars.PresumeKeyNotExists = true
}

// 根据预留自动ID数量选择AddRecord调用方式
if reserveAutoIDCount > 0 {
// 传递WithReserveAutoIDHint选项,预留指定数量的自动ID
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), table.WithReserveAutoIDHint(reserveAutoIDCount))
} else {
// 不预留自动ID
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx))
}

// 还原假设键不存在的状态
vars.PresumeKeyNotExists = false

if err != nil {
// 直接返回添加记录的错误
return err
}

// 记录受影响的行数
vars.StmtCtx.AddAffectedRows(1)

// 如果lastInsertID不为零,设置最后插入的ID
if e.lastInsertID != 0 {
vars.SetLastInsertID(e.lastInsertID)
}

// 如果不在批处理模式下,执行外键检查
if !vars.StmtCtx.BatchCheck {
for _, fkc := range e.fkChecks {
err = fkc.insertRowNeedToCheck(vars.StmtCtx, row)
if err != nil {
return err
}
}
}

return nil
}

2.AddRecord 位置: pkg/table/tables/tables.go(只展示主要代码注释可查看:https://github.com/longpi1/tidb/blob/chinese_annotation/pkg/table/table.go)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// AddRecord 实现了 table.Table 的 AddRecord 接口,用于向表中添加一条记录。
func (t *TableCommon) AddRecord(sctx table.MutateContext, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
// 获取当前事务,如果事务开启失败则返回错误。
txn, err := sctx.Txn(true)
if err != nil {
return nil, err
}

// ... 更多处理逻辑 ...
//1.构造Row key
key := t.RecordKey(recordID)
sc, rd := sessVars.StmtCtx, &sessVars.RowEncoder
//计算 checksum
checksums, writeBufs.RowValBuf = t.calcChecksums(sctx, recordID, checksumData, writeBufs.RowValBuf)

//构造 Row Value
writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc.TimeZone(), buffer.Row, buffer.ColIDs,
writeBufs.RowValBuf, writeBufs.AddRowValues, rd, checksums...)
//处理错误
err = sc.HandleError(err)
if err != nil {
// 如果处理错误失败,则返回错误
return nil, err
}

// ... 更多处理逻辑 ...

//2.构造 Index 数据
var createIdxOpts []table.CreateIdxOptFunc
if len(opts) > 0 {
createIdxOpts = make([]table.CreateIdxOptFunc, 0, len(opts))
for _, fn := range opts {
if raw, ok := fn.(table.CreateIdxOptFunc); ok {
createIdxOpts = append(createIdxOpts, raw)
}
}
}
//插入新索引数据
h, err := t.addIndices(sctx, recordID, r, txn, createIdxOpts)
if err != nil {
// 如果插入失败,则返回错误
return h, err
}
// ... 更多处理逻辑 ...

memBuffer.Release(sh)
// 3.binlog插入
if shouldWriteBinlog(sctx.GetSessionVars(), t.meta) {
// For insert, TiDB and Binlog can use same row and schema.
binlogRow = buffer.Row
binlogColIDs = buffer.ColIDs
err = t.addInsertBinlog(sctx, recordID, binlogRow, binlogColIDs)
if err != nil {
return nil, err
}
}

// ... 更多处理逻辑 ...
return recordID, nil
}

3.索引创建Create位置:pkg/table/tables/index.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Create 在 kvIndex 数据中创建一个新的条目。
// 如果索引是唯一的且存在具有相同键的条目,
// Create 将返回现有条目的句柄作为第一个返回值,ErrKeyExists 作为第二个返回值。
func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...table.CreateIdxOptFunc) (kv.Handle, error) {
// ... 更多处理逻辑 ...
// 获取索引值
indexedValues := c.getIndexedValue(indexedValue)
// ... 更多处理逻辑 ...
// 遍历所有索引值
for _, value := range indexedValues {
// 生成索引键
key, distinct, err := c.GenIndexKey(sc.ErrCtx(), sc.TimeZone(), value, h, writeBufs.IndexKeyBuf)
// ... 更多处理逻辑 ...
// 生成索引值
idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx.TimeZone(), c.tblInfo, c.idxInfo,
c.needRestoredData, distinct, opt.Untouched, value, h, c.phyTblID, handleRestoreData, nil)

// 处理唯一索引或跳过检查的情况
if !distinct || skipCheck || opt.Untouched {
val := idxVal
// ... 更多处理逻辑 ...
err = txn.GetMemBuffer().Set(key, val)
if err != nil {
return nil, err
}
// ... 更多处理逻辑 ...
continue
}

// 检查键是否已经存在
var value []byte
if c.tblInfo.TempTableType != model.TempTableNone {
// 对于临时表,始终检查键,因为它不会写入 TiKV
value, err = txn.Get(ctx, key)
} else if (txn.IsPipelined() || sctx.GetSessionVars().LazyCheckKeyNotExists()) && !keyIsTempIdxKey {
// 对于流水线事务或启用延迟检查且不是临时索引键的情况,从内存缓冲区获取本地值
value, err = txn.GetMemBuffer().GetLocal(ctx, key)
} else {
// 其他情况,从 TiKV 获取值
value, err = txn.Get(ctx, key)
}
if err != nil && !kv.IsErrNotFound(err) {
return nil, err
}

// ... 更多处理逻辑 ...

// 如果索引键值不存在或已删除
if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) {
val := idxVal
// 延迟检查:流水线事务或启用了延迟检查且获取值时出错
lazyCheck := (txn.IsPipelined() || sctx.GetSessionVars().LazyCheckKeyNotExists()) && err != nil
if keyIsTempIdxKey {
// 如果是临时索引键,则编码临时索引值元素
tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true}
val = tempVal.Encode(value)
}
// 检查是否需要设置 PresumeKeyNotExists 标志
needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, h,
keyIsTempIdxKey, c.tblInfo.ID)
if err != nil {
return nil, err
}
if lazyCheck {
// 延迟检查
var flags []kv.FlagsOp
if needPresumeNotExists {
// 设置 PresumeKeyNotExists 标志
flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists}
}
// 如果是悲观事务且不在限制性 SQL 中且连接 ID 大于 0,则设置 NeedConstraintCheckInPrewrite 标志
if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() &&
!vars.InRestrictedSQL && vars.ConnectionID > 0 {
flags = append(flags, kv.SetNeedConstraintCheckInPrewrite)
}
// 将键值对和标志设置到内存缓冲区
err = txn.GetMemBuffer().SetWithFlags(key, val, flags...)
} else {
// 直接将键值对设置到内存缓冲区
err = txn.GetMemBuffer().Set(key, val)
}
if err != nil {
return nil, err
// ... 更多处理逻辑 ...
if lazyCheck && !txn.IsPessimistic() {
// 延迟检查且不是悲观事务,设置断言为 SetAssertUnknown
err = txn.SetAssertion(key, kv.SetAssertUnknown)
} else {
// 其他情况,设置断言为 SetAssertNotExist
err = txn.SetAssertion(key, kv.SetAssertNotExist)
}
if err != nil {
return nil, err
}
continue
}
// 如果键已存在,返回现有条目的句柄
if keyIsTempIdxKey && !tempIdxVal.IsEmpty() {
value = tempIdxVal.Current().Value
}
handle, err := tablecodec.DecodeHandleInIndexValue(value)
if err != nil {
return nil, err
}
return handle, kv.ErrKeyExists
}
return nil, nil
}

小结

Insert 语句在诸多 DML 语句中算是最简单的语句,本文也没有涉及 Insert 语句中更复杂的情况,所以相对比较好理解。上面讲了这么多代码,让我们用一幅图来再回顾一下整个流程。

Insert

流程图


04.TiDB Insert 语句执行流程
https://blog.longpi1.com/2024/11/01/04-TiDBInsert语句执行流程/