# SYS10: File Caching

**Question Track**: Distributed Data Systems. Use for L4+ candidates being hired into systems roles.

**Owner**: Hans Norheim (hans.norheim)\
**Contributors**: Divjot Arora (div.arora), Yan Yao (yan.yao)

## Introduction

This question tests:
- Creative problem-solving.
- Understanding of network and disk IO; instincts around latency, throughput, and performance.
- Experience and instincts around multi-threading and synchronization.
- Working within the bounds of a given system interface.

It is recommended for L4+ and not for new college grads unless they have a background in systems code. It is deep enough, and has enough potential bonus stages, that it should fill an entire interview even for experienced systems experts.

## Question

We are building a system that needs random, repeated read access to data in a file stored in AWS S3 (cloud object storage). Directly issuing reads to the cloud has considerable latency, so we want to implement a local caching layer.

Each file may be read multiple times, sequentially or randomly, in whole or in part. We want to implement a `CachedFile` class that caches file contents locally to speed up reads. It should download the file using the supplied `StorageClient` and respond to read requests via the `read()` method.

**Provided code:**
```scala
/** Client to download files from the internet. */
trait StorageClient {
    /** Get the size of the file at $uri. */
    def getFileSize(uri: String): Long
    
    /** Fetch $length bytes at $offset from $uri into $dstBuf. */
    def fetch(uri: String, offset: Long, length: Long, dstBuf: Array[Byte]): Unit
}
```

Junior candidates can be given an IO API like this if they need help:

```scala
/** Thread-safe file random IO API. */
class RandomIoFile(path: String) {
    def write(offset: Int, srcBuf: Array[Byte]): Unit
    def read(offset: Int, length: Int, dstBuf: Array[Byte]): Unit
}
```

**Code to implement:**

```scala
/** This class downloads and caches a remote file (from $uri) for repeated reading. */
class CachedFile(uri: String, client: StorageClient) {
    /** Read $length bytes at $offset into $dstBuf (as fast as possible). */
    def read(offset: Long, length: Long, dstBuf: Array[Byte]): Unit = {
        // your code...
    }
}
```

See the appendix for method stubs in Go, C++, Java, and Python.
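
For context, a hypothetical caller might look like the sketch below; the URI, offsets, and buffer size are illustrative, not part of the question (`client` is assumed to be a `StorageClient` instance).

```scala
// Hypothetical usage: random, repeated reads against one cached file.
val file = new CachedFile("s3://bucket/data.bin", client)

val buf = new Array[Byte](4096)
file.read(0, 4096, buf)        // first read; may wait for the download
file.read(1L << 30, 4096, buf) // later random read, served from the local cache
```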

Make sure to mention:

## Stage 1: Basic Solution

Stage 1 is a soft start that lets TC internalize the problem, facilitates discussion around requirements, and uncovers misunderstandings. Focus on a stupid-simple, suboptimal, but correct solution before any optimizations. Start by assuming single-threaded access if necessary. Ensure TC has understood the question and the expected behavior of the `read()` method. If TC quickly identifies the stage 2 optimizations and you are confident they have understood the requirements, you can transition seamlessly to stage 2.

Take an active role as interviewer. Facilitate an active conversation, provide coaching, and drop hints to keep things moving if TC is stuck. Don’t just be a passive listener. To grade, judge how much you had to participate to get to a good solution versus how much the candidate did themselves.

Let senior candidates ask clarifying questions to discover the following. For junior candidates (or if senior candidates make incorrect assumptions), be more forthcoming.

### “Hidden” requirements

Stage 1 tests the following candidate skills:

### Things to look for

### Sample Solution

The sample solutions omit error handling and some optimizations. They also don’t sanitize URI characters into legal file-system names.

```scala
class CachedFile(uri: String, client: StorageClient) {
    var file: RandomIoFile = null

    def read(offset: Long, length: Long, dstBuf: Array[Byte]): Unit = {
        this.synchronized {
            if (file == null) {
                // Download the entire file before serving the first read.
                val size = client.getFileSize(uri)
                val buf = new Array[Byte](size.toInt)
                client.fetch(uri, offset = 0, length = size, buf)
                file = new RandomIoFile(s"/cache/$uri")
                file.write(offset = 0, buf)
            }
        }
        file.read(offset.toInt, length.toInt, dstBuf)
    }
}
```

```scala
// We assume a thread-safe file IO API like this:
class RandomIoFile(path: String) {
    def write(offset: Int, srcBuf: Array[Byte]): Unit
    def read(offset: Int, length: Int, dstBuf: Array[Byte]): Unit
}
```

## Stage 2: Optimizations & Concurrency

Add the following requirements:

  1. Files can be large (up to 10 GB). There could be considerable latency to the source.
  2. The first read() appears to block for a long time in the beginning. Anything we can do to avoid that?

TC should ideally uncover the following optimizations. Help them with hints if they are lost.

  1. **Chunked downloads.** A single TCP stream to a far-away source will not be fast.
  2. **Partial file completion tracking.** Waiting for the entire file to be downloaded before serving the first byte is suboptimal.

There are two strategies to download the file:

  - A) Download the whole file eagerly from the beginning, in the background.
  - B) Fault in chunks on demand, “OS page cache” style.

Both are good solutions, with A) being a bit simpler than B). We’ll now describe optimizations 1) and 2) and then provide a sample solution for each of A) and B).

### 1) Chunked downloads

If the source S3 account is some distance away, a single TCP stream won’t yield more than ~20-30 MB/s; at that rate a 10 GB file takes roughly 5-8 minutes. To fully saturate a fat network pipe, we must download multiple chunks of the same file in parallel.
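
As a minimal sketch of the chunking arithmetic (the `chunkRanges` helper is our illustration, not part of the question), the file splits into fixed-size (offset, length) pairs that independent connections can fetch concurrently:

```scala
// Split [0, size) into chunkSize-aligned (offset, length) pairs; each pair
// can be fetched on its own connection.
def chunkRanges(size: Long, chunkSize: Long): Seq[(Long, Long)] =
    (0L until size by chunkSize).map { offset =>
        (offset, math.min(chunkSize, size - offset))
    }

// chunkRanges(250, 100) == Seq((0, 100), (100, 100), (200, 50))
```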

### 2) Partial File Completion Tracking

In a naive implementation, read() blocks until the entire file has been downloaded. We can do better.
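
The core idea, sketched minimally below with illustrative names (the sample solutions that follow flesh this out): track completion per chunk, and make readers block only until the chunks covering their byte range are done.

```scala
// Minimal sketch: a per-chunk completion bitmap guarded by a monitor.
class ChunkTracker(numChunks: Int) {
    private val done = new Array[Boolean](numChunks)

    def markDone(chunk: Int): Unit = this.synchronized {
        done(chunk) = true
        this.notifyAll() // wake readers blocked in awaitRange()
    }

    def awaitRange(first: Int, lastExcl: Int): Unit = this.synchronized {
        while (!done.slice(first, lastExcl).forall(identity))
            this.wait()
    }
}
```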

### Further discussion points

Depending on time available, discuss the following with TC:

### Sample Solution for A) - Download file from the beginning

The following solution implements both optimizations 1 and 2. A simple `Array[Boolean]` tracks chunk completion, and Java’s built-in `Object.notifyAll()` signals readers. Candidates don’t need to write code at the level of detail shown here; the point is to illustrate that a complete solution need not be complex.

```scala
import java.util.concurrent.Executors

val CHUNK_SIZE = 100 * 1024 * 1024 // 100 MB
val CPU_COUNT = Runtime.getRuntime.availableProcessors()
val executor = Executors.newFixedThreadPool(CPU_COUNT * 4)

class CachedFile(uri: String, client: StorageClient) {
    val size = client.getFileSize(uri)
    val numChunks = (size.toDouble / CHUNK_SIZE).ceil.toInt
    val chunkStatus = new Array[Boolean](numChunks) // Tracks completion status by chunk.
    val file = new RandomIoFile(s"/cache/$uri") // Underlying file
    file.setSize(size)

    for (i <- 0 until numChunks) {
        val offset = i.toLong * CHUNK_SIZE // toLong: offsets overflow Int past 2 GB
        val length = math.min(size - offset, CHUNK_SIZE.toLong)

        // Download in parallel, in the background
        executor.execute(() => {
            val buf = new Array[Byte](length.toInt)
            client.fetch(uri, offset, length, buf) // These two are
            file.write(offset, buf)                // outside the lock.

            this.synchronized { // Synchronizes on shared state with readers
                chunkStatus(i) = true
                this.notifyAll() // Wake up readers
            }
        })
    }

    def read(offset: Long, length: Long, dstBuf: Array[Byte]): Unit = {
        waitForData(offset, offset + length)
        file.read(offset, length, dstBuf)
    }

    private def waitForData(offsetBegin: Long, offsetEnd: Long): Unit = {
        this.synchronized {
            val firstChunk = (offsetBegin / CHUNK_SIZE).toInt
            val lastChunkExclusive = ((offsetEnd - 1) / CHUNK_SIZE + 1).toInt
            while (!chunkStatus.slice(firstChunk, lastChunkExclusive).forall(identity))
                this.wait()
        }
    }
}
```

```scala
// We assume a thread-safe file IO API like this (Long offsets, since files
// can exceed 2 GB), including the setSize() used above:
class RandomIoFile(path: String) {
    def setSize(size: Long): Unit
    def write(offset: Long, srcBuf: Array[Byte]): Unit
    def read(offset: Long, length: Long, dstBuf: Array[Byte]): Unit
}
```

### Sample Solution for B) - Fault in chunks on demand, “OS page cache” style

This is similar to solution A), except:

In this solution, it might be advisable to reduce the chunk size so that the on-demand IO latency is lower. Solutions A) and B) could also be combined with a priority scheme: give priority to chunks being actively read, but use spare bandwidth to download all remaining chunks in the background (a rough sketch of this appears after the sample solution below).

```scala
import java.util.concurrent.Executors

val CHUNK_SIZE = 10 * 1024 * 1024 // 10 MB
val CPU_COUNT = Runtime.getRuntime.availableProcessors()
val executor = Executors.newFixedThreadPool(CPU_COUNT * 4)

enum ChunkState:
    case UNFETCHED, FETCHING, FETCHED

class CachedFile(uri: String, client: StorageClient) {
    val size = client.getFileSize(uri)
    val numChunks = (size.toDouble / CHUNK_SIZE).ceil.toInt
    val chunkStatus = Array.fill(numChunks) { ChunkState.UNFETCHED }
    val file = new RandomIoFile(s"/cache/$uri") // Underlying file
    file.setSize(size)

    def read(offset: Long, length: Long, dstBuf: Array[Byte]): Unit = {
        ensureData(offset, offset + length)
        file.read(offset, length, dstBuf)
    }

    /** Ensures the requested byte range is available locally. */
    private def ensureData(offsetBegin: Long, offsetEnd: Long): Unit = {
        this.synchronized {
            val firstChunk = (offsetBegin / CHUNK_SIZE).toInt
            val lastChunkExcl = ((offsetEnd - 1) / CHUNK_SIZE + 1).toInt

            // Start download of any chunks that have not yet been started.
            for (i <- Range(firstChunk, lastChunkExcl).filter(chunkStatus(_) == ChunkState.UNFETCHED)) {
                val offset = i.toLong * CHUNK_SIZE // toLong: offsets overflow Int past 2 GB
                val length = math.min(size - offset, CHUNK_SIZE.toLong)

                // Download in parallel, in the background
                chunkStatus(i) = ChunkState.FETCHING
                executor.execute(() => {
                    val buf = new Array[Byte](length.toInt)
                    client.fetch(uri, offset, length, buf) // These two are
                    file.write(offset, buf)                // outside the lock.

                    this.synchronized { // Synchronizes on shared state with readers
                        chunkStatus(i) = ChunkState.FETCHED
                        this.notifyAll() // Wake up readers
                    }
                })
            }

            // Wait for all needed chunks to complete (.wait() releases the lock).
            while (!chunkStatus.slice(firstChunk, lastChunkExcl).forall(_ == ChunkState.FETCHED))
                this.wait()
        }
    }
}
```
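
One possible shape for that priority scheme, as a rough sketch (the `ChunkRequest`/`ChunkScheduler` names and the priority values are our assumptions, not part of the question):

```scala
import java.util.concurrent.PriorityBlockingQueue

// Sketch: every chunk is queued at low priority up front (strategy A);
// read() re-enqueues the chunks it needs at high priority (strategy B).
// Duplicates are harmless as long as workers skip chunks that are already
// FETCHING or FETCHED.
case class ChunkRequest(chunk: Int, priority: Int)

class ChunkScheduler(numChunks: Int) {
    private val queue = new PriorityBlockingQueue[ChunkRequest](
        numChunks max 1, Ordering.by((r: ChunkRequest) => -r.priority)) // most urgent first

    for (i <- 0 until numChunks) queue.put(ChunkRequest(i, priority = 0))

    def markUrgent(chunk: Int): Unit = queue.put(ChunkRequest(chunk, priority = 1))

    def next(): ChunkRequest = queue.take() // called by downloader threads
}
```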

Here is a sample solution for A) in Go:

```go
package main

import (
  "math"
  "sync"
)

const (
  PartitionSize int64 = 50 * (1 << 20) // 50 MiB
)

type StorageClient interface {
  GetFileSize(uri string) int64
  Fetch(uri string, offset int64, length int64, dstBuf []byte)
}

type CachedFile struct {
  uri                string
  size               int64
  client             StorageClient
  file               *RandomIoFile // assumed thread-safe random IO API, as in the Scala stub
  partitions         []bool        // true if partition is downloaded
  partitionsLock     sync.Mutex
  partitionsCondVars []*sync.Cond
}

func NewCachedFile(uri string, client StorageClient) *CachedFile {
  size := client.GetFileSize(uri)
  file := NewRandomIoFile("./cachefile")
  numPartitions := int(math.Ceil(float64(size) / float64(PartitionSize)))

  cf := &CachedFile{
    uri:                uri,
    size:               size,
    client:             client,
    file:               file,
    partitions:         make([]bool, numPartitions),
    partitionsCondVars: make([]*sync.Cond, numPartitions),
  }

  for i := range cf.partitionsCondVars {
    cf.partitionsCondVars[i] = sync.NewCond(&cf.partitionsLock)
  }

  go cf.downloadFile()

  return cf
}

// Read reads data from the CachedFile, assuming inputs are valid.
func (c *CachedFile) Read(offset int64, length int64, dstBuf []byte) {
  firstPartitionIdx := getPartitionIdx(offset)
  lastPartitionIdx := getPartitionIdx(offset + length - 1)

  for i := firstPartitionIdx; i <= lastPartitionIdx; i++ {
    c.partitionsLock.Lock()
    for !c.partitions[i] {
      // Wait atomically unlocks the mutex and suspends the calling goroutine,
      // re-locking the mutex before returning. Wait cannot return unless
      // awoken by Broadcast or Signal.
      c.partitionsCondVars[i].Wait()
    }
    c.partitionsLock.Unlock()
  }

  c.file.Read(offset, length, dstBuf)
}

// getPartitionIdx maps a byte offset to its partition index.
func getPartitionIdx(byteIdx int64) int {
  return int(byteIdx / PartitionSize)
}

// downloadFile downloads file partitions concurrently.
func (c *CachedFile) downloadFile() {
  partitionCh := make(chan int, len(c.partitions))
  for i := 0; i < len(c.partitions); i++ {
    partitionCh <- i
  }
  close(partitionCh)

  for i := 0; i < 10; i++ { // 10 concurrent download workers
    go func() {
      buf := make([]byte, PartitionSize) // reused by this worker across partitions
      for partitionIdx := range partitionCh {
        offset := int64(partitionIdx) * PartitionSize
        length := c.size - offset
        if length > PartitionSize {
          length = PartitionSize
        }

        c.client.Fetch(c.uri, offset, length, buf[:length])
        c.file.Write(offset, buf[:length])

        c.partitionsLock.Lock()
        c.partitions[partitionIdx] = true
        c.partitionsCondVars[partitionIdx].Broadcast()
        c.partitionsLock.Unlock()
      }
    }()
  }
}
```

## Possible Follow-Ups/Bonus Items for Experienced Candidates

If TC makes it through the above with time to spare, here are ideas for bonus items to explore to see how far they can go. The first (parallelism limiting) is a good one to code (a sketch follows below); the rest are good for discussion.
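
For the parallelism-limiting item, here is one minimal sketch; the semaphore approach and the `MAX_PARALLEL_FETCHES` constant are illustrative assumptions, not the only answer (a fixed-size thread pool, as in the sample solutions, is another):

```scala
import java.util.concurrent.Semaphore

// Cap concurrent fetches independently of the executor's thread count.
val MAX_PARALLEL_FETCHES = 8 // hypothetical tuning knob
val fetchPermits = new Semaphore(MAX_PARALLEL_FETCHES)

def fetchChunk(client: StorageClient, uri: String,
               offset: Long, length: Long, buf: Array[Byte]): Unit = {
    fetchPermits.acquire() // blocks while too many fetches are in flight
    try client.fetch(uri, offset, length, buf)
    finally fetchPermits.release()
}
```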

## Grading

go/eng-interview-scoring

Grading assumes the entire interview was spent on the problem. This question is relatively new, so the guidelines below are not hard-and-fast rules. We need your help to further calibrate the question; use your discretion when administering it, and give me feedback.

## Appendix: Method Stubs in Other Languages

### Go

**Provided code:**

```go
// Client to download files from the internet.
type StorageClient interface {
    // Get the size of the file at $uri.
    GetFileSize(uri string) int64

    // Fetch $length bytes at $offset from $uri into $dstBuf.
    Fetch(uri string, offset int64, length int64, dstBuf []byte)
}
```

**Code to implement:**

```go
type CachedFile struct {
    uri    string
    client StorageClient
}

// Read $length bytes at $offset into $dstBuf (as fast as possible).
func (c *CachedFile) Read(offset int64, length int64, dstBuf []byte) {
    // Your code
}
```

### Python

**Provided code:**

```python
# Client to download files from the internet.
class StorageClient:
    # Get the size of the file at $uri.
    def getFileSize(self, uri):  # -> int
        pass

    # Fetch $length bytes at $offset from $uri into $dstBuf.
    def fetch(self, uri, offset, length, dstBuf):
        pass
```

**Code to implement:**

```python
# This class downloads and caches a remote file for repeated reading.
# The file is at $uri and has length $size bytes.
class CachedFile:
    def __init__(self, uri, client):
        self.uri = uri
        self.client = client

    # Read $length bytes at $offset into $dstBuf (as fast as possible).
    def read(self, offset, length, dstBuf):
        # your code...
        pass
```

### Java

**Provided code:**

```java
/** Client to download files from the internet. */
interface StorageClient {
    /** Get the size of the file at $uri. */
    long getFileSize(String uri);

    /** Fetch $length bytes at $offset from $uri into $dstBuf. */
    void fetch(String uri, long offset, long length, byte[] dstBuf);
}
```

**Code to implement:**

```java
/**
 * This class downloads and caches a remote file at $uri for repeated reading.
 */
class CachedFile {
    private String uri;
    private StorageClient client;

    public CachedFile(String uri, StorageClient client) {
        this.uri = uri;
        this.client = client;
    }

    /** Read $length bytes at $offset into $dstBuf (as fast as possible). */
    public void read(long offset, long length, byte[] dstBuf) {
        // your code...
    }
}
```

### C++

**Provided code:**

```cpp
#include <string>

/** Client to download files from the internet. */
class IStorageClient {
public:
    /** Get the size of the file at $uri. */
    virtual long getFileSize(std::string& uri) = 0;

    /** Fetch $length bytes at $offset from $uri into $dstBuf. */
    virtual void fetch(std::string& uri, long offset, long length, unsigned char* dstBuf) = 0;
};
```

**Code to implement:**

```cpp
/**
 * This class downloads and caches a remote file at $uri for repeated reading.
 */
class CachedFile {
    std::string uri;
    IStorageClient& client;
public:
    CachedFile(std::string& uri, IStorageClient& client)
        : uri(uri), client(client)
    { }

    /** Read $length bytes at $offset into $dstBuf (as fast as possible). */
    void read(long offset, long length, unsigned char* dstBuf) {
        // your code...
    }
};
```