Paxos And Read Consistency

by simbo1905

In the last post we took a deep dive into using Paxos to reach consensus on the ordering of writes. We explored how this could be used to replicate a stream of commands to create a set of datastore backups with automated failover. We can upgrade that model with compare-and-swap semantics and add weakly consistent reads from cluster replicas. We can also choose to route reads via the leader yet actually run them on replicas whilst retaining strongly consistent semantics.

The simple replicated map outlined in the last post does not help clients avoid consistency problems when several of them update the same keys concurrently. Clients read, modify, then write back, and interleaved operations let them overwrite each other’s work. Without locks or conditional updates it is not possible to safely increment a number, nor to use the map to safely record which process holds a given resource lock. Concurrent threads updating a plain map within a single application face the same synchronisation challenge. So this isn’t a problem solvable by the replication protocol of the cluster of servers; it is a problem with the client application protocol.

The Spinnaker paper presents a better key-value model. A cut-down version of its application protocol adds conditional put(key, value, version) and remove(key, version) operations to our map, where the version number is the one returned by a read. This gives the application compare-and-swap semantics, which remain safe when concurrent processing by multiple clients means that they may be working with stale values. The read command is get(key: String, consistent: Boolean), which returns a value together with its version in the store and lets you specify a read consistency flag:

The setting of the ‘consistent’ flag is used to choose the consistency level. Setting it to ‘true’ chooses strong consistency, and the latest value is always returned. Setting it to ‘false’ chooses timeline consistency, and a possibly stale value is returned in exchange for better performance.
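To make the API concrete, here is a minimal single-node sketch in Scala of a map with conditional put and remove. The class name VersionedMap, and the convention that version 0 means "only if the key is absent", are assumptions for illustration rather than Spinnaker’s actual interface; in a real cluster each operation would be a command replicated through Paxos.

```scala
import scala.collection.mutable

// Hypothetical single-node sketch of a versioned map with compare-and-swap
// semantics; the names are illustrative, not Spinnaker's actual API.
final class VersionedMap[V] {
  // key -> (value, version of the write that produced it)
  private val data = mutable.Map.empty[String, (V, Long)]
  // Versions come from one counter, so a version is never reused, even
  // after a key is removed and re-created.
  private var lastVersion = 0L

  // In a cluster, `consistent` would pick a strong (leader) or timeline
  // (replica) read; with a single copy of the data it makes no difference.
  def get(key: String, consistent: Boolean): Option[(V, Long)] =
    synchronized { data.get(key) }

  // Conditional put: applies only if `version` matches the stored version.
  // Version 0 means "only if the key is absent".
  def put(key: String, value: V, version: Long): Boolean = synchronized {
    val current = data.get(key).map(_._2).getOrElse(0L)
    if (current != version) false
    else {
      lastVersion += 1
      data(key) = (value, lastVersion)
      true
    }
  }

  // Conditional remove: deletes only if `version` is still current.
  def remove(key: String, version: Long): Boolean = synchronized {
    data.get(key) match {
      case Some((_, v)) if v == version =>
        data.remove(key)
        true
      case _ => false
    }
  }
}
```

Drawing versions from a single counter rather than per key means a client holding a version from before a remove cannot accidentally succeed against a re-created key.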

Here timeline consistency implies that we read committed data, and a single thread of a client sees its own writes, but the reads may see cached values. That sounds complex or not very useful. Yet consider that a web-application user has an experience built upon request-response semantics. If that logical thread of control at the browser does numerous reads and writes in servicing a user interaction, you certainly want it to see its own writes. If we have three users all interacting with our service, we may not need each of them to see a consistent ordering of one another’s actions.

So what can we do with this model? We can request strong consistency when attempting to increment a counter:

// strong consistency
val (counter, version) = store.get(key, true)
// optimistic update may fail
val successOrFailure = store.put(key, counter + 1, version)

The update may fail if another process got there first, so we would loop until success.
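A sketch of that retry loop follows, assuming a hypothetical in-memory CounterStore in place of the replicated store: each attempt does a strong read and a conditional put, and the loop retries whenever the put loses a race.

```scala
import scala.collection.mutable

// Hypothetical minimal store for this sketch: each key holds an Int and a
// version number; put succeeds only if the caller's version is current.
final class CounterStore {
  private val data = mutable.Map.empty[String, (Int, Long)]

  def get(key: String, consistent: Boolean): (Int, Long) =
    synchronized { data.getOrElse(key, (0, 0L)) }

  def put(key: String, value: Int, version: Long): Boolean = synchronized {
    val (_, current) = data.getOrElse(key, (0, 0L))
    if (current == version) { data(key) = (value, current + 1); true }
    else false
  }
}

// Read-modify-write with optimistic concurrency: retry until our put is
// applied against the version we read. Returns the number of attempts.
def increment(store: CounterStore, key: String): Int = {
  var attempts = 0
  var done = false
  while (!done) {
    attempts += 1
    val (counter, version) = store.get(key, true) // strong read
    done = store.put(key, counter + 1, version)   // false if someone else won
  }
  attempts
}
```

If several threads call increment on the same key concurrently, a lost race only costs another attempt, so no increment is lost; under heavy contention the number of attempts grows, which is the price of optimistic concurrency.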

Let’s imagine that we want to acquire a lock if the lock has expired. We can choose to use a fast but possibly stale read:

// weak consistency
val (oldLock, version) = store.get(resource, false)
// if the lock has expired attempt to take it
val holdLock = if (oldLock.expiry <= currentTime) {
  val newLock = Lock(this, resource)
  store.put(resource, newLock, version) // may return true or false
} else false

This is safe because we only take the lock if the expired lock is still the latest value in the store when the put operation occurs. Given that the application protocol is now safe against stale reads, we can choose to read committed data at the leader, or even directly from any replica. To scale to high read load we can assign clients round-robin to read from different replicas within the cluster, removing all of the read load from the leader. Note that if there is high contention for a resource lock then stale reads from replicas may lead to too much spinning in the loop; so whether this is a good design choice depends on both the read and write patterns of your data and the application safety protocol.
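The sketch below models the lock example with one leader and two lagging replicas, assigning readers to replicas round-robin. Lock, Leader, Replica, RoundRobin and tryAcquire are hypothetical names, and replication lag is simulated with an explicit catchUp call. It illustrates that a stale weak read can lead a client to attempt the put, but the conditional put at the leader then rejects it.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical lock record: holder id and expiry time in milliseconds.
final case class Lock(holder: String, expiry: Long)

// The leader's authoritative copy, supporting a conditional put.
final class Leader {
  private val data = mutable.Map.empty[String, (Lock, Long)]
  def get(key: String): Option[(Lock, Long)] = synchronized { data.get(key) }
  def put(key: String, lock: Lock, version: Long): Boolean = synchronized {
    val current = data.get(key).map(_._2).getOrElse(0L)
    if (current == version) { data(key) = (lock, current + 1); true } else false
  }
}

// A replica serves weak reads from a copy that may lag the leader.
// Replication is modelled here by an explicit catchUp call.
final class Replica(leader: Leader) {
  private val snapshot = mutable.Map.empty[String, (Lock, Long)]
  def catchUp(key: String): Unit =
    synchronized { leader.get(key).foreach(v => snapshot(key) = v) }
  def get(key: String): Option[(Lock, Long)] = synchronized { snapshot.get(key) }
}

// Round-robin assignment of readers to replicas.
final class RoundRobin[A](nodes: IndexedSeq[A]) {
  private val next = new AtomicInteger(0)
  def pick(): A = nodes(Math.floorMod(next.getAndIncrement(), nodes.size))
}

// Weak read from a replica, then a conditional put at the leader. Only the
// put decides the outcome, so a stale read cannot steal a live lock.
def tryAcquire(me: String, resource: String, replica: Replica, leader: Leader,
               now: Long, ttl: Long): Boolean =
  replica.get(resource) match {
    case Some((old, version)) if old.expiry <= now =>
      leader.put(resource, Lock(me, now + ttl), version)
    case Some(_) => false // an unexpired lock is held by someone
    case None    => leader.put(resource, Lock(me, now + ttl), 0L) // never locked, as far as we know
  }
```

For example, if both replicas have caught up to an expired lock, the first client to put wins; a second client reading a replica that has not yet seen the new lock sees the same expired lock, but its put is rejected because the version has moved on.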

Note that it’s the application semantics, the design of the application service API, that has unlocked this potential speed-up. Many web applications use optimistic locking to handle concurrent user edits safely, which works in a similar way to compare-and-swap. When two users concurrently edit the same data, this means giving one of them an error message and refreshing the screen to reload the latest data. Whether you want this user experience, or pessimistic locks, or just to clobber the data (no checks, no locks, good luck!), it is your call. Pick your poison when it comes to multiple users updating the same data over a network concurrently. If you do use such techniques and there is low update contention in your data access patterns, then cheap reads directly off your replicas may greatly increase scalability.

So how exactly does a strongly consistent read work? To get strong consistency we need to order all concurrent strong reads and writes from clients within the cluster. We get that trivially if we treat all client commands the same and pass both types of command through the Paxos algorithm. The problem is that the algorithm performs disk flushes, which are unnecessary for read-only commands as they don’t need to be made crash durable. The PaxStore paper describes the following strong read algorithm:

When choosing strong consistent read, the system needs first read record from the leader and then read epoch message from a [majority of] follower[s] of this leader, if the follower[s] has the same epoch message with leader, it shows that we read data successfully, otherwise the system error occurs.

Because the leader has confirmed with a majority that it has not lost the leadership, it is the only node updating the value, so what it returns to clients must be fresh. Issuing a leader lease, such that no other node attempts to take the leadership within a window of time, removes the need for the leader to ask the other nodes whether it has lost leadership while its lease is valid. It then just reads committed data locally.
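A synchronous sketch of both variants follows. It simulates the followers’ replies as FollowerView values (a hypothetical type; a real leader would query followers over the network, in parallel) and treats the epoch as the leader’s ballot number. The leased path assumes the lease really does prevent another leader from emerging before it expires, which in practice relies on bounded clock drift between nodes.

```scala
// Hypothetical snapshot of what a follower reports: the epoch (ballot) of
// the leader it currently follows.
final case class FollowerView(epoch: Long)

final class LeaderNode(epoch: Long, followers: Seq[FollowerView],
                       committed: Map[String, String]) {
  private val clusterSize = followers.size + 1

  // Strong read: read the committed value locally, then check that a
  // majority of the cluster (counting ourselves) still reports our epoch.
  def strongRead(key: String): Either[String, Option[String]] = {
    val value = committed.get(key)
    val confirmations = 1 + followers.count(_.epoch == epoch)
    if (confirmations > clusterSize / 2) Right(value)
    else Left("epoch check failed: possibly no longer leader")
  }

  // With a lease, skip the majority check until the lease expires.
  def leasedRead(key: String, now: Long, leaseExpiry: Long): Either[String, Option[String]] =
    if (now < leaseExpiry) Right(committed.get(key)) else strongRead(key)
}
```

In a three-node cluster one matching follower plus the leader itself is a majority, so the read succeeds; if both followers report a newer epoch, the leader reports an error instead of a possibly stale value.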

Note that we have a choice here. We can serialise reads and writes based upon arrival order, or we can reorder reads with respect to pending writes at the leader. Why? Consider the case where a write arrives followed immediately by a read. The write is sent through the algorithm and is pending a majority of acks from remote followers. The leader can be configured to run the read while it is idle, ahead of the pending write.
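One way to express that choice is a flag on the leader’s read path. This sketch assumes a single-threaded leader event loop and a hypothetical onWriteChosen callback fired when the oldest pending write has its majority of acks; LeaderReads and reorderReads are illustrative names. With reordering disabled, a read is tagged with the number of writes that arrived before it and is answered as soon as exactly those writes are committed.

```scala
import scala.collection.mutable

// Hypothetical single-threaded leader loop. Writes wait for a majority of
// acks (signalled by onWriteChosen); reads only ever touch committed data.
final class LeaderReads(reorderReads: Boolean) {
  private val committed = mutable.Map.empty[String, String]
  private val pending = mutable.Queue.empty[(String, String)]
  private var writesSubmitted = 0L
  private var writesChosen = 0L
  // Reads held back until the writes that arrived before them are chosen.
  private val waiting = mutable.Queue.empty[(Long, String, Option[String] => Unit)]

  def write(key: String, value: String): Unit = {
    pending.enqueue((key, value))
    writesSubmitted += 1
  }

  // reorderReads = true: answer immediately, ahead of any pending writes.
  // reorderReads = false: preserve arrival order relative to writes.
  def read(key: String)(reply: Option[String] => Unit): Unit =
    if (reorderReads || writesChosen == writesSubmitted) reply(committed.get(key))
    else waiting.enqueue((writesSubmitted, key, reply))

  // Called when the oldest pending write has been acked by a majority.
  def onWriteChosen(): Unit = {
    val (k, v) = pending.dequeue()
    committed(k) = v
    writesChosen += 1
    // Release reads whose preceding writes are now all committed.
    while (waiting.nonEmpty && waiting.head._1 <= writesChosen) {
      val (_, rk, reply) = waiting.dequeue()
      reply(committed.get(rk))
    }
  }
}
```

With reorderReads = true, a read issued immediately after a write to the same key returns the value committed before that write, because it does not wait for the write to be chosen.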

Is such reordering a problem? It depends. Single-threaded clients or thread-per-browser-request clients shouldn’t see any problems: a single thread blocks awaiting each write and read command in turn, so the leader cannot reorder them. It is plausible that a multithreaded application could rely on reads not being moved ahead of pending writes. Yet it seems desirable to engineer an application so that the leader can run reads while it awaits the acks that fix the next write, maximising resource utilisation on the leader. High-performance computing platforms may choose to reorder overlapping writes for optimal disk usage, so tradeoffs of speed versus complexity like this are typical engineering practice.

The problem with running all the reads on the leader is that this inhibits scalability when we have a read-dominated load. Since we have replicas in the cluster, can we read from them without reading stale data? Yes. The Gaios paper describes its SMARTER Paxos engine having the leader order reads and writes and then hand off the actual read to a replica. The leader can hold a lease so that it knows it is still the leader when reads arrive. It can then hand off the read to a learner, tagged with the ballot number and slot number of the write that precedes it. The learner holds the read until it learns that the preceding write is fixed. It then does a final check that the preceding write was chosen by the leader which sent it the read, and that the lease is still valid. If that check passes, it runs the read and returns the result to the client.
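The learner side of that hand-off can be sketched as follows. HandedOffRead, Learner and the reply callback are hypothetical names; the sketch assumes the learner learns slots in order and that the leader’s lease expiry travels with the read. It is a simplification of the behaviour described above, not the Gaios implementation.

```scala
import scala.collection.mutable

// Hypothetical message: a read the leader ordered after the write in `slot`,
// tagged with the leader's ballot and lease expiry, plus a callback standing
// in for the response to the client.
final case class HandedOffRead(key: String, ballot: Long, slot: Long, leaseExpiry: Long,
                               reply: Either[String, Option[String]] => Unit)

final class Learner(clock: () => Long) {
  private val state = mutable.Map.empty[String, String]
  private val chosenBallot = mutable.Map.empty[Long, Long] // slot -> ballot that fixed it
  private var highestFixed = -1L
  private val held = mutable.ArrayBuffer.empty[HandedOffRead]

  // The learner learns that `slot` was fixed under `ballot` with this write.
  // Assumes slots are learned in order; a real learner would buffer gaps.
  def learned(slot: Long, ballot: Long, key: String, value: String): Unit = {
    state(key) = value
    chosenBallot(slot) = ballot
    highestFixed = slot
    val (ready, stillWaiting) = held.partition(_.slot <= highestFixed)
    held.clear()
    held ++= stillWaiting
    ready.foreach(serve)
  }

  // Hold the read until its preceding write is fixed, then serve it.
  def receive(r: HandedOffRead): Unit =
    if (r.slot <= highestFixed) serve(r) else held += r

  // Final check: the same leader chose the preceding write and its lease is valid.
  private def serve(r: HandedOffRead): Unit = {
    val sameLeader = chosenBallot.get(r.slot).contains(r.ballot)
    val leaseValid = clock() < r.leaseExpiry
    if (sameLeader && leaseValid) r.reply(Right(state.get(r.key)))
    else r.reply(Left("rejected: retry via the leader"))
  }
}
```

A rejected read goes back to the client, which can retry via whichever node is currently the leader.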

Is that a strong or weak read? It is a strong read. Once again we may see some reordering, in this case because the read goes via a different node, which frees up the leader to run more writes. An expensive read may take a long time to run and be returned to the client very late. Meanwhile the leader may have run other writes ordered after the read and acknowledged those writes to clients. This is shown in the following diagram:

[Diagram: timeline of writeA, readA and writeB passing through the cluster, with readA handed off to a replica]

Time moves left to right and the horizontal bars show how long each read or write was pending. The arrows show the network exchange of two writes and a read. The dashed box is the cluster. Going left to right (forward in time), the commands hit the leader in the order [writeA, readA, writeB]. The read is handed off to a replica node to run and is returned directly to the client. That’s a longer network path, and the replica may have to do a lot of IO to perform the read. Meanwhile the leader may perform the second write if it gets fast responses from the acceptors in the cluster (not shown). The results get back to the three client threads in the wall-clock order [writeA, writeB, readA]. That isn’t a stale read; it’s a delayed response to a read which occurs between two writes. Once again, single-threaded apps or thread-per-user-request apps won’t have a problem with this. And once again, given that this technique allows high read load to be spread across all the replicas, it seems a very good idea to engineer an application to work correctly in such scenarios.

The next post will discuss leader leases.