raft: 实现总结
这篇文章是对实现 Raft 后的一次总结,代码的实现可以在 raft.me 这个代码 仓库中找到。这个实现使用了 ZeroMQ 的 C 语言绑定 CZMQ,运行之前需要安装这些依赖。 编译完之后,启动三个节点的集群的命令如下:
$ ./raft.me tcp://127.0.0.1:57000 tcp://127.0.0.1:57001 tcp://127.0.0.1:57002 0
$ ./raft.me tcp://127.0.0.1:57000 tcp://127.0.0.1:57001 tcp://127.0.0.1:57002 1
$ ./raft.me tcp://127.0.0.1:57000 tcp://127.0.0.1:57001 tcp://127.0.0.1:57002 2
上面的每条命令需要打开一个新的终端,在里面输入运行。运行的过程中,可以通过 C-c 来任意终止程序的执行,只要保证大多数节点的正常(这里是两个节点),程序就可以保证 最终所有的数据都会同步。
为了方便,这里将实现的代码放到这篇文章的末尾。但代码仓库中还包含了具体实现的 PlusCal伪代码描述。
参考资料:
- In Search of an Understandable Consensus Algorithm (Extended Version)
- CONSENSUS: BRIDGING THEORY AND PRACTICE
Raft 一致性算法解决什么问题?
Raft 解决的问题和 Paxos 解决的问题是一样的。在异步网络模型中,在给定的一个节点集 合中,这个集合的每个节点都可以向集合的其他节点同步数据,在节点集合中,即使有些节 点会崩溃重启,这些节点最终的数据都是一致的。
异步网络模型的意思是:
- 节点运行的速度是任意的
- 节点间只能通过消息进行通信
- 消息可能会丢失、乱序、重复
- 消息不会损坏
“节点运行的速度是任意的”:意味着节点可能会崩溃,可能负载繁忙,运行时间很长。这 意味着算法可以容忍某些节点崩溃的情况下完成数据的同步,只要最终这些节点正常运行, 数据就可以完全地同步。这种数据同步的方法,是具有容错性的。
举个例子,有三个节点集合:{n1, n2, n3},客户端要向这些节点写入的数据序列是:« 1, 2, 3 »。这三个节点中,即使有一个崩溃,但最终正常运行,那么一致性算法可以保证 这些节点的每个节点保存的数据都是 « 1, 2, 3 »。
具有容错性质的一致性算法,有什么用?
在现实世界中(2018),越来越多应用的部署是分布式的。分布式的意思是,应用部署在不 同的机器上,它们没法通过本机通信机制来通信(管道、信号量、共享内存等),它们只能 通过网络来通信。为了保证应用的高可用,常常会在多个节点部署相同的应用,这些节点就 组成了一个集群,用户在可以访问集群中的任意一个节点来使用。
为了说明“分布式”和“集群”的概念,请看下面的例子:
现在有两个应用: {A , B},它们需要使用对方提供的功能。有 5 个节点: {n1, n2, n3, n4, n5}。如果应用A 部署在节点 n1 上,表示为 n1.A。现在有几种部署方式:
n1.A n2.B:这种部署也可以认为它们是分布式部署的,虽然它们是不同的应用,但是它们 只能通过网络消息进行通信,符合分布式系统的定义。
n1.A n2.A n3.A:这种部署就是一个集群。集群就是相同应用的集合。
在现实世界中,集群是一项非常有用的技术。因为在世界各地,访问本地的机器比访问另一 个半球的机器要快得多。将应用部署在北京、上海、广州,北京的用户访问北京节点,比访 问广州节点要快。由于一个节点总是有资源上限的,如果一个节点最多只能服务一千个 (1,000)用户,如果在一个地区中有上万(>10,000)个用户,那么一个节点就不能同时服 务这些用户了,需要使用集群将这些用户分摊在这些节点中。
如果应用是没有状态的,那么节点间就不需要进行数据同步。但是,在一个系统内不可能是 无状态的,系统总要保存一些数据以便能够正常运行。也就是说,系统内必定会有一些组件 要保存状态。
举个例子,现在许多应用都要保存用户名和密码。这些数据会保存在数据库软件中,如果只 将数据保存在一个节点,如果这个节点出现问题,暂时无法访问,那么整个系统都会受到极 大的影响。
为了消除这种单点故障对系统的影响,数据库通常会部署为主从模型进行数据备份。通常为 了保证性能,数据同步一般都是使用异步传输的方式。数据写入主节点后,会立刻返回,在 返回之后再将数据传输到从节点进行保存。
如果使用异步传输的方式,就涉及了在两个节点间进行数据同步的问题。假设现在有两个节 点(P、B),P 节点现在是主节点,处理用户的数据,同时将数据传输到从节点 B 节点保 存。考虑下面的场景:
- 用户修改密码,应用将密码保存在 P 节点
- P 节点保存数据并返回给应用
- P 节点崩溃
- B 节点变为主节点向外提供读写服务
- 用户修改密码,应用将密码保存在 B 节点中
- P 节点重启
在这种场景下,P 节点和 B 节点保存的数据就发生了不一致的情况。解决这种情况有许多 种方式,其中之一就是使用同步的方式进行数据传输,在上面的例子中,P 节点需要将数据 写入本节点和 B 节点之后,再给用户返回。但是,这样的性能就会受到一定的影响。
除了这种对数据的备份需要在节点间进行数据同步外,减少数据库的压力也需要进行数据同 步。原因还是在于不得不做,因为一个节点的资源有限,一个节点通常只能服务上千的用户, 如果有上万的用户需要访问,就需要使用集群。而这种数据集群,需要在集群的节点间进行 数据同步。如果一个集群有 10 个节点,使用同步的方式要写到所有的 10 个节点中,而且 其中有一个节点崩溃,整个系统就不可用了;即使所有节点都正常,一个节点的网络抖动, 也会影响到整个系统。
在有多个节点的集群间进行数据同步,数据传输的方式使用同步的方式就不可取。那么只能 使用异步的方式进行数据传输,这样就需要解决数据的不一致问题,还有需要让系统可以容 忍一些节点的崩溃。
分布式的一致性容错算法,就是解决上面提到的问题。但是,通常一些小型系统的并发用户 量不会超过一千,数据库使用主从模式就可以工作得很好,没有必要解决这种多个节点间的 数据同步问题。
使用异步方式传输数据需要解决的问题
我对于异步和同步的数据传输方式的理解是,异步是相对于同步而言的。如果同步方式是指 将数据保存到 5 个节点返回,那么异步的方式就是在数据没有保存在 5 个节点之前就返回。 这就说明,异步方式包括将数据保存在 0~4 个节点后返回。
我认为异步方式同步数据,需要主要解决两个问题:
- 数据应该保存在多少个节点才返回
- 如何解决两个节点的数据发生冲突
Raft 是如何解决数据一致性问题的?
简单地概括 Raft 算法:选出一个主节点,由主节点将数据同步到集群内大多数的节点。
选择数据同步的方式有很多种,这里只讨论从主节点将数据传输到其他节点的方式。没有选 择主节点进行数据同步的方式也可以解决这个问题,Paxos 和 Blockchain 就可以在没有主 节点的情况下解决数据的一致性问题。
既然选择了主节点同步数据的方式,那么就需要解决在不同的主节点中传输时可能产生的数 据冲突。当节点 n1 当选为主节点,它写了一部分数据;在 n1 之后,n2 当选为主节点, 它也写了一部分数据。主节点的变更可以表示为一个序列:« n5, n1, n2, … »。
考虑这样的场景:主节点变更的情况是 « n1, n2, n1, n3 »。节点 n4 接收到主节点的 数据包,要求保存数据。n4 接收到的数据包有可能是 {n1, n2, n3} 中的任意一个节点发 送的,因为消息传递的时间是任意的。节点需要能够区分到底是哪个主节点发送过来的,所 以节点每次变为主节点的标识都应该是唯一的。这样主节点有 n1 -> n2,也可以区分 n1 -> n2 -> n1 这样的情况。
节点在能够区分主节点之后,它就要决定究竟要接受哪个主节点传输的数据。这个过程应该 是确定性的,一旦选择之后就不能更改,如果能够更改的话,就会导致数据可能会不断变化, 没办法达到最终的一致性。一个简单的方法是,接受先收到或者后收到的消息,但是这样结 果就不是确定性的。接收顺序的不同会导致不同的结果:例如,接收顺序是 « n1, n2 » 和 « n2, n1 »,它们的结果就不一样;一个选择 n1,一个选择 n2。
因此,多个节点的标识,能够通过比较,得到一个确定性的结果。选择不同的标识,将会导 致选择不同的节点。Paxos/Raft 使用数字来对节点进行标识,Blockchain 使用节点的工作 量来进行标识。这样节点接收顺序即使不一样,也可以做出一个确定性的选择。
例如,收到不同主节点消息的顺序是 « n1, n2, n1 »,如果第一个 n1 的标识是 3,第 一个 n2 的标识是 5,第三个 n1 的标识是 4。接收顺序可以表示为 « 3, 5, 4 »,这时 通过选择最大或者最小的标识值。对于 Raft 来说,它让节点选择的是最大的标识值(5)。 对于 Paxos 来说,它让节点选择的是最小的标识值(3)。但归根到底 Raft 选择的还是最 小的节点标识,但是这个选择是在主节点做出的。
为了得到确定性的结果,还需要每个节点标识都是唯一,任意两个节点标识都不一样。所以, 现在需要解决的问题是,怎么保证每个主节点选择的节点标识,和其他所有的主节点的节点 标识都不一样。这里的方法不止一种,我们仅仅讨论 Raft 的做法。
只要主节点在发送数据之前,保证主节点标识的唯一性,那么其他节点收到的主节点标识也 是具有唯一性的。对于同一个节点变为主节点多次,可以使用一个数字的递增来表示不同的 主节点;而对于不同的节点变为主节点,每次变为主节点之后,递增主节点标识,就可以表 示不同的主节点标识。这样就可以保证主节点标识是唯一的。
新的主节点标识需要保证比旧的主节点标识大,因此新的主节点必须要知道旧的主节点标识, 它需要和其他的节点交换选择的主节点标识,才能确定选择了一个比旧主节点标识更大的标 识。主节点是由非主节点变成的,只要能够保证一个主节点标识最多只能由一个非主节点获 得,就可以保证一个主节点标识只有一个主节点。因此,一个主节点标识,可能没有节点获 得,也可能只有一个节点获得。
考虑任意两个非主节点(A, B),它们分别选择了主节点标识(A.x, B.y)。因为一个节点 选择的主节点标识比在这个节点出现的主节点标识都大,因此两个选择的主节点标识,最大 的那个主节点标识(max(A.x, B.y))将是比两个节点出现的主节点标识都大,并且是唯一 的。
一个非主节点如何获得一个主节点标识呢?一个非主节点选择了一个主节点标识,它不能马 上获得这个标识,因为其他节点可能选择相同的标识,这样就会造成主节点标识不是唯一的。 所以,一个非主节点在选择了一个主节点标识后,需要其他节点的确认。
如何使节点确认是唯一的呢?只要大多数的节点都确认一个非主节点获得了标识,并且支持 后就不能改变,因为集群内少数节点的数目不可能比大多数的数目大(5 个节点的集群,大 多数节点的个数是 {3, 4, 5},少数节点的数目是 {5-3, 5-4, 5-5} = {2, 1, 0})。使用 这个规则,只要大多数节点支持一个非主节点获得一个主节点标识,其他的非主节点就不能 获得相同的标识了。
既然需要节点确认,就需要记录节点给哪个非主节点和哪个主节点标识确认。这样它在接收 到其他非主节点的要求确认主节点标识时,能够知道是否确认或者拒绝。所以,节点记录的 确认是:[非主节点, 主节点标识]。
前面讲过,任意两个非主节点交换选择的主节点标识,选择大的那个可以确保主节点标识的 唯一性。选择小的主节点标识,需要确认选择大的主节点标识,将这个确认保存起来: [哪个非主节点, 大的主节点标识],同时将确认信息返回给对方。下次选择小的主节点标 识的这个节点,就可以从确认的最大的主节点标识开始递增。
非主节点在选择了一个主节点标识之后,它可以确定自己可以获得这个主节点标识。它给所 有的节点发出确认请求,等待接收确认响应。这个节点除了会接到确认响应,还会接到其他 节点的确认请求,如果它发现自己对同一个主节点标识已经确认过,就可以拒绝;如果对一 个主节点标识没有确认过,就可以确认。
如果有多个获得主节点标识的非主节点,到底应该选择哪一个作为主节点呢?(这些主节点 标识一定不相同,因为最多只有一个非主节点能够获得同一个主节点标识)这个结果应该是 确定性的,给定一个获得主节点标识的非主节点集合({n1, n2, n3}),选择多次,每次都 应该是相同的结果。选择最小或者最大的主节点标识,都会产生确定性的结果。如果选择最 小的,这个节点崩溃的话,整个系统就不能继续运行了;如果选择最大的,这个节点崩溃的 话,还会出现更大的主节点标识,系统可以正常运行。因此,当有多个非主节点都获得了主 节点标识,就选择拥有最大主节点标识的那个非主节点作为主节点。
现在,考虑一个获得主节点标识的非主节点,如何知道自己是否可以变为主节点。获得主节 点标识的非主节点,需要知道自己获得的主节点标识是不是当前集群内最大的。在没有收到 比自己更大的主节点标识时,这个节点就可以变为主节点。其他的节点在收到主节点的数据 时,只接受已确认过最大主节点标识的数据。在这种情况下,整个系统内,会有多个主节点, 但是每个主节点的主节点标识都不一样;在非主节点接收到数据时,它只保存已确认最大主 节点的数据,这样数据就不会发生冲突。拥有小的主节点标识的主节点,它最终会收到比它 大的主节点标识,给这个节点投票,同时放弃作为主节点,接收这个拥有更大主节点标识的 节点的数据。
在 Raft 算法中,主节点标识被称为“任期(term)”,确认任期叫做“投票(voteFor)”。 每个节点有自己的标识,这个标识和任期不同,节点标识(node_id)和一个任期组成一个 投票:voteFor = [term, node_id]。
解决了主节点选举的问题,接下来考虑主节点到非主节点的数据同步问题。
要求主节点和非主节点的数据是一致性的,将主节点接收的数据在非主节点上重演一遍,就 可以保证数据是一致的,数据可以看作是存放在一个无限长的数组中。
非主节点有可能接收到不同任期的主节点的数据,它需要区分并且要选择接收哪个主节点的 数据。考虑有两个不同任期的主节点,它们分别将数据同步到接收它们的非主节点中。这是 有可能发生的,任期小的主节点在不知道有任期比它更大的主节点时,能够同步数据到非主 节点。假设非主节点 n1 接收任期是 1 的主节点 n2 的数据,非主节点 n3 接收 主节点 n4 的数据,这时非主节点 n1 和 n3 的数据是冲突的。解决冲突时,需要选择哪个主节点 的数据作为最终的数据,所以非主节点要区分并且记录接收了哪个主节点的数据。区分主节 点,只要记录任期就足够了,在每个数据项中,需要添加接收的主节点任期,然后保存下来 以便之后解决冲突使用。
现在,非主节点保存数据是一个数组,数组里面的包括了接收的主节点任期。这一系列的数 据,用 C 语言表示的数据结构如下:
struct log {
struct log_entry entry[nentry];
};
struct log_entry {
uint64_t term;
};
要确保数据在所有的时间里是一致的,需要解决两个问题:1、不同任期的主节点向其他节 点同步数据,必须从这些主节点的数据中选择一个;2、新的主节点必须要拥有之前集群内 出现的主节点的所有数据。
对于第一个问题,同时存在两个任期不同的主节点同步数据,最终解决冲突时,需要选择一 个主节点的数据作为最终的数据。选择需要考虑,如果选择以后不能更改(如果选择后能够 更改,数据会不断变更,不能达到最终一致性,因为往后的选择会影响当前的选择)。
如果选择更小任期的主节点数据,那么这个主节点崩溃重启后,它会使所有的其他节点删除 更高任期的主节点数据。只要它不断地崩溃重启,系统不能正常运行。
如果选择更大任期的主节点数据,即使更大任期的主节点崩溃,也会有更大任期的主节点出 现。所以,选择更大任期的主节点数据,能够解决冲突。
对于第二个问题,新的主节点要拥有之前所有的主节点数据,只要每个后继任期的主节点都 拥有前任主节点的数据即可。只需要保证产生主节点时,只有拥有前任主节点数据的节点可 以变为主节点。如果主节点的数据同步到集群的大多数节点,这些大多数节点在任期确认时, 只要对方没有自己知道的前任主节点的数据,就拒绝任期确认。这样没有前任主节点数据的 节点,将不会得到这些大多数节点的任期确认(投票),将不能变为主节点。
在 Raft 算法中,为了清晰容易理解,进一步地将非主节点分为 Follower 和 Candidate。 Follower 只响应请求,Candidate 可以发起投票请求(任期确认)。每个节点在等待一段 时间,没有收到主节点的数据,就变为 Follower,同时递增任期。整个系统就可以运行下 去,但是,这个算法并不能保证最终一定能够可用,因为每个任期都可能选不出一个主节点 来。
至此,Raft 算法解决数据一致性问题的过程就讲解完毕了。
在常见场景下(五个节点),各个节点的行为
- 正常数据同步,数据如何复制到其他的节点?
首先,从集群中选出一个主节点,数据从主节点中发送到集群中其他的节点。
- 主节点崩溃,新的节点产生,其他 3 个节点怎么切换到新主节点?
假设主节点 n1 崩溃,余下的节点为 {n2, n3, n4, n5},它们在没有收到主节点的 AppendEntries 消息后超时,递增它们的任期,开始新一轮的主节点选举。一个节点等到它 自己,其他的两个节点给它的投票后变为主节点。
- 旧的主节点恢复,其他 3 个节点接受了新的主节点,这时会发生什么事情?
旧的主节点 n1 恢复,它的任期小于集群中其他节点的任期,它在超时之前会收到当前主节 点的 AppendEntries 请求,这时它会接受这个请求,同时重置选举定时器。
- 新主节点崩溃,旧主节点恢复,这时会发生什么?
新的主节点 n2 崩溃,它的任期和其余节点的任期相同;旧的主节点 n2 恢复,这时 n2 的 任期小于集群内其他节点的任期。
如果它在选举超时之前,收到其他节点的投票请求,会给这个节点投票,同时更新它自己的 任期到这个节点的任期。
如果它在选举超时之前,没有收到其他节点的投票请求,在选举超时之后会递增自己的任期, 同时向其他节点发送投票请求。
- 正常数据同步,一个从节点崩溃恢复,数据如何同步?
从节点崩溃前,如果它的任期和主节点的任期是相同的,在恢复之后,会收到主节点的 AppendEntries 请求,它的日志和主节点相比,如果相同的就插入日志中,如果落后,则等 待主节点找到缺失的位置,将数据发送过来。
- 正常数据同步,三个从节点崩溃,会发生什么?
n1 是主节点,n2 是从节点,{n3, n4, n5} 崩溃。这时 n1 仍然会能够写入新的日志到本 节点,同时同步数据到 n2 节点,但是日志的 commitIndex 不能更新,因为数据没有得到 大多数节点的确认。
- 新的节点加入(5 -> 6),集群如何处理?
主节点接收到这个请求,提交一个特殊的日志项,包括新的集群配置和旧的集群配置。接下 来所有的数据都需要同步是新的集群的大多数节点,和旧的集群的大多数节点。在这个特殊 的配置项同步到这两个大多数之后,主节点继续写入一个特殊的日志项,这个日志项包括新 的集群配置。将这个配置项同步到新的集群的大多数节点。
每个节点在接收到特殊的日志项的配置后,立刻应用。在包括新的集群配置的日志项提交后, 不在新集群的节点退出。
- 主节点和另一个从节点崩溃,另外 3 个节点存活,选出新主节点进行同步,这时旧的 2 个节点恢复,这时会发生什么?
旧的 2 个节点的任期小于另外的 3 个节点,它们会收到主节点的 AppendEntries 请求, 更新自己的任期到主节点的任期,同时将日志项写入自己的日志中。
- 所有节点初始化,到数据同步到所有节点,这中间经历了哪些过程?
主节点选举,日志同步。
- 一个从节点崩溃,集群选中新的主节点进行同步。从节点恢复,这时会发生什么?
接收主节点的 AppendEntries 请求,更新自己的任期到主节点的任期,可能会插入日志项。
- 旧的主节点崩溃,新的主节点选中,同步了一些数据,新的主节点崩溃,旧的主节点恢复,这时会发生什么?
旧的主节点恢复后,是 Follower 状态,它的任期小于集群中其他节点的任期。
在选举超时之前,如果收到投票请求,会给其他节点投票;在选举超时后,递增自己的任期, 给其他节点发起投票请求。
- 主节点崩溃,新的节点要加入,这时会发生什么?
选出一个新的主节点,这个新的主节点提交两个特殊的日志项。
- 一个从节点退出,一段时间后,重新加入,会发生什么?
收到 AppendEntries 请求后,任期更新到主节点的任期。没有收到则等待选举超时,发起 新一轮的选举。
- 如何保证结果的正确性?
每个任期最多只能有一个主节点,日志项在主节点的当前任期同步到大多数节点,才算提交。
- 另外 5 个节点的集群的数据不同,这个集群能加入吗?最终能不能形成 10 个节点的集群?a
如果是新加入的节点,不会有数据。有一种情况是节点退出集群后再加入,这时会找到数据 分叉的地方,删除不同的日志,接受主节点的数据同步。最终会形成新的配置指定的集群。
- 5 个节点,一个崩溃,另外 4 个(2-2)分裂成两个集群;它们各自选举,任期如何变化?
每个节点不断地递增任期,发起投票请求,选举超时后继续前面的过程。它们不能产生一个 主节点,因为集群内没有大多数的节点同意给一个节点投票。节点因为没有收到 AppendEntries 请求,最终会超时,递增任期,进行选举。
- 5 个节点,分裂成两个集群(2-3);2 个节点集群的任期如何变化?
如前所述。
- 主节点崩溃后恢复,数据如何复制?它还会再次成为 leader 吗?
如果它的日志落后于上次主节点提交的日志,那么它不能选举为主节点。
如果它的日志没有落后于上次主节点提交的日志,进行正常的选举流程。
- 节点收到投票请求后,会做什么操作?
如果收到的投票请求,任期比自己大,更新到这个任期,同时重置为 Follower 状态。
如果任期相同,还没有投票则给它投票;已经投票了,则不会投票。
如果任期小于自己,发送自己的任期回去,让它更新任期。
- 什么情况下会选不出领导?
没有大多数节点的投票。
- 领导选举的过程是怎样的?
每个节点在选举超时后,递增自己的任期,将投票请求发送给集群中其他的所有节点,在收 到大多数的投票同意后变为主节点。
每个节点在收到投票请求时,如果自己的任期小于对方的任期,更新任期到这个大的任期, 给这个节点投票;如果任期相同,没有投票,自己的日志和对方相同或者小于对方,则它投 票,如果已经投票则忽略请求;如果任期大于对方的任期,发送自己的任期回去。
- 当前的领导如何提交老的领导的 log entry?
主节点当选后,以当前最新的日志项同步到其他节点。如果收到确认的日志项位置,则从这 个确认的日志项的下一个位置方法日志。如果对方日志有冲突,它会删除日志,等待下一次 主节点的 AppendEntries 请求的到来。
- 如果新的领导删除了追随者的 log 而未同步自己的 log 到追随者,追随者的 log 会如何增长?
如果 Follower 的日志和主节点的日志后冲突,它会在收到一个 AppendEntries 请求时删 除自己的日志,直到日志和主节点的一致,再将主节点的日志同步到自己的日志中。整个过 程 Follower 的日志看起来是先减少再增加。
- 领导会不会删除自己的 log?
不会。
- 如何保证所有节点都执行相同的命令?一个从节点崩溃,它落后于主节点的进度,从节点恢复并成为主节点,如何保证它接下来的命令和老的领导不冲突?
所有节点只执行已提交的日志。
从节点崩溃后,它的日志如果和主节点已提交的日志冲突,不能当选为主节点。
代码实现
/*
* 为了获得对 Raft 算法的理解,实现它核心的同步协议。日志的存储是简单地限定 4K
* 字节的文件读写。集群成员的动态变更没有实现,日志快照没有实现。
*
* 实现这个算法,使它稳定可靠,要花费大量的时间来测试,这是一个时间黑洞……
*/
#include <assert.h>
#include <stdlib.h>
#include <czmq.h>
static const uint32_t Nil = ~0U;
static void *invalid_pointer = (void *)(~0UL);
enum state { Follower, Candidate, Leader };
static const char *state_string(enum state state)
{
if (state == Follower) {
return "Follower";
} else if (state == Candidate) {
return "Candidate";
} else if (state == Leader) {
return "Leader";
} else {
return "unknown";
}
}
struct entry {
uint64_t term;
uint32_t commandlen;
char *command;
};
struct log {
uint64_t commitIndex;
struct entry *entry;
uint64_t capacity;
uint64_t len;
};
struct log *create_log(void)
{
struct log *log = malloc(sizeof(struct log));
if (log) {
log->entry = calloc(1024, sizeof(struct entry));
if (log->entry) {
log->commitIndex = 0;
log->capacity = 1024;
log->len = 0;
return log;
} else {
return NULL;
}
} else {
return NULL;
}
}
void destroy_log(struct log **log)
{
if (log) {
struct log *ptr = *log;
if (ptr) {
assert(ptr->len < ptr->capacity);
int i;
for (i = 1; i <= ptr->len; i++) {
struct entry *e = &ptr->entry[i];
free(e->command);
}
free(ptr->entry);
*log = invalid_pointer;
}
}
}
struct node {
char *endpoint;
char identity[20];
uint32_t id;
uint32_t voteFor;
/* log replication */
uint64_t nextIndex; /* NOTE: count from 1, be careful! */
uint64_t matchIndex;
};
struct timer {
zloop_t *loop;
int election_id;
int replication_id;
int client_request_id;
};
struct server {
char filename[20];
struct timer timer;
zsock_t *sock;
int nserver;
struct node *cluster;
struct node *self;
struct log *log;
uint64_t currentTerm;
uint32_t grant_count;
enum state state;
};
uint64_t LastIndex(struct server *s)
{
return s->log->len;
}
uint64_t LastTerm(struct server *s)
{
if (s->log->len == 0) {
return 0; /* empty */
} else {
return s->log->entry[s->log->len].term;
}
}
int quorom_grant(struct server *s)
{
return s->grant_count * 2 > s->nserver;
}
int node_is_grant(struct node *node, uint32_t voter_id)
{
return node->voteFor == voter_id;
}
void server_update_election_timer(struct server *s);
void UpdateTerm(struct server *s, uint64_t term)
{
if (s->currentTerm < term) {
printf("%s:%lu: update term to %lu\n",
state_string(s->state), s->currentTerm, term);
if (s->state == Leader) {
int timer_id, rc;
/* Leader need to stop replication */
timer_id = s->timer.replication_id;
assert(timer_id > 0);
assert(s->timer.loop && s->timer.loop != invalid_pointer);
rc = zloop_timer_end(s->timer.loop, timer_id);
assert(rc == 0);
s->timer.replication_id = 0;
/* also client request */
timer_id = s->timer.client_request_id;
assert(timer_id > 0);
rc = zloop_timer_end(s->timer.loop, timer_id);
assert(rc == 0);
s->timer.client_request_id = 0;
}
s->currentTerm = term;
s->state = Follower;
s->grant_count = 0;
int i;
for (i = 0; i < s->nserver; i++) {
struct node *node = &s->cluster[i];
node->voteFor = Nil;
}
}
}
void send_RequestVoteRequest(struct server *s, struct node *dst)
{
assert(s);
assert(s->sock);
assert(dst);
zsock_t *sock = s->sock;
zmsg_t *msg = zmsg_new();
assert(msg);
zmsg_addstrf(msg, "%s", dst->identity); /* for ZeroMQ ROUTER socket */
zmsg_addstr(msg, "RequestVoteRequest"); /* type */
zmsg_addstrf(msg, "%lu", s->currentTerm); /* term */
zmsg_addstrf(msg, "%lu", LastTerm(s)); /* log last term */
zmsg_addstrf(msg, "%lu", LastIndex(s)); /* log last index */
zmsg_addstrf(msg, "%d", s->self->id); /* source */
zmsg_addstrf(msg, "%d", dst->id); /* destination */
zmsg_send(&msg, sock);
printf("%s:%lu: send RequestVoteRequest to %s\n",
state_string(s->state), s->currentTerm, dst->identity);
}
void RequestVote(struct server *s)
{
assert(s->state == Candidate);
assert(s->self->voteFor == s->self->id);
int i;
for (i = 0; i < s->nserver; i++) {
struct node *node = &s->cluster[i];
if (node != s->self && !node_is_grant(node, s->self->id)) {
send_RequestVoteRequest(s, node);
}
}
}
void server_dump(struct server *s);;
int server_save(struct server *s);
void start_election(struct server *s)
{
assert(s->state == Follower || s->state == Candidate);
UpdateTerm(s, s->currentTerm + 1);
s->state = Candidate;
s->grant_count = 1;
s->self->voteFor = s->self->id;
server_save(s);
server_dump(s);
RequestVote(s);
}
long int get_timeout(long int beg, long int end)
{
uint64_t seed = zclock_time();
srandom(seed);
long int n = random();
long int range = end + 1 - beg;
return beg + n % range;
}
/* self is up to date: return 1 */
int server_is_uptodate(
struct server *s,
uint64_t pkt_last_term, uint64_t pkt_last_index)
{
uint64_t self_last_term = LastTerm(s);
uint64_t self_last_index = LastIndex(s);
if (self_last_term > pkt_last_term) {
return 1;
} else if (self_last_term == pkt_last_term) {
if (self_last_index > pkt_last_index) {
return 1;
} else {
return 0;
}
} else {
return 0;
}
}
int server_grant_node(
struct server *s,
uint64_t pkt_term, uint64_t pkt_last_term, uint64_t pkt_last_index)
{
if (pkt_term < s->currentTerm) {
return 0;
} else {
if (pkt_term > s->currentTerm) {
UpdateTerm(s, pkt_term);
server_save(s);
server_dump(s);
server_update_election_timer(s);
}
/* pkt_term == s->currentTerm */
if (s->self->voteFor != Nil) {
/* already vote for someone */
return 0;
} else {
/* not vote yet */
if (server_is_uptodate(s, pkt_last_term, pkt_last_index)) {
return 0;
} else {
return 1;
}
}
}
}
void server_handle_RequestVoteRequest(struct server *s, zmsg_t *msg)
{
char *term = zmsg_popstr(msg);
char *log_last_term = zmsg_popstr(msg);
char *log_last_index = zmsg_popstr(msg);
char *src_id = zmsg_popstr(msg);
char *dst_id = zmsg_popstr(msg);
uint64_t pkt_term = atol(term);
uint64_t pkt_last_term = atol(log_last_term);
uint64_t pkt_last_index = atol(log_last_index);
uint32_t pkt_node_id = atoi(src_id);
free(dst_id);
free(src_id);
free(log_last_index);
free(log_last_term);
free(term);
if (pkt_node_id <= s->nserver) {
int grant = server_grant_node(s, pkt_term, pkt_last_term, pkt_last_index);
if (grant) {
s->self->voteFor = pkt_node_id;
server_save(s);
server_dump(s);
printf("%s:%lu: voteFor %s\n",
state_string(s->state), s->currentTerm, s->cluster[pkt_node_id].identity);
zmsg_t *msg2 = zmsg_new();
zmsg_addstrf(msg2, "%s", s->cluster[pkt_node_id].identity);
zmsg_addstrf(msg2, "%s", "RequestVoteResponse"); /* type */
zmsg_addstrf(msg2, "%lu", s->currentTerm); /* term */
zmsg_addstrf(msg2, "%d", 1); /* grant */
zmsg_addstrf(msg2, "%d", s->self->id); /* source */
zmsg_send(&msg2, s->sock);
printf("%s:%lu: send RequestVoteResponse to %s\n",
state_string(s->state), s->currentTerm, s->cluster[pkt_node_id].identity);
}
}
}
uint64_t Min(uint64_t a, uint64_t b)
{
return a < b ? a : b;
}
uint64_t Max(uint64_t a, uint64_t b)
{
return a > b ? a : b;
}
/* treat AppendEntriesRequest as heartbeat */
void send_AppendEntriesRequest(struct server *s, struct node *target)
{
assert(s->state == Leader);
assert(s->sock);
uint64_t prevLogIndex = target->nextIndex - 1;
uint64_t prevLogTerm;
if (prevLogIndex == 0) {
prevLogTerm = 0;
} else {
prevLogTerm = s->log->entry[prevLogIndex].term;
}
uint64_t commitIndex = Min(s->log->commitIndex, target->nextIndex);
zmsg_t *msg = zmsg_new();
assert(msg);
zmsg_addstrf(msg, "%s", target->identity); /* for ZMQ ROUTER socket */
zmsg_addstrf(msg, "%s", "AppendEntriesRequest"); /* type */
zmsg_addstrf(msg, "%lu", s->currentTerm); /* term */
zmsg_addstrf(msg, "%lu", prevLogIndex); /* prevLogIndex */
zmsg_addstrf(msg, "%lu", prevLogTerm); /* prevLogTerm */
zmsg_addstrf(msg, "%lu", commitIndex); /* commitIndex to target */
zmsg_addstrf(msg, "%d", s->self->id); /* source */
/* have entry to send */
if (target->nextIndex < s->self->nextIndex) {
zmsg_addstrf(msg, "%lu", s->log->entry[target->nextIndex].term);
zmsg_addstrf(msg, "%s", s->log->entry[target->nextIndex].command);
}
zmsg_send(&msg, s->sock);
}
/* return 1 if log consistent, otherwise return 0 */
int logOK(struct log *log, uint64_t prevLogIndex, uint64_t prevLogTerm)
{
if (prevLogIndex == 0) {
return 1;
} else {
if (prevLogIndex <= log->len &&
log->entry[prevLogIndex].term == prevLogTerm) {
return 1;
} else {
printf("logOK false: "
"prevLogIndex: %lu, prevLogTerm: %lu, "
"loglen: %lu, entryterm: %lu\n",
prevLogIndex, prevLogTerm,
log->len, log->entry[prevLogIndex].term);
return 0;
}
}
}
void send_AppendEntriesResponse(
struct server *s,
int success, uint64_t matchIndex, uint32_t dst_id)
{
if (dst_id > s->nserver) {
return;
}
zmsg_t *msg = zmsg_new();
assert(msg);
zmsg_addstrf(msg, "%s", s->cluster[dst_id].identity);
zmsg_addstrf(msg, "%s", "AppendEntriesResponse"); /* type */
zmsg_addstrf(msg, "%lu", s->currentTerm); /* term */
zmsg_addstrf(msg, "%d", success); /* success */
zmsg_addstrf(msg, "%lu", matchIndex); /* matchIndex */
zmsg_addstrf(msg, "%u", s->self->id); /* source */
zmsg_send(&msg, s->sock);
}
void AdvanceCommitedIndex(struct server *s);
void server_handle_AppendEntriesResponse(struct server *s, zmsg_t *msg)
{
printf("---- server_handle_AppendEntriesResponse ----\n");
char *term_str = zmsg_popstr(msg);
uint64_t term = atol(term_str);
free(term_str);
char *success_str = zmsg_popstr(msg);
int success = atoi(success_str);
free(success_str);
char *matchIndex_str = zmsg_popstr(msg);
uint64_t matchIndex = atol(matchIndex_str);
free(matchIndex_str);
char *source_str = zmsg_popstr(msg);
uint32_t source_id = atoi(source_str);
free(source_str);
if (s->currentTerm < term) {
UpdateTerm(s, term);
server_save(s);
server_update_election_timer(s);
} else if (s->currentTerm == term) {
if (success) {
s->cluster[source_id].matchIndex = matchIndex;
s->cluster[source_id].nextIndex = matchIndex + 1;
if (s->state == Leader) {
AdvanceCommitedIndex(s);
}
} else {
s->cluster[source_id].nextIndex = Max(1, s->cluster[source_id].nextIndex - 1);
if (s->state == Leader) {
send_AppendEntriesRequest(s, &s->cluster[source_id]);
}
}
} else /* s->currentTerm > term */ {
printf("%s:%lu: currentTerm > term(%lu), ignore AppendEntriesResponse\n",
state_string(s->state), s->currentTerm, term);
}
}
void server_reject_AppendEntriesRequest(struct server *s, uint32_t dst_id)
{
int success = 0;
uint64_t matchIndex = 0;
send_AppendEntriesResponse(s, success, matchIndex, dst_id);
}
void server_handle_AppendEntriesRequest(struct server *s, zmsg_t *msg)
{
printf("---- server_handle_AppendEntriesRequest ----\n");
char *term_str = zmsg_popstr(msg);
uint64_t term = atol(term_str);
free(term_str);
char *prevLogIndex_str = zmsg_popstr(msg);
uint64_t prevLogIndex = atol(prevLogIndex_str);
free(prevLogIndex_str);
char *prevLogTerm_str = zmsg_popstr(msg);
uint64_t prevLogTerm = atol(prevLogTerm_str);
free(prevLogTerm_str);
char *commitIndex_str = zmsg_popstr(msg);
uint64_t commitIndex = atol(commitIndex_str);
free(commitIndex_str);
char *source_id_str = zmsg_popstr(msg);
int source_id = atoi(source_id_str);
free(source_id_str);
if (s->currentTerm < term) {
UpdateTerm(s, term);
server_update_election_timer(s);
server_save(s);
server_dump(s);
assert(s->currentTerm == term);
}
if (s->currentTerm > term) {
server_reject_AppendEntriesRequest(s, source_id);
printf("%s:%lu: reject AppendEntriesRequest from term %lu\n",
state_string(s->state), s->currentTerm, term);
} else /* s->currentTerm == term */ {
int log_ok = logOK(s->log, prevLogIndex, prevLogTerm);
if (!log_ok) {
printf("%s:%lu: logOK false: reject AppendEntriesRequest\n",
state_string(s->state), s->currentTerm);
server_reject_AppendEntriesRequest(s, source_id);
} else /* log_ok == 1 */ {
int nentry = zmsg_size(msg);
uint64_t nextIndex = prevLogIndex + 1;
if (s->state == Candidate) {
printf("%s:%lu: state transition: Candidate -> Follower\n",
state_string(s->state), s->currentTerm);
s->state = Follower;
}
server_update_election_timer(s);
if (nentry == 0) {
int success = 1;
s->log->commitIndex = commitIndex;
send_AppendEntriesResponse(s, success, prevLogIndex, source_id);
printf("%s:%lu: replication done\n",
state_string(s->state), s->currentTerm);
} else {
/* nentry > 0 */
char *entry_term_str = zmsg_popstr(msg);
uint64_t entry_term = atol(entry_term_str);
free(entry_term_str);
char *command = zmsg_popstr(msg);
/* no conflict: append entry */
if (prevLogIndex == s->log->len) {
s->log->entry[nextIndex].term = entry_term;
s->log->entry[nextIndex].commandlen = strlen(command) + 1;
s->log->entry[nextIndex].command = command;
s->log->len += 1;
server_save(s);
server_dump(s);
printf("%s:%lu: no conflict: append entry\n",
state_string(s->state), s->currentTerm);
send_AppendEntriesResponse(s, 1, nextIndex, source_id);
} else /* prevLogIndex < s->log->len */ {
if (nextIndex <= s->log->len) {
/* already done with request */
if (s->log->entry[nextIndex].term == entry_term) {
s->log->commitIndex = commitIndex;
send_AppendEntriesResponse(s, 1, nextIndex, source_id);
printf("%s:%lu: already done with request, commitIndex: %lu\n",
state_string(s->state), s->currentTerm, commitIndex);
}
/* conflict: remove 1 entry */
else {
s->log->len -= 1;
server_save(s);
server_dump(s);
printf("%s:%lu: conflict: remove 1 entry\n",
state_string(s->state), s->currentTerm);
}
}
else {
/* nentry > 2 */
send_AppendEntriesResponse(s, 1, prevLogIndex, source_id);
printf("%s:%lu: nentry(%d) > 2: need prevLogIndex(%lu)+1 entry\n",
state_string(s->state), s->currentTerm, nentry, prevLogIndex);
}
}
}
}
}
}
int compare(const void *p1, const void *p2)
{
uint64_t a = *((uint64_t *)p1);
uint64_t b = *((uint64_t *)p2);
if (a < b) {
return -1;
} else if (a == b) {
return 0;
} else {
return 1;
}
}
void AdvanceCommitedIndex(struct server *s)
{
assert(s->state == Leader);
uint64_t matchIndex[s->nserver];
int i;
for (i = 0; i < s->nserver; i++) {
matchIndex[i] = s->cluster[i].matchIndex;
}
qsort(&matchIndex, s->nserver, sizeof(matchIndex[0]), compare);
int quorum = s->nserver / 2 + 1;
/* 从最大的位置往回数集群的大多数节点个数,就是大多数节点都确认的最大的 matchIndex */
int index = s->nserver - quorum;
/* 只有在当前任期提交,才算有效。这个限制导致在选举的时候,拥有当前任期
* 提交的日志,才能当选为主节点。由于当前任期不会小于已提交日志项的所有
* 任期,所以未同步的节点,它的最后日志项的任期一定小于已同步的大多数节点,
* 因此,只要在节点选举时,加入这种检查,就可以阻止这些未同步的节点当选
* 为主节点。如果允许这些未同步的节点当选为主节点,那么就会存在和大多数
* 节点的数据冲突,它要删除自己的日志,这时还不如不让它当选为主节点。
*/
if (s->log->entry[matchIndex[index]].term == s->currentTerm) {
s->log->commitIndex = matchIndex[index];
printf("%s:%lu: update commitIndex to %lu\n",
state_string(s->state), s->currentTerm, s->log->commitIndex);
}
}
/* send AppendEntries to all cluster nodes */
void send_AppendEntriesRequest_to_cluster(struct server *s)
{
int i;
for (i = 0; i < s->nserver; i++) {
struct node *n = &s->cluster[i];
if (n != s->self) {
send_AppendEntriesRequest(s, n);
printf("%s:%lu: send AppendEntriesRequest to %s\n",
state_string(s->state), s->currentTerm, n->identity);
}
}
}
int log_replication_timer(zloop_t *loop, int timer_id, void *arg)
{
struct server *s = (struct server *)arg;
assert(s);
assert(s->state == Leader);
AdvanceCommitedIndex(s);
send_AppendEntriesRequest_to_cluster(s);
return 0;
}
/* 模拟客户端请求的定时器 */
int ClientRequest_timer(zloop_t *loop, int timer_id, void *arg)
{
printf("---- ClientRequest_timer ----\n");
struct server *s = (struct server *)arg;
assert(s && s->state == Leader);
assert(timer_id == s->timer.client_request_id);
assert(s->self->nextIndex == s->log->len + 1);
char *command = malloc(20);
assert(command);
snprintf(command, 20, "set x %lu", s->log->len);
uint64_t nextIndex = s->self->nextIndex;
s->log->entry[nextIndex].term = s->currentTerm;
s->log->entry[nextIndex].command = command;
s->log->entry[nextIndex].commandlen = strlen(command) + 1;
s->log->len += 1;
s->self->matchIndex = s->log->len;
s->self->nextIndex += 1;
server_save(s);
server_dump(s);
return 0;
}
void BecomeLeader(struct server *s)
{
int rc;
int timer_id;
assert(s->state == Candidate);
assert(s->timer.election_id > 0);
assert(s->timer.loop && s->timer.loop != invalid_pointer);
/* remove election timer */
timer_id = s->timer.election_id;
rc = zloop_timer_end(s->timer.loop, timer_id);
assert(rc == 0);
s->timer.election_id = 0;
size_t delay, always_on;
/* register replication timer */
delay = get_timeout(1000, 3000);
always_on = 0;
timer_id = zloop_timer(s->timer.loop, delay, always_on, log_replication_timer, s);
assert(timer_id != 0);
s->timer.replication_id = timer_id;
/* register client request timer */
delay = get_timeout(25000, 30000);
always_on = 0;
timer_id = zloop_timer(s->timer.loop, delay, always_on, ClientRequest_timer, s);
assert(timer_id != 0);
s->timer.client_request_id = timer_id;
int i;
for (i = 0; i < s->nserver; i++) {
struct node *n = &s->cluster[i];
if (n != s->self) {
n->nextIndex = s->log->len + 1;
n->matchIndex = 0;
} else {
n->nextIndex = s->log->len + 1;
n->matchIndex = s->log->len;
}
}
s->state = Leader;
}
void server_handle_RequestVoteResponse(struct server *s, zmsg_t *msg)
{
char *term_str = zmsg_popstr(msg);
char *grant_str = zmsg_popstr(msg);
char *source_str = zmsg_popstr(msg);
int term = atoi(term_str);
int grant = atoi(grant_str);
int source = atoi(source_str);
free(source_str);
free(grant_str);
free(term_str);
if (term > s->currentTerm) {
UpdateTerm(s, term);
server_save(s);
server_dump(s);
server_update_election_timer(s);
} else if (term == s->currentTerm) {
if (source < s->nserver && source != s->self->id) {
if (grant) {
printf("%s:%lu: grant vote from %s\n",
state_string(s->state), s->currentTerm, s->cluster[source].identity);
s->grant_count += 1;
s->cluster[source].voteFor = s->self->id;
if (s->grant_count * 2 > s->nserver && s->state == Candidate) {
printf("%s:%lu: >>>> grant vote from quorom, become Leader <<<<\n",
state_string(s->state), s->currentTerm);
BecomeLeader(s);
send_AppendEntriesRequest_to_cluster(s);
}
}
}
} else {
/* term < self->currentTerm, just ignore */
}
}
int on_recv(zloop_t *loop, zsock_t *sock, void *arg)
{
printf("---- on_recv ----\n");
struct server *server = (struct server *)arg;
assert(server);
zmsg_t *msg = zmsg_recv(sock);
assert(msg);
zmsg_print(msg);
char *identity = zmsg_popstr(msg);
free(identity); /* server know the identity of source node */
char *type = zmsg_popstr(msg);
if (streq(type, "RequestVoteRequest")) {
server_handle_RequestVoteRequest(server, msg);
} else if (streq(type, "RequestVoteResponse")) {
server_handle_RequestVoteResponse(server, msg);
} else if (streq(type, "AppendEntriesRequest")) {
server_handle_AppendEntriesRequest(server, msg);
} else if (streq(type, "AppendEntriesResponse")) {
server_handle_AppendEntriesResponse(server, msg);
} else {
/* unknown message, ignore */
}
free(type);
zmsg_destroy(&msg);
return 0;
}
void server_dump(struct server *s)
{
int i;
printf("---- server_dump ----\n");
printf("state: %s\n"
"currentTerm: %lu\n"
"voteFor: 0x%x\n"
"filename: %s\n"
"nserver: %d\n"
"loglen: %lu\n",
state_string(s->state),
s->currentTerm, s->self->voteFor,
s->filename, s->nserver, s->log->len);
printf("---- node ----\n");
assert(s->cluster);
for (i = 0; i < s->nserver; i++) {
struct node *n = &s->cluster[i];
printf("node%d: identity: %s, voteFor: 0x%x, matchIndex: %lu, nextIndex: %lu\n",
i, n->identity, n->voteFor, n->matchIndex, n->nextIndex);
}
printf("---- log ----\n");
assert(s->log && s->log->len < s->log->capacity);
for (i = 1; i <= s->log->len; i++) {
struct entry *e = &s->log->entry[i];
printf("entry%d: term: %lu, commandlen: %u, command: %s\n",
i, e->term, e->commandlen, e->command);
}
}
/* 将 currentTerm voteFor log 保存到文件 */
int server_save(struct server *s)
{
printf("---- server_save ----\n");
zchunk_t *chunk = zchunk_new(NULL, 4096);
assert(chunk);
zchunk_append(chunk, &s->currentTerm, sizeof(s->currentTerm));
zchunk_append(chunk, &s->self->voteFor, sizeof(s->self->voteFor));
zchunk_append(chunk, &s->log->len, sizeof(s->log->len));
assert(s->log->len < s->log->capacity);
uint64_t i;
for (i = 1; i <= s->log->len; i++) {
struct entry *e = &s->log->entry[i];
zchunk_append(chunk, &e->term, sizeof(e->term));
zchunk_append(chunk, &e->commandlen, sizeof(e->commandlen));
zchunk_append(chunk, e->command, e->commandlen);
}
int rc;
zfile_t *file = zfile_new("./", s->filename);
assert(file);
rc = zfile_output(file);
assert(rc == 0);
rc = zfile_write(file, chunk, 0);
assert(rc == 0);
zfile_destroy(&file);
zchunk_destroy(&chunk);
return 0;
}
int server_load(struct server *s)
{
if (zsys_file_exists(s->filename)) {
int rc;
zfile_t *file = zfile_new("./", s->filename);
assert(file);
rc = zfile_input(file);
if (!rc) {
uint64_t len;
zchunk_t *chunk = zfile_read(file, 4096, 0);
assert(chunk);
byte *data = zchunk_data(chunk);
assert(data);
len = 0;
memcpy(&s->currentTerm, data + len, sizeof(s->currentTerm));
len += sizeof(s->currentTerm);
memcpy(&s->self->voteFor, data + len, sizeof(s->self->voteFor));
len += sizeof(s->self->voteFor);
memcpy(&s->log->len, data + len, sizeof(s->log->len));
len += sizeof(s->log->len);
assert(s->log->len < s->log->capacity);
uint64_t i;
for (i = 1; i <= s->log->len; i++) {
struct entry *e = &s->log->entry[i];
memcpy(&e->term, data + len, sizeof(e->term));
len += sizeof(e->term);
uint32_t commandlen;
memcpy(&commandlen, data + len, sizeof(commandlen));
e->commandlen = commandlen;
len += sizeof(commandlen);
e->command = malloc(commandlen);
assert(e->command);
memcpy(e->command, data + len, commandlen);
len += commandlen;
}
zfile_close(file);
zchunk_destroy(&chunk);
zfile_destroy(&file);
assert(!chunk);
assert(!file);
return 0;
} else {
zfile_destroy(&file);
assert(!file);
return -1;
}
} else {
return -1;
}
}
struct server *create_server(int argc, char *argv[])
{
int rc;
printf("---- create_server ----\n");
struct server *s = malloc(sizeof(struct server));
assert(s);
int nserver = argc - 2;
int self_id = atoi(argv[argc - 1]);
assert(0 <= self_id && self_id < nserver);
s->cluster = calloc(nserver, sizeof(struct node));
assert(s->cluster);
s->nserver = nserver;
s->log = create_log();
assert(s->log);
s->state = Follower;
s->grant_count = 0;
s->timer.loop = invalid_pointer;
s->timer.election_id = 0;
s->timer.replication_id = 0;
s->timer.client_request_id = 0;
s->self = &s->cluster[self_id];
s->self->endpoint = argv[1 + self_id];
snprintf(s->self->identity, sizeof(s->self->identity), "node%d", self_id);
s->self->id = self_id;
/* 从文件中读取 currentTerm, voteFor, log,如果没有或者失败,则进行初始化 */
snprintf(s->filename, sizeof(s->filename), "node%d.save.1", self_id);
rc = server_load(s);
if (rc != 0) {
s->self->voteFor = Nil;
s->currentTerm = 0;
s->log->len = 0;
}
server_dump(s);
s->self->matchIndex = s->log->len;
s->self->nextIndex = s->log->len + 1;
s->sock = zsock_new(ZMQ_ROUTER);
assert(s->sock);
zsock_set_identity(s->sock, s->self->identity);
/*
* zmq 的 ROUTER socket 在接收到 identity 是相同的客户端时,如果没有设置
* handover,会拒绝第二个客户端的连接;如果设置了 handover 为 1,在接收
* 第二个客户端连接时,将会关闭第一个客户端的连接,将 identity 绑定到第
* 二个客户端。由于每个节点指定了 identity,在崩溃重启后,设置的是同一个
* identity,所以需要让对端的 ROUTER socket 接收第二个客户端的连接。
*/
zsock_set_router_handover(s->sock, 1);
rc = zsock_bind(s->sock, "%s", s->self->endpoint);
/* 这里有时候在程序中断重启后,会绑定失败,原因待查。 */
assert(rc != -1);
int i;
for (i = 0; i < nserver; i++) {
struct node *e = &s->cluster[i];
if (e != s->self) {
e->id = i;
e->nextIndex = 1;
e->matchIndex = 0;
e->endpoint = argv[1 + i];
snprintf(e->identity, sizeof(e->identity), "node%d", i);
rc = zsock_connect(s->sock, "%s", e->endpoint);
assert(rc == 0);
}
}
return s;
}
void destroy_server(struct server **p)
{
if (p) {
struct server *s = *p;
zsock_destroy(&s->sock);
destroy_log(&s->log);
free(s->cluster);
free(s);
s = NULL;
}
}
void server_update_election_timer(struct server *s);
int elect_timeout(zloop_t *loop, int timer_id, void *arg)
{
printf("---- elect_timeout ----\n");
struct server *server = (struct server *)arg;
assert(server);
server_update_election_timer(server);
start_election(server);
return 0;
}
void server_create_election_timer(struct server *s)
{
size_t delay = get_timeout(3000, 5000);
size_t only_once = 1;
int timer_id = zloop_timer(s->timer.loop, delay, only_once, elect_timeout, s);
assert(timer_id != 0);
s->timer.election_id = timer_id;
}
void server_update_election_timer(struct server *s)
{
assert(s);
assert(s->timer.loop != invalid_pointer);
if (s->timer.election_id > 0) {
int rc = zloop_timer_end(s->timer.loop, s->timer.election_id);
assert(rc == 0);
}
server_create_election_timer(s);
}
void server_start_loop(struct server *s)
{
int rc;
zloop_t *loop = zloop_new();
assert(loop);
s->timer.loop = loop;
rc = zloop_reader(loop, s->sock, on_recv, s);
assert(rc == 0);
server_update_election_timer(s);
rc = zloop_start(loop);
/* interrupted: 0, cancel by handler: -1 */
assert(rc == 0 || rc == -1);
zloop_destroy(&s->timer.loop);
}
int main(int argc, char *argv[])
{
if (argc < 3) {
printf("usage: %s peer1 peer2 peer3 ... self_id\n", argv[0]);
exit(EXIT_FAILURE);
return -1;
} else {
struct server *server = create_server(argc, argv);
assert(server);
server_start_loop(server);
destroy_server(&server);
return 0;
}
}