这篇文章是对实现 Raft 后的一次总结,代码的实现可以在 raft.me 这个代码 仓库中找到。这个实现使用了 ZeroMQ 的 C 语言绑定 CZMQ,运行之前需要安装这些依赖。 编译完之后,启动三个节点的集群的命令如下:

$ ./raft.me tcp://127.0.0.1:57000 tcp://127.0.0.1:57001 tcp://127.0.0.1:57002 0
$ ./raft.me tcp://127.0.0.1:57000 tcp://127.0.0.1:57001 tcp://127.0.0.1:57002 1
$ ./raft.me tcp://127.0.0.1:57000 tcp://127.0.0.1:57001 tcp://127.0.0.1:57002 2

上面的每条命令需要打开一个新的终端,在里面输入运行。运行的过程中,可以通过 C-c 来任意终止程序的执行,只要保证大多数节点的正常(这里是两个节点),程序就可以保证 最终所有的数据都会同步。

为了方便,这里将实现的代码放到这篇文章的末尾。但代码仓库中还包含了具体实现的 PlusCal伪代码描述。

参考资料:

Raft 一致性算法解决什么问题?

Raft 解决的问题和 Paxos 解决的问题是一样的。在异步网络模型中,在给定的一个节点集 合中,这个集合的每个节点都可以向集合的其他节点同步数据,在节点集合中,即使有些节 点会崩溃重启,这些节点最终的数据都是一致的。

异步网络模型的意思是:

  • 节点运行的速度是任意的
  • 节点间只能通过消息进行通信
  • 消息可能会丢失、乱序、重复
  • 消息不会损坏

“节点运行的速度是任意的”:意味着节点可能会崩溃,可能负载繁忙,运行时间很长。这 意味着算法可以容忍某些节点崩溃的情况下完成数据的同步,只要最终这些节点正常运行, 数据就可以完全地同步。这种数据同步的方法,是具有容错性的。

举个例子,有三个节点集合:{n1, n2, n3},客户端要向这些节点写入的数据序列是:« 1, 2, 3 »。这三个节点中,即使有一个崩溃,但最终正常运行,那么一致性算法可以保证 这些节点的每个节点保存的数据都是 « 1, 2, 3 »。

具有容错性质的一致性算法,有什么用?

在现实世界中(2018),越来越多应用的部署是分布式的。分布式的意思是,应用部署在不 同的机器上,它们没法通过本机通信机制来通信(管道、信号量、共享内存等),它们只能 通过网络来通信。为了保证应用的高可用,常常会在多个节点部署相同的应用,这些节点就 组成了一个集群,用户在可以访问集群中的任意一个节点来使用。

为了说明“分布式”和“集群”的概念,请看下面的例子:

现在有两个应用: {A , B},它们需要使用对方提供的功能。有 5 个节点: {n1, n2, n3, n4, n5}。如果应用A 部署在节点 n1 上,表示为 n1.A。现在有几种部署方式:

n1.A n2.B:这种部署也可以认为它们是分布式部署的,虽然它们是不同的应用,但是它们 只能通过网络消息进行通信,符合分布式系统的定义。

n1.A n2.A n3.A:这种部署就是一个集群。集群就是相同应用的集合。

在现实世界中,集群是一项非常有用的技术。因为在世界各地,访问本地的机器比访问另一 个半球的机器要快得多。将应用部署在北京、上海、广州,北京的用户访问北京节点,比访 问广州节点要快。由于一个节点总是有资源上限的,如果一个节点最多只能服务一千个 (1,000)用户,如果在一个地区中有上万(>10,000)个用户,那么一个节点就不能同时服 务这些用户了,需要使用集群将这些用户分摊在这些节点中。

如果应用是没有状态的,那么节点间就不需要进行数据同步。但是,在一个系统内不可能是 无状态的,系统总要保存一些数据以便能够正常运行。也就是说,系统内必定会有一些组件 要保存状态。

举个例子,现在许多应用都要保存用户名和密码。这些数据会保存在数据库软件中,如果只 将数据保存在一个节点,如果这个节点出现问题,暂时无法访问,那么整个系统都会受到极 大的影响。

为了消除这种单点故障对系统的影响,数据库通常会部署为主从模型进行数据备份。通常为 了保证性能,数据同步一般都是使用异步传输的方式。数据写入主节点后,会立刻返回,在 返回之后再将数据传输到从节点进行保存。

如果使用异步传输的方式,就涉及了在两个节点间进行数据同步的问题。假设现在有两个节 点(P、B),P 节点现在是主节点,处理用户的数据,同时将数据传输到从节点 B 节点保 存。考虑下面的场景:

  • 用户修改密码,应用将密码保存在 P 节点
  • P 节点保存数据并返回给应用
  • P 节点崩溃
  • B 节点变为主节点向外提供读写服务
  • 用户修改密码,应用将密码保存在 B 节点中
  • P 节点重启

在这种场景下,P 节点和 B 节点保存的数据就发生了不一致的情况。解决这种情况有许多 种方式,其中之一就是使用同步的方式进行数据传输,在上面的例子中,P 节点需要将数据 写入本节点和 B 节点之后,再给用户返回。但是,这样的性能就会受到一定的影响。

除了这种对数据的备份需要在节点间进行数据同步外,减少数据库的压力也需要进行数据同 步。原因还是在于不得不做,因为一个节点的资源有限,一个节点通常只能服务上千的用户, 如果有上万的用户需要访问,就需要使用集群。而这种数据集群,需要在集群的节点间进行 数据同步。如果一个集群有 10 个节点,使用同步的方式要写到所有的 10 个节点中,而且 其中有一个节点崩溃,整个系统就不可用了;即使所有节点都正常,一个节点的网络抖动, 也会影响到整个系统。

在有多个节点的集群间进行数据同步,数据传输的方式使用同步的方式就不可取。那么只能 使用异步的方式进行数据传输,这样就需要解决数据的不一致问题,还有需要让系统可以容 忍一些节点的崩溃。

分布式的一致性容错算法,就是解决上面提到的问题。但是,通常一些小型系统的并发用户 量不会超过一千,数据库使用主从模式就可以工作得很好,没有必要解决这种多个节点间的 数据同步问题。

使用异步方式传输数据需要解决的问题

我对于异步和同步的数据传输方式的理解是,异步是相对于同步而言的。如果同步方式是指 将数据保存到 5 个节点返回,那么异步的方式就是在数据没有保存在 5 个节点之前就返回。 这就说明,异步方式包括将数据保存在 0~4 个节点后返回。

我认为异步方式同步数据,需要主要解决两个问题:

  • 数据应该保存在多少个节点才返回
  • 如何解决两个节点的数据发生冲突

Raft 是如何解决数据一致性问题的?

简单地概括 Raft 算法:选出一个主节点,由主节点将数据同步到集群内大多数的节点。

选择数据同步的方式有很多种,这里只讨论从主节点将数据传输到其他节点的方式。没有选 择主节点进行数据同步的方式也可以解决这个问题,Paxos 和 Blockchain 就可以在没有主 节点的情况下解决数据的一致性问题。

既然选择了主节点同步数据的方式,那么就需要解决在不同的主节点中传输时可能产生的数 据冲突。当节点 n1 当选为主节点,它写了一部分数据;在 n1 之后,n2 当选为主节点, 它也写了一部分数据。主节点的变更可以表示为一个序列:« n5, n1, n2, … »。

考虑这样的场景:主节点变更的情况是 « n1, n2, n1, n3 »。节点 n4 接收到主节点的 数据包,要求保存数据。n4 接收到的数据包有可能是 {n1, n2, n3} 中的任意一个节点发 送的,因为消息传递的时间是任意的。节点需要能够区分到底是哪个主节点发送过来的,所 以节点每次变为主节点的标识都应该是唯一的。这样主节点有 n1 -> n2,也可以区分 n1 -> n2 -> n1 这样的情况。

节点在能够区分主节点之后,它就要决定究竟要接受哪个主节点传输的数据。这个过程应该 是确定性的,一旦选择之后就不能更改,如果能够更改的话,就会导致数据可能会不断变化, 没办法达到最终的一致性。一个简单的方法是,接受先收到或者后收到的消息,但是这样结 果就不是确定性的。接收顺序的不同会导致不同的结果:例如,接收顺序是 « n1, n2 » 和 « n2, n1 »,它们的结果就不一样;一个选择 n1,一个选择 n2。

因此,多个节点的标识,能够通过比较,得到一个确定性的结果。选择不同的标识,将会导 致选择不同的节点。Paxos/Raft 使用数字来对节点进行标识,Blockchain 使用节点的工作 量来进行标识。这样节点接收顺序即使不一样,也可以做出一个确定性的选择。

例如,收到不同主节点消息的顺序是 « n1, n2, n1 »,如果第一个 n1 的标识是 3,第 一个 n2 的标识是 5,第三个 n1 的标识是 4。接收顺序可以表示为 « 3, 5, 4 »,这时 通过选择最大或者最小的标识值。对于 Raft 来说,它让节点选择的是最大的标识值(5)。 对于 Paxos 来说,它让节点选择的是最小的标识值(3)。但归根到底 Raft 选择的还是最 小的节点标识,但是这个选择是在主节点做出的。

为了得到确定性的结果,还需要每个节点标识都是唯一,任意两个节点标识都不一样。所以, 现在需要解决的问题是,怎么保证每个主节点选择的节点标识,和其他所有的主节点的节点 标识都不一样。这里的方法不止一种,我们仅仅讨论 Raft 的做法。

只要主节点在发送数据之前,保证主节点标识的唯一性,那么其他节点收到的主节点标识也 是具有唯一性的。对于同一个节点变为主节点多次,可以使用一个数字的递增来表示不同的 主节点;而对于不同的节点变为主节点,每次变为主节点之后,递增主节点标识,就可以表 示不同的主节点标识。这样就可以保证主节点标识是唯一的。

新的主节点标识需要保证比旧的主节点标识大,因此新的主节点必须要知道旧的主节点标识, 它需要和其他的节点交换选择的主节点标识,才能确定选择了一个比旧主节点标识更大的标 识。主节点是由非主节点变成的,只要能够保证一个主节点标识最多只能由一个非主节点获 得,就可以保证一个主节点标识只有一个主节点。因此,一个主节点标识,可能没有节点获 得,也可能只有一个节点获得。

考虑任意两个非主节点(A, B),它们分别选择了主节点标识(A.x, B.y)。因为一个节点 选择的主节点标识比在这个节点出现的主节点标识都大,因此两个选择的主节点标识,最大 的那个主节点标识(max(A.x, B.y))将是比两个节点出现的主节点标识都大,并且是唯一 的。

一个非主节点如何获得一个主节点标识呢?一个非主节点选择了一个主节点标识,它不能马 上获得这个标识,因为其他节点可能选择相同的标识,这样就会造成主节点标识不是唯一的。 所以,一个非主节点在选择了一个主节点标识后,需要其他节点的确认。

如何使节点确认是唯一的呢?只要大多数的节点都确认一个非主节点获得了标识,并且支持 后就不能改变,因为集群内少数节点的数目不可能比大多数的数目大(5 个节点的集群,大 多数节点的个数是 {3, 4, 5},少数节点的数目是 {5-3, 5-4, 5-5} = {2, 1, 0})。使用 这个规则,只要大多数节点支持一个非主节点获得一个主节点标识,其他的非主节点就不能 获得相同的标识了。

既然需要节点确认,就需要记录节点给哪个非主节点和哪个主节点标识确认。这样它在接收 到其他非主节点的要求确认主节点标识时,能够知道是否确认或者拒绝。所以,节点记录的 确认是:[非主节点, 主节点标识]。

前面讲过,任意两个非主节点交换选择的主节点标识,选择大的那个可以确保主节点标识的 唯一性。选择小的主节点标识,需要确认选择大的主节点标识,将这个确认保存起来: [哪个非主节点, 大的主节点标识],同时将确认信息返回给对方。下次选择小的主节点标 识的这个节点,就可以从确认的最大的主节点标识开始递增。

非主节点在选择了一个主节点标识之后,它可以确定自己可以获得这个主节点标识。它给所 有的节点发出确认请求,等待接收确认响应。这个节点除了会接到确认响应,还会接到其他 节点的确认请求,如果它发现自己对同一个主节点标识已经确认过,就可以拒绝;如果对一 个主节点标识没有确认过,就可以确认。

如果有多个获得主节点标识的非主节点,到底应该选择哪一个作为主节点呢?(这些主节点 标识一定不相同,因为最多只有一个非主节点能够获得同一个主节点标识)这个结果应该是 确定性的,给定一个获得主节点标识的非主节点集合({n1, n2, n3}),选择多次,每次都 应该是相同的结果。选择最小或者最大的主节点标识,都会产生确定性的结果。如果选择最 小的,这个节点崩溃的话,整个系统就不能继续运行了;如果选择最大的,这个节点崩溃的 话,还会出现更大的主节点标识,系统可以正常运行。因此,当有多个非主节点都获得了主 节点标识,就选择拥有最大主节点标识的那个非主节点作为主节点。

现在,考虑一个获得主节点标识的非主节点,如何知道自己是否可以变为主节点。获得主节 点标识的非主节点,需要知道自己获得的主节点标识是不是当前集群内最大的。在没有收到 比自己更大的主节点标识时,这个节点就可以变为主节点。其他的节点在收到主节点的数据 时,只接受已确认过最大主节点标识的数据。在这种情况下,整个系统内,会有多个主节点, 但是每个主节点的主节点标识都不一样;在非主节点接收到数据时,它只保存已确认最大主 节点的数据,这样数据就不会发生冲突。拥有小的主节点标识的主节点,它最终会收到比它 大的主节点标识,给这个节点投票,同时放弃作为主节点,接收这个拥有更大主节点标识的 节点的数据。

在 Raft 算法中,主节点标识被称为“任期(term)”,确认任期叫做“投票(voteFor)”。 每个节点有自己的标识,这个标识和任期不同,节点标识(node_id)和一个任期组成一个 投票:voteFor = [term, node_id]。

解决了主节点选举的问题,接下来考虑主节点到非主节点的数据同步问题。

要求主节点和非主节点的数据是一致性的,将主节点接收的数据在非主节点上重演一遍,就 可以保证数据是一致的,数据可以看作是存放在一个无限长的数组中。

非主节点有可能接收到不同任期的主节点的数据,它需要区分并且要选择接收哪个主节点的 数据。考虑有两个不同任期的主节点,它们分别将数据同步到接收它们的非主节点中。这是 有可能发生的,任期小的主节点在不知道有任期比它更大的主节点时,能够同步数据到非主 节点。假设非主节点 n1 接收任期是 1 的主节点 n2 的数据,非主节点 n3 接收 主节点 n4 的数据,这时非主节点 n1 和 n3 的数据是冲突的。解决冲突时,需要选择哪个主节点 的数据作为最终的数据,所以非主节点要区分并且记录接收了哪个主节点的数据。区分主节 点,只要记录任期就足够了,在每个数据项中,需要添加接收的主节点任期,然后保存下来 以便之后解决冲突使用。

现在,非主节点保存数据是一个数组,数组里面的包括了接收的主节点任期。这一系列的数 据,用 C 语言表示的数据结构如下:

struct log {
    struct log_entry entry[nentry];
};

struct log_entry {
    uint64_t term;
};

要确保数据在所有的时间里是一致的,需要解决两个问题:1、不同任期的主节点向其他节 点同步数据,必须从这些主节点的数据中选择一个;2、新的主节点必须要拥有之前集群内 出现的主节点的所有数据。

对于第一个问题,同时存在两个任期不同的主节点同步数据,最终解决冲突时,需要选择一 个主节点的数据作为最终的数据。选择需要考虑,如果选择以后不能更改(如果选择后能够 更改,数据会不断变更,不能达到最终一致性,因为往后的选择会影响当前的选择)。

如果选择更小任期的主节点数据,那么这个主节点崩溃重启后,它会使所有的其他节点删除 更高任期的主节点数据。只要它不断地崩溃重启,系统不能正常运行。

如果选择更大任期的主节点数据,即使更大任期的主节点崩溃,也会有更大任期的主节点出 现。所以,选择更大任期的主节点数据,能够解决冲突。

对于第二个问题,新的主节点要拥有之前所有的主节点数据,只要每个后继任期的主节点都 拥有前任主节点的数据即可。只需要保证产生主节点时,只有拥有前任主节点数据的节点可 以变为主节点。如果主节点的数据同步到集群的大多数节点,这些大多数节点在任期确认时, 只要对方没有自己知道的前任主节点的数据,就拒绝任期确认。这样没有前任主节点数据的 节点,将不会得到这些大多数节点的任期确认(投票),将不能变为主节点。

在 Raft 算法中,为了清晰容易理解,进一步地将非主节点分为 Follower 和 Candidate。 Follower 只响应请求,Candidate 可以发起投票请求(任期确认)。每个节点在等待一段 时间,没有收到主节点的数据,就变为 Follower,同时递增任期。整个系统就可以运行下 去,但是,这个算法并不能保证最终一定能够可用,因为每个任期都可能选不出一个主节点 来。

至此,Raft 算法解决数据一致性问题的过程就讲解完毕了。

在常见场景下(五个节点),各个节点的行为

  • 正常数据同步,数据如何复制到其他的节点?

首先,从集群中选出一个主节点,数据从主节点中发送到集群中其他的节点。

  • 主节点崩溃,新的节点产生,其他 3 个节点怎么切换到新主节点?

假设主节点 n1 崩溃,余下的节点为 {n2, n3, n4, n5},它们在没有收到主节点的 AppendEntries 消息后超时,递增它们的任期,开始新一轮的主节点选举。一个节点等到它 自己,其他的两个节点给它的投票后变为主节点。

  • 旧的主节点恢复,其他 3 个节点接受了新的主节点,这时会发生什么事情?

旧的主节点 n1 恢复,它的任期小于集群中其他节点的任期,它在超时之前会收到当前主节 点的 AppendEntries 请求,这时它会接受这个请求,同时重置选举定时器。

  • 新主节点崩溃,旧主节点恢复,这时会发生什么?

新的主节点 n2 崩溃,它的任期和其余节点的任期相同;旧的主节点 n2 恢复,这时 n2 的 任期小于集群内其他节点的任期。

如果它在选举超时之前,收到其他节点的投票请求,会给这个节点投票,同时更新它自己的 任期到这个节点的任期。

如果它在选举超时之前,没有收到其他节点的投票请求,在选举超时之后会递增自己的任期, 同时向其他节点发送投票请求。

  • 正常数据同步,一个从节点崩溃恢复,数据如何同步?

从节点崩溃前,如果它的任期和主节点的任期是相同的,在恢复之后,会收到主节点的 AppendEntries 请求,它的日志和主节点相比,如果相同的就插入日志中,如果落后,则等 待主节点找到缺失的位置,将数据发送过来。

  • 正常数据同步,三个从节点崩溃,会发生什么?

n1 是主节点,n2 是从节点,{n3, n4, n5} 崩溃。这时 n1 仍然会能够写入新的日志到本 节点,同时同步数据到 n2 节点,但是日志的 commitIndex 不能更新,因为数据没有得到 大多数节点的确认。

  • 新的节点加入(5 -> 6),集群如何处理?

主节点接收到这个请求,提交一个特殊的日志项,包括新的集群配置和旧的集群配置。接下 来所有的数据都需要同步是新的集群的大多数节点,和旧的集群的大多数节点。在这个特殊 的配置项同步到这两个大多数之后,主节点继续写入一个特殊的日志项,这个日志项包括新 的集群配置。将这个配置项同步到新的集群的大多数节点。

每个节点在接收到特殊的日志项的配置后,立刻应用。在包括新的集群配置的日志项提交后, 不在新集群的节点退出。

  • 主节点和另一个从节点崩溃,另外 3 个节点存活,选出新主节点进行同步,这时旧的 2 个节点恢复,这时会发生什么?

旧的 2 个节点的任期小于另外的 3 个节点,它们会收到主节点的 AppendEntries 请求, 更新自己的任期到主节点的任期,同时将日志项写入自己的日志中。

  • 所有节点初始化,到数据同步到所有节点,这中间经历了哪些过程?

主节点选举,日志同步。

  • 一个从节点崩溃,集群选中新的主节点进行同步。从节点恢复,这时会发生什么?

接收主节点的 AppendEntries 请求,更新自己的任期到主节点的任期,可能会插入日志项。

  • 旧的主节点崩溃,新的主节点选中,同步了一些数据,新的主节点崩溃,旧的主节点恢复,这时会发生什么?

旧的主节点恢复后,是 Follower 状态,它的任期小于集群中其他节点的任期。

在选举超时之前,如果收到投票请求,会给其他节点投票;在选举超时后,递增自己的任期, 给其他节点发起投票请求。

  • 主节点崩溃,新的节点要加入,这时会发生什么?

选出一个新的主节点,这个新的主节点提交两个特殊的日志项。

  • 一个从节点退出,一段时间后,重新加入,会发生什么?

收到 AppendEntries 请求后,任期更新到主节点的任期。没有收到则等待选举超时,发起 新一轮的选举。

  • 如何保证结果的正确性?

每个任期最多只能有一个主节点,日志项在主节点的当前任期同步到大多数节点,才算提交。

  • 另外 5 个节点的集群的数据不同,这个集群能加入吗?最终能不能形成 10 个节点的集群?a

如果是新加入的节点,不会有数据。有一种情况是节点退出集群后再加入,这时会找到数据 分叉的地方,删除不同的日志,接受主节点的数据同步。最终会形成新的配置指定的集群。

  • 5 个节点,一个崩溃,另外 4 个(2-2)分裂成两个集群;它们各自选举,任期如何变化?

每个节点不断地递增任期,发起投票请求,选举超时后继续前面的过程。它们不能产生一个 主节点,因为集群内没有大多数的节点同意给一个节点投票。节点因为没有收到 AppendEntries 请求,最终会超时,递增任期,进行选举。

  • 5 个节点,分裂成两个集群(2-3);2 个节点集群的任期如何变化?

如前所述。

  • 主节点崩溃后恢复,数据如何复制?它还会再次成为 leader 吗?

如果它的日志落后于上次主节点提交的日志,那么它不能选举为主节点。

如果它的日志没有落后于上次主节点提交的日志,进行正常的选举流程。

  • 节点收到投票请求后,会做什么操作?

如果收到的投票请求,任期比自己大,更新到这个任期,同时重置为 Follower 状态。

如果任期相同,还没有投票则给它投票;已经投票了,则不会投票。

如果任期小于自己,发送自己的任期回去,让它更新任期。

  • 什么情况下会选不出领导?

没有大多数节点的投票。

  • 领导选举的过程是怎样的?

每个节点在选举超时后,递增自己的任期,将投票请求发送给集群中其他的所有节点,在收 到大多数的投票同意后变为主节点。

每个节点在收到投票请求时,如果自己的任期小于对方的任期,更新任期到这个大的任期, 给这个节点投票;如果任期相同,没有投票,自己的日志和对方相同或者小于对方,则它投 票,如果已经投票则忽略请求;如果任期大于对方的任期,发送自己的任期回去。

  • 当前的领导如何提交老的领导的 log entry?

主节点当选后,以当前最新的日志项同步到其他节点。如果收到确认的日志项位置,则从这 个确认的日志项的下一个位置方法日志。如果对方日志有冲突,它会删除日志,等待下一次 主节点的 AppendEntries 请求的到来。

  • 如果新的领导删除了追随者的 log 而未同步自己的 log 到追随者,追随者的 log 会如何增长?

如果 Follower 的日志和主节点的日志后冲突,它会在收到一个 AppendEntries 请求时删 除自己的日志,直到日志和主节点的一致,再将主节点的日志同步到自己的日志中。整个过 程 Follower 的日志看起来是先减少再增加。

  • 领导会不会删除自己的 log?

不会。

  • 如何保证所有节点都执行相同的命令?一个从节点崩溃,它落后于主节点的进度,从节点恢复并成为主节点,如何保证它接下来的命令和老的领导不冲突?

所有节点只执行已提交的日志。

从节点崩溃后,它的日志如果和主节点已提交的日志冲突,不能当选为主节点。

代码实现

/*
 * 为了获得对 Raft 算法的理解,实现它核心的同步协议。日志的存储是简单地限定 4K
 * 字节的文件读写。集群成员的动态变更没有实现,日志快照没有实现。
 *
 * 实现这个算法,使它稳定可靠,要花费大量的时间来测试,这是一个时间黑洞……
 */

#include <assert.h>
#include <stdlib.h>

#include <czmq.h>

static const uint32_t Nil = ~0U;
static void *invalid_pointer = (void *)(~0UL);

enum state { Follower, Candidate, Leader };

static const char *state_string(enum state state)
{
        if (state == Follower) {
                return "Follower";
        } else if (state == Candidate) {
                return "Candidate";
        } else if (state == Leader) {
                return "Leader";
        } else {
                return "unknown";
        }
}

struct entry {
        uint64_t term;
        uint32_t commandlen;
        char *command;
};

struct log {
        uint64_t commitIndex;
        struct entry *entry;
        uint64_t capacity;
        uint64_t len;
};

struct log *create_log(void)
{
        struct log *log = malloc(sizeof(struct log));
        if (log) {
                log->entry = calloc(1024, sizeof(struct entry));
                if (log->entry) {
                        log->commitIndex = 0;
                        log->capacity = 1024;
                        log->len = 0;
                        return log;
                } else {
                        return NULL;
                }
        } else {
                return NULL;
        }
}

void destroy_log(struct log **log)
{
        if (log) {
                struct log *ptr = *log;
                if (ptr) {
                        assert(ptr->len < ptr->capacity);
                        int i;
                        for (i = 1; i <= ptr->len; i++) {
                                struct entry *e = &ptr->entry[i];
                                free(e->command);
                        }
                        free(ptr->entry);
                        *log = invalid_pointer;
                }
        }
}

struct node {
        char *endpoint;
        char identity[20];

        uint32_t id;
        uint32_t voteFor;

        /* log replication */
        uint64_t nextIndex;     /* NOTE: count from 1, be careful! */
        uint64_t matchIndex;
};

struct timer {
        zloop_t *loop;
        int election_id;
        int replication_id;
        int client_request_id;
};

struct server {
        char filename[20];
        struct timer timer;
        zsock_t *sock;

        int nserver;
        struct node *cluster;
        struct node *self;

        struct log *log;
        uint64_t currentTerm;
        uint32_t grant_count;
        enum state state;
};

uint64_t LastIndex(struct server *s)
{
        return s->log->len;
}

uint64_t LastTerm(struct server *s)
{
        if (s->log->len == 0) {
                return 0;       /* empty */
        } else {
                return s->log->entry[s->log->len].term;
        }
}

int quorom_grant(struct server *s)
{
        return s->grant_count * 2 > s->nserver;
}

int node_is_grant(struct node *node, uint32_t voter_id)
{
        return node->voteFor == voter_id;
}

void server_update_election_timer(struct server *s);

void UpdateTerm(struct server *s, uint64_t term)
{
        if (s->currentTerm < term) {
                printf("%s:%lu: update term to %lu\n",
                       state_string(s->state), s->currentTerm, term);
                if (s->state == Leader) {
                        int timer_id, rc;

                        /* Leader need to stop replication */
                        timer_id = s->timer.replication_id;
                        assert(timer_id > 0);
                        assert(s->timer.loop && s->timer.loop != invalid_pointer);
                        rc = zloop_timer_end(s->timer.loop, timer_id);
                        assert(rc == 0);
                        s->timer.replication_id = 0;

                        /* also client request */
                        timer_id = s->timer.client_request_id;
                        assert(timer_id > 0);
                        rc = zloop_timer_end(s->timer.loop, timer_id);
                        assert(rc == 0);
                        s->timer.client_request_id = 0;
                }

                s->currentTerm = term;
                s->state = Follower;
                s->grant_count = 0;
                int i;
                for (i = 0; i < s->nserver; i++) {
                        struct node *node = &s->cluster[i];
                        node->voteFor = Nil;
                }
        }
}

void send_RequestVoteRequest(struct server *s, struct node *dst)
{
        assert(s);
        assert(s->sock);
        assert(dst);

        zsock_t *sock = s->sock;
        zmsg_t *msg = zmsg_new();
        assert(msg);

        zmsg_addstrf(msg, "%s", dst->identity);    /* for ZeroMQ ROUTER socket */

        zmsg_addstr(msg, "RequestVoteRequest");    /* type */
        zmsg_addstrf(msg, "%lu", s->currentTerm);  /* term */
        zmsg_addstrf(msg, "%lu", LastTerm(s));     /* log last term */
        zmsg_addstrf(msg, "%lu", LastIndex(s));    /* log last index */
        zmsg_addstrf(msg, "%d", s->self->id);      /* source */
        zmsg_addstrf(msg, "%d", dst->id);          /* destination */

        zmsg_send(&msg, sock);
        printf("%s:%lu: send RequestVoteRequest to %s\n",
               state_string(s->state), s->currentTerm, dst->identity);
}

void RequestVote(struct server *s)
{
        assert(s->state == Candidate);
        assert(s->self->voteFor == s->self->id);

        int i;
        for (i = 0; i < s->nserver; i++) {
                struct node *node = &s->cluster[i];
                if (node != s->self && !node_is_grant(node, s->self->id)) {
                        send_RequestVoteRequest(s, node);
                }
        }
}

void server_dump(struct server *s);;
int server_save(struct server *s);

void start_election(struct server *s)
{
        assert(s->state == Follower || s->state == Candidate);

        UpdateTerm(s, s->currentTerm + 1);
        s->state = Candidate;
        s->grant_count = 1;
        s->self->voteFor = s->self->id;
        server_save(s);
        server_dump(s);

        RequestVote(s);
}

long int get_timeout(long int beg, long int end)
{
        uint64_t seed = zclock_time();
        srandom(seed);
        long int n = random();
        long int range = end + 1 - beg;
        return beg + n % range;
}

/* self is up to date: return 1 */
int server_is_uptodate(
        struct server *s,
        uint64_t pkt_last_term, uint64_t pkt_last_index)
{
        uint64_t self_last_term = LastTerm(s);
        uint64_t self_last_index = LastIndex(s);

        if (self_last_term > pkt_last_term) {
                return 1;
        } else if (self_last_term == pkt_last_term) {
                if (self_last_index > pkt_last_index) {
                        return 1;
                } else {
                        return 0;
                }
        } else {
                return 0;
        }
}

int server_grant_node(
        struct server *s,
        uint64_t pkt_term, uint64_t pkt_last_term, uint64_t pkt_last_index)
{
        if (pkt_term < s->currentTerm) {
                return 0;
        } else {
                if (pkt_term > s->currentTerm) {
                        UpdateTerm(s, pkt_term);
                        server_save(s);
                        server_dump(s);
                        server_update_election_timer(s);
                }

                /* pkt_term == s->currentTerm */
                if (s->self->voteFor != Nil) {
                        /* already vote for someone */
                        return 0;
                } else {
                        /* not vote yet */
                        if (server_is_uptodate(s, pkt_last_term, pkt_last_index)) {
                                return 0;
                        } else {
                                return 1;
                        }
                }
        }
}

void server_handle_RequestVoteRequest(struct server *s, zmsg_t *msg)
{
        char *term = zmsg_popstr(msg);
        char *log_last_term = zmsg_popstr(msg);
        char *log_last_index = zmsg_popstr(msg);
        char *src_id = zmsg_popstr(msg);
        char *dst_id = zmsg_popstr(msg);

        uint64_t pkt_term = atol(term);
        uint64_t pkt_last_term = atol(log_last_term);
        uint64_t pkt_last_index = atol(log_last_index);
        uint32_t pkt_node_id = atoi(src_id);

        free(dst_id);
        free(src_id);
        free(log_last_index);
        free(log_last_term);
        free(term);

        if (pkt_node_id <= s->nserver) {
                int grant = server_grant_node(s, pkt_term, pkt_last_term, pkt_last_index);
                if (grant) {
                        s->self->voteFor = pkt_node_id;
                        server_save(s);
                        server_dump(s);
                        printf("%s:%lu: voteFor %s\n",
                               state_string(s->state), s->currentTerm, s->cluster[pkt_node_id].identity);

                        zmsg_t *msg2 = zmsg_new();
                        zmsg_addstrf(msg2, "%s", s->cluster[pkt_node_id].identity);
                        zmsg_addstrf(msg2, "%s", "RequestVoteResponse"); /* type */
                        zmsg_addstrf(msg2, "%lu", s->currentTerm); /* term */
                        zmsg_addstrf(msg2, "%d", 1); /* grant */
                        zmsg_addstrf(msg2, "%d", s->self->id); /* source */
                        zmsg_send(&msg2, s->sock);
                        printf("%s:%lu: send RequestVoteResponse to %s\n",
                               state_string(s->state), s->currentTerm, s->cluster[pkt_node_id].identity);
                }
        }
}

uint64_t Min(uint64_t a, uint64_t b)
{
        return a < b ? a : b;
}

uint64_t Max(uint64_t a, uint64_t b)
{
        return a > b ? a : b;
}

/* treat AppendEntriesRequest as heartbeat */
void send_AppendEntriesRequest(struct server *s, struct node *target)
{
        assert(s->state == Leader);
        assert(s->sock);

        uint64_t prevLogIndex = target->nextIndex - 1;
        uint64_t prevLogTerm;
        if (prevLogIndex == 0) {
                prevLogTerm = 0;
        } else {
                prevLogTerm = s->log->entry[prevLogIndex].term;
        }
        uint64_t commitIndex = Min(s->log->commitIndex, target->nextIndex);

        zmsg_t *msg = zmsg_new();
        assert(msg);

        zmsg_addstrf(msg, "%s", target->identity);       /* for ZMQ ROUTER socket */

        zmsg_addstrf(msg, "%s", "AppendEntriesRequest"); /* type */
        zmsg_addstrf(msg, "%lu", s->currentTerm);        /* term */
        zmsg_addstrf(msg, "%lu", prevLogIndex);          /* prevLogIndex */
        zmsg_addstrf(msg, "%lu", prevLogTerm);           /* prevLogTerm */
        zmsg_addstrf(msg, "%lu", commitIndex);           /* commitIndex to target */
        zmsg_addstrf(msg, "%d", s->self->id);            /* source */

        /* have entry to send */
        if (target->nextIndex < s->self->nextIndex) {
                zmsg_addstrf(msg, "%lu", s->log->entry[target->nextIndex].term);
                zmsg_addstrf(msg, "%s", s->log->entry[target->nextIndex].command);
        }

        zmsg_send(&msg, s->sock);
}

/* return 1 if log consistent, otherwise return 0 */
int logOK(struct log *log, uint64_t prevLogIndex, uint64_t prevLogTerm)
{
        if (prevLogIndex == 0) {
                return 1;
        } else {
                if (prevLogIndex <= log->len &&
                    log->entry[prevLogIndex].term == prevLogTerm) {
                        return 1;
                } else {
                        printf("logOK false: "
                               "prevLogIndex: %lu, prevLogTerm: %lu, "
                               "loglen: %lu, entryterm: %lu\n",
                               prevLogIndex, prevLogTerm,
                               log->len, log->entry[prevLogIndex].term);
                        return 0;
                }
        }
}

void send_AppendEntriesResponse(
        struct server *s,
        int success, uint64_t matchIndex, uint32_t dst_id)
{
        if (dst_id > s->nserver) {
                return;
        }

        zmsg_t *msg = zmsg_new();
        assert(msg);

        zmsg_addstrf(msg, "%s", s->cluster[dst_id].identity);
        zmsg_addstrf(msg, "%s", "AppendEntriesResponse"); /* type */
        zmsg_addstrf(msg, "%lu", s->currentTerm);         /* term */
        zmsg_addstrf(msg, "%d", success);                 /* success */
        zmsg_addstrf(msg, "%lu", matchIndex);             /* matchIndex */
        zmsg_addstrf(msg, "%u", s->self->id);             /* source */

        zmsg_send(&msg, s->sock);
}

void AdvanceCommitedIndex(struct server *s);

void server_handle_AppendEntriesResponse(struct server *s, zmsg_t *msg)
{
        printf("---- server_handle_AppendEntriesResponse ----\n");

        char *term_str = zmsg_popstr(msg);
        uint64_t term = atol(term_str);
        free(term_str);

        char *success_str = zmsg_popstr(msg);
        int success = atoi(success_str);
        free(success_str);

        char *matchIndex_str = zmsg_popstr(msg);
        uint64_t matchIndex = atol(matchIndex_str);
        free(matchIndex_str);

        char *source_str = zmsg_popstr(msg);
        uint32_t source_id = atoi(source_str);
        free(source_str);

        if (s->currentTerm < term) {
                UpdateTerm(s, term);
                server_save(s);
                server_update_election_timer(s);
        } else if (s->currentTerm == term) {
                if (success) {
                        s->cluster[source_id].matchIndex = matchIndex;
                        s->cluster[source_id].nextIndex = matchIndex + 1;
                        if (s->state == Leader) {
                                AdvanceCommitedIndex(s);
                        }
                } else {
                        s->cluster[source_id].nextIndex = Max(1, s->cluster[source_id].nextIndex - 1);
                        if (s->state == Leader) {
                                send_AppendEntriesRequest(s, &s->cluster[source_id]);
                        }
                }
        } else /* s->currentTerm > term */ {
                printf("%s:%lu: currentTerm > term(%lu), ignore AppendEntriesResponse\n",
                       state_string(s->state), s->currentTerm, term);
        }
}

void server_reject_AppendEntriesRequest(struct server *s, uint32_t dst_id)
{
        int success = 0;
        uint64_t matchIndex = 0;
        send_AppendEntriesResponse(s, success, matchIndex, dst_id);
}

void server_handle_AppendEntriesRequest(struct server *s, zmsg_t *msg)
{
        printf("---- server_handle_AppendEntriesRequest ----\n");

        char *term_str = zmsg_popstr(msg);
        uint64_t term = atol(term_str);
        free(term_str);

        char *prevLogIndex_str = zmsg_popstr(msg);
        uint64_t prevLogIndex = atol(prevLogIndex_str);
        free(prevLogIndex_str);

        char *prevLogTerm_str = zmsg_popstr(msg);
        uint64_t prevLogTerm = atol(prevLogTerm_str);
        free(prevLogTerm_str);

        char *commitIndex_str = zmsg_popstr(msg);
        uint64_t commitIndex = atol(commitIndex_str);
        free(commitIndex_str);

        char *source_id_str = zmsg_popstr(msg);
        int source_id = atoi(source_id_str);
        free(source_id_str);

        if (s->currentTerm < term) {
                UpdateTerm(s, term);
                server_update_election_timer(s);
                server_save(s);
                server_dump(s);
                assert(s->currentTerm == term);
        }

        if (s->currentTerm > term) {
                server_reject_AppendEntriesRequest(s, source_id);
                printf("%s:%lu: reject AppendEntriesRequest from term %lu\n",
                       state_string(s->state), s->currentTerm, term);
        } else /* s->currentTerm == term */ {
                int log_ok = logOK(s->log, prevLogIndex, prevLogTerm);
                if (!log_ok) {
                        printf("%s:%lu: logOK false: reject AppendEntriesRequest\n",
                               state_string(s->state), s->currentTerm);
                        server_reject_AppendEntriesRequest(s, source_id);
                } else /* log_ok == 1 */ {
                        int nentry = zmsg_size(msg);
                        uint64_t nextIndex = prevLogIndex + 1;

                        if (s->state == Candidate) {
                                printf("%s:%lu: state transition: Candidate -> Follower\n",
                                       state_string(s->state), s->currentTerm);
                                s->state = Follower;
                        }

                        server_update_election_timer(s);

                        if (nentry == 0) {
                                int success = 1;
                                s->log->commitIndex = commitIndex;
                                send_AppendEntriesResponse(s, success, prevLogIndex, source_id);
                                printf("%s:%lu: replication done\n",
                                       state_string(s->state), s->currentTerm);
                        } else {
                                /* nentry > 0 */
                                char *entry_term_str = zmsg_popstr(msg);
                                uint64_t entry_term = atol(entry_term_str);
                                free(entry_term_str);
                                char *command = zmsg_popstr(msg);

                                /* no conflict: append entry */
                                if (prevLogIndex == s->log->len) {
                                        s->log->entry[nextIndex].term = entry_term;
                                        s->log->entry[nextIndex].commandlen = strlen(command) + 1;
                                        s->log->entry[nextIndex].command = command;
                                        s->log->len += 1;
                                        server_save(s);
                                        server_dump(s);
                                        printf("%s:%lu: no conflict: append entry\n",
                                               state_string(s->state), s->currentTerm);
                                        send_AppendEntriesResponse(s, 1, nextIndex, source_id);
                                } else /* prevLogIndex < s->log->len */ {
                                        if (nextIndex <= s->log->len) {
                                                /* already done with request */
                                                if (s->log->entry[nextIndex].term == entry_term) {
                                                        s->log->commitIndex = commitIndex;
                                                        send_AppendEntriesResponse(s, 1, nextIndex, source_id);
                                                        printf("%s:%lu: already done with request, commitIndex: %lu\n",
                                                               state_string(s->state), s->currentTerm, commitIndex);
                                                }

                                                /* conflict: remove 1 entry */
                                                else {
                                                        s->log->len -= 1;
                                                        server_save(s);
                                                        server_dump(s);
                                                        printf("%s:%lu: conflict: remove 1 entry\n",
                                                               state_string(s->state), s->currentTerm);
                                                }
                                        }

                                        else {
                                                /* nentry > 2 */
                                                send_AppendEntriesResponse(s, 1, prevLogIndex, source_id);
                                                printf("%s:%lu: nentry(%d) > 2: need prevLogIndex(%lu)+1 entry\n",
                                                       state_string(s->state), s->currentTerm, nentry, prevLogIndex);
                                        }
                                }
                        }
                }
        }
}

int compare(const void *p1, const void *p2)
{
        uint64_t a = *((uint64_t *)p1);
        uint64_t b = *((uint64_t *)p2);

        if (a < b) {
                return -1;
        } else if (a == b) {
                return 0;
        } else {
                return 1;
        }
}

void AdvanceCommitedIndex(struct server *s)
{
        assert(s->state == Leader);

        uint64_t matchIndex[s->nserver];
        int i;
        for (i = 0; i < s->nserver; i++) {
                matchIndex[i] = s->cluster[i].matchIndex;
        }
        qsort(&matchIndex, s->nserver, sizeof(matchIndex[0]), compare);

        int quorum = s->nserver / 2 + 1;
        /* 从最大的位置往回数集群的大多数节点个数,就是大多数节点都确认的最大的 matchIndex */
        int index = s->nserver - quorum;
        /* 只有在当前任期提交,才算有效。这个限制导致在选举的时候,拥有当前任期
         * 提交的日志,才能当选为主节点。由于当前任期不会小于已提交日志项的所有
         * 任期,所以未同步的节点,它的最后日志项的任期一定小于已同步的大多数节点,
         * 因此,只要在节点选举时,加入这种检查,就可以阻止这些未同步的节点当选
         * 为主节点。如果允许这些未同步的节点当选为主节点,那么就会存在和大多数
         * 节点的数据冲突,它要删除自己的日志,这时还不如不让它当选为主节点。
         */
        if (s->log->entry[matchIndex[index]].term == s->currentTerm) {
                s->log->commitIndex = matchIndex[index];
                printf("%s:%lu: update commitIndex to %lu\n",
                       state_string(s->state), s->currentTerm, s->log->commitIndex);
        }
}

/* send AppendEntries to all cluster nodes */
void send_AppendEntriesRequest_to_cluster(struct server *s)
{
        int i;
        for (i = 0; i < s->nserver; i++) {
                struct node *n = &s->cluster[i];
                if (n != s->self) {
                        send_AppendEntriesRequest(s, n);
                        printf("%s:%lu: send AppendEntriesRequest to %s\n",
                               state_string(s->state), s->currentTerm, n->identity);
                }
        }
}

int log_replication_timer(zloop_t *loop, int timer_id, void *arg)
{
        struct server *s = (struct server *)arg;
        assert(s);
        assert(s->state == Leader);

        AdvanceCommitedIndex(s);
        send_AppendEntriesRequest_to_cluster(s);

        return 0;
}

/* 模拟客户端请求的定时器 */
int ClientRequest_timer(zloop_t *loop, int timer_id, void *arg)
{
        printf("---- ClientRequest_timer ----\n");

        struct server *s = (struct server *)arg;
        assert(s && s->state == Leader);
        assert(timer_id == s->timer.client_request_id);
        assert(s->self->nextIndex == s->log->len + 1);

        char *command = malloc(20);
        assert(command);
        snprintf(command, 20, "set x %lu", s->log->len);

        uint64_t nextIndex = s->self->nextIndex;
        s->log->entry[nextIndex].term = s->currentTerm;
        s->log->entry[nextIndex].command = command;
        s->log->entry[nextIndex].commandlen = strlen(command) + 1;
        s->log->len += 1;
        s->self->matchIndex = s->log->len;
        s->self->nextIndex += 1;

        server_save(s);
        server_dump(s);

        return 0;
}

void BecomeLeader(struct server *s)
{
        int rc;
        int timer_id;

        assert(s->state == Candidate);
        assert(s->timer.election_id > 0);
        assert(s->timer.loop && s->timer.loop != invalid_pointer);

        /* remove election timer */
        timer_id = s->timer.election_id;
        rc = zloop_timer_end(s->timer.loop, timer_id);
        assert(rc == 0);
        s->timer.election_id = 0;

        size_t delay, always_on;

        /* register replication timer */
        delay = get_timeout(1000, 3000);
        always_on = 0;
        timer_id = zloop_timer(s->timer.loop, delay, always_on, log_replication_timer, s);
        assert(timer_id != 0);
        s->timer.replication_id = timer_id;

        /* register client request timer */
        delay = get_timeout(25000, 30000);
        always_on = 0;
        timer_id = zloop_timer(s->timer.loop, delay, always_on, ClientRequest_timer, s);
        assert(timer_id != 0);
        s->timer.client_request_id = timer_id;

        int i;
        for (i = 0; i < s->nserver; i++) {
                struct node *n = &s->cluster[i];
                if (n != s->self) {
                        n->nextIndex = s->log->len + 1;
                        n->matchIndex = 0;
                } else {
                        n->nextIndex = s->log->len + 1;
                        n->matchIndex = s->log->len;
                }
        }
        s->state = Leader;
}

void server_handle_RequestVoteResponse(struct server *s, zmsg_t *msg)
{
        char *term_str = zmsg_popstr(msg);
        char *grant_str = zmsg_popstr(msg);
        char *source_str = zmsg_popstr(msg);

        int term = atoi(term_str);
        int grant = atoi(grant_str);
        int source = atoi(source_str);

        free(source_str);
        free(grant_str);
        free(term_str);

        if (term > s->currentTerm) {
                UpdateTerm(s, term);
                server_save(s);
                server_dump(s);
                server_update_election_timer(s);
        } else if (term == s->currentTerm) {
                if (source < s->nserver && source != s->self->id) {
                        if (grant) {
                                printf("%s:%lu: grant vote from %s\n",
                                       state_string(s->state), s->currentTerm, s->cluster[source].identity);
                                s->grant_count += 1;
                                s->cluster[source].voteFor = s->self->id;
                                if (s->grant_count * 2 > s->nserver && s->state == Candidate) {
                                        printf("%s:%lu: >>>> grant vote from quorom, become Leader <<<<\n",
                                               state_string(s->state), s->currentTerm);
                                        BecomeLeader(s);
                                        send_AppendEntriesRequest_to_cluster(s);
                                }
                        }
                }
        } else {
                /* term < self->currentTerm, just ignore */
        }
}

int on_recv(zloop_t *loop, zsock_t *sock, void *arg)
{
        printf("---- on_recv ----\n");

        struct server *server = (struct server *)arg;
        assert(server);

        zmsg_t *msg = zmsg_recv(sock);
        assert(msg);
        zmsg_print(msg);

        char *identity = zmsg_popstr(msg);
        free(identity);         /* server know the identity of source node */
        char *type = zmsg_popstr(msg);

        if (streq(type, "RequestVoteRequest")) {
                server_handle_RequestVoteRequest(server, msg);
        } else if (streq(type, "RequestVoteResponse")) {
                server_handle_RequestVoteResponse(server, msg);
        } else if (streq(type, "AppendEntriesRequest")) {
                server_handle_AppendEntriesRequest(server, msg);
        } else if (streq(type, "AppendEntriesResponse")) {
                server_handle_AppendEntriesResponse(server, msg);
        } else {
                /* unknown message, ignore */
        }

        free(type);
        zmsg_destroy(&msg);
        return 0;
}

void server_dump(struct server *s)
{
        int i;

        printf("---- server_dump ----\n");
        printf("state: %s\n"
               "currentTerm: %lu\n"
               "voteFor: 0x%x\n"
               "filename: %s\n"
               "nserver: %d\n"
               "loglen: %lu\n",
               state_string(s->state),
               s->currentTerm, s->self->voteFor,
               s->filename, s->nserver, s->log->len);

        printf("---- node ----\n");
        assert(s->cluster);
        for (i = 0; i < s->nserver; i++) {
                struct node *n = &s->cluster[i];
                printf("node%d: identity: %s, voteFor: 0x%x, matchIndex: %lu, nextIndex: %lu\n",
                       i, n->identity, n->voteFor, n->matchIndex, n->nextIndex);
        }

        printf("---- log ----\n");
        assert(s->log && s->log->len < s->log->capacity);
        for (i = 1; i <= s->log->len; i++) {
                struct entry *e = &s->log->entry[i];
                printf("entry%d: term: %lu, commandlen: %u, command: %s\n",
                       i, e->term, e->commandlen, e->command);
        }
}

/* 将 currentTerm voteFor log 保存到文件 */
int server_save(struct server *s)
{
        printf("---- server_save ----\n");

        zchunk_t *chunk = zchunk_new(NULL, 4096);
        assert(chunk);

        zchunk_append(chunk, &s->currentTerm, sizeof(s->currentTerm));
        zchunk_append(chunk, &s->self->voteFor, sizeof(s->self->voteFor));
        zchunk_append(chunk, &s->log->len, sizeof(s->log->len));
        assert(s->log->len < s->log->capacity);
        uint64_t i;
        for (i = 1; i <= s->log->len; i++) {
                struct entry *e = &s->log->entry[i];
                zchunk_append(chunk, &e->term, sizeof(e->term));
                zchunk_append(chunk, &e->commandlen, sizeof(e->commandlen));
                zchunk_append(chunk, e->command, e->commandlen);
        }

        int rc;

        zfile_t *file = zfile_new("./", s->filename);
        assert(file);
        rc = zfile_output(file);
        assert(rc == 0);

        rc = zfile_write(file, chunk, 0);
        assert(rc == 0);

        zfile_destroy(&file);
        zchunk_destroy(&chunk);
        return 0;
}

int server_load(struct server *s)
{
        if (zsys_file_exists(s->filename)) {
                int rc;

                zfile_t *file = zfile_new("./", s->filename);
                assert(file);

                rc = zfile_input(file);
                if (!rc) {
                        uint64_t len;
                        zchunk_t *chunk = zfile_read(file, 4096, 0);
                        assert(chunk);

                        byte *data = zchunk_data(chunk);
                        assert(data);

                        len = 0;
                        memcpy(&s->currentTerm, data + len, sizeof(s->currentTerm));
                        len += sizeof(s->currentTerm);

                        memcpy(&s->self->voteFor, data + len, sizeof(s->self->voteFor));
                        len += sizeof(s->self->voteFor);

                        memcpy(&s->log->len, data + len, sizeof(s->log->len));
                        len += sizeof(s->log->len);
                        assert(s->log->len < s->log->capacity);

                        uint64_t i;
                        for (i = 1; i <= s->log->len; i++) {
                                struct entry *e = &s->log->entry[i];

                                memcpy(&e->term, data + len, sizeof(e->term));
                                len += sizeof(e->term);

                                uint32_t commandlen;
                                memcpy(&commandlen, data + len, sizeof(commandlen));
                                e->commandlen = commandlen;
                                len += sizeof(commandlen);

                                e->command = malloc(commandlen);
                                assert(e->command);
                                memcpy(e->command, data + len, commandlen);
                                len += commandlen;
                        }

                        zfile_close(file);
                        zchunk_destroy(&chunk);
                        zfile_destroy(&file);

                        assert(!chunk);
                        assert(!file);
                        return 0;
                } else {
                        zfile_destroy(&file);
                        assert(!file);
                        return -1;
                }
        } else {
                return -1;
        }
}

struct server *create_server(int argc, char *argv[])
{
        int rc;

        printf("---- create_server ----\n");

        struct server *s = malloc(sizeof(struct server));
        assert(s);

        int nserver = argc - 2;
        int self_id = atoi(argv[argc - 1]);
        assert(0 <= self_id && self_id < nserver);

        s->cluster = calloc(nserver, sizeof(struct node));
        assert(s->cluster);
        s->nserver = nserver;

        s->log = create_log();
        assert(s->log);

        s->state = Follower;
        s->grant_count = 0;
        s->timer.loop = invalid_pointer;
        s->timer.election_id = 0;
        s->timer.replication_id = 0;
        s->timer.client_request_id = 0;

        s->self = &s->cluster[self_id];
        s->self->endpoint = argv[1 + self_id];
        snprintf(s->self->identity, sizeof(s->self->identity), "node%d", self_id);
        s->self->id = self_id;

        /* 从文件中读取 currentTerm, voteFor, log,如果没有或者失败,则进行初始化 */
        snprintf(s->filename, sizeof(s->filename), "node%d.save.1", self_id);
        rc = server_load(s);
        if (rc != 0) {
                s->self->voteFor = Nil;
                s->currentTerm = 0;
                s->log->len = 0;
        }
        server_dump(s);
        s->self->matchIndex = s->log->len;
        s->self->nextIndex = s->log->len + 1;

        s->sock = zsock_new(ZMQ_ROUTER);
        assert(s->sock);
        zsock_set_identity(s->sock, s->self->identity);
        /*
         * zmq 的 ROUTER socket 在接收到 identity 是相同的客户端时,如果没有设置
         * handover,会拒绝第二个客户端的连接;如果设置了 handover 为 1,在接收
         * 第二个客户端连接时,将会关闭第一个客户端的连接,将 identity 绑定到第
         * 二个客户端。由于每个节点指定了 identity,在崩溃重启后,设置的是同一个
         * identity,所以需要让对端的 ROUTER socket 接收第二个客户端的连接。
         */
        zsock_set_router_handover(s->sock, 1);
        rc = zsock_bind(s->sock, "%s", s->self->endpoint);
        /* 这里有时候在程序中断重启后,会绑定失败,原因待查。 */
        assert(rc != -1);

        int i;
        for (i = 0; i < nserver; i++) {
                struct node *e = &s->cluster[i];
                if (e != s->self) {
                        e->id = i;
                        e->nextIndex = 1;
                        e->matchIndex = 0;
                        e->endpoint = argv[1 + i];
                        snprintf(e->identity, sizeof(e->identity), "node%d", i);
                        rc = zsock_connect(s->sock, "%s", e->endpoint);
                        assert(rc == 0);
                }
        }

        return s;
}

void destroy_server(struct server **p)
{
        if (p) {
                struct server *s = *p;
                zsock_destroy(&s->sock);
                destroy_log(&s->log);
                free(s->cluster);
                free(s);
                s = NULL;
        }
}

void server_update_election_timer(struct server *s);

int elect_timeout(zloop_t *loop, int timer_id, void *arg)
{
        printf("---- elect_timeout ----\n");

        struct server *server = (struct server *)arg;
        assert(server);

        server_update_election_timer(server);
        start_election(server);

        return 0;
}

void server_create_election_timer(struct server *s)
{
        size_t delay = get_timeout(3000, 5000);
        size_t only_once = 1;
        int timer_id = zloop_timer(s->timer.loop, delay, only_once, elect_timeout, s);
        assert(timer_id != 0);
        s->timer.election_id = timer_id;
}

void server_update_election_timer(struct server *s)
{
        assert(s);
        assert(s->timer.loop != invalid_pointer);

        if (s->timer.election_id > 0) {
                int rc = zloop_timer_end(s->timer.loop, s->timer.election_id);
                assert(rc == 0);
        }
        server_create_election_timer(s);
}

void server_start_loop(struct server *s)
{
        int rc;

        zloop_t *loop = zloop_new();
        assert(loop);
        s->timer.loop = loop;

        rc = zloop_reader(loop, s->sock, on_recv, s);
        assert(rc == 0);

        server_update_election_timer(s);

        rc = zloop_start(loop);
        /* interrupted: 0, cancel by handler: -1 */
        assert(rc == 0 || rc == -1);

        zloop_destroy(&s->timer.loop);
}

int main(int argc, char *argv[])
{
        if (argc < 3) {
                printf("usage: %s peer1 peer2 peer3 ... self_id\n", argv[0]);
                exit(EXIT_FAILURE);
                return -1;
        } else {
                struct server *server = create_server(argc, argv);
                assert(server);

                server_start_loop(server);

                destroy_server(&server);
                return 0;
        }
}