The other day I threw together a quick program to solve an
annoyance some people on our team were expressing. We sometimes do
our development on remote machines, but would still like to use
local file editors. There are plenty of solutions for this (SSHFS,
inotify+rsync), but none of us ever found a prebaked solution that
was low-latency and robust enough to use regularly.
The program I put together is a simple client/server combo, where the client watches for local file changes and sends the updates to the server, which writes them to disk. This approach reduces latency significantly by keeping an open TCP connection, and can be tunneled over SSH for security. It’s simple, and the code is short and readable.
This program turned out to be a good demonstration of a few different common problem domains when writing practical, real world Haskell code:
This blog post series will build up from basics to being able to implement a program like the simple file mirror. This first post of three planned posts will cover communication protocols and streaming of data. You can also read:
Part 2, on network communication and concurrency
This series will contain a number of self-contained code samples,
presented as runnable scripts. To run one of these scripts, copy it
into a file ending with .hs (e.g., practical.hs), and then run it
with stack practical.hs. You will need to install the Stack build
tool. Stack will take care of downloading and installing any compiler
and libraries necessary. Because of that, the first time you run a
script it will take a while as everything is downloaded and
installed.
If you’d like to test that you’re ready to proceed, try running this program:
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc
main :: IO ()
main = putStrLn "Hello world!"
Also, this series of posts will use the classy-prelude-conduit
library, which provides a lot of built-in functionality to make it
easier to follow. If you’d like to kick off a build of that library
now so it’s available when you want to use it, run:
$ stack --resolver nightly-2016-09-10 --install-ghc build classy-prelude-conduit
We humans have an interesting tendency to want to communicate (with each other and with our computers) in human languages. Computers don’t care: they just see binary data. Oftentimes, this overlaps just fine, specifically when dealing with the ASCII subset of the character set. But generally speaking, we need some way to distinguish between textual and binary data.
In Haskell, we do this by using two different data types: Text and
ByteString. In order to convert between these two, we need to choose
a character encoding, and most often we’ll choose UTF-8. We can
perform this conversion with the encodeUtf8 and decodeUtf8
functions:
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = do
    let someText :: Text
        someText = unlines
            [ "Hello, this is English."
            , "Hola, este es el español."
            , "שלום, זה עברית."
            ]
        binary :: ByteString
        binary = encodeUtf8 someText

        filePath :: FilePath
        filePath = "say-hello.txt"
    writeFile filePath binary
    binary2 <- readFile filePath :: IO ByteString
    let text2 = decodeUtf8 binary2
    putStrLn text2
If you’re coming from a language that doesn’t have this strong separation, it may feel clunky at first. But after years of experience with having a type-level distinction between textual and binary data, I can attest to the fact that the compiler has many times prevented me from making some mistakes that would have been terrible in practice. One example popped up when working on the simple-file-mirror tool itself: I tried to send the number of characters in a file path over the wire, instead of sending the number of bytes after encoding it.
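To make that characters-versus-bytes mistake concrete, here is a minimal sketch using the plain text and bytestring libraries rather than classy-prelude. The sample path is made up for illustration; the point is only that the two lengths differ as soon as a non-ASCII character shows up.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString    as B
import qualified Data.Text          as T
import qualified Data.Text.Encoding as TE

-- A hypothetical file path containing one non-ASCII character.
samplePath :: T.Text
samplePath = "español.txt"

-- Number of characters in the path.
charCount :: Int
charCount = T.length samplePath

-- Number of bytes once the path is UTF-8 encoded. This is the number
-- that has to go over the wire, since the receiver counts bytes.
byteCount :: Int
byteCount = B.length (TE.encodeUtf8 samplePath)

main :: IO ()
main = do
    putStrLn $ "characters: " ++ show charCount
    putStrLn $ "bytes:      " ++ show byteCount
```

The ñ takes two bytes in UTF-8, so the byte count is one larger than the character count. A receiver told to read only as many bytes as there are characters would stop one byte short.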
The OverloadedStrings language extension lets us use string literals
for all of these string-like things, including (as you saw above)
file paths.
NOTE: For historical reasons which are being worked on, there is
additionally a String type, which is redundant with Text but far
less efficient. I mention it because you may see references to it
when working with some Haskell documentation. Whenever possible,
stick to Text or ByteString, depending on your actual data content.
OK, that was a bit abstract. Let’s do something more concrete. We’re going to use the conduit library to represent streaming data. To start off simple, let’s stream the numbers 1 to 10 and print each of them:
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = yieldMany [1..10] $$ mapM_C print
Our yieldMany function will take a sequence of values (in this case
a list from 1 to 10) and yield them downstream. For those familiar,
this is similar to yielding in Python with generators. The idea is
that we will build a pipeline of multiple components, each awaiting
values from upstream and yielding values downstream.
Our mapM_C print component will apply the function print to every
value it receives from upstream. The C suffix is used for
disambiguating conduit functions from non-conduit functions.
Finally, the $$ operator in the middle is the “connect” function,
which connects a source of data to a sink of data and runs it. As
you might guess, the above prints the numbers 1 to 10.
We can also put other components into our pipeline, including
functions that both await values from upstream and yield them
downstream. For example:
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = yieldMany [1..10] $$ mapC (* 10) =$ mapM_C print
The mapC function applies a function to each value in the stream and
yields it downstream, while the =$ operator connects two components
together. Take a guess at what the output from this will be, and
then give it a try.
NOTE: There’s a subtle but important difference between $$ and =$.
$$ will connect two components and then run the result to
completion to get a result. =$ will connect two components without
running them, so that the combination can be further connected to
other components. In a single pipeline, you’ll end up with one $$
usage.
We can also do more interesting consumption of a stream, like summing:
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = do
    total <- yieldMany [1..10] $$ mapC (* 10) =$ sumC
    print total

    -- or more explicitly...
    total <- yieldMany [1..10] $$ mapC (* 10) =$ foldlC (+) 0
    print total
Or limit how much of the stream we want to consume:
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = do
    total <- yieldMany [1..10] $$ mapC (* 10) =$ takeC 5 =$ sumC
    print total
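If it helps to build intuition, the two summing pipelines compute the same values as the ordinary list expressions below. This is only an analogy that I’m adding for comparison (the names are made up for the sketch), and it uses nothing beyond the standard Prelude. Lists don’t give you conduit’s interleaving of effects with streaming, which is the reason to use conduit in the first place.

```haskell
-- List analogue of: yieldMany [1..10] followed by mapC (* 10)
scaled :: [Int]
scaled = map (* 10) [1 .. 10]

-- Analogue of ending the pipeline with sumC
totalAll :: Int
totalAll = sum scaled

-- Analogue of inserting takeC 5 before sumC
totalFirstFive :: Int
totalFirstFive = sum (take 5 scaled)

main :: IO ()
main = do
    print totalAll
    print totalFirstFive
```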
This only scratches the surface of what we can do with conduit, but hopefully it gives enough of a basic intuition for the library to get started. If you’re interested in diving in deep on the conduit library, check out the previously linked tutorial.
Having a stream of individual bytes turns out to be inefficient in
practice. It’s much better to chunk a series of bytes into an
efficient data structure like a ByteString. Let’s see what it looks
like to stream data from a file to standard output:
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit
import qualified System.IO as IO

main :: IO ()
main = do
    writeFile "some-file.txt" ("This is just some text." :: ByteString)

    -- Yes, this is clunky. We'll do something much slicker in a bit.
    IO.withBinaryFile "some-file.txt" IO.ReadMode $ \fileHandle ->
           sourceHandle fileHandle
        $$ decodeUtf8C
        =$ stdoutC
This is good, but what if we want to deal with the individual bytes
in the stream instead of the chunks? For example, let’s say we want
to get just the first 10 bytes of our file. The takeC function we
used above counts chunks, not bytes: takeC 5 would take the first
five chunks of data. We instead need a function which will work on
the elements of the bytestrings (the individual bytes). Fortunately,
conduit provides for this with the E-suffixed element-specific
functions:
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit
import qualified System.IO as IO

main :: IO ()
main = do
    writeFile "some-file.txt" ("This is just some text." :: ByteString)

    -- Yes, this is clunky. We'll do something much slicker in a bit.
    IO.withBinaryFile "some-file.txt" IO.ReadMode $ \fileHandle ->
           sourceHandle fileHandle
        $$ takeCE 10
        =$ decodeUtf8C
        =$ stdoutC
In the simple-file-mirror program, we will be sending files over the
network, and will need to limit some operations to the actual file
sizes in question. Functions like takeCE will be vital for doing
this.
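To see the chunk-versus-element distinction without any streaming machinery, here is a small sketch over a plain list of ByteString chunks. The takeBytes helper and the sample chunking are my own illustration, not part of conduit. They only mimic the idea behind takeCE, which is to count bytes across chunk boundaries and split a chunk when the limit lands in the middle of it.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString.Char8 as BC

-- Hypothetical chunking of the file contents used above.
sampleChunks :: [BC.ByteString]
sampleChunks = ["This is ", "just ", "some text."]

-- Take the first n bytes from a list of chunks, splitting the last
-- chunk if the limit falls inside it.
takeBytes :: Int -> [BC.ByteString] -> [BC.ByteString]
takeBytes _ [] = []
takeBytes n (c : cs)
    | n <= 0           = []
    | BC.length c >= n = [BC.take n c]
    | otherwise        = c : takeBytes (n - BC.length c) cs

main :: IO ()
main = do
    -- Counting chunks: the number of bytes depends on the chunking.
    print (take 2 sampleChunks)
    -- Counting bytes: always 10 bytes, however the data was chunked.
    print (takeBytes 10 sampleChunks)
```

Taking two chunks happens to give 13 bytes here, and would give a different amount with a different chunking. Taking 10 bytes gives the same 10 bytes either way.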
While the withBinaryFile approach above worked, it felt kind of
clunky. And for more complicated control flows, opening up the file
in advance won’t be an option (like when we’ll only know which file
to open after the network connection tells us the file path). To
allow for these cases, we’re going to introduce runResourceT, which
allows us to acquire resources in an exception-safe manner. Let’s
rewrite the above example with runResourceT:
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = do
    writeFile "some-file.txt" ("This is just some text." :: ByteString)

    runResourceT
        $ sourceFile "some-file.txt"
       $$ takeCE 10
       =$ decodeUtf8C
       =$ stdoutC
Internally, sourceFile uses the bracketP function, which runs some
initialization function (in our case, opening a file handle),
registers some cleanup function (in our case, closing that file
handle), and then performs an action with the resource. To
demonstrate what that looks like more explicitly, let’s write a
modified sourceFile function which will return some default file
contents if the requested file can’t be read from.
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit
import qualified System.IO as IO

sourceFileDef :: MonadResource m => FilePath -> Source m ByteString
sourceFileDef fp = do
    let -- tryIO catches all IO exceptions and puts them in a Left
        -- value.
        open = tryIO $ IO.openBinaryFile fp IO.ReadMode

        -- Exception occurred, so nothing to close.
        close (Left e) = putStrLn $ "No file to close, got an exception: " ++ tshow e
        -- No exception, so close the file handle.
        close (Right h) = hClose h

    bracketP open close $ \eitherHandle ->
        case eitherHandle of
            Left ex -> do
                yield "I was unable to open the file in question:\n"
                yield $ encodeUtf8 $ tshow ex ++ "\n"
            Right fileHandle -> sourceHandle fileHandle

main :: IO ()
main = runResourceT
     $ sourceFileDef "some-file.txt"
    $$ decodeUtf8C
    =$ stdoutC
Let’s at least get started on our actual simple-file-mirror code. The wire protocol we’re going to use is defined in the README, but we can describe it briefly as:
9:hello.txt11:Hello World means “write the file hello.txt with the content Hello World”
9:hello.txt-1: means “hello.txt was deleted”
So let’s get started with implementing the integer send/receive logic in conduit. There are many ways of doing this, some more efficient than others. For this program, I elected to go with the simplest approach possible (though you can see some historical more complicated/efficient versions). Let’s start with sending, which is pretty trivial:
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import ClassyPrelude.Conduit

sendInt :: Monad m => Int -> Producer m ByteString
sendInt i = yield $ encodeUtf8 $ tshow i ++ ":"

main :: IO ()
main =
    yieldMany sampleInts $$ awaitForever sendInt =$ stdoutC
  where
    sampleInts =
        [ 1
        , 10
        , -5
        , 0
        , 60
        ]
Here we’ve introduced a new function, awaitForever, which repeatedly
applies a function as long as data exists on the stream. Take a
guess at what the output of this program will be, and then try it
out.
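Before moving on to the receiving side, it may help to see the whole wire format from the protocol description written out as pure functions. This is a sketch of my own using plain bytestring and text, not the code from simple-file-mirror. The names encodeInt, encodePath, encodeWrite and encodeDelete are made up for the example.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString.Char8 as BC
import qualified Data.Text             as T
import qualified Data.Text.Encoding    as TE

-- A decimal integer followed by a colon, e.g. 9 becomes "9:".
encodeInt :: Int -> BC.ByteString
encodeInt i = BC.pack (show i) <> ":"

-- The byte length of the UTF-8 encoded path, then the path itself.
encodePath :: T.Text -> BC.ByteString
encodePath path = encodeInt (BC.length bytes) <> bytes
  where
    bytes = TE.encodeUtf8 path

-- "Write this file": path, then content length, then the content.
encodeWrite :: T.Text -> BC.ByteString -> BC.ByteString
encodeWrite path content =
    encodePath path <> encodeInt (BC.length content) <> content

-- "This file was deleted": path, then the special length -1.
encodeDelete :: T.Text -> BC.ByteString
encodeDelete path = encodePath path <> encodeInt (-1)

main :: IO ()
main = do
    BC.putStrLn (encodeWrite "hello.txt" "Hello World")
    BC.putStrLn (encodeDelete "hello.txt")
```

These two calls reproduce the two example lines from the protocol description. The streaming versions later in this post do the same job without holding a whole file in memory.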
Now let’s try out the receiving side of this, which is slightly more complicated, but not too bad:
#!/usr/bin/env stack
{- stack --resolver nightly-2016-09-10 --install-ghc runghc
   --package classy-prelude-conduit
   --package word8
-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import ClassyPrelude.Conduit
import Data.Word8 (_colon)

sendInt :: Monad m => Int -> Producer m ByteString
sendInt i = yield $ encodeUtf8 $ tshow i ++ ":"

recvInt :: MonadThrow m => Consumer ByteString m Int
recvInt = do
    intAsText <- takeWhileCE (/= _colon) =$ decodeUtf8C =$ foldC
    dropCE 1 -- drop the colon from the stream
    case readMay $ unpack intAsText of
        Nothing -> error $ "Invalid int: " ++ show intAsText
        Just i -> return i

main :: IO ()
main =
       yieldMany sampleInts
    $$ awaitForever sendInt
    =$ peekForeverE (do i <- recvInt
                        print i)
  where
    sampleInts =
        [ 1
        , 10
        , -5
        , 0
        , 60
        ]
peekForeverE is similar to awaitForever, in that it repeatedly
performs an action as long as there is data on the stream. However,
it’s different in that it doesn’t grab the data off of the stream
itself, leaving it to the action provided to do that, and it deals
correctly with chunked data by ignoring empty chunks.
We’ve also introduced takeWhileCE, which is like takeCE, but instead
of giving it a fixed size of the stream to consume, it keeps
consuming as long as the given predicate holds. In our case: we
consume until we get to a colon. Then we decode the UTF-8 bytes into
text, and use foldC to concatenate multiple chunks of Text into a
single Text value. Then we use readMay to parse the textual value
into an Int. (And yes, we could do a much better job at error
handling, but using error is the simplest approach.)
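The same three steps (read up to the colon, drop the colon, parse the digits) can also be written as a pure function over a single strict ByteString, which I find handy for seeing what recvInt does to its input. This is a sketch of my own, not the code from the program. parseIntPrefix is a made-up name, and it returns Nothing on bad input instead of calling error.

```haskell
import qualified Data.ByteString.Char8 as BC

-- Parse a leading "<int>:" and return the integer together with the
-- bytes that follow the colon. Returns Nothing if there is no colon
-- or if the text before it is not a valid integer.
parseIntPrefix :: BC.ByteString -> Maybe (Int, BC.ByteString)
parseIntPrefix bs =
    case BC.break (== ':') bs of          -- like takeWhileCE (/= _colon)
        (digits, rest)
            | Just (':', afterColon) <- BC.uncons rest  -- like dropCE 1
            , Just (i, leftover) <- BC.readInt digits   -- like readMay
            , BC.null leftover
            -> Just (i, afterColon)
        _ -> Nothing

main :: IO ()
main = do
    print (parseIntPrefix (BC.pack "9:hello.txt"))
    print (parseIntPrefix (BC.pack "-1:"))
    print (parseIntPrefix (BC.pack "no colon here"))
```

The conduit version has to cope with the integer being split across several chunks, which is why it folds the pieces together with foldC before parsing. The pure version sidesteps that by assuming all the bytes are already in one ByteString.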
Building on top of these two functions, it becomes a lot easier to send and receive complete file paths. Let’s put that code in here, together with a test suite to prove it’s all working as expected:
#!/usr/bin/env stack
{- stack --resolver nightly-2016-09-10 --install-ghc runghc
   --package classy-prelude-conduit
   --package word8
   --package hspec
-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import ClassyPrelude.Conduit
import Data.Word8            (_colon)
import Test.Hspec
import Test.Hspec.QuickCheck (prop)

sendInt :: Monad m => Int -> Producer m ByteString
sendInt i = yield $ encodeUtf8 $ tshow i ++ ":"

sendFilePath :: Monad m => FilePath -> Producer m ByteString
sendFilePath fp = do
    -- UTF-8 encode the filepath
    let bs = encodeUtf8 $ pack fp :: ByteString

    -- Send the number of bytes
    sendInt $ length bs

    -- Send the actual path
    yield bs

recvInt :: MonadThrow m => Consumer ByteString m Int
recvInt = do
    intAsText <- takeWhileCE (/= _colon) =$ decodeUtf8C =$ foldC
    dropCE 1 -- drop the colon from the stream
    case readMay $ unpack intAsText of
        Nothing -> error $ "Invalid int: " ++ show intAsText
        Just i -> return i

recvFilePath :: MonadThrow m => Consumer ByteString m FilePath
recvFilePath = do
    -- Get the byte count
    fpLen <- recvInt

    -- Read in the given number of bytes, decode as UTF-8 text, and
    -- then fold all of the chunks into a single Text value
    fpText <- takeCE fpLen =$= decodeUtf8C =$= foldC

    -- Unpack the text value into a FilePath
    return $ unpack fpText

main :: IO ()
main = hspec $ do
    prop "sendInt/recvInt are inverses" $ \i -> do
        res <- sendInt i $$ recvInt
        res `shouldBe` i
    prop "sendFilePath/recvFilePath are inverses" $ \fp -> do
        res <- sendFilePath fp $$ recvFilePath
        res `shouldBe` fp
We’ve used the hspec test framework library and its QuickCheck
support to create a test suite which automatically generates test
cases based on the types in our program. In this case, it will
generate 100 random Ints and 100 random FilePaths each time it’s
run, and ensure that our properties hold. This is a great way to
quickly get significant test coverage for a program.
OK, finally time to put all of this together. We’re going to add in some new functions for sending and receiving files themselves. This is a fairly simple composition of all of the work we’ve done until now. And this is the nice thing about Haskell in general, as well as the conduit library: purity and strong typing often make it possible to trivially combine smaller functions into more complicated beasts. Let’s see this all in action:
#!/usr/bin/env stack
{- stack --resolver nightly-2016-09-10 --install-ghc runghc
   --package classy-prelude-conduit
   --package word8
   --package hspec
   --package temporary
-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import ClassyPrelude.Conduit
import Data.Word8            (_colon)
import System.Directory      (createDirectoryIfMissing, doesFileExist,
                              removeFile)
import System.FilePath       (takeDirectory)
import System.IO             (IOMode (ReadMode), hFileSize, openBinaryFile)
import System.IO.Temp        (withSystemTempDirectory)
import Test.Hspec
import Test.Hspec.QuickCheck (prop)

sendInt :: Monad m => Int -> Producer m ByteString
sendInt i = yield $ encodeUtf8 $ tshow i ++ ":"

sendFilePath :: Monad m => FilePath -> Producer m ByteString
sendFilePath fp = do
    -- UTF-8 encode the filepath
    let bs = encodeUtf8 $ pack fp :: ByteString

    -- Send the number of bytes
    sendInt $ length bs

    -- Send the actual path
    yield bs

sendFile :: MonadResource m
         => FilePath -- ^ root directory
         -> FilePath -- ^ path relative to root
         -> Producer m ByteString
sendFile root fp = do
    -- Send the relative file path, so the other side knows where to
    -- put the contents
    sendFilePath fp

    let open = tryIO $ openBinaryFile fpFull ReadMode

        -- If the opening failed, we'll have an error message. So
        -- there's nothing to close, just do nothing!
        close (Left _err) = return ()
        -- Opening succeeded, so close the file handle.
        close (Right h) = hClose h

    -- Grab the file handle...
    bracketP open close $ \eh ->
        case eh of
            -- No file, send a -1 length to indicate file does not
            -- exist
            Left _ex -> sendInt (-1)

            -- File exists
            Right h -> do
                -- Send the size of the file
                size <- liftIO $ hFileSize h
                sendInt $ fromInteger size

                -- And stream the contents
                sourceHandle h
  where
    fpFull = root </> fp

recvInt :: MonadThrow m => Consumer ByteString m Int
recvInt = do
    intAsText <- takeWhileCE (/= _colon) =$ decodeUtf8C =$ foldC
    dropCE 1 -- drop the colon from the stream
    case readMay $ unpack intAsText of
        Nothing -> error $ "Invalid int: " ++ show intAsText
        Just i -> return i

recvFilePath :: MonadThrow m => Consumer ByteString m FilePath
recvFilePath = do
    -- Get the byte count
    fpLen <- recvInt

    -- Read in the given number of bytes, decode as UTF-8 text, and
    -- then fold all of the chunks into a single Text value
    fpText <- takeCE fpLen =$= decodeUtf8C =$= foldC

    -- Unpack the text value into a FilePath
    return $ unpack fpText

recvFile :: MonadResource m
         => FilePath -- ^ directory to store files in
         -> Sink ByteString m ()
recvFile root = do
    -- Get the relative path to store in
    fpRel <- recvFilePath

    -- Prepend with the root directory to get a complete path
    let fp = root </> fpRel

    -- Get the size of the file
    fileLen <- recvInt

    if fileLen == (-1)
        -- We use -1 to indicate the file should be removed. Go ahead
        -- and call removeFile, but ignore any IO exceptions that
        -- occur when doing so (in case the file doesn't exist
        -- locally, for example)
        then liftIO $ void $ tryIO $ removeFile fp
        else do
            -- Create the containing directory
            liftIO $ createDirectoryIfMissing True $ takeDirectory fp

            -- Stream out the specified number of bytes and write them
            -- into the file
            takeCE fileLen =$= sinkFile fp

main :: IO ()
main = hspec $ do
    prop "sendInt/recvInt are inverses" $ \i -> do
        res <- sendInt i $$ recvInt
        res `shouldBe` i
    prop "sendFilePath/recvFilePath are inverses" $ \fp -> do
        res <- sendFilePath fp $$ recvFilePath
        res `shouldBe` fp

    -- A more standard unit test, checking that sending and receiving
    -- a file does what is expected.
    it "create and delete files" $
      -- Get temporary source and destination directories
      withSystemTempDirectory "src" $ \srcDir ->
      withSystemTempDirectory "dst" $ \dstDir -> do

        let relPath = "somepath.txt"
            content = "This is the content of the file" :: ByteString

        -- Ensure that sending a file that exists makes it appear in
        -- the destination
        writeFile (srcDir </> relPath) content

        runResourceT $ sendFile srcDir relPath $$ recvFile dstDir

        content' <- readFile (dstDir </> relPath)
        content' `shouldBe` content

        -- Ensure that sending a file that doesn't exist makes it
        -- disappear in the destination
        removeFile (srcDir </> relPath)

        runResourceT $ sendFile srcDir relPath $$ recvFile dstDir

        exists <- doesFileExist (dstDir </> relPath)
        exists `shouldBe` False
Our sendFile function looks very similar to our sourceFileDef
example from earlier in the post. But instead of streaming a default
value, we just send a length of -1, as our protocol dictates. The
recvFile function relies heavily on recvFilePath and recvInt. In the
case of a -1, it removes the file in question. Otherwise, it creates
the containing directory if necessary, and then composes takeCE with
sinkFile to stream the correct number of bytes into the file.
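To summarize the decision recvFile makes, here is a pure sketch that decodes one complete message from a strict ByteString into a small data type. All of the names here (Message, lengthPrefix, takeExactly, decodeMessage) are invented for the illustration, and unlike the real code it keeps the path as raw bytes and needs the whole message in memory. Treat it as a model of the protocol rather than as the implementation.

```haskell
import qualified Data.ByteString.Char8 as BC

-- The two things a message can ask the receiver to do.
data Message
    = Write BC.ByteString BC.ByteString -- ^ path bytes, file content
    | Delete BC.ByteString              -- ^ path bytes
    deriving (Eq, Show)

-- Read a decimal integer followed by a colon.
lengthPrefix :: BC.ByteString -> Maybe (Int, BC.ByteString)
lengthPrefix bs = do
    (n, rest) <- BC.readInt bs
    (':', rest') <- BC.uncons rest  -- Nothing if the colon is missing
    return (n, rest')

-- Split off exactly n bytes, failing if fewer are available.
takeExactly :: Int -> BC.ByteString -> Maybe (BC.ByteString, BC.ByteString)
takeExactly n bs
    | n < 0 || BC.length bs < n = Nothing
    | otherwise                 = Just (BC.splitAt n bs)

-- Decode one message and return it with any leftover bytes.
decodeMessage :: BC.ByteString -> Maybe (Message, BC.ByteString)
decodeMessage bs0 = do
    (pathLen, bs1) <- lengthPrefix bs0
    (path, bs2)    <- takeExactly pathLen bs1
    (fileLen, bs3) <- lengthPrefix bs2
    if fileLen == (-1)
        -- A length of -1 means the file was deleted.
        then return (Delete path, bs3)
        else do
            (content, bs4) <- takeExactly fileLen bs3
            return (Write path content, bs4)

main :: IO ()
main = do
    print (decodeMessage (BC.pack "9:hello.txt11:Hello World"))
    print (decodeMessage (BC.pack "9:hello.txt-1:"))
```

Returning the leftover bytes is what would let a caller decode several messages back to back, which is roughly the role peekForeverE plays in the streaming version.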
We also have a unit test covering the interaction of these two new functions. While some kind of property could perhaps be devised for testing this with QuickCheck, a more standard unit test seemed far more straightforward in this case.
This part of the tutorial covered quite a number of topics, so this is a good place to take a break. Next time, we’ll dive into the network communication aspect of things, including:
If you have feedback on how to make this tutorial series more useful, please share it in the comments below, on Twitter (@snoyberg), or in any Reddit discussions about it. I’ll try to review feedback and make changes to parts 2 and 3.