hashicorp raft源码分析(一、项目介绍与Leder选举实现)
本文基于 hashicorp/raft v1.7.3
版本进行源码分析
API手册:https://pkg.go.dev/github.com/hashicorp/raft
源码地址:hashicorp/raft
raft论文中文解读:https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md
在阅读文章前需要有一定的 raft 基础, 不然直接看源码会一头雾水.
一、项目背景:什么是 Raft? 在聊代码结构前,先简单回顾 Raft 算法的基本思想:
Hashicorp 的 Raft 是 Raft 论文(In Search of an Understandable Consensus Algorithm )的Go语言实现 ,被广泛应用在:
Hashicorp 的 Consul 、Vault 、Nomad
类似 etcd (另一个 Raft 实现)
二、项目结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 github.com/hashicorp/raft/ 相关核心目录与代码 ├── fuzzy ├── bench ├── raft.go ├── api.go ├── config.go ├── file_snapshot.go ├── fsm.go ├── future.go ├── log.go ├── log_store.go ├── net_transport.go ├── observer.go ├── raft_test.go ├── snapshot.go ├── stable_store.go ├── transport.go └── util.go
关键子目录/文件说明:
raft.go
:核心 !实现了Raft算法的主要逻辑,包括:
节点状态切换(成为候选人、领导者、跟随者)
心跳机制(Leader发送心跳维持领导权)
选举定时器(触发选举)
日志复制(Leader向Follower同步日志)
状态机应用(FSM.Apply()
)
log.go
& log_store.go
: Raft日志的存储与管理:
Log
结构体(Index、Term、Command)
LogStore
接口(持久化日志的Append()
、Get()
等)
默认内存实现(inmem_store.go
)和BoltDB实现(raftboltdb/
)用于持久化
fsm.go
:有限状态机接口,使用者需实现:
Apply(logEntry []byte) interface{}
(应用日志到状态机)
Snapshot() (FSMSnapshot, error)
(创建快照)
Restore(snapshot io.ReadCloser) error
(从快照恢复)
snapshot.go
& file_snapshot.go
: 快照的创建、传输、恢复逻辑:
SnapshotSink
(写快照到文件)
SnapshotStore
接口(支持文件系统、内存快照)
transport.go
& net_transport.go
:网络通信层
Transport
接口(AppendEntries()
、RequestVote()
等RPC)
基于TCP的默认实现(NetTransport
),封装了encoding/gob
序列化
future.go
: 异步操作机制:
提交日志(raft.Apply()
)时返回Future
对象
可阻塞等待操作结果(成功/失败)
整体结构特点:
接口与实现分离 :如LogStore
、SnapshotStore
、Transport
等关键组件均为接口,方便替换存储引擎(内存/BoltDB)或网络层(TCP、自定义)
分层清晰:
最底层:StableStore
(任期、投票持久化)、LogStore
(日志存储)
中间层:Raft
状态机(选举、日志复制)
上层:FSM
(业务状态机,由用户实现)
大量使用Go的goroutine
+channel
:
后台心跳/选举定时器
处理RPC调用
日志异步复制
三、Leader选举
在raft算法中,典型的领导者选举在本质上是节点状态的变更。具体到raft源码中,领导者选举的入口函数就是run()
,在raft.go中以一个单独的协程运行,来实现节点状态的变更
在下面的实现代码中,可以看到Follower
、Candidate
和Leader
3 种节点状态都有分别对应的功能函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (r *Raft) run() { for { select { case <-r.shutdownCh: r.setLeader("" ) return default : } switch r.getState() { case Follower: r.runFollower() case Candidate: r.runCandidate() case Leader: r.runLeader() } } }
1.1 follower跟随者运行逻辑
主要处理逻辑:
接收 RPC 请求 (<-r.rpcCh
): 这是 Follower 接收外部通信的主要方式。当收到 RPC 时,将其交给 r.processRPC
方法处理。Follower 主要处理以下几种 RPC:
AppendEntriesRequest
: 接收 Leader 发来的日志条目同步请求或心跳信号。
RequestVoteRequest
: 接收 Candidate 发来的投票请求。
RequestPreVoteRequest
: 接收 Candidate 发来的预投票请求(如果启用了 Pre-Vote 优化)。
InstallSnapshotRequest
: 接收 Leader 发来的快照传输请求。
TimeoutNowRequest
: 接收强制立即超时的请求。
接收配置变更请求 (<-r.configurationChangeCh
): Follower 节点不能发起配置变更。因此,收到此类请求时,直接回复 ErrNotLeader
错误。
接收日志应用请求 (<-r.applyCh
): Follower 节点不能直接处理外部的日志应用请求。日志的应用是由 Leader 驱动的 commitIndex
前进后,由 Raft 内部机制完成的。因此,收到此类请求时,也直接回复 ErrNotLeader
错误。
心跳定时器超时 (<-heartbeatTimer
): 这是 Follower 检测 Leader 是否存活的关键机制。
定时器超时后,首先重新启动 一个新的随机化心跳定时器。
然后,检查距离最后一次与 Leader 成功通信 的时间 (r.LastContact()
) 是否超过了心跳超时时间。
如果最近与 Leader 有过联系 (时间差小于超时时间): 说明 Leader 仍然活跃,这只是一个正常的定时器事件,Follower 继续保持 Follower 状态,循环继续。
如果最近与 Leader 没有 联系 (时间差大于等于超时时间):
说明 Leader 可能已经失效或网络有问题,Follower 认为 Leader 失联。
接收关闭信号 (<-r.shutdownCh
): 当 Raft 节点被要求关闭时,收到此信号,函数直接返回,结束主循环和 Follower 状态的运行。
相关源码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 func (r *Raft) runFollower() { didWarn := false leaderAddr, leaderID := r.LeaderWithID() r.logger.Info("entering follower state" , "follower" , r, "leader-address" , leaderAddr, "leader-id" , leaderID) metrics.IncrCounter([]string {"raft" , "state" , "follower" }, 1 ) heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout) for r.getState() == Follower { r.mainThreadSaturation.sleeping() select { case rpc := <-r.rpcCh: r.mainThreadSaturation.working() r.processRPC(rpc) case c := <-r.configurationChangeCh: r.mainThreadSaturation.working() c.respond(ErrNotLeader) case a := <-r.applyCh: r.mainThreadSaturation.working() a.respond(ErrNotLeader) case <-heartbeatTimer: r.mainThreadSaturation.working() hbTimeout := r.config().HeartbeatTimeout heartbeatTimer = randomTimeout(hbTimeout) lastContact := r.LastContact() if time.Since(lastContact) < hbTimeout { continue } lastLeaderAddr, lastLeaderID := r.LeaderWithID() r.setLeader("" , "" ) if r.configurations.latestIndex == 0 { if !didWarn { r.logger.Warn("no known peers, aborting election" ) didWarn = true } } else if r.configurations.latestIndex == r.configurations.committedIndex && !hasVote(r.configurations.latest, r.localID) { if !didWarn { r.logger.Warn("not part of stable configuration, aborting election" ) didWarn = true } } else { metrics.IncrCounter([]string {"raft" , "transition" , "heartbeat_timeout" }, 1 ) if hasVote(r.configurations.latest, r.localID) { r.logger.Warn("heartbeat timeout reached, starting election" , "last-leader-addr" , lastLeaderAddr, "last-leader-id" , lastLeaderID) r.setState(Candidate) return } else if !didWarn { r.logger.Warn("heartbeat timeout reached, not part of a stable configuration or a non-voter, not triggering a leader election" ) didWarn = true } } case <-r.shutdownCh: return } } }func hasVote (configuration Configuration, id ServerID) bool { for _, server := range configuration.Servers { if server.ID == id { return server.Suffrage == Voter } } return false }
processRPC 处理请求逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 func (r *Raft) processRPC(rpc RPC) { if err := r.checkRPCHeader(rpc); err != nil { rpc.Respond(nil , err) return } switch cmd := rpc.Command.(type ) { case *AppendEntriesRequest: r.appendEntries(rpc, cmd) case *RequestVoteRequest: r.requestVote(rpc, cmd) case *RequestPreVoteRequest: r.requestPreVote(rpc, cmd) case *InstallSnapshotRequest: r.installSnapshot(rpc, cmd) case *TimeoutNowRequest: r.timeoutNow(rpc, cmd) default : r.logger.Error("got unexpected command" , "command" , hclog.Fmt("%#v" , rpc.Command)) rpc.Respond(nil , fmt.Errorf(rpcUnexpectedCommandError)) } }
hashicorp raft server/client 是使用 msgpack on tcp 实现的 rpc 服务, 关于 hashcrop raft transport server/client 的实现原理没什么可深入的, 请直接看代码实现. msgpack rpc 的协议报文格式如下.
appendEntries 同步日志 appendEntries
函数是 Raft 协议中 Follower 节点处理 Leader 发来的 AppendEntriesRequest
的核心方法。它的主要职责是根据 Leader 的请求更新 Follower 的状态、日志和提交索引。
核心流程:
初始化响应: 创建一个 AppendEntriesResponse
并设置默认值(通常是失败),包含当前节点的 Term 和最后一个日志索引。使用 defer
确保函数退出时发送响应。
Term 检查:
如果 Leader 的 Term 小于当前节点的 Term,则拒绝请求并返回(Follower 的 Term 在响应中)。
如果 Leader 的 Term 大于当前节点的 Term,或者当前节点不是 Follower (且不是领导权转移中的 Candidate),则更新当前节点的 Term 为 Leader 的 Term,并转换为 Follower 状态。
记录 Leader 信息: 保存 Leader 的地址和 ID。
日志一致性检查:
如果请求包含 PrevLogEntry
(> 0),则获取当前节点日志中相同索引条目的 Term。
将获取到的 Term 与 Leader 请求中的 PrevLogTerm
进行比较。
如果不匹配,说明日志发生分歧,拒绝请求,设置 NoRetryBackoff
为 true,并返回。
如果无法获取到 PrevLogEntry
索引处的日志(例如索引越界或存储错误),也拒绝请求,设置 NoRetryBackoff
为 true,并返回。
处理新日志条目:
如果请求包含 Entries
,遍历 Leader 发来的条目。
查找与当前节点日志发生冲突或 Leader 独有的新条目的起始点。
如果发现冲突(相同索引但 Term 不同),删除当前节点从冲突点开始的所有后续日志条目。如果在删除范围内的配置变更日志被移除,则回退最新的配置信息。
将 Leader 发来的从冲突点或当前节点最后一个日志索引之后开始的条目视为新的,并将其追加到日志存储中。
处理新追加的日志条目中的配置变更类型。
更新当前节点的最后一个日志索引和 Term。
更新提交索引:
如果 Leader 的 LeaderCommitIndex
大于当前节点的 CommitIndex
,则更新当前节点的 CommitIndex
为 min(LeaderCommitIndex, 当前节点的最后一个日志索引)
。
如果最新的配置变更日志索引小于等于新的提交索引,则将最新的配置提升为已提交配置。
将已提交但尚未应用的日志条目应用到状态机, processLogs
应用日志 。
设置成功并更新最后联系时间: 如果所有检查和操作都成功完成,将响应的 Success
字段设置为 true,并更新记录最后一次与 Leader 成功联系的时间(用于重置选举定时器)。
简单说 appendEntries()
同步日志是 leader 和 follower 不断调整位置再同步数据的过程.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { defer metrics.MeasureSince([]string {"raft" , "rpc" , "appendEntries" }, time.Now()) resp := &AppendEntriesResponse{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), LastLog: r.getLastIndex(), Success: false , NoRetryBackoff: false , } var rpcErr error defer func () { rpc.Respond(resp, rpcErr) }() if a.Term < r.getCurrentTerm() { return } if a.Term > r.getCurrentTerm() || (r.getState() != Follower && !r.candidateFromLeadershipTransfer.Load()) { r.setState(Follower) r.setCurrentTerm(a.Term) resp.Term = a.Term } if len (a.Addr) > 0 { r.setLeader(r.trans.DecodePeer(a.Addr), ServerID(a.ID)) } else { r.setLeader(r.trans.DecodePeer(a.Leader), ServerID(a.ID)) } if a.PrevLogEntry > 0 { lastIdx, lastTerm := r.getLastEntry() var prevLogTerm uint64 if a.PrevLogEntry == lastIdx { prevLogTerm = lastTerm } else { var prevLog Log if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil { resp.NoRetryBackoff = true return } prevLogTerm = prevLog.Term } if a.PrevLogTerm != prevLogTerm { resp.NoRetryBackoff = true return } } if len (a.Entries) > 0 { start := time.Now() lastLogIdx, _ := r.getLastLog() var newEntries []*Log for i, entry := range a.Entries { if entry.Index > lastLogIdx { newEntries = a.Entries[i:] break } var storeEntry Log if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil { return } if entry.Term != storeEntry.Term { if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil { return } if entry.Index <= r.configurations.latestIndex { r.setLatestConfiguration(r.configurations.committed, r.configurations.committedIndex) } newEntries = a.Entries[i:] break } } if n := len (newEntries); n > 0 { if err := r.logs.StoreLogs(newEntries); err != nil { return } for _, newEntry := range newEntries { if err := r.processConfigurationLogEntry(newEntry); err != nil { rpcErr = err return } } last := newEntries[n-1 ] r.setLastLog(last.Index, last.Term) } } if a.LeaderCommitIndex > 0 && a.LeaderCommitIndex > r.getCommitIndex() { start := time.Now() idx := min(a.LeaderCommitIndex, r.getLastIndex()) r.setCommitIndex(idx) if r.configurations.latestIndex <= idx { r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) } r.processLogs(idx, nil ) } resp.Success = true r.setLastContact() }
处理 RequestVoteRequest 投票请求 requestVote
函数处理来自其他 Raft 节点(候选者)的 RequestVote RPC 请求。其核心逻辑是根据 Raft 协议的选举规则决定是否授予投票。
核心流程:
前置检查:
检查候选者是否在当前配置中(如果提供了 ID)。如果不在且配置不为空,拒绝投票。
检查当前节点是否已知有其他领导者。如果已知且请求不是领导权转移,拒绝投票。
任期处理:
如果请求的任期小于当前任期,忽略该请求(不授予投票)。
如果请求的任期大于当前任期,更新当前节点的任期为请求任期,并转变为跟随者状态。更新响应中的任期。
投票者检查: 检查候选者是否是当前配置中的投票者(如果提供了 ID)。如果不是投票者且配置不为空,拒绝投票。
重复投票检查: 从持久化存储中获取上次投票的任期和候选者。如果在当前请求的任期内已经投过票,并且上次投票的候选者就是本次请求的候选者,则再次授予投票(处理幂等性);否则(投给了其他候选者或任期不同),不授予投票。
日志匹配检查 :
检查候选者的日志是否至少和本地日志一样新。
如果本地日志的最后任期大于候选者的,拒绝投票。
如果本地日志的最后任期与候选者的相同,但本地日志的最后索引大于候选者的,拒绝投票。
授予投票 :
如果通过了所有前面的检查,表示可以授予投票。
关键步骤: 在授予投票 之前 ,将本次投票的任期和候选者 ID 持久化到稳定存储中,确保安全性。
设置响应中的 Granted
字段为 true。
更新本地的最后联系时间。
返回: 函数结束,延迟函数发送包含投票结果的响应。
总的来说,requestVote
函数实现了 Raft 协议中跟随者(或其他状态节点)响应候选者投票请求的核心逻辑,包括任期处理、日志匹配检查、投票持久化以及处理配置变更和领导权转移等特殊情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { defer metrics.MeasureSince([]string {"raft" , "rpc" , "requestVote" }, time.Now()) r.observe(*req) resp := &RequestVoteResponse{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), Granted: false , } var rpcErr error defer func () { rpc.Respond(resp, rpcErr) }() if r.protocolVersion < 2 { resp.Peers = encodePeers(r.configurations.latest, r.trans) } var candidate ServerAddress var candidateBytes []byte if req.Term < r.getCurrentTerm() { return } if req.Term > r.getCurrentTerm() { r.logger.Debug("因收到更高任期的投票请求而失去领导权" ) r.setState(Follower) r.setCurrentTerm(req.Term) resp.Term = req.Term } if len (req.ID) > 0 { candidateID := ServerID(req.ID) if len (r.configurations.latest.Servers) > 0 && !hasVote(r.configurations.latest, candidateID) { r.logger.Warn("拒绝投票请求,因为候选人是非投票节点" , "from" , candidate) return } } lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm) if err != nil && err.Error() != "not found" { r.logger.Error("获取上一次投票任期失败" , "error" , err) return } lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand) if err != nil && err.Error() != "not found" { r.logger.Error("获取上一次投票候选人失败" , "error" , err) return } if lastVoteTerm == req.Term && lastVoteCandBytes != nil { r.logger.Info("收到相同任期的重复投票请求" , "term" , req.Term) if bytes.Equal(lastVoteCandBytes, candidateBytes) { r.logger.Warn("收到重复的投票请求" , "candidate" , candidate) resp.Granted = true } return } lastIdx, lastTerm := r.getLastEntry() if lastTerm > req.LastLogTerm { r.logger.Warn("拒绝投票请求,因为本节点的最后日志任期更大" , "candidate" , candidate, "last-term" , lastTerm, "last-candidate-term" , req.LastLogTerm) return } if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex { r.logger.Warn("拒绝投票请求,因为本节点的最后日志索引更大" , "candidate" , candidate, "last-index" , lastIdx, "last-candidate-index" , req.LastLogIndex) return } if err := r.persistVote(req.Term, candidateBytes); err != nil { r.logger.Error("持久化投票信息失败" , "error" , err) return } resp.Granted = true r.setLastContact() }
1.2 candidate 候选者运行逻辑
主要流程步骤:
任期更新与初始化:
节点将当前任期(Term)加一,进入新的任期进行选举。
记录日志和更新监控指标,表明进入 Candidate 状态。
计算赢得选举所需的多数派票数 (votesNeeded
)。
选举阶段选择(预投票 vs. 正式投票):
根据配置 (preVoteDisabled
) 和是否为领导权转移 (candidateFromLeadershipTransfer
) 决定是先进行 预投票 (Pre-Vote) 还是直接进行 **正式投票 (RequestVote)**。
如果启用预投票且非领导权转移,调用 preElectSelf()
发起预投票 RPC,监听 prevoteCh
。预投票不会改变任期,用于试探是否能获得多数支持。
否则,调用 electSelf()
发起正式投票 RPC,监听 voteCh
。正式投票会携带新的任期号。
设置选举超时:
设置一个随机化的选举超时定时器 (electionTimer
),防止多个 Candidate 同时超时并竞选,减少冲突。
主循环与事件处理:
进入一个循环,持续监听各种事件,直到节点状态不再是 Candidate。
使用 select语句同时监听:
Incoming RPCs (r.rpcCh
): 处理来自其他节点的 RPC 请求(如 AppendEntries、RequestVote 等)。如果收到带有更高任期的 RPC,节点会立即转为 Follower 并更新任期,退出 Candidate 状态。
预投票结果 (prevoteCh
):
如果收到带有更高任期的预投票响应,转为 Follower 并更新任期,退出 Candidate 状态。
统计收到的预投票赞成/反对票数。
如果获得多数派预投票赞成,认为预投票成功,关闭预投票监听, 发起正式投票 (electSelf()
),重置选举定时器,开始等待正式投票结果。
如果被多数派预投票拒绝,预投票活动失败,继续等待当前的选举超时。
正式投票结果 (voteCh
):
如果收到带有更高任期的投票响应,转为 Follower 并更新任期,退出 Candidate 状态。
统计收到的正式投票赞成票数。
遍历配置中的 server 集合, 过滤出状态为 Voter 的 server, 最后通过 n/2 + 1
公式计算出法定投票数, 简单说就是绝大多数的节点数量.如果获得多数派正式投票赞成 ,赢得选举 ,转为 Leader 状态,设置自己为 Leader,退出 Candidate 状态。
配置变更请求 (r.configurationChangeCh
): 由于 Candidate 不是 Leader,拒绝此类请求并返回错误。
选举超时 (electionTimer
):如果在超时时间内未能赢得选举(无论是在预投票阶段等待,还是在正式投票阶段等待),则认为本次选举失败,函数 return
。由于外部调用 runCandidate
的逻辑通常在一个无限循环中,这将导致节点在新的任期中重新开始 竞选过程。
节点关闭 (r.shutdownCh
): 收到关闭信号,退出循环和函数。
退出处理:
无论通过哪种方式退出 runCandidate
函数(成为 Follower、成为 Leader、shutdown、选举超时),都会执行 defer
中设置的逻辑,将 candidateFromLeadershipTransfer
标志重置为 false,防止其影响后续的选举行为。
相关源码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 func (r *Raft) runCandidate() { term := r.getCurrentTerm() + 1 r.logger.Info("entering candidate state" , "node" , r, "term" , term) metrics.IncrCounter([]string {"raft" , "state" , "candidate" }, 1 ) var voteCh <-chan *voteResult var prevoteCh <-chan *preVoteResult if !r.preVoteDisabled && !r.candidateFromLeadershipTransfer.Load() { prevoteCh = r.preElectSelf() } else { voteCh = r.electSelf() } defer func () { r.candidateFromLeadershipTransfer.Store(false ) }() electionTimeout := r.config().ElectionTimeout electionTimer := randomTimeout(electionTimeout) preVoteGrantedVotes := 0 preVoteRefusedVotes := 0 grantedVotes := 0 votesNeeded := r.quorumSize() r.logger.Debug("calculated votes needed" , "needed" , votesNeeded, "term" , term) for r.getState() == Candidate { r.mainThreadSaturation.sleeping() select { case rpc := <-r.rpcCh: r.mainThreadSaturation.working() r.processRPC(rpc) case preVote := <-prevoteCh: case vote := <-voteCh: r.mainThreadSaturation.working() if vote.Term > r.getCurrentTerm() { r.logger.Debug("newer term discovered, fallback to follower" , "term" , vote.Term) r.setState(Follower) r.setCurrentTerm(vote.Term) return } if vote.Granted { grantedVotes++ r.logger.Debug("vote granted" , "from" , vote.voterID, "term" , vote.Term, "tally" , grantedVotes) } if grantedVotes >= votesNeeded { r.logger.Info("election won" , "term" , vote.Term, "tally" , grantedVotes) r.setState(Leader) r.setLeader(r.localAddr, r.localID) return } case c := <-r.configurationChangeCh: r.mainThreadSaturation.working() c.respond(ErrNotLeader) case <-electionTimer: r.mainThreadSaturation.working() r.logger.Warn("Election timeout reached, restarting election" ) return case <-r.shutdownCh: return } } }
1.3 leader 领导者运行逻辑
主要逻辑: runLeader
为 leader 领导者的核心处理方法. 进入该函数说明当前节点为 leader.
startStopReplication
启动各个 follower 的 replication, 开启心跳 heartbeat 和同步 replicate 协程.
构建一个 LogNoop 空日志, 然后通过 dispatchLogs
方法发给所有的 follower 副本. 这里的空日志用来向 follower 通知确认 leader, 并获取各 follower 的一些日志元信息.
leaderLoop
为 leader 的主调度循环.响应 Follower 的复制进度、处理新的客户端请求、复制日志、管理集群配置变更、处理领导权转移、定期验证自身的 Leader 身份,并在需要时触发降级。
相关源码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 func (r *Raft) runLeader() { r.logger.Info("entering leader state" , "leader" , r) metrics.IncrCounter([]string {"raft" , "state" , "leader" }, 1 ) overrideNotifyBool(r.leaderCh, true ) notify := r.config().NotifyCh if notify != nil { select { case notify <- true : case <-r.shutdownCh: select { case notify <- true : default : } } } r.setupLeaderState() stopCh := make (chan struct {}) go emitLogStoreMetrics(r.logs, []string {"raft" , "leader" }, oldestLogGaugeInterval, stopCh) defer func () { }() r.startStopReplication() noop := &logFuture{log: Log{Type: LogNoop}} r.dispatchLogs([]*logFuture{noop}) r.leaderLoop() }
startStopReplication 启动日志复制机制:启动各个 follower 的 replication, 开启心跳 heartbeat 和同步日志.
为什么需要 leader 给 follower 发送心跳 ? raft 论文里有说明, 当 follower 在一段时间内收不到 leader 的心跳请求时, 则判定 leader 有异常, 切换到 candidate 进行选举 election.
关于replicate 如何进行日志复制,将在下一篇文章介绍
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 func (r *Raft) startStopReplication() { for _, server := range r.configurations.latest.Servers { if server.ID == r.localID { continue } s, ok := r.leaderState.replState[server.ID] if !ok { s = &followerReplication{ ... } r.leaderState.replState[server.ID] = s r.goFunc(func () { r.replicate(s) }) } } }func (r *Raft) replicate(s *followerReplication) { ... r.goFunc(func () { r.heartbeat(s, stopHeartbeat) }) ... ... }func (r *Raft) replicate(s *followerReplication) { stopHeartbeat := make (chan struct {}) defer close (stopHeartbeat) r.goFunc(func () { r.heartbeat(s, stopHeartbeat) }) RPC: shouldStop := false for !shouldStop { select { case <-s.triggerCh: lastLogIdx, _ := r.getLastLog() shouldStop = r.replicateTo(s, lastLogIdx) if !shouldStop && s.allowPipeline { goto PIPELINE } } return PIPELINE: s.allowPipeline = false if err := r.pipelineReplicate(s); err != nil { } goto RPC }
leaderLoop leaderLoop
是 Leader 状态下的大脑,它不断监听和处理各种事件,核心职责包括:响应 Follower 的复制进度、处理新的客户端请求、复制日志、管理集群配置变更、处理领导权转移、定期验证自身的 Leader 身份,并在需要时触发降级。
初始化:
设置一个 stepDown
标志,用于标记 Leader 是否因配置变更(尤其是移除自身)等原因需要降级。
初始化 Leader Lease(租约)计时器,用于定期验证 Leader 身份。
进入主循环:
节点进入一个无限循环,只要其状态保持为 Leader
就会持续运行。
事件驱动处理: 在循环中,通过 select
语句监听多个通道,处理来自外部或内部的各种事件:
接收 RPC 请求 (rpcCh
):
处理来自其他节点的 RPC,包括:
Follower 对 Leader 发送的 AppendEntries RPC 的响应(确认日志复制进度)。
来自其他节点的 RequestVote RPC(如果发现更高任期,Leader 会立即降级为 Follower)。
其他可能的 RPC。
内部降级信号 (leaderState.stepDown
): 收到信号后,Leader 立即转为 Follower 状态,并退出循环。
领导权转移请求 (leadershipTransferCh
):
处理手动触发的领导权转移请求。
检查是否已有转移正在进行。
选择或确认目标 Follower。
启动一个后台协程来执行转移逻辑(通常涉及等待日志同步并向目标 Follower 发送 TimeoutNow RPC)。
设置标志防止并行转移,并处理超时或 Leader 自身降级的情况。
日志提交通知 (leaderState.commitCh
):
当有日志条目被多数节点复制成功(达到可提交状态)时收到此通知。
更新 Leader 的 commitIndex
(已提交日志的最大索引)。
检查是否有新的配置变更日志被提交,如果 Leader 自己被移除,则设置 stepDown
标志。
从待提交队列 (inflight
) 中找出所有已落后于 commitIndex
的日志。
批量应用日志: 将这些已提交的日志批量应用到状态机 (FSM - State Machine),处理其副作用(如配置变更执行)。
清理已应用的日志。
如果 stepDown
标志被设置,根据配置选择关闭节点或降级为 Follower。
Leader 验证请求 (verifyCh
):
这是 Leader Lease 机制的一部分。Leader 定期向 Follower 发送验证请求。
处理来自验证请求的响应。如果未能获得多数节点的确认,说明可能已有新的 Leader 出现,当前 Leader 降级为 Follower。
用户快照恢复请求 (userRestoreCh
): 处理从用户提供的快照恢复状态的请求。
获取配置请求 (configurationsCh
): 处理客户端获取当前集群配置的请求。
配置变更请求 (configurationChangeChIfStable()
):
处理添加或移除节点等集群配置变更请求。
将配置变更作为特殊的日志条目追加到 Raft 日志中,并像普通日志一样复制和提交。
引导请求 (bootstrapCh
): 拒绝运行时的引导请求,因为 Raft 只在初始状态允许引导。
新日志条目请求 (applyCh
):
处理来自客户端的新的命令或数据请求。
批量提交优化: 尝试从通道中一次性读取多个待处理的客户端请求,进行批量处理。
如果 stepDown
标志已设置,拒绝新的日志请求。
将这些新的请求封装成日志条目,追加到 Leader 的日志中。
分发日志: 将这些新日志条目分发(通过 AppendEntries RPC)给所有 Follower,驱动日志复制过程。
Leader Lease 计时器超时 (lease
):
定期触发 Leader Lease 检查。
调用 checkLeaderLease
方法验证 Leader 身份。
根据检查结果调整下一个租约检查的间隔,并重置计时器。
如果租约验证失败,也可能触发降级。
通知通道 (leaderNotifyCh
, followerNotifyCh
): 用于唤醒等待特定事件的内部协程。
关机信号 (shutdownCh
): 收到信号后,Leader 退出循环并停止运行。
退出: 循环在以下情况下退出:
Leader 状态改变为 Follower(因发现更高任期、收到降级信号、Leader Lease 验证失败或自身被移除)。
节点收到关机信号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 func (r *Raft) leaderLoop() { stepDown := false lease := time.After(r.config().LeaderLeaseTimeout) for r.getState() == Leader { r.mainThreadSaturation.sleeping() select { case rpc := <-r.rpcCh: r.mainThreadSaturation.working() r.processRPC(rpc) case <-r.leaderState.stepDown: r.mainThreadSaturation.working() r.setState(Follower) case future := <-r.leadershipTransferCh: case <-r.leaderState.commitCh: r.mainThreadSaturation.working() oldCommitIndex := r.getCommitIndex() commitIndex := r.leaderState.commitment.getCommitIndex() r.setCommitIndex(commitIndex) if r.configurations.latestIndex > oldCommitIndex && r.configurations.latestIndex <= commitIndex { r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) if !hasVote(r.configurations.committed, r.localID) { stepDown = true } } start := time.Now() var groupReady []*list.Element groupFutures := make (map [uint64 ]*logFuture) var lastIdxInGroup uint64 for e := r.leaderState.inflight.Front(); e != nil ; e = e.Next() { commitLog := e.Value.(*logFuture) idx := commitLog.log.Index if idx > commitIndex { break } metrics.MeasureSince([]string {"raft" , "commitTime" }, commitLog.dispatch) groupReady = append (groupReady, e) groupFutures[idx] = commitLog lastIdxInGroup = idx } if len (groupReady) != 0 { r.processLogs(lastIdxInGroup, groupFutures) for _, e := range groupReady { r.leaderState.inflight.Remove(e) } } metrics.MeasureSince([]string {"raft" , "fsm" , "enqueue" }, start) metrics.SetGauge([]string {"raft" , "commitNumLogs" }, float32 (len (groupReady))) if stepDown { if r.config().ShutdownOnRemove { r.logger.Info("removed ourself, shutting down" ) r.Shutdown() } else { r.logger.Info("removed ourself, transitioning to follower" ) r.setState(Follower) } } case v := <-r.verifyCh: case newLog := <-r.applyCh: r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) newLog.respond(ErrLeadershipTransferInProgress) continue } ready := []*logFuture{newLog} GROUP_COMMIT_LOOP: for i := 0 ; i < r.config().MaxAppendEntries; i++ { select { case newLog := <-r.applyCh: ready = append (ready, newLog) default : break GROUP_COMMIT_LOOP } } if stepDown { for i := range ready { ready[i].respond(ErrNotLeader) } } else { r.dispatchLogs(ready) } case <-r.shutdownCh: return } } }
dispatchLogs 日志调度派发 dispatchLogs
用来记录本地日志以及派发日志给所有的 follower. 数据来源主要通过leedloop中的applyCh实现新的日志信息
1 2 case newLog := <-r.applyCh:
日志持久化 :
将日志写入本地磁盘,确保日志的持久化。如果写入失败,Leader 节点会降级为 Follower 节点,并通知调用者操作失败。
状态更新 :
commitment.match 来计算各个 server 的 matchIndex, 计算出 commit 提交索引.
更新 Leader 节点的匹配索引(match index
),表示本地节点已成功存储日志。
更新 Leader 节点的最后日志索引和任期信息。
触发日志复制 :
异步通知所有 Follower 节点的复制器,触发日志复制流程,确保日志被同步到集群中的其他节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 func (r *Raft) dispatchLogs(applyLogs []*logFuture) { now := time.Now() defer metrics.MeasureSince([]string {"raft" , "leader" , "dispatchLog" }, now) term := r.getCurrentTerm() lastIndex := r.getLastIndex() n := len (applyLogs) logs := make ([]*Log, n) metrics.SetGauge([]string {"raft" , "leader" , "dispatchNumLogs" }, float32 (n)) for idx, applyLog := range applyLogs { applyLog.dispatch = now lastIndex++ applyLog.log.Index = lastIndex applyLog.log.Term = term applyLog.log.AppendedAt = now logs[idx] = &applyLog.log r.leaderState.inflight.PushBack(applyLog) } if err := r.logs.StoreLogs(logs); err != nil { r.logger.Error("failed to commit logs" , "error" , err) for _, applyLog := range applyLogs { applyLog.respond(err) } r.setState(Follower) return } r.leaderState.commitment.match(r.localID, lastIndex) r.setLastLog(lastIndex, term) for _, f := range r.leaderState.replState { asyncNotifyCh(f.triggerCh) } }
四、问题
问题 1:Raft 如何处理网络分区?
解答:Raft 通过领导人选举和多数派机制来处理网络分区。
分区形成: 当网络分区发生时,集群可能分裂成多个部分,每个部分都无法与多数节点通信。
选举: 在每个分区中,如果 Follower 节点在选举超时时间内没有收到 Leader 的心跳,它们会发起选举。
多数派: 只有包含多数节点的分区才能选出新的 Leader。少数派分区中的节点无法赢得选举,因为它们无法获得多数票。
旧 Leader: 如果旧 Leader 位于少数派分区,它会因为无法与多数节点通信而退位成 Follower。
数据一致性: Raft 保证,即使在网络分区的情况下,也只有一个分区能够提交新的日志条目,从而保证数据一致性。
分区恢复: 当网络分区恢复后,少数派分区中的节点会重新加入集群,并从新 Leader 那里同步最新的日志。
问题 2:Raft 如何保证数据一致性?
解答:Raft 通过以下机制保证数据一致性:
强领导者: 只有 Leader 才能接受客户端请求并生成新的日志条目。
日志复制: Leader 将日志条目复制到所有 Follower。只有当多数 Follower 确认收到日志条目后,Leader 才会提交该条目。
仅追加日志: 日志条目只能追加到日志末尾,不能修改或删除。
选举限制: 只有拥有最新日志的节点才能成为 Leader。
提交限制: 只有 Leader 才能推进 commitIndex
,且 commitIndex
只会单调递增。
状态机: 所有节点按照相同的顺序应用已提交的日志条目到状态机,保证状态机状态一致。
问题 3:Raft 中的 commitIndex
和 lastApplied
有什么区别?
解答:
commitIndex
: 表示已知已提交的最高日志条目的索引。这意味着索引小于或等于 commitIndex
的所有日志条目都已安全地复制到多数节点,可以应用到状态机。
lastApplied
: 表示已应用到状态机的最高日志条目的索引。每个节点独立维护自己的 lastApplied
。
关系: lastApplied
通常小于或等于 commitIndex
。当 lastApplied
小于 commitIndex
时,表示节点正在将已提交的日志条目应用到状态机。当两者相等时,表示状态机是最新的。
问题 4:Raft 如何处理客户端请求的幂等性?
解答:Raft 本身不直接处理客户端请求的幂等性。幂等性通常需要在客户端或应用层实现。一种常见的做法是:
客户端生成唯一 ID: 客户端为每个请求生成一个唯一的 ID(例如 UUID)。
服务器跟踪 ID: 服务器跟踪已处理的请求 ID。如果收到具有相同 ID 的重复请求,服务器可以直接返回之前的结果,而无需重新执行操作。
状态机: 在应用层,状态机可以记录已执行的请求 ID,以避免重复执行。
五、参考链接 1.Hashicorp Raft实现和API分析
2.hashicorp raft源码学习
3.源码分析 hashicorp raft replication 日志复制的实现原理