Replication alone is not sufficient for very large datasets or very high query throughput; we need to break the data up into partitions (also known as sharding).
Scalability is the main reason for partitioning data. It allows a large dataset to be distributed across many disks, and the query load to be distributed across many processors.
Copies of each partition are usually stored on multiple nodes. Each record belongs to exactly one partition, but it may still be stored on several nodes for fault tolerance. A node may store more than one partition.
Partitioning of key-value data
- The goal with partitioning is to spread the data and the query load evenly across nodes.
- If the partitioning is unfair, so that some partitions have more data or queries than others, it is called skewed. Skew makes partitioning much less effective.
- A partition with disproportionately high load is called a hot spot.
The simplest approach is to assign records to nodes randomly, but this has the downside that when you read a particular item you don’t know which node it is on, so you have to query all nodes in parallel.
Partition by key range
- Assign a continuous range of keys to each partition.
- This range does not have to be evenly spaced. Once we know the boundaries between the ranges, it’s easy to determine which partition contains a given key. Keys could also be kept in sorted order within each partition.
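- The downside is that some access patterns can lead to hot spots. For example, if the key is a timestamp, all writes for the current time period go to the same partition.
- The solution is to use a compound key that starts with something other than the timestamp, e.g. the item name followed by the timestamp. Though with this approach, fetching the values of multiple items within a time range requires a separate range query for each item name.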
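Looking up the partition for a key once the boundaries are known can be sketched with a binary search. This is a minimal illustration, not any particular database's implementation; the boundary values and the function name `range_partition_for` are made up for the example.

```python
import bisect

# Hypothetical split points between partitions. Partition 0 holds keys < "g",
# partition 1 holds keys in ["g", "n"), partition 2 holds ["n", "t"),
# and partition 3 holds everything from "t" onward.
BOUNDARIES = ["g", "n", "t"]


def range_partition_for(key: str) -> int:
    """Return the index of the partition whose key range contains `key`."""
    # bisect_right counts how many boundaries are <= key, which is exactly
    # the index of the partition the key falls into.
    return bisect.bisect_right(BOUNDARIES, key)


for key in ["apple", "grape", "n", "zebra"]:
    print(key, "->", range_partition_for(key))
```

Note that the ranges are deliberately uneven: the boundaries only need to be sorted, not evenly spaced.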
Partitioning by hash of key
- Many distributed data stores use a hash function to determine the partition for a given key.
- A good hash function takes skewed data and makes it uniformly distributed. The partition boundaries can be evenly spaced or chosen pseudo-randomly (consistent hashing).
- Unfortunately this loses the ability to do efficient range queries. Keys that were once adjacent are now scattered across all the partitions. Any range query has to be sent to all partitions.
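A rough sketch of hash partitioning with evenly spaced boundaries follows. The choice of MD5, the 32-bit hash space, and the partition count are assumptions made for the example; real data stores each pick their own hash function.

```python
import hashlib

NUM_PARTITIONS = 8   # hypothetical partition count
HASH_SPACE = 2**32   # we use the first 32 bits of the digest


def hash_partition_for(key: str) -> int:
    """Map a key to a partition via evenly spaced ranges of the hash space."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    hash_value = int.from_bytes(digest[:4], "big")
    # Partition i owns hash values in [i * HASH_SPACE / N, (i + 1) * HASH_SPACE / N).
    return hash_value * NUM_PARTITIONS // HASH_SPACE


# Keys that are adjacent in sort order generally land on unrelated partitions,
# which is why a range query has to be sent to every partition.
for key in ["2024-01-01", "2024-01-02", "2024-01-03"]:
    print(key, "->", hash_partition_for(key))
```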
Skewed workloads and relieving hot spots
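- In the extreme case where all reads and writes are for the same key, all requests are routed to the same partition (e.g. a Twitter celebrity with millions of followers).
- A simple technique is to add a random number to the beginning or end of the key, which spreads the writes across several keys and therefore several partitions. This has the downside, though, that reads now have to do additional work: they must read from all of those keys and combine the results, and something has to keep track of which keys are being split.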
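The key-splitting technique can be sketched as below. The salt count, the `#` separator, and the `HOT_KEYS` set are all hypothetical choices for illustration; in practice the application has to decide which keys are hot and track them somehow.

```python
import random

NUM_SALTS = 100                 # assumed: a two-digit suffix gives 100 sub-keys
HOT_KEYS = {"celebrity_user"}   # assumed bookkeeping of keys known to be hot


def write_key(key: str) -> str:
    """Pick the key to write under: hot keys get a random two-digit suffix."""
    if key not in HOT_KEYS:
        return key
    return f"{key}#{random.randrange(NUM_SALTS):02d}"


def read_keys(key: str) -> list[str]:
    """Return every key a read must fetch and combine."""
    if key not in HOT_KEYS:
        return [key]
    return [f"{key}#{i:02d}" for i in range(NUM_SALTS)]


print(write_key("celebrity_user"))       # one of 100 possible sub-keys
print(len(read_keys("celebrity_user")))  # a read has to touch all of them
```

Only the small set of hot keys pays the extra read cost; ordinary keys are written and read as before.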
Partitioning and secondary indexes
The situation gets more complicated if secondary indexes are involved. A secondary index usually doesn’t identify a record uniquely, so secondary indexes don’t map neatly to partitions. The two main approaches to partitioning with secondary indexes are:
- Document-based partitioning
- Term-based partitioning
Partitioning secondary indexes by document
Each partition maintains its own secondary indexes, covering only the documents in that partition (local index).
To query by a secondary index, you need to send the query to all partitions and combine all the results you get back (scatter/gather). This is prone to tail latency amplification, but it is nevertheless widely used: MongoDB, Riak, Cassandra, Elasticsearch, SolrCloud and VoltDB all use document-partitioned secondary indexes.
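A minimal in-memory sketch of local indexes with scatter/gather is shown here. The `Partition` class, the `doc_id % N` routing, and the car documents are assumptions made for the example.

```python
from collections import defaultdict


class Partition:
    """One partition holding documents plus a local secondary index."""

    def __init__(self):
        self.docs = {}
        self.index = defaultdict(set)  # (field, value) -> ids of local docs

    def put(self, doc_id, doc):
        # Remove stale index entries if the document is being overwritten.
        old = self.docs.get(doc_id)
        if old is not None:
            for field, value in old.items():
                self.index[(field, value)].discard(doc_id)
        self.docs[doc_id] = doc
        for field, value in doc.items():
            self.index[(field, value)].add(doc_id)

    def find(self, field, value):
        return set(self.index.get((field, value), ()))


def put_document(partitions, doc_id, doc):
    """A write touches only the one partition that owns the document."""
    partitions[doc_id % len(partitions)].put(doc_id, doc)


def scatter_gather(partitions, field, value):
    """A secondary-index read must ask every partition and merge the results."""
    results = set()
    for partition in partitions:
        results |= partition.find(field, value)
    return results


partitions = [Partition(), Partition()]
put_document(partitions, 191, {"color": "red", "make": "Honda"})
put_document(partitions, 306, {"color": "red", "make": "Ford"})
put_document(partitions, 515, {"color": "silver", "make": "Audi"})
print(scatter_gather(partitions, "color", "red"))
```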
Partitioning secondary indexes by term
We keep a global index that covers data in all partitions. The global index must also be partitioned so it doesn’t become the bottleneck. This kind of index is called term-partitioned because the term we’re looking for determines the partition of the index.
The advantage of a global index is that reads are more efficient, as there is no need to scatter/gather over all partitions. A client only needs to make a request to the index partition containing the term it wants. The downside is that writes are slower and more complicated, because a write to a single document may now affect multiple partitions of the index.
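The read/write trade-off of a term-partitioned index can be sketched as follows. Partitioning the index by a hash of the term is one option (partitioning by the term itself is another); the `GlobalIndex` class, the `field:value` term format, and the partition count are assumptions for the example.

```python
import hashlib
from collections import defaultdict

NUM_INDEX_PARTITIONS = 4  # hypothetical


def index_partition_for(term: str) -> int:
    """The term alone decides which index partition holds its entry."""
    digest = hashlib.md5(term.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_INDEX_PARTITIONS


class GlobalIndex:
    def __init__(self):
        # One term -> doc ids mapping per index partition.
        self.partitions = [defaultdict(set) for _ in range(NUM_INDEX_PARTITIONS)]

    def index_document(self, doc_id, doc):
        """Index one document; returns the set of index partitions written to."""
        touched = set()
        for field, value in doc.items():
            term = f"{field}:{value}"
            p = index_partition_for(term)
            self.partitions[p][term].add(doc_id)
            touched.add(p)
        return touched

    def lookup(self, field, value):
        """A read goes to exactly one index partition."""
        term = f"{field}:{value}"
        p = index_partition_for(term)
        return set(self.partitions[p].get(term, ()))


index = GlobalIndex()
touched = index.index_document(191, {"color": "red", "make": "Honda"})
index.index_document(306, {"color": "red", "make": "Ford"})
print("index partitions written for one document:", touched)
print(index.lookup("color", "red"))
```

A single document write may return more than one partition in `touched`, which is the extra write cost described above.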
Rebalancing is the process of moving load from one node in the cluster to another.
- How not to do it: hash mod N. The problem with mod N is that if the number of nodes N changes, most of the keys will need to be moved from one node to another, which makes rebalancing very expensive.
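To see how many keys hash mod N relocates, the following sketch counts the keys whose node changes when a cluster grows from 10 to 11 nodes. The hash function, key names, and node counts are arbitrary choices for the illustration.

```python
import hashlib


def stable_hash(key: str) -> int:
    """A deterministic hash (Python's built-in hash() is salted per process)."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")


def fraction_moved(keys, old_n: int, new_n: int) -> float:
    """Fraction of keys assigned to a different node after N changes."""
    moved = sum(1 for k in keys if stable_hash(k) % old_n != stable_hash(k) % new_n)
    return moved / len(keys)


keys = [f"key-{i}" for i in range(10_000)]
# With a uniform hash, a key stays put only when hash % 10 == hash % 11,
# so the large majority of keys have to move.
print(fraction_moved(keys, 10, 11))
```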
Fixed number of partitions:
- Create many more partitions than there are nodes and assign several partitions to each node.
- When a new node is added, it can steal a few partitions from every existing node until partitions are fairly distributed once again.
- The number of partitions does not change, nor does the assignment of keys to partitions.
- The only thing that changes is the assignment of partitions to nodes.
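The partition-stealing step can be sketched like this. The `add_node` function and the policy of always taking from the most loaded node are assumptions for the example, not how any specific database chooses which partitions to move.

```python
def add_node(assignment: dict, new_node: str) -> dict:
    """Return a new partition -> node mapping in which `new_node` has taken
    partitions from the most loaded nodes until it holds its fair share."""
    if new_node in assignment.values():
        raise ValueError(f"{new_node} is already part of the cluster")
    assignment = dict(assignment)  # leave the caller's mapping untouched
    by_node = {}
    for partition, node in assignment.items():
        by_node.setdefault(node, []).append(partition)
    by_node[new_node] = []
    fair_share = len(assignment) // len(by_node)
    while len(by_node[new_node]) < fair_share:
        donor = max(
            (n for n in by_node if n != new_node),
            key=lambda n: len(by_node[n]),
        )
        partition = by_node[donor].pop()
        by_node[new_node].append(partition)
        assignment[partition] = new_node
    return assignment


# 12 partitions spread over 3 nodes; the key -> partition mapping never changes.
before = {p: f"node-{p % 3}" for p in range(12)}
after = add_node(before, "node-3")
moved = [p for p in before if before[p] != after[p]]
print("partitions moved:", moved)
```

Only whole partitions move between nodes, so the amount of data transferred is bounded by the new node's fair share.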
Dynamic partitioning:
- The number of partitions adapts to the total data volume.
- An empty database starts with a single partition, so until the first split all the data ends up on one node.
- When a partition grows to exceed a configured size, it is split into two partitions so that approximately half of the data ends up on each side of the split.
- Conversely, if lots of data is deleted and a partition shrinks below some threshold, it can be merged with an adjacent partition.
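Splitting a partition when it grows too large can be sketched as below. For brevity the threshold is a number of keys rather than a size in bytes, and merging is left out; the class name and threshold are hypothetical.

```python
import bisect

MAX_KEYS_PER_PARTITION = 4  # hypothetical; real systems use a size in bytes


class DynamicRangeStore:
    def __init__(self):
        # boundaries[i] is the smallest key of partition i + 1.
        self.boundaries = []
        # An empty store starts with a single empty partition.
        self.partitions = [[]]

    def insert(self, key):
        i = bisect.bisect_right(self.boundaries, key)
        partition = self.partitions[i]
        if key in partition:
            return
        bisect.insort(partition, key)
        if len(partition) > MAX_KEYS_PER_PARTITION:
            self._split(i)

    def _split(self, i):
        """Split partition i so roughly half the keys end up on each side."""
        partition = self.partitions[i]
        mid = len(partition) // 2
        self.partitions[i:i + 1] = [partition[:mid], partition[mid:]]
        self.boundaries.insert(i, partition[mid])


store = DynamicRangeStore()
for key in "abcdefghij":
    store.insert(key)
print(store.boundaries)
print(store.partitions)
```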
Partitioning proportionally to nodes
- Databases like Cassandra make the number of partitions proportional to the number of nodes.
- Have a fixed number of partitions per node. This approach also keeps the size of each partition fairly stable.
Operations: automatic or manual rebalancing
- Fully automated rebalancing can be unpredictable. The process can overload the network or the nodes and harm the performance of other requests while the rebalancing is in progress.
- It can be good to have a human in the loop for rebalancing.
When a client makes a request, how does it know which node to connect to? This is an instance of a more general problem called service discovery. There are different approaches:
- Allow clients to contact any node. If that node owns the partition, it handles the request directly; otherwise it forwards the request to the appropriate node.
- Send all requests from clients to a routing tier first that acts as a partition-aware load balancer.
- Make clients aware of the partitioning and the assignment of partitions to nodes.
Many distributed data systems rely on a separate coordination service such as ZooKeeper to keep track of this cluster metadata. Each node registers itself in ZooKeeper, and ZooKeeper maintains the authoritative mapping of partitions to nodes. The routing tier or the partitioning-aware client can subscribe to this information in ZooKeeper.
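The idea of a routing tier that follows the partition-to-node mapping can be sketched without a real coordination service. Here `on_assignment_change` stands in for whatever notification the coordination service would deliver; it is a hypothetical callback, not a ZooKeeper API, and the node names are invented.

```python
import hashlib


class RoutingTier:
    """A partition-aware router that keeps a local copy of the assignment."""

    def __init__(self, num_partitions: int):
        # The partition count is fixed, so key -> partition never changes.
        self.num_partitions = num_partitions
        self.partition_to_node = {}

    def on_assignment_change(self, partition_to_node: dict):
        """Hypothetical callback invoked when the authoritative mapping changes."""
        self.partition_to_node = dict(partition_to_node)

    def node_for_key(self, key: str) -> str:
        digest = hashlib.md5(key.encode("utf-8")).digest()
        partition = int.from_bytes(digest[:4], "big") % self.num_partitions
        return self.partition_to_node[partition]


router = RoutingTier(num_partitions=4)
router.on_assignment_change({0: "node-a", 1: "node-b", 2: "node-a", 3: "node-b"})
print(router.node_for_key("user:42"))

# After a rebalance, only the mapping is updated; clients are unaffected.
router.on_assignment_change({0: "node-a", 1: "node-b", 2: "node-c", 3: "node-c"})
print(router.node_for_key("user:42"))
```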
HBase, SolrCloud and Kafka use ZooKeeper to track partition assignment. MongoDB relies on its own config server.
Cassandra and Riak take a different approach: they use a gossip protocol.
A gossip protocol is a procedure or process of computer peer-to-peer communication that is based on the way epidemics spread. Some distributed systems use peer-to-peer gossip to ensure that data is disseminated to all members of a group.
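The epidemic-style spread can be illustrated with a toy simulation. This is a simplified push-only model under assumed parameters (one random peer per informed node per round); it is not the protocol Cassandra or Riak actually implement.

```python
import random


def gossip_rounds(num_nodes: int, seed: int = 0, max_rounds: int = 1000):
    """Simulate push gossip: each round, every informed node tells one
    randomly chosen node. Returns (rounds used, number of informed nodes)."""
    rng = random.Random(seed)
    informed = {0}  # node 0 learns about the change first
    rounds = 0
    while len(informed) < num_nodes and rounds < max_rounds:
        for _ in list(informed):
            informed.add(rng.randrange(num_nodes))
        rounds += 1
    return rounds, len(informed)


rounds, informed = gossip_rounds(50)
print(f"{informed} nodes informed after {rounds} rounds")
```

Because the number of informed nodes can roughly double each round early on, the information reaches the whole group in far fewer rounds than there are nodes.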
That’s pretty much all of it!