Etcd Raft使用入门及原理解析

Etcd Raft使用入门及原理解析

閱讀本文約花費: 13 (分鐘)

什么是Raft

Raft是一个分布式一致性算法,充分的利用了可复制状态机以及日志。其最核心的设计目标就是易于理解。在性能、错误容错等方面来看有点类似Paxos,但不同之处在于,Raft论文较为清晰的描述了其主要流程以及其中一些细节问题,而Paxos我们知道非常难以理解。

当构建一个分布式系统时,一个非常重要的设计目标就是fault tolerance。如果系统基于Raft协议实现,那么当其中一个节点挂掉,或者发生了网络分区等异常情况时,只要大多数节点仍然能够正常通讯,整个集群就能够正常对外提供服务而不会挂掉。

关于Raft更多的细节,这里建议直接阅读论文: “In Search of an Understandable Consensus Algorithm

介绍

Etcd的Raft库已经在生产环境得到了非常广泛的应用,有力的支撑了etcd、K8S、Docker Swarm、TiDB/TiKV等分布式系统的构建,当你能够熟练的使用一个成熟的Raft库、甚至如果能够自己实现一个,那会有种’有了锤子,干什么都是钉子’的感觉。

特性

Etcd raft基本上已经实现了Raft协议的完整特性,包括: – Leader选举 – 日志复制 – 日志压缩 – 成员变更 – Leader和Follower都支持高效的线性只读查询请求 – 通过batch、pipeline等手段优化日志复制、网络IO的延迟

概览

etcd的raft实现都在etcd/raft目录下,但是大部分的实现都在下面几个比较核心的文件: – raft.go: 从名字也可以看出来,这个是最核心的部分,比如leader选择的逻辑、raft消息的处理逻辑等 – node.go: 可以理解为raft集群的一个节点,客户端也主要是这个类打交道,比如心跳的逻辑、propose、状态机、成员变更等都是这个类负责处理。 – log.go: raft日志相关的代码,比如保存日志记录 – raft.proto: 定义了raft一些核心的RPC数据结构,由于protobuf是跨语言的,因此如果想用其他语言重写etcd raft,那么至少这部分内容都是可以复用的

用法

客户端主要使用Node和raft集群交互,首先需要启动一个raft集群,有两种方式: – 启动一个全新的raft集群 – 加入一个已经存在的raft集群(节点重启、扩容、缩容)

启动一个三节点的集群:

storage := raft.NewMemoryStorage()
  c := &Config{
    //代表一个节点的ID,必须唯一,并且不能为0,不能重复利用,和zookeeper的id类似
    ID:              0x01, 
    ElectionTick:    10, 
    HeartbeatTick:   1,
    Storage:         storage,
    MaxSizePerMsg:   4096,
    MaxInflightMsgs: 256,
  }

 //设置节点列表
  n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

这里需要强调一个点,etcd的raft实现并不包括网络部分,网络通讯部分需要使用者自己实现,因此这里节点列表传入的是ID,而ip:port到id的映射需要库使用者自己实现。 如果让一个新的节点加入集群,那么就不需要传入节点列表,首先通过ProposeConfChangeRPC发起一个成员变更请求,在任意一个raft集群节点都可以,然后启动这个节点:

//配置参考上文中的代码段
  n := raft.StartNode(c, nil)

如果是重启一个节点,那么这里需要注意,我们需要恢复这个节点之前的状态,比如当前term、根据快照和日志恢复状态机等:

storage := raft.NewMemoryStorage()

  // Recover the in-memory storage from persistent snapshot, state and entries.
  // 根据快照、entry日志等恢复当前raft节点到之前的状态
  storage.ApplySnapshot(snapshot)
  storage.SetHardState(state)
  storage.Append(entries)

  c := &Config{
    ID:              0x01,
    ElectionTick:    10,
    HeartbeatTick:   1,
    Storage:         storage,
    MaxSizePerMsg:   4096,
    MaxInflightMsgs: 256,
  }

  // Restart raft without peer information.
  // Peer information is already included in the storage.
  // 重启该raft节点,此时不用传入任何节点相关信息,因为已经在刚刚的恢复过程中填充好了
  n := raft.RestartNode(c)

当raft集群启动完成后,对于一个raft节点,用户需要做几件事情,伪码如下:

for {
    select {
    case <-s.Ticker:
      n.Tick()
    case rd := <-s.Node.Ready():
      saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
      send(rd.Messages)
      if !raft.IsEmptySnap(rd.Snapshot) {
        processSnapshot(rd.Snapshot)
      }
      for _, entry := range rd.CommittedEntries {
        process(entry)
        if entry.Type == raftpb.EntryConfChange {
          var cc raftpb.ConfChange
          cc.Unmarshal(entry.Data)
          s.Node.ApplyConfChange(cc)
        }
      }
      s.Node.Advance()
    case <-s.done:
      return
    }
  }
case <-s.Ticker

库使用者需要定时调用tick()方法,根据节点当前的角色调用对应的逻辑: – 心跳, leader需要定时发送心跳包给follower – 选举,如果一定时间没有收到leader的心跳,则转换为候选者,竞选leader

case rd := <-s.Node.Ready(): 处理Ready

Ready封装了可以准备开始读取的entries、messages,需要保存到持久化介质、同步给其他节点:

type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState

// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState

// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState

// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry

// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot

// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry

// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message

// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
  • 调用Node.Ready(),处理当前raft节点的状态,其中有些步骤可以并行执行
    • 将entries、HardState、快照按照顺序写到持久化介质中,底层存储介质支持原子写入,那么也可以一次性将他们写入
    • 将所有的消息发送给远程节点,但一定要先将最近的HardState、上一轮Ready中的entries都持久化之后(可以和同一轮的entries持久化并行执行)。如果有类型为MsgSnap的消息,在这个消息发送成功之后,需要调用Node.ReportSnapshot()
    • 如果有快照的话需要和已提交的entries一起应用到状态机(库使用者提供),如果已经提交的entries中包含EntryConfChange,那么需要调用Node.ApplyConfChange()将节点的变更信息同步到本节点
  • 调用Node.Advance()通知节点,表明本轮Ready已经处理完毕,可以开始处理下一轮。

另外还需要注意,由于网络部分需要库使用者自己实现,因此当收到一条消息的时候,需要将该消息转发给raft节点:

func recvRaftRPC(ctx context.Context, m raftpb.Message) {
n.Step(ctx, m)
}

发起提议

如果需要向raft集群发起一个提议,那么需要用下面这种方式:

// 协议的数据持久化成字节数组
n.Propose(ctx, data)

如果找个提议处理完成(已经持久化到持久化介质并同步到其他节点),那么就可以通过Ready的comitedEntries获取到,类型是raftpb.EntryNormal, 然后用户就可以根据自己的业务逻辑,将其应用到状态机中。

raft集群不保证该协议一定能够处理成功,若一定超时时间内,还未收到响应,那么需要根据业务场景考虑是否需要重试。

节点变更

如果需要对raft集群扩容或缩容,那么需要构造ConfChange,并调用:

n.ProposeConfChange(ctx, cc)

如果该变更请求处理成功,那么在commitedEntries中会有一条类型为raftpb.EntryConfChange的记录,

var cc raftpb.ConfChange
cc.Unmarshal(data)
n.ApplyConfChange(cc)

需要自己实现的部分

etcd的raft已经实现了大部分的功能,但是还是有几个组件需要使用者自己根据业务场景实现: – 网络通讯部分 – Write ahead log – 快照

网络通讯部分

网络部分说白了就是消息的收发,你可以理解为raft只依赖了接口,这个接口实现了两个方法: sendreceive,但是具体的实现需要库使用者自己写,这部分相对比较简单,使用RPC、HTTP、自定义协议都可以,具体的实现逻辑可以参考etcd自己的代码

Write-Ahead-Log(WAL)

如上文中提到的,用户需要保存Ready中的一些状态,比如entries、hardstate等,WAL有很多分布式系统都实现了,基本上参考他们的实现,结合自己的业务实现一个难度不会很大,如果是直接使用etcd raft库,那么可以直接基于etcd中wal的实现,另外也可以基于RocksDB等嵌入式KV实现,但是对于key-value的结构设计要考虑好,wal的原理后面有时间再叙述。

快照

快照应该都知道,比如说Redis的持久化,有一种模式是保存用户发过来的命令,但时间长了之后,这个日志会变的越来越大,这个时候当你扩容、重启节点的时候,加载这个文件会耗费很长时间,导致服务不可用,因此需要将内存中的状态持久化到磁盘中。 比如:

incr index 
incr index

这个时候index的值为2,当然这个例子只有两条命令,但假如说有一千万条记录,那么重放日志需要耗费很长时间,因此我们可以直接将index:2这个kv对写到磁盘中,那么这个时候之前对这个key的一千万条操作日志就变成了这一条记录。 那么raft的快照其实也类似,应用需要将自己状态机的当前快照,持久化成一个快照文件,并写入磁盘中,我们知道这个过程会非常慢,因此可以考虑和其他过程并行执行,以及其他的一些性能优化,这个后面的博客再写。 简单来实现的话,我们直接将状态机用json序列化成一个字节数组,并写入到本地文件中,后续读取的时候。

如何基于raft实现一个简单的分布式KV存储

这里简单描述一下流程,只是为了更容易理解etcd raft的使用方法,后面会再写篇博客详细记录: – 应用实现自己的状态机,处理快照、已提交日志、WAL等 – 当用户发起一个put请求时,将该请求序列化成字节数组,propose到raft集群 – 处理成功后,会出现在commitedEntries中,解析该entry,回放到状态机中,这个时候该请求的结果已经可以在所有的raft节点上查询到了 – 用户发起查询请求,直接在用户封装的状态机中查询,并返回给用户

总结

本文只是简单描述了下etcd raft的使用方法,总的来说etcd raft的实现已经非常完善,但还是需要用户自己处理非常多的细节,比如网络、write aheadlog等,如果对raft不熟悉,相信会很难上手,我的想法是能够在其之上再封装一层,提供一个状态机接口,用户只需要关心自己的业务逻辑,其他的全部都交给库来处理。

Rate this post

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注