Paxos
- Things fail. You can never count on anything being reliable. Even if you have perfectly bug-free software and hardware that never breaks, you’ve still got to deal with the fact that network connections can break, messages within a network can get lost, or some bozo might sever your network connection with a bulldozer. (That really happened while I was at Google!)
- Because things fail, you can never rely on one copy of anything: that copy might become unavailable due to a failure. So you need to keep multiple copies, and those copies need to be consistent, meaning that at any time, all of the copies agree about their contents.
- There’s no way to maintain a single, completely consistent view of time across multiple computers. Because of differences in individual machine performance, variable network delays, variable storage latency, and several other factors, there’s no canonical way of saying, for two events X and Y, that “X happened before Y”. So when you try to maintain a consistent set of data, you can’t just say “Run all of the events in order”: while one server maintaining one copy might “know” that X happened before Y, another server maintaining another copy might be just as certain that Y happened before X.
- Processors (aka participants, servers, computers):
  - operate at arbitrary speeds; no two processors necessarily operate at the same speed.
  - may fail without warning.
  - may rejoin after recovering from a failure.
  - are cooperative, in the sense that they do not attempt to cause failures (there are no malicious, or Byzantine, participants).
- Network:
  - Delivers messages between any pair of processors.
  - Transmits messages asynchronously.
  - Delivers messages at arbitrary speeds.
  - Does not guarantee that messages will be delivered in the order in which they were transmitted.
  - Does guarantee that a message, if delivered, will be delivered correctly, without any changes.
  - May fail to deliver a message.
  - May deliver multiple copies of the same message.
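To make the network assumptions concrete, here is a minimal sketch of a simulated network that honors them: it may drop, duplicate, and reorder messages, but it never alters a message it delivers. The class `UnreliableNetwork` and its `drop_rate`, `dup_rate`, and `seed` parameters are hypothetical, chosen only for illustration; they are not part of any Paxos library.

```python
import random
from typing import Any, List, Tuple


class UnreliableNetwork:
    """Toy network matching the assumptions above (hypothetical helper).

    Messages may be lost, duplicated, or delivered out of order, but a
    delivered message is never modified.
    """

    def __init__(self, drop_rate: float = 0.1, dup_rate: float = 0.1, seed: int = 0):
        self.drop_rate = drop_rate
        self.dup_rate = dup_rate
        self.rng = random.Random(seed)  # seeded so runs are reproducible
        self.in_flight: List[Tuple[str, Any]] = []  # (destination, message)

    def send(self, dest: str, msg: Any) -> None:
        if self.rng.random() < self.drop_rate:
            return  # message silently lost
        self.in_flight.append((dest, msg))
        if self.rng.random() < self.dup_rate:
            self.in_flight.append((dest, msg))  # an extra copy of the same message

    def deliver_all(self) -> List[Tuple[str, Any]]:
        """Deliver every in-flight message, in an arbitrary order."""
        batch = self.in_flight
        self.in_flight = []
        self.rng.shuffle(batch)  # no ordering guarantee
        return batch
```

A Paxos implementation has to stay correct under any mix of these behaviors, which is why every message handler in the protocol must tolerate stale, repeated, and missing messages.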
- Client
  - The client is not part of the Paxos cluster. It’s an external entity whose actions trigger state changes by making requests to the Paxos system. Each state update in Paxos is initiated by a client request and completed by a reply to the client.
- Acceptor
  - An acceptor (also called a voter) is a participant in the maintenance of distributed storage. A state change in a Paxos cluster does not occur until a majority (quorum) of acceptors agree upon it.
- Proposer
  - A proposer receives a request from the client and attempts to get a quorum of acceptors to agree on it.
- Leader
  - One of the proposers is special: the leader is the single proposer who most recently had a proposal accepted. In many Paxos implementations, there is only one active proposer serving client requests; the other proposers only send proposals when the current leader fails and a new one needs to be selected.
- Learner
  - The learner is the real service provided by the Paxos cluster. Once a proposal is accepted, a learner processes the request from the client and sends it the result.
- Phase One: Prepare/Promise
- Proposer: A proposer starts an attempt to reach a new consensus by sending a Prepare(N) message to a quorum of acceptors. It can send to any group of acceptors, so long as that group forms a majority of the acceptors. The Prepare message specifies a numeric identifier N for the proposal, which must be larger than the number of any proposal this proposer has sent before.
- Acceptors: Each acceptor, upon receiving Prepare(N), checks whether N is greater than the number of every proposal it has already promised in the current round. If so, it sends a reply called a Promise to the proposer, promising that it will never accept any proposal with a number less than $N$. If the acceptor has already accepted a proposal in the current round (necessarily one with a number less than $N$), it includes in its reply the pair $(v, n_v)$, consisting of the accepted consensus value $v$ and the number $n_v$ of the proposal that proposed $v$.
The acceptor thus sends either Promise(N, (v, n_v)), if it has already accepted a proposal this round, or Promise(N, null), if it has not accepted any proposal yet.
Once it has sent a promise, it must not accept any proposal with a number less than N. Note, though, that this does not mean the acceptor promises to accept proposal N: all it’s promising is not to accept any proposal with a number less than N! If it later receives Prepare(N+1), it’s free to promise that too, but if it does, it will no longer be able to accept the proposal numbered N.
(If N is smaller than the number of some proposal the acceptor has already promised or accepted, then in the original version of Paxos the acceptor does nothing; in some optimizations of the protocol, it replies with a rejection, often called a Nack, carrying the highest proposal number it has promised, so the proposer knows it needs a larger N.)
What this phase does is let a proposer determine whether a new proposal is even worth pursuing. If a quorum (majority) of acceptors send promises, then it can move on to Phase Two.
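The acceptor side of Phase One is small enough to sketch directly. This is a minimal, in-memory sketch assuming plain integer proposal numbers; a real acceptor must also persist its state to stable storage before replying. The names `AcceptorState`, `on_prepare`, and `PromiseMsg` are hypothetical. One deliberate choice: a duplicate Prepare(N) for the already-promised N is answered again, which is harmless and copes with the network duplicating messages.

```python
from dataclasses import dataclass
from typing import Any, Optional, Tuple

# Promise(N, (v, n_v)) is modelled as the tuple (N, (v, n_v)); Promise(N, null) as (N, None).
PromiseMsg = Tuple[int, Optional[Tuple[Any, int]]]


@dataclass
class AcceptorState:
    promised_n: Optional[int] = None  # highest N this acceptor has promised
    accepted_n: Optional[int] = None  # n_v: number of the proposal it last accepted
    accepted_v: Any = None            # v: value of the proposal it last accepted


def on_prepare(state: AcceptorState, n: int) -> Optional[PromiseMsg]:
    """Handle Prepare(n): return a promise, or None to ignore the message."""
    if state.promised_n is not None and n < state.promised_n:
        return None  # already promised a higher number (original Paxos: do nothing)
    # n == promised_n is allowed so that a duplicated Prepare gets the same answer.
    state.promised_n = n  # from now on, never accept a proposal numbered below n
    if state.accepted_n is None:
        return (n, None)  # Promise(N, null)
    return (n, (state.accepted_v, state.accepted_n))  # Promise(N, (v, n_v))
```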
- Phase Two: Accept!/Accepted
When a proposer receives promises from a quorum of acceptors, it moves forward to try to actually commit the proposal. To do this, it needs to choose a value for the proposal. If any of the Promise messages contained a value, then the value of this proposal must be set to the value from the promise with the highest accepted proposal number n_v. If all of the promises were empty, the proposer can choose any value it wants for the proposal.
Once the proposer has chosen a value v, it sends a message Accept!(N, v) to a quorum of acceptors. This is typically written with an exclamation point because it’s really a command to the acceptors: they’re being told to accept the proposal if they can.
When an acceptor receives an Accept!(N, v) message, if it has not issued a promise for a proposal with a number greater than N, then it must accept the message. It accepts the proposal by sending a message Accepted(N, v) to both the original proposer and all of the learners.
When Accepted messages have been received from a quorum of acceptors, the value v becomes the consensus value for the Paxos cluster, and proposal N is fully committed.
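The two Phase Two rules, how the proposer picks its value and when an acceptor must accept, fit in a few lines. This is a sketch assuming integer proposal numbers and promises represented as `(N, None)` or `(N, (v, n_v))` tuples; `choose_value`, `must_accept`, and `PromiseMsg` are hypothetical names for illustration.

```python
from typing import Any, List, Optional, Tuple

# A promise is modelled as (N, None) or (N, (v, n_v)), mirroring
# Promise(N, null) and Promise(N, (v, n_v)) in the text.
PromiseMsg = Tuple[int, Optional[Tuple[Any, int]]]


def choose_value(promises: List[PromiseMsg], my_value: Any) -> Any:
    """Pick the value for Accept!(N, v) from a quorum of promises."""
    prior = [pair for _, pair in promises if pair is not None]
    if not prior:
        return my_value  # every promise was empty: the proposer may choose freely
    # Otherwise reuse the value whose accepted proposal number n_v is highest.
    value, _ = max(prior, key=lambda pair: pair[1])
    return value


def must_accept(promised_n: Optional[int], n: int) -> bool:
    """Acceptor side: accept Accept!(n, v) unless it promised a number greater than n."""
    return promised_n is None or n >= promised_n
```

Note that the proposer’s own preferred value only matters when no acceptor in the quorum has accepted anything; otherwise it is overridden, which is what keeps a value from changing once it may have been chosen.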
As an example, consider a replicated job scheduler for a cluster:
- A client is a user attempting to run a new job on the cluster. It sends a request to the scheduler detailing the set of resources that it wants.
- Each replica of the scheduler is a proposer, an acceptor, and a learner. There’s one active instance of the scheduler, which is the leader. When a client wants to schedule a job, its request gets sent to the leading scheduler.
- In the normal, non-error case, this works as follows:
  - When a scheduling request is received, the leader proposes scheduling the job by sending a message to all of the other schedulers saying that it wants to schedule job N.
  - The other schedulers, if they haven’t seen a proposal with a number greater than N, promise not to accept any lower-numbered proposal.
  - The leading scheduler chooses resources for the job, and then sends an Accept! message to the other schedulers.
  - The other schedulers reply accepting the scheduling. The non-leader schedulers, acting as learners, record the scheduling information, and the leader actually starts the job.
- Errors occur when there has been some kind of failure. In that case, we don’t necessarily know who the leader is, so we get multiple schedulers trying to act as if they’re the leader, and each of them sends proposals. The proposal with the largest number that manages to gather a quorum of promises and acceptances is the one that eventually gets accepted, and its proposer becomes the new leader.
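Here is a toy, single-process sketch of the scheduler example, assuming three in-memory replicas, integer proposal numbers, and no real network. `Scheduler` and `run_round` are hypothetical names, and the job/rack strings are made-up values. It shows the normal case, a second would-be leader arriving with a higher number, and a stale leader whose number is too low.

```python
from typing import Any, List, Optional, Tuple


class Scheduler:
    """A scheduler replica in its acceptor role (toy, in-memory, hypothetical)."""

    def __init__(self, name: str):
        self.name = name
        self.promised_n: Optional[int] = None
        self.accepted: Optional[Tuple[Any, int]] = None  # (v, n_v) last accepted

    def prepare(self, n: int) -> Tuple[bool, Optional[Tuple[Any, int]]]:
        if self.promised_n is not None and n < self.promised_n:
            return False, None  # already promised a higher number
        self.promised_n = n
        return True, self.accepted  # Promise(n, (v, n_v)) or Promise(n, null)

    def accept(self, n: int, v: Any) -> bool:
        if self.promised_n is not None and n < self.promised_n:
            return False  # promised a higher number: refuse
        self.promised_n = n
        self.accepted = (v, n)
        return True


def run_round(n: int, wanted: Any, replicas: List[Scheduler]) -> Optional[Any]:
    """One would-be leader runs both phases; returns the committed value or None."""
    quorum = len(replicas) // 2 + 1
    replies = [r.prepare(n) for r in replicas]  # Phase One: Prepare/Promise
    if sum(ok for ok, _ in replies) < quorum:
        return None  # no quorum of promises: retry later with a higher n
    prior = [acc for ok, acc in replies if ok and acc is not None]
    value = max(prior, key=lambda p: p[1])[0] if prior else wanted
    acks = sum(r.accept(n, value) for r in replicas)  # Phase Two: Accept!/Accepted
    return value if acks >= quorum else None
```

With three fresh replicas, `run_round(1, "job 42 on rack A", replicas)` commits rack A. A later `run_round(2, "job 42 on rack B", replicas)` still returns rack A, because the promises report the already-accepted value, and a stale `run_round(1, ...)` returns `None` since every replica has promised 2.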
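It’s a pretty simple thing: the core concept is that no consensus proposal is considered “committed” until it’s been accepted by a majority of the participants. And once a value has been accepted by a majority, no conflicting proposal can ever reach consensus. Any two majorities of the same set of acceptors share at least one acceptor, so a conflicting value would need some participant to accept two conflicting proposals. Phase One prevents exactly that: a later proposer must collect promises from a majority, which includes at least one acceptor that already accepted the value, and the value-selection rule of Phase Two then makes it re-propose that value rather than a conflicting one.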
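The argument rests on quorum intersection: any two majorities of the same acceptor set share at least one member. A brute-force check over small clusters makes this concrete; `majorities` and `quorums_intersect` are illustrative helper names, not from any library.

```python
from itertools import combinations
from typing import List, Sequence, Set


def majorities(acceptors: Sequence[int]) -> List[Set[int]]:
    """Every subset of `acceptors` that forms a strict majority."""
    quorum = len(acceptors) // 2 + 1
    return [set(combo)
            for size in range(quorum, len(acceptors) + 1)
            for combo in combinations(acceptors, size)]


def quorums_intersect(n: int) -> bool:
    """True if every pair of majorities drawn from n acceptors shares a member."""
    qs = majorities(list(range(n)))
    return all(a & b for a in qs for b in qs)
```

By contrast, with 4 acceptors two halves such as {0, 1} and {2, 3} are disjoint, which is why a quorum has to be a strict majority rather than merely half.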
- Validity
  - No value ever reaches consensus without first being proposed, and having its proposal accepted.
- Agreement
  - No two distinct values ever reach consensus in the same round: once a value has been chosen, it stays chosen.
- An acceptor can accept a proposal numbered n if and only if it has not yet made a promise for a proposal numbered greater than n.
- If any acceptor that promised a proposal P has already accepted a proposal, the value of P is the consensus value of the highest-numbered proposal that has been accepted before P.
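Agreement can be checked mechanically against a trace of Accepted messages. This sketch assumes a value counts as chosen when a majority of acceptors accepted it under the same proposal number; `chosen_values`, `satisfies_agreement`, and the `(acceptor_id, n, v)` log format are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List, Set, Tuple

# One entry per Accepted(n, v) message: (acceptor_id, n, v).
AcceptLog = List[Tuple[str, int, Any]]


def chosen_values(log: AcceptLog, num_acceptors: int) -> Set[Any]:
    """Values that a majority of acceptors accepted under one proposal number."""
    voters: Dict[Tuple[int, Any], Set[str]] = defaultdict(set)
    for acceptor, n, v in log:
        voters[(n, v)].add(acceptor)
    quorum = num_acceptors // 2 + 1
    return {v for (_, v), who in voters.items() if len(who) >= quorum}


def satisfies_agreement(log: AcceptLog, num_acceptors: int) -> bool:
    """Agreement holds if at most one distinct value was ever chosen."""
    return len(chosen_values(log, num_acceptors)) <= 1
```

For three acceptors, a trace where `a` and `b` accept `x` under proposal 1 and then `b` and `c` accept `x` under proposal 2 passes. A trace where proposal 2 instead carries `y` fails, and correct Paxos can never produce it: the proposer of 2 needs a promise from `a` or `b`, learns `(x, 1)`, and must re-propose `x`.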
Python Libraries
Below are some simple Python implementations that I’d like to point to as references for anyone interested in the nuts and bolts of the underlying engineering.
- https://github.com/cocagne/paxos/blob/master/paxos/practical.py
- https://github.com/cocagne/python-composable-paxos/blob/master/composable_paxos.py
- https://github.com/gdub/python-paxos/tree/master/paxos
It is even better to look at some actual code. I believe the following Python interfaces, from the composable Paxos implementation linked above, are largely self-explanatory.
[sourcecode language="python"]
'''
This module provides an implementation of the Paxos algorithm as
a set of composable classes.

The code is available at:
https://raw.githubusercontent.com/cocagne/python-composable-paxos/master/composable_paxos.py
'''

import collections

# ProposalID
#
# In order for the Paxos algorithm to function, all proposal ids must be
# unique. A simple way to ensure this is to include the proposer's unique
# id in the proposal id.
#
# Python tuples allow the proposal number and the UID to be combined in a
# manner that supports comparison in the expected manner:
#
#   (4,'C') > (4,'B') > (3,'Z')
#
# Named tuples from the collections module support all of the regular
# tuple operations but additionally allow access to the contents by
# name so the numeric component of the proposal ID may be referred to
# via 'proposal_id.number' instead of 'proposal_id[0]'.
#
ProposalID = collections.namedtuple('ProposalID', ['number', 'uid'])

class PaxosMessage (object):
    '''
    Base class for all messages defined in this module
    '''
    from_uid = None # Set by subclass constructor


class Prepare (PaxosMessage):
    '''
    Prepare messages should be broadcast to all Acceptors.
    '''
    def __init__(self, from_uid, proposal_id):
        self.from_uid = from_uid
        self.proposal_id = proposal_id


class Nack (PaxosMessage):
    '''
    NACKs are technically optional though few practical applications will
    want to omit their use. They are used to signal a proposer that their
    current proposal number is out of date and that a new one should be
    chosen. NACKs may be sent in response to both Prepare and Accept
    messages.
    '''
    def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id):
        self.from_uid = from_uid
        self.proposal_id = proposal_id
        self.proposer_uid = proposer_uid
        self.promised_proposal_id = promised_proposal_id


class Promise (PaxosMessage):
    '''
    Promise messages should be sent to at least the Proposer specified in
    the proposer_uid field
    '''
    def __init__(self, from_uid, proposer_uid, proposal_id, last_accepted_id, last_accepted_value):
        self.from_uid = from_uid
        self.proposer_uid = proposer_uid
        self.proposal_id = proposal_id
        self.last_accepted_id = last_accepted_id
        self.last_accepted_value = last_accepted_value


class Accept (PaxosMessage):
    '''
    Accept messages should be broadcast to all Acceptors
    '''
    def __init__(self, from_uid, proposal_id, proposal_value):
        self.from_uid = from_uid
        self.proposal_id = proposal_id
        self.proposal_value = proposal_value


class Accepted (PaxosMessage):
    '''
    Accepted messages should be sent to all Learners
    '''
    def __init__(self, from_uid, proposal_id, proposal_value):
        self.from_uid = from_uid
        self.proposal_id = proposal_id
        self.proposal_value = proposal_value


class Resolution (PaxosMessage):
    '''
    Optional message used to indicate that the final value has been selected
    '''
    def __init__(self, from_uid, value):
        self.from_uid = from_uid
        self.value = value


class InvalidMessageError (Exception):
    '''
    Thrown if a PaxosMessage subclass is passed to a class that does not
    support it
    '''


class MessageHandler (object):

    def receive(self, msg):
        '''
        Message dispatching function. This function accepts any PaxosMessage subclass and calls
        the appropriate handler function
        '''
        handler = getattr(self, 'receive_' + msg.__class__.__name__.lower(), None)
        if handler is None:
            raise InvalidMessageError('Receiving class does not support messages of type: ' + msg.__class__.__name__)
        return handler( msg )

class Proposer (MessageHandler):
    '''
    The 'leader' attribute is a boolean value indicating the Proposer's
    belief in whether or not it is the current leader. This is not a reliable
    value as multiple nodes may simultaneously believe themselves to be the
    leader.
    '''

    leader = False
    proposed_value = None
    proposal_id = None
    highest_accepted_id = None
    promises_received = None
    nacks_received = None
    current_prepare_msg = None
    current_accept_msg = None

    def __init__(self, network_uid, quorum_size):
        self.network_uid = network_uid
        self.quorum_size = quorum_size
        self.proposal_id = ProposalID(0, network_uid)
        self.highest_proposal_id = ProposalID(0, network_uid)

    def propose_value(self, value):
        '''
        Sets the proposal value for this node iff this node is not already aware of
        a previous proposal value. If the node additionally believes itself to be
        the current leader, an Accept message will be returned
        '''
        if self.proposed_value is None:
            self.proposed_value = value

            if self.leader:
                self.current_accept_msg = Accept(self.network_uid, self.proposal_id, value)
                return self.current_accept_msg

    def prepare(self):
        '''
        Returns a new Prepare message with a proposal id higher than
        that of any observed proposals. A side effect of this method is
        to clear the leader flag if it is currently set.
        '''
        self.leader = False
        self.promises_received = set()
        self.nacks_received = set()
        self.proposal_id = ProposalID(self.highest_proposal_id.number + 1, self.network_uid)
        self.highest_proposal_id = self.proposal_id
        self.current_prepare_msg = Prepare(self.network_uid, self.proposal_id)

        return self.current_prepare_msg

    def observe_proposal(self, proposal_id):
        '''
        Optional method used to update the proposal counter as proposals are
        seen on the network. When co-located with Acceptors and/or Learners,
        this method may be used to avoid a message delay when attempting to
        assume leadership (guaranteed NACK if the proposal number is too low).
        This method is automatically called for all received Promise and Nack
        messages.
        '''
        if proposal_id > self.highest_proposal_id:
            self.highest_proposal_id = proposal_id

    def receive_nack(self, msg):
        '''
        Returns a new Prepare message if the number of Nacks received reaches
        a quorum.
        '''
        self.observe_proposal( msg.promised_proposal_id )

        if msg.proposal_id == self.proposal_id and self.nacks_received is not None:
            self.nacks_received.add( msg.from_uid )

            if len(self.nacks_received) == self.quorum_size:
                return self.prepare() # Lost leadership or failed to acquire it

    def receive_promise(self, msg):
        '''
        Returns an Accept message if a quorum of Promise messages is achieved
        '''
        self.observe_proposal( msg.proposal_id )

        if not self.leader and msg.proposal_id == self.proposal_id and msg.from_uid not in self.promises_received:

            self.promises_received.add( msg.from_uid )

            # Adopt the value of the highest-numbered proposal any promiser has accepted.
            # The explicit None checks are needed on Python 3, where None cannot be
            # compared with a ProposalID.
            if msg.last_accepted_id is not None and (self.highest_accepted_id is None or msg.last_accepted_id > self.highest_accepted_id):
                self.highest_accepted_id = msg.last_accepted_id
                if msg.last_accepted_value is not None:
                    self.proposed_value = msg.last_accepted_value

            if len(self.promises_received) == self.quorum_size:
                self.leader = True

                if self.proposed_value is not None:
                    self.current_accept_msg = Accept(self.network_uid, self.proposal_id, self.proposed_value)
                    return self.current_accept_msg

class Acceptor (MessageHandler):
    '''
    Acceptors act as the fault-tolerant memory for Paxos. To ensure correctness
    in the presence of failure, Acceptors must be able to remember the promises
    they've made even in the event of power outages. Consequently, any changes
    to the promised_id, accepted_id, and/or accepted_value must be persisted to
    stable media prior to sending promise and accepted messages.

    When an Acceptor instance is composed alongside a Proposer instance, it
    is generally advantageous to call the proposer's observe_proposal()
    method when methods of this class are called.
    '''

    def __init__(self, network_uid, promised_id=None, accepted_id=None, accepted_value=None):
        '''
        promised_id, accepted_id, and accepted_value should be provided if and only if this
        instance is recovering from persistent state.
        '''
        self.network_uid = network_uid
        self.promised_id = promised_id
        self.accepted_id = accepted_id
        self.accepted_value = accepted_value

    def receive_prepare(self, msg):
        '''
        Returns either a Promise or a Nack in response. The Acceptor's state must be persisted to disk
        prior to transmitting the Promise message.
        '''
        # promised_id is None until the first promise; Python 3 cannot compare None with a ProposalID.
        if self.promised_id is None or msg.proposal_id >= self.promised_id:
            self.promised_id = msg.proposal_id
            return Promise(self.network_uid, msg.from_uid, self.promised_id, self.accepted_id, self.accepted_value)
        else:
            return Nack(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)

    def receive_accept(self, msg):
        '''
        Returns either an Accepted or Nack message in response. The Acceptor's state must be persisted
        to disk prior to transmitting the Accepted message.
        '''
        if self.promised_id is None or msg.proposal_id >= self.promised_id:
            self.promised_id = msg.proposal_id
            self.accepted_id = msg.proposal_id
            self.accepted_value = msg.proposal_value
            return Accepted(self.network_uid, msg.proposal_id, msg.proposal_value)
        else:
            return Nack(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)

class Learner (MessageHandler):
    '''
    This class listens to Accepted messages, determines when the final value is
    selected, and tracks which peers have accepted the final value.
    '''
    class ProposalStatus (object):
        __slots__ = ['accept_count', 'retain_count', 'acceptors', 'value']

        def __init__(self, value):
            self.accept_count = 0
            self.retain_count = 0
            self.acceptors = set()
            self.value = value

    def __init__(self, network_uid, quorum_size):
        self.network_uid = network_uid
        self.quorum_size = quorum_size
        self.proposals = dict() # maps proposal_id => ProposalStatus
        self.acceptors = dict() # maps from_uid => last_accepted_proposal_id
        self.final_value = None
        self.final_acceptors = None # Will be a set of acceptor UIDs once the final value is chosen
        self.final_proposal_id = None

    def receive_accepted(self, msg):
        '''
        Called when an Accepted message is received from an acceptor. Once the final value
        is determined, the return value of this method will be a Resolution message containing
        the consensus value. Subsequent calls after the resolution is chosen will continue to add
        new Acceptors to the final_acceptors set and return Resolution messages.
        '''
        if self.final_value is not None:
            if msg.proposal_id >= self.final_proposal_id and msg.proposal_value == self.final_value:
                self.final_acceptors.add( msg.from_uid )
            return Resolution(self.network_uid, self.final_value)

        last_pn = self.acceptors.get(msg.from_uid)

        # last_pn is None the first time we hear from this acceptor.
        if last_pn is not None and msg.proposal_id <= last_pn:
            return # Old message

        self.acceptors[ msg.from_uid ] = msg.proposal_id

        if last_pn is not None:
            ps = self.proposals[ last_pn ]
            ps.retain_count -= 1
            ps.acceptors.remove(msg.from_uid)
            if ps.retain_count == 0:
                del self.proposals[ last_pn ]

        if msg.proposal_id not in self.proposals:
            self.proposals[ msg.proposal_id ] = Learner.ProposalStatus(msg.proposal_value)

        ps = self.proposals[ msg.proposal_id ]

        assert msg.proposal_value == ps.value, 'Value mismatch for single proposal!'

        ps.accept_count += 1
        ps.retain_count += 1
        ps.acceptors.add(msg.from_uid)

        if ps.accept_count == self.quorum_size:
            self.final_proposal_id = msg.proposal_id
            self.final_value = msg.proposal_value
            self.final_acceptors = ps.acceptors
            self.proposals = None
            self.acceptors = None

            return Resolution( self.network_uid, self.final_value )

class PaxosInstance (Proposer, Acceptor, Learner):
    '''
    Aggregate Proposer, Acceptor, & Learner class.
    '''

    def __init__(self, network_uid, quorum_size, promised_id=None, accepted_id=None, accepted_value=None):
        Proposer.__init__(self, network_uid, quorum_size)
        Acceptor.__init__(self, network_uid, promised_id, accepted_id, accepted_value)
        Learner.__init__(self, network_uid, quorum_size)

    def receive_prepare(self, msg):
        self.observe_proposal( msg.proposal_id )
        return super(PaxosInstance,self).receive_prepare(msg)

    def receive_accept(self, msg):
        self.observe_proposal( msg.proposal_id )
        return super(PaxosInstance,self).receive_accept(msg)
[/sourcecode]