paxos: 一致性算法的理解与实现
paxos 解决什么问题?
paxos 算法要解决的问题是:“在网络有一些进程,它们各自都可以发起一个值。一致性算 法保证在这些值当中选择一个,最终所有进程的值都是这个被选中的值”。它同时也是设计 为“容错”的,在这个过程中,即使有进程出现崩溃、重启,也能够选出一个值来。
举个例子,网络中有 3 个进程,分别是 A,B,C。它们发起的值分别是 1,2,3。也就是:
A -> 1
B -> 2
C -> 3
这个算法保证在值 1,2,3 当中选择一个。假如选择了值 2,那么保证最终这些进程都设置 它们的值是 2。“最终”的意思是,在进程可能经历了异常之后,它最终和其他的进程通信之 后,能够设置它的值是 2。
假如在选择的过程中,进程 C 崩溃了,它不能接收,也不能发送消息。paxos 算法也能在 进程 C 崩溃之后,选出一个值来。如果进程 C 最终恢复了,它也能设置它的值是 2。
为了解决这个问题,它建立了一个模型:
- 网络中的进程,它的速度可以是任意的,可能出错崩溃,可能重启。
- 这些进程只能通过消息进行通信。消息传递的时间是任意长短的,消息可能丢失、重复, 但是不会损坏。
这些问题的难度在哪里?
现在来分析一下,这个问题的难度体现在哪里。依然是上面的一个例子,考虑“容错”和“最 终的一致性”。假如每个进程都决定,选择它接收到的最小的一个值。
A 接收到 [1, 2, 3],它选择了 1;B 也接收到 [1, 2, 3],它也选择了 1;C 也接收到 [1, 2, 3],它也选择 1。
这时,看起来可以解决这个问题了。但是,我们考虑 A 它只接收到 [1, 2] 的情况,也就 是它没有接收到 C 的消息。A 现在要做什么事情呢?它是继续等待还是要选出一个值来? 显然不能选出一个值来,因为 C 的消息可能会对结果造成影响。那么,它就是要继续等待 了,那么需要等待多久呢?如果是一直等待,直到收到 C 的消息时,才决定结果,这样可 以保证一致性,但是它就没有了容错的能力,因为一个进程永远都崩溃的话,整个算法不能 选出一个值来。
不能永远等待,那就只能等待一段时间了。过了一段时间之后,如果没有收到 C 的消息, 那么还是要面临一个问题:在没有收到 C 的消息时,要不要选出一个值来?考虑到来自 C 的消息可能会对结果造成影响,那么 A 还可以做什么呢?
paxos 是如何解决这个问题的?
如果需要容错,那么就意味着就算有进程崩溃,也能够选择一个值。而在崩溃的进程恢复之 后,能够知道并设置已被选中的这个值。换句话说,需要做到选择一个值的时候,不需要所 有的进程同意。
那么,至少需要多少个进程同意呢?这是一个非常非常关键的决定:只要有多数的进程同意, 就能够选出一个值。一个进程最多只能支持一个值,如果一个进程能够支持多个值,那么这 种方法就会失效。比如,现在有 5 个进程,2(A, B) 个支持了值 88,3(A, C, D) 个支持 了值 99,但是这个时候还不能选出一个值,因为还没有表态的进程,可能会影响最终的结 果。
paxos 算法定义了 3 个角色:proposer、acceptor、learner。proposer 可以发起一个请 求,请求里面带有它希望被选中的值,我们把这个请求叫做“提议”。acceptor 可以决定是 否接受这个提议,learner 可以学习到已经被选中的提议。
- 提议只有被发起,才可能被接受
- 如果没有发起提议,就不会被接受
一个进程同时可以扮演这三个角色。
由于进程可能出错崩溃,导致它以后可能也不会发出消息了。因此,其它的进程如果收到了 它的提议,那么就需要接受它。
- acceptor 必须接受第一个收到的提议
这条约束会导致一个问题:假如有 4 个进程,2 个接受了提议 P1,2 个接受了提议 P2, 这时它们不能选出一个值来。因此,acceptor 需要能够接受多个提议,由于最终只能有一 个值,因此 acceptor 接受的提议的值应该相同。既然 acceptor 可以接受多个提议,那么 需要区分这些提议。我们使用提议号来区分提议,因此一个提议的组成是:(提议号,值)。
现在的任务是,如何保证 acceptor 接受的不同提议号的提议,它们的值都是相同的。我们 可以规定提议号是按顺序递增的,acceptor 能够接受一个提议,必定是因为一个 proposer 发起了一个提议。这样问题转移到 proposer 这里来了,proposer 需要保证不同的提议号, 它们的值都是相同的。我们需要一个递推关系,只要提议号是 m 的提议的值是 v,那么所 有 n > m 的提议,它们的值都是 v。
proposer 在发起一个提议 n 之前,它需要知道小于 n 的最大的提议号 m,然后提议 n 的值 设置为 m 的值。这样我们就得到了一个递推关系,然后我们还需要一个起始点。起始点其 实就是,如果提议 n 是提议号最小的,那么它就是一个起始点。
这样就可以保证从 m 到 n 的提议都得到相同的值,但我们还有一个问题没有解决。如果将 来其它的 proposer 发起了一个提议号是最小的提议怎么办?我们可以让 acceptor 不接受 这样的一个提议,同时让这个 proposer 增大它的提议号。proposer 可以发送一个请求让 acceptor 不再接受小于它要发送的提议号 n 的提议,我们可以将这个请求叫做“prepare request”。如果 acceptor 发送回应,做出保证之后,那么 proposer 可以最终地发起提议 了,我们将这个提议叫做“accept request”。
最终,paxos 算法归结为一个两阶段的步骤:
1a. proposer 发起一个 prepare request,包含它要发起的提议号 n。
1b. acceptor 接受到 prepare request 之后,如果已接受的最大提议号 m < n,那么
它就接受这个提议,同时把它接受的最大提议号和它的值(如果有的话)发送回
proposer,这个响应叫做 promise。
2a. proposer 收到大多数 acceptor 发回的 promise 响应之后,它发起一个提议(n,
v)。其中 v 的值是收到的所有响应中小于 n 的最高提议号的值,如果这个由
acceptor 发回的提议号没有值,那么 proposer 可以自己选一个。
2b. acceptor 在收到 accept request 之后,如果收到的提议号 n 不小于最高的
promise 提议号,那么 acceptor 就接受它。
那么,如何实现 paxos 算法呢?
这个实现使用了 ZeroMQ 进行收发消息,所以需要安装它:
$ wget -c https://github.com/zeromq/zeromq4-1/releases/download/v4.1.5/zeromq-4.1.5.tar.gz
$ tar xf zeromq-4.1.5.tar.gz
$ cd zeromq-4.1.5
$ ./autogen.sh && ./configure && make -j4
$ make check
$ sudo make install
$ sudo ldconfig
$ sudo pip3 install --upgrade pyzmq
实现的代码比较原始,没有整理过:
# -*- coding: utf-8 -*-
import os
import sys
import zmq
import random
myself = sys.argv[1]
peers = sys.argv[1:]
majority = len(peers) // 2 + 1
print("myself: {0}, peers: {1}, majority: {2}".format(myself, peers, majority))
def asbytes(obj):
s = str(obj)
if str is not bytes:
# Python 3
s = s.encode('ascii')
return s
def read_proposal(fd):
line = fd.readline()
items = line.split()
number = None
value = None
if len(items) == 2:
number = int(items[0])
value = int(items[1])
elif len(items) == 1:
number = int(items[0])
value = None
else:
number = None
value = None
return (number, value)
def init(promised = "promised.txt", accepted = "accepted.txt"):
fd1 = open(promised, "a+")
fd1.seek(0)
number1, value1 = read_proposal(fd1)
fd2 = open(accepted, "a+")
fd2.seek(0)
number2, value2 = read_proposal(fd2)
return (fd1, number1, fd2, number2, value2)
def remember_promised(fd, number):
fd.seek(0)
fd.truncate(0)
fd.write("{0}".format(number))
def remember_accepted(fd, number, value):
fd.seek(0)
fd.truncate(0)
if value:
fd.write("{0} {1}".format(number, value))
else:
fd.write("{0}".format(number))
context = zmq.Context()
commander = context.socket(zmq.PULL)
commander.bind("ipc://commander-{0}.ipc".format(myself))
leader = context.socket(zmq.ROUTER)
identity = asbytes("leader-{0}".format(myself))
leader.setsockopt(zmq.IDENTITY, identity)
print("=== leader set identity to {0}".format(identity))
leader.bind("ipc://leader-{0}.ipc".format(myself))
print("=== leader listen on ipc://leader-{0}.ipc".format(myself))
acceptor = context.socket(zmq.ROUTER)
identity = asbytes("acceptor-{0}".format(myself))
acceptor.setsockopt(zmq.IDENTITY, identity)
print("=== acceptor set identity to {0}".format(identity))
poller = zmq.Poller()
poller.register(leader, zmq.POLLIN)
poller.register(acceptor, zmq.POLLIN)
poller.register(commander, zmq.POLLIN)
for peer in peers:
print(">>> connecting ipc://{0}-leader.ipc".format(peer))
acceptor.connect("ipc://leader-{0}.ipc".format(peer))
proposal_number = 0
saved_values = set()
self_value = random.randrange(1, 10000)
promised_proposal_number = None
promised_proposal_value = None
accepted_proposal_number = None
accepted_proposal_value = None
values = {}
choosed_value = None
leader_identity = None
self_leader = False
def make_promise(number):
if accepted_proposal_number is None:
return True
if promised_proposal_number is None:
return True
return number > promised_proposal_number
promised_fd, promised_proposal_number, accepted_fd, accepted_proposal_number, accepted_proposal_value = init()
print("=== init: promised_fd({0}), promised_number({1}), accepted_fd({2}), accepted_number({3}), accepted_value({4})".format(promised_fd, promised_proposal_number, accepted_fd, accepted_proposal_number, accepted_proposal_value))
while True:
socks = dict(poller.poll(timeout = 3000))
if commander in socks:
command = commander.recv()
if command == b"LEADER":
self_leader = True
for peer in peers:
identity = asbytes("acceptor-{0}".format(peer))
leader.send_multipart([identity, b"LEADER"])
else:
print("===C leader: command({0}) not supported".format(command))
elif leader in socks:
msg = leader.recv_multipart()
ident = msg[0]
command = msg[1]
print("###L recv: ident({0}), command({1})".format(ident, command))
if command == b"PROPOSAL":
value_s = msg[2]
saved_values.add(value_s)
print("###L recv: PROPOSAL({0})".format(msg))
proposal_number += 1
number_s = asbytes(str(proposal_number))
values[number_s] = []
for peer in peers:
peer_ident = asbytes("acceptor-{0}".format(peer))
print("###L send: ident({0}), PREPARE({1})".format(peer_ident, number_s))
leader.send_multipart([peer_ident, b"PREPARE", number_s, value_s])
elif command == b"PROMISE":
proposal_n = msg[2]
accepted_n = msg[3]
accepted_v = msg[4]
print("###L recv: ident({0}), PROMISE({1} -> {2} {3})".format(ident, proposal_n, accepted_n, accepted_v))
if choosed_value is None:
choosed_value = saved_values.pop()
saved_values.add(choosed_value)
# accepts: [(n1, v1), (n2, v2)]
def choose_value(accepts):
n1, v1 = None, None
for n_s, v_s in accepts:
if n_s != b'' and v_s != b'':
n2 = int(n_s)
v2 = int(v_s)
if (n1 is not None) and (n2 > n1):
n1 = n2
v1 = v2
return v1
values[proposal_n].append((accepted_n, accepted_v))
supports = len(values[proposal_n])
print("###L supports: {0}, majority: {1}".format(supports, majority))
if supports >= majority:
accepted_value = choose_value(values[proposal_n])
if accepted_value is None:
accepted_value = choosed_value
for peer in peers:
peer_ident = asbytes("acceptor-{0}".format(peer))
leader.send_multipart([peer_ident, b"ACCEPT", proposal_n, accepted_value])
elif command == b"LEARN":
proposal_n = msg[2]
accepted_value = msg[3]
n = int(proposal_n)
v = int(accepted_value)
accepted_proposal_number = n
accepted_proposal_value = v
remember_accepted(accepted_fd, n, v)
elif acceptor in socks:
msg = acceptor.recv_multipart()
ident = msg[0]
command = msg[1]
print("$$$A recv: ident({0}), command({1})".format(ident, command))
if command == b"LEADER":
value_s = asbytes(str(self_value))
leader_identity = ident
acceptor.send_multipart([ident, b"PROPOSAL", value_s])
print("$$$A send: ident({0}), PROPOSAL({1})".format(ident, self_value))
elif command == b"PREPARE":
number_s = msg[2]
print("$$$A recv: PREPARE({0})".format(number_s))
success = make_promise(int(number_s))
if success:
promised_proposal_number = int(number_s)
remember_promised(promised_fd, promised_proposal_number)
n_s = b''
if accepted_proposal_number and accepted_proposal_number < promised_proposal_number:
n_s = asbytes(str(accepted_proposal_number))
v_s = b''
if accepted_proposal_value:
v_s = asbytes(str(accepted_proposal_value))
print("$$$A send: ident({0}), PROMISE({1} -> {2} {3})".format(ident, promised_proposal_number, n_s, v_s))
acceptor.send_multipart([ident, b"PROMISE", number_s, n_s, v_s])
elif command == b"ACCEPT":
proposal_n = msg[2]
accepted_value = msg[3]
n = int(proposal_n)
v = int(accepted_value)
if n >= promised_proposal_number:
accepted_proposal_number = n
accepted_proposal_value = v
remember_accepted(accepted_fd, n, v)
acceptor.send_multipart([ident, b"LEARN", proposal_n, accepted_value])
print("$$$A ACCEPT({0} {1})".format(proposal_n, accepted_value))
elif command == b"DISCOVER":
if leader_identity is None:
leader_identity = ident
else:
# timeout
# leader
#print("###L self_leader: {0}".format(self_leader))
if self_leader:
for peer in peers:
peer_ident = asbytes("acceptor-{0}".format(peer))
#print("###L send: ident({0})".format(peer_ident))
leader.send_multipart([peer_ident, b"DISCOVER"])
# proposer
#print("$$$P: accepted: {0}, leader_identity: {1}".format(accepted_proposal_value, leader_identity))
if accepted_proposal_value is None and leader_identity:
value_s = asbytes(str(self_value))
#print("$$$P: send: ident({0}), PROPOSAL({1})".format(leader_identity, value_s))
acceptor.send_multipart([leader_identity, b"PROPOSAL", value_s])
# -*- coding: utf-8 -*-
import sys
import zmq
def usage():
print("usage: {0} <leader>")
sys.exit(1)
if len(sys.argv) != 2:
usage()
where = sys.argv[1]
context = zmq.Context()
commander = context.socket(zmq.PUSH)
commander.connect("ipc://commander-{0}.ipc".format(where))
commander.send(b"LEADER")
运行这个程序:
$ python3 paxos.py node0 node1 node2
$ python3 paxos.py node1 node0 node2
$ python3 command.py node0
$ python3 paxos.py node2 node0 node1
这个实现有一些限制:没有实现多轮的选择,不能支持任意的节点加入和退出。首先启动 node0 和 node1 两个节点,然后手动选择 node0 作为 leader 节点。
注意到现在 node2 节点现在还没有启动,但是现在还是可以选择一个值出来。等到 node2 节点启动后,它却可以选出一个和 node0 和 node1 节点相同的值。