Back in March, I mentioned that we’d be using conduit for high performance analyses. We’ve been busy working on various aspects of this behind the scenes. This is the first publicly available follow-up since then.
One issue with financial analyses is bridging the gap between in-memory representations and streaming data. The former allows for higher performance for many forms of analysis, while the latter allows us to deal with far larger data sets and to generate output more quickly for certain analyses.
We’ll be using the vector package almost exclusively for efficient in-memory representations in IAP, and conduit for streaming data. The question is: what happens when we need to transition from one to the other? There are already functions like yieldMany to yield values from a Vector as a Conduit, and conduitVector or sinkVector to consume values from a Conduit into a packed representation.
While these solutions are always sufficient, they aren’t always optimal. When we’re going for high speed analyses, we don’t want to waste time boxing unpacked values or allocating extra constructors unnecessarily. This blog post introduces a new tool we’re adding to the IAP toolchain (and the conduit toolchain in general): vectorBuilder.
Oftentimes when working with Haskell in a high performance context, the overhead introduced by a linked list representation can be too high. Having an extra constructor around each value, a constructor for each cons cell, and the indirection introduced by having to follow pointers, can completely kill performance. The most common examples of this are the large speedups you can often achieve by replacing String with Text (or sometimes ByteString), or by using Vectors, especially unboxed or storable Vectors.
conduit has a similar representation of a stream as a list, including the constructor overheads just mentioned. It’s not surprising, therefore, that in situations where a list would be a poor representation, conduits will often suffer similar performance problems. As with lists, some of this overhead is mitigated by shortcut fusion (a.k.a. rewrite rules). But this isn’t always the case.
conduit-combinators provides a helper function which allows us to take back performance, by working with a packed representation instead of creating a bunch of cons cells. It does this by using the vector package’s generic mutable interface under the surface, while at a user-facing level providing a simple yield-like function, avoiding the need to muck around with mutable buffers.
This article will cover how to use this function, some implementation details, and comparisons to other approaches.
NOTE: At the time of writing, the version of conduit-combinators provided on School of Haskell does not contain the vectorBuilder function, and therefore the active code below will not run.
Let’s start with a simple goal: we have chunks of bytes coming in, and we want to (1) duplicate each successive byte so that, e.g., [1, 2, 3] becomes [1, 1, 2, 2, 3, 3] and (2) rechunk the values into vectors of size 512. The original data could be chunked in any way, so we cannot rely on any specific incoming chunk size (in this case, a known chunk size of 256 would be convenient).
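To pin down what that goal means before worrying about performance, here is a minimal pure sketch using plain lists and only the base library. The names duplicateEach, chunksOfN, and rechunkSpec are made up for this illustration and are not part of conduit-combinators. It is deliberately slow; it only serves as a reference for what any of the faster versions should produce.

```haskell
import Data.Word (Word8)

-- | Duplicate every element: [1,2,3] becomes [1,1,2,2,3,3].
duplicateEach :: [a] -> [a]
duplicateEach = concatMap (\x -> [x, x])

-- | Split a list into pieces of exactly n elements.
-- Only the final piece may be shorter than n.
chunksOfN :: Int -> [a] -> [[a]]
chunksOfN n xs
  | n <= 0    = error "chunksOfN: chunk size must be positive"
  | null xs   = []
  | otherwise = let (piece, rest) = splitAt n xs
                in piece : chunksOfN n rest

-- | Reference behaviour (hypothetical helper): forget the incoming
-- chunk boundaries, double each byte, then rechunk to size n.
rechunkSpec :: Int -> [[Word8]] -> [[Word8]]
rechunkSpec n = chunksOfN n . duplicateEach . concat
```

With a chunk size of 4 instead of 512, rechunkSpec 4 [[1,2],[3],[4,5,6]] gives [[1,1,2,2],[3,3,4,4],[5,5,6,6]]: the output boundaries do not depend on how the input happened to be chunked.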
Likely the easiest approach is to convert our stream of chunked values (e.g., ByteString or Vector Word8) into a stream of elements (e.g., Word8), duplicate the individual values, then chunk those back up. Such a solution would look like:
rechunk1 = concatC =$= concatMapC (\x -> [x, x]) =$= conduitVector 512
This uses the concatC combinator to “flatten out” the input stream, concatMapC to duplicate each individual Word8, and then conduitVector to create a stream of 512-sized Vectors. In my simple benchmark, this function took 13.06ms.
But as we can probably guess, this falls into the problem zone described in our introduction. So instead of dealing with things on the individual byte level, let’s try to use some higher-level functions operating on Vectors of values instead. Our new approach will be to first mapC over the stream and use vector’s concatMap to double each value, and then use takeCE and foldC to extract successive chunks of size 512. In code:
rechunk2 =
    mapC (concatMap $ replicate 2) =$= loop
  where
    loop = do
        x <- takeCE 512 =$= foldC
        unless (null x) $ yield x >> loop
In the same benchmark, this performed at 8.83ms, a 32% speedup. While respectable, we can do better.
Our first approach is optimal in one way: it avoids needless buffer copying. Each Word8 is copied precisely once into an output Vector by conduitVector. Unfortunately, this advantage is killed by the overhead of boxing the Word8s and allocating constructors for conduit.
Our second approach avoids the boxing and constructors by always operating on Vectors, but we end up copying buffers multiple times: from the original Vector to the doubled Vector, and then when folding together multiple Vectors into a single Vector of size 512.
What we want to do is to be able to yield a Word8 and have it fill up an output buffer, and once that buffer is filled, yield that buffer downstream and start working on a new one. We could do that by directly dealing with mutable Vectors, but that’s error-prone and tedious. Instead, let’s introduce our new combinator function: vectorBuilder (or its unqualified name, vectorBuilderC).
The idea is simple. vectorBuilder will allocate an output buffer for you. It provides you with a special yield-like function that fills up this buffer, and when it’s full, yields the entire buffer downstream for you.
To use it, we’re going to use one other combinator function: mapM_CE, which performs an action for every value in a chunked input stream (in our case, for each Word8 in our input Vector Word8s). Altogether, this looks like:
rechunk3 = vectorBuilderC 512 $ \yield' ->
    mapM_CE (\x -> yield' x >> yield' x)
We call yield' twice to double our bytes. vectorBuilder ensures that each output buffer is of size 512. mapM_CE efficiently traverses the incoming Vectors without creating intermediate data structures.
This version benchmarks at 401.12us. That is roughly 95% less time than our previous attempt, or about 22 times faster!
There’s something tricky about the yield' function above. Notice how it’s not being used in the Conduit monad transformer, but is instead living in the base monad (e.g., IO). This is not accidental. Not only does this allow us to use existing monadic combinators like mapM_CE, it also allows for far more efficient code. To demonstrate, let’s look at two different ways of doing the same thing:
bgroup "transformers" $
    let src = return () in
    [ bench "single" $ nfIO $ do
        ref <- newIORef 0
        let incr = modifyIORef ref succ
        src $$ liftIO (replicateM_ 1000 incr)
    , bench "multi" $ nfIO $ do
        ref <- newIORef 0
        let incr = liftIO $ modifyIORef ref succ
        src $$ replicateM_ 1000 incr
    ]
Both of these benchmarks use no conduit features. They both create an IORef, then increment it 1000 times. The difference is that the first calls liftIO once, while the second calls liftIO 1000 times. Let’s see the difference in benchmark results:
benchmarking transformers/single
mean: 4.292891 us, lb 4.285319 us, ub 4.303626 us, ci 0.950
std dev: 45.83832 ns, lb 35.04324 ns, ub 59.43617 ns, ci 0.950
benchmarking transformers/multi
mean: 93.10228 us, lb 92.95708 us, ub 93.30159 us, ci 0.950
std dev: 869.6636 ns, lb 673.8342 ns, ub 1.090044 us, ci 0.950
Avoiding extra liftIO calls has a profound performance impact. The reason for this is somewhat similar to what we’ve been discussing up until now about extra cons cells. In our case, it’s extra PipeM constructors used by conduit’s MonadIO instance. I don’t want to dwell on those details too much right now, as that’s a whole separate topic of analysis, involving looking at GHC core output. But let’s take it as a given for now.
The question is: how does vectorBuilder allow you to live in the base monad, but still yield values downstream, which requires access to the Conduit transformer? There’s a trick here using mutable variables. The implementation essentially works like this:
1. Allocate a mutable vector to serve as the output buffer, and a mutable variable holding a list of frozen vectors.
2. Run the user’s Conduit function, providing it with a specialized yield function.
3. This yield function, which lives in the base monad, will write values into the mutable vector. Once that mutable vector is filled, the vector is frozen and added to the end of the mutable variable’s list, and a new mutable vector is allocated.
4. When the user’s function awaits for values from upstream, we jump into action. Since we’re already forced to be in the Conduit transformer at that point, this is our chance to yield. We grab all of the frozen vectors from the mutable variable and yield them downstream. Once that’s done, we await for new data from upstream, and provide it to the user’s function.
The upsides of this approach are ease-of-use and performance. There is one downside you should be aware of: if you generate a large amount of output without awaiting for more data from upstream, the frozen Vectors pile up and memory usage grows. You can force the collection of frozen Vectors to be flushed using the following helper function:
forceFlush :: Monad m => ConduitM i o m ()
forceFlush = await >>= maybe (return ()) leftover
This simply awaits for a value, allowing vectorBuilder to clear its cache, and then gives the new value back as a leftover.
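The mutable-variable trick described above can be sketched without conduit at all. The following is not the conduit-combinators implementation, just a minimal illustration under my own assumptions: the names BuilderState, yieldByte, drainPending, and finish are hypothetical, the element type is fixed to Word8, and the buffer size is assumed to be positive. It uses IORefs plus the vector package’s unboxed mutable vectors. The point to notice is that yieldByte lives entirely in IO, while drainPending is what a Conduit wrapper would call at each await (or from something like forceFlush) to get the chunks it should yield downstream.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef)
import qualified Data.Vector.Unboxed as VU
import qualified Data.Vector.Unboxed.Mutable as VUM
import Data.Word (Word8)

-- | Hypothetical state for this sketch: the buffer being filled, the
-- number of slots already used, and the frozen chunks waiting to be
-- handed downstream (stored newest first).
data BuilderState = BuilderState
  { bsBuffer  :: IORef (VUM.IOVector Word8)
  , bsUsed    :: IORef Int
  , bsPending :: IORef [VU.Vector Word8]
  , bsSize    :: Int
  }

-- | Allocate the buffer and the mutable variables. Assumes size > 0.
newBuilderState :: Int -> IO BuilderState
newBuilderState size = do
  buf <- VUM.new size
  BuilderState <$> newIORef buf <*> newIORef 0 <*> newIORef [] <*> pure size

-- | The yield-like function. It never touches a Conduit; it only
-- writes into the buffer and, when the buffer is full, freezes it and
-- queues it as a pending chunk.
yieldByte :: BuilderState -> Word8 -> IO ()
yieldByte st w = do
  buf  <- readIORef (bsBuffer st)
  used <- readIORef (bsUsed st)
  VUM.write buf used w
  let used' = used + 1
  if used' == bsSize st
    then do
      -- unsafeFreeze avoids a copy; fine here because this buffer is
      -- replaced below and never written to again.
      frozen <- VU.unsafeFreeze buf
      modifyIORef' (bsPending st) (frozen :)
      newBuf <- VUM.new (bsSize st)
      writeIORef (bsBuffer st) newBuf
      writeIORef (bsUsed st) 0
    else writeIORef (bsUsed st) used'

-- | Remove and return the full chunks accumulated so far, oldest first.
drainPending :: BuilderState -> IO [VU.Vector Word8]
drainPending st = do
  chunks <- readIORef (bsPending st)
  writeIORef (bsPending st) []
  pure (reverse chunks)

-- | At end of input: return any remaining full chunks followed by the
-- partially filled buffer, if it holds anything.
finish :: BuilderState -> IO [VU.Vector Word8]
finish st = do
  full <- drainPending st
  buf  <- readIORef (bsBuffer st)
  used <- readIORef (bsUsed st)
  if used == 0
    then pure full
    else do
      partial <- VU.freeze (VUM.slice 0 used buf)  -- copies the used prefix
      writeIORef (bsUsed st) 0
      pure (full ++ [partial])
```

This also makes the memory trade-off visible: every call to yieldByte between two calls to drainPending can only grow the pending list, which is exactly why a long run of output with no await in between accumulates memory.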
Overall, your goal should be to have a decent trade-off between memory and time efficiency. To demonstrate, try playing around with the functions f1, f2, and f3 in the following code snippet:
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE FlexibleContexts #-}
import ClassyPrelude.Conduit

-- Await once so that vectorBuilder can flush its frozen vectors,
-- then push the awaited value back as a leftover.
forceFlush :: Monad m => ConduitM i o m ()
forceFlush = await >>= maybe (return ()) leftover

-- Memory inefficient, time efficient
f1 :: (Int -> IO ()) -> Sink () IO ()
f1 f = liftIO $ forM_ [1..1000000] f

-- Memory efficient, time inefficient
f2 :: (Int -> Sink () IO ()) -> Sink () IO ()
f2 f = forM_ [1..1000000] $ \i -> do
    f i
    forceFlush

-- Good trade-off
f3 f = forM_ (chunksOf 10000 [1..1000000]) $ \is -> do
    liftIO $ mapM_ f is
    forceFlush
  where
    chunksOf _ [] = []
    chunksOf i x =
        y : chunksOf i z
      where
        (y, z) = splitAt i x

main = vectorBuilderC 4096 f3
    $$ (sinkNull :: Sink (Vector Int) IO ())
It may be surprising to have seen an entire article on packed representations of bytes, and not yet seen ByteString. As a matter of fact, the original use case I started working on this for had nothing to do with the vector package. However, I decided to focus on vector for two reasons, one of which is that a ByteString can be converted to and from a storable Vector.
To demonstrate that point, let’s try to read a file, duplicate all of its bytes as we did above, and write it back to a separate file. We’ll use the toByteVector and fromByteVector functions, which I recently added to mono-traversable for just this purpose:
{-# LANGUAGE NoImplicitPrelude #-}
import ClassyPrelude.Conduit
import System.IO (IOMode (ReadMode, WriteMode),
                  withBinaryFile)

-- Emit every incoming byte twice through the builder's yield function.
double :: (Word8 -> IO ()) -> Sink (SVector Word8) IO ()
double yield' = mapM_CE $ \w ->
    yield' w >> yield' w

main :: IO ()
main = withBinaryFile "input.txt" ReadMode $ \inH ->
       withBinaryFile "output.txt" WriteMode $ \outH ->
       sourceHandle inH
    $$ mapC toByteVector
    =$ vectorBuilderC 4096 double
    =$ mapC fromByteVector
    =$ sinkHandle outH
There’s a strong overlap between what vectorBuilder does, and how blaze-builder (and more recently, bytestring’s Builder type) are intended to be used. I unfortunately can’t give any conclusive comparisons between these two techniques right now. What I can say is that there are cases where using a Builder has proven to be inefficient, and vectorBuilder provides a large performance improvement. I can also say that vectorBuilder addresses many more use cases than Builder. For example, at FP Complete we’re planning to use this in financial analyses for creating time series data.
On the other hand, blaze-builder and bytestring’s Builder have both had far more real-world tuning than vectorBuilder. They also have support for things such as copying existing ByteStrings into the output stream, whereas vectorBuilder always works by copying a single element at a time.
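For comparison, here is a small sketch of the same byte-doubling task written against bytestring’s Builder, using only functions from Data.ByteString.Builder (word8, byteString, toLazyByteString). The helper names doubleBytes, withHeader, and render are made up for this example, and I have not benchmarked it against vectorBuilder. It shows both styles mentioned above: appending one element at a time, and copying an existing ByteString into the output as a whole.

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as BL

-- | Double every byte, appending one element at a time.
doubleBytes :: BS.ByteString -> BB.Builder
doubleBytes = BS.foldr (\w rest -> BB.word8 w <> BB.word8 w <> rest) mempty

-- | Copy an existing ByteString into the output in one step, then
-- append the doubled body. (Hypothetical helper for illustration.)
withHeader :: BS.ByteString -> BS.ByteString -> BB.Builder
withHeader header body = BB.byteString header <> doubleBytes body

-- | Run a Builder and collapse the result into a strict ByteString.
render :: BB.Builder -> BS.ByteString
render = BL.toStrict . BB.toLazyByteString
```

For instance, render (withHeader (BS.pack [9]) (BS.pack [1, 2, 3])) produces the bytes 9, 1, 1, 2, 2, 3, 3. Note that a Builder only ever produces bytes, whereas vectorBuilder works for any element type the chosen Vector supports.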
So for now, if you have a use case and you’re uncertain whether to use vectorBuilder or blaze-builder, I recommend either trying both approaches, or discussing it on one of the Haskell mailing lists to get more feedback.
The code for most of the blog post above is below. Sorry that it’s a bit messy:
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE NoImplicitPrelude         #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
import           ClassyPrelude.Conduit
import           Control.Monad.Primitive (PrimMonad)
import           Control.Monad.ST        (runST)
import           Criterion.Main          (bench, bgroup, defaultMain, nfIO,
                                          whnfIO)
import qualified Data.Vector.Generic     as VG
import qualified System.Random.MWC       as MWC
import           Test.Hspec              (hspec, shouldBe)
import           Test.Hspec.QuickCheck   (prop)

-- Approach 1: flatten to individual elements, duplicate, rechunk.
rechunk1 :: ( Monad m
            , VG.Vector vector (Element input)
            , PrimMonad base
            , MonadBase base m
            , MonoFoldable input
            )
         => Conduit input m (vector (Element input))
rechunk1 = concatC =$= concatMapC (\x -> [x, x]) =$= conduitVector 512
{-# INLINE rechunk1 #-}

-- Approach 2: stay on whole chunks, at the cost of extra buffer copies.
rechunk2 :: (Monad m, IsSequence a) => Conduit a m a
rechunk2 =
    mapC (concatMap $ replicate 2) =$= loop
  where
    loop = do
        x <- takeCE 512 =$= foldC
        unless (null x) $ yield x >> loop
{-# INLINE rechunk2 #-}

-- Approach 3: fill output buffers directly with vectorBuilder.
rechunk3 :: ( MonadBase base m
            , PrimMonad base
            , MonoFoldable input
            , VG.Vector vector (Element input)
            )
         => Conduit input m (vector (Element input))
rechunk3 = vectorBuilderC 512 $ \yield' ->
    mapM_CE (\x -> yield' x >> yield' x)
{-# INLINE rechunk3 #-}

main :: IO ()
main = do
    -- Check that all three implementations agree and chunk correctly.
    hspec $ prop "rechunking" $ \ws -> do
        let src = yield (pack ws :: UVector Word8)
            doubled = concatMap (\w -> [w, w]) ws
            res1 = runST $ src $$ rechunk1 =$ sinkList
            res2 = runST $ src $$ rechunk2 =$ sinkList
            res3 = runST $ src $$ rechunk3 =$ sinkList
        res1 `shouldBe` (res2 :: [UVector Word8])
        (res3 :: [UVector Word8]) `shouldBe` (res2 :: [UVector Word8])
        (unpack $ (mconcat res2 :: UVector Word8)) `shouldBe` (doubled :: [Word8])
        case reverse res1 :: [UVector Word8] of
            [] -> return ()
            x:xs -> do
                (length x <= 512) `shouldBe` True
                all ((== 512) . length) xs `shouldBe` True

    -- 20 random chunks, each between 12 and 1024 bytes long.
    gen <- MWC.createSystemRandom
    bytes <- replicateM 20 $
        MWC.uniformR (12, 1024) gen >>= MWC.uniformVector gen

    defaultMain
        [ bgroup "copy bytes"
            [ bench "rechunk1" $ whnfIO
                $ yieldMany (bytes :: [UVector Word8])
               $$ (rechunk1 :: Conduit (UVector Word8) IO (UVector Word8))
               =$ sinkNull
            , bench "rechunk2" $ whnfIO
                $ yieldMany (bytes :: [UVector Word8])
               $$ (rechunk2 :: Conduit (UVector Word8) IO (UVector Word8))
               =$ sinkNull
            , bench "rechunk3" $ whnfIO
                $ yieldMany (bytes :: [UVector Word8])
               $$ (rechunk3 :: Conduit (UVector Word8) IO (UVector Word8))
               =$ sinkNull
            ]
        , bgroup "transformers" $
            let src = return () in
            [ bench "single" $ nfIO $ do
                ref <- newIORef (0 :: Int)
                let incr = modifyIORef ref succ
                src $$ liftIO (replicateM_ 1000 incr)
            , bench "multi" $ nfIO $ do
                ref <- newIORef (0 :: Int)
                let incr = liftIO $ modifyIORef ref succ
                src $$ replicateM_ 1000 incr
            ]
        ]