Paxos, a revelation of distributed consensus when scaling a distributed architecture


My day-to-day work is mainly about architecting systems and platforms, which has a lot to do with DevOps. I don’t know if this is a good or a bad thing, but it is how I spend my days. In today’s environment, that means I’ve spent a lot of time working on tools that, in one way or another, help other developers deal with distributed systems. In that work, I’ve noticed some key ideas that straddle the line between pure math and pure engineering. That’s really interesting to someone like me!
A good example of that is something called Paxos. My first exposure to Paxos was very interesting. I read the (only) book on Site Reliability Engineering (SRE) and was exposed to a variety of concepts that have to do with scaling distributed systems, availability, consistency and more. Paxos was one of the things I really didn’t expect to learn from this book, but I was amazed by it.
Paxos is a system for managing consensus.
In distributed systems, there is a collection of hard problems that you constantly need to deal with.
  1. Things fail. You can never count on anything being reliable. Even if you have perfectly bug-free software, and hardware that never breaks, you’ve still got to deal with the fact that network connections can break, or messages within a network can get lost, or that some bozo might sever your network connection with a bulldozer. (That really happened while I was at Google!)
  2. Given (1), you can never rely on one copy of anything, because that copy might become unavailable due to a failure. So you need to keep multiple copies, and those copies need to be consistent – meaning that at any time, all of the copies agree about their contents.
  3. There’s no way to maintain a single completely consistent view of time between multiple computers. Due to inconsistencies in individual machine performance, and variable network delays, variable storage latency, and several other factors, there’s no canonical way of saying that for two events X and Y, “X happened before Y”. What that means is that when you try to maintain a consistent set of data, you can’t just say “Run all of the events in order”, because while one server maintaining one copy might “know” that X happened before Y, another server maintaining another copy might be just as certain that Y happened before X.
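To make point 3 concrete, here is a tiny sketch of two replicas that receive the same two updates but disagree about their order. The events and the `apply_events` helper are made up for illustration; they are not part of Paxos itself.

```python
def apply_events(events):
    """Apply (operation, amount) events to a balance that starts at 100."""
    balance = 100
    for op, amount in events:
        if op == "add":
            balance += amount
        elif op == "double":
            balance *= 2
    return balance


x = ("add", 50)       # event X
y = ("double", None)  # event Y

replica_a = apply_events([x, y])  # this replica believes X happened before Y
replica_b = apply_events([y, x])  # this replica believes Y happened before X

print(replica_a, replica_b)  # 300 250
```

Both replicas saw exactly the same events, yet they end up with different data. That divergence is what a consensus protocol has to rule out.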
In a world where you can’t count on different agents seeing events in the same order, and where you expect things to be constantly failing, how can you make sure that any distributed system you build ends up with a consistent view of reality?
The answer is a consensus protocol. You need to create a mechanism based on communication between the copies of your system that allows them to maintain a consistent consensus of what the current state of the world is, even in the presence of failures of machines, storage systems, and communications.
Paxos is a very elegant, reasonably simple consensus protocol.

Let’s get a bit more precise. Paxos is built on a model of storage. The original application of it was a consistent database, so it’s built around the idea of keeping data consistent. In Paxos, the state of the storage is modeled as a sequence of transactions. Each transaction is a pair (t, v), where t is a numeric transaction identifier, and v is a transaction value.
The state of the system being modeled is a sequence of transaction pairs, [(t_i, v_i), (t_j, v_j), ...], where the t values are increasing as you progress through the sequence. As time passes, new transaction pairs can be added to the state.
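As a rough sketch of that state model, the following keeps a list of (t, v) pairs and refuses any pair whose identifier does not increase. The class name `TransactionLog` and the sample values are my own illustration, not something defined by Paxos.

```python
class TransactionLog:
    """Hypothetical model of the state: (t, v) pairs with strictly increasing t."""

    def __init__(self):
        self.entries = []

    def append(self, t, v):
        # Reject identifiers that do not move the sequence forward.
        if self.entries and t <= self.entries[-1][0]:
            raise ValueError("transaction id %r is not increasing" % (t,))
        self.entries.append((t, v))


log = TransactionLog()
log.append(1, "create account")
log.append(2, "deposit 50")
print(log.entries)  # [(1, 'create account'), (2, 'deposit 50')]
```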
The focus of the Paxos protocol is ensuring that in a collection of 2n+1 participants, all surviving participants will agree on the current value of the state, even if up to n participants fail, and even if messages can be delivered in arbitrary order.
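The arithmetic behind that claim is small enough to write down. This is a minimal sketch with helper names of my own choosing, assuming a quorum is a simple majority.

```python
def quorum_size(participants):
    """Smallest group that forms a strict majority."""
    return participants // 2 + 1


def tolerated_failures(participants):
    """How many participants can fail while a majority still survives."""
    return participants - quorum_size(participants)


for n in range(1, 4):
    total = 2 * n + 1
    print(total, quorum_size(total), tolerated_failures(total))
# 3 2 1
# 5 3 2
# 7 4 3
```

With 2n+1 participants the quorum is n+1, which leaves room for exactly n failures.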
Before I go further into the description of paxos, we need to look at the basic assumptions that underlie it. Like any formal model, it’s not defined in terms of real computers. It’s defined in terms of an abstraction that approximates reality. In this case, the approximation is quite good, but we still need to go through the basic assumptions that make up its model of the universe.
  1. Processors (aka participants, servers, computers):
    1. operate at any speed. No two processors necessarily operate at the same speed.
    2. may fail without warning.
    3. may rejoin after recovering from a failure.
    4. are cooperative (in the sense that they do not attempt to cause failures).
  2. Network:
    1. Delivers messages between any pair of processors.
    2. Transmits messages asynchronously.
    3. Delivers messages at arbitrary speeds.
    4. Does not guarantee that messages will be delivered in the order in which they were transmitted.
    5. Does guarantee that a message, if delivered, will be delivered correctly, without any changes.
    6. May fail to deliver a message.
    7. May deliver multiple copies of the same message.
In short, everything can fail at any time; after failure, participants can recover and rejoin the system; and no part of the system acts in an actively adversarial way.
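If you want to test your own code against this model, a crude simulation of the network assumptions can help. The sketch below is only an illustration: the function name, the drop and duplicate rates, and the fixed seed are all my own assumptions. It drops, duplicates and reorders messages, but never alters one.

```python
import random


def unreliable_deliver(messages, rng, drop_rate=0.2, duplicate_rate=0.2):
    """Return the messages as the modeled network might deliver them."""
    delivered = []
    for msg in messages:
        if rng.random() < drop_rate:
            continue                   # the network may fail to deliver a message
        delivered.append(msg)
        if rng.random() < duplicate_rate:
            delivered.append(msg)      # it may deliver more than one copy
    rng.shuffle(delivered)             # and it does not preserve ordering
    return delivered


sent = ["msg-%d" % i for i in range(10)]
received = unreliable_deliver(sent, random.Random(42))

# Whatever arrives is always an unmodified message that was actually sent.
print(set(received) <= set(sent))  # True
```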
The protocol describes the behavior of the system in terms of a collection of roles. A participant can have more than one role in the system – in fact, in most implementations of Paxos, all participants do have multiple roles. The roles are:
Client
The client is not part of the Paxos cluster. It’s an external entity whose actions trigger state changes by making requests to the Paxos system. Each state update in Paxos is initiated by a client request, and completed by a reply to the client.
Acceptor
An acceptor (also called a voter) is a participant in the maintenance of distributed storage. A state change in a Paxos cluster does not occur until a majority (quorum) of acceptors agree upon it.
Proposer
A proposer receives a request from the client, and attempts to get a quorum of acceptors to agree on it.
Leader
One of the proposers is special. It is the single proposer who most recently had a proposal accepted. In many paxos implementations, there is only one active proposer serving client requests: the only time the other proposers send proposals is when the current leader fails, and a new one needs to be selected.
Learner
The learner is the real service provided by the Paxos cluster. Once a proposal is accepted, a learner processes the request from the client, and sends it the result.
In a typical paxos cluster, the client sends requests to a proposer. The proposer sends a proposal to update the state with the new client request and attempts to convince a majority of the acceptors to accept it. Once a majority accepts it, the client request is processed by the learner, and a result is returned to the client.
The meat of Paxos is the protocol by which the proposer gets a majority of acceptors to agree on a proposal, and how that protocol ensures that the collection of acceptors maintains a consistent state.

The protocol itself is pretty simple. Each round is effectively independent and consists of a process of attempting to reach consensus. Within each round, finding consensus is a two-phase process, where each phase consists of a message sent from a proposer to a group of acceptors, and a reply from the acceptors to the proposer.
  1. Phase One: Prepare/Promise
    • Proposer: A proposer attempts to start setting a new consensus by sending a Prepare(N) message to a quorum of acceptors. It can send to any group of acceptors, so long as that group forms a majority of the acceptors. The prepare message specifies a numeric identifier N for its proposal, which is larger than any proposal that’s been sent by this proposer.
    • Acceptors:
      Each acceptor, upon receiving the proposal, checks if the N-value from the prepare message is greater than any proposal from the current round that it has accepted. If so, it sends a reply called a Promise to the proposer, promising that it will never accept any proposal with a number less than N. If the acceptor has accepted a proposal with number less than N in the current round, then it includes the pair (v, n_v) consisting of the proposed consensus value v and the number n_v of the accepted proposal that proposed v.

      The acceptor thus sends a message Promise(N, (v, n_v)) (if it has accepted a proposal this round) or Promise(N, null) (if it has not yet accepted a proposal with a number less than N).

      Once it has sent a Promise message, it must not accept any request for a proposal with number less than N. Note though that this does not mean that the acceptor promises to accept the proposal: all it’s doing is promising not to accept any proposal with number less than N! If it receives a message Prepare(N+1), it’s free to promise that – but if it does, it will no longer be able to accept the proposal for N.

      (If N is smaller than the number of any proposal promised or accepted by the acceptor, then in the original version of Paxos, the acceptor does nothing; in some optimizations of the protocol, it replies Reject(n_v).)

    What this phase does is allow a proposer to determine whether or not a new proposal is even worth considering. If a quorum (majority) of acceptors send promises, then it can move on to phase 2.
  2. Phase Two: Accept!/Accepted
    • Proposer: When a proposer receives promises from a quorum of acceptors, it moves forward to try to actually commit the proposal. In order to do this, it needs to choose a value for the proposal. If any of the Promise messages contained a value, then the value of this proposal must be set to the value from the promise with the highest accepted proposal number. If all of the promises were empty, then the proposer can choose any value that it wants for the proposal.

      Once the proposer has chosen a value, it sends a message Accept!(N, V) to a quorum of acceptors. This is typically written with the exclamation point because it’s really a command to the acceptors: they’re being told to accept the proposal if they can.
    • Acceptors: When an acceptor receives an Accept!(N, V) message, if it has not issued a promise for a proposal with a number greater than N, then it must accept the message. It accepts the proposal by sending a message Accepted(N, V) to both the original proposer and all of the learners.

    When Accepted messages have been received from a quorum of acceptors, the new value V becomes the consensus value for the Paxos cluster, and the new proposal number N is fully committed.
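The two phases above can be sketched in a few lines. This is a toy, in-memory illustration under my own assumptions: there is no network, no failure handling and no persistence, and the names `Acceptor`, `on_prepare`, `on_accept` and `run_round` are mine rather than part of any library.

```python
class Acceptor:
    """Minimal acceptor; proposal numbers are plain integers starting at 1."""

    def __init__(self):
        self.promised_n = 0   # highest proposal number promised so far
        self.accepted = None  # (v, n_v) of the accepted proposal, or None

    def on_prepare(self, n):
        """Phase 1: reply with a promise, or ignore a stale proposal."""
        if n > self.promised_n:
            self.promised_n = n
            return ("promise", n, self.accepted)
        return None

    def on_accept(self, n, value):
        """Phase 2: accept unless a higher-numbered promise was made."""
        if n >= self.promised_n:
            self.promised_n = n
            self.accepted = (value, n)
            return ("accepted", n, value)
        return None


def run_round(acceptors, n, wanted_value):
    """Run both phases for proposal n; return the chosen value or None."""
    quorum = len(acceptors) // 2 + 1

    # Phase 1: Prepare/Promise
    promises = [p for p in (a.on_prepare(n) for a in acceptors) if p is not None]
    if len(promises) < quorum:
        return None

    # Adopt the value of the highest-numbered proposal reported in a promise;
    # only if every promise is empty may the proposer use its own value.
    reported = [p[2] for p in promises if p[2] is not None]
    if reported:
        value = max(reported, key=lambda pair: pair[1])[0]
    else:
        value = wanted_value

    # Phase 2: Accept!/Accepted
    acks = [r for r in (a.on_accept(n, value) for a in acceptors) if r is not None]
    return value if len(acks) >= quorum else None


acceptors = [Acceptor() for _ in range(3)]
print(run_round(acceptors, 1, "A"))  # A
print(run_round(acceptors, 2, "B"))  # A  (the earlier value must be adopted)
print(run_round(acceptors, 1, "C"))  # None (proposal number is stale)
```

Notice that the second proposer wanted "B" but ended up re-proposing "A". That is the rule about adopting the value from the promises doing its job.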
As with so many things, this is easier to understand when you think about an example. One use of Paxos that I’ve worked with is in a cluster scheduling service. In that system:
  • A client is a user attempting to run a new job on the cluster. It sends a request to the scheduler detailing the set of resources that it wants to request.
  • Each duplicate of the scheduler is a proposer, an acceptor, and a learner. There’s one active instance of the scheduler, which is the leader. When a client wants to schedule a job, its request gets sent to the leading scheduler.
  • In the normal non-error case, this works as follows:
    1. When a scheduling request is received, the leader proposes scheduling the job, by sending a message to all of the other schedulers saying that it wants to schedule job N.
    2. The other schedulers, if they haven’t seen a proposal for a job with a number greater than N, make promises to accept that proposal.
    3. The leading scheduler chooses resources for the job, and then sends an Accept! message to the other schedulers.
    4. The other schedulers reply accepting the scheduling. The non-leader schedulers, acting as learners, record the scheduling information, and the leader actually starts the job.
  • The error case arises when there has been some kind of failure. In that case, we don’t necessarily know who the leader is – so we get multiple schedulers trying to act as if they’re the leader. So they each send proposals. Whichever proposal had the largest proposal number will eventually get accepted, and its proposer becomes the new leader.
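Here is a small sketch of why the largest proposal number wins in the error case. It only models phase one, and it assumes every Prepare reaches the acceptor before any Accept! does; the scheduler names and numbers are invented for the example.

```python
import itertools


def final_promise(prepares_in_arrival_order):
    """Proposal number an acceptor is bound to after seeing every Prepare."""
    promised = None
    for n in prepares_in_arrival_order:
        if promised is None or n > promised:
            promised = n  # a higher-numbered Prepare replaces the old promise
    return promised


# (number, scheduler id) tuples keep proposal numbers unique and comparable.
prepares = [(7, "sched-a"), (7, "sched-b"), (6, "sched-c")]

# Try every possible arrival order at the acceptor.
outcomes = {final_promise(order) for order in itertools.permutations(prepares)}
print(outcomes)  # {(7, 'sched-b')}
```

No matter how the messages are interleaved, the acceptor ends up promised to the same proposal, so only that scheduler’s Accept! can still succeed.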

It’s a pretty simple thing – the core concept is simply that no consensus proposal is considered “committed” until it’s been accepted by a majority of the participants. And if it’s been accepted by a majority of the participants, that means that no conflicting proposal can ever reach consensus – because that would require at least one participant to accept 2 conflicting proposals.
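The claim that two conflicting proposals would need a shared participant is just the fact that any two majorities overlap. For a small cluster you can check it by brute force; the five participant names below are placeholders.

```python
from itertools import combinations

participants = ["a", "b", "c", "d", "e"]
majority = len(participants) // 2 + 1

# Every subset that is large enough to count as a quorum.
quorums = [
    set(group)
    for size in range(majority, len(participants) + 1)
    for group in combinations(participants, size)
]

# Any two quorums must share at least one participant.
all_overlap = all(q1 & q2 for q1 in quorums for q2 in quorums)
print(len(quorums), all_overlap)  # 16 True
```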

But there’s still a bit of formality that’s worth looking at. Exactly what guarantees does Paxos give? What properties does Paxos-style consensus have?
Even the formal properties of paxos are easy to understand. Paxos provides two key properties: validity, and agreement.
Validity
No value ever reaches consensus without first being proposed, and having its proposal accepted.
Agreement
No two distinct values ever reach consensus at the same time.
You can easily prove those two properties. In fact, the proof is completely obvious once you recognize that the paxos protocol has two invariants (and those invariants are themselves clear from the definition of the protocol!):
  1. An acceptor can accept a proposal P if and only if it has not yet made a promise for a proposal N where N > P.
  2. If any proposal has already been accepted, then the value of a new proposal is the consensus value of the highest numbered proposal that has been accepted before this proposal.
Getting back to the beginning: the point of all of this is to have a system in which we can be sure that things work correctly even in the presence of failures. In Paxos, as long as at some point there was a quorum of machines that came to an agreement, then any surviving quorum of machines left after a failure must overlap with the previous quorum – which means that the previous consensus still remains in effect, and will be propagated to the remaining participants. If you’ve got 5 machines, then two can fail, and you won’t lose consistency among the remaining ones.

Python Libraries

For anyone interested in the nuts and bolts of the underlying engineering, I would like to point to a simple Python implementation as a reference.

It is even better to show some code where it helps. I believe the following Python interfaces are largely self-explanatory.

[sourcecode language="python"]
'''
This module provides an implementation of the Paxos algorithm as
a set of composable classes.
The code is available:
https://raw.githubusercontent.com/cocagne/python-composable-paxos/master/composable_paxos.py
'''

import collections

# ProposalID
#
# In order for the Paxos algorithm to function, all proposal ids must be
# unique. A simple way to ensure this is to include the proposer's unique
# id in the proposal id.
#
# Python tuples allow the proposal number and the UID to be combined in a
# manner that supports comparison in the expected manner:
#
# (4,'C') > (4,'B') > (3,'Z')
#
# Named tuples from the collections module support all of the regular
# tuple operations but additionally allow access to the contents by
# name so the numeric component of the proposal ID may be referred to
# via 'proposal_id.number' instead of 'proposal_id[0]'.
#
ProposalID = collections.namedtuple('ProposalID', ['number', 'uid'])

class PaxosMessage (object):
    '''
    Base class for all messages defined in this module
    '''
    from_uid = None # Set by subclass constructor

class Prepare (PaxosMessage):
    '''
    Prepare messages should be broadcast to all Acceptors.
    '''
    def __init__(self, from_uid, proposal_id):
        self.from_uid = from_uid
        self.proposal_id = proposal_id

class Nack (PaxosMessage):
    '''
    NACKs are technically optional though few practical applications will
    want to omit their use. They are used to signal a proposer that their
    current proposal number is out of date and that a new one should be
    chosen. NACKs may be sent in response to both Prepare and Accept
    messages
    '''
    def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id):
        self.from_uid = from_uid
        self.proposal_id = proposal_id
        self.proposer_uid = proposer_uid
        self.promised_proposal_id = promised_proposal_id

class Promise (PaxosMessage):
    '''
    Promise messages should be sent to at least the Proposer specified in
    the proposer_uid field
    '''
    def __init__(self, from_uid, proposer_uid, proposal_id, last_accepted_id, last_accepted_value):
        self.from_uid = from_uid
        self.proposer_uid = proposer_uid
        self.proposal_id = proposal_id
        self.last_accepted_id = last_accepted_id
        self.last_accepted_value = last_accepted_value

class Accept (PaxosMessage):
    '''
    Accept messages should be broadcast to all Acceptors
    '''
    def __init__(self, from_uid, proposal_id, proposal_value):
        self.from_uid = from_uid
        self.proposal_id = proposal_id
        self.proposal_value = proposal_value

class Accepted (PaxosMessage):
    '''
    Accepted messages should be sent to all Learners
    '''
    def __init__(self, from_uid, proposal_id, proposal_value):
        self.from_uid = from_uid
        self.proposal_id = proposal_id
        self.proposal_value = proposal_value

class Resolution (PaxosMessage):
    '''
    Optional message used to indicate that the final value has been selected
    '''
    def __init__(self, from_uid, value):
        self.from_uid = from_uid
        self.value = value

class InvalidMessageError (Exception):
    '''
    Thrown if a PaxosMessage subclass is passed to a class that does not
    support it
    '''

class MessageHandler (object):

    def receive(self, msg):
        '''
        Message dispatching function. This function accepts any PaxosMessage subclass and calls
        the appropriate handler function
        '''
        # e.g. a Prepare message is routed to self.receive_prepare(msg)
        handler = getattr(self, 'receive_' + msg.__class__.__name__.lower(), None)
        if handler is None:
            raise InvalidMessageError('Receiving class does not support messages of type: ' + msg.__class__.__name__)
        return handler( msg )

class Proposer (MessageHandler):
    '''
    The 'leader' attribute is a boolean value indicating the Proposer's
    belief in whether or not it is the current leader. This is not a reliable
    value as multiple nodes may simultaneously believe themselves to be the
    leader.
    '''

    leader = False
    proposed_value = None
    proposal_id = None
    highest_accepted_id = None
    promises_received = None
    nacks_received = None
    current_prepare_msg = None
    current_accept_msg = None

    def __init__(self, network_uid, quorum_size):
        self.network_uid = network_uid
        self.quorum_size = quorum_size
        self.proposal_id = ProposalID(0, network_uid)
        self.highest_proposal_id = ProposalID(0, network_uid)

    def propose_value(self, value):
        '''
        Sets the proposal value for this node iff this node is not already aware of
        a previous proposal value. If the node additionally believes itself to be
        the current leader, an Accept message will be returned
        '''
        if self.proposed_value is None:
            self.proposed_value = value

            if self.leader:
                self.current_accept_msg = Accept(self.network_uid, self.proposal_id, value)
                return self.current_accept_msg

    def prepare(self):
        '''
        Returns a new Prepare message with a proposal id higher than
        that of any observed proposals. A side effect of this method is
        to clear the leader flag if it is currently set.
        '''

        self.leader = False
        self.promises_received = set()
        self.nacks_received = set()
        self.proposal_id = ProposalID(self.highest_proposal_id.number + 1, self.network_uid)
        self.highest_proposal_id = self.proposal_id
        self.current_prepare_msg = Prepare(self.network_uid, self.proposal_id)

        return self.current_prepare_msg

    def observe_proposal(self, proposal_id):
        '''
        Optional method used to update the proposal counter as proposals are
        seen on the network. When co-located with Acceptors and/or Learners,
        this method may be used to avoid a message delay when attempting to
        assume leadership (guaranteed NACK if the proposal number is too low).
        This method is automatically called for all received Promise and Nack
        messages.
        '''
        if proposal_id > self.highest_proposal_id:
            self.highest_proposal_id = proposal_id

    def receive_nack(self, msg):
        '''
        Returns a new Prepare message if the number of Nacks received reaches
        a quorum.
        '''
        self.observe_proposal( msg.promised_proposal_id )

        if msg.proposal_id == self.proposal_id and self.nacks_received is not None:
            self.nacks_received.add( msg.from_uid )

            if len(self.nacks_received) == self.quorum_size:
                return self.prepare() # Lost leadership or failed to acquire it

    def receive_promise(self, msg):
        '''
        Returns an Accept message if a quorum of Promise messages is achieved
        '''
        self.observe_proposal( msg.proposal_id )

        if not self.leader and msg.proposal_id == self.proposal_id and msg.from_uid not in self.promises_received:

            self.promises_received.add( msg.from_uid )

            # Explicit None checks: Python 3 cannot order None against a tuple.
            if msg.last_accepted_id is not None and (
                    self.highest_accepted_id is None or msg.last_accepted_id > self.highest_accepted_id):
                self.highest_accepted_id = msg.last_accepted_id
                if msg.last_accepted_value is not None:
                    # Adopt the value of the highest-numbered accepted proposal
                    self.proposed_value = msg.last_accepted_value

            if len(self.promises_received) == self.quorum_size:
                self.leader = True

                if self.proposed_value is not None:
                    self.current_accept_msg = Accept(self.network_uid, self.proposal_id, self.proposed_value)
                    return self.current_accept_msg

class Acceptor (MessageHandler):
    '''
    Acceptors act as the fault-tolerant memory for Paxos. To ensure correctness
    in the presence of failure, Acceptors must be able to remember the promises
    they've made even in the event of power outages. Consequently, any changes
    to the promised_id, accepted_id, and/or accepted_value must be persisted to
    stable media prior to sending promise and accepted messages.

    When an Acceptor instance is composed alongside a Proposer instance, it
    is generally advantageous to call the proposer's observe_proposal()
    method when methods of this class are called.
    '''

    def __init__(self, network_uid, promised_id=None, accepted_id=None, accepted_value=None):
        '''
        promised_id, accepted_id, and accepted_value should be provided if and only if this
        instance is recovering from persistent state.
        '''
        self.network_uid = network_uid
        self.promised_id = promised_id
        self.accepted_id = accepted_id
        self.accepted_value = accepted_value

    def receive_prepare(self, msg):
        '''
        Returns either a Promise or a Nack in response. The Acceptor's state must be persisted to disk
        prior to transmitting the Promise message.
        '''
        # Explicit None check: Python 3 cannot order a tuple against None.
        if self.promised_id is None or msg.proposal_id >= self.promised_id:
            self.promised_id = msg.proposal_id
            return Promise(self.network_uid, msg.from_uid, self.promised_id, self.accepted_id, self.accepted_value)
        else:
            return Nack(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)

    def receive_accept(self, msg):
        '''
        Returns either an Accepted or Nack message in response. The Acceptor's state must be persisted
        to disk prior to transmitting the Accepted message.
        '''
        # Accept unless a promise was made for a higher-numbered proposal.
        if self.promised_id is None or msg.proposal_id >= self.promised_id:
            self.promised_id = msg.proposal_id
            self.accepted_id = msg.proposal_id
            self.accepted_value = msg.proposal_value
            return Accepted(self.network_uid, msg.proposal_id, msg.proposal_value)
        else:
            return Nack(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)

class Learner (MessageHandler):
    '''
    This class listens to Accepted messages, determines when the final value is
    selected, and tracks which peers have accepted the final value.
    '''
    class ProposalStatus (object):
        __slots__ = ['accept_count', 'retain_count', 'acceptors', 'value']
        def __init__(self, value):
            self.accept_count = 0
            self.retain_count = 0
            self.acceptors = set()
            self.value = value

    def __init__(self, network_uid, quorum_size):
        self.network_uid = network_uid
        self.quorum_size = quorum_size
        self.proposals = dict() # maps proposal_id => ProposalStatus
        self.acceptors = dict() # maps from_uid => last_accepted_proposal_id
        self.final_value = None
        self.final_acceptors = None # Will be a set of acceptor UIDs once the final value is chosen
        self.final_proposal_id = None

    def receive_accepted(self, msg):
        '''
        Called when an Accepted message is received from an acceptor. Once the final value
        is determined, the return value of this method will be a Resolution message containing
        the consensual value. Subsequent calls after the resolution is chosen will continue to add
        new Acceptors to the final_acceptors set and return Resolution messages.
        '''
        if self.final_value is not None:
            if msg.proposal_id >= self.final_proposal_id and msg.proposal_value == self.final_value:
                self.final_acceptors.add( msg.from_uid )
            return Resolution(self.network_uid, self.final_value)

        last_pn = self.acceptors.get(msg.from_uid)

        # Explicit None check: Python 3 cannot order a tuple against None.
        if last_pn is not None and msg.proposal_id <= last_pn:
            return # Old message

        self.acceptors[ msg.from_uid ] = msg.proposal_id

        # This acceptor moved on, so release its vote for the older proposal.
        if last_pn is not None:
            ps = self.proposals[ last_pn ]
            ps.retain_count -= 1
            ps.acceptors.remove(msg.from_uid)
            if ps.retain_count == 0:
                del self.proposals[ last_pn ]

        if not msg.proposal_id in self.proposals:
            self.proposals[ msg.proposal_id ] = Learner.ProposalStatus(msg.proposal_value)

        ps = self.proposals[ msg.proposal_id ]

        assert msg.proposal_value == ps.value, 'Value mismatch for single proposal!'

        ps.accept_count += 1
        ps.retain_count += 1
        ps.acceptors.add(msg.from_uid)

        # A quorum of Accepted messages for one proposal decides the value.
        if ps.accept_count == self.quorum_size:
            self.final_proposal_id = msg.proposal_id
            self.final_value = msg.proposal_value
            self.final_acceptors = ps.acceptors
            self.proposals = None
            self.acceptors = None

            return Resolution( self.network_uid, self.final_value )

class PaxosInstance (Proposer, Acceptor, Learner):
    '''
    Aggregate Proposer, Acceptor, & Learner class.
    '''

    def __init__(self, network_uid, quorum_size, promised_id=None, accepted_id=None, accepted_value=None):
        Proposer.__init__(self, network_uid, quorum_size)
        Acceptor.__init__(self, network_uid, promised_id, accepted_id, accepted_value)
        Learner.__init__(self, network_uid, quorum_size)

    def receive_prepare(self, msg):
        self.observe_proposal( msg.proposal_id )
        return super(PaxosInstance,self).receive_prepare(msg)

    def receive_accept(self, msg):
        self.observe_proposal( msg.proposal_id )
        return super(PaxosInstance,self).receive_accept(msg)
[/sourcecode]