4 main focuses of a system design problem:
Users/Customers: who will use the system? all or subset? how will the system be used?
understand what the data needs to be.
understand how real-time the data needs to be.
Scale (read and write)
how much data is coming through? how much data is retrieved?
can there be spikes in traffic? what is the average rate per second?
Performance
write-to-read data delay ?
latency for read queries
Cost: minimise cost of development, minimise cost of maintenance
Reliable = Replication and checkpointing
Stages of a typical system design interview:
functional requirements (API): write a sentence -> function signature -> generalise
non-functional requirements: often tradeoff between performance and scalability
scalable (partitioning)
highly performant (in memory)
highly available
consistency
cost
maintainability
high-level design
detailed design
bottlenecks and tradeoffs.
🧐
Why requirements clarification is so important: it shows the interviewer your experience and makes the question tractable - it is impossible to cover everything in such an open-ended question.
What questions to ask the interviewer: get to grips with the functional/non-functional requirements.
Drive the conversation!
How we store data.
individual events: e.g. every click
😊
fast writes
can slice and dice data however we need
can recalculate if needed
😔
slow reads
costly for large scale
aggregate data
😊
fast reads
data is ready for decision making
😔
can query only the way data was aggregated
requires data aggregation pipeline
hard or even impossible to fix errors
expected data delay: time between when event happened and when it was processed
minutes -> stream data processing
hours -> batch data processing
🧐 you can use a hybrid approach to get the best of both worlds, but it increases complexity and cost.
How to scale a distributed (SQL) database (e.g. Vitess).
requirements:
scale writes and reads
fast writes and reads
data durability
strong consistency
trade off with availability - stale data over no data at all
recover data
data security
data model extensibility
Sharding: with a cluster proxy in front, using some configuration service for discovery.
The shard proxy can cache query results, monitor health and publish metrics (e.g. for dashboards).
Replication: for data durability, using single-leader, multi-leader
or leaderless replication; leaderless, however, needs the proxy to obtain a quorum to handle consistency issues.
writes go through the leader or a quorum.
reads can go through any healthy replica.
use different data centres.
Apache Cassandra high-level architecture.
No cluster proxy but nodes/shard gain network knowledge (of all other nodes) through gossip protocol.
Use quorum reads and writes when using replication for durability.
Use version number to determine staleness of data
Tunable consistency is implemented by changing the required number to be considered a quorum.
Any node can receive any request and route it to the appropriate shard (using consistent hashing, for example).
The node receiving the request becomes the coordinator node; DNS resolution can be used so that clients reach the closest node in the network.
It is fault-tolerant and scalable (both read and write throughput increase linearly as new machines are added).
Supports multi-datacenter and asynchronous masterless replication and works well with time-series data.
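A minimal sketch of a quorum read that resolves staleness by version number; the replica responses, names and values here are illustrative, not Cassandra's actual API:
```python
# Hypothetical sketch of a quorum read: the coordinator collects replica answers,
# checks that at least R arrived, and returns the value with the highest version.
from dataclasses import dataclass

@dataclass
class Versioned:
    value: str
    version: int  # monotonically increasing write version

def quorum_read(replica_responses, r):
    """replica_responses: list of Versioned (or None for unreachable replicas)."""
    answers = [resp for resp in replica_responses if resp is not None]
    if len(answers) < r:
        raise RuntimeError("quorum not reached")       # fewer than R healthy replicas
    # stale replicas lose: highest version wins (read repair could then fix them)
    return max(answers, key=lambda v: v.version)

# Example: N = 3 replicas, R = 2; one replica is stale, one is down.
responses = [Versioned("count=41", 6), Versioned("count=42", 7), None]
print(quorum_read(responses, r=2).value)               # -> count=42
```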
SQL vs NoSQL data model:
Relational databases: start with defining nouns which are converted into tables and use foreign keys to reference them in other tables.
We then use joins to generate the report.
There is no redundancy and the data is in normalised form.
This keeps transactions quick and improves write speed.
NoSQL: think in terms of storing queries.
Are not scared of storing data in denormalised form.
Makes read much faster.
Cassandra is a wide-column NoSQL database: we keep adding columns instead of rows for each new piece of data, e.g. add a column for each new hour of view counts for each video.
Other types of NoSQL are Document, Key-Value and Graph databases.
MongoDB (a document-oriented db) uses leader-based replication. HBase is column-oriented but uses master-based replication.
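A toy illustration (plain Python dicts, not real CQL) of the wide-row idea above: the row key is the video id and each new hour simply adds another column; the ids and timestamps are made up:
```python
# Toy model of a wide row: row key -> {column name -> value}.
# Each new hour of view counts becomes a new "column" on the same row,
# so all reads for one video stay on one partition.
view_counts = {
    "video_123": {
        "2015-07-01T10": 5,
        "2015-07-01T11": 12,
    }
}

def increment(video_id, hour_bucket, delta=1):
    row = view_counts.setdefault(video_id, {})
    row[hour_bucket] = row.get(hour_bucket, 0) + delta  # adds a new column if missing

increment("video_123", "2015-07-01T12", 7)
print(view_counts["video_123"])
```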
Data processing concepts:
in-memory aggregation: to scale better (vertically) we should pre-aggregate data in the processing service
we then push the new count to the database periodically
checkpointing: should users send async messages directly to the service (push), or should the service pull them from storage?
it is easier to implement fault tolerance with pull.
we can then implement checkpointing on the storage (which can be visualised as a queue).
new machine will resume from checkpoint.
we can then use a deduplication cache if processed twice.
partitioning: instead of putting all events in one queue (storage) use multiple queues
we can do this through hashing on some id.
partition consumer: infinite loop that consumes from partition and deserialises byte array into object.
usually the consumer is single-threaded; it can be multi-threaded, but then it is hard to reason about the order of events and checkpointing becomes harder.
aggregator: this can be multi-threaded and does the counting, processing events into an in-memory hash table (store).
internal queue: this serialises sending the old hash table to the database.
database writer will take messages from internal queue.
this can be multi-threaded as well but again checkpointing becomes harder.
deduplication cache: the partition consumer will check this cache and throw away processed events.
dead-letter queue: if messages cannot be routed to downstream service in case of overload or failure.
this is separate from the partition, and another process will handle them at a later date (i.e. flow control)
embedded database: this is for data enrichment.
state management of in-memory store
we can recreate state from point of failure by re-processing events (due to checkpointing)
we could also backup in-memory store to separate durable storage.
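A condensed sketch of the processing path above (deserialise -> deduplicate -> aggregate in memory -> checkpoint -> periodic flush); the queue, event format and database writer are stand-ins, not a real implementation:
```python
import json
import threading
from collections import defaultdict

class Aggregator:
    """Counts events in memory and periodically flushes the counts downstream."""

    def __init__(self):
        self.counts = defaultdict(int)
        self.seen = set()              # deduplication cache of processed event ids
        self.checkpoint = 0            # offset of the last event read from the partition
        self.lock = threading.Lock()

    def consume(self, raw_bytes, offset):
        event = json.loads(raw_bytes)          # deserialise byte array into an object
        if event["event_id"] in self.seen:     # already processed (e.g. after a retry)
            return
        self.seen.add(event["event_id"])
        with self.lock:
            self.counts[event["video_id"]] += 1
        self.checkpoint = offset               # a new machine would resume from here

    def flush(self, database_writer):
        with self.lock:
            snapshot, self.counts = self.counts, defaultdict(int)  # swap in a fresh table
        if snapshot:
            database_writer(dict(snapshot))    # in a real system this goes via the internal queue

# Example run with a fake partition and a print-based "database writer".
agg = Aggregator()
partition = [json.dumps({"event_id": i, "video_id": "v1"}).encode() for i in range(3)]
for offset, raw in enumerate(partition):
    agg.consume(raw, offset)
agg.flush(lambda counts: print("write to db:", counts))
print("checkpoint at offset", agg.checkpoint)
```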
Data ingestion pipeline concepts:
API Gateway:
blocking vs non-blocking I/O: one thread per connection (needs a rate limiter) vs requests queued and processed by a small pool of threads.
the cost of non-blocking systems is increased complexity, as they are much harder to debug.
blocking systems allow you to catch and use exceptions as well as thread local variables.
buffering and batching: if we pass single requests straight to the Partitioner Service then the PS cluster has to be just as big, so we buffer and batch multiple requests.
this introduces complexity for failed (timed-out) requests: resend the whole batch or only the failed requests (or maintain individual timeouts).
timeouts
connection timeout is very quick
request processing timeout should be based on the slowest 1% of requests (99th percentile latency).
we need exponential backoff and jitter algorithms for flow control when we retry.
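A small sketch of retries with capped exponential backoff and full jitter; the function and parameter names are illustrative:
```python
import random
import time

def call_with_retries(operation, max_attempts=5, base_delay=0.1, max_delay=10.0):
    """Retry an operation with capped exponential backoff and full jitter."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise                                    # give up after the last attempt
            cap = min(max_delay, base_delay * 2 ** attempt)
            time.sleep(random.uniform(0, cap))           # full jitter spreads out the retries

# Example: a flaky operation that fails twice and then succeeds.
state = {"calls": 0}
def flaky():
    state["calls"] += 1
    if state["calls"] < 3:
        raise RuntimeError("downstream overloaded")
    return "ok"

print(call_with_retries(flaky))
```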
circuit breaker pattern
stops a client from executing an operation that is likely to fail
counts the number of failed requests; if past the error threshold, stop calling the downstream service
some time later (again using a retransmission/backoff timeout as before) a limited number of requests are allowed through - if they succeed then we can assume the service is healthy and allow all requests
makes the system more difficult to test and the error threshold hard to set properly
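A rough sketch of the circuit breaker idea, assuming a simple failure counter and a single reset timeout (real implementations track more state):
```python
import time

class CircuitBreaker:
    """Fails fast once the downstream service looks unhealthy."""

    def __init__(self, error_threshold=5, reset_timeout_sec=30.0):
        self.error_threshold = error_threshold
        self.reset_timeout_sec = reset_timeout_sec
        self.failures = 0
        self.opened_at = None          # None -> closed (normal operation)

    def call(self, operation):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout_sec:
                raise RuntimeError("circuit open: not calling downstream service")
            # half-open: let one trial request through to probe the service
        try:
            result = operation()
        except Exception:
            self.failures += 1
            if self.failures >= self.error_threshold:
                self.opened_at = time.monotonic()   # stop calling the downstream service
            raise
        self.failures = 0                            # success: assume the service is healthy again
        self.opened_at = None
        return result

# Usage idea: wrap every downstream call, e.g. breaker.call(lambda: send(batch)).
breaker = CircuitBreaker(error_threshold=2, reset_timeout_sec=5.0)
print(breaker.call(lambda: "ok"))
```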
Load Balancer:
software vs hardware load balancing
the networking protocols that they support affect their decision-making ability
operating higher in the stack (e.g. HTTP vs TCP) means more processing time, as they proxy that protocol.
load balancing algorithms
round robin (in order)
least (active) connections
least response time (fastest)
hash-based on some key
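Toy versions of these selection strategies (server addresses and connection counters are placeholders):
```python
import hashlib
import itertools

servers = ["10.0.0.1", "10.0.0.2", "10.0.0.3"]        # placeholder addresses

# Round robin: hand out servers in order, wrapping around.
round_robin = itertools.cycle(servers)

# Least (active) connections: pick the server with the fewest open connections.
active_connections = {s: 0 for s in servers}
def least_connections():
    return min(servers, key=lambda s: active_connections[s])

# Hash-based: the same key (e.g. a user id) always maps to the same server.
def hash_based(key):
    digest = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return servers[digest % len(servers)]

print(next(round_robin), least_connections(), hash_based("user-42"))
```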
DNS: resolves the domain name of the service to the IP address of the load balancer
most proximal load balancer will be chosen
we then need some discovery method (which can be statically coded in the load balancer).
health checking: pings each server periodically to check and only routes to healthy nodes.
high availability of load balancers: primary and secondary in tandem (with one serving and one checking) and live in different data centres.
Partitioner Service (sits in front of partitions and sends to correct partition) and Partitions
the partition queue: an append-only log file that is totally ordered by timestamp.
partition strategy: hash function that sends video IDs to a certain partition
hot partitions: however very popular videos will end up routing many requests to one partition
include event time into the hash function key
the hot partition is split into two partitions - like adding a node in consistent hashing
allocate dedicated partitions to certain video channels
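A sketch of the partitioning strategy, including the event time in the key for hot videos; the partition count and hash choice are arbitrary:
```python
import hashlib

NUM_PARTITIONS = 8   # illustrative partition count

def partition_for(video_id, event_time_minute=None):
    """Basic strategy hashes the video id; for hot videos we can also mix in
    the event time so that one popular video spreads across several partitions."""
    key = video_id if event_time_minute is None else f"{video_id}#{event_time_minute}"
    digest = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return digest % NUM_PARTITIONS

print(partition_for("popular_video"))                        # always the same partition
print({partition_for("popular_video", m) for m in range(5)}) # typically spread over several partitions
```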
service discovery (client-side and server-side)
register with some common service (e.g. ZooKeeper) that the load balancer then queries to get routing information.
the common service can also perform health checks.
we can also use gossip protocol to discover network.
or statically encode
single leader replication: like scaling SQL database
multi-leader replication is commonly used across multiple data centres
leaderless replication: like Cassandra and requires quorum.
textual vs binary data formats: textual keeps field names; binary removes them and uses a schema (which needs a common place to pull it from and may have versioning problems)
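A small comparison of the two formats, assuming a hypothetical event with two integer fields and an agreed-upon binary schema:
```python
import json
import struct

event = {"video_id": 123, "count": 42}

# Textual (JSON): self-describing, field names travel with every message.
textual = json.dumps(event).encode()

# Binary with an agreed schema: just two unsigned 64-bit integers; the field names
# (video_id, count) live in the shared schema that both sides must agree on.
binary = struct.pack("!QQ", event["video_id"], event["count"])

print(len(textual), "bytes as JSON vs", len(binary), "bytes as packed binary")
video_id, count = struct.unpack("!QQ", binary)   # the reader needs the same schema version
```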
Data retrieval pipeline concepts
time-series data: we aggregate for periods
data rollup allows us to reduce the granularity of the aggregates we have to store - e.g. we store per-minute counts for several days; after one week we aggregate them into per-hour data, etc (sketch below)
we keep data for the last few days in the database, then move it to off-site object storage
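A sketch of the rollup step, collapsing per-minute counts into per-hour counts; the key format is illustrative:
```python
from collections import defaultdict

# Per-minute counts keyed by (video_id, "YYYY-MM-DDTHH:MM"); timestamps are made up.
per_minute = {
    ("v1", "2015-07-01T10:00"): 3,
    ("v1", "2015-07-01T10:01"): 5,
    ("v1", "2015-07-01T11:30"): 2,
}

def roll_up_to_hours(minute_counts):
    """Collapse per-minute rows into per-hour rows so older data takes less space."""
    per_hour = defaultdict(int)
    for (video_id, minute), count in minute_counts.items():
        hour = minute[:13]                  # "YYYY-MM-DDTHH"
        per_hour[(video_id, hour)] += count
    return dict(per_hour)

print(roll_up_to_hours(per_minute))   # {('v1', '2015-07-01T10'): 8, ('v1', '2015-07-01T11'): 2}
```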
hot storage and cold storage: we can make the older data cheaper to store by keeping it in less accessible (higher latency) storage.
the query service will perform data federation to route the request to the correct storage.
distributed cache for recent queries that the query service will consult.
Types of performance testing:
load testing: can the system handle the desired load - test scalability.
stress testing: how much can the system handle till it breaks (shows bottlenecks).
soak testing: nothing breaks over time, e.g. memory (or other resource) leaks.
Health monitoring: latency, traffic, errors and saturation
Audit systems: check that it is working correctly
a weak audit system is a continuously running end-to-end test: but what if it loses events in rare scenarios?
a strong audit system runs in tandem and is often slower (like a batch processing system running alongside the stream processing system); the results of the two are compared and checked.
Distributed Caching
functional: put(key, value); get(key)
non-functional: scalable, highly available (even under network partition/failure), performant
cache eviction policy: LRU in a local cache uses a hash map plus doubly linked list to get fast get, put and delete operations.
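A minimal LRU cache sketch; OrderedDict stands in for the hash map + doubly linked list combination and gives the same O(1) behaviour:
```python
from collections import OrderedDict

class LRUCache:
    """LRU cache: the OrderedDict plays the role of hash map + doubly linked list,
    giving O(1) get, put and eviction of the least recently used entry."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.items = OrderedDict()

    def get(self, key):
        if key not in self.items:
            return None
        self.items.move_to_end(key)          # mark as most recently used
        return self.items[key]

    def put(self, key, value):
        if key in self.items:
            self.items.move_to_end(key)
        self.items[key] = value
        if len(self.items) > self.capacity:
            self.items.popitem(last=False)   # evict the least recently used entry

cache = LRUCache(capacity=2)
cache.put("a", 1); cache.put("b", 2)
cache.get("a")                               # "a" becomes most recently used
cache.put("c", 3)                            # evicts "b"
print(cache.get("b"), cache.get("a"))        # None 1
```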
dedicated cache cluster:
😊 isolation of resources between service and cache
😊 can be used by multiple services
😊 flexibility in choosing hardware
co-located cache:
😊 no extra hardware and operational cost
😊 scales together with the service
Consistent Hashing
Naive: hash_function(key)%NumberOfCacheHosts
because the failure of a server or the addition of a new server can direct the client to the wrong shard (for a large range of hashes, as everything is shifted), resulting in a large number of cache misses
Instead we can assign hash ranges to hosts (imagine the hosts placed on a circle), where each host's range extends to the next host. Clients then simply work backwards from the hash to find the responsible host. This means addition and failure of a host only require a small part of the circle to be re-mapped to a new host.
However we may still get very skewed ranges, so we use multiple hash functions per host so that each host has multiple ranges in the pie (i.e. its share is split up); as the number of ranges per host grows we get very evenly divided ranges (virtual nodes).
The cache client must discover the available cache hosts and maintain them in sorted order. It can then use binary search to find which host is responsible for a hash. All clients must share the same ordering.
Knowledge of cache hosts could be static local file or from some remote storage (that they can refresh periodically).
We could instead use a configuration service like ZooKeeper that the cache client can query.
May also implement local cache.
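A sketch of a consistent hash ring with virtual nodes and binary search, roughly as described above; host names and the vnode count are arbitrary:
```python
import bisect
import hashlib

class ConsistentHashRing:
    """Consistent hashing with virtual nodes; the client binary-searches the
    sorted ring to find the host responsible for a key."""

    def __init__(self, hosts, vnodes=100):
        self.ring = []                           # sorted list of (position, host)
        for host in hosts:
            for i in range(vnodes):              # multiple ranges per host smooth out the split
                self.ring.append((self._hash(f"{host}#{i}"), host))
        self.ring.sort()
        self.positions = [position for position, _ in self.ring]

    @staticmethod
    def _hash(key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def host_for(self, key):
        index = bisect.bisect(self.positions, self._hash(key)) % len(self.ring)  # wrap around the circle
        return self.ring[index][1]

ring = ConsistentHashRing(["cache-1", "cache-2", "cache-3"])
print(ring.host_for("video_123"))
# Removing one host only remaps the keys that hashed into that host's ranges.
```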
Expired items: expire passively (on access), or actively with a maintenance pass - but we cannot just iterate through everything, so we usually use a probabilistic algorithm that tests a few entries at random.
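A toy version of the probabilistic (sampled) expiration pass:
```python
import random
import time

cache = {}   # key -> (value, expiry_timestamp)

def expire_sample(sample_size=20):
    """Active expiration without a full scan: test a few random entries and
    delete the expired ones (repeat on a schedule, or while the hit rate is high)."""
    if not cache:
        return 0
    now = time.time()
    keys = random.sample(list(cache), min(sample_size, len(cache)))
    expired = [k for k in keys if cache[k][1] < now]
    for k in expired:
        del cache[k]
    return len(expired)

cache["a"] = ("value", time.time() - 1)    # already expired
cache["b"] = ("value", time.time() + 60)
print(expire_sample(), "entries expired;", list(cache))
```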
Not optimised for security: clients and the environment are usually trusted, so a firewall should be used.
Encryption of cached data could also be applied on either end.
Distributed Message Queue
Load balancer -> FrontEnd (entrypoint) web service -> Metadata service -> Database
FrontEnd also connects to the Backend service that stores message queue
FrontEnd web service is responsible for:
Request Validation, Authentication/Authorisation
TLS (SSL) termination
Server-side encryption so all messages in Backend are encrypted.
Caching: store metadata on queues like most used
Rate limiting (throttling): leaky bucket algorithm (see the sketch after this list)
Downstream request dispatch
Request deduplication
Usage data collection
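A minimal sketch of the leaky bucket throttling mentioned above; capacity and leak rate are arbitrary:
```python
import time

class LeakyBucket:
    """Leaky bucket throttling: requests fill the bucket, which drains at a
    constant rate; when the bucket is full, further requests are rejected."""

    def __init__(self, capacity, leak_rate_per_sec):
        self.capacity = capacity
        self.leak_rate = leak_rate_per_sec
        self.level = 0.0
        self.last_checked = time.monotonic()

    def allow(self):
        now = time.monotonic()
        self.level = max(0.0, self.level - (now - self.last_checked) * self.leak_rate)
        self.last_checked = now
        if self.level + 1 > self.capacity:
            return False                  # throttle: the bucket would overflow
        self.level += 1
        return True

bucket = LeakyBucket(capacity=5, leak_rate_per_sec=2)
print([bucket.allow() for _ in range(7)])   # first 5 allowed, then throttled
```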
Backend: let's not use RabbitMQ and instead use RAM and local disk to store the message queue durably on a cluster of machines with replication.
Metadata will tell us how to interact with Backend cluster to push and pop from the given queue correctly.
We could use single leader paradigm to manage replication.
However this would need some configuration server (in-cluster manager) or effective leader election.
We could take inspiration from the gossip protocol and have some configuration server assign all hosts to a queue so that no leader is required (out-cluster manager).
We then get problems with hot queues and distributing load (splitting up queues) that either kind of manager will have to handle.
Message deletion caveats
Do not delete a message right after it has been consumed; consumers must instead be careful not to process it twice (or the message is marked as invisible), and a maintenance job later performs the cleanup (or there is an explicit delete-message API).
This also means the queue head will be an offset from the start of actual storage.
Messages should be replicated for durability.
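A toy queue sketch of the consume-then-delete flow above: receiving a message only hides it for a visibility timeout, and it is removed only by an explicit delete (or a later cleanup job); all names are illustrative:
```python
import time
import uuid

class Queue:
    """Messages are hidden (not deleted) when consumed; a consumer must delete
    explicitly, otherwise the message becomes visible again after a timeout."""

    def __init__(self, visibility_timeout_sec=30.0):
        self.messages = []                # list of [message_id, body, invisible_until]
        self.visibility_timeout = visibility_timeout_sec

    def send(self, body):
        self.messages.append([str(uuid.uuid4()), body, 0.0])

    def receive(self):
        now = time.monotonic()
        for message in self.messages:
            if message[2] <= now:                          # currently visible
                message[2] = now + self.visibility_timeout # hide it from other consumers
                return message[0], message[1]
        return None

    def delete(self, message_id):
        self.messages = [m for m in self.messages if m[0] != message_id]

q = Queue(visibility_timeout_sec=5.0)
q.send("hello")
msg_id, body = q.receive()      # message is now invisible to other consumers
q.delete(msg_id)                # explicit delete once processing succeeded
print(q.receive())              # None: nothing left
```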