December 8, 2019

Advent of Code 2019 day 7

When building your own concurrent scheduling system is the easy way.

Day 7 was a task of two parts. Part 1 was fairly simple. Part 2… wasn't.

Part 1

The first part of the challenge was to chain some machines together, where the output of one became the input of the next. This "repeated application" pattern is just a fold over the list of inputs (one input per machine in the chain). As such, the Haskell implementation was just defining that fold, and then using map to test the chain on all permutations of the input.

part1 mem = maximum outputs
    where inputs = permutations [0..4]
          outputs = map (chainMachines mem) inputs

chainMachines mem settings = foldl' (chainMachine mem) 0 settings

chainMachine mem prevOutput setting = findMachineOutput [setting, prevOutput] mem

This was using, unchanged, the Intcode implementation from day 5.

Part 2

This was substantially more difficult. The need to feed the output of the final machine back into the inputs of the first machine meant that a simple fold wouldn't suffice.

There were a couple of ways forward. Some, such as brandonchinn, used mutually-recursive let statements to connect all the input and output streams, then let the lazy evaluation semantics sort out all the scheduling. That was extremely clever, but involved extracting the machine from the monad stack and implementing it as pure functions.
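To give a flavour of that lazy approach, here is a minimal sketch. It is not brandonchinn's code: toyAmp is a made-up stand-in for an Intcode machine, written as a pure function from an input stream to an output stream, and it is assumed to halt after three signals. The point is only how the mutually-recursive bindings close the loop.

```haskell
-- Hypothetical stand-in for an Intcode amplifier: reads a phase setting,
-- then transforms exactly three signals before halting.
toyAmp :: [Int] -> [Int]
toyAmp []                 = []
toyAmp (setting : signals) = map (\s -> 2 * s + setting) (take 3 signals)

-- Three amplifiers in a ring. Each output list is defined in terms of the
-- others; lazy evaluation only forces each element once its input exists.
feedbackLoop :: (Int, Int, Int) -> Int
feedbackLoop (a, b, c) =
    let outA = toyAmp (a : 0 : outC)   -- first machine also gets the initial 0
        outB = toyAmp (b : outA)
        outC = toyAmp (c : outB)       -- last machine feeds back into the first
    in  last outC
```

With these toy machines, feedbackLoop (1, 2, 3) evaluates to 803. The loop only terminates because toyAmp never demands more input than the ring has already produced; a machine that looked ahead in its input would deadlock.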

Another approach was to embrace the concurrency and use Haskell's Concurrency modules. That would entail putting each machine in its own thread, and connecting them with Channels. (The freely-available Parallel and Concurrent Haskell book is a good description of this approach.) But that seemed like overkill, especially as I've not done concurrent Haskell before! Perhaps I'll revisit this challenge later, as a way of learning how to do this.
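For comparison, a rough sketch of what the threads-and-channels version might look like, using only forkIO, Chan and MVar from base. Again this uses a made-up toy amplifier (toyAmpThread, assumed to halt after three signals) rather than a real Intcode machine, so treat it as an illustration of the wiring and nothing more.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM, forM_, replicateM_)

-- Hypothetical toy amplifier: reads its phase setting from the input
-- channel, then transforms exactly three signals before halting.
toyAmpThread :: Chan Int -> Chan Int -> IO ()
toyAmpThread inCh outCh = do
    setting <- readChan inCh
    replicateM_ 3 $ do
        signal <- readChan inCh        -- blocks until a signal arrives
        writeChan outCh (2 * signal + setting)

-- Connect the amplifiers in a ring, one thread each.
-- Assumes at least one setting (the pattern match fails otherwise).
runRing :: [Int] -> IO Int
runRing settings = do
    chans@(firstCh : _) <- mapM (const newChan) settings
    let n = length chans
    -- Queue each phase setting before any thread starts, so it is read first.
    forM_ (zip chans settings) $ \(ch, s) -> writeChan ch s
    writeChan firstCh 0                -- the initial signal
    dones <- forM (zip [0 ..] chans) $ \(i, inCh) -> do
        done <- newEmptyMVar
        let outCh = chans !! ((i + 1) `mod` n)
        _ <- forkIO (toyAmpThread inCh outCh >> putMVar done ())
        return done
    mapM_ takeMVar dones               -- wait for every amplifier to halt
    readChan firstCh                   -- the last output, left unread in the ring
```

Here the blocking readChan does the scheduling: a thread simply sleeps until its upstream neighbour writes something. With these toy amplifiers, runRing [1, 2, 3] returns 803.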

Instead, I decided to implement my own concurrent runtime system for the various Intcode machines. As you do.

The key to my approach was that I wasn't using the return value of the Intcode machine execution. Each machine's input was fed in via the Reader monad, and the current position in the stream was kept in the State. The output was held in the Writer monad. But running the State monad returns both the final state of the machine and a final value. I decided to use that value to determine whether execution of the machine stopped because it terminated (executed instruction 99) or was blocked because it was trying to read beyond the input stream it currently knew about.
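As a small illustration of that idea, here is a sketch with a toy machine rather than Intcode. The names Toy, StopReason and runToy are invented for the example, and it assumes the mtl package for Control.Monad.RWS. The machine doubles each input it reads, and its return value says why it stopped.

```haskell
import Control.Monad.RWS (RWS, ask, get, put, runRWS, tell)

-- Why the machine stopped (hypothetical type for this sketch).
data StopReason = Blocked | Terminated deriving (Eq, Show)

-- Reader: the input stream known so far. Writer: the output.
-- State: (index of the next unread input, values still to process).
type Toy = RWS [Int] [Int] (Int, Int)

runToy :: Toy StopReason
runToy = do
    input <- ask
    (idx, remaining) <- get
    if remaining == 0
        then return Terminated                 -- the "instruction 99" case
        else if idx >= length input
            then return Blocked                -- wants input it doesn't have yet
            else do
                tell [2 * (input !! idx)]
                put (idx + 1, remaining - 1)
                runToy
```

Running it with runRWS runToy [1, 2] (0, 3) gives (Blocked, (2, 1), [2, 4]): two inputs consumed, one more wanted. Resuming from that saved state with a longer input stream, runRWS runToy [1, 2, 3] (2, 1), gives (Terminated, (3, 0), [6]).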

My own concurrent job scheduler

The set of machines and their connections was held in a pipeline of machines. Each machine in the pipeline contains its own input stream, output stream, machine state, and machine status (Runnable, Blocked, or Terminated). The pipeline manager works out when to run each job and orchestrates their interaction. It takes a Runnable machine from the pipeline, runs it until it stops for some reason, and updates the pipeline with the new machine. The manager then adds the output to the input stream of the next machine in order and, if the next machine is Blocked, marks it as Runnable. The process finishes when there are no Runnable machines.
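Stripped of the Intcode details, the scheduling loop can be sketched in a self-contained way. Everything here (Job, runJob, schedule, mkJobs) is a simplified stand-in invented for illustration, with a toy job that is assumed to halt after three signals; it needs only the containers package.

```haskell
import qualified Data.IntMap.Strict as M

data Status = Runnable | Blocked | Terminated deriving (Eq, Show)

-- Hypothetical toy job standing in for an encapsulated Intcode machine.
data Job = Job
    { jobSetting :: Int
    , jobInput   :: [Int]    -- inputs not yet consumed
    , jobOutput  :: [Int]    -- everything emitted so far
    , jobLeft    :: Int      -- signals still to process before halting
    , jobStatus  :: Status
    } deriving (Show)

-- Run a job until it halts or runs out of input.
-- Returns the updated job and the outputs produced during this run.
runJob :: Job -> (Job, [Int])
runJob job = (job', new)
  where
    (used, rest) = splitAt (jobLeft job) (jobInput job)
    new  = map (\s -> 2 * s + jobSetting job) used
    left = jobLeft job - length used
    job' = job { jobInput = rest, jobOutput = jobOutput job ++ new, jobLeft = left
               , jobStatus = if left == 0 then Terminated else Blocked }

-- The manager: run the first Runnable job, pass its new output to the next
-- job in the ring, wake that job if it was Blocked, and repeat.
-- Assumes a non-empty pipeline whose last job produces some output.
schedule :: M.IntMap Job -> Int
schedule jobs = case M.lookupMin (M.filter ((== Runnable) . jobStatus) jobs) of
    Nothing       -> last (jobOutput (snd (M.findMax jobs)))
    Just (i, job) -> schedule (M.adjust feed next (M.insert i job' jobs))
      where
        (job', new) = runJob job
        next   = (i + 1) `mod` M.size jobs
        feed j = j { jobInput  = jobInput j ++ new
                   , jobStatus = if jobStatus j == Blocked then Runnable else jobStatus j }

-- One job per setting; the first job also receives the initial 0 signal.
mkJobs :: [Int] -> M.IntMap Job
mkJobs settings = M.adjust (\j -> j { jobInput = [0] }) 0 jobs
  where jobs = M.fromList (zip [0 ..] [Job s [] [] 3 Runnable | s <- settings])
```

With these toy jobs, schedule (mkJobs [1, 2, 3]) evaluates to 803. Only one job ever runs at a time, so the "concurrency" is really cooperative: a job gives up control exactly when it blocks or terminates.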


And now to put all that together. First, some data type definitions. ProgrammedMachine changes to drop the reference to the return type. There are also types for the machine execution status and the pipeline.

type ProgrammedMachine = RWS [Int] [Int] Machine

data EncapsulatedMacine = EncapsulatedMacine 
    { _machine :: Machine
    , _executionState :: ExecutionState
    , _initialInput :: [Int]
    , _currentInput :: [Int]
    , _machineOutput :: [Int]
    } deriving (Show, Eq)

data ExecutionState = Runnable | Blocked | Terminated  deriving (Ord, Eq, Show)

type Pipeline = M.IntMap EncapsulatedMacine

The runAll function changes to return the execution state, and runStep changes type so that it still returns nothing. runAll looks at the combination of the current instruction, length of known input stream, and current position in the stream, to determine if the machine is Blocked.

runAll :: ProgrammedMachine ExecutionState
runAll = do mem <- gets _memory
            ip <- gets _ip
            input <- ask
            iIndex <- gets _inputIndex
            let actualInputLength = length input
            let requiredInputLength = iIndex + 1
            if (mem!ip == 99)
            then return Terminated
            else    if (mem!ip == 3 && requiredInputLength > actualInputLength)
                    then return Blocked
                    else do runStep
                            runAll

runStep :: ProgrammedMachine ()

The initial pipeline is built by mapping over the inputs, one machine for each element of the input; there's some fiddling around to add the initial 0 input to the first machine in the pipeline. encapsulate builds the encapsulated machine.

buildPipeline :: [Int] -> [Int] -> Pipeline
buildPipeline mem input = M.insert 0 machine0' pipeline
    where pipeline = M.fromList $ zip [0..] $ map (encapsulate mem) input
          machine0 = pipeline!0
          machine0' = machine0 { _initialInput = (_initialInput machine0) ++ [0]}

encapsulate :: [Int] -> Int -> EncapsulatedMacine
encapsulate mem input = EncapsulatedMacine 
    { _machine = makeMachine mem
    , _executionState = Runnable
    , _initialInput = [input]
    , _machineOutput = []
    , _currentInput = [input]
    }

runPipeline is the pipeline manager, responsible for executing all the jobs on all the machines. If all the machines have finished, return the last output of the last machine. Otherwise, find a runnable machine (machineToRun), run it, and update the relevant input and output streams in this machine and the next in sequence (feedsIntoMachine). Then run runPipeline on the updated pipeline.

runPipeline :: Pipeline -> Int
runPipeline pipeline 
    | finished pipeline = last $ _machineOutput $ snd $ M.findMax pipeline
    | otherwise = runPipeline pipeline''
    where   (indexToRun, machineToRun) = M.findMin $ runnableMachines pipeline
            feedsIntoIndex = (indexToRun + 1) `mod` (M.size pipeline)
            feedsIntoMachine = pipeline!feedsIntoIndex
            fimi = _initialInput feedsIntoMachine
            machine' = runEncapsulatedMachine machineToRun
            fullOutput = _machineOutput machine'
            feedsIntoState = case (_executionState feedsIntoMachine) of
                                  Blocked -> Runnable
                                  Terminated -> Terminated
                                  Runnable -> Runnable
            feedsIntoMachine' = feedsIntoMachine {_executionState = feedsIntoState, _currentInput = fimi ++ fullOutput}
            pipeline' = M.insert indexToRun machine' pipeline
            pipeline'' = M.insert feedsIntoIndex feedsIntoMachine' pipeline'

The last bit is a couple of utility functions, and runEncapsulatedMachine, which extracts the machine and calls runRWS runAll to actually run the machine.

finished :: Pipeline -> Bool
finished = M.null . runnableMachines

runnableMachines :: Pipeline -> Pipeline
runnableMachines = M.filter (\e -> _executionState e == Runnable)

runEncapsulatedMachine :: EncapsulatedMacine -> EncapsulatedMacine
runEncapsulatedMachine e = e { _machine = machine'
                             , _executionState = halted
                             , _machineOutput = (_machineOutput e) ++ output
                             }
    where   machine = _machine e
            input = _currentInput e
            (halted, machine', output) = runRWS runAll input machine

Full code and refactoring

Around day 11, I decided to extract the "complete" Intcode interpreter into its own module. You can see the code I wrote on day 7 (and on GitHub), and what it became once the interpreter was extracted (and on GitHub).