# SYS10: File Caching
**Question Track**: Distributed Data Systems. Use for L4+ candidates being hired into systems roles.
**Owner**: Hans Norheim (hans.norheim)\\
**Contributors**: Divjot Arora (div.arora), Yan Yao (yan.yao)
## Introduction
This question tests:
- Creative problem-solving.
- Understanding of network and disk IO. Instincts around latency, throughput, and performance.
- Experience and instincts around multi-threading and synchronization.
- Working within the bounds of a given system interface.
It is recommended for L4+ and not for new college grads unless they have a background in systems code. It is deep enough, and has enough potential bonus stages, that it should fill an entire interview even for experienced systems experts.
## Question
We are building a system that needs random, repeated read access to data in a file stored in AWS S3 (cloud object storage). Directly issuing reads to the cloud has considerable latency, so we want to implement a local caching layer.
Each file may be read multiple times, sequentially or randomly, in whole or in part. We want to implement a `CachedFile` class that caches file contents locally to speed up reads. It should download the file using the supplied `StorageClient` and respond to read requests via the `read()` method.
**Provided code:**
```scala
/** Client to download files from the internet. */
trait StorageClient {
  /** Get the size of the file at $uri. */
  def getFileSize(uri: String): Long

  /** Fetch $length bytes at $offset from $uri into $dstBuf. */
  def fetch(uri: String, offset: Long, length: Long, dstBuf: Array[Byte]): Unit
}
```
Junior candidates can be given an IO API like this if they need help:
```scala
/** Thread-safe file random IO API. */
class RandomIoFile(path: String) {
  def write(offset: Int, srcBuf: Array[Byte]): Unit
  def read(offset: Int, length: Int, dstBuf: Array[Byte]): Unit
}
```
**Code to implement:**
```scala
/** This class downloads and caches a remote file (from $uri) for repeated reading. */
class CachedFile(uri: String, client: StorageClient) {
  /** Read $length bytes at $offset into $dstBuf (as fast as possible). */
  def read(offset: Long, length: Long, dstBuf: Array[Byte]): Unit = {
    // your code...
  }
}
```
See the appendix for method stubs in Go, C++, Java and Python.
Make sure to mention:
- A `CachedFile` instance is instantiated by the file system per unique file.

## Stage 1: Basic solution
Stage 1 is a soft start to allow TC to internalize the problem, to facilitate discussion around requirements, and to uncover misunderstandings. Focus on a stupid-simple, suboptimal, but correct solution before optimizations. Start by assuming single-threaded if necessary. Ensure TC has understood the question and the expected behavior of the `read()` method. If TC quickly identifies the optimizations from stage 2 and you are confident they have understood the requirements, you can seamlessly transition to stage 2.
Take an active role as interviewer. Facilitate an active conversation. Provide coaching. Drop hints if necessary to keep moving forward if they’re stuck. Don’t just be a passive listener. To grade, judge how much you had to participate to get to a good solution, and how much the candidate did themselves.
Let senior candidates ask clarifying questions to discover the following. For junior candidates (or if senior candidates make incorrect assumptions), be more forthcoming.
- `read()` should block until the requested data can be returned.
- Multiple threads may call `read()` concurrently.
- Callers of `read()` pass valid parameters (eg. the caller won't ask for data past EOF).
- A simple solution is fine to start with (it is acceptable to block `read()` until the whole file has been downloaded).

Things to look for:
- TC's comfort level with "nuts and bolts" type code.
- Basic correctness.
- Thread safety, and TC's comfort level with concurrent code.
- Multiple concurrent readers don't all try to download the file.
- The technique TC uses to block readers until the download is done.
It's good if TC has an aversion to doing IO under a lock like this, but it's actually OK here.

Is it some form of busy waiting/polling? That wastes CPU and probably introduces a delay between download completion and waking up the readers. This is acceptable for L4, but press for proper thread signalling for L5+. For L6+ it is required.

In a production implementation, this would often be solved by "parking" threads on a signal/event that is later flagged. But in our simple case here, a perfectly fine solution is to just put `fetch()` under a `synchronized` block.
Watch out for suspect synchronization schemes. For example, if TC suggests a `var downloadComplete: Boolean` and busy-wait polling this variable in readers, be very suspicious. Unless locks, `synchronized`, `volatile`, or other synchronization primitives are involved, it is likely wrong. The Java Memory Model does not guarantee that one thread's write will ever be seen by another thread unless volatile, locks, or similar are involved. A sketch of the broken pattern follows.
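For illustration, a minimal sketch of the broken pattern next to a variant with guaranteed visibility (the class and field names are hypothetical, not part of the provided code):
```scala
// BROKEN: a plain var gives no visibility guarantee under the Java
// Memory Model; the reading thread may never observe the writer's update.
class BrokenFlag {
  var downloadComplete = false
  def waitForDownload(): Unit = while (!downloadComplete) {} // may spin forever
}

// VISIBLE (but still busy-waiting): @volatile makes the write visible.
// Proper thread signalling (wait/notifyAll, as in the solutions below) is better.
class VolatileFlag {
  @volatile var downloadComplete = false
  def waitForDownload(): Unit = while (!downloadComplete) Thread.onSpinWait()
}
```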
Do they state assumptions on the thread safety of the file system APIs they use?
Candidates may comment that:
- `fetch()` should be an async method.
- `fetch()` could return a stream to avoid big array buffers.

Both are good observations, but we skipped them for simplicity.

The sample solutions don't have error handling and omit some optimizations. We also don't normalize URI characters to allowed file system names.
### Sample solution
```scala
class CachedFile(uri: String, client: StorageClient) {
  var file: RandomIoFile = null

  def read(offset: Long, length: Long, dstBuf: Array[Byte]): Unit = {
    this.synchronized {
      if (file == null) {
        val size = client.getFileSize(uri)
        val buf = new Array[Byte](size.toInt) // JVM arrays are Int-indexed
        file = new RandomIoFile(s"/cache/$uri")
        client.fetch(uri, offset = 0, length = size, buf)
        file.write(0, buf)
      }
    }
    file.read(offset.toInt, length.toInt, dstBuf)
  }
}

// We assume a thread-safe file IO API like this:
class RandomIoFile(path: String) {
  def write(offset: Int, srcBuf: Array[Byte]): Unit
  def read(offset: Int, length: Int, dstBuf: Array[Byte]): Unit
}
```
## Stage 2: Optimizations
Add the following requirement: `read()` appears to block for a long time in the beginning. Anything we can do to avoid that?

TC should ideally uncover the following optimizations. Help them with hints if they are lost.
There are two strategies to download the file:
<ins>A) Entire file from the beginning.</ins>\
Still a chunked download, but all chunks are queued for download on first access.

<ins>B) Chunks faulted in on-demand ("OS page cache" style).</ins>\
Chunks covering the offsets requested by `read()` are downloaded on-demand.

Both are good solutions, with A) being a bit simpler than B). We'll now describe optimizations 1) and 2), and then provide a sample solution for each of A) and B).
### Optimization 1: Parallel chunked download
If the source S3 account is some distance away, a single TCP stream won't yield more than ~20-30 MB/s. To fully saturate a fat network pipe, we must download multiple chunks of the same file in parallel.

Avoid saying anything about "chunked", "parallel" or "multi-threaded". We want TC to come up with chunked downloads by themselves. If TC is stuck, give hints like "Can we use the `offset` and `length` parameters for something useful?"
Things to look for:
- TC's approach to multithreading. If needed, you can help them with a simple thread pool construct.
- Reasoning in chunks. Can TC devise a scheme to chunk the download using the `offset` and `length` parameters correctly? Things to pay attention to in the implementation:
  - The output file has the correct data (no overlapping chunks, no holes).
  - No reading past EOF, unless they state assumptions about `fetch()` behavior.
  - Files of length 0 or 1 byte.
  - The file is not fully downloaded until all chunks are finished.
  - `read()` calls on a chunk boundary (reads spanning two chunks); see the sketch after this list.
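To make the chunk-boundary case concrete, a small worked example (toy numbers, not from the question):
```scala
// With CHUNK_SIZE = 100, a read of bytes [95, 105) spans chunks 0 AND 1,
// so the exclusive upper bound must round up past the last byte touched.
val CHUNK_SIZE = 100L
val (offset, length) = (95L, 10L)
val firstChunk = offset / CHUNK_SIZE                            // 0
val lastChunkExclusive = (offset + length - 1) / CHUNK_SIZE + 1 // 2
```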
Does TC suggest a sensible scheme to determine chunk sizes? Too small, and the download becomes "chatty", reducing throughput. Too large, and chunk latency goes up, exposing us to stragglers. A reasonable range might be 10 MB to a few hundred MB, perhaps dynamic depending on the total file size. Does TC think about potential API/storage throttling if the chunk size is too small, resulting in too many `fetch()` calls?
Does TC think about the level of parallelism? How many threads to get the best throughput for a single file? Although we haven’t explicitly said so, this may not be the only file download happening. If there are 100 parallel file downloads, it’s not a good idea to start 100 threads for each.
### Optimization 2: Don't block for the whole file
In a naive implementation, `read()` blocks until the entire file has been downloaded. We can do better.

See if TC has a feel for the `length`s that `read()` might be called with. File systems often use buffers <= 1 MB to fit in the L3 CPU cache. `read()` should return as soon as the requested byte range is available, and not wait until the whole file is downloaded.
One solution is a "low water mark": the offset up to which the download has completed. Another is to track which chunks are complete. Either way requires synchronization between the `read()` thread and the download threads; a sketch of the low-water-mark variant follows.
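A minimal sketch of the low-water-mark idea (names are illustrative; it assumes chunk completions are reported in order, so the downloaded prefix is contiguous):
```scala
// Readers block until the contiguous downloaded prefix covers their range.
class LowWaterMark {
  private var downloadedUpTo = 0L // exclusive end of the contiguous prefix

  def advanceTo(offsetEnd: Long): Unit = this.synchronized {
    downloadedUpTo = math.max(downloadedUpTo, offsetEnd)
    this.notifyAll() // wake up readers blocked in waitFor()
  }

  def waitFor(offsetEnd: Long): Unit = this.synchronized {
    // Loop, because wait() is subject to spurious wakeups.
    while (downloadedUpTo < offsetEnd) this.wait()
  }
}
```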
Things to look for:
- How do the downloader threads communicate completion to the `read()` threads? As in stage 1, watch for suspect synchronization schemes. Simply marking completion in an array without synchronization is not sufficient! Experienced candidates should avoid busy waiting/polling.
- Correct usage of standard synchronization primitives. For example, do they know that the `Object.wait()` method in Java is subject to spurious wakeups?
Depending on time available, discuss the following with TC:
- Picking chunk sizes\
  Have a discussion around the tradeoffs of how big the chunks should be (a heuristic sketch follows this list).\
  <ins>Smaller:</ins> +higher parallelism, +lower completion latency, -chattier, -could cause storage throttling.\
  <ins>Bigger:</ins> +less overhead, +fewer storage API calls, -lower parallelism, -higher completion latency, -exposed to stragglers.\
  Could it be dynamic/heuristics-based, depending on file size, roundtrip latency and system workload?
- Optimizing memory handling\
  Handling buffers the size of a full chunk isn't great for memory consumption. If they are not reused, they can cause heap pressure or high garbage collection (GC) load. Probe whether TC has knowledge of this, and what they would do to improve it. Ideas include reusing buffers (sketched below) or using streams with smaller buffers.
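As a discussion aid, one possible chunk-size heuristic (the thresholds and target count are illustrative assumptions, not from the question):
```scala
// Scale the chunk size with the file size, clamped to a sane range:
// too small gets "chatty" and risks throttling; too large exposes stragglers.
def chunkSizeFor(fileSize: Long): Long = {
  val MinChunk = 10L * 1024 * 1024   // 10 MB floor
  val MaxChunk = 256L * 1024 * 1024  // 256 MB ceiling
  val TargetChunks = 64L             // aim for roughly 64 chunks per file
  ((fileSize + TargetChunks - 1) / TargetChunks).max(MinChunk).min(MaxChunk)
}
```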
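And a minimal buffer-reuse sketch (a hypothetical helper, not part of the sample solutions): a fixed pool bounds memory use and avoids repeatedly allocating chunk-sized arrays.
```scala
import java.util.concurrent.ArrayBlockingQueue

// A fixed-size pool of chunk buffers; take() blocks when all buffers are
// in use, which also acts as a crude limit on in-flight downloads.
class BufferPool(bufSize: Int, count: Int) {
  private val pool = new ArrayBlockingQueue[Array[Byte]](count)
  (0 until count).foreach(_ => pool.put(new Array[Byte](bufSize)))

  def take(): Array[Byte] = pool.take()
  def give(buf: Array[Byte]): Unit = pool.put(buf)
}
```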
### Sample solution A) Entire file from the beginning
The following solution implements both optimizations 1 and 2. A simple `Array[Boolean]` is used to track chunk completion, and Java's built-in `Object.notifyAll()` is used to signal readers. Candidates don't need to write code at the same level of detail as here; this is to illustrate that a complete solution need not be complex.
```scala
import java.util.concurrent.Executors

val CPU_COUNT = Runtime.getRuntime.availableProcessors
val CHUNK_SIZE = 100 * 1024 * 1024 // 100 MB
val executor = Executors.newFixedThreadPool(CPU_COUNT * 4)

class CachedFile(uri: String, client: StorageClient) {
  val size = client.getFileSize(uri)
  val numChunks = (size.toDouble / CHUNK_SIZE).ceil.toInt
  val chunkStatus = new Array[Boolean](numChunks) // Tracks completion status by chunk.
  var file = new RandomIoFile(s"/cache/$uri") // Underlying file
  file.setSize(size)

  for (i <- 0 until numChunks) {
    val offset = i.toLong * CHUNK_SIZE
    val length = math.min(size - offset, CHUNK_SIZE)
    // Download in parallel, in the background
    executor.execute(() => {
      val buf = new Array[Byte](length.toInt)
      client.fetch(uri, offset, length, buf) // These two are
      file.write(offset.toInt, buf)          // outside the lock.
      this.synchronized { // This synchronizes shared state with readers
        chunkStatus(i) = true
        this.notifyAll() // Wake up readers
      }
    })
  }

  def read(offset: Long, length: Long, dstBuf: Array[Byte]): Unit = {
    waitForData(offset, offset + length)
    file.read(offset.toInt, length.toInt, dstBuf)
  }

  private def waitForData(offsetBegin: Long, offsetEnd: Long): Unit = {
    this.synchronized {
      val firstChunk = (offsetBegin / CHUNK_SIZE).toInt
      val lastChunkExclusive = ((offsetEnd - 1) / CHUNK_SIZE + 1).toInt
      // Loop guards against spurious wakeups; wait() releases the lock.
      while (!chunkStatus.slice(firstChunk, lastChunkExclusive).forall(identity))
        this.wait()
    }
  }
}

// We assume a thread-safe file IO API like this:
class RandomIoFile(path: String) {
  def setSize(size: Long): Unit
  def write(offset: Int, srcBuf: Array[Byte]): Unit
  def read(offset: Int, length: Int, dstBuf: Array[Byte]): Unit
}
```
### Sample solution B) Chunks faulted in on-demand
This is similar to solution A), except chunks are only fetched when a `read()` needs them, rather than all being queued at construction time.

In this solution, it might be advisable to reduce the chunk size so that the on-demand IO latency is reduced. Solutions A) and B) could also be combined with a priority scheme: give priority to chunks being accessed, but use available bandwidth to download all chunks in the background.
```scala
val CHUNK_SIZE = 10 * 1024 * 1024 // 10 MB
val executor = Executors.newFixedThreadPool(CPU_COUNT * 4)

enum ChunkState:
  case UNFETCHED, FETCHING, FETCHED

class CachedFile(uri: String, client: StorageClient) {
  val size = client.getFileSize(uri)
  val numChunks = (size.toDouble / CHUNK_SIZE).ceil.toInt
  val chunkStatus = Array.fill(numChunks) { ChunkState.UNFETCHED }
  var file = new RandomIoFile(s"/cache/$uri") // Underlying file
  file.setSize(size)

  def read(offset: Long, length: Long, dstBuf: Array[Byte]): Unit = {
    ensureData(offset, offset + length)
    file.read(offset.toInt, length.toInt, dstBuf)
  }

  /** Ensures the requested byte range is available locally. */
  private def ensureData(offsetBegin: Long, offsetEnd: Long): Unit = {
    this.synchronized {
      val firstChunk = (offsetBegin / CHUNK_SIZE).toInt
      val lastChunkExcl = ((offsetEnd - 1) / CHUNK_SIZE + 1).toInt
      // Start download of any chunks that have not yet been started.
      for (i <- Range(firstChunk, lastChunkExcl).filter(chunkStatus(_) == ChunkState.UNFETCHED)) {
        val offset = i.toLong * CHUNK_SIZE
        val length = math.min(size - offset, CHUNK_SIZE)
        // Download in parallel, in the background
        chunkStatus(i) = ChunkState.FETCHING
        executor.execute(() => {
          val buf = new Array[Byte](length.toInt)
          client.fetch(uri, offset, length, buf) // These two are
          file.write(offset.toInt, buf)          // outside the lock.
          this.synchronized { // This synchronizes shared state with readers
            chunkStatus(i) = ChunkState.FETCHED
            this.notifyAll() // Wake up readers
          }
        })
      }
      // Wait for all needed chunks to be complete (wait() releases the lock).
      while (!chunkStatus.slice(firstChunk, lastChunkExcl).forall(_ == ChunkState.FETCHED))
        this.wait()
    }
  }
}
```
### Sample solution in Go
Here is a solution in Go:
```go
package main

import (
	"math"
	"sync"
)

const (
	PartitionSize int64 = 50 * (1 << 20) // 50 MiB
)

type StorageClient interface {
	GetFileSize(uri string) int64
	Fetch(uri string, offset int64, length int64, dstBuf []byte)
}

type CachedFile struct {
	uri                string
	size               int64
	client             StorageClient
	file               *RandomIoFile
	partitions         []bool // true if chunk is downloaded
	partitionsLock     sync.Mutex
	partitionsCondVars []*sync.Cond
}

func NewCachedFile(uri string, client StorageClient) *CachedFile {
	size := client.GetFileSize(uri)
	file := NewRandomIoFile("./cachefile")
	numPartitions := int(math.Ceil(float64(size) / float64(PartitionSize)))
	cf := &CachedFile{
		uri:                uri,
		size:               size,
		client:             client,
		file:               file,
		partitions:         make([]bool, numPartitions),
		partitionsCondVars: make([]*sync.Cond, numPartitions),
	}
	for i := range cf.partitionsCondVars {
		cf.partitionsCondVars[i] = sync.NewCond(&cf.partitionsLock)
	}
	go cf.downloadFile()
	return cf
}

// Read reads data from CachedFile, assuming inputs are valid.
func (c *CachedFile) Read(offset int64, length int64, dstBuf []byte) {
	firstPartitionIdx := getPartitionIdx(offset)
	lastPartitionIdx := getPartitionIdx(offset + length - 1)
	for i := firstPartitionIdx; i <= lastPartitionIdx; i++ {
		c.partitionsLock.Lock()
		for !c.partitions[i] {
			// Wait atomically unlocks c.L and suspends execution of the calling goroutine.
			// After later resuming execution, Wait locks c.L before returning.
			// Unlike in other systems, Wait cannot return unless awoken by Broadcast or Signal.
			c.partitionsCondVars[i].Wait()
		}
		c.partitionsLock.Unlock()
	}
	c.file.Read(offset, length, dstBuf)
}

// getPartitionIdx returns the index of the partition containing the given byte offset.
func getPartitionIdx(byteIdx int64) int {
	return int(byteIdx / PartitionSize)
}

// downloadFile downloads file partitions concurrently.
func (c *CachedFile) downloadFile() {
	partitionCh := make(chan int, len(c.partitions))
	for i := 0; i < len(c.partitions); i++ {
		partitionCh <- i
	}
	close(partitionCh)
	for i := 0; i < 10; i++ {
		go func() {
			var buf [PartitionSize]byte
			for {
				partitionIdx, ok := <-partitionCh
				if !ok {
					return
				}
				offset := int64(partitionIdx) * PartitionSize
				length := int64(math.Min(float64(PartitionSize), float64(c.size-offset)))
				c.client.Fetch(c.uri, offset, length, buf[:])
				c.file.Write(offset, buf[:length])
				c.partitionsLock.Lock()
				c.partitions[partitionIdx] = true
				c.partitionsCondVars[partitionIdx].Broadcast()
				c.partitionsLock.Unlock()
			}
		}()
	}
}
```
## Bonus stages
If TC makes it through the above with time to spare, here are ideas for bonus items to explore to see how far they can go. The first (parallelism limiting) is a good one to code; a sketch follows the list. The rest are good for discussion.
- Parallelism limiting and fairness across multiple files
- Async IO\
  What changes would they make to `read()` and `fetch()` and their code to make IO async and consume fewer threads?
- Fancier download schemes\
  Such as combining solutions 2A) and 2B) with a priority scheme giving priority to chunks being accessed, but using available bandwidth to download the entire file in the background.
- Advanced local IO schemes\
  Eg: using a memory-mapped file instead of a regular file handle for reading and writing data to the working file.
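For the first bonus item, a minimal sketch of global parallelism limiting (the limit of 16 and the helper names are illustrative assumptions, not part of the sample solutions): all `CachedFile` instances share one semaphore, so 100 concurrent file downloads can't each run a full set of fetches.
```scala
import java.util.concurrent.Semaphore

object FetchLimiter {
  // A fair semaphore grants permits FIFO, giving rough fairness across files.
  private val permits = new Semaphore(16, true) // global cap on in-flight fetches

  def withPermit[T](body: => T): T = {
    permits.acquire()
    try body
    finally permits.release()
  }
}

// In a downloader task:
//   FetchLimiter.withPermit { client.fetch(uri, offset, length, buf) }
```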
## Grading
Grading assumes the entire interview is spent on the problem. This question is relatively new, so the below are guidelines and not hard-and-fast rules. We need your help to further calibrate the question: use your discretion when administering it, and send the owner feedback.
<ins>L4</ins> \ L4s may not intuitively grasp the problem, may not be comfortable with file system APIs and may not have great instincts around writing concurrent code.
<ins>L5</ins> \ L5s should more quickly grasp the problem, be more comfortable with file system APIs and have experience writing concurrent code, although it may not be pretty.
<ins>L6+</ins> \ For L6s+ with systems experience, most aspects of this question should be familiar territory. They should intuitively grasp the problem, be comfortable with file system APIs and write good concurrent code without major race conditions and with proper thread signalling and no busy-wait loops.
## Appendix: Method stubs in other languages
### Go
Provided code:
```go
// Client to download files from the internet.
type StorageClient interface {
	// Get the size of the file at $uri.
	GetFileSize(uri string) int64
	// Fetch $length bytes at $offset from $uri into $dstBuf.
	Fetch(uri string, offset int64, length int64, dstBuf []byte)
}
```
Code to implement:
```go
type CachedFile struct {
	uri    string
	client StorageClient
}

// Read $length bytes at $offset into $dstBuf (as fast as possible).
func (c *CachedFile) Read(offset int64, length int64, dstBuf []byte) {
	// Your code
}
```
### Python
Provided code:
```python
# Client to download files from the internet.
class StorageClient:
    # Get the size of the file at $uri.
    def getFileSize(self, uri):  # -> long
        pass

    # Fetch $length bytes at $offset from $uri into $dstBuf.
    def fetch(self, uri, offset, length, dstBuf):
        pass
```
Code to implement:
```python
# This class downloads and caches a remote file for repeated reading.
# The file is at $uri and has length $size bytes.
class CachedFile:
    def __init__(self, uri, client):
        self.uri = uri
        self.client = client

    # Read $length bytes at $offset into $dstBuf (as fast as possible).
    def read(self, offset, length, dstBuf):
        # your code...
        pass
```
### Java
Provided code:
```java
/** Client to download files from the internet. */
interface StorageClient {
    /** Get the size of the file at $uri. */
    long getFileSize(String uri);

    /** Fetch $length bytes at $offset from $uri into $dstBuf. */
    void fetch(String uri, long offset, long length, byte[] dstBuf);
}
```
Code to implement:
```java
/**
 * This class downloads and caches a remote file at $uri for repeated reading.
 */
class CachedFile {
    private String uri;
    private StorageClient client;

    public CachedFile(String uri, StorageClient client) {
        this.uri = uri;
        this.client = client;
    }

    /** Read $length bytes at $offset into $dstBuf (as fast as possible). */
    public void read(long offset, long length, byte[] dstBuf) {
        // your code...
    }
}
```
### C++
Provided code:
```cpp
/* Client to download files from the internet. */
class IStorageClient {
public:
    /* Get the size of the file at $uri. */
    virtual long getFileSize(std::string& uri) = 0;

    /* Fetch $length bytes at $offset from $uri into $dstBuf. */
    virtual void fetch(std::string& uri, long offset, long length, unsigned char* dstBuf) = 0;
};
```
Code to implement:
```cpp
/*
 * This class downloads and caches a remote file at $uri for repeated reading.
 */
class CachedFile {
    std::string uri;
    IStorageClient& client;

public:
    CachedFile(std::string& uri, IStorageClient& client)
        : uri(uri), client(client)
    { }

    /* Read $length bytes at $offset into $dstBuf (as fast as possible). */
    void read(long offset, long length, unsigned char* dstBuf) {
        // your code...
    }
};
```