When deciding which language to use to solve challenges that require heavy concurrent algorithms, it’s hard to not consider Haskell. Its immutable and persistent data structures reduce the introduction of accidental complexity, and the GHC runtime facilitates the creation of thousands of (green) threads without having to worry as much about the memory and performance costs.
The epitome of Haskell’s concurrent API is the async package, which provides higher-order functions (e.g. race, mapConcurrently, etc.) that allow us to run IO sub-routines and combine their results in various ways while executing concurrently. It also offers the type Concurrently which allows developers to give normal sub-routines concurrent properties, and also provides Applicative and Alternative instances that help in the creation of values from composing smaller sub-routines.
In this blog post, we will discuss some of the drawbacks of using the Concurrently type when composing sub-routines. Then we will show how we can overcome these shortcomings by taking advantage of the structural nature of the Applicative and Alternative typeclasses; re-shaping and optimizing the execution of a tree of sub-routines.
And, if you simply want to get these performance advantages in your Haskell code today, you can cut to the chase and begin using the new Conc datatype we’ve introduced in unliftio 0.2.9.0.
ConcurrentlyGetting started with Concurrently is easy. We can wrap an IO a sub-routine with the Concurrently constructor, and then we can compose async values using the map (<$>), apply (<*>), and alternative (<|>) operators. An example might be:
myPureFunction :: String -> String -> String -> String
myPureFunction a b c = a ++ " " ++ b ++ " " ++ c
myComputation :: Concurrently String
myComputation =
myPureFunction
<$> Concurrently fetchStringFromAPI1
<*> ( Concurrently fetchStringFromAPI2_Region1
<|> Concurrently fetchStringFromAPI2_Region2
<|> Concurrently fetchStringFromAPI2_Region3
<|> Concurrently fetchStringFromAPI2_Region4)
<*> Concurrently fetchStringFromAPI3
Let’s talk a bit on the drawbacks of this approach. How many threads do you think we need to make sure all these calls execute concurrently? Try to come up with a number and an explanation and then continue reading.
I am guessing you are expecting this code to spawn six (6) threads, correct? One for each IO sub-routine that we are using. However, with the existing implementation of Applicative and Alternative in Concurrently, we will spawn at least ten (10) threads. Let’s explore these instances to have a better understanding of what is going on:
instance Applicative Concurrently where
pure = Concurrently . return
Concurrently fs <*> Concurrently as =
Concurrently $ ((f, a) -> f a) <$> concurrently fs as
instance Alternative Concurrently where
Concurrently as <|> Concurrently bs =
Concurrently $ either id id <$> race as bs
First, let us expand the alternative calls in our example:
Concurrently fetchStringFromAPI2_Region1
<|> Concurrently fetchStringFromAPI2_Region2
<|> Concurrently fetchStringFromAPI2_Region3
<|> Concurrently fetchStringFromAPI2_Region4
--- is equivalent to
Concurrently (
either id id <$>
race {- 2 threads -}
fetchStringFromAPI2_Region1
(either id id <$>
race {- 2 threads -}
fetchStringFromAPI2_Region2
(either id id <$>
race {- 2 threads -}
fetchStringFromAPI2_Region3
fetchStringFromAPI2_Region4))
)
Next, let us expand the applicative calls:
Concurrently (myPureFunction <$> fetchStringFromAPI1)
<*> Concurrently fetchStringFromAPI2
<*> Concurrently fetchStringFromAPI3
--- is equivalent to
Concurrently (
((f, a) -> f a) <$>
concurrently {- 2 threads -}
( ((f, a) -> f a) <$>
concurrently {- 2 threads -}
(myPureFunction <$> fetchStringFromAPI1)
fetchStringFromAPI2
)
fetchStringFromAPI3
)
You may tell we are always spawning two threads for each pair of sub-routines. Suppose we have 7 sub-routines we want to compose via Applicative or Alternative. Using this implementation we would spawn at least 14 new threads when at most 8 should do the job. For each composition we do, an extra thread is going to be spawned to deal with bookkeeping.
Another drawback to consider: what happens if one of the values in the call is a pure call? Given this code:
pure foo <|> bar
We get to spawn a new thread (unnecessarily) to wait for foo, even though it has already been computed and it should always win. As we mentioned before, Haskell is an excellent choice for concurrency because it makes spawning threads cheap; however, these threads don’t come for free, and we should strive to avoid redundant thread creation.
Conc typeTo address the issues mentioned above, we implemented a new type called Conc in our unliftio package. It has the same purpose as Concurrently, but it offers some extra guarantees:
Applicative and Alternative compositions.pure calls in an Applicative or an Alternative composition, we will not spawn a new thread.Conc value (instead of a composition of Conc values).IO sub-routines. Any monadic type that implements MonadUnliftIO is accepted.The Conc type is defined as follows:
data Conc m a where
Action :: m a -> Conc m a
Apply :: Conc m (v -> a) -> Conc m v -> Conc m a
LiftA2 :: (x -> y -> a) -> Conc m x -> Conc m y -> Conc m a
Pure :: a -> Conc m a
Alt :: Conc m a -> Conc m a -> Conc m a
Empty :: Conc m a
instance MonadUnliftIO m => Applicative (Conc m) where
pure = Pure
(<*>) = Apply
(*>) = Then
liftA2 = LiftA2
instance MonadUnliftIO m => Alternative (Conc m) where
(<|>) = Alt
If you are familiar with Free types, this will look eerily familiar. We are going to represent our concurrent computations as data so that we can later transform it or evaluate as we see fit. In this setting, our first example would look something like the following:
myComputation :: Conc String
myComputation =
myPureFunction
<$> conc fetchStringFromAPI1
<*> ( conc fetchStringFromAPI2_Region1
<|> conc fetchStringFromAPI2_Region2
<|> conc fetchStringFromAPI2_Region3
<|> conc fetchStringFromAPI2_Region4)
--- is equivalent to
Apply (myPureFunction <$> fetchStringFromAPI1)
(Alt (Action fetchStringFromAPI2_Region1)
(Alt (Action fetchStringFromAPI2_Region2)
(Alt (Action fetchStringFromAPI2_Region3)
(Action fetchStringFromAPI2_Region4))))
You may notice we keep the tree structure of the Concurrently implementation. However, given we are dealing with a pure data structure, we can modify our Conc value to something that is easier to evaluate. Indeed, thanks to the Applicative interface, we don’t need to evaluate any of the IO sub-routines to do transformations (magic!).
We have additional (internal) types that flatten all our alternatives and applicative values:
data Flat a
= FlatApp !(FlatApp a)
| FlatAlt !(FlatApp a) !(FlatApp a) ![FlatApp a]
data FlatApp a where
FlatPure :: a -> FlatApp a
FlatAction :: IO a -> FlatApp a
FlatApply :: Flat (v -> a) -> Flat v -> FlatApp a
FlatLiftA2 :: (x -> y -> a) -> Flat x -> Flat y -> FlatApp a
These types are equivalent to our Conc type, but they have a few differences from Conc:
Flat type separates Conc values created via Applicative from the ones created via AlternativeFlatAlt constructor flattens an Alternative tree into a list (helping us spawn all of them at once and facilitating the usage of a single bookkeeping thread).
semigroups package.Flat and FlatApp records are not polymorphic on their monadic context given they rely directly on IO. We can transform the m parameter in our Conc m a type to IO via the MonadUnliftIO constraint.The first example of our blog post, when flattened, would look something like the following:
FlatApp
(FlatApply
(FlatApp (FlatAction (myPureFunction <$> fetchStringFromAPI1)))
(FlatAlt (FlatAction fetchStringFromAPI2_Region1)
(FlatAction fetchStringFromAPI2_Region2)
[ FlatAction fetchStringFromAPI2_Region3
, FlatAction fetchStringFromAPI2_Regoin4 ]))
Using a flatten function that transforms a Conc value into a Flat value, we can later evaluate the concurrent sub-routine tree in a way that is optimal for our use case.
So given that the Conc API reduces the number of threads created via Alternative, our implementation should work best, correct? Sadly, it is not all peachy. To ensure that we get the result of the first thread that finishes on an Alternative composition, we make use of the STM API. This approach works great when we want to gather values from multiple concurrent threads. Sadly, the STM monad doesn’t scale too well when composing lots of reads, making this approach prohibitive if you are composing tens of thousands of Conc values.
Considering this limitation, we only use STM when an Alternative function is involved; otherwise, we rely on MVars for multiple thread result composition via Applicative. We can do this without sweating because we can change the evaluator of the sub-routine tree created by Conc on the fly.
We showcased how we can model the composition of computations using an Applicative and Alternative tree, and then, taking advantage of this APIs; we transformed this computation tree into something more approachable to execute concurrently. We also took advantage of this sub-routines as data approach to change the evaluator from MVar to STM compositions.
Subscribe to our blog via email
Email subscriptions come from our Atom feed and are handled by Blogtrottr. You will only receive notifications of blog posts, and can unsubscribe any time.