

# Functional Abstractions for Practical and Scalable Concurrent Programming

KC Sivaramakrishnan  
Purdue University

# Concurrent programming is (still) hard!



# Concurrency bugs can be disastrous

# Concurrency bugs can be disastrous

## 2003 Northeast blackout<sup>[1]</sup>



- Data race disables alarm system
- Effect
  - 256 power stations go offline
  - ~7 hr major blackout
  - 11 fatalities, \$6 billion

[1] <http://www.scientificamerican.com/article/2003-blackout-five-years-later/>

[2] <http://sunnyday.mit.edu/papers/therac.pdf>

[3] <http://www.cnbc.com/id/100587334>

# Concurrency bugs can be disastrous

## 2003 Northeast blackout<sup>[1]</sup>



- Data race disables alarm system
- Effect
  - 256 power stations go offline
  - ~7 hr major blackout
  - 11 fatalities, \$6 billion

## Therac-25 incidents<sup>[2]</sup>



- Data race b/w UI and controller
- 6 fatalities

[1] <http://www.scientificamerican.com/article/2003-blackout-five-years-later/>

[2] <http://sunnyday.mit.edu/papers/therac.pdf>

[3] <http://www.cnbc.com/id/100587334>

# Concurrency bugs can be disastrous

## 2003 Northeast blackout<sup>[1]</sup>



- Data race disables alarm system
- Effect
  - 256 power stations go offline
  - ~7 hr major blackout
  - 11 fatalities, \$6 billion

## Therac-25 incidents<sup>[2]</sup>



- Data race b/w UI and controller
- 6 fatalities

## Facebook IPO @ NASDAQ<sup>[3]</sup>



- Race between validation and new orders arrival
- NASDAQ compensation = \$62 million

[1] <http://www.scientificamerican.com/article/2003-blackout-five-years-later/>

[2] <http://sunnyday.mit.edu/papers/therac.pdf>

[3] <http://www.cnbc.com/id/100587334>



Safe and scalable  
concurrent program



Safe and scalable  
concurrent program

Fences, Locks,  
Condition variables, etc.





Safe and scalable  
concurrent program



Data race



Deadlock

Fences, Locks,  
Condition variables, etc.



Atomicity  
violations



Safe and scalable concurrent program



Data race



Deadlock



Weak memory semantics



Eventual consistency

Fences, Locks,  
Condition variables, etc.



No cache coherence



Atomicity violations



---

Stick to  
Status Quo

---



Safe and scalable  
concurrent program



---

Stick to  
Status Quo

Testing

---



Safe and scalable  
concurrent program



---

Stick to  
Status Quo

Testing

Verification

---



Safe and scalable  
concurrent program



---

Stick to  
Status Quo

Testing

Verification

PL Support



Safe and scalable  
concurrent program



Safe and scalable concurrent program



Data race



Deadlock



*Functional  
programming  
abstractions*



Eventual consistency



Non-cache coherence



Atomicity violations



# Programming abstractions *simplify* Concurrent Programming

Transactional  
Memory

Multicore  
Garbage  
Collection

# But, programming abstractions introduce a *level of indirection*



Safety  
&  
Simplicity



Performance  
&  
Functionality

Safety  
&  
Simplicity



Always desirable to marry the two whenever possible!

# MultiMLton

Intel SCC  
48-core Non-cache-coherent



Azul Vega 3  
864-core CC-UMA



Compute clouds



# MultiMLton

Intel SCC  
48-core Non-cache-coherent



Azul Vega 3  
864-core CC-UMA



Compute clouds



Asynchronous CML [PLDI '11]  
Memoizing Communication [ICFP '09]  
Rx-CML – optimistic CML [PADL '14]

Parasitic threads [DAMP '10, JFP '14]  
Thread-local GC [ISMM '12, MARC '12, JFP '14]

## Language Design

## Runtime Systems

# MultiMLton

Intel SCC  
48-core Non-cache-coherent



Azul Vega 3  
864-core CC-UMA



Compute clouds



Asynchronous CML  
Memoizing Communication  
Rx-CML – optimistic CML

[PLDI '11]  
[ICFP '09]  
[PADL '14]

Parasitic threads [DAMP '10, JFP '14]  
Thread-local GC [ISMM '12, MARC '12, JFP '14]

## Language Design Runtime Systems

Scheduler activations  
for Haskell

Schedulers for Haskell threads as Haskell libraries  
[In submission to OOPSLA '14]

Sting (Java)

Session type based protocol optimization  
[Coordination '10, SCP '13]

# MultiMLton

Language Design

Rx-CML – optimistic CML [PADL '14]

---

Scheduler activations  
for Haskell

Schedulers for Haskell threads as Haskell libraries  
[In submission to OOPSLA '14]

# Rx-CML : A Prescription for Safely Relaxing Synchrony

**Abstraction**



**Domain**

Synchronous  
communication



Asynchronous  
distributed system

Transactional  
memory



Shared memory  
concurrency

Garbage  
collection



Memory  
management

## Abstraction

Synchronous communication

Transactional memory

Garbage collection



## Domain

Asynchronous distributed system

Shared memory concurrency

Memory management



Synchronous communication =  
atomic { data transfer +  
synchronization }

## Abstraction

Synchronous communication

Transactional memory

Garbage collection



## Domain

Asynchronous distributed system

Shared memory concurrency

Memory management

Synchronous communication =  
atomic { data transfer +  
synchronization }



synchrony

latency

Can we **discharge** synchronous communications  
asynchronously and ensure **observable equivalence**?

Can we **discharge** synchronous communications  
asynchronously and ensure **observable equivalence**?

**Formalize:**

$$\llbracket \text{send } (c, v) \rrbracket k \equiv \llbracket \text{asend } (c, v) \rrbracket k$$

**Implement:**

Distributed Concurrent ML in MultiMLton +  
Speculative execution framework

# Concurrent ML

val spawn : (unit -> unit) -> thread\_id

Thread  
creation

val channel : unit -> 'a chan

val send : 'a chan \* 'a -> unit

val recv : 'a chan -> 'a

Synchronous  
message passing

val sendEvt : 'a chan \* 'a -> unit event

val recvEvt : 'a chan -> 'a event

val sync : 'a event -> 'a

val never : 'a event

val alwaysEvt : 'a -> 'a event

val wrap : 'a event -> ('a -> 'b) -> 'b event

val guard : (unit -> 'a event) -> 'a event

val choose : 'a event list -> 'a event

First-class events

...

| T1          | T2       | T3          |
|-------------|----------|-------------|
| send(c1,v1) | recv(c2) | send(c2,v3) |
| f()         | g()      | h()         |
| send(c2,v2) | recv(c1) | recv(c2)    |



| T1          | T2       | T3          |
|-------------|----------|-------------|
| send(c1,v1) | recv(c2) | send(c2,v3) |
| f()         | g()      | h()         |
| send(c2,v2) | recv(c1) | recv(c2)    |



| T1          | T2       | T3          |
|-------------|----------|-------------|
| send(c1,v1) | recv(c2) | send(c2,v3) |
| f()         | g()      | h()         |
| send(c2,v2) | recv(c1) | recv(c2)    |



| T1          | T2       | T3          |
|-------------|----------|-------------|
| send(c1,v1) | recv(c2) | send(c2,v3) |
| f()         | g()      | h()         |
| send(c2,v2) | recv(c1) | recv(c2)    |



| T1          | T2       | T3          |
|-------------|----------|-------------|
| send(c1,v1) | recv(c2) | send(c2,v3) |
| f()         | g()      | h()         |
| send(c2,v2) | recv(c1) | recv(c2)    |



| T1          | T2       | T3          |
|-------------|----------|-------------|
| send(c1,v1) | recv(c2) | send(c2,v3) |
| f()         | g()      | h()         |
| send(c2,v2) | recv(c1) | recv(c2)    |



| T1          | T2       | T3          |
|-------------|----------|-------------|
| send(c1,v1) | recv(c2) | send(c2,v3) |
| f()         | g()      | h()         |
| send(c2,v2) | recv(c1) | recv(c2)    |



| T1          | T2       | T3          |
|-------------|----------|-------------|
| send(c1,v1) | recv(c2) | send(c2,v3) |
| f()         | g()      | h()         |
| send(c2,v2) | recv(c1) | recv(c2)    |



| T1          | T2       | T3          |
|-------------|----------|-------------|
| send(c1,v1) | recv(c2) | send(c2,v3) |
| f()         | g()      | h()         |
| send(c2,v2) | recv(c1) | recv(c2)    |



Cyclic dependence  $\Rightarrow$  divergent behavior

# Distributed group chat app

No central server & causal dependence → *causal broadcast*

# Distributed group chat app

No central server & causal dependence → *causal broadcast*

```
fun bsend (BCHAN (vcList, acList), v: 'a, id: int) : unit =
let
  val _ = map (fn vc => if (vc = nth (vcList, id)) then () else send (vc, v))
    vcList (* phase 1 -- Value distribution *)
  val _ = map (fn ac => if (ac = nth (acList, id)) then () else recv ac)
    acList (* phase 2 -- Acknowledgments *)
in ()
end
```

*synchronously send values*

*prevent receivers from proceeding until  
all members have received the value*

# Distributed group chat app: Performance

# Distributed group chat app: Performance

Site A – US East

bsend(c,x)

brecv(c)

Site B – EU

brecv(c)

bsend(c,y)

Site C – US West

brecv(c)

brecv(c)

# Distributed group chat app: Performance

Site A – US East

bsend(c,x)

brecv(c)

Site B – EU

brecv(c)

bsend(c,y)

Site C – US West

brecv(c)

brecv(c)



# Distributed group chat app: Performance



# Distributed group chat app: Performance



| <i>Execution</i>                    | <i>Avg.time (ms)</i> | <i>Errors</i> |
|-------------------------------------|----------------------|---------------|
| <i>Sync</i>                         | 1540                 | 0             |
| <i>Unsafe Async</i>                 | 520                  | 7             |
| <i>Safe Async (R<sup>CML</sup>)</i> | 533                  | 0             |

# Formalization

Reason axiomatically

$$E := \langle P, A, \rightarrow_{po}, \rightarrow_{co} \rangle$$

Happens-before relation

$$\begin{aligned}\rightarrow_{hb} &= (\rightarrow_{po} \cup \rightarrow_{td} \cup \\ &\quad \{(\alpha, \beta) \mid \alpha \rightarrow_{co} \alpha' \wedge \alpha' \rightarrow_{po} \beta\} \cup \\ &\quad \{(\beta, \alpha) \mid \beta \rightarrow_{po} \alpha' \wedge \alpha' \rightarrow_{co} \alpha\})^+\end{aligned}$$

Well-formed execution

$$\text{Obs}(\text{WF\_Exec}(P)) \in \{\text{Obs}(\text{Sync\_Exec}(P))\}$$

# Formalization

Reason axiomatically

$$E := \langle P, A, \rightarrow_{po}, \rightarrow_{co} \rangle$$

Happens-before relation

$$\rightarrow_{hb} = (\rightarrow_{po} \cup \rightarrow_{td} \cup \{(\alpha, \beta) \mid \alpha \rightarrow_{co} \alpha' \wedge \alpha' \rightarrow_{po} \beta\} \cup \{(\beta, \alpha) \mid \beta \rightarrow_{po} \alpha' \wedge \alpha' \rightarrow_{co} \alpha\})^+$$

Well-formed execution

$$\text{Obs}(\text{WF\_Exec}(P)) \in \{\text{Obs}(\text{Sync\_Exec}(P))\}$$

- No happens before cycle
- Sensible intra-thread semantics
- No outstanding speculative actions



# Formalization

Reason axiomatically

$$E := \langle P, A, \rightarrow_{po}, \rightarrow_{co} \rangle$$

Happens-before relation

$$\rightarrow_{hb} = (\rightarrow_{po} \cup \rightarrow_{td} \cup \{(\alpha, \beta) \mid \alpha \rightarrow_{co} \alpha' \wedge \alpha' \rightarrow_{po} \beta\} \cup \{(\beta, \alpha) \mid \beta \rightarrow_{po} \alpha' \wedge \alpha' \rightarrow_{co} \alpha\})^+$$

Well-formed execution

$$\text{Obs}(\text{WF\_Exec}(P)) \in \{\text{Obs}(\text{Sync\_Exec}(P))\}$$

- No happens before cycle
- Sensible intra-thread semantics
- No outstanding speculative actions



# Implementation & Results

# Implementation & Results

- Dependence graph  $\equiv$  Axiomatic execution
  - WF Check before observable actions
  - Ill-formed? Rollback and re-execute non-speculatively – Progress!
- WF Check, checkpoint, rollback are uncoordinated!
- Replicated channel consistency through speculative execution
- Benchmark: Optimistic OLTP & P2P Collaborative editing



*OLTP*

With 48 clients - **5.8X** faster than sync  
**1.4X** slower than async



*Collaborative Editing*

With 6 authors - **7.6X** faster than sync  
**2.3X** slower than async

# Scheduler Activations in Haskell

# How to write schedulers for Haskell threads as Haskell libraries?

# How to write schedulers for Haskell threads as Haskell libraries **in GHC?**

# How to write schedulers for Haskell threads as Haskell libraries **in GHC?**



# How to write schedulers for Haskell threads as Haskell libraries **in GHC?**



# How to write schedulers for Haskell threads as Haskell libraries **in GHC?**



# How to write schedulers for Haskell threads as Haskell libraries **in GHC?**



## Concurrent Haskell Application



App Dev



Lang Dev



## Concurrent Haskell Application



User-level Scheduler

Haskell

---

RTS - C



## Concurrent Haskell Application



User-level Scheduler  
EnQ and DeQ  
Activations

Haskell

---

RTS - C



# Concurrent Haskell Application



# Concurrent Haskell Application



# Concurrency Substrate API

```
data SCont
newSCont      :: IO () -> IO SCont
switch        :: (SCont -> STM SCont) -> IO ()
runOnIdleHEC :: SCont -> IO ()
```

# Concurrency Substrate API

```
data SCont
newSCont      :: IO () -> IO SCont
switch        :: (SCont -> STM SCont) -> IO ()
runOnIdleHEC :: SCont -> IO ()

type DequeueAct = SCont -> STM SCont
type EnqueueAct = SCont -> STM ()

-- read activations
dequeueAct    :: DequeueAct
enqueueAct    :: EnqueueAct
-- update activations
setDequeueAct :: DequeueAct -> IO ()
setEnqueueAct :: EnqueueAct -> IO ()
```

# Concurrency Substrate API

```
data SCont
newSCont      :: IO () -> IO SCont
switch        :: (SCont -> STM SCont) -> IO ()
runOnIdleHEC :: SCont -> IO ()
```

```
type DequeueAct = SCont -> STM SCont
type EnqueueAct = SCont -> STM ()
```

Scheduler access  
is under STM

```
-- read activations
dequeueAct     :: DequeueAct
enqueueAct     :: EnqueueAct
-- update activations
setDequeueAct :: DequeueAct -> IO ()
setEnqueueAct :: EnqueueAct -> IO ()
```

# Concurrency Substrate API

```
data SCont
newSCont      :: IO () -> IO SCont
switch        :: (SCont -> STM SCont) -> IO ()
runOnIdleHEC :: SCont -> IO ()

type DequeueAct = SCont -> STM SCont
type EnqueueAct = SCont -> STM ()
```

Scheduler access  
is under STM

```
-- read activations
dequeueAct    :: DequeueAct
enqueueAct    :: EnqueueAct
-- update activations
setDequeueAct :: DequeueAct -> IO ()
setEnqueueAct :: EnqueueAct -> IO ()

-- update activations
getAux         :: SCont -> STM Dynamic
setAux         :: SCont -> Dynamic -> STM ()
```

# RTS Interaction - Blocking



# RTS Interaction - Blocking



# RTS Interaction - Blocking



# RTS Interaction - Blocking



# RTS Interaction - Blocking



# RTS Interaction - Unblocking



# RTS Interaction - Unblocking



# RTS Interaction - Unblocking



# RTS Interaction - Unblocking



# Multicore capable, preemptive, round-robin work-sharing scheduler

```

newtype Sched = Sched (Array Int (TVar[SCont]))

dequeueActivation:: Sched -> SCont -> STM SCont
dequeueActivation (Sched pa) _ = do
  cc <- getCurrentHEC -- get current HEC number
  l <- readTVar $ pa!cc
  case l of
    [] -> retry
    x:tl -> do
      writeTVar (pa!cc) tl
      return x

enqueueActivation:: Sched -> SCont -> STM ()
enqueueActivation (Sched pa) sc = do
  dyn <- getAux sc
  let (hec:Int, _::TVar Int) = fromJust $
    fromDynamic dyn
  l <- readTVar $ pa!hec
  writeTVar (pa!hec) $ l++[sc]

```

```

newScheduler :: IO ()
newScheduler = do
  -- Initialise Auxiliary state
  switch $ `s` -> do
    counter <- newTVar (0:Int)
    setAux s $ toDyn $ (0:Int,counter)
    return s
  -- Allocate scheduler
  nc <-getNumHECs
  sched <- (Sched . listArray (0,nc-1)) <$>
    replicateM n (newTVar [])
  -- Initialise activations
  setDequeueAct s $ dequeueActivation sched
  setEnqueueAct s $ enqueueActivation sched

newHEC :: IO ()
newHEC = do
  -- Initial task
  s <- newSCont $ switch dequeueAct
  -- Run in parallel
  runOnIdleHEC s

```

```

forkIO :: IO () -> IO SCont
forkIO task = do
  numHECs <- getNumHECs
  -- epilogue: Switch to next thread
  newSC <- newSCont (task >> switch dequeueAct)
  -- Create and initialise new Aux state
  switch $ `s` -> do
    dyn <- getAux s
    let (_::Int, t::TVar Int) = fromJust $
      fromDynamic dyn
    nextHEC <- readTVar t
    writeTVar t $ (nextHEC + 1) `mod` numHECs
    setAux newSC $ toDyn (nextHEC, t)
    return s
  -- Add new thread to scheduler
  atomically $ enqueueAct newSC
  return newSC

yield :: IO ()
yield = switch (\s -> enqueueAct s >> dequeueAct s)

```

# Multicore capable, preemptive, round-robin work-sharing scheduler

```

newtype Sched = Sched (Array Int (TVar[SCont]))
dequeueActivation:: Sched -> SCont -> STM SCont
dequeueActivation (Sched pa) _ = do
  cc <- getCurrentHEC -- get current HEC number
  l <- readTVar $ pa!cc
  case l of
    [] -> retry
    x:tl -> do
      writeTVar (pa!cc) tl
      return x
enqueueActivation:: Sched -> SCont -> STM ()
enqueueActivation (Sched pa) sc = do
  dyn <- getAux sc
  let (hec:Int, _::TVar Int) = fromJust $ fromDynamic dyn
  l <- readTVar $ pa!hec
  writeTVar (pa!hec) $ l++[sc]

```

```

newScheduler :: IO ()
newScheduler = do
  -- Initialise Auxiliary state
  switch $ `s` -> do
    counter <- newTVar (0:Int)
    setAux s $ toDyn $ (0:Int,counter)
    return s
  -- Allocate scheduler
  nc <-getNumHECs
  sched <- (Sched . listArray (0,nc-1)) <$>
    replicateM n (newTVar [])
  -- Initialise activations
  setDequeueAct s $ dequeueActivation sched
  setEnqueueAct s $ enqueueActivation sched
  newHEC :: IO ()
  newHEC = do
    -- Initial task
    s <- newSCont $ switch dequeueAct
    -- Run in parallel
    runOnIdleHEC s

```

```

forkIO :: IO () -> IO SCont
forkIO task = do
  numHECs <- getNumHECs
  -- epilogue: Switch to next thread
  newSC <- newSCont (task >> switch dequeueAct)
  -- Create and initialise new Aux state
  switch $ `s` -> do
    dyn <- getAux s
    let (_::Int, t::TVar Int) = fromJust $ fromDynamic dyn
    nextHEC <- readTVar t
    writeTVar t $ (nextHEC + 1) `mod` numHECs
    setAux newSC $ toDyn (nextHEC, t)
    return s
  -- Add new thread to scheduler
  atomically $ enqueueAct newSC
  return newSC
yield :: IO ()
yield = switch (\s -> enqueueAct s >> dequeueAct s)

```



# Multicore capable, preemptive, round-robin work-sharing scheduler

```

newtype Sched = Sched (Array Int (TVar[SCont]))
dequeueActivation:: Sched -> SCont -> STM SCont
dequeueActivation (Sched pa) _ = do
  cc <- getCurrentHEC -- get current HEC number
  l <- readTVar $ pa!cc
  case l of
    [] -> retry
    x:tl -> do
      writeTVar (pa!cc) tl
      return x
enqueueActivation:: Sched -> SCont -> STM ()
enqueueActivation (Sched pa) sc = do
  dyn <- getAux sc
  let (hec:Int, _::TVar Int) = fromJust $ fromDynamic dyn
  l <- readTVar $ pa!hec
  writeTVar (pa!hec) $ l++[sc]

```

```

newScheduler :: IO ()
newScheduler = do
  -- Initialise Auxiliary state
  switch $ `s` -> do
    counter <- newTVar (0:Int)
    setAux s $ toDyn $ (0:Int,counter)
    return s
  -- Allocate scheduler
  nc <-getNumHECs
  sched <- (Sched . listArray (0,nc-1)) <$>
    replicateM n (newTVar [])
  -- Initialise activations
  setDequeueAct s $ dequeueActivation sched
  setEnqueueAct s $ enqueueActivation sched
  newHEC :: IO ()
  newHEC = do
    -- Initial task
    s <- newSCont $ switch dequeueAct
    -- Run in parallel
    runOnIdleHEC s

```

```

forkIO :: IO () -> IO SCont
forkIO task = do
  numHECs <- getNumHECs
  -- epilogue: Switch to next thread
  newSC <- newSCont (task >> switch dequeueAct)
  -- Create and initialise new Aux state
  switch $ `s` -> do
    dyn <- getAux s
    let (_::Int, t::TVar Int) = fromJust $ fromDynamic dyn
    nextHEC <- readTVar t
    writeTVar t $ (nextHEC + 1) `mod` numHECs
    setAux newSC $ toDyn (nextHEC, t)
    return s
  -- Add new thread to scheduler
  atomically $ enqueueAct newSC
  return newSC
yield :: IO ()
yield = switch (\s -> enqueueAct s >> dequeueAct s)

```



# Multicore capable, preemptive, round-robin work-sharing scheduler

```

newtype Sched = Sched (Array Int (TVar[SCont]))
dequeueActivation:: Sched -> SCont -> STM SCont
dequeueActivation (Sched pa) _ = do
  cc <- getCurrentHEC -- get current HEC number
  l <- readTVar $ pa!cc
  case l of
    [] -> retry
    x:tl -> do
      writeTVar (pa!cc) tl
      return x
enqueueActivation:: Sched -> SCont -> STM ()
enqueueActivation (Sched pa) sc = do
  dyn <- getAux sc
  let (hec:Int, _::TVar Int) = fromJust $ fromDynamic dyn
  l <- readTVar $ pa!hec
  writeTVar (pa!hec) $ l++[sc]

```

```

newScheduler :: IO ()
newScheduler = do
  -- Initialise Auxiliary state
  switch $ `s` -> do
    counter <- newTVar (0:Int)
    setAux s $ toDyn $ (0:Int,counter)
    return s
  -- Allocate scheduler
  nc <-getNumHECs
  sched <- (Sched . listArray (0,nc-1)) <$>
    replicateM n (newTVar [])
  -- Initialise activations
  setDequeueAct s $ dequeueActivation sched
  setEnqueueAct s $ enqueueActivation sched
  newHEC :: IO ()
  newHEC = do
    -- Initial task
    s <- newSCont $ switch dequeueAct
    -- Run in parallel
    runOnIdleHEC s

```

```

forkIO :: IO () -> IO SCont
forkIO task = do
  numHECs <- getNumHECs
  -- epilogue: Switch to next thread
  newSC <- newSCont (task >> switch dequeueAct)
  -- Create and initialise new Aux state
  switch $ `s` -> do
    dyn <- getAux s
    let (_::Int, t::TVar Int) = fromJust $ fromDynamic dyn
    nextHEC <- readTVar t
    writeTVar t $ (nextHEC + 1) `mod` numHECs
    setAux newSC $ toDyn (nextHEC, t)
    return s
  -- Add new thread to scheduler
  atomically $ enqueueAct newSC
  return newSC
yield :: IO ()
yield = switch (\s -> enqueueAct s >> dequeueAct s)

```



# Formalization

## Concurrency Substrate

|               |                                                                                           |                  |
|---------------|-------------------------------------------------------------------------------------------|------------------|
|               | $x, y \in Variable$                                                                       | $r, s, \in Name$ |
| $Md$          | $\text{return } M \mid M \gg= N$                                                          |                  |
| $Ex$          | $\text{throw } M \mid \text{catch } M\ N \mid \text{catchSTM } M\ N$                      |                  |
| $Stm$         | $\text{newTVar } M \mid \text{readTVar } r \mid \text{writeTVar } r\ M$                   |                  |
|               | $\mid$                                                                                    |                  |
|               | $\text{atomically } M \mid \text{retry}$                                                  |                  |
| $Sc$          | $\text{newSCont } M \mid \text{switch } M \mid \text{runOnIdleHEC } s$                    |                  |
| $Sls$         | $\text{getAux } s \mid \text{setAux } s\ M$                                               |                  |
| $Act$         | $\text{dequeueAct } s \mid \text{enqueueAct } s$                                          |                  |
|               | $\mid$                                                                                    |                  |
|               | $\text{setDequeueAct } M \mid \text{setEnqueueAct } M$                                    |                  |
| <b>Term</b>   |                                                                                           |                  |
| $M, N$        | $\text{::= } r \mid x \mid \lambda.x \rightarrow M \mid M\ N \mid \dots$                  |                  |
|               | $\mid$                                                                                    |                  |
|               | $Md \mid Ex \mid Stm \mid Sc \mid Sls \mid Act$                                           |                  |
| Program state | $P \text{ ::= } S; \Theta$                                                                |                  |
| HEC soup      | $S \text{ ::= } \emptyset \mid H \parallel S$                                             |                  |
| HEC           | $H \text{ ::= } \langle s, M, D \rangle \mid \langle s, M, D \rangle_{Sleeping}$          |                  |
|               | $\mid \langle s, M, D \rangle_{Outcall} \mid \text{Idle}$                                 |                  |
| Heap          | $\Theta \text{ ::= } r \mapsto M \oplus s \mapsto (M, D)$                                 |                  |
| SLS Store     | $D \text{ ::= } (M, N, r)$                                                                |                  |
| IO Context    | $\mathbb{E} \text{ ::= } \bullet \mid \mathbb{E} \gg= M \mid \text{catch } \mathbb{E}\ M$ |                  |
| STM Context   | $\mathbb{P} \text{ ::= } \bullet \mid \mathbb{P} \gg= M$                                  |                  |

## Upcalls from the RTS

|                    |                                                                                                                                                      |
|--------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|
|                    | $\text{Dequeue upcall instantiation } H; \Theta \xrightarrow{\text{deq}} H'; \Theta'$                                                                |
| (UPDEQUEUE)        |                                                                                                                                                      |
|                    | $s' \text{ fresh } r \text{ fresh } D' = (\text{deq}(D), \text{enq}(D), r)$                                                                          |
|                    | $M' = \text{switch } (\lambda x. \text{deq}(D) s)$                                                                                                   |
|                    | $\Theta' = \Theta[s \mapsto (M, D)][r \mapsto \text{toDyn } ()]$                                                                                     |
|                    | $\underline{(s, M, D); \Theta \xrightarrow{\text{deq}} \langle s', M', D' \rangle; \Theta'}}$                                                        |
|                    | $\text{Enqueue upcall instantiation } H; \Theta \xrightarrow{\text{enq } s} H'; \Theta'$                                                             |
| (UPENQUEUEIDLE)    |                                                                                                                                                      |
|                    | $s' \text{ fresh } r \text{ fresh } D' = (\text{deq}(D), \text{enq}(D), r)$                                                                          |
|                    | $M' = \text{atomically } (\text{enq}(D) s)$                                                                                                          |
|                    | $\Theta' = \Theta[s \mapsto (M, D)][r \mapsto \text{toDyn } ()]$                                                                                     |
|                    | $\underline{\text{Idle}; \Theta[s \mapsto (M, D)] \xrightarrow{\text{enq } s} \langle s', M', D' \rangle; \Theta'}$                                  |
| (UPENQUEUERUNNING) |                                                                                                                                                      |
|                    | $M'' = \text{atomically } (\text{enq}(D) s) \gg M'$                                                                                                  |
|                    | $\underline{\langle s', M', D' \rangle; \Theta[s \mapsto (M, D)] \xrightarrow{\text{enq } s} \langle s', M'', D' \rangle; \Theta[s \mapsto (M, D)]}$ |

# Formalization

## Concurrency Substrate

|               |                                                                                           |                  |
|---------------|-------------------------------------------------------------------------------------------|------------------|
|               | $x, y \in Variable$                                                                       | $r, s, \in Name$ |
| $Md$          | $\text{return } M \mid M \gg= N$                                                          |                  |
| $Ex$          | $\text{throw } M \mid \text{catch } M\ N \mid \text{catchSTM } M\ N$                      |                  |
| $Stm$         | $\text{newTVar } M \mid \text{readTVar } r \mid \text{writeTVar } r\ M$                   |                  |
|               | $\mid$                                                                                    |                  |
|               | $\text{atomically } M \mid \text{retry}$                                                  |                  |
| $Sc$          | $\text{newSCont } M \mid \text{switch } M \mid \text{runOnIdleHEC } s$                    |                  |
| $Sls$         | $\text{getAux } s \mid \text{setAux } s\ M$                                               |                  |
| $Act$         | $\text{dequeueAct } s \mid \text{enqueueAct } s$                                          |                  |
|               | $\mid$                                                                                    |                  |
|               | $\text{setDequeueAct } M \mid \text{setEnqueueAct } M$                                    |                  |
| <b>Term</b>   |                                                                                           |                  |
| $M, N$        | $\text{ ::= } r \mid x \mid \lambda.x \rightarrow M \mid M\ N \mid \dots$                 |                  |
|               | $\mid$                                                                                    |                  |
|               | $Md \mid Ex \mid Stm \mid Sc \mid Sls \mid Act$                                           |                  |
| Program state | $P \text{ ::= } S; \Theta$                                                                |                  |
| HEC soup      | $S \text{ ::= } \emptyset \mid H \parallel S$                                             |                  |
| HEC           | $H \text{ ::= } \langle s, M, D \rangle \mid \langle s, M, D \rangle_{\text{Sleeping}}$   |                  |
|               | $\mid \langle s, M, D \rangle_{\text{Outcall}} \mid \text{Idle}$                          |                  |
| Heap          | $\Theta \text{ ::= } r \mapsto M \oplus s \mapsto (M, D)$                                 |                  |
| SLS Store     | $D \text{ ::= } (M, N, r)$                                                                |                  |
| IO Context    | $\mathbb{E} \text{ ::= } \bullet \mid \mathbb{E} \gg= M \mid \text{catch } \mathbb{E}\ M$ |                  |
| STM Context   | $\mathbb{P} \text{ ::= } \bullet \mid \mathbb{P} \gg= M$                                  |                  |

## Upcalls from the RTS

|                                     |                              |                                                                                                                                                      |
|-------------------------------------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|
|                                     | Dequeue upcall instantiation | $H; \Theta \xrightarrow{\text{deq}} H'; \Theta'$                                                                                                     |
| <b>(UPDEQUEUE)</b>                  |                              |                                                                                                                                                      |
| $s' \text{ fresh }$                 | $r \text{ fresh }$           | $D' = (\text{deq}(D), \text{enq}(D), r)$                                                                                                             |
|                                     |                              | $M' = \text{switch } (\lambda x. \text{deq}(D))\ s$                                                                                                  |
|                                     |                              | $\Theta' = \Theta[s \mapsto (M, D)][r \mapsto \text{toDyn } ()]$                                                                                     |
|                                     |                              | $\underline{(s, M, D); \Theta \xrightarrow{\text{deq}} \langle s', M', D' \rangle; \Theta'}}$                                                        |
| <b>Enqueue upcall instantiation</b> |                              |                                                                                                                                                      |
|                                     |                              | $H; \Theta \xrightarrow{\text{enq } s} H'; \Theta'$                                                                                                  |
| <b>(UPENQUEUEIDLE)</b>              |                              |                                                                                                                                                      |
| $s' \text{ fresh }$                 | $r \text{ fresh }$           | $D' = (\text{deq}(D), \text{enq}(D), r)$                                                                                                             |
|                                     |                              | $M' = \text{atomically } (\text{enq}(D))\ s$                                                                                                         |
|                                     |                              | $\Theta' = \Theta[s \mapsto (M, D)][r \mapsto \text{toDyn } ()]$                                                                                     |
|                                     |                              | $\underline{\text{Idle}; \Theta[s \mapsto (M, D)] \xrightarrow{\text{enq } s} \langle s', M', D' \rangle; \Theta'}$                                  |
| <b>(UPENQUEUERUNNING)</b>           |                              |                                                                                                                                                      |
|                                     |                              | $M'' = \text{atomically } (\text{enq}(D))\ s \gg M'$                                                                                                 |
|                                     |                              | $\underline{\langle s', M', D' \rangle; \Theta[s \mapsto (M, D)] \xrightarrow{\text{enq } s} \langle s', M'', D' \rangle; \Theta[s \mapsto (M, D)]}$ |



# Formalization

## Concurrency Substrate

$x, y \in Variable \quad r, s, \in Name$

$Md ::= \text{return } M \mid M \gg= N$   
 $Ex ::= \text{throw } M \mid \text{catch } M \ N \mid \text{catchSTM } M \ N$   
 $Stm ::= \text{newTVar } M \mid \text{readTVar } r \mid \text{writeTVar } r \ M$   
 $\mid \text{atomically } M \mid \text{retry}$   
 $Sc ::= \text{newSCont } M \mid \text{switch } M \mid \text{runOnIdleHEC } s$   
 $Sls ::= \text{getAux } s \mid \text{setAux } s \ M$   
 $Act ::= \text{dequeueAct } s \mid \text{enqueueAct } s$   
 $\mid \text{setDequeueAct } M \mid \text{setEnqueueAct } M$

**Term**

$M, N ::= r \mid x \mid \lambda.x \rightarrow M \mid M \ N \mid \dots$   
 $\mid Md \mid Ex \mid Stm \mid Sc \mid Sls \mid Act$

Program state

|             |                                                                                                                                                    |
|-------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
| $P$         | $::= S; \Theta$                                                                                                                                    |
| HEC soup    | $S ::= \emptyset \mid H \parallel S$                                                                                                               |
| HEC         | $H ::= \langle s, M, D \rangle \mid \langle s, M, D \rangle_{\text{Sleeping}}$<br>$\mid \langle s, M, D \rangle_{\text{Outcall}} \mid \text{Idle}$ |
| Heap        | $\Theta ::= r \mapsto M \oplus s \mapsto (M, D)$                                                                                                   |
| SLS Store   | $D ::= (M, N, r)$                                                                                                                                  |
| IO Context  | $\mathbb{E} ::= \bullet \mid \mathbb{E} \gg= M \mid \text{catch } \mathbb{E} \ M$                                                                  |
| STM Context | $\mathbb{P} ::= \bullet \mid \mathbb{P} \gg= M$                                                                                                    |

## Upcalls from the RTS

**Dequeue upcall instantiation**  $H; \Theta \xrightarrow{\text{deq}} H'; \Theta'$

(UPDEQUEUE)

$$\frac{s' \text{ fresh } r \text{ fresh } D' = (\text{deq}(D), \text{enq}(D), r) \quad M' = \text{switch } (\lambda x. \text{deq}(D) \ s) \quad \Theta' = \Theta[s \mapsto (M, D)][r \mapsto \text{toDyn } ()]}{\langle s, M, D \rangle; \Theta \xrightarrow{\text{deq}} \langle s', M', D' \rangle; \Theta'}$$

**Enqueue upcall instantiation**  $H; \Theta \xrightarrow{\text{enq } s} H'; \Theta'$

(UPENQUEUEIDLE)

$$\frac{s' \text{ fresh } r \text{ fresh } D' = (\text{deq}(D), \text{enq}(D), r) \quad M' = \text{atomically } (\text{enq}(D) \ s) \quad \Theta' = \Theta[s \mapsto (M, D)][r \mapsto \text{toDyn } ()] \quad \text{Idle}; \Theta[s \mapsto (M, D)] \xrightarrow{\text{enq } s} \langle s', M', D' \rangle; \Theta'}{\text{Idle}; \Theta[s \mapsto (M, D)] \xrightarrow{\text{enq } s} \langle s', M', D' \rangle; \Theta'}$$

(UPENQUEUERUNNING)

$$\frac{M'' = \text{atomically } (\text{enq}(D) \ s) \gg M'}{\langle s', M', D' \rangle; \Theta[s \mapsto (M, D)] \xrightarrow{\text{enq } s} \langle s', M'', D' \rangle; \Theta[s \mapsto (M, D)]}$$

IO

$E[M] \xrightarrow{\text{tick}} E[\text{yield} \gg M]$

atomically  
STM

$E[\text{atomically}(P[M])] \xrightarrow{\text{tick}} \text{Suspend transaction and switch}$

# Formalization

## Concurrency Substrate

|               |                                                                                           |                  |
|---------------|-------------------------------------------------------------------------------------------|------------------|
|               | $x, y \in Variable$                                                                       | $r, s, \in Name$ |
| $Md$          | $\text{return } M \mid M \gg= N$                                                          |                  |
| $Ex$          | $\text{throw } M \mid \text{catch } M\ N \mid \text{catchSTM } M\ N$                      |                  |
| $Stm$         | $\text{newTVar } M \mid \text{readTVar } r \mid \text{writeTVar } r\ M$                   |                  |
|               | $\mid$                                                                                    |                  |
|               | $\text{atomically } M \mid \text{retry}$                                                  |                  |
| $Sc$          | $\text{newSCont } M \mid \text{switch } M \mid \text{runOnIdleHEC } s$                    |                  |
| $Sls$         | $\text{getAux } s \mid \text{setAux } s\ M$                                               |                  |
| $Act$         | $\text{dequeueAct } s \mid \text{enqueueAct } s$                                          |                  |
|               | $\mid$                                                                                    |                  |
|               | $\text{setDequeueAct } M \mid \text{setEnqueueAct } M$                                    |                  |
| <b>Term</b>   |                                                                                           |                  |
| $M, N$        | $\text{ ::= } r \mid x \mid \lambda.x \rightarrow M \mid M\ N \mid \dots$                 |                  |
|               | $\mid$                                                                                    |                  |
|               | $Md \mid Ex \mid Stm \mid Sc \mid Sls \mid Act$                                           |                  |
| Program state | $P \text{ ::= } S; \Theta$                                                                |                  |
| HEC soup      | $S \text{ ::= } \emptyset \mid H \parallel S$                                             |                  |
| HEC           | $H \text{ ::= } \langle s, M, D \rangle \mid \langle s, M, D \rangle_{Sleeping}$          |                  |
|               | $\mid \langle s, M, D \rangle_{Outcall} \mid \text{Idle}$                                 |                  |
| Heap          | $\Theta \text{ ::= } r \mapsto M \oplus s \mapsto (M, D)$                                 |                  |
| SLS Store     | $D \text{ ::= } (M, N, r)$                                                                |                  |
| IO Context    | $\mathbb{E} \text{ ::= } \bullet \mid \mathbb{E} \gg= M \mid \text{catch } \mathbb{E}\ M$ |                  |
| STM Context   | $\mathbb{P} \text{ ::= } \bullet \mid \mathbb{P} \gg= M$                                  |                  |

## Upcalls from the RTS

|                                     |                                                                                                                                                      |                                                  |
|-------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------|
|                                     | Dequeue upcall instantiation                                                                                                                         | $H; \Theta \xrightarrow{\text{deq}} H'; \Theta'$ |
| <b>(UPDEQUEUE)</b>                  |                                                                                                                                                      |                                                  |
|                                     | $s' \text{ fresh } r \text{ fresh } D' = (\text{deq}(D), \text{enq}(D), r)$                                                                          |                                                  |
|                                     | $M' = \text{switch } (\lambda x. \text{deq}(D) s)$                                                                                                   |                                                  |
|                                     | $\Theta' = \Theta[s \mapsto (M, D)][r \mapsto \text{toDyn } ()]$                                                                                     |                                                  |
|                                     | $\underline{(s, M, D); \Theta \xrightarrow{\text{deq}} \langle s', M', D' \rangle; \Theta'}}$                                                        |                                                  |
| <b>Enqueue upcall instantiation</b> |                                                                                                                                                      |                                                  |
|                                     | $H; \Theta \xrightarrow{\text{enq } s} H'; \Theta'$                                                                                                  |                                                  |
| <b>(UPENQUEUEIDLE)</b>              |                                                                                                                                                      |                                                  |
|                                     | $s' \text{ fresh } r \text{ fresh } D' = (\text{deq}(D), \text{enq}(D), r)$                                                                          |                                                  |
|                                     | $M' = \text{atomically } (\text{enq}(D) s)$                                                                                                          |                                                  |
|                                     | $\Theta' = \Theta[s \mapsto (M, D)][r \mapsto \text{toDyn } ()]$                                                                                     |                                                  |
|                                     | $\text{Idle}; \Theta[s \mapsto (M, D)] \xrightarrow{\text{enq } s} \langle s', M', D' \rangle; \Theta'$                                              |                                                  |
| <b>(UPENQUEUERUNNING)</b>           |                                                                                                                                                      |                                                  |
|                                     | $M'' = \text{atomically } (\text{enq}(D) s) \gg M'$                                                                                                  |                                                  |
|                                     | $\underline{\langle s', M', D' \rangle; \Theta[s \mapsto (M, D)] \xrightarrow{\text{enq } s} \langle s', M'', D' \rangle; \Theta[s \mapsto (M, D)]}$ |                                                  |

IO

$E[M]$

tick

$E[\text{yield} \gg M]$

atomically  
STM

$E[\text{atomically}(P[M])]$

tick

Suspend transaction  
and switch

Switch  
STM

$E[\text{switch}(P[M])]$

tick

# Formalization

## Concurrency Substrate

|               |                                                                                  |                  |
|---------------|----------------------------------------------------------------------------------|------------------|
|               | $x, y \in Variable$                                                              | $r, s, \in Name$ |
| $Md$          | $\text{return } M \mid M \ggg N$                                                 |                  |
| $Ex$          | $\text{throw } M \mid \text{catch } M\ N \mid \text{catchSTM } M\ N$             |                  |
| $Stm$         | $\text{newTVar } M \mid \text{readTVar } r \mid \text{writeTVar } r\ M$          |                  |
|               | $\mid$                                                                           |                  |
|               | $\text{atomically } M \mid \text{retry}$                                         |                  |
| $Sc$          | $\text{newSCont } M \mid \text{switch } M \mid \text{runOnIdleHEC } s$           |                  |
| $Sls$         | $\text{getAux } s \mid \text{setAux } s\ M$                                      |                  |
| $Act$         | $\text{dequeueAct } s \mid \text{enqueueAct } s$                                 |                  |
|               | $\mid$                                                                           |                  |
|               | $\text{setDequeueAct } M \mid \text{setEnqueueAct } M$                           |                  |
| <b>Term</b>   |                                                                                  |                  |
| $M, N$        | $r \mid x \mid \lambda.x \rightarrow M \mid M\ N \mid \dots$                     |                  |
|               | $\mid$                                                                           |                  |
|               | $Md \mid Ex \mid Stm \mid Sc \mid Sls \mid Act$                                  |                  |
| Program state | $P ::= S; \Theta$                                                                |                  |
| HEC soup      | $S ::= \emptyset \mid H \parallel S$                                             |                  |
| HEC           | $H ::= \langle s, M, D \rangle \mid \langle s, M, D \rangle_{Sleeping}$          |                  |
|               | $\mid \langle s, M, D \rangle_{Outcall} \mid \text{Idle}$                        |                  |
| Heap          | $\Theta ::= r \mapsto M \oplus s \mapsto (M, D)$                                 |                  |
| SLS Store     | $D ::= (M, N, r)$                                                                |                  |
| IO Context    | $\mathbb{E} ::= \bullet \mid \mathbb{E} \ggg M \mid \text{catch } \mathbb{E}\ M$ |                  |
| STM Context   | $\mathbb{P} ::= \bullet \mid \mathbb{P} \ggg M$                                  |                  |

## Upcalls from the RTS

|                                                                                                                                          |                                                  |
|------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------|
| Dequeue upcall instantiation                                                                                                             | $H; \Theta \xrightarrow{\text{deq}} H'; \Theta'$ |
| (UPDEQUEUE)                                                                                                                              |                                                  |
| $s' \text{ fresh } r \text{ fresh } D' = (\text{deq}(D), \text{enq}(D), r)$                                                              |                                                  |
| $M' = \text{switch } (\lambda x. \text{deq}(D) s)$                                                                                       |                                                  |
| $\Theta' = \Theta[s \mapsto (M, D)][r \mapsto \text{toDyn } ()]$                                                                         |                                                  |
| $\langle s, M, D \rangle; \Theta \xrightarrow{\text{deq}} \langle s', M', D' \rangle; \Theta'$                                           |                                                  |
| Enqueue upcall instantiation                                                                                                             |                                                  |
| $H; \Theta \xrightarrow{\text{enq } s} H'; \Theta'$                                                                                      |                                                  |
| (UPENQUEUEIDLE)                                                                                                                          |                                                  |
| $s' \text{ fresh } r \text{ fresh } D' = (\text{deq}(D), \text{enq}(D), r)$                                                              |                                                  |
| $M' = \text{atomically } (\text{enq}(D) s)$                                                                                              |                                                  |
| $\Theta' = \Theta[s \mapsto (M, D)][r \mapsto \text{toDyn } ()]$                                                                         |                                                  |
| $\text{Idle}; \Theta[s \mapsto (M, D)] \xrightarrow{\text{enq } s} \langle s', M', D' \rangle; \Theta'$                                  |                                                  |
| (UPENQUEUERUNNING)                                                                                                                       |                                                  |
| $M'' = \text{atomically } (\text{enq}(D) s) \gg M'$                                                                                      |                                                  |
| $\langle s', M', D' \rangle; \Theta[s \mapsto (M, D)] \xrightarrow{\text{enq } s} \langle s', M'', D' \rangle; \Theta[s \mapsto (M, D)]$ |                                                  |

IO

$E[M] \xrightarrow{\text{tick}} E[\text{yield} \gg M]$

atomically  
STM

$E[\text{atomically}(P[M])] \xrightarrow{\text{tick}}$  Suspend transaction  
and switch

Switch  
STM

$E[\text{switch}(P[M])] \xrightarrow{\text{tick}}$  Disable Interrupts!

# Future directions



# Future directions



# Future directions



# Future directions



- Verifying the functional correctness of schedulers
  - Correct Scheduler  $\Rightarrow$  Each thread runs as if given its **own processor** and **register set**
  - Scheduler access under STM  $\Rightarrow$  Treat scheduler as **sequential process**

# Future directions



- Verifying the functional correctness of schedulers
  - Correct Scheduler  $\Rightarrow$  Each thread runs as if given its **own processor** and **register set**
  - Scheduler access under STM  $\Rightarrow$  Treat scheduler as **sequential process**
- FP abstractions for eventually consistent systems
  - Operations on ECDTs described as pure functions over axiomatic executions
  - A relational specification language for specifying consistency assertions over axiomatic executions

# Conclusion



- Functional programming abstractions simplify concurrent programming
  - Rx-CML: Synchronous communication over geo-distributed systems
  - Concurrency Substrate: Scheduler activation + STM for writing schedulers
- Abstractions introduce indirection resulting in overheads
  - Rx-CML slower than explicit async under contention
  - Conc. Subs: Scheduler allocations increase GC overheads