Implementing Callbacks

When a callback is registered with bind or one of the other syntaxes, it is added to a list of callbacks that is stored with the promise. Eventually, when the promise has been resolved, the Lwt resolution loop runs the callbacks registered for the promise. There is no guarantee about the execution order of callbacks for a promise. In other words, the execution order is nondeterministic. If the order matters, the programmer needs to use the composition operators (such as bind and join) to enforce an ordering. If the promise is never resolved, either because it stays pending forever or because it is rejected instead, none of its callbacks will ever run.
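To make this bookkeeping concrete, here is a minimal sketch of a promise-like cell that stores a list of callbacks while it waits and runs them once it is filled. The names `cell`, `create`, `on_resolve`, and `fill` are hypothetical and are not Lwt's API; real Lwt also handles rejection, cancellation, and much more.

```ocaml
(* A hypothetical, stripped-down cell: either still waiting (holding the
   callbacks registered so far) or holding a value. Not Lwt's API. *)
type 'a cell_state =
  | Waiting of ('a -> unit) list
  | Filled of 'a

type 'a cell = { mutable st : 'a cell_state }

let create () : 'a cell = { st = Waiting [] }

(* Register a callback: store it if the cell is still waiting,
   otherwise run it immediately on the value. *)
let on_resolve (c : 'a cell) (f : 'a -> unit) : unit =
  match c.st with
  | Waiting fs -> c.st <- Waiting (f :: fs)
  | Filled v -> f v

(* Fill the cell and run every stored callback exactly once.
   The list is newest-first, so callers should not rely on the order. *)
let fill (c : 'a cell) (v : 'a) : unit =
  match c.st with
  | Filled _ -> invalid_arg "fill: already filled"
  | Waiting fs ->
    c.st <- Filled v;
    List.iter (fun f -> f v) fs
```

A cell that is never filled simply keeps its callbacks in the `Waiting` list forever, so they never run.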

Once again, it's important to keep track of where the concurrency really comes from: the OS. There might be many asynchronous I/O operations occurring at the OS level. But at the OCaml level, the resolution loop is sequential, meaning that only one callback can ever be running at a time.

Finally, the resolution loop never attempts to interrupt a callback. So if the callback goes into an infinite loop, no other callback will ever get to run. That makes Lwt a cooperative concurrency mechanism, rather than preemptive.
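The sequential, cooperative behavior can be modeled with a simple run queue. This is a sketch under the assumption that pending callbacks sit in a FIFO queue; `schedule` and `run_loop` are illustrative names, not Lwt functions. The loop takes one callback at a time and runs it to completion. Nothing interrupts a callback, so one that never returns would starve everything queued behind it.

```ocaml
(* Hypothetical run queue of pending callbacks; not Lwt's internals. *)
let ready : (unit -> unit) Queue.t = Queue.create ()

let schedule (f : unit -> unit) : unit = Queue.add f ready

(* Run callbacks one at a time until the queue is empty.
   Each callback runs to completion before the next one starts;
   a callback may schedule further callbacks, which run later. *)
let run_loop () : unit =
  while not (Queue.is_empty ready) do
    let f = Queue.pop ready in
    f ()
  done
```

If a first callback writes "a1", schedules another, and then writes "a2", while a second callback writes "b", the trace is "a1 a2 b" followed by the newly scheduled callback: the steps of one callback are never interleaved with another's.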

Our Own Callbacks

To better understand callback resolution, let's implement it ourselves. We'll use the Promise data structure we developed earlier. To start, we add a bind operator to the Promise signature:

module type Promise = sig 
  ...

  (** [p >>= c] registers callback [c] with promise [p]. 
      When the promise is resolved, the callback will be run
      on the promise's contents.  If the promise is never
      resolved, the callback will never run. *)
  val (>>=) : 'a promise -> ('a -> 'b promise) -> 'b promise
end

Next, let's re-develop the entire Promise structure. We start off just like before:

module Promise : Promise = struct
  type 'a state = Pending | Resolved of 'a | Rejected of exn
  ...

But now to implement the representation type of promises, we use a record with mutable fields. The first field is the state of the promise, and it corresponds to the ref we used before. The second field is more interesting and is discussed below.

  (** RI: the input may not be [Pending] *)
  type 'a handler = 'a state -> unit

  (** RI: if [state <> Pending] then [handlers = []]. *)
  type 'a promise = {
    mutable state : 'a state;
    mutable handlers : 'a handler list
  }

A handler is a new abstraction: a function that takes a non-pending state. It will be used to handle resolving and rejecting promises when their state is ready to switch away from pending. The primary use for a handler will be to run callbacks. As a representation invariant, we require that only pending promises may have handlers waiting in their list. Once the state becomes non-pending, i.e., either resolved or rejected, the handlers will all be processed and removed from the list.
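One way to make this RI executable is a checking function in the style of `rep_ok`. The sketch below is an illustration, not part of the Promise structure developed in this section; it copies the `state`, `handler`, and `promise` types so that it compiles on its own, and the name `rep_ok` is our assumption.

```ocaml
type 'a state = Pending | Resolved of 'a | Rejected of exn
type 'a handler = 'a state -> unit
type 'a promise = {
  mutable state : 'a state;
  mutable handlers : 'a handler list
}

(* Hypothetical RI check: a non-pending promise must have no handlers.
   Returns [p] unchanged if the RI holds, and fails otherwise. *)
let rep_ok (p : 'a promise) : 'a promise =
  match p.state, p.handlers with
  | Pending, _ -> p
  | (Resolved _ | Rejected _), [] -> p
  | (Resolved _ | Rejected _), _ :: _ ->
    failwith "RI violated: handlers on a non-pending promise"
```

A pending promise may have any number of handlers; a resolved or rejected one with a non-empty list is rejected by the check.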

The following helper function, which enqueues a handler on a promise's handler list, will be useful later:

  let enqueue 
      (handler : 'a state -> unit) 
      (promise : 'a promise) : unit 
    =
    promise.handlers <- handler :: promise.handlers
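Because enqueue conses onto the front of the list, handlers sit in the list in reverse order of registration. The standalone check below repeats the record type so it compiles alone; `run_all` is a hypothetical helper that just iterates over the list. It is a reminder that the order in which handlers run is a detail of this representation, not something client code should rely on.

```ocaml
type 'a state = Pending | Resolved of 'a | Rejected of exn
type 'a handler = 'a state -> unit
type 'a promise = {
  mutable state : 'a state;
  mutable handlers : 'a handler list
}

(* Same definition as in the text: push the handler onto the front. *)
let enqueue (handler : 'a state -> unit) (promise : 'a promise) : unit =
  promise.handlers <- handler :: promise.handlers

(* Hypothetical helper: run every handler in list order, front to back. *)
let run_all (p : 'a promise) (st : 'a state) : unit =
  List.iter (fun h -> h st) p.handlers
```

Registering "first" and then "second" leaves the list as [second; first], so "second" runs first.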

We continue to pun resolvers and promises internally:

  type 'a resolver = 'a promise

Because we changed the representation type from a ref to a record, we have to update a few of the functions in trivial ways:

  (** [write_once p s] changes the state of [p] to be [s].  If [p] and [s]
      are both pending, that has no effect.
      Raises: [Invalid_argument] if the state of [p] is not pending. *)
  let write_once p s = 
    if p.state = Pending
    then p.state <- s
    else invalid_arg "cannot write twice"

  let make () = 
    let p = {state = Pending; handlers = []} in
    p, p

  let return x = 
    {state = Resolved x; handlers = []}

  let state p = p.state
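These updated functions can be exercised on their own. This sketch copies the record type together with write_once, make, return, and state from the text so that it compiles standalone. A quick check shows that make really does pun the promise and the resolver as one physical record, and that a second write fails with Invalid_argument.

```ocaml
type 'a state = Pending | Resolved of 'a | Rejected of exn
type 'a handler = 'a state -> unit
type 'a promise = {
  mutable state : 'a state;
  mutable handlers : 'a handler list
}
type 'a resolver = 'a promise

(* Change the state of [p] to [s], but only if [p] is still pending. *)
let write_once (p : 'a promise) (s : 'a state) : unit =
  if p.state = Pending
  then p.state <- s
  else invalid_arg "cannot write twice"

(* The promise and the resolver are the very same record. *)
let make () : 'a promise * 'a resolver =
  let p = { state = Pending; handlers = [] } in
  p, p

(* An already-resolved promise; it has no handlers, per the RI. *)
let return (x : 'a) : 'a promise = { state = Resolved x; handlers = [] }

let state (p : 'a promise) : 'a state = p.state
```

For example, after `let p, r = make ()`, the test `p == r` is true, and writing `Resolved 42` through `r` makes `state p` equal `Resolved 42`.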

Now we get to the trickier parts of the implementation. To resolve or reject a promise, the first thing we need to do is to call write_once on it, as we did before. Now we also need to process the handlers. Before running them, we save the current handler list and set the promise's list to empty, so that the RI holds once the state is no longer pending.

  (** requires: [st] may not be [Pending] *)
  let resolve_or_reject (r : 'a resolver) (st : 'a state) = 
    assert (st <> Pending);
    let handlers = r.handlers in
    r.handlers <- [];
    write_once r st;
    List.iter (fun f -> f st) handlers

  let reject r x = 
    resolve_or_reject r (Rejected x)

  let resolve r x =  
    resolve_or_reject r (Resolved x)
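To see the ordering inside resolve_or_reject at work, this standalone sketch repeats the earlier definitions, registers two handlers on a pending promise, and then resolves it. By the time each handler runs, the promise already holds its new state and has an empty handler list, which is what clearing the list and calling write_once before running the handlers is meant to guarantee.

```ocaml
type 'a state = Pending | Resolved of 'a | Rejected of exn
type 'a handler = 'a state -> unit
type 'a promise = {
  mutable state : 'a state;
  mutable handlers : 'a handler list
}
type 'a resolver = 'a promise

let enqueue (handler : 'a handler) (promise : 'a promise) : unit =
  promise.handlers <- handler :: promise.handlers

let write_once p s =
  if p.state = Pending then p.state <- s
  else invalid_arg "cannot write twice"

let make () =
  let p = { state = Pending; handlers = [] } in
  p, p

(* Detach the handlers first, then switch the state, then run the
   detached handlers on the new state. *)
let resolve_or_reject (r : 'a resolver) (st : 'a state) : unit =
  assert (st <> Pending);
  let handlers = r.handlers in
  r.handlers <- [];
  write_once r st;
  List.iter (fun f -> f st) handlers

let reject r x = resolve_or_reject r (Rejected x)
let resolve r x = resolve_or_reject r (Resolved x)
```

Resolving the same resolver a second time finds a non-pending state, so write_once raises Invalid_argument.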

Finally, the implementation of >>= is the trickiest part. First, if the promise is already resolved, we go ahead and immediately run the callback on its contents:

  let (>>=) 
      (input_promise : 'a promise) 
      (callback : 'a -> 'b promise) : 'b promise 
    = 
    match input_promise.state with
    | Resolved x -> callback x

Second, if the promise is already rejected, then we return a promise that is rejected with the same exception:

    | Rejected exc -> {state = Rejected exc; handlers = []}

Third, if the promise is pending, we need to do more work. Here's what we said in our discussion of bind in the previous section:

[T]he bind function returns a new promise. That promise will become resolved when (or if) the callback completes running, sometime in the future. Its contents will be whatever contents are contained within the promise that the callback itself returns.

That's what we now need to implement. So we create a new promise and resolver, called output_promise and output_resolver; that promise is what bind returns. Before returning it, we use a helper function handler_of_callback (described below) to transform the callback into a handler, and we enqueue that handler on input_promise. That ensures the handler will run when input_promise later becomes resolved or rejected:

    | Pending -> 
      let output_promise, output_resolver = make () in
      enqueue (handler_of_callback callback output_resolver) input_promise;
      output_promise

All that's left is to implement that helper function to create handlers from callbacks. The first two cases, below, are simple. It would violate the RI to call a handler on a pending state. And if the state is rejected, then the handler should propagate that rejection to the resolver, which causes the promise returned by bind to also be rejected.

  let handler_of_callback 
      (callback : 'a -> 'b promise) 
      (resolver : 'b resolver) : 'a handler 
    = function
      | Pending -> failwith "handler RI violated"
      | Rejected exc -> reject resolver exc

But if the state is resolved, then the callback provided by the user to bind can, at last, be run on the contents of the resolved promise. Running the callback produces a new promise. If that promise is already resolved or rejected, its state propagates to the resolver:

      | Resolved x ->
        let promise = callback x in
        match promise.state with
        | Resolved y -> resolve resolver y
        | Rejected exc -> reject resolver exc

But the promise might still be pending. In that case, we need to enqueue a new handler whose purpose is to do the propagation once the result is available:

        | Pending -> enqueue (handler resolver) promise

where handler is a new helper function that creates a very simple handler to do that propagation:

  let handler (resolver : 'a resolver) : 'a handler
    = function
      | Pending -> failwith "handler RI violated"
      | Rejected exc -> reject resolver exc
      | Resolved x -> resolve resolver x
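In effect, handler links two promises: whatever state the promise it is enqueued on eventually reaches is forwarded to another resolver. The standalone sketch below repeats the minimal definitions it needs and adds a hypothetical `link` helper (our name, not from the text) that makes an outer promise mirror an inner one. Rejecting the inner promise then rejects the outer one, and resolving it resolves the outer one with the same value.

```ocaml
type 'a state = Pending | Resolved of 'a | Rejected of exn
type 'a handler = 'a state -> unit
type 'a promise = {
  mutable state : 'a state;
  mutable handlers : 'a handler list
}
type 'a resolver = 'a promise

let enqueue (handler : 'a handler) (promise : 'a promise) : unit =
  promise.handlers <- handler :: promise.handlers

let write_once p s =
  if p.state = Pending then p.state <- s
  else invalid_arg "cannot write twice"

let make () =
  let p = { state = Pending; handlers = [] } in
  p, p

let resolve_or_reject (r : 'a resolver) (st : 'a state) : unit =
  assert (st <> Pending);
  let handlers = r.handlers in
  r.handlers <- [];
  write_once r st;
  List.iter (fun f -> f st) handlers

let reject r x = resolve_or_reject r (Rejected x)
let resolve r x = resolve_or_reject r (Resolved x)

(* As in the text: forward a non-pending state to [resolver]. *)
let handler (resolver : 'a resolver) : 'a handler = function
  | Pending -> failwith "handler RI violated"
  | Rejected exc -> reject resolver exc
  | Resolved x -> resolve resolver x

(* Hypothetical helper for illustration: make [outer] mirror [inner]. *)
let link (inner : 'a promise) (outer : 'a resolver) : unit =
  enqueue (handler outer) inner
```

This is exactly the situation in handler_of_callback when the callback's promise is still pending: the promise returned by bind is linked to it.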

The complete implementation of bind is thus as follows:

  let handler (resolver : 'a resolver) : 'a handler
    = function
      | Pending -> failwith "handler RI violated"
      | Rejected exc -> reject resolver exc
      | Resolved x -> resolve resolver x

  let handler_of_callback 
      (callback : 'a -> 'b promise) 
      (resolver : 'b resolver) : 'a handler 
    = function
      | Pending -> failwith "handler RI violated"
      | Rejected exc -> reject resolver exc
      | Resolved x ->
        let promise = callback x in
        match promise.state with
        | Resolved y -> resolve resolver y
        | Rejected exc -> reject resolver exc
        | Pending -> enqueue (handler resolver) promise      

  let (>>=) 
      (input_promise : 'a promise) 
      (callback : 'a -> 'b promise) : 'b promise 
    = 
    match input_promise.state with
    | Resolved x -> callback x
    | Rejected exc -> {state = Rejected exc; handlers = []}
    | Pending -> 
      let output_promise, output_resolver = make () in
      enqueue (handler_of_callback callback output_resolver) input_promise;
      output_promise
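Assembled from the pieces above, the structure can be tried out directly. The sketch below omits the module signature and the parts elided with ... in the text, so it is a standalone approximation rather than the complete Promise module. The example at the end chains two binds onto a pending promise; the second callback returns a promise that is still pending, so the final promise becomes resolved only after both underlying promises are.

```ocaml
type 'a state = Pending | Resolved of 'a | Rejected of exn
type 'a handler = 'a state -> unit
type 'a promise = {
  mutable state : 'a state;
  mutable handlers : 'a handler list
}
type 'a resolver = 'a promise

let enqueue (handler : 'a handler) (promise : 'a promise) : unit =
  promise.handlers <- handler :: promise.handlers

let write_once p s =
  if p.state = Pending then p.state <- s
  else invalid_arg "cannot write twice"

let make () =
  let p = { state = Pending; handlers = [] } in
  p, p

let return x = { state = Resolved x; handlers = [] }

let resolve_or_reject (r : 'a resolver) (st : 'a state) =
  assert (st <> Pending);
  let handlers = r.handlers in
  r.handlers <- [];
  write_once r st;
  List.iter (fun f -> f st) handlers

let reject r x = resolve_or_reject r (Rejected x)
let resolve r x = resolve_or_reject r (Resolved x)

(* Forward the eventual state of one promise to [resolver]. *)
let handler (resolver : 'a resolver) : 'a handler = function
  | Pending -> failwith "handler RI violated"
  | Rejected exc -> reject resolver exc
  | Resolved x -> resolve resolver x

(* Run [callback] once the input is resolved, then forward the state
   of the promise it returns to [resolver]. *)
let handler_of_callback (callback : 'a -> 'b promise) (resolver : 'b resolver)
  : 'a handler = function
  | Pending -> failwith "handler RI violated"
  | Rejected exc -> reject resolver exc
  | Resolved x ->
    let promise = callback x in
    (match promise.state with
     | Resolved y -> resolve resolver y
     | Rejected exc -> reject resolver exc
     | Pending -> enqueue (handler resolver) promise)

let ( >>= ) (input_promise : 'a promise) (callback : 'a -> 'b promise)
  : 'b promise =
  match input_promise.state with
  | Resolved x -> callback x
  | Rejected exc -> { state = Rejected exc; handlers = [] }
  | Pending ->
    let output_promise, output_resolver = make () in
    enqueue (handler_of_callback callback output_resolver) input_promise;
    output_promise

(* Example: the first callback returns a resolved promise, the second
   returns [inner], which is still pending. *)
let () =
  let p, r = make () in
  let inner, inner_r = make () in
  let out = (p >>= fun x -> return (x + 1)) >>= fun _ -> inner in
  assert (out.state = Pending);
  resolve r 41;                   (* runs both callbacks, in sequence *)
  assert (out.state = Pending);   (* still waiting on [inner] *)
  resolve inner_r "done";
  assert (out.state = Resolved "done")
```

Every callback in the example runs synchronously inside a call to resolve, one after another, which is the sequential behavior described next.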

The Lwt implementation of bind follows essentially the same algorithm as we just implemented. Note that there is no concurrency in bind: as we said above, everything in Lwt is sequential; it's the OS that provides the concurrency.
