
The Numbers Behind the Architecture

From 50 requests per second to 500,000 — every stage of the scaling journey, with the actual configs, queries, and decisions

Sathyan · 32 min read

In 2017, I inherited a system that served fifty requests per second on a good day. One Postgres database on a managed instance, one application server behind an Nginx reverse proxy, and a Redis cache that someone had added six months earlier "just in case." The Redis instance had four keys in it. Two of them had expired.

By 2023, a descendant of that system — same core domain, same data model at its heart, heavily evolved in every other way — was serving five hundred thousand requests per second across three regions. The database had been sharded. The cache was no longer optional. The single application server had become a fleet. The Nginx reverse proxy had been replaced by a global load balancer that routed traffic based on latency, geography, and the health of downstream services.

Every stage of that journey involved a specific bottleneck, a specific intervention, and specific numbers that told us when to act. This article walks through those stages — with the actual queries, configurations, and measurements that drove each decision. No frameworks. No decision matrices. Just the machine, and what it told us when we listened.

This is Part 4 of the solution architecture series. Parts 1 through 3 covered the conceptual toolkit — the five forces, resilience patterns, shipping discipline. This part is the engine room.

The Product

Before we get into the plumbing, you should know what the plumbing carries.

The system was a healthcare claims processing platform — the kind of product that sounds mundane until you realize it sits between a patient getting care and a provider getting paid. Medical claims. Prior authorizations. Eligibility checks. The paperwork that determines whether a hospital can afford to keep its lights on.

Here's what that looks like in practice. A patient walks into an ER in Houston with chest pain. The hospital runs tests, admits them for observation, performs a cardiac catheterization, and discharges them three days later. Behind that stay sits a cascade of transactions — an eligibility check against the patient's insurance plan, a prior authorization request for the catheterization, a claim with forty-seven line items covering every procedure, medication, and bed-night, and a remittance advice from the payer explaining what they'll cover and what they'll deny. Miss a coding error on the claim and it gets rejected. Miss a prior auth and the hospital eats the cost. Miss a filing deadline and the claim dies — the provider never gets paid.

Our platform processed that workflow. Ingest claims from hospital EHR systems, validate them against payer rules, flag coding inconsistencies before submission, route prior authorizations to the right clinical reviewers, track every claim from submission through adjudication to payment. Every event in the system — claim submitted, authorization requested, line item denied, appeal filed — had a dollar value and a patient behind it.

The numbers weren't abstract. A slow dashboard meant a billing coordinator couldn't see why a $47,000 cardiac claim had been sitting in review for six days. A dropped message meant a prior authorization didn't reach the clinical reviewer before the procedure was scheduled. A database timeout during the monthly claims batch — when health systems submit tens of thousands of claims simultaneously — meant manual rework on transactions that were supposed to be straight-through.

When the system served fifty requests per second for a few hundred internal users, none of this was urgent. When it grew to serve health systems and payers across twenty-three states, processing $4.1 billion in annual claims volume, every millisecond of latency and every dropped request had a patient's bill or a provider's revenue attached to it.

The scaling journey that follows isn't about handling more traffic for its own sake. It's about making sure that when a utilization review nurse in Chicago needs to approve a prior auth at 4:59 PM before a surgery scheduled for 7 AM tomorrow, the system responds in 80 milliseconds — not 4 seconds, not a timeout, not a blank screen.

With that context, let's open the hood.

Stage 1: The Single Box (50 rps)

The system started as most systems start — one of everything. One application server (4 cores, 16GB RAM), one Postgres database (4 cores, 16GB RAM, 500GB SSD), one Redis instance that nobody was really using. The traffic was internal — a few hundred operations staff running reports and processing approvals.

At fifty requests per second, everything worked. Response times hovered around 80 milliseconds. The database CPU sat at 15 percent. The application server had more headroom than it knew what to do with.

Then the product grew. Two large health systems in Texas onboarded, followed by a regional payer network covering six Midwestern states. Their EHR and adjudication systems needed API access to push claims and pull authorization statuses. Traffic climbed to 200 requests per second, and one morning the on-call Slack channel lit up with timeout alerts.

The culprit was a single query. The events table had grown to 12 million rows over eighteen months — nobody had noticed because the table scans were fast enough when it was small. The query powered the main dashboard:

SELECT * FROM events
WHERE user_id = 12345
  AND created_at > '2025-01-01'
ORDER BY created_at DESC
LIMIT 50;

EXPLAIN ANALYZE told the whole story:

Seq Scan on events
  Filter: ((user_id = 12345) AND (created_at > '2025-01-01'))
  Rows Removed by Filter: 11,986,153
  actual time=3821.445 rows=13,847
Planning Time: 0.182 ms
Execution Time: 3842.214 ms

A sequential scan. The database was reading every single row in a 12-million-row table, throwing away 99.9 percent of them, and sorting the survivors. Almost four seconds per query. Multiply that by the number of dashboard users hitting the same endpoint, and the database connection pool was drowning.

The fix was a composite index:

CREATE INDEX idx_events_user_created
  ON events (user_id, created_at DESC);

The new EXPLAIN ANALYZE:

Index Scan using idx_events_user_created on events
  Index Cond: ((user_id = 12345) AND (created_at > '2025-01-01'))
Planning Time: 0.241 ms
Execution Time: 0.228 ms

From 3,842 milliseconds to 0.2 milliseconds. A sixteen-thousand-fold improvement. One line of SQL.

That's the first lesson of Stage 1. Before you scale anything — before you add replicas, before you add caching, before you even think about splitting services — run EXPLAIN ANALYZE on your slowest queries. The answer is almost always an index, not more hardware.

The second lesson is Postgres tuning. The default configuration ships optimized for a system with 128MB of RAM. On a 16GB machine, the defaults are leaving most of your resources unused.

| Parameter | Default | Tuned (16GB) | What It Does |
|---|---|---|---|
| shared_buffers | 128 MB | 4 GB | Postgres's internal cache — set to 25% of RAM |
| effective_cache_size | 4 GB | 12 GB | Tells the planner how much OS cache to expect — set to 75% of RAM |
| work_mem | 4 MB | 50 MB | Memory per sort/hash operation — too high and concurrent queries OOM |
| maintenance_work_mem | 64 MB | 512 MB | Memory for VACUUM, CREATE INDEX — be generous |
| max_connections | 100 | 100 | Keep this low — each connection costs 5-10MB of RAM |

work_mem is the setting that catches people. It is allocated per sort or hash operation per query — not per connection. A query with five sort nodes using 50MB of work_mem consumes 250MB. Multiply by twenty concurrent connections and you've consumed 5GB just on sorts. Set it high enough to avoid disk spills (you'll see Sort Method: external merge Disk in EXPLAIN output), low enough to avoid OOM under concurrency.
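The arithmetic in that warning is worth making concrete. A back-of-envelope check, using the figures from the text (five sort nodes and twenty connections are the worked example, not universal constants):

```python
# work_mem is charged per sort/hash node per query, not per connection.
work_mem_mb = 50
sort_nodes_per_query = 5       # a complex reporting query
concurrent_queries = 20

per_query_mb = work_mem_mb * sort_nodes_per_query     # 250 MB per query
worst_case_mb = per_query_mb * concurrent_queries     # under full concurrency
print(worst_case_mb)  # 5000 -- nearly a third of a 16GB machine, on sorts alone
```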

After tuning Postgres and adding indexes to the five worst queries, the system was back to 60ms response times at 200 requests per second. The database CPU dropped from 85 percent back to 30 percent. We'd bought ourselves another six months of headroom without adding a single component.

Stage 2: The First Split (500 rps)

Six months later, the traffic had tripled. The dashboard queries were fast now, but the sheer volume of connections was becoming the problem. Each application server process held a database connection for the duration of each request. At peak, we had 180 concurrent connections — approaching the 200-connection mark where Postgres starts spending more time managing connections than executing queries.

This is where PgBouncer entered the picture.

The Postgres wiki has a formula that sounds almost too simple to be useful:

optimal_connections = (core_count × 2) + effective_spindle_count

For our 4-core database server with SSD storage (spindle count = 1): (4 × 2) + 1 = 9. Nine actual database connections. That's it. Nine connections will outperform 200 because the database spends its time executing queries instead of context-switching between connections and fighting for lock contention.

PgBouncer sits between the application and the database. The application opens hundreds of connections to PgBouncer. PgBouncer funnels them into nine actual Postgres connections using transaction-mode pooling — each connection is returned to the pool the instant a transaction completes.

[pgbouncer]
pool_mode = transaction
default_pool_size = 20
max_client_conn = 5000
reserve_pool_size = 5
reserve_pool_timeout = 3
server_idle_timeout = 600
server_lifetime = 3600

Transaction pooling mode has a catch: you lose session-level features. Prepared statements (in the traditional sense), SET commands, LISTEN/NOTIFY, advisory locks, and temp tables won't work as expected because your next query might run on a different backend connection. PgBouncer 1.21+ added protocol-level prepared statement support (the max_prepared_statements setting) to handle the prepared statements case, but the others remain session-mode-only features.

The effect was immediate. The database went from 180 concurrent connections to 20. CPU dropped. Latency dropped. The application servers could scale horizontally without worrying about overwhelming Postgres with connections — PgBouncer absorbed the pressure.

Redis Gets Serious

At the same time, we started using Redis properly. The dashboard had several queries that ran the same expensive aggregations for every user viewing the same time range. A hundred users looking at "last 7 days" were executing a hundred identical queries.

The question was which Redis data structure to use. This matters more than people think — each structure has different performance characteristics:

| Structure | Use Case | Key Operations | Time Complexity |
|---|---|---|---|
| Strings | Simple key-value, counters, serialized objects | GET, SET, INCR | O(1) |
| Hashes | Object with multiple fields (user profile, config) | HGET, HSET, HGETALL | O(1) per field, O(N) for GETALL |
| Sorted Sets | Leaderboards, time-series rankings, rate limiting | ZADD, ZRANGE, ZRANGEBYSCORE | O(log N) insert, O(log N + M) range |
| Streams | Event logs, message queues, activity feeds | XADD, XREAD, XRANGE | O(1) append, O(N) read |
| Sets | Unique membership, tagging, intersection/union | SADD, SISMEMBER, SINTER | O(1) add/check, O(N×M) intersect |

For the dashboard aggregations, we used strings with serialized JSON — cache the entire query result with a TTL. Simple and effective for read-heavy workloads where the cached value is the whole response.

For the rate limiter on the new public API, we used sorted sets — ZADD with timestamps as scores, ZRANGEBYSCORE to count requests in a sliding window, ZREMRANGEBYSCORE to clean up old entries. O(log N) for every operation, and N is bounded by the window size.
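A minimal in-memory model of that sliding window. The Redis version replaces the list with a sorted set — ZADD to record, ZREMRANGEBYSCORE to prune, ZCARD to count, ideally in one pipeline or Lua script; class and parameter names here are illustrative:

```python
import time
from collections import defaultdict

class SlidingWindowLimiter:
    # In-memory stand-in for the sorted-set limiter.
    # Redis mapping: record -> ZADD (score = timestamp),
    # prune -> ZREMRANGEBYSCORE, count -> ZCARD.
    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window = window_seconds
        self.hits = defaultdict(list)   # key -> request timestamps

    def allow(self, key, now=None):
        now = time.time() if now is None else now
        cutoff = now - self.window
        # Drop entries older than the window (ZREMRANGEBYSCORE -inf..cutoff)
        self.hits[key] = [t for t in self.hits[key] if t > cutoff]
        if len(self.hits[key]) >= self.limit:
            return False                 # over the limit for this window
        self.hits[key].append(now)       # ZADD key {now: request_id}
        return True
```

Because every call prunes expired entries, N in the O(log N) sorted-set costs stays bounded by the limit itself.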

Three weeks after we added caching, we had our first thundering herd incident.

We'd set a 60-second TTL on the most popular dashboard cache key. Every 60 seconds, the key expired. In the few milliseconds between expiration and regeneration, dozens of requests hit the cache, found nothing, and all stampeded to the database with the same expensive query. The database spiked. Connections saturated. Response times went through the roof — for 60 seconds, then it calmed down, then 60 seconds later it happened again. A perfectly periodic disaster.

The immediate fix was TTL jitter. Instead of TTL = 60, we set TTL = 60 + random(-10, 10). This spread expirations across a 20-second window for the general case.

But for this specific hot key — the one that every dashboard user hit — jitter alone wasn't enough. We implemented a mutex-based approach: when the cache misses, the first request acquires a distributed lock (Redis SET key NX EX 5), regenerates the value, and populates the cache. Every other request during that window gets the stale value (we kept the old value in a separate key with a longer TTL).
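Sketched with a dict standing in for Redis — in production the lock is SET lock:key token NX EX 5 and the stale copy lives under its own longer-TTL key; all names here are illustrative:

```python
class StampedeProtectedCache:
    # First request past expiry takes the lock and regenerates; every
    # other request during that window is served the stale copy.
    def __init__(self, regenerate, ttl=60):
        self.regenerate = regenerate
        self.ttl = ttl
        self.fresh = {}     # key -> (value, expires_at)
        self.stale = {}     # key -> last known value, kept longer
        self.locks = set()  # in Redis: SET lock:key token NX EX 5

    def get(self, key, now):
        entry = self.fresh.get(key)
        if entry and entry[1] > now:
            return entry[0]                  # cache hit
        if key in self.locks:
            return self.stale.get(key)       # someone else is regenerating
        self.locks.add(key)
        try:
            value = self.regenerate(key)     # only one caller pays this cost
            self.fresh[key] = (value, now + self.ttl)
            self.stale[key] = value
            return value
        finally:
            self.locks.discard(key)
```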

Facebook solved this at massive scale with what they called "leases" in their Memcached infrastructure — documented in their 2013 NSDI paper. When a cache miss occurs, memcached issues a lease token to the first requester. Subsequent requests for the same key within the lease period either wait or get a stale value. Same principle, different implementation. The problem is universal. The solution is always some form of: let one request regenerate, serve everyone else from stale data while it happens.

After PgBouncer, Redis caching with thundering herd protection, and a second application server behind the load balancer, the system was comfortable at 500 requests per second. The database was doing a fraction of the work it used to — cache hit rates were running at 85 percent, meaning only 75 of every 500 requests actually touched Postgres.

Stage 3: Read Replicas and Queues (5,000 rps)

A year later, the API had gone public. Traffic was climbing past 2,000 requests per second and accelerating. The cache was absorbing most reads, but the writes were becoming the bottleneck. Every API call that created or updated a record hit the primary database, and the primary was starting to show strain — CPU at 70 percent during business hours, write latency climbing from 5ms to 25ms.

The read-to-write ratio was roughly 15:1 — typical for most applications. Users browse, search, and view far more than they create. That asymmetry is an architectural gift because it means you can scale reads and writes independently.

Read Replicas

We added two read replicas — Postgres streaming replication from the primary. The application routed all read queries to the replicas and all writes to the primary.

Read/write split: writes to primary, reads distributed across replicas, cache absorbs hot reads

The replication lag question came up immediately. With asynchronous replication — the default and the right choice for most systems — the replicas are slightly behind the primary. We measured it:

-- On the primary:
SELECT client_addr, state, replay_lag
FROM pg_stat_replication;

Same-region async replication lag was typically sub-millisecond — fast enough that users almost never noticed. But "almost never" isn't "never."

The read-after-write problem: a user creates a record, the page refreshes, and the read goes to a replica that hasn't received the write yet. The user thinks their data vanished. Our solution was the simplest one — after a write, we set a short-lived cookie that routed that user's reads to the primary for the next 5 seconds. Crude, effective, zero infrastructure changes.
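The cookie trick reduces to a few lines. A sketch with an in-memory map standing in for the cookie, and the 5-second pin window from the text:

```python
import time

PIN_SECONDS = 5  # must comfortably exceed observed replication lag

class ReadRouter:
    # After a write, route that user's reads to the primary until the
    # replicas have certainly caught up. In production the timestamp
    # travels in a short-lived cookie, not server-side state.
    def __init__(self):
        self.last_write = {}

    def record_write(self, user_id, now=None):
        self.last_write[user_id] = time.time() if now is None else now

    def target(self, user_id, now=None):
        now = time.time() if now is None else now
        if now - self.last_write.get(user_id, float("-inf")) < PIN_SECONDS:
            return "primary"
        return "replica"
```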

The Queue Changes Everything

The bigger shift at this stage was introducing a message queue for background processing. Every time a clinical reviewer approved a prior authorization, the system had to send a notification to the requesting provider, generate a PDF of the authorization certificate, update the search index so the claim showed the new status, and fire a webhook to the payer's adjudication system. All of that was happening inline — during the HTTP request. A request that should have taken 50ms was taking 800ms because it was waiting for an email provider's API, an Elasticsearch indexing call, and a webhook to a payer's system.

We moved all of that behind an SQS queue. The API wrote a message to the queue and returned immediately. Background workers picked up messages and processed them asynchronously.

| Concern | Before (Sync) | After (Queue) |
|---|---|---|
| API response time | 800ms (waiting for side effects) | 50ms (write + enqueue) |
| Email delivery failure | User gets a 500 error | Message retries automatically |
| Search index down | API degrades for all users | Queue backs up, auto-recovers |
| Partner webhook slow | Blocks the user's request | Processed independently, with retries |

SQS was the right choice for us — zero operational overhead, functionally unlimited throughput for standard queues, built-in dead letter queues. We configured a maxReceiveCount of 3 — if a message failed processing three times, it moved to a dead letter queue for manual inspection rather than retrying forever.
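The redrive behavior is easy to model. A toy consumer loop showing how maxReceiveCount = 3 turns a poison message into a DLQ entry instead of an infinite retry — a deque stands in for SQS, and the names are illustrative:

```python
from collections import deque

MAX_RECEIVE_COUNT = 3  # mirrors the SQS redrive policy above

def drain(queue, handler, dead_letters):
    # Each failed receive requeues the message until it has been
    # received MAX_RECEIVE_COUNT times, then it moves to the DLQ.
    receive_counts = {}
    while queue:
        msg = queue.popleft()
        receive_counts[msg] = receive_counts.get(msg, 0) + 1
        try:
            handler(msg)                      # success: message deleted
        except Exception:
            if receive_counts[msg] >= MAX_RECEIVE_COUNT:
                dead_letters.append(msg)      # give up: manual inspection
            else:
                queue.append(msg)             # visibility timeout expiry
```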

The choice between SQS and RabbitMQ comes down to what you need. SQS gives you near-zero ops burden, massive scale, and at-least-once delivery. RabbitMQ gives you lower latency (sub-millisecond vs SQS's low-tens-of-milliseconds), sophisticated routing via exchanges, and built-in backpressure — when memory or disk thresholds are crossed, RabbitMQ blocks publishers automatically. If you're on AWS and can tolerate slight latency, SQS. If you need complex routing or sub-millisecond delivery, RabbitMQ.

After read replicas and queues, the system was comfortable at 5,000 requests per second. The primary database was handling only writes — about 330 per second. The replicas handled reads. The cache absorbed the hot path. The queue absorbed the side effects. Each component was doing one thing, and doing it within its performance envelope.

Stage 4: Sharding (50,000 rps)

This is where the story changes character. Stages 1 through 3 are incremental — you're adding components to an architecture that is fundamentally the same single-primary system. Stage 4 is a phase change. You're splitting the data itself.

By this point, the platform was processing claims for health systems and payers across twenty-three states. Each organization was a tenant — with its own users, its own payer rules, its own clinical review workflows. The month-end claims submission window, when every health system pushed their backlog before filing deadlines, had become our Black Friday. Write throughput on the primary reached the ceiling that vertical scaling couldn't push past. We'd already upgraded to a 16-core, 128GB machine. Postgres was tuned aggressively:

| Parameter | Value (128GB Machine) |
|---|---|
| shared_buffers | 32 GB |
| effective_cache_size | 96 GB |
| work_mem | 256 MB |
| maintenance_work_mem | 4 GB |
| max_connections | 300 (behind PgBouncer) |

The machine was powerful. It wasn't enough. Write latency was climbing during peak hours because the workload had shifted — more tenants, more concurrent writes, more lock contention on hot tables. The WAL generation rate was so high that replicas were falling behind during peak writes, which meant our read scaling was degrading too.

Time to shard.

Choosing the Sharding Strategy

We evaluated three approaches. Each one had real tradeoffs, and the decision depended on our specific access patterns.

Hash-based sharding — shard_id = hash(tenant_id) % num_shards. Even distribution, simple routing. But resharding requires remapping nearly every key. Consistent hashing reduces the damage (only ~1/N keys move when you add a shard), but cross-shard queries become expensive or impossible.

Range-based sharding — tenants 1-10,000 on shard 1, 10,001-20,000 on shard 2. Easy to reason about, range queries work within a shard. But access patterns are rarely uniform — recent tenants are more active, creating hotspots on the highest-numbered shard.

Tenant-based (directory) sharding — a lookup table maps each tenant to a shard. Flexible: you can move individual tenants between shards, give large tenants dedicated shards, rebalance without touching the application logic. The cost is one extra lookup per request (easily cached).

We went with tenant-based sharding, inspired by what Instagram described in their engineering blog. Their approach was elegant — thousands of logical shards mapped to fewer physical servers, each logical shard living in its own Postgres schema. The shard ID is encoded into the primary key itself — in their published layout, (timestamp << 23) | (shard_id << 10) | (sequence % 1024). This means you can look at any ID and know which shard it lives on without a lookup.
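Instagram's published layout packs three fields into a 64-bit key: 41 bits of milliseconds since a custom epoch, 13 bits of shard, 10 bits of per-millisecond sequence. A sketch — the epoch value is illustrative:

```python
EPOCH_MS = 1_600_000_000_000  # custom epoch; illustrative value

def make_id(timestamp_ms, shard_id, sequence):
    # 41 bits time | 13 bits shard | 10 bits per-millisecond sequence
    return ((timestamp_ms - EPOCH_MS) << 23) | (shard_id << 10) | (sequence % 1024)

def shard_of(record_id):
    # Any service can recover the shard from the ID -- no directory lookup
    return (record_id >> 10) & 0x1FFF   # mask off the 13 shard bits
```

A side benefit: because the timestamp occupies the high bits, these IDs sort roughly by creation time.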

Notion did something similar when they sharded their Postgres monolith — each workspace mapped to a shard, because Notion's data model is workspace-centric and cross-workspace queries are rare. Our data model had the same property: almost every query was scoped to a single tenant.

The routing layer was a simple service that cached the tenant-to-shard mapping in Redis. Application code changed from:

# Before: single database
db = get_connection()
result = db.execute(query, params)
 
# After: shard-aware routing
shard_id = get_shard_for_tenant(tenant_id)  # Redis lookup, cached
db = get_connection(shard_id)
result = db.execute(query, params)
Tenant-based sharding: routing layer maps tenants to shards, large tenants get dedicated shards

We started with 16 logical shards across 4 physical database servers — 4 shards per server. This gave us room to rebalance by moving logical shards between physical servers without application changes. When one server got hot (a large tenant driving disproportionate write traffic), we moved that tenant's logical shard to its own dedicated physical server. The application didn't know or care — the routing layer handled it.
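A sketch of that routing layer: two maps, where rebalancing touches only the logical-to-physical map. The Redis cache is omitted, integer tenant IDs are assumed, and class and method names are illustrative:

```python
class ShardRouter:
    # tenant -> logical shard -> physical server. Moving a logical shard
    # (or pinning a hot tenant) never touches application code.
    def __init__(self, num_logical=16, num_physical=4):
        self.num_logical = num_logical
        self.tenant_overrides = {}   # hot tenants pinned explicitly
        self.logical_to_physical = {
            s: s % num_physical for s in range(num_logical)
        }

    def logical_shard(self, tenant_id):
        return self.tenant_overrides.get(tenant_id, tenant_id % self.num_logical)

    def physical_server(self, tenant_id):
        return self.logical_to_physical[self.logical_shard(tenant_id)]

    def move_logical_shard(self, shard, server):
        self.logical_to_physical[shard] = server   # the rebalance operation
```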

The hardest part of sharding isn't the routing or the data distribution. It's the queries that cross shard boundaries. Any query that needs data from multiple tenants — admin dashboards, analytics, global search — now requires scatter-gather across all shards. We handled this by building a separate analytics pipeline: events from all shards flowed into a Kafka topic, a consumer wrote them into a single analytics database (ClickHouse), and all cross-tenant queries ran against that. The transactional path was sharded. The analytical path was unified. Two systems for two access patterns.

The Migration

Migrating from a single database to shards without downtime is the kind of project that looks simple on a whiteboard and takes three months in practice.

Our approach:

  1. Dual-write phase — application writes to both the old database and the new shard. Reads still come from the old database. Run for two weeks, validate data consistency between old and new.

  2. Shadow-read phase — application reads from both, compares results, logs discrepancies. Serves the old database's response. Run for one week. Fix every discrepancy.

  3. Cutover — flip reads to the new shards. Keep the old database running as a fallback. Monitor for 48 hours.

  4. Decommission — shut down the old database. Three months after cutover, once you're confident.

The dual-write phase caught eleven data consistency bugs — edge cases where our shard-routing logic disagreed with the old database's behavior. Finding those before cutover was worth the entire three months.

After sharding, each physical database server was handling a quarter of the write load. The system scaled to 50,000 requests per second with comfortable headroom. The read replicas on each shard handled their portion of reads. The cache hit rate had climbed to 92 percent. The queues were processing 8,000 background jobs per minute.

Stage 5: The Distributed System (500,000+ rps)

The jump from 50,000 to 500,000 is where the architecture stops being something you can hold in your head.

At this scale, every assumption from earlier stages gets tested. The cache that served you well at 5K rps is now handling 450,000 reads per second — a single Redis instance tops out around 100K-250K operations per second for real workloads. The message queue that gracefully handled async processing at 5K rps is now ingesting 40,000 messages per second. The sharded database is generating enough WAL to make cross-region replication a bandwidth concern.

Redis Clustering

A single Redis instance was no longer enough. We moved to Redis Cluster — hash-slot-based sharding across multiple nodes. Redis Cluster divides the keyspace into 16,384 hash slots and distributes them across nodes. Each key is mapped to a slot by CRC16(key) % 16384.

| Configuration | Single Instance | Redis Cluster (6 nodes) |
|---|---|---|
| Max throughput | ~150K ops/sec | ~750K ops/sec |
| Max memory | 64 GB (practical limit) | 384 GB (64 × 6) |
| Availability | Single point of failure | Survives node loss (with replicas) |
| Complexity | None | Clients must handle MOVED/ASK redirects |
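The slot computation is small enough to write out. The CRC16 variant below (XModem: polynomial 0x1021, no reflection) is the one the Redis Cluster specification defines, along with the hash-tag rule that pins related keys to one slot; the tenant-flavored key names are illustrative:

```python
def crc16(data):
    # CRC-16/XMODEM, as specified for Redis Cluster key slots
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) if crc & 0x8000 else crc << 1
            crc &= 0xFFFF
    return crc

def key_slot(key):
    # Hash tags: if the key contains {...} with non-empty content,
    # only that substring is hashed -- so {tenant:42}:a and {tenant:42}:b
    # land on the same slot and can be used in multi-key operations.
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:
            key = key[start + 1:end]
    return crc16(key.encode()) % 16384
```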

Redis is single-threaded for command execution — I/O threading was added in Redis 6, but the actual command processing uses one core. This means a single KEYS * or HGETALL on a hash with a million fields blocks the entire instance for seconds. Use SCAN instead of KEYS. Use HSCAN instead of HGETALL on large hashes. Delete large keys with UNLINK (async) instead of DEL (blocking). At 500K rps, one blocked second means 500,000 failed requests.

Kafka as the Central Nervous System

At this scale, SQS was still handling point-to-point messaging, but we needed something for the event backbone — the stream of domain events that fed analytics, search indexing, partner webhooks, and the cross-shard analytics pipeline. Kafka.

Kafka as the event backbone: every shard publishes domain events, multiple consumer groups process them independently — each at its own pace, each for a different purpose

The partition math matters:

partitions = max(target_throughput / producer_per_partition,
                 target_throughput / consumer_per_partition)

Our target was 40,000 messages per second. A single Kafka partition can handle 10 MB/s of producer throughput easily. But our consumer processing — which involved database writes and HTTP calls — was the bottleneck at roughly 500 messages/second per consumer. So:

partitions = 40,000 / 500 = 80 partitions

We set 96 partitions — rounding up to a number with many divisors, which gives consumer groups flexibility: 96 splits evenly across 2, 3, 4, 6, 8, 12, 16, 24, 32, or 48 consumers. With 96 partitions, we could run up to 96 consumer instances — each processing one partition — and scale consumers horizontally by adding instances.

Two months after launching Kafka, we discovered a subtle ordering problem. We'd partitioned the main event topic by tenant_id — hash(tenant_id) % 96. This guaranteed that all events for a given tenant arrived at the same partition and were processed in order. Critical for our use case: a "created" event must be processed before an "updated" event for the same entity.

But one of our largest tenants generated 15 percent of all traffic. Their events all landed on one partition. That partition's consumer was processing 6,000 messages per second while the others averaged 400. The consumer fell behind. The lag grew. Alerts fired.

The fix was sub-partitioning for hot tenants. We changed the partition key to tenant_id + entity_id for tenants above a traffic threshold. This spread their events across multiple partitions while maintaining per-entity ordering (which is what we actually needed — not per-tenant ordering). Smaller tenants kept tenant-level partitioning because their volume didn't warrant the complexity.
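The key-selection logic, sketched. The hot-tenant set is illustrative, and a stable hash replaces Python's salted built-in so the key-to-partition mapping survives process restarts:

```python
import hashlib

NUM_PARTITIONS = 96
HOT_TENANTS = {"tenant-big"}   # tenants above the traffic threshold

def stable_hash(s):
    # Deterministic across processes, unlike Python's salted hash()
    return int.from_bytes(hashlib.md5(s.encode()).digest()[:8], "big")

def partition_for(tenant_id, entity_id):
    # Hot tenants key on entity: their events spread across partitions
    # while per-entity ordering survives. Everyone else keys on tenant.
    key = f"{tenant_id}:{entity_id}" if tenant_id in HOT_TENANTS else tenant_id
    return stable_hash(key) % NUM_PARTITIONS
```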

The lesson: your partition key determines your parallelism ceiling. Choose it based on your actual ordering requirements, not your logical grouping.

| Kafka Setting | Our Value | Why |
|---|---|---|
| num.partitions | 96 | Consumer parallelism ceiling |
| replication.factor | 3 | Survive any single broker failure |
| min.insync.replicas | 2 | Writes confirmed by at least 2 replicas |
| acks | all | Producer waits for all in-sync replicas |
| retention.ms | 604800000 (7 days) | Consumers can recover from a week of downtime |
| max.message.bytes | 1048576 (1MB) | Prevent accidental huge messages |

Kubernetes at Scale

The application tier was running on Kubernetes by this point — about 200 pods across the fleet. And this is where we learned about resource limits the hard way.

Kubernetes fleet: API pods, workers, batch jobs, and Kafka consumers isolated on separate nodes — workload types never share resources

A Tuesday night. Traffic was normal. At 2:14 AM, monitoring showed a spike in 5xx errors. Within three minutes, 40 percent of our pods were in CrashLoopBackOff.

The investigation: a nightly batch job had been deployed that afternoon. It ran a memory-intensive data transformation — loading a few hundred megabytes into memory, processing, writing out. The developer had tested it locally with plenty of RAM. In production, the pod's memory limit was 512MB, inherited from the default template.

The pod consumed 480MB (normal application usage) plus the batch job's allocation. Exceeded 512MB. The Linux kernel's OOM killer sent SIGKILL — not SIGTERM, no graceful shutdown, no chance to finish in-flight requests. Kubernetes restarted the pod. The batch job triggered again on startup (it was a cron-triggered init container, poorly designed). Exceeded memory again. Killed again. CrashLoopBackOff — exponential backoff between restarts. Repeat across every pod that picked up the job.

The fix had three parts:

  1. Moved the batch job to a separate deployment with its own resource limits (2GB memory)
  2. Set application pod limits with actual headroom — requests: 384MB, limits: 768MB
  3. Added a hard rule: batch workloads and API workloads never share a pod

The Kubernetes resource model has a critical asymmetry that catches everyone:

| Resource | Below Request | Between Request and Limit | Above Limit |
|---|---|---|---|
| CPU | Guaranteed scheduling | Burst allowed (if available) | Throttled (slowed down) |
| Memory | Guaranteed scheduling | Burst allowed (if available) | OOMKilled (process dies) |

CPU limits throttle. Memory limits kill. This asymmetry means getting memory limits wrong is catastrophically worse than getting CPU limits wrong. A throttled pod is slow. A killed pod is an outage.

For JVM-based services (Java, Kotlin, Scala), set -XX:MaxRAMPercentage=75 to ensure the heap doesn't exceed 75 percent of the container's memory limit. The remaining 25 percent covers metaspace, thread stacks, NIO buffers, and other off-heap allocations. Without this flag, the JVM defaults to using 1/4 of the perceived system memory — which in a container might be the host's memory, not the container's limit. The result: a JVM that thinks it has 64GB and tries to allocate accordingly inside a 2GB container.

Consistency at Scale

At 500K requests per second across sharded databases, replicas, and caches, the consistency model is something you choose explicitly — or discover painfully.

The CAP theorem gets cited a lot and understood rarely. Martin Kleppmann, who wrote the definitive book on distributed data systems, has pointed out that CAP as commonly presented is misleading. The "pick 2 of 3" framing is wrong — network partitions happen whether you want them to or not. The real choice is: when a partition occurs, do you refuse requests (consistency) or serve potentially stale data (availability)?

But here's what Kleppmann argues matters more: the everyday tradeoff isn't partition tolerance at all. It's latency versus consistency. Daniel Abadi formalized this as PACELC — if Partition, choose Availability or Consistency; Else (no partition), choose Latency or Consistency.

We made different consistency choices for different data:

| Data Type | Consistency Model | Why |
|---|---|---|
| Claim adjudication records | Synchronous replication, read from primary | Losing or duplicating a payment determination is unacceptable |
| Provider/payer profiles | Async replication, read from replica | A 200ms stale profile is fine |
| Claims dashboard aggregations | Cached, TTL-based, eventual | Billing coordinators tolerate data that's 60 seconds old |
| Claim search index | Async event-driven, eventual | Search lag of a few seconds is invisible |
| HIPAA audit logs | Append-only, async, at-least-once | Must eventually be complete, order within tenant matters, real-time doesn't |

There is no "consistent" or "eventually consistent" system — there are systems that make different consistency choices for different data paths. The architecture is in knowing which data demands what.
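One way to make those per-data-path choices explicit is a routing table on the read side. This is an illustrative sketch, not the platform's actual code; the data-class names and targets are assumptions standing in for the table above:

```python
from enum import Enum

class ReadTarget(Enum):
    PRIMARY = "primary"   # strongly consistent, most expensive
    REPLICA = "replica"   # async replication, may lag by ~200ms
    CACHE = "cache"       # TTL-based, may be up to 60 seconds stale

# Each data class declares the weakest read target its
# consistency requirements allow.
READ_POLICY = {
    "claim_adjudication": ReadTarget.PRIMARY,
    "provider_profile": ReadTarget.REPLICA,
    "dashboard_aggregation": ReadTarget.CACHE,
}

def read_target(data_class: str) -> ReadTarget:
    # Unknown data defaults to the primary: the safest choice.
    return READ_POLICY.get(data_class, ReadTarget.PRIMARY)

assert read_target("provider_profile") is ReadTarget.REPLICA
assert read_target("unknown_table") is ReadTarget.PRIMARY
```

The point of the explicit table is that the consistency choice is reviewable in one place, instead of being scattered across query call sites.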

Event Sourcing: Where We Used It and Where We Didn't

At this scale, we adopted event sourcing for exactly one subsystem: the claims adjudication ledger — the core of what the platform existed to do. Every payment determination was stored as an immutable event — "claim CLM-2023-04871 adjudicated, $47,200 approved for 38 of 47 line items," "line items 12, 19, 24-28, 31, 40 denied — reason: medical necessity not established," "appeal filed, clinical documentation attached." The current status of any claim was a projection — a materialized view built by replaying events.

This made audit trivial. "What was the adjudication status of claim CLM-2023-04871 on March 15, before the appeal was filed?" became a query against the event stream, not a prayer. Reconciliation between payer and provider systems became a comparison of event streams. HIPAA compliance and CMS audit requirements — which demanded a complete, tamper-evident history of every claim determination — were built into the data model.
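A minimal sketch of the pattern, with heavily simplified event shapes (the real ledger stored far richer payloads): current state is never written directly, it is a fold over the event stream, and a point-in-time audit query is the same fold cut off at a date:

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class Event:
    claim_id: str
    occurred_on: str   # ISO date; a real system would use full timestamps
    kind: str          # "adjudicated", "appeal_filed", ...
    payload: dict = field(default_factory=dict)

def project(events, claim_id, as_of=None):
    """Replay the event stream into a claim's status.

    `as_of` gives the point-in-time view used for audit queries.
    Events are assumed to be stored in occurrence order.
    """
    state = {"status": "unknown", "approved_amount": 0}
    for e in events:
        if e.claim_id != claim_id:
            continue
        if as_of and e.occurred_on > as_of:
            break  # stop replaying at the audit cutoff
        if e.kind == "adjudicated":
            state["status"] = "adjudicated"
            state["approved_amount"] = e.payload["amount"]
        elif e.kind == "appeal_filed":
            state["status"] = "under_appeal"
    return state

events = [
    Event("CLM-2023-04871", "2023-03-10", "adjudicated", {"amount": 47200}),
    Event("CLM-2023-04871", "2023-03-20", "appeal_filed"),
]

# "What was the status on March 15, before the appeal was filed?"
assert project(events, "CLM-2023-04871", as_of="2023-03-15")["status"] == "adjudicated"
assert project(events, "CLM-2023-04871")["status"] == "under_appeal"
```

Because events are immutable, the historical answer can never drift: replaying the same stream with the same cutoff always yields the same state.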

We did not use event sourcing for anything else.

Event sourcing is powerful for domains where you need a complete temporal history — financial systems, compliance-heavy workflows, complex state machines. For everything else, it adds cognitive overhead, makes debugging harder (you're replaying events to understand state, not reading a row), and creates a schema evolution nightmare. Changing the structure of events after they're stored requires versioning, upcasting, or migration strategies that rival database migrations in complexity. Use it where the domain demands it. Use CRUD for the rest.

The Numbers That Matter

Across all five stages, the decisions were driven by numbers — not predictions, not best practices, not conference talks. Measured bottlenecks, specific interventions, measured results.

Here's the full picture:

| Stage | Traffic | Database | Cache | Queue | Key Intervention |
| --- | --- | --- | --- | --- | --- |
| 1 | 50 rps | Single Postgres, default config | Unused | None | Indexes + Postgres tuning |
| 2 | 500 rps | Single Postgres + PgBouncer | Redis (single) | None | Connection pooling + caching |
| 3 | 5K rps | Primary + 2 read replicas | Redis (single) | SQS | Read/write split + async processing |
| 4 | 50K rps | 4 shards (16 logical) | Redis (single) | SQS + Kafka | Tenant-based sharding |
| 5 | 500K rps | 12 shards (48 logical) | Redis Cluster (6 nodes) | SQS + Kafka (96 partitions) | Everything above, plus Kubernetes fleet management |
The scaling journey: each stage is a response to a measured bottleneck, not a predicted one

Each stage bought roughly a 10x increase in capacity. Each stage was the smallest intervention that pushed the ceiling high enough to buy time for the next stage. We never jumped ahead. We never built for a scale we hadn't measured our way toward.

The system isn't elegant. It's a layered accumulation of responses to real problems — duct tape that turned into load-bearing walls, temporary solutions that proved durable enough to keep. The architecture diagrams look messy compared to the clean boxes and arrows in a conference slide deck.

But it serves half a million requests per second. It's been running for six years. The team that operates it sleeps through the night.

The architecture that works at scale is never the one you designed on day one. It's the one you grew — one measured bottleneck at a time, one intervention at a time, one stage at a time.

That's the whole lesson of this series. The five forces from Part 1 are still at work in every decision described in this article — constraints shaped what we could build, tradeoffs shaped what we chose, failure modes shaped how we protected it, evolution shaped how it grew, operational cost shaped what the team could sustain. The forces don't change as the system scales. The numbers do. And the numbers are what you listen to.

The next part will go deeper into the specific subsystems that hold the architecture together at scale — the load balancer configurations, the CDN edge caching strategies, the exact monitoring setup that tells you when the next bottleneck is approaching before it arrives. The nerdy details continue.
