# SYS7: Command Execution API

Owner: Bogdan Ionut Ghit <[email protected]>

Changelog:
* 2024-03-19: Initial question
* 2024-04-18: Feedback from interview training

## Overview
This question aims to build a command execution service that is scalable, performant, and resilient to failures. We recommend this question to test a range of system design concepts with a focus on concurrency control, distributed execution, and latency optimizations. In particular, candidates with background in distributed systems, cloud computing, and API design will grasp the problem better and provide more insightful solutions. The current framing is a simplified version of a real-world system which we built at Databricks and can be extended to cover more advanced topics such as caching, retention, fault tolerance, and monitoring. 

## Problem Statement
We want to build a Command Execution API that allows resource-constraints processes running inside web servers to extract small samples of data from a given cloud storage location. The web servers don't have direct access to the cloud storage and need to rely on a separate service to fetch the data. We call this service the Command Execution API service. The API service facilitates the execution of a single type of command which is specified by two input parameters: a set of files with pointers to their location in cloud storage and a limit that indicates the number of lines the web service wants to retrieve from the set of files. The API service should be able to handle multiple concurrent requests and return the result of each command execution.

```scala
// The parameters required to execute a command
case class CommandParameters(inputFiles: Seq[FilePath], limit: Int)

The content of multiple files may be appended to form the final result, but it should not be interleaved. Each file has a maximum size in the order of tens of MB (empty files may occur as well). The limit parameter is global and so, it applies to the entire result that needs to be returned to the client. We can represent the CommandResult as a data structure that maps each file identifier to the set of lines extracted from the file as follows:

// The result set of a command maps each file path to an array of strings
case class CommandResult(results: Map[FilePath, Array[String]])

Example:

file1 = ["foo", "foo", "foo"]                  // 3 lines
file2 = ["bar", "bar", "bar", "bar", "bar"]    // 5 lines

CommandParameters([file1, file2], 1)  → CommandResult({file1 -> ["foo"], file2 -> []})
CommandParameters([file1, file2], 5)  → CommandResult({file1 -> ["foo", "foo", "foo"], file2 -> ["bar”, "bar"]})

Reference Architecture

At a high-level the architecture consists of three components: a client (the web server process) that is submitting commands, an API service that manages the execution of commands, and a backend cluster that executes the commands in distributed fashion on multiple machines. The client submits commands to the API service which schedules the execution of the command on the backend cluster. The backend cluster processes the command and returns the results to the front-end service which in turn returns the results to the client.

The Client Flow

The execution API enables the following end-to-end flow the clients can use: (1) Submission of commands. The client provides the CommandParameters and expects a CommandId in return. (2) Polling for the command state. The client can query the status of the command using the CommandId. (3) Fetching the command results. The client can retrieve the results of the command using the CommandId.

You can use the interface below as reference for the client flow:

// The command execution API which enables a simple Send -> Get Status -> Fetch flow.
class CommandExecutionApi {
   // Submit a given command. Always returns a CommandId, even though 
   // the command might not be executed at that time.
   def SendCommand(cmd: CommandParameters): CommandId

   // Retrieve the status of a command. Requires a command ID. 
   // The status can be as granular as you can see fit. 
   def GetStatus(id: CommandId): CommandStatus

   // Fetch the results of a given command. Requires a command ID.
   def FetchResults(id: CommandId): CommandResult
}

The command submission is asynchronous and the client should be able to poll for the status of a command and fetch the results when the command is completed. The candidate should be able to discuss the trade-offs between synchronous and asynchronous APIs and the benefits of using asynchronous APIs in this context. For simplicity, we can assume that results are small and returned in a single round-trip to the client. As a follow-up, the candidate should be able to discuss how to handle large results that do not fit in a single response and how to paginate the results.

The API Service

This is the service that implements and exposes to clients the CommandExecutionApi. It accepts requests from clients to execute, monitor, and fetch the results of commands. The service should be able to handle multiple concurrent requests and should be resilient to failures. The candidate should be able to discuss how to handle failures and retries in the context of this service.

The API service needs to offload the command execution to the backend cluster. To do so, it has access to an internal primitive called runCommand which divides the work into multiple tasks, serializes those tasks, and schedules them for execution on the cluster. The candidate doesn't need to implement the primitive, but they need to be able to use this primitive in the system they're building. In particular, the primitive requires a function and an RPC callback which are explained below.

// Launches the distributed execution of a command by creating a task for each file split which applies the given func and calls the 
// result handler on completion with the output set of lines.
def runCommand(
  inputFiles: Seq[FilePath],
  fileOperator: (Iterator[String], Long) => Array[String],
  resultHandler: (FilePath, Array[String]) => Unit): Unit

The fileOperator function is serialized and executed remotely on the cluster. As a result this function cannot access the global state and is confined to each individual task that processes a given input file. For instance, the fileOperator function can be implemented as an identity function that reads and return the complete content of the input file as an array of lines.

// TODO: Implement the fileOperator function
// Takes the content of a file as an iterator and returns the subset of lines that can be appended to the final result.
def fileOperator(it: Iterator[String], limit: Long): Array[String]

The resultHandler is an RPC callback that is called by each task once it finished the execution of the fileOperator function. The resultHandler is executed locally by the caller and has access to the global state. This callback is responsible for collecting the results from each task and preparing the final result that can be fetched by the client.

// TODO: Implement the resultHandler callback
// Updates the final result with the output set of lines from a file.
def resultHandler(filePath: FilePath, lines: Array[String]): Unit

The Backend Cluster

The execution backend consists of a cluster of machines that can execute commands in a distributed fashion. A cluster machine is configured with multiple slots each of which can be occupied by a single task at a time. We assume that each cluster machine has read access to a cloud storage bucket where the input files are stored. All tasks that run on the cluster are executing a fileOperator function with different input parameters depending on the command they belong to.

Simple Implementation

This section provides a simple implementation of the CommandExecutionApi using the runJob primitive. This implementation can be used as reference by the interviewer. The code is in Python but the candidate can use any language they are comfortable with.

%python

from collections.abc import Iterable, Callable
from enum import Enum

class CommandId(int):
    pass

class Status(Enum):
    RUNNING = 1
    COMPLETED = 2
    # TODO

class FilePath(str):
    pass

class CommandParameters:
    def __init__(self, files: list[FilePath], limit: int):
        self.limit = limit
        self.files = files

class ResultSet(dict):
    pass

def run_job(inputFiles: list[FilePath],
            func: Callable[[Iterable[str]], list[str]],
            result_handler: Callable[[FilePath, list[str]], None]) -> None:
    pass

class CommandExecutionApi:
    next_id = 0
    def __init__(self):
        self.result = {}
        self.status = {}

    # Submit a given command. Always returns a CommandId, even though 
    # the command might not be executed at that time.
    def send_command(cmd: CommandParameters) -> CommandId:
        id = next_id
        next_id += 1
        def result_handler(file: FilePath, result: list[str]) -> None:
            self.result[(id, file)] = result
            has_all_results = True
            total_lines = 0
            for file in cmd.files:
                if total_lines >= cmd.limit:
                    if (id, file) in self.result:
                        del self.result[(id, file)]
                else:
                    if (id, file) not in self.result:
                        has_all_results = False
                    else:
                        total_lines += len(self.result[(id, file)])
                        if total_lines > cmd.limit:
                            self.result[(id, file)] = self.result[(id, file)][:cmd.limit - total_lines]
            if has_all_results:
                self.status[id] = Status.COMPLETED
        self.status[id] = Status.RUNNING
        run_job(cmd.files, lambda x: local_limit(x, cmd.limit), result_handler)
        return id

    # Retrieve the status of a command. Requires a command ID. 
    # The status can be as granular as you can see fit. 
    def get_status(id: CommandId) -> Status:
        return self.status[id]
    
    # Fetch the results of a given command. Requires a command ID.
    def fetch_results(id: CommandId) -> ResultSet:
        return self.result[id]

Optimizing Result Collection

It should be clear by now that the main instrument for collecting results is the runCommand primitive. The general solution is to collect results incrementally by running sequential runCommand operations with an exponentially increasing number of files to process.

The main tradeoff to discuss is the following. On one hand collecting the entire dataset may be time-consuming and wasteful because of transferring and storing a large number of bytes which will not be needed by the client. On the other hand, executing a large number of runCommand operations may incur significant overhead when the dataset is sparse and the client requests a small amount of bytes relative to the total dataset size.

The candidate should identify some scenarios to optimize for such as:

  1. Latency sensitive commands in which the client sets a small limit on a sparse dataset. We want to avoid launching more than one job in this case if the overhead is in the order of the time it takes to execute the command. To optimally execute such commands, we will want to allocate the first job a higher number of files to maximize the chance of returning the complete set of requested lines.

  2. Commands with high limits applied on large datasets. In this scenario we will end up launching too many jobs before we collect the required content and which may lead to under-utilization of the cluster. To optimally execute such commands, we will want to run fewer jobs with higher concurrency thus minimizing the turn-around time.

Optimizing Result Delivery

We now want to optimize the service to deliver short command results with minimal latency. To steer the candidate in the right direction, we should give some specifics. We can indicate that a short command is expected to finish within 10ms. Based on this the candidate should call out the RTT client to frontend service as the main source of latency which would be paid three times for submitting the command, retrieving the status and fetching the result. The solution has two components, one focused on latency improvement and another on admission control.

There are two approaches for reducing the number of round-trips. We could provide a synchronous API so that the SubmitCommand request will block for a short time waiting for command completion. If the command doesn’t complete within the timeout, the RPC call will return the command identifier that the client can further use to poll for command status. When the service returns a command completion status, the client can fetch the results with a subsequent RPC call. The candidate may also notice that command execution times are strongly correlated with the result sizes. Therefore, as in this case the results are likely to be small they can be returned in a single round-trip without pagination. This approach has the advantage of being simple, but it makes the service prone to scalability issues, thread-pool exhaustion, and rejection of incoming requests. Alternatively, experienced candidates could point out that we can choose to provide a streaming API which will reduce the time we hold on to threads and will help us scale better the service. The downside is that it pushes more complexity to the client.

Either solution is acceptable, but if the candidate chooses the blocking approach, we should make sure they think through the admission control mechanism that prevents blocking operations from monopolizing the RPC thread pool capacity. The candidate should point out that the service may run into the scenario where all threads are blocked waiting for the backend to finish execution and the frontend rejects RPC requests for retrieving status and fetching results because of lack of capacity. Thus, blocking command submission RPCs should be scheduled on a background thread pool different from the RPC handler thread pool.

Main Requirements

R1. The API service should never run into thread exhaustion. This means that we should not block threads on the execution of a request and all RPCs should return within a well-defined SLA. An experienced candidate should be able to provide a bound on that SLA which factors in the RTT from client to frontend to backend.

R2. The API service has limited memory size and the collected results may not always fit in memory. Based on this requirement, the candidate should conclude from here that the execution state should be fully offloaded to the cluster. It is not required to go over all details of cluster state management, but they should point out the main design alternatives in having the frontend versus the backend manage the state.

R3. The service should never return more lines to the client than the limit. The canny candidate may point out that while pushing down the limit to the cluster machine and each task only returns a partial result that is below the limit, the overall result set may exceed the threshold. In that case, the frontend needs to do an extra check and discard the excess of lines.

R4. The service should do minimal processing of the results when enforcing the line limit. This requirement indicates that the computation is offloaded to the cluster. This is already stated in the problem definition, but it is good to call it out explicitly. While the service may discard the excess of lines as the workers return their results, the candidate should call out that we unnecessarily scan the entire set of files. We should discard attempts that read everything in-memory and then return the limited results to the client. The preferred solution is to execute multiple jobs starting with a small set of files and then increasing the number of files until we reach the line limit. Drop the excess on the service side after collection.

R5. The SLA for executing a command is ~200 ms. Some candidates may immediately ask for SLAs. We should feel free to give them some ball-park numbers. If the maximum file size is 100 MB and the read speed of an NVM M2 SSD of 500 MB/s, a limit command should run relatively fast, in the order of ~200 ms assuming there is sufficient capacity to maximize concurrency. We could also indicate that the runJob primitive adds a fixed overhead. More experienced candidates can develop execution strategies based on the size of the table, how large this overhead is, and the number of input files.

Follow-Up Questions

Pagination. The commands may return arbitrarily large result sets, but the clients that use our API are thin and run in resource constraints settings like web servers. To be able to deliver large datasets back to the client, the candidate should think about pagination. A simple solution is to extend the fetch result with two parameters: an offset and a size in bytes. The former indicates the file index in the result set from which the client requests to fetch results. The latter indicates the number of bytes the client requests.

// Fetch the results of a given command. Requires a command ID.
def FetchResults(id: CommandId, offset: Long, sizeInBytes: Long): CommandResult

Retention. Clients expect results to be available for up to 24 h after they’ve been generated. There are different alternatives that can be discussed. From storing the results locally on disk to uploading to a persistent cloud storage location to caching them in-memory.

Idempotency. The network between the Command Execution API service and the cluster executing the jobs is not reliable. The candidate should identify methods we can apply to protect the service availability from intermittent network failures. One can notice that the commands are idempotent and can be retried without the risk of generating side effects.

Service limits. Analyze how many requests the service can handle and identify ways to scale it.

Guidelines for Interviewers

One may recognize that the question is inspired by the Databricks SQL Execution API. The problem statement is centered around multi-threaded command execution with a simple asynchronous API exposed to clients. Once this basic command execution frameworks is implemented, the interviewer can go deeper into latency optimizations. Depending on which constraints are provided to the candidate, the solution can evolve towards a hybrid execution API which can deliver direct results if the execution time is short and/or incremental command execution by retrieving partial results in batches until the desired final result is complete. From current experience, it may be too long to do both in a single interview session, so it is recommended to focus on one of the two approaches.

Grading

To get a passing grade, the candidate can either be challenged to go broad on various engineering aspects of the problem, or go deep into designing a specific mechanism. In both cases, the candidate is expected to provide at least a minimal implementation of the solution that can be further used as reference to discuss further extensions. For example, a candidate can get a passing grade if it provides the simple solution presented above and is able to correctly address multi-threading command execution, incremental result collection, and round-trips reductions, but without fully implementing the complete set of functionalities. At the same time, the candidate can be steered to go in-depth on one or two of these functionalities to further extend the simple implementation towards a more production-ready version.

Design Code L4 L5 L6+
High-level design and familiarity with most concepts Minimal pseudocode implementation 2 2 1
Detailed design covering the end-to-end flow of the command execution Single-threaded solution without optimizations 3 3 2
Deep dives into at least one of the latency optimizations (multi-threading, limits or delivery) Correctly implements the optimization 4 3 3
Covers all aspects of the problem from multi-threading to optimized limits to reduced round-trips Multi-threaded implementation with optimizations 4 4 4