Corfu: The Protocol

by simbo1905

This is the eighth and last in a series of posts about Corfu, a strongly consistent and scalable distributed log. To quote the paper:

At its base, CORFU derives its success by scaling Paxos to data center settings and providing a durable engine for totally ordering client requests at tens of Gigabits per second throughput.

It does this through a number of existing techniques:

Our design borrows bits and pieces from various existing methods, yet we invented a new solution in order to scale SMR [state machine replication] to larger cluster scales and deliver aggregate cluster throughput to hundreds of clients.

What I really like about Corfu is that it innovates by bringing existing techniques such as Paxos together into something which is a real breakthrough. The protocol itself is very simple. The established techniques it uses to implement the protocol are very simple. Yet when assembled into the solution they become extremely powerful. This post will put the pieces together and discuss the Corfu protocol.

The key characteristic of Corfu is that it has a client-centric architecture:

Our design places most CORFU functionality at the clients, which reduces the complexity, cost, latency, and power consumption of the storage units.

Clients are typically application servers using the Corfu system as a shared global log. Moving the logic into the clients is a paradigm shift away from typical architectures for strong consistency. Algorithms such as Paxos, Raft or ZAB have a cluster of nodes running their own network protocol. In contrast, the storage nodes in a Corfu cluster don’t exchange any messages at all. They are remarkably dumb.

That isn’t quite the full story, though. If we get some sort of split-brain situation where different clients are using different configurations, the Corfu distributed log will be trashed. To solve this, Corfu uses Paxos to ensure that the cluster configuration is made strongly consistent. Yet that is moving around metadata which changes slowly, and that network protocol runs out-of-band to the main transactional workload.

To get a flavour of how little logic is needed to run the main transactional load, we can cover the full storage node logic in a few paragraphs. The state maintained by the storage units is:

  • An “epoch” number. This is an integer corresponding to the cluster configuration version which is valid to interact with the storage unit. It is managed by clients running Paxos out-of-band to the main workload. More on that below.
  • A mapping of logical log pages onto physical disk pages. This is described as a hash map within the paper.

Copy compaction (Part 5) as well as stripe and mirror (Part 4) means that the mapping of logical pages onto physical disk pages is arbitrary. A hash map can hold any arbitrary mapping with an O(1) lookup. That, along with a big SSD disk holding the main data, is exposed in the following simple network API:

  • read(epoch,address) validates that the epoch of the client matches the local epoch number. Upon failure return err_sealed. Upon success run the address through the map to return the physical page of data.
  • write(epoch,address,value) validates that the epoch of the client matches the local epoch. Upon failure return err_sealed. Upon success run the address through the map to get the physical page address, then check the write-once flag and the deleted flag. Upon failure return err_written or err_deleted. Upon success write the value into the physical address.
  • delete(address) marks the address as deleted. In the paper they don’t validate the epoch, which looks risky to me. Talking to the authors of the paper, they agree that in general you should validate that the epoch of the client matches the local epoch. Upon failure return err_sealed. Upon success mark the address as deleted.
  • seal(epoch) is used during a reconfiguration. It first validates that the epoch being set is higher than the current local epoch number. Upon failure return err_sealed. Upon success set the local epoch to the one supplied, then return the highest page number written via any command. All the actual logic of how to do a reconfiguration runs in a client, which we will cover below.

We can see that the epoch is a guard to prevent clients from reading and writing data using an out-of-date configuration. Other than that, the storage node just uses its internal hash map to resolve a logical client address to a local disk page. We only need one bit to know if a page is written and another bit to know if the page has been deleted. Easy peasy.
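To make that concrete, here is a minimal in-memory sketch of a storage unit following the state and API described above. All names (StorageUnit, the error strings, and so on) are illustrative stand-ins, not taken from any real implementation, and the "disk" is just a Python dict:

```python
ERR_SEALED = "err_sealed"
ERR_WRITTEN = "err_written"
ERR_DELETED = "err_deleted"
ERR_UNWRITTEN = "err_unwritten"
OK = "ok"

class StorageUnit:
    def __init__(self):
        self.epoch = 0                 # cluster configuration version
        self.pages = {}                # logical address -> stored value
        self.deleted = set()           # addresses with the deleted bit set
        self.highest_written = -1      # highest page written via any command

    def read(self, epoch, address):
        if epoch != self.epoch:
            return ERR_SEALED, None
        if address in self.deleted:
            return ERR_DELETED, None
        if address not in self.pages:
            return ERR_UNWRITTEN, None
        return OK, self.pages[address]

    def write(self, epoch, address, value):
        if epoch != self.epoch:
            return ERR_SEALED
        if address in self.deleted:
            return ERR_DELETED
        if address in self.pages:      # write-once flag is already set
            return ERR_WRITTEN
        self.pages[address] = value
        self.highest_written = max(self.highest_written, address)
        return OK

    def delete(self, epoch, address):
        # validating the epoch here follows the authors' advice, not the paper
        if epoch != self.epoch:
            return ERR_SEALED
        self.deleted.add(address)
        return OK

    def seal(self, epoch):
        # only accept an epoch higher than the current one
        if epoch <= self.epoch:
            return ERR_SEALED, None
        self.epoch = epoch
        return OK, self.highest_written
```

Note how little is going on: every method is a couple of guard checks in front of a hash map lookup, which is the whole point of pushing the protocol logic out to the clients.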

This leaves the main logic in the client. We have seen that clients interact with a global sequencer (Part 3) to append to the log. The client API is then actually reasonably straightforward:

  • append(b) appends a value b to the global log, getting back the log position l. That is the high-level application-facing API. Internally the client driver code will first go to the global sequencer to obtain the next 64-bit number. The client driver then uses the versioned cluster configuration to translate this logical log position to a page offset on a particular replica set. Write messages are sent to each storage node in the replica set in a strict ordering. Upon receipt of an err_written the driver can loop to obtain a new position from the global sequencer and reattempt the write. Upon receipt of an err_sealed the client needs to reload the cluster configuration to learn the correct configuration associated with the next epoch number.
  • read(l) reads a page of data at a given position l. Internally the client driver uses the versioned cluster configuration to translate this logical log position to a page offset on a replica set. The read message is sent to the last replica in the strict write ordering. If the client driver receives an err_unwritten it can check the first replica and copy any value found to the remaining replicas. If no value is found the error is returned. Upon receipt of an err_sealed the client needs to reload the cluster configuration.
  • fill(l) fills a position in the log with a noop value. This is used by the application when it suspects another client has crashed during an append operation, having obtained a sequence number from the global sequencer but crashed before successfully writing any value. Most of the client code can be shared with the append functionality, skipping the trip to the sequencer.
  • trim(l) deletes the value at a given position. The client logic will be similar to the fill functionality.
  • reconfig(changes) accepts a cluster reconfiguration. This is called by the “blackbox” Paxos engine when a new configuration is put into effect.

Append lets the sequencer pick the location and writes a real value. Fill writes a noop value to a location. Trim sets the delete bit on a location. The cluster configuration maps a location onto a replica set using a stripe and mirror technique. Simple stuff. Very cool.
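The append retry loop described above can be sketched in a few lines. This is a self-contained toy, assuming the sequencer is a simple counter and the storage units are in-memory write-once maps; a real driver talks to both over the network and must also handle err_sealed by reloading the cluster configuration, which is omitted here:

```python
import itertools

ERR_WRITTEN = "err_written"
OK = "ok"

def make_unit():
    """A toy write-once storage unit: returns its write function and its pages."""
    pages = {}
    def write(epoch, address, value):
        if address in pages:
            return ERR_WRITTEN   # write-once flag already set for this page
        pages[address] = value
        return OK
    return write, pages

def append(next_token, replica_writes, epoch, value):
    """Append value to the log, looping back to the sequencer on err_written."""
    while True:
        position = next_token()                        # trip to the sequencer
        # send writes to each replica in a strict ordering
        results = [write(epoch, position, value) for write in replica_writes]
        if all(r == OK for r in results):
            return position                            # this position is ours
        # err_written: another client (or a fill) took the slot; get a new token

# toy usage: a local counter as the sequencer and two mirrored units
counter = itertools.count()
w1, pages1 = make_unit()
w2, pages2 = make_unit()
pos = append(lambda: next(counter), [w1, w2], epoch=1, value=b"hello")
```

With a single client this loop is clean; with contention, the write-once check at the first replica in the chain is what arbitrates who owns a disputed position.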

The last thing to consider is cluster reconfiguration. The canonical reason to reconfigure is a storage unit crash. We will probably be writing to three nodes in a replica set. The procedure is described in the following pseudocode, slightly adapted from the paper:

send (seal, c.epoch) request to storage servers
wait for replies from at least one replica in each extent
compute new projection newP based on changes and (sealed,*) replies
using any black-box consensus-engine:
propose newP and receive successful decision else abort
set projection to newP
increment c.epoch

Note that the client seals the storage units before computing a new projection of logical ranges onto storage units. It then runs that computed configuration through a black-box consensus engine (they use something “Paxos-like” in the paper). Why? Because a client does not know the last written position during a crash or network partition. The client sealing the units needs to learn the last written position from the responses to the seal command. It can then create a new extent of the logical log, starting at the next position, mapped onto a healthy replica set. All nodes are therefore sealed and will reject any reads or writes before the new configuration is computed and published, which makes reconfigurations short stop-the-world events.
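The pseudocode above can be fleshed out as follows. This is a hypothetical sketch, assuming a seal() call that returns the highest written page and a black-box propose() consensus call; every name here is an illustrative stand-in:

```python
class Unit:
    """Toy storage unit holding just what the seal step needs."""
    def __init__(self, highest_written):
        self.epoch = 0
        self.highest_written = highest_written

    def seal(self, epoch):
        if epoch <= self.epoch:
            return "err_sealed", None
        self.epoch = epoch
        return "ok", self.highest_written

def reconfigure(current_epoch, extents, changes, propose):
    new_epoch = current_epoch + 1
    # 1. seal every storage unit so it rejects reads and writes at the old
    #    epoch; we need at least one reply per extent to learn the last
    #    written position
    highest = -1
    for extent in extents:
        replies = [unit.seal(new_epoch) for unit in extent]
        highest = max([highest] + [h for ok, h in replies if ok == "ok"])
    # 2. compute the new projection: a fresh extent starting after the
    #    highest written position, mapped onto a healthy replica set
    new_projection = {"epoch": new_epoch,
                      "start": highest + 1,
                      "changes": changes}
    # 3. run it through the black-box consensus engine; abort on failure
    if not propose(new_projection):
        raise RuntimeError("reconfiguration lost the consensus race")
    return new_projection
```

The stop-the-world window is the span between step 1 and the decision in step 3: every unit is sealed, so nothing can be read or written until the new projection is agreed and published.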

If a storage node crashes we can reconfigure and write fresh data to a healthy replica set. What about the data already written to the replica set that has lost a node? In the background a client can copy the data from the unhealthy set onto a healthy one. Then we can reconfigure a second time to remap the range from the unhealthy replica set onto the replacement.

A corner case to consider is a client and a replica set being partitioned from the rest of the cluster. On the majority side of the partition, a reconfiguration could be taking place to remove the replica set. A partitioned client can then get a false err_unwritten on its side of the partition whereas the value could be written on the healthy side of the partition. The partitioned client will fill the value with a no-op. The paper asserts that this maintains “serialisation of reads”. This means that if you read a value you will continue to read a value. They are asserting that reading a no-op isn’t a read; it is more like a null or Nil value meaning “undefined”. The idea being that if the partition is healed the application can be reset and will see the real value. Sounds a bit messy, but if we can automate the reset then why not. Yet I find it a little dubious that we can get strong consistency under network partitions without running a quorum-based consensus algorithm. Consider what would happen if the partitioned client performs a write to the replica set which is no longer in the cluster, followed by a read. Perhaps we can prevent the write by having the global sequencer check the client epoch. Okay, but what if the global sequencer is in the minority partition and a reconfiguration has set up a new sequencer?

The paper proposes a fix to this: we can have each storage unit renew a lease. The management of the leases can enforce that no two storage units hold an overlapping active range at the same time. The catch with leases is clock skew, such that we have to wait a while before a partitioned node loses its lease. Still, leases are a pretty standard technique. The logical thing to do is to run the lease renewal through the same black-box Paxos algorithm to ensure that the leases are made strongly consistent across the cluster.
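As a toy illustration of the lease idea, assuming each storage unit holds a lease with an expiry time and stops serving once it lapses. The names and the max_skew margin are my own hypothetical additions modelling the clock-skew wait the paper's fix implies:

```python
import time

class LeasedUnit:
    """Toy storage unit that refuses service once its lease lapses."""
    def __init__(self, lease_seconds, max_skew):
        self.max_skew = max_skew
        self.lease_expiry = time.monotonic() + lease_seconds

    def renew(self, lease_seconds):
        # in a real system the renewal would go through the black-box Paxos
        # engine so that leases stay strongly consistent across the cluster
        self.lease_expiry = time.monotonic() + lease_seconds

    def can_serve(self):
        # stop serving max_skew early, so a skewed local clock cannot let
        # two units both believe they hold an overlapping active range
        return time.monotonic() < self.lease_expiry - self.max_skew

unit = LeasedUnit(lease_seconds=10.0, max_skew=0.5)
```

A partitioned unit simply stops answering once its lease (minus the skew margin) runs out, which closes the window where a minority-side client could keep reading and writing stale ranges.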