Andreas Garnæs

Improved OCaml Memcached client with Core and Async

In the previous blog post, we implemented a simple Ocaml library for talking to Memcached with the binary protocol using bitstring. The code uses the baked-in standard library and synchronous IO (blocking), so a lot of time will be wasted waiting for IO. The standard library replacement Core offers cooperative threading with callbacks through Async, similar to Javascript, EventMachine for Ruby or many others. In this blog post we'll try to rewrite the code from the previous post to use asynchronous IO with Async.

IO with Async

The primary IO abstractions in the Ocaml standard library are in_channel and out_channel, while in Core/Async it's Reader and Writer. Reading and writing with Core/Async is asynchronous (non-blocking), so results are not returned immediately. If we for example examine the signature of Reader.really_read it looks like this:

(* really_read t pos len buffer reads until it fills len bytes of buffer starting *)
(* at pos or runs out of input.`                                                  *)
val really_read : Reader.t -> ?pos:int -> ?len:int -> string ->
                    [ `Eof of int | `Ok ] Deferred.t

The type 'a Deferred.t signifies that the result of type 'a is not immediate, but will be filled in sometime in the future. To actually access the result, you can bind functions to be called when the Deferred is resolved using Deferred.bind:

(* Deferred.bind : 'a Deferred.t -> ('a -> 'b Deferred.t) -> 'b Deferred.t *)
let buf = String.create 10 in
let dfd = Reader.really_read my_reader 0 10 buf in
Deferred.bind dfd (function
  | `Ok    -> return buf
  | `Eof _ -> return ""

Note that bind applies a function to the resolved value and must return a new Deferred value. That's why we're using return buf (type string Deferred.t) instead of just buf (type string). The type of return is 'a -> 'a Deferred.t.

To make things more readable, there are two handy infix operators for working with Deferreds:

(* the infix operator >>= is an alias of bind                      *)
(* >>= : 'a Deferred.t -> ('a -> 'b Deferred.t) -> 'b Deferred.t   *)
dfd >>= function
  | `Ok    -> return buf
  | `Eof _ -> return ""

(* the infix operator >>| is a shortcut to avoid explicit return   *)
(* >>| : 'a Deferred.t -> ('a -> 'b) -> 'b Deferred.t              *)
dfd >>| function
  | `Ok    -> buf
  | `Eof _ -> ""

Let's turn to Writer. The most basic function it offers is write:

(* write : Writer.t -> ?pos:int -> ?len:int -> string -> unit *)
Writer.write my_writer "foo"

You might notice that write does not return a deferred: write only queues the write in a buffer, and another Async microthread actually writes to the OS buffer. If you want to ensure that the write has been flushed, you can call Writer.flush : unit -> unit Deferred.t. An exception will be raised if any errors occur while transferring data to the OS buffer. How to handle exceptions in Async is a topic of another blog post (if you're curious start here).

We've covered enough to update the client library to use Async now, but if you want to learn more about Async, you can also read Dummy's Guide to Async from Jane Street or the excellent chapter on Async in Real World Ocaml.

Updating the Client

We can reuse the code from the last post, which doesn't touch IO. All the functions read_* or write_* needs to be rewritten to be asynchronous though. Let's start by writing a packet:

(* write_packet : Writer.t -> packet -> unit *)
let write_packet writer packet =
  let header_bits = header_to_bitstring packet.header in
  Writer.write writer (Bitstring.string_of_bitstring header_bits);
  Writer.write writer (Bitstring.string_of_bitstring packet.extras);
  Writer.write writer packet.key;
  Writer.write writer packet.value

This is almost identical to the previous version and doesn't even introduce any deferreds. Note that we have to convert our bitstrings to strings though, while we could previously emit them directly to the channel (e.g. Bitstring.to_chan out_chan header_bits). The bitstring library was not built with Async in mind.

Reading the response bears less resemblance with the former version and we now have to use deferreds:

(* read_header : Reader.t -> header option Deferred.t *)
let read_header reader =
  let header_buffer = String.create 24 in
  Reader.really_read reader ~len:24 header_buffer >>| function
    | `Eof _ -> None
    | `Ok    -> header_of_bitstring (Bitstring.bitstring_of_string header_buffer)

(* read_body : header -> Reader.t -> packet option Deferred.t *)
let read_body header reader =
  let body_length  = Int32.to_int_exn header.body_length                    in
  let value_length = body_length - header.extras_length - header.key_length in
  let body_buffer  = String.create body_length                              in
  Reader.really_read reader ~len:body_length body_buffer >>| function
    | `Eof _ -> None
    | `Ok    -> 
      bitmatch Bitstring.bitstring_of_string body_buffer with
        | { extras : 8*header.extras_length : bitstring;
            key    : 8*header.key_length    : string;
            value  : 8*value_length         : string
          } -> Some { header; extras; key; value; }
        | { _ } -> None

(* read_response_packet : Reader.t -> packet option Deferred.t *)
let read_response_packet reader =
  read_header reader >>= function
  | Some header -> read_body header reader
  | None -> return None

Like before, there's a slight mismatch between Async and bitstring, and we have to convert between string and bitstring.

Finally, we can now write our small sample program again which connects to a local Memcached server and reads the key "foo":

let main () =
  let host_and_port = Tcp.to_host_and_port "localhost" 12211 in
  Tcp.connect host_and_port >>= fun (_, reader, writer) ->
  let req = make_request_packet 0 "foo" "" Bitstring.empty_bitstring in
  write_packet writer req;
  read_response_packet reader >>= fun response ->
  Writer.close writer >>= fun () ->
  Reader.close reader >>| fun () ->
  match response with
  | Some packet when packet.header.status = 0 ->
      printf "Found value: %s\n" packet.value
  | Some packet ->
      printf "Key not found"
  | None ->
      printf "Request failed"
main ();
Scheduler.go ()


So what did we gain by rewriting our IO to use Core/Async rather than the built-in standard library? In short, our Memcached client library will play nice with other Async code and will scale much more nicely, e.g. if being used as part of a web server based on Async. As noted a couple of times during the rewrite, Async and bitstring doesn't play very well together. We both need to allocate temporary strings and convert between string and bitstring in multiple places. In a future post I'll discuss other ways of using Reader/Writer which is more efficient.

If you liked this post, please vote on Hacker News.

comments powered by Disqus