Thursday, October 11, 2012

OFlux: Lock-free Mutex Atomic

Unlike a Pthread mutex, which stops a thread in its tracks if the resource is already held, an OFlux atomic with mutual-exclusion semantics does not block.  Rather, it takes an "event" argument which is parked within the atomic's internal queue.  The thread remains available to execute other events as long as they are runnable (they hold all the atomic resources they need).  This keeps threads warmer, and avoids a costly context switch.  The cost of parking an event is just a single compare and swap (CAS) instruction (this code is geared for the i686 architecture).  Within the same loop in which this CAS happens, the atomic is checked for being free (not held by another event), and if it is, another CAS allows acquisition.  So the basic operation acquire_or_wait() provides a simple way for events to interact with an atomic.  When an event completes, it releases the atomic by calling release(), which can return the next event from the waiting queue; that event then holds the atomic.


Rudiments of Interface


I will try to explain the top-level interface for this atomic object first, and then delve into the mechanics of the working parts within it.   Here is the AtomicExclusive subclass of AtomicCommon which implements the atomic object with mutual exclusion semantics (meaning at most one event may hold the atomic at one time) from src/runtime/lockfree/atomic/OFluxLFAtomic.h (simplified somewhat):


typedef EventBase * EventBasePtr;

class AtomicExclusive : public AtomicCommon {
public:
  AtomicExclusive(void * data)
    : AtomicCommon(data)
  {}
  virtual int held() const { return ! _waiters.empty(); }
  virtual void release(
     std::vector<EventBasePtr > & rel_ev
   , EventBasePtr &)
  {
    EventBaseHolder * ebh = _waiters.pop();
    if(ebh) {
      rel_ev.push_back(EventBasePtr(ebh->ev));
      AtomicCommon::allocator.put(ebh);
    }
  }
  virtual bool acquire_or_wait(EventBasePtr & ev, int mode)
  {
    EventBaseHolder * ebh = 
       AtomicCommon::allocator.get(ev,mode);
    bool acquired = _waiters.push(ebh);
    if(acquired) {
      AtomicCommon::allocator.put(ebh); 
      // not in use - return it to pool
    }
    return acquired;
  }
  ... // other things omitted
private:
  ExclusiveWaiterList _waiters;
};


Regardless of whether the atomic is currently held or not, multiple threads can be calling the acquire_or_wait() member function on behalf of new events which need to acquire this atomic.  Returning a true value from this function indicates that the acquisition succeeded and the event may proceed with further acquisitions, or actually execute its function.  Returning a false value indicates that the event has been given up to the atomic object: it is queued internally within the atomic object, the current thread is forbidden to access it, and it will be released later when its turn to hold the atomic comes.
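To make the contract of acquire_or_wait() and release() concrete, here is a minimal single-threaded model of it. This is only a sketch under stated assumptions: ToyEvent and ToyExclusive are made-up names (not OFlux types), the queue is a plain std::deque, and nothing here is lock-free or thread-safe. It shows the return values and the FIFO hand-off only.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical stand-in for an OFlux event; only an id is needed here.
struct ToyEvent { int id; };

// Single-threaded model of the exclusive atomic's contract (NOT lock-free).
class ToyExclusive {
public:
  ToyExclusive() : _held(false) {}

  // true: ev now holds the atomic.  false: ev was parked in FIFO order.
  bool acquire_or_wait(ToyEvent * ev) {
    assert(ev != nullptr);
    if (!_held) { _held = true; return true; }
    _waiters.push_back(ev);
    return false;
  }

  // The holder lets go.  The oldest waiter (if any) becomes the new holder
  // and is appended to rel_ev so that the caller can schedule it.
  void release(std::vector<ToyEvent *> & rel_ev) {
    assert(_held);
    if (_waiters.empty()) { _held = false; return; }
    rel_ev.push_back(_waiters.front());
    _waiters.pop_front();
  }

  bool held() const { return _held; }

private:
  bool _held;
  std::deque<ToyEvent *> _waiters;
};
```

In this model a thread that gets false back simply moves on to other work; the parked event reappears later in the rel_ev vector of whichever thread releases the atomic.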

The interface supports other semantics like read-write or pools, which I will describe at a later date in a separate blog posting.  The signature of release() adds events to a std::vector (which could be thread local and pre-sized for efficiency) in anticipation of other kinds of atomic objects.  Generally the choice of ordering for events within the internal queue is to be first-in first-out (FIFO) since this has the simplest fairness guarantee, and makes writing code easier.  The mode argument can specify a "mode" of acquisition, but it is not very useful for this type of atomic object (there is only one mode of acquisition that it supports: "exclusive").  I won't cover the internals of the AtomicCommon::allocator member object, except to say that it is a thread-safe reclamation facility which has a hazard pointers capability (enabling the implementation of the ExclusiveWaiterList class which I will talk about next).


Hang on to this


In order to avoid putting mechanics that ExclusiveWaiterList needs into the events, I have a container object called EventBaseHolder which boxes up the events while they are anywhere near ExclusiveWaiterList.  On its own, it is not very interesting, but it is important to describe it before it is used (again simplified for reading):


struct EventBaseHolder {
  EventBaseHolder(
      EventBasePtr & a_ev
    , int a_mode)
    : ev(a_ev)
    , next(NULL)
    , mode(a_mode)
  {}

  EventBase * ev;
  EventBaseHolder * next;
  int mode;
};

The allocator object's get() member function is used to call the constructor (above).


Line up!


The declaration of ExclusiveWaiterList is pretty straightforward (once I have simplified the code a bit):


template<const size_t v, typename T>
inline bool is_val(const T * p)
{
  return reinterpret_cast<size_t>(p) == v;
}

template<const size_t v, typename T>
inline void set_val(T * & p)
{
  reinterpret_cast<size_t &>(p) = v;
}


class ExclusiveWaiterList {
public:
  ExclusiveWaiterList()
    : _head(AtomicCommon::allocator.get(0,
            EventBaseHolder::None))
    , _tail(_head)
  { set_val<0x0001>(_head->next); }

  size_t count() const;
  bool has_waiters() const;
  bool push(EventBaseHolder * e);
  EventBaseHolder * pop();

  EventBaseHolder * _head;
  EventBaseHolder * _tail;
};

When constructed, a new ExclusiveWaiterList has one empty EventBaseHolder element in it (which has mode None).  That initial element is a critical part of the implementation of this list.  I will describe each of the member functions one by one after I first describe the possible states that the list can be in (abstractly characterized by three states: empty, held with no waiters, and held with some waiters).

In the empty state, there is no event holding the atomic, and no waiters.  The other states have self-explanatory names.  The pointer value 0x1 is used as a special marker value since it is not a real (i.e. properly aligned) value for a pointer on i686.  Any other combination (e.g. _head == _tail and _head->next > 0x1) should be unreachable as a resting state, that is, a state in which no operation is in progress past its sequentialization point.
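The three resting states can be read off the push() and pop() listings in this post, and the sketch below encodes that reading as a small classifier. It is an interpretation rather than code from the OFlux source: Node, ListState and classify() are hypothetical names, and anything that matches none of the three states is reported as Transient.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for EventBaseHolder; only the link matters here.
struct Node { Node * next; };

enum ListState { Empty, HeldNoWaiters, HeldSomeWaiters, Transient };

// True when p is the 0x1 marker value rather than a real pointer.
inline bool is_marker(const Node * p) {
  return reinterpret_cast<std::uintptr_t>(p) == 0x1;
}

// Classify a list from its head and tail pointers.
inline ListState classify(const Node * head, const Node * tail) {
  assert(head != nullptr && tail != nullptr);
  const Node * hn = head->next;
  if (head == tail && is_marker(hn)) return Empty;            // nobody holds it
  if (head == tail && hn == nullptr) return HeldNoWaiters;    // held, queue empty
  if (head != tail && hn != nullptr && !is_marker(hn))
    return HeldSomeWaiters;                                   // held, events queued
  return Transient;  // only expected while an operation is mid-flight
}
```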

The has_waiters() member function is quite simple to implement with these states in mind:


bool ExclusiveWaiterList::has_waiters() const
{
  EventBaseHolder * h = _head->next;
  return !is_val<0x0001>(h) && (h != NULL);
}

The count() member function is also fairly simple (though it is not used in a context where it needs to be fully safe -- so no hazard pointers are set):


size_t ExclusiveWaiterList::count() const // number of waiters
{
  size_t res = 0;
  EventBaseHolder * h = _head;
  while(h && !is_val<0x0001>(h)  && h->ev) {
    ++res;
    h = h->next;
  }
  return res;
}


Push



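The code to push an event is most easily explained in terms of the transitions it makes between the abstract states described above: an empty list becomes held with no waiters (the pushed event acquires the atomic), and a list in either held state becomes held with some waiters (the pushed event is queued).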


Here is the (simplified) code which does the push() transitions:


template<typename T>
inline T * unmk(T * t)
{
  size_t u = reinterpret_cast<size_t>(t);
  return reinterpret_cast<T *>(u & ~0x0001);
}

inline void ev_swap(EventBase * & ev1, EventBase * & ev2)
{
  EventBase * ev = ev1;
  ev1 = ev2;
  ev2 = ev;
}


bool
ExclusiveWaiterList::push(EventBaseHolder *e)
{
  bool res = false;
  e->next = NULL;
  EventBaseHolder * t = NULL;
  EventBase * ev = NULL; // hold the event locally
  ev_swap(ev,e->ev);
  EventBaseHolder * h = NULL;
  EventBaseHolder * hn = NULL;
  while(1) {
    // h = _head; side-effect:
    HAZARD_PTR_ASSIGN(h,_head,0); 
    hn = h->next;
    if(is_val<0x0001>(hn)
           && __sync_bool_compare_and_swap(
             &(h->next)
           , 0x0001
           , NULL)) {
      // empty->held/no
      ev_swap(e->ev,ev);
      res = true;
      break;
    } else {
      // (held/no,held/some)->held/some
      //t = _tail; side-effect
      HAZARD_PTR_ASSIGN(t,_tail,1); 
      if(unmk(t->next)) {
        continue;
      }
      if(unmk(t) && __sync_bool_compare_and_swap(
           &(t->next)
           , NULL
           , e)) {
        __sync_bool_compare_and_swap(&_tail,t,e);
        ev_swap(t->ev,ev);
        break;
      }
    }
  }
  HAZARD_PTR_RELEASE(0);
  HAZARD_PTR_RELEASE(1);
  return res;
}

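The __sync_bool_compare_and_swap() calls are built-in gcc primitives for CAS, and the HAZARD_PTR macros are part of the deferred reclamation capabilities of the AtomicCommon::allocator object.  Executing push(e1); push(e2); push(e3); on an empty list gives these results: push(e1) returns true and moves the list from empty to held with no waiters (e1 holds the atomic), while push(e2) and push(e3) each return false and leave e2 and then e3 queued as waiters.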
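For readers more used to standard C++ than to the gcc built-ins, the first CAS in push() (the one that takes an empty list to held with no waiters) can be sketched with std::atomic. This is an illustration only: Holder, marker() and try_acquire_empty() are hypothetical names, and std::atomic is swapped in for the __sync primitive used in the listing.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical holder with an atomic link field.
struct Holder { std::atomic<Holder *> next; };

// The 0x1 marker that tags an empty list.
inline Holder * marker() {
  return reinterpret_cast<Holder *>(std::uintptr_t(0x1));
}

// Try the empty -> held/no-waiters transition on the sentinel's next field.
// Succeeds only if next still holds the 0x1 marker at the instant of the CAS.
inline bool try_acquire_empty(Holder & sentinel) {
  Holder * expected = marker();
  return sentinel.next.compare_exchange_strong(expected, nullptr);
}
```

Only one caller can win this CAS; every other caller sees next != 0x1 and has to fall through to the queuing path.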



Pop


The pop() operation is done using CAS operations as well, and returns an event holder if one was in the list.



Here is the (simplified) code:


EventBaseHolder *
ExclusiveWaiterList::pop()
{
  EventBaseHolder * r = NULL;
  EventBaseHolder * h = NULL;
  EventBaseHolder * hn = NULL;
  EventBaseHolder * t = NULL;
  while(1) {
    HAZARD_PTR_ASSIGN(h,_head,0); 
    //h = _head; side-effect
    hn = h->next;
    HAZARD_PTR_ASSIGN(t,_tail,1); 
    //t = _tail; side-effect
    if(unmk(t) && unmk(t->next)) {
      __sync_synchronize();
      continue;
    }
    if(h != _tail && hn != NULL
         && !is_val<0x0001>(hn)
         && h->ev != NULL
         && __sync_bool_compare_and_swap(
                   &_head
                   , h
                   , hn)) {
      // held/some->held/no,held/some
      r = h;
      r->next = NULL;
      break;
    } else if(hn==NULL && __sync_bool_compare_and_swap(
         &(h->next)
        , hn
        , 0x0001)) {
      // held/no->empty
      break; //empty
    }
  }
  HAZARD_PTR_RELEASE(0);
  HAZARD_PTR_RELEASE(1);
  return r;
}

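Here is what pop(); pop(); pop(); does when there are two waiters (events e1, e2 and then e3 release their hold on the atomic object in turn): the first pop() returns the holder for e2, the second returns the holder for e3 and leaves the list held with no waiters, and the third returns NULL and leaves the list empty.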
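The push() and pop() transitions can be traced with a stripped-down model of the list. The sketch below makes several assumptions that should be read loudly: it uses std::atomic instead of the gcc built-ins, it leaves out hazard pointers and the allocator entirely (so it is only suitable for single-threaded tracing, not for real concurrent use), and Ev, Holder and ToyWaiterList are made-up names. Callers allocate holders with new and delete whatever pop() returns.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

struct Ev { int id; };                       // hypothetical event type

struct Holder {
  Ev * ev = nullptr;
  std::atomic<Holder *> next{nullptr};
};

inline Holder * kEmpty() {                   // the 0x1 "list is empty" marker
  return reinterpret_cast<Holder *>(std::uintptr_t(0x1));
}

class ToyWaiterList {
public:
  ToyWaiterList() : _head(new Holder), _tail(_head.load()) {
    _head.load()->next.store(kEmpty());      // start in the empty state
  }
  ~ToyWaiterList() {                         // free whatever is still linked
    Holder * h = _head.load();
    while (h != nullptr && h != kEmpty()) {
      Holder * n = h->next.load();
      delete h;
      h = n;
    }
  }

  // true: e's event acquired (e is NOT linked in); false: event queued.
  bool push(Holder * e) {
    Ev * ev = e->ev;                         // hold the event locally
    e->ev = nullptr;
    e->next.store(nullptr);
    for (;;) {
      Holder * h = _head.load();
      Holder * expected = kEmpty();
      if (h->next.compare_exchange_strong(expected, nullptr)) {
        e->ev = ev;                          // empty -> held/no waiters
        return true;
      }
      Holder * t = _tail.load();
      Holder * no_next = nullptr;
      if (t->next.compare_exchange_strong(no_next, e)) {
        Holder * old_tail = t;
        _tail.compare_exchange_strong(old_tail, e);
        t->ev = ev;                          // old tail now carries the waiter
        return false;                        // -> held/some waiters
      }
    }
  }

  // Precondition: the list is held (as in the listing, pop() on an empty
  // list would spin).  Returns the next waiter's holder, or nullptr.
  Holder * pop() {
    for (;;) {
      Holder * h = _head.load();
      Holder * hn = h->next.load();
      if (h != _tail.load() && hn != nullptr && hn != kEmpty()
          && h->ev != nullptr) {
        Holder * old_head = h;
        if (_head.compare_exchange_strong(old_head, hn)) {
          h->next.store(nullptr);
          return h;                          // hand the waiter to the caller
        }
      } else if (hn == nullptr) {
        Holder * no_next = nullptr;
        if (h->next.compare_exchange_strong(no_next, kEmpty())) {
          return nullptr;                    // held/no waiters -> empty
        }
      }
    }
  }

private:
  std::atomic<Holder *> _head;
  std::atomic<Holder *> _tail;
};
```

Note how a queued event rides in the holder that was the tail at the time of the push, while the pushed holder becomes the new (empty) tail; that is why pop() hands back the old head rather than the holder the waiter arrived in.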



Away to the Races


Is this code correct?  Is it possible for races to happen which mutate the list into a state which should not be reachable?  These are great questions.  Here are some things to check (in terms of parallel operations hitting this data structure happening in thread T1 and thread T2):

  • T1 calls push() and T2 calls push().   This can happen.  Serialization will happen on the update to _tail->next.  A push only ever changes that location from 0x0 or 0x1; seeing any other value there means the thread's copy of _tail is stale and needs to be re-read.
  • T1 calls pop() and T2 calls pop().  Actually this cannot happen.  Only one event holds the atomic object (and only it can invoke pop()), so this case is impossible.
  • T1 calls push() and T2 calls pop().   This can happen, but only when T2 sees a held state.  Transitions (for pop()) from held/some event modify _head and do not interact with push().  Transitions (for pop()) from held/no event modify _head->next when _tail==_head, and will serialize on that change (due to CAS).

Summary


I have presented here a lock-free atomic object implementation which is used to serialize event access to a resource (essentially some kind of object).  Rather than block, acquisitions which immediately fail cause events to queue inside of the atomic object, so that when the holding event releases, the waiting events are allowed access (in a FIFO manner).  The underlying data structure to make this access safe is a modified linked list which allows parallel access from multiple threads without the use of pthread synchronization or semaphores.  When used in the lock-free OFlux run-time this atomic object can mediate access to guards in an efficient manner, so that event waits do not turn into thread waits.  Since the underlying structure is linked-list based, it pays two performance costs: reclamation of the link objects has to be mediated with hazard pointers, and cache locality is lost.  An alternate implementation based on a dynamic buffer consisting of C arrays might perform even better (experience so far suggests this is likely).

Monday, October 1, 2012

Splitting Trade Units

Dividing a single executed trade proportionally for several customers is an interesting problem.  Naive decisions can lead to unpleasant and surprising results. In this post, I will describe the problem of fairly partitioning a trade with an integer number of units amongst a group of participants who are participating according to some numerical proportions (think pie chart).  The proportions will be specified as double precision floating point numbers, and they will be normalized relative to their overall sum.



Requirement: Make it Whole


Suppose a new trade for U units establishes a new position, and there are N participants, where participant i has proportion P[i] >= 0.  If the total proportion (left-associated sum) is P > 0, then the problem becomes finding integer values U[i] whose sum is U, and which are proportional to the P[i] values.

Using (P[i]/P)*U and rounding seems like a good idea, but it is wrong.  Consider what happens with N = 3, P[0] = P[1] = P[2] = 10.0 and U = 20: each participant is given 6.6666666666 units, which then has to be rounded to an integer using some unspecified rounding technique.  A deterministic round to 6 units each or 7 units each hands out the wrong total number of units (18 or 21 instead of 20).  So this won't work.
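The failure of independent rounding is easy to reproduce. The sketch below is only an illustration: naive_split() is a hypothetical helper, and std::lround (round half away from zero) is an assumed choice of rounding, since none is specified above.

```cpp
#include <cassert>
#include <cmath>
#include <numeric>
#include <vector>

// Naive split: round every participant's share independently.
// std::lround is an assumed rounding rule, not one taken from the post.
inline std::vector<int> naive_split(const std::vector<double> & props, int units) {
  double total = std::accumulate(props.begin(), props.end(), 0.0);
  assert(total > 0);
  std::vector<int> out;
  for (double p : props) {
    out.push_back(static_cast<int>(std::lround(units * p / total)));
  }
  return out;
}
```

With proportions {10.0, 10.0, 10.0} and 20 units, every share rounds to 7, so 21 units are handed out for a 20-unit trade.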

Instead of trying to establish unit counts right away per client, I attempted to assign unit counts to two groups of participants (essentially abstracting the problem to two participants).  I wrote the following code:


 void inline position_split(
   int & units1
 , int & units2
 , const double & prop1
 , const double & prop2
 , int total_position)
{
 double total_proportion = prop1 + prop2;
 assert(total_proportion > 0 || total_position == 0);
 if(total_position) {
  units1 = ROUND_UNITS(total_position * prop1 / total_proportion);
  units2 = total_position - units1;
 } else {
  units1 = 0;
  units2 = 0;
 }
}

Going back to my N = 3 example, this is better since I can use it to determine (U[0], U[1]+U[2]) using (P[0],P[1]+P[2]) and then invoke it again in a similar way to find out what (U[0]+U[1],U[2]) is.  Applying the code above I obtain the following unit values: U[0]=7, U[1] = 6, U[2]=7.  Great, now the values add up to the expected total U.
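The repeated two-way split generalizes to N participants by splitting at every prefix boundary and taking differences of the cumulative counts. The following is a sketch under assumptions, not the production code: std::lround stands in for the ROUND_UNITS macro (whose definition is not shown), and split_units() is a hypothetical helper name.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Two-way split as above; std::lround is an assumed stand-in for ROUND_UNITS.
inline void position_split(int & units1, int & units2,
                           double prop1, double prop2, int total_position) {
  double total_proportion = prop1 + prop2;
  assert(total_proportion > 0 || total_position == 0);
  if (total_position) {
    units1 = static_cast<int>(
        std::lround(total_position * prop1 / total_proportion));
    units2 = total_position - units1;
  } else {
    units1 = 0;
    units2 = 0;
  }
}

// Hypothetical N-way helper: split (first i+1 participants) against (the
// rest) for each i, then difference the cumulative unit counts.
inline std::vector<int> split_units(const std::vector<double> & props, int units) {
  double total = 0.0;
  for (double p : props) total += p;           // left-associated sum
  std::vector<int> out(props.size(), 0);
  double prefix = 0.0;
  int prev_cum = 0;
  for (std::size_t i = 0; i < props.size(); ++i) {
    prefix += props[i];
    int cum = 0;
    int rest = 0;
    position_split(cum, rest, prefix, total - prefix, units);
    out[i] = cum - prev_cum;                   // units for participant i alone
    prev_cum = cum;
  }
  return out;
}
```

Because the last prefix is the whole group, the final cumulative count is always the full position, so the per-participant counts add up to U by construction.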


Selling Out


Now that you are in a trade for U units and each participant has his piece, the most interesting part happens.  How do you attribute profit as each unit is closed (in the worst case, each one is closed separately)?  In essence, you have to have an ordering of the existing units of the position and an idea of whose unit will go next.  If the number of participants is very high, it would also be nice to not have to store the number of units left to each of them at all times.  This is key, since persisting that information can be a painful limit on performance as each update will cause a side-effect to an underlying database.

My initial approach was to leverage position_split to figure out what the position distribution would be for (U-1) total units and then just compare that with the result for U to find out which participant lost a unit.  This does not work.  Back to our example: plugging in 19 for the position size we get U[0] = 6, U[1] = 7, and U[2] = 6.  This is a problem since the middle participant (i=1) actually gained a unit when we reduced the position.

Since position_split is not enough and we still want to avoid storing detailed per-participant data on the remaining position as it gets reduced, we return to considering how to order the units of the beginning position in a way that fairly strips them from the participants.  How can the ordering of the units sold off be deterministic, and therefore require no storage?
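The anomaly can be detected mechanically by comparing the distribution for a position with the one for a position that is one unit smaller. As before, this is a sketch with assumptions: std::lround is an assumed rounding rule, and cumulative_split() and gainers() are hypothetical names.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Cumulative-rounding split: round each running total, then take differences.
// std::lround is an assumed rounding rule.
inline std::vector<int> cumulative_split(const std::vector<double> & props, int units) {
  double total = 0.0;
  for (double p : props) total += p;
  assert(total > 0);
  std::vector<int> out;
  double prefix = 0.0;
  int prev_cum = 0;
  for (double p : props) {
    prefix += p;
    int cum = static_cast<int>(std::lround(units * prefix / total));
    out.push_back(cum - prev_cum);
    prev_cum = cum;
  }
  return out;
}

// Indices of participants who hold MORE units after the position shrinks by one.
inline std::vector<std::size_t> gainers(const std::vector<double> & props, int units) {
  std::vector<int> before = cumulative_split(props, units);
  std::vector<int> after = cumulative_split(props, units - 1);
  std::vector<std::size_t> out;
  for (std::size_t i = 0; i < props.size(); ++i) {
    if (after[i] > before[i]) out.push_back(i);
  }
  return out;
}
```

For three equal proportions and a position going from 20 to 19 units, gainers() reports participant 1, which is exactly the case that rules this approach out.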

Partial Binary Trees 


A binary tree is a tree with two paths emanating from each non-leaf node in the tree.  The leaves can be addressed using the binary coded path from the root to the leaf.  A partial binary tree is a binary tree which is missing leaves beyond a particular coded address.  It should be obvious that a unique partial binary tree exists with U leaves for any positive integer U.  Consider the following tree for U = 5 (as an example):


I have written the coded binary path (0s and 1s for lefts and rights) on the leaves backwards (the leaf paths, left to right, are 000, 001, 01, 10 and 11).  Those backwards path numbers can be arranged as a sequence of unsigned binary numbers S = ( 0, 4, 2, 1, 3 ).  In this case, the sequence has no gaps (missing numbers), but that is not the case in general.  Using OS = sort(S) to denote the ascending totally ordered sequence for S, and OS^-1 to denote its inverse, we can say that after reducing the position by r units the next (ordered) unit to be picked is OS^-1[S[OS^-1[r]]].  For the above tree, OS = (0,1,2,3,4), so unit 0 goes first, unit 4 goes second, etc.  When there are gaps in OS, the point of using its inverse is clearer.

Essentially, for each trade which is originally U units in size, we can keep track of the number of units sold so far (r) and figure out which participants lose the next t units by evaluating OS^-1•S•OS^-1 at r, r+1, ..., r+t-1.  The position_split function indicates which participant originally had each of the units (e.g. participant 0 gets units 0 to U[0]-1, assuming U[0]>0).  I have empirically observed that the ordering of sold units using this technique is reasonably fair -- at each point the relative amount of units each participant has is roughly in line with their initial proportion amounts P[i].
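One way to read the construction is sketched below. It is a naive interpretation, not the efficient code referred to in this post: it assumes the partial tree is filled left to right (which reproduces S = (0, 4, 2, 1, 3) for U = 5), and it closes gaps by replacing each reversed code with its rank in sorted order. The names reverse_bits(), reversed_codes() and sell_order() are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Reverse the lowest `len` bits of `path`.
inline unsigned reverse_bits(unsigned path, unsigned len) {
  unsigned out = 0;
  for (unsigned i = 0; i < len; ++i) {
    out = (out << 1) | (path & 1u);
    path >>= 1;
  }
  return out;
}

// Reversed leaf codes S, left to right, for a left-filled tree with u leaves.
inline std::vector<unsigned> reversed_codes(unsigned u) {
  assert(u > 0);
  unsigned depth = 0;                    // smallest depth with 2^depth >= u
  while ((1u << depth) < u) ++depth;
  if (depth == 0) return std::vector<unsigned>(1, 0u);
  unsigned upper = 1u << (depth - 1);    // node count on the last full level
  unsigned expanded = u - upper;         // how many of those have two leaves
  std::vector<unsigned> s;
  for (unsigned j = 0; j < upper; ++j) {
    if (j < expanded) {                  // two leaves at the deepest level
      s.push_back(reverse_bits(j << 1, depth));
      s.push_back(reverse_bits((j << 1) | 1u, depth));
    } else {                             // a single, shallower leaf
      s.push_back(reverse_bits(j, depth - 1));
    }
  }
  return s;
}

// Rank-compress S so that gaps disappear: result[r] is the unit sold r-th.
inline std::vector<unsigned> sell_order(unsigned u) {
  std::vector<unsigned> s = reversed_codes(u);
  std::vector<unsigned> sorted(s);
  std::sort(sorted.begin(), sorted.end());
  std::vector<unsigned> order;
  for (unsigned code : s) {
    order.push_back(static_cast<unsigned>(
        std::lower_bound(sorted.begin(), sorted.end(), code) - sorted.begin()));
  }
  return order;
}
```

For U = 6 the reversed codes are (0, 4, 2, 6, 1, 3), which has a gap at 5; rank compression turns that into the permutation (0, 4, 2, 5, 1, 3) of the six units.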


Summary

Splitting the initial trade properly requires a technique which is sensitive to rounding and floating point arithmetic.  It distributes the units of a trade to several participants using their proportion amounts P[i] and it ensures that the right total number of units is distributed.

The reversed binary codes for a partial binary tree provide a deterministic permutation which can be used to fairly sell out a position so that at any point the participants still hold their proportion of what is left in aggregate.  By only knowing how many units were already sold r and the additional number to be sold t, an algorithm can tell you which participants are losing how many units.  The code to efficiently compute the permutation is a bit involved, so I have not posted it here.  The basics needed to reconstruct the approach are here, however. 



