Paxos, a revelation of distributed consensus when scaling a distributed architecture
- Things fail. You can never count on anything being reliable. Even if you have perfectly bug-free software and hardware that never breaks, you’ve still got to deal with the fact that network connections can break, that messages within a network can get lost, or that some bozo might sever your network connection with a bulldozer. (That really happened while I was at Google!)
- Because things fail, you can never rely on a single copy of anything, because that copy might become unavailable due to a failure. So you need to keep multiple copies, and those copies need to be consistent, meaning that at any time all of the copies agree about their contents.
- There’s no way to maintain a single, completely consistent view of time across multiple computers. Due to differences in individual machine performance, variable network delays, variable storage latency, and several other factors, there’s no canonical way of saying, for two events X and Y, that “X happened before Y”. This means that when you try to maintain a consistent set of data, you can’t just say “Run all of the events in order”, because while one server maintaining one copy might “know” that X happened before Y, another server maintaining another copy might be just as certain that Y happened before X.
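A toy illustration of the ordering problem (the two updates are made up for this sketch and do not come from any real system): two replicas receive the same updates X and Y, apply them in opposite orders, and end up with different contents.

```python
# Hypothetical updates: X overwrites the stored value with 1, Y doubles it.
def update_x(value):
    return 1


def update_y(value):
    return value * 2


def replay(updates, initial=0):
    """Apply updates one after another, the way a single replica would."""
    value = initial
    for update in updates:
        value = update(value)
    return value


replica_a = replay([update_x, update_y])  # this replica saw X before Y
replica_b = replay([update_y, update_x])  # this replica saw Y before X
print(replica_a, replica_b)               # prints: 2 1
```

Neither replica did anything wrong; they simply disagree about the order, which is exactly why the copies need a protocol to agree on what happens next.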
- Processors (aka participants, servers, computers):
  - operate at arbitrary speeds; no two processors necessarily run at the same speed.
  - may fail without warning.
  - may rejoin after recovering from a failure.
  - are cooperative (in the sense that they do not attempt to cause failures).
- The network:
  - delivers messages between any pair of processors.
  - transmits messages asynchronously.
  - delivers messages at arbitrary speeds.
  - does not guarantee that messages will be delivered in the order in which they were transmitted.
  - does guarantee that a message, if delivered, will be delivered correctly, without any changes.
  - may fail to deliver a message.
  - may deliver multiple copies of the same message.
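To experiment with these assumptions, the network can be mimicked by a small simulated queue. This is a hypothetical helper (`UnreliableNetwork` is not part of any library), sketched under the assumption that drops and duplicates are independent random events; it never modifies a message, matching the one guarantee the network does provide.

```python
import random


class UnreliableNetwork:
    """Toy model of the network assumptions (hypothetical helper).

    A message is never modified, but each send may be dropped or
    duplicated, and delivery order is shuffled.
    """

    def __init__(self, drop_rate=0.1, dup_rate=0.1, seed=None):
        self.drop_rate = drop_rate    # probability that a send is lost
        self.dup_rate = dup_rate      # probability that a send arrives twice
        self.rng = random.Random(seed)
        self.in_flight = []           # (destination, message) pairs

    def send(self, destination, message):
        if self.rng.random() < self.drop_rate:
            return                    # message lost in transit
        self.in_flight.append((destination, message))
        if self.rng.random() < self.dup_rate:
            # the same message will be delivered a second time
            self.in_flight.append((destination, message))

    def deliver_all(self):
        """Hand back every in-flight message in arbitrary order, then clear the queue."""
        batch, self.in_flight = self.in_flight, []
        self.rng.shuffle(batch)       # no ordering guarantee
        return batch


if __name__ == "__main__":
    net = UnreliableNetwork(drop_rate=0.2, dup_rate=0.2, seed=42)
    for n in range(5):
        net.send("acceptor-1", ("Prepare", n))
    for destination, message in net.deliver_all():
        print(destination, message)
```

Any Paxos code driven through a model like this has to cope with missing, repeated, and reordered messages, which is why acceptors track proposal numbers rather than trusting arrival order.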
- The client is not part of the Paxos cluster. It’s an external entity whose actions trigger state changes by making requests to the Paxos system. Each state update in Paxos is initiated by a client request and completed by a reply to the client.
- An acceptor (also called a voter) is a participant in the maintenance of distributed storage. A state change in a Paxos cluster does not occur until a majority (quorum) of acceptors agree upon it.
- A proposer receives a request from the client and attempts to get a quorum of acceptors to agree on it.
- One of the proposers is special: the leader. It is the single proposer that most recently had a proposal accepted. In many Paxos implementations, there is only one active proposer serving client requests; the other proposers send proposals only when the current leader fails and a new one needs to be selected.
- The learner provides the real service of the Paxos cluster. Once a proposal is accepted, a learner processes the request from the client and sends it the result.
- Phase One: Prepare/Promise
- Proposer: A proposer attempts to establish a new consensus value by sending a Prepare(N) message to a quorum of acceptors. It can send to any group of acceptors, as long as that group forms a majority of the acceptors. The prepare message specifies a numeric identifier N for its proposal, which is larger than any proposal number this proposer has sent before.
- Acceptors: Each acceptor, upon receiving the prepare message, checks whether N is greater than the number of any proposal it has already promised or accepted in the current round. If so, it sends a reply called a Promise to the proposer, promising that it will never accept any proposal with a number less than N. If the acceptor has already accepted a proposal with number nv < N in the current round, it includes the pair (v, nv) in the reply: the consensus value v that it accepted and the number nv of the proposal that proposed v.
The acceptor thus sends a message Promise(N, (v, nv)) if it has accepted a proposal this round, or Promise(N, null) if it has not yet accepted any proposal.
Once it has sent a Promise, it must not accept any proposal with a number less than N. Note, though, that this does not mean the acceptor promises to accept the proposal: all it’s doing is promising not to accept any proposal with a number less than N! If it then receives a message Prepare(N+1), it’s free to promise that too, but if it does, it will no longer be able to accept the proposal for N.
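(If N is smaller than the number of any proposal promised or accepted by the acceptor, then in the original version of Paxos the acceptor does nothing; in some optimizations of the protocol, it replies with a Reject message carrying the highest proposal number it has promised, so the proposer knows to retry with a larger N.)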
What this phase does is allow a proposer to determine whether or not a new proposal is even worth considering. If a quorum (majority) of acceptors send promises, then it can move on to phase 2.
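The acceptor's side of phase one fits in a few lines. The sketch below is illustrative only: `PhaseOneAcceptor` is a hypothetical name, proposal numbers are plain integers rather than unique (number, uid) pairs, nothing is persisted to disk, and it uses the optional Reject reply. The last three lines replay the Prepare(N+1) scenario described above.

```python
class PhaseOneAcceptor:
    """Illustrative acceptor state for Prepare/Promise (hypothetical name).

    Proposal numbers are plain integers here; real implementations pair them
    with a node id so that they are unique. Nothing is persisted.
    """

    def __init__(self):
        self.promised = None   # highest N this acceptor has promised
        self.accepted = None   # (v, nv) of the last accepted proposal, or None

    def on_prepare(self, n):
        if self.promised is None or n > self.promised:
            self.promised = n
            # Promise(N, (v, nv)) if something was accepted, else Promise(N, None)
            return ("promise", n, self.accepted)
        # Optimized variant: report the number that beat this Prepare.
        return ("reject", self.promised)

    def on_accept(self, n, value):
        # Accept unless a promise was made for a higher-numbered proposal.
        if self.promised is None or n >= self.promised:
            self.promised = n
            self.accepted = (value, n)
            return ("accepted", n, value)
        return ("reject", self.promised)


acceptor = PhaseOneAcceptor()
print(acceptor.on_prepare(5))      # ('promise', 5, None)
print(acceptor.on_prepare(6))      # ('promise', 6, None): Prepare(N+1) may still be promised
print(acceptor.on_accept(5, "x"))  # ('reject', 6): the proposal for N can no longer be accepted
```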
- Phase Two: Accept!/Accepted
When a proposer receives promises from a quorum of acceptors, it moves forward to try to actually commit the proposal. To do this, it needs to choose a value for the proposal. If any of the Promise messages contained a value, then the value of this proposal must be set to the value from the promise with the highest proposal number nv. If all of the promises were empty, the proposer can choose any value it wants for the proposal.
Once the proposer has chosen a value V, it sends a message Accept!(N, V) to a quorum of acceptors. This is typically written with an exclamation point because it’s really a command to the acceptors: they’re being told to accept the proposal if they can.
When an acceptor receives an Accept!(N, V) message, if it has not issued a promise for a proposal with a number greater than N, then it must accept the message. It accepts the proposal by sending a message Accepted(N, V) to both the original proposer and all of the learners.
When Accepted messages have been received from a quorum of acceptors, the value V becomes the consensus value for the Paxos cluster, and proposal number N is fully committed.
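Phase two involves two decisions: picking V, and recognizing when a quorum has accepted it. Both can be sketched as small functions. The names `choose_value` and `is_committed` are hypothetical, and proposal numbers are again plain integers for brevity.

```python
def choose_value(promises, my_value):
    """Choose V for Accept!(N, V) once a quorum of promises has arrived.

    Each promise is either None (Promise(N, null)) or a (v, nv) pair
    (Promise(N, (v, nv))).
    """
    prior = [p for p in promises if p is not None]
    if not prior:
        return my_value  # every promise was empty: any value is allowed
    value, _ = max(prior, key=lambda pair: pair[1])
    return value         # otherwise re-propose the highest-numbered accepted value


def is_committed(accepted_from, num_acceptors):
    """True once Accepted(N, V) has come from a majority of distinct acceptors.

    Using a set means duplicated Accepted messages are counted only once.
    """
    return len(set(accepted_from)) > num_acceptors // 2


print(choose_value([None, ("a", 3), ("b", 7)], "mine"))  # b
print(choose_value([None, None], "mine"))                # mine
print(is_committed(["A", "B", "B"], 3))                  # True
```

The rule that forces a proposer to reuse an earlier value is what keeps a new round from overwriting a value that may already have been chosen.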
- Consider a cluster scheduler replicated with Paxos. A client is a user attempting to run a new job on the cluster. It sends a request to the scheduler detailing the set of resources that it wants.
- Each replica of the scheduler is a proposer, an acceptor, and a learner. There’s one active instance of the scheduler, which is the leader. When a client wants to schedule a job, its request is sent to the leading scheduler.
- In the normal non-error case, this works as follows:
- When a scheduling request is received, the leader proposes scheduling the job by sending a message to all of the other schedulers saying that it wants to schedule job N.
- The other schedulers, if they haven’t seen a proposal for a job with a number greater than N, send Promise messages for that proposal.
- The leading scheduler chooses resources for the job and then sends an Accept! message to the other schedulers.
- The other schedulers reply accepting the scheduling. The non-leader schedulers, acting as learners, record the scheduling information, and the leader actually starts the job.
- The error case arises after some kind of failure. In that case we don’t necessarily know who the leader is, so several schedulers may each act as if they were the leader and send their own proposals. Whichever proposal has the largest proposal number will eventually be accepted, and its proposer becomes the new leader.
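The failure case can be replayed with three in-memory acceptors. This is a simplified single-threaded sketch, not the author's scheduler: `ToyAcceptor`, `prepare_phase`, `accept_phase`, and the job strings are invented for illustration, and messages are delivered instantly and reliably. Proposal numbers are (counter, scheduler id) tuples, the same trick the Python module at the end of this post uses to keep them unique.

```python
class ToyAcceptor:
    """In-memory acceptor (hypothetical, for illustration only)."""

    def __init__(self):
        self.promised = None   # highest proposal number promised so far
        self.accepted = None   # (proposal number, value) last accepted, or None

    def prepare(self, number):
        if self.promised is None or number > self.promised:
            self.promised = number
            return True, self.accepted
        return False, None

    def accept(self, number, value):
        if self.promised is None or number >= self.promised:
            self.promised = number
            self.accepted = (number, value)
            return True
        return False


def prepare_phase(acceptors, number, my_value):
    """Phase one: return the value this proposer must use, or None without a quorum."""
    replies = [acc.prepare(number) for acc in acceptors]
    granted = [prior for ok, prior in replies if ok]
    if len(granted) <= len(acceptors) // 2:
        return None
    prior = [p for p in granted if p is not None]
    if prior:
        return max(prior, key=lambda p: p[0])[1]  # adopt the highest accepted value
    return my_value


def accept_phase(acceptors, number, value):
    """Phase two: True if a majority of acceptors accepted Accept!(number, value)."""
    acks = sum(acc.accept(number, value) for acc in acceptors)
    return acks > len(acceptors) // 2


acceptors = [ToyAcceptor() for _ in range(3)]

# After a failure, schedulers A and B both believe they are the leader.
value_a = prepare_phase(acceptors, (1, "A"), "job-7 on rack-1")
value_b = prepare_phase(acceptors, (2, "B"), "job-7 on rack-2")

a_won = accept_phase(acceptors, (1, "A"), value_a)  # False: acceptors promised (2, "B")
b_won = accept_phase(acceptors, (2, "B"), value_b)  # True: accepted by all three

# A retries with a higher number, but must carry B's already-accepted value.
value_a_retry = prepare_phase(acceptors, (3, "A"), "job-7 on rack-1")
print(a_won, b_won, value_a_retry)  # False True job-7 on rack-2
```

Scheduler A prepares first, but B's higher-numbered Prepare overrides A's promises, so A's Accept! is refused and B's value is committed. When A tries again, it can win leadership but cannot change the already-chosen placement.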
It’s a pretty simple idea: the core concept is that no consensus proposal is considered “committed” until it has been accepted by a majority of the participants. And if it has been accepted by a majority of the participants, no conflicting proposal can ever reach consensus, because any two majorities share at least one participant, and that participant would have to accept two conflicting proposals.
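The majority argument is just counting: with n acceptors a majority has ⌊n/2⌋ + 1 members, so two majorities together have 2⌊n/2⌋ + 2 > n members and must share at least one. A brute-force check confirms this for small clusters; `majority` and `majorities_always_overlap` are hypothetical helper names. Checking only minimum-size majorities is enough, since any larger majority contains one of them.

```python
from itertools import combinations


def majority(n):
    """Smallest group of acceptors that is a majority of n."""
    return n // 2 + 1


def majorities_always_overlap(n):
    """Brute-force check that any two minimum-size majorities of n acceptors
    have at least one acceptor in common."""
    quorums = [set(q) for q in combinations(range(n), majority(n))]
    return all(q1 & q2 for q1 in quorums for q2 in quorums)


for n in range(1, 8):
    print(n, majority(n), majorities_always_overlap(n))
```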
- No value ever reaches consensus without first being proposed, and having its proposal accepted.
- No two distinct values ever reach consensus at the same time.
- An acceptor can accept a proposal numbered P if and only if it has not yet made a promise for a proposal numbered n > P.
- The value of proposal P is the consensus value of the highest-numbered proposal that has been accepted before P, if there is one.
Below is a simple Python implementation that I would like to point to as a reference for anyone interested in the nuts and bolts of the underlying engineering.
It is even better to see some code. I believe the following Python classes are largely self-explanatory.
'''
This module provides an implementation of the Paxos algorithm as a set of
composable classes.

The code is available: https://raw.githubusercontent.com/cocagne/python-composable-paxos/master/composable_paxos.py
'''

import collections

# ProposalID
#
# In order for the Paxos algorithm to function, all proposal ids must be
# unique. A simple way to ensure this is to include the proposer's unique
# id in the proposal id.
#
# Python tuples allow the proposal number and the UID to be combined in a
# manner that supports comparison in the expected manner:
#
#   (4,'C') > (4,'B') > (3,'Z')
#
# Named tuples from the collections module support all of the regular
# tuple operations but additionally allow access to the contents by
# name so the numeric component of the proposal ID may be referred to
# via 'proposal_id.number' instead of 'proposal_id[0]'.
#
ProposalID = collections.namedtuple('ProposalID', ['number', 'uid'])


class PaxosMessage (object):
    '''
    Base class for all messages defined in this module
    '''
    from_uid = None # Set by subclass constructor


class Prepare (PaxosMessage):
    '''
    Prepare messages should be broadcast to all Acceptors.
    '''
    def __init__(self, from_uid, proposal_id):
        self.from_uid    = from_uid
        self.proposal_id = proposal_id


class Nack (PaxosMessage):
    '''
    NACKs are technically optional though few practical applications will
    want to omit their use. They are used to signal a proposer that their
    current proposal number is out of date and that a new one should be
    chosen.

    NACKs may be sent in response to both Prepare and Accept messages
    '''
    def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id):
        self.from_uid             = from_uid
        self.proposal_id          = proposal_id
        self.proposer_uid         = proposer_uid
        self.promised_proposal_id = promised_proposal_id


class Promise (PaxosMessage):
    '''
    Promise messages should be sent to at least the Proposer specified in
    the proposer_uid field
    '''
    def __init__(self, from_uid, proposer_uid, proposal_id, last_accepted_id, last_accepted_value):
        self.from_uid            = from_uid
        self.proposer_uid        = proposer_uid
        self.proposal_id         = proposal_id
        self.last_accepted_id    = last_accepted_id
        self.last_accepted_value = last_accepted_value


class Accept (PaxosMessage):
    '''
    Accept messages should be broadcast to all Acceptors
    '''
    def __init__(self, from_uid, proposal_id, proposal_value):
        self.from_uid       = from_uid
        self.proposal_id    = proposal_id
        self.proposal_value = proposal_value


class Accepted (PaxosMessage):
    '''
    Accepted messages should be sent to all Learners
    '''
    def __init__(self, from_uid, proposal_id, proposal_value):
        self.from_uid       = from_uid
        self.proposal_id    = proposal_id
        self.proposal_value = proposal_value


class Resolution (PaxosMessage):
    '''
    Optional message used to indicate that the final value has been selected
    '''
    def __init__(self, from_uid, value):
        self.from_uid = from_uid
        self.value    = value


class InvalidMessageError (Exception):
    '''
    Thrown if a PaxosMessage subclass is passed to a class that does not
    support it
    '''


class MessageHandler (object):

    def receive(self, msg):
        '''
        Message dispatching function.

        This function accepts any PaxosMessage subclass and calls the
        appropriate handler function
        '''
        handler = getattr(self, 'receive_' + msg.__class__.__name__.lower(), None)
        if handler is None:
            raise InvalidMessageError('Receiving class does not support messages of type: ' + msg.__class__.__name__)
        return handler( msg )


class Proposer (MessageHandler):
    '''
    The 'leader' attribute is a boolean value indicating the Proposer's
    belief in whether or not it is the current leader. This is not a reliable
    value as multiple nodes may simultaneously believe themselves to be the
    leader.
    '''

    leader              = False
    proposed_value      = None
    proposal_id         = None
    highest_accepted_id = None
    promises_received   = None
    nacks_received      = None
    current_prepare_msg = None
    current_accept_msg  = None

    def __init__(self, network_uid, quorum_size):
        self.network_uid         = network_uid
        self.quorum_size         = quorum_size
        self.proposal_id         = ProposalID(0, network_uid)
        self.highest_proposal_id = ProposalID(0, network_uid)

    def propose_value(self, value):
        '''
        Sets the proposal value for this node iff this node is not already aware of
        a previous proposal value. If the node additionally believes itself to be
        the current leader, an Accept message will be returned
        '''
        if self.proposed_value is None:
            self.proposed_value = value

            if self.leader:
                self.current_accept_msg = Accept(self.network_uid, self.proposal_id, value)
                return self.current_accept_msg

    def prepare(self):
        '''
        Returns a new Prepare message with a proposal id higher than
        that of any observed proposals. A side effect of this method is
        to clear the leader flag if it is currently set.
        '''
        self.leader              = False
        self.promises_received   = set()
        self.nacks_received      = set()
        self.proposal_id         = ProposalID(self.highest_proposal_id.number + 1, self.network_uid)
        self.highest_proposal_id = self.proposal_id
        self.current_prepare_msg = Prepare(self.network_uid, self.proposal_id)

        return self.current_prepare_msg

    def observe_proposal(self, proposal_id):
        '''
        Optional method used to update the proposal counter as proposals are
        seen on the network. When co-located with Acceptors and/or Learners,
        this method may be used to avoid a message delay when attempting to
        assume leadership (guaranteed NACK if the proposal number is too low).
        This method is automatically called for all received Promise and Nack
        messages.
        '''
        if proposal_id > self.highest_proposal_id:
            self.highest_proposal_id = proposal_id

    def receive_nack(self, msg):
        '''
        Returns a new Prepare message if the number of Nacks received reaches
        a quorum.
        '''
        self.observe_proposal( msg.promised_proposal_id )

        if msg.proposal_id == self.proposal_id and self.nacks_received is not None:
            self.nacks_received.add( msg.from_uid )

            if len(self.nacks_received) == self.quorum_size:
                return self.prepare() # Lost leadership or failed to acquire it

    def receive_promise(self, msg):
        '''
        Returns an Accept message if a quorum of Promise messages is achieved
        '''
        self.observe_proposal( msg.proposal_id )

        if not self.leader and msg.proposal_id == self.proposal_id and msg.from_uid not in self.promises_received:

            self.promises_received.add( msg.from_uid )

            # Adopt the value of the highest-numbered proposal reported as
            # accepted. Either id may be None, and Python 3 cannot order None
            # against a tuple, so None is checked explicitly.
            if msg.last_accepted_id is not None and (self.highest_accepted_id is None or msg.last_accepted_id > self.highest_accepted_id):
                self.highest_accepted_id = msg.last_accepted_id
                if msg.last_accepted_value is not None:
                    self.proposed_value = msg.last_accepted_value

            if len(self.promises_received) == self.quorum_size:
                self.leader = True

                if self.proposed_value is not None:
                    self.current_accept_msg = Accept(self.network_uid, self.proposal_id, self.proposed_value)
                    return self.current_accept_msg


class Acceptor (MessageHandler):
    '''
    Acceptors act as the fault-tolerant memory for Paxos. To ensure correctness
    in the presence of failure, Acceptors must be able to remember the promises
    they've made even in the event of power outages. Consequently, any changes
    to the promised_id, accepted_id, and/or accepted_value must be persisted to
    stable media prior to sending promise and accepted messages.

    When an Acceptor instance is composed alongside a Proposer instance, it
    is generally advantageous to call the proposer's observe_proposal()
    method when methods of this class are called.
    '''

    def __init__(self, network_uid, promised_id=None, accepted_id=None, accepted_value=None):
        '''
        promised_id, accepted_id, and accepted_value should be provided if and only if this
        instance is recovering from persistent state.
        '''
        self.network_uid    = network_uid
        self.promised_id    = promised_id
        self.accepted_id    = accepted_id
        self.accepted_value = accepted_value

    def receive_prepare(self, msg):
        '''
        Returns either a Promise or a Nack in response. The Acceptor's state must be persisted to disk
        prior to transmitting the Promise message.
        '''
        # promised_id is None until the first promise has been made
        if self.promised_id is None or msg.proposal_id >= self.promised_id:
            self.promised_id = msg.proposal_id
            return Promise(self.network_uid, msg.from_uid, self.promised_id, self.accepted_id, self.accepted_value)
        else:
            return Nack(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)

    def receive_accept(self, msg):
        '''
        Returns either an Accepted or Nack message in response. The Acceptor's state must be persisted
        to disk prior to transmitting the Accepted message.
        '''
        if self.promised_id is None or msg.proposal_id >= self.promised_id:
            self.promised_id    = msg.proposal_id
            self.accepted_id    = msg.proposal_id
            self.accepted_value = msg.proposal_value
            return Accepted(self.network_uid, msg.proposal_id, msg.proposal_value)
        else:
            return Nack(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)


class Learner (MessageHandler):
    '''
    This class listens to Accepted messages, determines when the final value is
    selected, and tracks which peers have accepted the final value.
    '''
    class ProposalStatus (object):
        __slots__ = ['accept_count', 'retain_count', 'acceptors', 'value']
        def __init__(self, value):
            self.accept_count = 0
            self.retain_count = 0
            self.acceptors    = set()
            self.value        = value

    def __init__(self, network_uid, quorum_size):
        self.network_uid       = network_uid
        self.quorum_size       = quorum_size
        self.proposals         = dict() # maps proposal_id => ProposalStatus
        self.acceptors         = dict() # maps from_uid => last_accepted_proposal_id
        self.final_value       = None
        self.final_acceptors   = None   # Will be a set of acceptor UIDs once the final value is chosen
        self.final_proposal_id = None

    def receive_accepted(self, msg):
        '''
        Called when an Accepted message is received from an acceptor. Once the final value
        is determined, the return value of this method will be a Resolution message containing
        the consensus value. Subsequent calls after the resolution is chosen will continue to add
        new Acceptors to the final_acceptors set and return Resolution messages.
        '''
        if self.final_value is not None:
            if msg.proposal_id >= self.final_proposal_id and msg.proposal_value == self.final_value:
                self.final_acceptors.add( msg.from_uid )
            return Resolution(self.network_uid, self.final_value)

        last_pn = self.acceptors.get(msg.from_uid)

        # last_pn is None the first time this acceptor is heard from
        if last_pn is not None and msg.proposal_id <= last_pn:
            return # Old message

        self.acceptors[ msg.from_uid ] = msg.proposal_id

        if last_pn is not None:
            ps = self.proposals[ last_pn ]
            ps.retain_count -= 1
            ps.acceptors.remove(msg.from_uid)
            if ps.retain_count == 0:
                del self.proposals[ last_pn ]

        if not msg.proposal_id in self.proposals:
            self.proposals[ msg.proposal_id ] = Learner.ProposalStatus(msg.proposal_value)

        ps = self.proposals[ msg.proposal_id ]

        assert msg.proposal_value == ps.value, 'Value mismatch for single proposal!'

        ps.accept_count += 1
        ps.retain_count += 1
        ps.acceptors.add(msg.from_uid)

        if ps.accept_count == self.quorum_size:
            self.final_proposal_id = msg.proposal_id
            self.final_value       = msg.proposal_value
            self.final_acceptors   = ps.acceptors
            self.proposals         = None
            self.acceptors         = None

            return Resolution( self.network_uid, self.final_value )


class PaxosInstance (Proposer, Acceptor, Learner):
    '''
    Aggregate Proposer, Acceptor, & Learner class.
    '''

    def __init__(self, network_uid, quorum_size, promised_id=None, accepted_id=None, accepted_value=None):
        Proposer.__init__(self, network_uid, quorum_size)
        Acceptor.__init__(self, network_uid, promised_id, accepted_id, accepted_value)
        Learner.__init__(self, network_uid, quorum_size)

    def receive_prepare(self, msg):
        self.observe_proposal( msg.proposal_id )
        return super(PaxosInstance,self).receive_prepare(msg)

    def receive_accept(self, msg):
        self.observe_proposal( msg.proposal_id )
        return super(PaxosInstance,self).receive_accept(msg)