Database Sharding Strategies: Range, Hash, and Directory-Based Approaches

March 18, 2024·12 min read·by Bishwambhar Sen
[Figure: Three database shard layouts side by side, showing range partitioning, a hash ring, and a directory service mapping]

Concept

Horizontal data partitioning — sharding — is the act of splitting a single logical dataset across multiple physical database instances, each owning a subset of the data. The goal is to exceed the storage, throughput, or concurrency limits of a single node without giving up the data model.

Every sharding decision begins with the partition key: the attribute of the data used to route each record to its shard. The partition key determines every downstream property of the system — query distribution, hotspot risk, cross-shard join cost, and rebalancing complexity. A wrong partition key cannot be corrected without a full data migration.

There are three canonical sharding strategies, each with a different trade-off profile.

Range-Based Sharding

Records are partitioned by consecutive ranges of the partition key. Shard 1 owns user IDs 1–1,000,000; Shard 2 owns 1,000,001–2,000,000; and so on. For time-series data, Shard 1 owns January 2024, Shard 2 owns February 2024, etc.

Strengths:

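  • Range queries are efficient. With one shard per month, "Give me all orders from January 1–31" touches exactly one shard; a query that crosses a boundary touches only the shards whose ranges overlap it.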
  • Simple routing logic: compare key against range boundaries.
  • Excellent for time-series and audit log patterns where data is written sequentially and queried by time window.

Weaknesses:

  • Hot shard problem. If the partition key correlates with write activity (current month, recently created user IDs), all writes land on the same shard while older shards sit idle.
  • Rebalancing complexity. Splitting a hot shard requires migrating half its data to a new shard and updating routing configuration — not atomic, not instant.
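
The routing rule above (compare the key against range boundaries) can be sketched as a binary search over sorted upper bounds. This is a minimal illustration rather than a production router: RangeShardRouter, its constructor shape, and the shard IDs are hypothetical names, and the sketch assumes numeric keys with inclusive upper bounds.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical range router: each shard owns every key up to and including its upper bound.
public sealed class RangeShardRouter
{
    private readonly long[] _upperBounds; // sorted ascending, inclusive
    private readonly string[] _shardIds;  // _shardIds[i] owns keys <= _upperBounds[i]

    public RangeShardRouter(IEnumerable<(long UpperBound, string ShardId)> ranges)
    {
        var sorted = new List<(long UpperBound, string ShardId)>(ranges);
        sorted.Sort((a, b) => a.UpperBound.CompareTo(b.UpperBound));
        _upperBounds = sorted.ConvertAll(r => r.UpperBound).ToArray();
        _shardIds = sorted.ConvertAll(r => r.ShardId).ToArray();
    }

    public string GetShardForKey(long key)
    {
        // BinarySearch returns the index of an exact match, otherwise the bitwise
        // complement of the index of the first element greater than the key.
        int index = Array.BinarySearch(_upperBounds, key);
        if (index < 0) index = ~index;

        if (index >= _shardIds.Length)
            throw new ArgumentOutOfRangeException(nameof(key), "Key is beyond the last range.");

        return _shardIds[index];
    }
}
```

With bounds of 1,000,000 for "shard-1" and 2,000,000 for "shard-2", key 1,000,000 routes to "shard-1" and key 1,000,001 routes to "shard-2". Splitting a hot shard then amounts to inserting a new boundary, which is exactly the routing-configuration change that has to be coordinated with the data move.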

Hash-Based Sharding

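The partition key is hashed, and the hash modulo the shard count selects the target shard: shard_index = hash(partition_key) % shard_count. The hash function must be stable across processes and releases; a per-process randomized hash such as .NET's string.GetHashCode() would route the same key to a different shard after a restart.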

Strengths:

  • Uniform write distribution. Hash functions distribute keys pseudo-randomly, eliminating hot shards caused by sequential keys.
  • Predictable query routing: deterministic, no lookup required.

Weaknesses:

  • Range queries are expensive. "All orders from January" has to be sent to every shard and the results merged (scatter-gather), because hashing scatters adjacent keys across the whole shard set.
  • Rebalancing with naïve modulo is destructive. Adding a shard changes shard_count, so most keys map to a different shard: growing from N to N+1 shards moves roughly N/(N+1) of all keys (about 80% when going from 4 to 5). This is the modulo rehash problem, effectively a full data migration on every scale event.
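
To get a feel for the size of the modulo rehash problem, the sketch below counts how many hash values change shard when the shard count changes. ModuloRehash and its members are hypothetical names, and consecutive integers stand in for uniformly distributed hash output, an assumption that keeps the result exact and repeatable.

```csharp
using System;
using System.Collections.Generic;

public static class ModuloRehash
{
    // Fraction of hash values whose shard index differs between the two shard counts.
    public static double FractionMoved(IEnumerable<uint> hashes, uint oldShardCount, uint newShardCount)
    {
        long total = 0, moved = 0;
        foreach (uint hash in hashes)
        {
            total++;
            if (hash % oldShardCount != hash % newShardCount) moved++;
        }
        return total == 0 ? 0.0 : (double)moved / total;
    }

    // Assumption: consecutive integers approximate a uniform hash distribution.
    public static IEnumerable<uint> SampleHashes(uint count)
    {
        for (uint i = 0; i < count; i++) yield return i;
    }
}
```

Under these assumptions, ModuloRehash.FractionMoved(ModuloRehash.SampleHashes(100_000), 4, 5) returns 0.8: a key stays put only when its hash leaves the same remainder under both divisors, which happens for 4 of every 20 consecutive values.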

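Consistent hashing solves the modulo problem by placing shards on a hash ring, where each key belongs to the first shard clockwise from the key's hash position. When a new shard is added, only the keys between the new node's position and its predecessor need to migrate: roughly 1/(N+1) of the dataset when growing from N to N+1 shards, not most of it. Amazon's Dynamo (DeCandia et al., 2007), Cassandra, and Riak build their partitioning on variants of this scheme.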

Directory-Based Sharding

A centralized lookup table (the directory service) maps each partition key — or range of keys — to its shard. The application queries the directory before every data request.

Strengths:

  • Maximum flexibility. Routing logic is data, not code. You can move individual records between shards without changing application logic.
  • Supports heterogeneous shard sizes. VIP customers can be isolated to dedicated shards without any hash function change.

Weaknesses:

  • The directory becomes a single point of failure and a performance bottleneck. Every read and write first hits the directory. Without aggressive caching and high availability, the directory's reliability ceiling becomes the system's ceiling.
  • Cache consistency: A stale directory cache routes requests to the wrong shard. This is arguably the worst failure mode because it is silent: reads miss rows that exist elsewhere, and writes land on a shard where later lookups will not find them.
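
A minimal in-memory sketch of the directory idea follows, assuming explicit assignments take priority and every other key falls through to a default rule. ShardDirectory, Assign, Resolve, and the Version counter are hypothetical names; a real directory would sit behind a replicated store, and the version number is just one possible way for caches to notice that their copy is stale.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical directory: explicit entries win, everything else uses the fallback rule.
public sealed class ShardDirectory
{
    private readonly ConcurrentDictionary<string, string> _assignments = new();
    private readonly Func<string, string> _fallback;
    private long _version;

    public ShardDirectory(Func<string, string> fallback) =>
        _fallback = fallback ?? throw new ArgumentNullException(nameof(fallback));

    // Incremented on every change so a cache can compare versions before trusting its copy.
    public long Version => Interlocked.Read(ref _version);

    // Pins a partition key to a shard. Moving the underlying rows is a separate step.
    public void Assign(string partitionKey, string shardId)
    {
        _assignments[partitionKey] = shardId;
        Interlocked.Increment(ref _version);
    }

    public string Resolve(string partitionKey) =>
        _assignments.TryGetValue(partitionKey, out var shardId)
            ? shardId
            : _fallback(partitionKey);
}
```

For example, a directory built with a fallback that returns "shard-shared" resolves every tenant there until Assign("acme", "shard-vip") is called, after which only "acme" resolves to "shard-vip". No hash function or application routing code changes.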

Constraints

The Cross-Shard Query Problem

Sharding breaks the relational model's global query capability. A SQL join between two tables that live on different shards cannot be executed inside the database. Options:

  1. Application-level join: Fetch from both shards in parallel, merge in memory. Doubles the query round-trips and moves join computation to the application server.
  2. Denormalization: Pre-join data into a single record that co-locates on one shard. Eliminates cross-shard joins but increases storage and write amplification.
  3. Broadcast query: Send the query to all shards, merge results. Correct but O(N_shards) cost — degrades linearly as you add shards.
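
As an illustration of option 1, the sketch below fetches from two shards in parallel and joins the results in memory. The record types, CrossShardJoin, and the two fetch delegates are hypothetical stand-ins for real shard queries.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public record Customer(int CustomerId, string Name);
public record Order(int OrderId, int CustomerId, decimal Total);
public record CustomerOrder(string CustomerName, int OrderId, decimal Total);

public static class CrossShardJoin
{
    // Each delegate stands in for a query against a different shard.
    public static async Task<IReadOnlyList<CustomerOrder>> JoinAsync(
        Func<Task<IReadOnlyList<Customer>>> fetchCustomers,
        Func<Task<IReadOnlyList<Order>>> fetchOrders)
    {
        // Start both queries before awaiting either, so the round-trips overlap.
        Task<IReadOnlyList<Customer>> customersTask = fetchCustomers();
        Task<IReadOnlyList<Order>> ordersTask = fetchOrders();
        await Task.WhenAll(customersTask, ordersTask);

        IReadOnlyList<Customer> customers = await customersTask;
        IReadOnlyList<Order> orders = await ordersTask;

        // In-memory equivalent of an inner join on CustomerId.
        return customers
            .Join(orders,
                  customer => customer.CustomerId,
                  order => order.CustomerId,
                  (customer, order) => new CustomerOrder(customer.Name, order.OrderId, order.Total))
            .OrderBy(row => row.OrderId)
            .ToList();
    }
}
```

Both result sets have to fit in application memory, so in practice each side is usually filtered on its own shard before the join.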

The fundamental constraint: your partition key must align with your most frequent query patterns. If your primary query is "get all orders for a customer," your partition key must be customer_id. If it's "get all orders in a date range," the partition key must be time-based. When the two most frequent query patterns require different partition keys, you either choose one and accept cross-shard cost for the other, or you maintain two physical copies of the data with different partition keys (a common pattern in analytics systems).

Hot Shard Problem — The Operational Reality

The hot shard problem manifests when a single partition key value accounts for a disproportionate fraction of traffic. For a multi-tenant SaaS product: one enterprise customer with 10,000 active users lands on a single shard. That shard's CPU is pegged at 95%; every other shard runs at 15%. The system's capacity is effectively the capacity of the hot shard, not the aggregate capacity.

Mitigations:

  • Key suffixing: On write, append a random suffix (one of N values) to the partition key so the hot key spreads across N sub-partitions. On read, query all N suffixed keys and merge the results. This adds read fan-out but removes the write hotspot.
  • Dedicated shard for hot tenants: Use directory-based sharding to pin known-hot partitions to their own shard.
  • Write-ahead buffering: Buffer writes for hot keys in an in-memory queue or Redis and flush them to the database periodically. An in-memory buffer loses unflushed writes if the process crashes, so this suits data that tolerates some loss or can be replayed.
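
The buffering mitigation can be sketched for one common case, assuming the hot writes are counter increments that can be coalesced before they reach the database. HotKeyWriteBuffer and the flushBatch callback are hypothetical; the callback stands in for whatever batched write the database supports.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical buffer that coalesces counter increments for hot keys before writing.
public sealed class HotKeyWriteBuffer
{
    private readonly object _gate = new();
    private readonly Action<IReadOnlyDictionary<string, long>> _flushBatch;
    private Dictionary<string, long> _pending = new();

    public HotKeyWriteBuffer(Action<IReadOnlyDictionary<string, long>> flushBatch) =>
        _flushBatch = flushBatch ?? throw new ArgumentNullException(nameof(flushBatch));

    public void Increment(string key, long delta = 1)
    {
        lock (_gate)
        {
            _pending.TryGetValue(key, out long current); // current is 0 when the key is new
            _pending[key] = current + delta;
        }
    }

    // Intended to be called from a timer. Swaps the buffer under the lock, then
    // performs the slow write outside it so Increment is never blocked on the database.
    public int Flush()
    {
        Dictionary<string, long> batch;
        lock (_gate)
        {
            if (_pending.Count == 0) return 0;
            batch = _pending;
            _pending = new Dictionary<string, long>();
        }

        _flushBatch(batch);
        return batch.Count;
    }
}
```

Three increments to the same key between flushes reach the database as a single write carrying the sum. Anything still in the buffer when the process dies is lost, which is the price of the reduced write rate.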

Rebalancing: The Hidden Operational Cost

Most sharding decisions are made at system inception. The data grows. The original shard count is no longer adequate. Rebalancing — redistributing data across a larger shard set — is the most expensive operational event in a sharded system.

The rebalancing process requires:

  1. A mechanism to stream data from old shards to new shards while the system is live.
  2. A dual-write window where writes go to both old and new shards.
  3. A validation phase confirming new shard data is consistent.
  4. A cutover event that switches routing to the new shards.
  5. A cleanup phase removing data from old shards.

Steps 2–4 are the dangerous window. Any bug in the dual-write logic produces data divergence that's difficult to detect and expensive to repair.
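
The phases above can be sketched as a small state machine. This is a single-threaded illustration under simplifying assumptions: dictionaries stand in for the two shards, deletes are ignored, and the backfill runs while dual writes are already active so that no write is missed. MigrationRouter and MigrationPhase are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public enum MigrationPhase { OldOnly, DualWrite, CutOver }

// Hypothetical router for one key range moving from an old shard to a new one.
public sealed class MigrationRouter
{
    private readonly IDictionary<string, string> _oldShard;
    private readonly IDictionary<string, string> _newShard;

    public MigrationPhase Phase { get; private set; } = MigrationPhase.OldOnly;

    public MigrationRouter(IDictionary<string, string> oldShard, IDictionary<string, string> newShard)
    {
        _oldShard = oldShard;
        _newShard = newShard;
    }

    public void Write(string key, string value)
    {
        // The old shard stays authoritative until cutover.
        if (Phase != MigrationPhase.CutOver) _oldShard[key] = value;
        if (Phase != MigrationPhase.OldOnly) _newShard[key] = value;
    }

    public string? Read(string key)
    {
        var source = Phase == MigrationPhase.CutOver ? _newShard : _oldShard;
        return source.TryGetValue(key, out var value) ? value : null;
    }

    // Copies rows the new shard has not seen yet. Rows already present came from
    // dual writes and are at least as fresh, so they are left alone.
    public void Backfill()
    {
        if (Phase != MigrationPhase.DualWrite)
            throw new InvalidOperationException("Backfill runs during the dual-write window.");
        foreach (var entry in _oldShard)
            if (!_newShard.ContainsKey(entry.Key)) _newShard[entry.Key] = entry.Value;
    }

    // Validation: every row on the old shard must match its copy on the new shard.
    public bool IsConsistent()
    {
        foreach (var entry in _oldShard)
            if (!_newShard.TryGetValue(entry.Key, out var copy) || copy != entry.Value) return false;
        return true;
    }

    public void Advance()
    {
        if (Phase == MigrationPhase.OldOnly) { Phase = MigrationPhase.DualWrite; return; }
        if (Phase == MigrationPhase.DualWrite)
        {
            if (!IsConsistent()) throw new InvalidOperationException("Validation failed; refusing cutover.");
            Phase = MigrationPhase.CutOver;
        }
    }
}
```

Refusing to cut over until validation passes is the design choice worth keeping from this sketch: it turns silent divergence into a blocked migration, which is far cheaper to deal with.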

Trade-offs

When NOT to Shard

Sharding is architecturally expensive. Before accepting that cost, exhaust these alternatives:

Alternative                         | Upper Limit                                         | Operational Cost
------------------------------------|-----------------------------------------------------|-----------------
Read replicas                       | Read scaling to any level; write still constrained  | Low
Vertical scaling (larger instance)  | ~10–20 TB, ~100K IOPS on cloud                      | Medium
Caching layer (Redis)               | Eliminate 80%+ of read load                         | Medium
Partitioning within a single DB     | Most RDBMS support table partitioning               | Low
Columnar store for analytics        | Redirects analytics queries off OLTP                | Medium

Sharding is the right choice when:

  • Write throughput has exceeded the capacity of a single node (including vertical scaling ceiling)
  • Data volume has exceeded what a single node can economically store
  • Regulatory requirements mandate geographic data isolation

Sharding is premature when you haven't yet saturated a well-configured, vertically-scaled primary with read replicas.

Code

The following implements a consistent hash ring router in C#, demonstrating the core algorithm behind hash-based sharding with virtual nodes for uniform distribution:

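// ConsistentHashShardRouter.cs
// Implements consistent hashing with virtual nodes to minimize rehash on rebalance
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Dapper; // Query extension methods used by the repository below

public class ConsistentHashShardRouter
{
    // Ring position -> shard ID, kept sorted so a lookup can walk clockwise.
    // Not thread-safe: build the ring at startup or guard mutations with a lock.
    private readonly SortedDictionary<uint, string> _ring = new();
    private readonly int _virtualNodesPerShard;

    public ConsistentHashShardRouter(int virtualNodesPerShard = 150)
    {
        if (virtualNodesPerShard <= 0)
            throw new ArgumentOutOfRangeException(nameof(virtualNodesPerShard));

        _virtualNodesPerShard = virtualNodesPerShard;
    }

    // connectionString is accepted but unused here: the router only maps keys to shard IDs.
    public void AddShard(string shardId, string connectionString)
    {
        for (int vNode = 0; vNode < _virtualNodesPerShard; vNode++)
        {
            var virtualNodeKey = $"{shardId}#vnode{vNode}";
            uint hash = ComputeHash(virtualNodeKey);
            _ring[hash] = shardId; // On a rare 32-bit collision the later shard takes the slot
        }
    }

    public void RemoveShard(string shardId)
    {
        var keysToRemove = _ring
            .Where(kvp => kvp.Value == shardId)
            .Select(kvp => kvp.Key)
            .ToList();

        foreach (var key in keysToRemove)
            _ring.Remove(key);
    }

    /// <summary>
    /// Routes a partition key to its shard.
    /// After adding a new shard: only keys between the new node and its predecessor
    /// are affected — approximately (1 / N) of total keys.
    /// </summary>
    public string GetShardForKey(string partitionKey)
    {
        if (_ring.Count == 0)
            throw new InvalidOperationException("No shards registered.");

        uint keyHash = ComputeHash(partitionKey);

        // Find the first virtual node at or clockwise from keyHash.
        // This is a linear scan; a sorted array with binary search is the usual optimization.
        foreach (var entry in _ring)
        {
            if (entry.Key >= keyHash)
                return entry.Value;
        }

        // Wrap around: nothing clockwise of keyHash, so take the first node on the ring
        return _ring.First().Value;
    }

    private static uint ComputeHash(string input)
    {
        // MD5 is used for its even bit distribution, not for security.
        // The static HashData call is thread-safe, unlike a shared HashAlgorithm instance.
        byte[] digest = MD5.HashData(Encoding.UTF8.GetBytes(input));

        // A fixed byte order keeps ring positions identical across CPU architectures
        return BinaryPrimitives.ReadUInt32LittleEndian(digest);
    }
}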

// Usage in a repository routing layer
public class ShardedOrderRepository
{
    private readonly ConsistentHashShardRouter _router;
    private readonly IReadOnlyDictionary<string, IDbConnection> _shardConnections;

    public ShardedOrderRepository(
        ConsistentHashShardRouter router,
        IReadOnlyDictionary<string, IDbConnection> shardConnections)
    {
        _router = router;
        _shardConnections = shardConnections;
    }

    public async Task<Order?> FindByIdAsync(
        Guid orderId,
        Guid customerId, // Partition key: co-locate orders with their customer
        CancellationToken cancellationToken = default)
    {
        string shardId = _router.GetShardForKey(customerId.ToString());
        var connection = _shardConnections[shardId];

        // CommandDefinition carries the cancellation token through to the query
        var command = new CommandDefinition(
            "SELECT * FROM orders WHERE order_id = @OrderId AND customer_id = @CustomerId",
            new { OrderId = orderId, CustomerId = customerId },
            cancellationToken: cancellationToken);

        return await connection.QuerySingleOrDefaultAsync<Order>(command);
    }
}

The second example shows the hot shard write-distribution pattern using key suffixing:

// HotShardMitigationWriter.cs
// Distributes writes for hot partition keys across N sub-shards
// Reads must fan out to all sub-shards and merge results
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapper;

public class HotShardMitigationWriter
{
    private readonly ConsistentHashShardRouter _router;
    private readonly IReadOnlyDictionary<string, IDbConnection> _shardConnections;
    private readonly int _hotShardFanOut;

    public HotShardMitigationWriter(
        ConsistentHashShardRouter router,
        IReadOnlyDictionary<string, IDbConnection> shardConnections,
        int hotShardFanOut = 10) // Spread hot key across 10 sub-shards
    {
        if (hotShardFanOut <= 0)
            throw new ArgumentOutOfRangeException(nameof(hotShardFanOut));

        _router = router;
        _shardConnections = shardConnections;
        _hotShardFanOut = hotShardFanOut;
    }

    /// <summary>
    /// Write with hot shard mitigation: appends a random suffix to the partition key
    /// to distribute writes across _hotShardFanOut virtual partitions.
    /// </summary>
    public async Task WriteEventAsync(
        string hotPartitionKey,
        DomainEvent domainEvent,
        CancellationToken cancellationToken = default)
    {
        // Random suffix in [0, N) distributes writes evenly across N sub-shards
        int suffix = Random.Shared.Next(0, _hotShardFanOut);
        string distributedKey = $"{hotPartitionKey}#{suffix}";

        string shardId = _router.GetShardForKey(distributedKey);
        var connection = _shardConnections[shardId];

        await connection.ExecuteAsync(new CommandDefinition(
            "INSERT INTO domain_events (partition_key, suffix, event_type, payload, occurred_at) " +
            "VALUES (@PartitionKey, @Suffix, @EventType, @Payload, @OccurredAt)",
            new
            {
                PartitionKey = hotPartitionKey,
                Suffix = suffix,
                EventType = domainEvent.EventType,
                // Serialize by runtime type so properties of derived event types are included
                Payload = JsonSerializer.Serialize(domainEvent, domainEvent.GetType()),
                OccurredAt = domainEvent.OccurredAt
            },
            cancellationToken: cancellationToken));
    }

    /// <summary>
    /// Read requires fan-out across all N sub-shards, then merge and sort.
    /// Trade-off: write throughput improved; read cost increased by factor N.
    /// </summary>
    public async Task<IReadOnlyList<DomainEvent>> ReadAllEventsAsync(
        string partitionKey,
        CancellationToken cancellationToken = default)
    {
        // Several suffixes can hash to the same shard. Group them so each connection
        // runs a single query; most ADO.NET connections cannot run commands concurrently.
        var suffixesByShard = Enumerable.Range(0, _hotShardFanOut)
            .GroupBy(suffix => _router.GetShardForKey($"{partitionKey}#{suffix}"));

        var tasks = suffixesByShard.Select(group =>
            _shardConnections[group.Key].QueryAsync<DomainEvent>(new CommandDefinition(
                "SELECT * FROM domain_events WHERE partition_key = @Key AND suffix IN @Suffixes",
                new { Key = partitionKey, Suffixes = group.ToArray() },
                cancellationToken: cancellationToken)));

        var results = await Task.WhenAll(tasks);

        // Per-shard ordering would not survive the merge, so sort once in memory
        return results
            .SelectMany(r => r)
            .OrderBy(e => e.OccurredAt)
            .ToList();
    }
}

Further Reading

External references:

  • Kleppmann, M. (2017). Designing Data-Intensive Applications, Ch. 6. O'Reilly.
  • Amazon DynamoDB Documentation: "Choosing a Partition Key."
  • DeCandia, G. et al. (2007). "Dynamo: Amazon's Highly Available Key-Value Store." SOSP '07.