paxos 解决什么问题?

paxos 算法要解决的问题是:“在网络有一些进程,它们各自都可以发起一个值。一致性算 法保证在这些值当中选择一个,最终所有进程的值都是这个被选中的值”。它同时也是设计 为“容错”的,在这个过程中,即使有进程出现崩溃、重启,也能够选出一个值来。

举个例子,网络中有 3 个进程,分别是 A,B,C。它们发起的值分别是 1,2,3。也就是:

A -> 1
B -> 2
C -> 3

这个算法保证在值 1,2,3 当中选择一个。假如选择了值 2,那么保证最终这些进程都设置 它们的值是 2。“最终”的意思是,在进程可能经历了异常之后,它最终和其他的进程通信之 后,能够设置它的值是 2。

假如在选择的过程中,进程 C 崩溃了,它不能接收,也不能发送消息。paxos 算法也能在 进程 C 崩溃之后,选出一个值来。如果进程 C 最终恢复了,它也能设置它的值是 2。

为了解决这个问题,它建立了一个模型:

  • 网络中的进程,它的速度可以是任意的,可能出错崩溃,可能重启。
  • 这些进程只能通过消息进行通信。消息传递的时间是任意长短的,消息可能丢失、重复, 但是不会损坏。

这些问题的难度在哪里?

现在来分析一下,这个问题的难度体现在哪里。依然是上面的一个例子,考虑“容错”和“最 终的一致性”。假如每个进程都决定,选择它接收到的最小的一个值。

A 接收到 [1, 2, 3],它选择了 1;B 也接收到 [1, 2, 3],它也选择了 1;C 也接收到 [1, 2, 3],它也选择 1。

这时,看起来可以解决这个问题了。但是,我们考虑 A 它只接收到 [1, 2] 的情况,也就 是它没有接收到 C 的消息。A 现在要做什么事情呢?它是继续等待还是要选出一个值来? 显然不能选出一个值来,因为 C 的消息可能会对结果造成影响。那么,它就是要继续等待 了,那么需要等待多久呢?如果是一直等待,直到收到 C 的消息时,才决定结果,这样可 以保证一致性,但是它就没有了容错的能力,因为一个进程永远都崩溃的话,整个算法不能 选出一个值来。

不能永远等待,那就只能等待一段时间了。过了一段时间之后,如果没有收到 C 的消息, 那么还是要面临一个问题:在没有收到 C 的消息时,要不要选出一个值来?考虑到来自 C 的消息可能会对结果造成影响,那么 A 还可以做什么呢?

paxos 是如何解决这个问题的?

如果需要容错,那么就意味着就算有进程崩溃,也能够选择一个值。而在崩溃的进程恢复之 后,能够知道并设置已被选中的这个值。换句话说,需要做到选择一个值的时候,不需要所 有的进程同意。

那么,至少需要多少个进程同意呢?这是一个非常非常关键的决定:只要有多数的进程同意, 就能够选出一个值。一个进程最多只能支持一个值,如果一个进程能够支持多个值,那么这 种方法就会失效。比如,现在有 5 个进程,2(A, B) 个支持了值 88,3(A, C, D) 个支持 了值 99,但是这个时候还不能选出一个值,因为还没有表态的进程,可能会影响最终的结 果。

paxos 算法定义了 3 个角色:proposer、acceptor、learner。proposer 可以发起一个请 求,请求里面带有它希望被选中的值,我们把这个请求叫做“提议”。acceptor 可以决定是 否接受这个提议,learner 可以学习到已经被选中的提议。

  • 提议只有被发起,才可能被接受
  • 如果没有发起提议,就不会被接受

一个进程同时可以扮演这三个角色。

由于进程可能出错崩溃,导致它以后可能也不会发出消息了。因此,其它的进程如果收到了 它的提议,那么就需要接受它。

  • acceptor 必须接受第一个收到的提议

这条约束会导致一个问题:假如有 4 个进程,2 个接受了提议 P1,2 个接受了提议 P2, 这时它们不能选出一个值来。因此,acceptor 需要能够接受多个提议,由于最终只能有一 个值,因此 acceptor 接受的提议的值应该相同。既然 acceptor 可以接受多个提议,那么 需要区分这些提议。我们使用提议号来区分提议,因此一个提议的组成是:(提议号,值)。

现在的任务是,如何保证 acceptor 接受的不同提议号的提议,它们的值都是相同的。我们 可以规定提议号是按顺序递增的,acceptor 能够接受一个提议,必定是因为一个 proposer 发起了一个提议。这样问题转移到 proposer 这里来了,proposer 需要保证不同的提议号, 它们的值都是相同的。我们需要一个递推关系,只要提议号是 m 的提议的值是 v,那么所 有 n > m 的提议,它们的值都是 v。

proposer 在发起一个提议 n 之前,它需要知道小于 n 的最大的提议号 m,然后提议 n 的值 设置为 m 的值。这样我们就得到了一个递推关系,然后我们还需要一个起始点。起始点其 实就是,如果提议 n 是提议号最小的,那么它就是一个起始点。

这样就可以保证从 m 到 n 的提议都得到相同的值,但我们还有一个问题没有解决。如果将 来其它的 proposer 发起了一个提议号是最小的提议怎么办?我们可以让 acceptor 不接受 这样的一个提议,同时让这个 proposer 增大它的提议号。proposer 可以发送一个请求让 acceptor 不再接受小于它要发送的提议号 n 的提议,我们可以将这个请求叫做“prepare request”。如果 acceptor 发送回应,做出保证之后,那么 proposer 可以最终地发起提议 了,我们将这个提议叫做“accept request”。

最终,paxos 算法归结为一个两阶段的步骤:

1a. proposer 发起一个 prepare request,包含它要发起的提议号 n。
1b. acceptor 接受到 prepare request 之后,如果已接受的最大提议号 m < n,那么
    它就接受这个提议,同时把它接受的最大提议号和它的值(如果有的话)发送回
    proposer,这个响应叫做 promise。
2a. proposer 收到大多数 acceptor 发回的 promise 响应之后,它发起一个提议(n,
    v)。其中 v 的值是收到的所有响应中小于 n 的最高提议号的值,如果这个由
    acceptor 发回的提议号没有值,那么 proposer 可以自己选一个。
2b. acceptor 在收到 accept request 之后,如果收到的提议号 n 不小于最高的
    promise 提议号,那么 acceptor 就接受它。

那么,如何实现 paxos 算法呢?

这个实现使用了 ZeroMQ 进行收发消息,所以需要安装它:

$ wget -c https://github.com/zeromq/zeromq4-1/releases/download/v4.1.5/zeromq-4.1.5.tar.gz
$ tar xf zeromq-4.1.5.tar.gz
$ cd zeromq-4.1.5
$ ./autogen.sh && ./configure && make -j4
$ make check
$ sudo make install
$ sudo ldconfig
$ sudo pip3 install --upgrade pyzmq

实现的代码比较原始,没有整理过:

# -*- coding: utf-8 -*-

import os
import sys
import zmq
import random

myself = sys.argv[1]
peers = sys.argv[1:]
majority = len(peers) // 2 + 1
print("myself: {0}, peers: {1}, majority: {2}".format(myself, peers, majority))

def asbytes(obj):
    s = str(obj)
    if str is not bytes:
        # Python 3
        s = s.encode('ascii')
    return s

def read_proposal(fd):
    line = fd.readline()
    items = line.split()
    number = None
    value = None
    if len(items) == 2:
        number = int(items[0])
        value = int(items[1])
    elif len(items) == 1:
        number = int(items[0])
        value = None
    else:
        number = None
        value = None
    return (number, value)

def init(promised = "promised.txt", accepted = "accepted.txt"):
    fd1 = open(promised, "a+")
    fd1.seek(0)
    number1, value1 = read_proposal(fd1)

    fd2 = open(accepted, "a+")
    fd2.seek(0)
    number2, value2 = read_proposal(fd2)

    return (fd1, number1, fd2, number2, value2)

def remember_promised(fd, number):
    fd.seek(0)
    fd.truncate(0)
    fd.write("{0}".format(number))

def remember_accepted(fd, number, value):
    fd.seek(0)
    fd.truncate(0)
    if value:
        fd.write("{0} {1}".format(number, value))
    else:
        fd.write("{0}".format(number))

context = zmq.Context()

commander = context.socket(zmq.PULL)
commander.bind("ipc://commander-{0}.ipc".format(myself))

leader = context.socket(zmq.ROUTER)
identity = asbytes("leader-{0}".format(myself))
leader.setsockopt(zmq.IDENTITY, identity)
print("=== leader set identity to {0}".format(identity))

leader.bind("ipc://leader-{0}.ipc".format(myself))
print("=== leader listen on ipc://leader-{0}.ipc".format(myself))

acceptor = context.socket(zmq.ROUTER)
identity = asbytes("acceptor-{0}".format(myself))
acceptor.setsockopt(zmq.IDENTITY, identity)
print("=== acceptor set identity to {0}".format(identity))

poller = zmq.Poller()
poller.register(leader, zmq.POLLIN)
poller.register(acceptor, zmq.POLLIN)
poller.register(commander, zmq.POLLIN)

for peer in peers:
    print(">>> connecting ipc://{0}-leader.ipc".format(peer))
    acceptor.connect("ipc://leader-{0}.ipc".format(peer))

proposal_number = 0
saved_values = set()
self_value = random.randrange(1, 10000)

promised_proposal_number = None
promised_proposal_value = None

accepted_proposal_number = None
accepted_proposal_value = None

values = {}
choosed_value = None
leader_identity = None

self_leader = False

def make_promise(number):
    if accepted_proposal_number is None:
        return True
    if promised_proposal_number is None:
        return True
    return number > promised_proposal_number

promised_fd, promised_proposal_number, accepted_fd, accepted_proposal_number, accepted_proposal_value = init()
print("=== init: promised_fd({0}), promised_number({1}), accepted_fd({2}), accepted_number({3}), accepted_value({4})".format(promised_fd, promised_proposal_number, accepted_fd, accepted_proposal_number, accepted_proposal_value))

while True:
    socks = dict(poller.poll(timeout = 3000))
    if commander in socks:
        command = commander.recv()
        if command == b"LEADER":
            self_leader = True
            for peer in peers:
                identity = asbytes("acceptor-{0}".format(peer))
                leader.send_multipart([identity, b"LEADER"])
        else:
            print("===C leader: command({0}) not supported".format(command))

    elif leader in socks:
        msg = leader.recv_multipart()
        ident = msg[0]
        command = msg[1]
        print("###L recv: ident({0}), command({1})".format(ident, command))

        if command == b"PROPOSAL":
            value_s = msg[2]
            saved_values.add(value_s)
            print("###L recv: PROPOSAL({0})".format(msg))

            proposal_number += 1
            number_s = asbytes(str(proposal_number))
            values[number_s] = []

            for peer in peers:
                peer_ident = asbytes("acceptor-{0}".format(peer))
                print("###L send: ident({0}), PREPARE({1})".format(peer_ident, number_s))
                leader.send_multipart([peer_ident, b"PREPARE", number_s, value_s])

        elif command == b"PROMISE":
            proposal_n = msg[2]
            accepted_n = msg[3]
            accepted_v = msg[4]
            print("###L recv: ident({0}), PROMISE({1} -> {2} {3})".format(ident, proposal_n, accepted_n, accepted_v))

            if choosed_value is None:
                choosed_value = saved_values.pop()
                saved_values.add(choosed_value)

            # accepts: [(n1, v1), (n2, v2)]
            def choose_value(accepts):
                n1, v1 = None, None
                for n_s, v_s in accepts:
                    if n_s != b'' and v_s != b'':
                        n2 = int(n_s)
                        v2 = int(v_s)
                        if (n1 is not None) and (n2 > n1):
                            n1 = n2
                            v1 = v2
                return v1

            values[proposal_n].append((accepted_n, accepted_v))
            supports = len(values[proposal_n])
            print("###L supports: {0}, majority: {1}".format(supports, majority))
            if supports >= majority:
                accepted_value = choose_value(values[proposal_n])
                if accepted_value is None:
                    accepted_value = choosed_value
                for peer in peers:
                    peer_ident = asbytes("acceptor-{0}".format(peer))
                    leader.send_multipart([peer_ident, b"ACCEPT", proposal_n, accepted_value])

        elif command == b"LEARN":
            proposal_n = msg[2]
            accepted_value = msg[3]

            n = int(proposal_n)
            v = int(accepted_value)

            accepted_proposal_number = n
            accepted_proposal_value = v

            remember_accepted(accepted_fd, n, v)

    elif acceptor in socks:
        msg = acceptor.recv_multipart()
        ident = msg[0]
        command = msg[1]
        print("$$$A recv: ident({0}), command({1})".format(ident, command))

        if command == b"LEADER":
            value_s = asbytes(str(self_value))
            leader_identity = ident
            acceptor.send_multipart([ident, b"PROPOSAL", value_s])
            print("$$$A send: ident({0}), PROPOSAL({1})".format(ident, self_value))
        elif command == b"PREPARE":
            number_s = msg[2]
            print("$$$A recv: PREPARE({0})".format(number_s))

            success = make_promise(int(number_s))
            if success:
                promised_proposal_number = int(number_s)
                remember_promised(promised_fd, promised_proposal_number)

                n_s = b''
                if accepted_proposal_number and accepted_proposal_number < promised_proposal_number:
                    n_s = asbytes(str(accepted_proposal_number))

                v_s = b''
                if accepted_proposal_value:
                    v_s = asbytes(str(accepted_proposal_value))

                print("$$$A send: ident({0}), PROMISE({1} -> {2} {3})".format(ident, promised_proposal_number, n_s, v_s))
                acceptor.send_multipart([ident, b"PROMISE", number_s, n_s, v_s])
        elif command == b"ACCEPT":
            proposal_n = msg[2]
            accepted_value = msg[3]

            n = int(proposal_n)
            v = int(accepted_value)
            if n >= promised_proposal_number:
                accepted_proposal_number = n
                accepted_proposal_value = v
                remember_accepted(accepted_fd, n, v)
                acceptor.send_multipart([ident, b"LEARN", proposal_n, accepted_value])
                print("$$$A ACCEPT({0} {1})".format(proposal_n, accepted_value))
        elif command == b"DISCOVER":
            if leader_identity is None:
                leader_identity = ident

    else:
        # timeout

        # leader
        #print("###L self_leader: {0}".format(self_leader))
        if self_leader:
            for peer in peers:
                peer_ident = asbytes("acceptor-{0}".format(peer))
                #print("###L send: ident({0})".format(peer_ident))
                leader.send_multipart([peer_ident, b"DISCOVER"])

        # proposer
        #print("$$$P: accepted: {0}, leader_identity: {1}".format(accepted_proposal_value, leader_identity))
        if accepted_proposal_value is None and leader_identity:
            value_s = asbytes(str(self_value))
            #print("$$$P: send: ident({0}), PROPOSAL({1})".format(leader_identity, value_s))
            acceptor.send_multipart([leader_identity, b"PROPOSAL", value_s])
# -*- coding: utf-8 -*-

import sys
import zmq

def usage():
    print("usage: {0} <leader>")
    sys.exit(1)

if len(sys.argv) != 2:
    usage()

where = sys.argv[1]

context = zmq.Context()
commander = context.socket(zmq.PUSH)
commander.connect("ipc://commander-{0}.ipc".format(where))

commander.send(b"LEADER")

运行这个程序:

$ python3 paxos.py node0 node1 node2
$ python3 paxos.py node1 node0 node2
$ python3 command.py node0
$ python3 paxos.py node2 node0 node1

这个实现有一些限制:没有实现多轮的选择,不能支持任意的节点加入和退出。首先启动 node0 和 node1 两个节点,然后手动选择 node0 作为 leader 节点。

注意到现在 node2 节点现在还没有启动,但是现在还是可以选择一个值出来。等到 node2 节点启动后,它却可以选出一个和 node0 和 node1 节点相同的值。

参考资料