Thursday, October 11, 2012

OFlux: Lock-free Mutex Atomic

Unlike a Pthread mutex, which stops a thread in its tracks if the resource is already held, an OFlux atomic with mutual-exclusion semantics does not block.  Instead, it takes an "event" argument which is parked in the atomic's internal queue.  The thread remains available to execute other events as long as they are runnable (have all the atomic resources they need).  This keeps threads warmer and avoids a costly context switch.  The cost of parking an event is a single compare-and-swap (CAS) instruction (this code is geared to the i686 architecture).  Within the same loop where this CAS happens, conditions are checked to see whether the atomic is free (not held by another event), and if so another CAS acquires it.  The basic operation acquire_or_wait() thus gives events a simple way to interact with an atomic.  When a completed event releases the atomic, it calls release(), which can return a next event (from the waiting queue) that has thereby acquired it.


Rudiments of Interface


I will try to explain the top-level interface for this atomic object first, and then delve into the mechanics of the working parts within it.   Here is the AtomicExclusive subclass of AtomicCommon which implements the atomic object with mutual exclusion semantics (meaning at most one event may hold the atomic at one time) from src/runtime/lockfree/atomic/OFluxLFAtomic.h (simplified somewhat):


typedef EventBase * EventBasePtr;

class AtomicExclusive : public AtomicCommon {
public:
  AtomicExclusive(void * data)
    : AtomicCommon(data)
  {}
  virtual int held() const { return ! _waiters.empty(); }
  virtual void release(
     std::vector<EventBasePtr > & rel_ev
   , EventBasePtr &)
  {
    EventBaseHolder * ebh = _waiters.pop();
    if(ebh) {
      rel_ev.push_back(EventBasePtr(ebh->ev));
      AtomicCommon::allocator.put(ebh);
    }
  }
  virtual bool acquire_or_wait(EventBasePtr & ev, int mode)
  {
    EventBaseHolder * ebh = 
       AtomicCommon::allocator.get(ev,mode);
    bool acquired = _waiters.push(ebh);
    if(acquired) {
      AtomicCommon::allocator.put(ebh); 
      // not in use - return it to pool
    }
    return acquired;
  }
  ... // other things omitted
private:
  ExclusiveWaiterList _waiters;
};


Regardless of whether the atomic is currently held, multiple threads can be calling the acquire_or_wait() member function on behalf of new events which need to acquire this atomic.  A true return value indicates that the acquisition succeeded, and the event may proceed with further acquisitions or actually execute its function.  A false return value indicates that the event has been given up to the atomic object (it is queued internally within the atomic, and the current thread is forbidden to access it further); it will be released later, when its turn to hold the atomic comes.

The interface supports other semantics like read-write or pools, which I will describe in a separate blog posting.  The signature of release() adds events to a std::vector (which could be thread-local and pre-sized for efficiency) in anticipation of other kinds of atomic objects.  Generally, events in the internal queue are ordered first-in first-out (FIFO), since this has the simplest fairness guarantee and makes writing code easier.  The mode argument can specify a "mode" of acquisition, but it is not very useful for this type of atomic object (which supports only one mode of acquisition: "exclusive").  I won't cover the internals of the AtomicCommon::allocator member object, except to say that it is a thread-safe reclamation facility with a hazard-pointers capability (which enables the implementation of the ExclusiveWaiterList class that I describe next).
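
To make the protocol concrete, here is a minimal sketch of how a scheduler loop might drive this interface (my own illustration, not OFlux source; run() and runqueue_push() are hypothetical stand-ins for the real runtime machinery):


std::vector<EventBasePtr> rel_ev;
if(atomic->acquire_or_wait(ev, mode)) {
  run(ev);                        // we hold the atomic: execute the event
  EventBasePtr by = ev;
  atomic->release(rel_ev, by);    // may pop the next waiting event
  for(size_t i = 0; i < rel_ev.size(); ++i) {
    runqueue_push(rel_ev[i]);     // released events become runnable again
  }
}
// on a false return, ev is parked inside the atomic's queue; this thread
// simply picks up other runnable work instead of blocking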


Hang on to this


In order to avoid putting mechanics that ExclusiveWaiterList needs into the events themselves, I have a container object called EventBaseHolder which boxes up the events whenever they are anywhere near an ExclusiveWaiterList.  On its own, it is not very interesting, but it is important to describe it before it is used (again simplified for reading):


struct EventBaseHolder {
  EventBaseHolder(
      EventBasePtr & a_ev
    , int a_mode)
    : ev(a_ev)
    , next(NULL)
    , mode(a_mode)
  {}

  EventBase * ev;
  EventBaseHolder * next;
  int mode;
};

The allocator object's get() member function is used to call the constructor (above).
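
The shape of the two allocator calls used in this post is roughly as follows (my reading; the real signatures live behind AtomicCommon and are assumptions here):


// EventBaseHolder * get(EventBasePtr & ev, int mode);
//   pool-allocates a holder and runs the constructor above
// void put(EventBaseHolder * ebh);
//   returns a holder to the pool, deferring actual reuse until no other
//   thread still publishes it through a hazard pointer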


Line up!


The declaration of ExclusiveWaiterList is pretty straightforward (once I have simplified the code a bit):


template<const size_t v, typename T>
inline bool is_val(const T * p)
{
  return reinterpret_cast<size_t>(p) == v;
}

template<const size_t v, typename T>
inline void set_val(T * & p)
{
  reinterpret_cast<size_t &>(p) = v;
}


class ExclusiveWaiterList {
public:
  ExclusiveWaiterList()
    : _head(AtomicCommon::allocator.get(0,
            EventBaseHolder::None)) // None: a mode constant omitted
    , _tail(_head)                  //  from the simplified struct above
  { set_val<0x0001>(_head->next); }

  size_t count() const;
  bool has_waiters() const;
  bool push(EventBaseHolder * e);
  EventBaseHolder * pop();

private:
  EventBaseHolder * _head;
  EventBaseHolder * _tail;
};

When constructed, a new ExclusiveWaiterList holds one empty EventBaseHolder element (with mode None).  That initial element is a critical part of the implementation of this list.  I will describe each of the member functions one by one, after first describing the possible states the list can be in (abstractly characterized by these three states):

In the empty state, there is no event holding the atomic and there are no waiters.  The other two states (held with no waiters, held with some waiters) have self-explanatory names.  The pointer value 0x1 is used as a special marker value since it is not a real (i.e. properly aligned) value for a pointer on i686.  States not shown in the diagram (e.g. _head == _tail and _head->next > 0x1) should be unreachable as resting states (no operation rests part-way past its sequentialization point).
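
Since the states are easiest to read off the pointer encodings, here is my summary of how each one appears in the code that follows:


// empty               : _head == _tail, _head->next == 0x1 (marker)
// held / no waiters   : _head == _tail, _head->next == NULL
// held / some waiters : _head->next points at a real node; the waiting
//                       events sit in the nodes from _head up to (but not
//                       including) _tail, whose own ev field is NULL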

The has_waiters() member function is quite simple to implement with these states in mind:


bool ExclusiveWaiterList::has_waiters() const
{
  EventBaseHolder * h = _head->next;
  return !is_val<0x0001>(h) && (h != NULL);
}

The count() member function is also fairly simple (though it is not used in a context where it needs to be fully safe -- so no hazard pointers are set):


size_t ExclusiveWaiterList::count() const // number of waiters
{
  size_t res = 0;
  EventBaseHolder * h = _head;
  while(h && !is_val<0x0001>(h)  && h->ev) {
    ++res;
    h = h->next;
  }
  return res;
}


Push



The code to push an event is most easily explained by visualizing the transitions that should happen on the abstract state diagram above:


Here is the (simplified) code which does the push() transitions:


template<typename T>
inline T * unmk(T * t)
{
  size_t u = reinterpret_cast<size_t>(t);
  return reinterpret_cast<T *>(u & ~0x0001);
}

inline void ev_swap(EventBase * & ev1, EventBase * & ev2)
{
  EventBase * ev = ev1;
  ev1 = ev2;
  ev2 = ev;
}


bool
ExclusiveWaiterList::push(EventBaseHolder *e)
{
  bool res = false;
  e->next = NULL;
  EventBaseHolder * t = NULL;
  EventBase * ev = NULL; // hold the event locally
  ev_swap(ev,e->ev);
  EventBaseHolder * h = NULL;
  EventBaseHolder * hn = NULL;
  while(1) {
    // h = _head; side-effect:
    HAZARD_PTR_ASSIGN(h,_head,0); 
    hn = h->next;
    if(is_val<0x0001>(hn)
           && __sync_bool_compare_and_swap(
             &(h->next)
           , 0x0001
           , NULL)) {
      // empty->held/no
      ev_swap(e->ev,ev);
      res = true;
      break;
    } else {
      // (held/no,held/some)->held/some
      //t = _tail; side-effect
      HAZARD_PTR_ASSIGN(t,_tail,1); 
      if(unmk(t->next)) {
        continue;
      }
      if(unmk(t) && __sync_bool_compare_and_swap(
           &(t->next)
           , NULL
           , e)) {
        __sync_bool_compare_and_swap(&_tail,t,e);
        ev_swap(t->ev,ev);
        break;
      }
    }
  }
  HAZARD_PTR_RELEASE(0);
  HAZARD_PTR_RELEASE(1);
  return res;
}

__sync_bool_compare_and_swap() is a built-in gcc primitive for CAS, and the HAZARD_PTR macros are part of the deferred reclamation capabilities of the AtomicCommon::allocator object.  Here are the intermediate results of executing push(e1); push(e2); push(e3); with the return value of each invocation:
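
(The original post shows this as a diagram; the trace below is my reconstruction from the code above, where each box is an EventBaseHolder and ev:0 means a NULL event.)


start (empty):          _head -> [ev:0 | next:0x1]             _tail == _head
push(e1) returns true:  the 0x1 -> NULL CAS wins; e1's event now
                        holds the atomic, and e1's holder box is
                        returned to the pool
                        _head -> [ev:0 | next:NULL]            _tail == _head
push(e2) returns false: e2 is appended; its event is swapped into
                        the old tail node, and the new tail box
                        carries ev == NULL
                        _head -> [ev:e2] -> [ev:0 | next:NULL]
push(e3) returns false:
                        _head -> [ev:e2] -> [ev:e3] -> [ev:0 | next:NULL]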



Pop


The pop() operation is done using CAS operations as well, and returns an event holder if one was in the list.



Here is the (simplified) code:


EventBaseHolder *
ExclusiveWaiterList::pop()
{
  EventBaseHolder * r = NULL;
  EventBaseHolder * h = NULL;
  EventBaseHolder * hn = NULL;
  EventBaseHolder * t = NULL;
  while(1) {
    HAZARD_PTR_ASSIGN(h,_head,0); 
    //h = _head; side-effect
    hn = h->next;
    HAZARD_PTR_ASSIGN(t,_tail,1); 
    //t = _tail; side-effect
    if(unmk(t) && unmk(t->next)) {
      __sync_synchronize();
      continue;
    }
    if(h != _tail && hn != NULL
         && !is_val<0x0001>(hn)
         && h->ev != NULL
         && __sync_bool_compare_and_swap(
                   &_head
                   , h
                   , hn)) {
      // held/some->held/no,held/some
      r = h;
      r->next = NULL;
      break;
    } else if(hn==NULL && __sync_bool_compare_and_swap(
         &(h->next)
        , hn
        , 0x0001)) {
      // held/no->empty
      break; //empty
    }
  }
  HAZARD_PTR_RELEASE(0);
  HAZARD_PTR_RELEASE(1);
  return r;
}

Here is an example of what pop(); pop(); pop(); looks like when there are two waiters (in this case events e1, e2 and then e3 release their hold on the atomic object):
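
(Again reconstructing the post's diagram as a trace, by my reading of the code; we start from the state where e1 holds the atomic and e2, e3 wait.)


start:   _head -> [ev:e2] -> [ev:e3] -> [ev:0 | next:NULL]
pop():   returns the holder carrying e2 (the new holder); _head advances
         _head -> [ev:e3] -> [ev:0 | next:NULL]
pop():   returns the holder carrying e3
         _head -> [ev:0 | next:NULL]
pop():   finds no waiter; CASes _head->next from NULL to 0x1, returns NULL
         _head -> [ev:0 | next:0x1]   (empty again)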



Away to the Races


Is this code correct?  Is it possible for races to happen which mutate the list into a state which should not be reachable?  These are great questions.  Here are some things to check (in terms of parallel operations hitting this data structure happening in thread T1 and thread T2):

  • T1 calls push() and T2 calls push().  This can happen.  Serialization happens on the update to _tail->next; the expected prior values at that location are either 0x0 or 0x1 -- anything else indicates that the local copy of _tail needs to be re-read.
  • T1 calls pop() and T2 calls pop().  Actually, this cannot happen.  Only the one event holding the atomic can cause a pop() (via release()), so this case is impossible.
  • T1 calls push() and T2 calls pop().  This can happen, but only when T2 sees a held state.  Transitions (for pop()) from the held/some-waiters state modify _head and do not interact with push().  Transitions (for pop()) from the held/no-waiters state modify _head->next when _tail == _head, and will serialize on that change (due to CAS).

Summary


I have presented here a lock-free atomic object implementation which is used to serialize event access to a resource (essentially some kind of object).  Rather than block, an acquisition which immediately fails causes the event to queue inside the atomic object, so that when the holding event releases it a waiting event is granted access (in a FIFO manner).  The underlying data structure that makes this access safe is a modified linked list which allows parallel access from multiple threads without pthread synchronization or semaphores.  When used in the lock-free OFlux run-time, this atomic object can mediate access to guards efficiently, so that event waits do not turn into thread waits.  Since the underlying structure is linked-list based, the need to mediate reclamation of the link objects with hazard pointers, and the loss of cache locality, are performance problems.  An alternate implementation based on a dynamic buffer of C arrays might perform even better (our experience so far indicates this is likely).

Monday, October 1, 2012

Splitting Trade Units

Dividing a single executed trade proportionally among several customers is an interesting problem.  Naive approaches can lead to unpleasant and surprising results.  In this post, I will describe the problem of fairly partitioning a trade of an integer number of units amongst a group of participants who participate according to some numerical proportions (think pie chart).  The proportions are specified as double precision floating point numbers, and they are normalized relative to their overall sum.



Requirement: Make it Whole


Suppose a new trade for U units establishes a new position, with N participants where participant i has proportion P[i] >= 0.  If the total proportion (a left-associated sum) is P > 0, then the problem becomes finding values U[i] whose sum is U and which are proportional to the P[i] values.

Using (P[i]/P)*U and rounding seems like a good idea, but it is wrong.  Consider what happens with N = 3, P[0] = P[1] = P[2] = 10.0 and U = 20: each participant is given 6.6666666666 units (rounded to the nearest integer using some unspecified rounding technique).  A deterministic round down to 6 units each or up to 7 units each hands out the wrong number of total units (18 versus 21).  So this won't work.
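
Here is a small stand-alone demonstration of the failure (my own snippet; it uses round-half-up as the unspecified rounding):


#include <cmath>
#include <cstdio>

int main() {
  const double P[3] = {10.0, 10.0, 10.0};
  const double total = P[0] + P[1] + P[2];
  const int U = 20;
  int sum = 0;
  for(int i = 0; i < 3; ++i) {
    // naive per-participant share, rounded to the nearest integer
    int u = (int) std::floor((P[i]/total)*U + 0.5);
    sum += u; // each u is 7 here
  }
  std::printf("handed out %d of %d units\n", sum, U); // 21 of 20
  return 0;
}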

Instead of trying to establish unit counts per client right away, I attempted to assign unit counts to two groups of participants (essentially abstracting the problem to two participants).  I wrote the following code:


 /* ROUND_UNITS: rounds a double to the nearest integer unit
    (macro definition omitted) */
 void inline position_split(
   int & units1
 , int & units2
 , const double & prop1
 , const double & prop2
 , int total_position)
{
 double total_proportion = prop1 + prop2;
 assert(total_proportion > 0 || total_position == 0);
 if(total_position) {
  units1 = ROUND_UNITS(total_position * prop1 / total_proportion);
  units2 = total_position - units1;
 } else {
  units1 = 0;
  units2 = 0;
 }
}

Going back to my N = 3 example, this is better since I can use it to determine (U[0], U[1]+U[2]) from (P[0], P[1]+P[2]), and then invoke it again in a similar way to find (U[0]+U[1], U[2]).  Applying the code above I obtain the unit values U[0]=7, U[1]=6, U[2]=7.  Great, now the values add up to the expected total U.
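
One way to generalize this to N participants is to peel one participant off at a time, letting position_split preserve the total at every step.  This is my own sketch (not from the original post); its pairing differs slightly from the (U[0], U[1]+U[2]) / (U[0]+U[1], U[2]) evaluation used above, but it always hands out exactly U units:


 void split_units(
   const double * props   // proportions P[0..n-1]
 , int n                  // number of participants
 , int total_position     // U units to hand out
 , int * units_out)       // receives U[0..n-1]
{
 if(n == 1) {
  units_out[0] = total_position; // last participant takes the remainder
  return;
 }
 double rest = 0.0;
 for(int i = 1; i < n; ++i) {
  rest += props[i];
 }
 int u0 = 0;
 int urest = 0;
 position_split(u0, urest, props[0], rest, total_position);
 units_out[0] = u0;
 split_units(props + 1, n - 1, urest, units_out + 1);
}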


Selling Out


Now that you are in a trade for U units and each participant has his piece, the most interesting part happens.  How do you attribute profit as each unit is closed (in the worst case, each one is closed separately)?  In essence, you have to have an ordering of the existing units of the position and an idea of whose unit goes next.  If the number of participants is very high, it would also be nice not to have to store the number of units left to each of them at all times.  This is key, since persisting that information can be a painful limit on performance, as each update causes a side-effect to an underlying database.

My initial approach was to leverage position_split to figure out what the position distribution would be for (U-1) total units, and then just compare that with the result for U to find out which participant lost a unit.  This does not work.  Back to our example: plugging in 19 for the position size we get U[0] = 6, U[1] = 7, and U[2] = 6.  This is a problem, since the middle participant (i=1) actually gained a unit when we reduced the position.

Since position_split is not enough, and we still want to avoid storing detailed per-participant data on the remaining position as it gets reduced, we return to considering how to order the units of the beginning position in a way that fairly strips them from the participants.  How can the ordering of the units sold off be deterministic and therefore require no storage?

Partial Binary Trees 


A binary tree is a tree with two paths emanating from each non-leaf node.  The leaves can be addressed using the binary coded path from the root to the leaf.  A partial binary tree is a binary tree which is missing leaves beyond a particular coded address.  It should be obvious that a unique partial binary tree with U leaves exists for any positive integer U.  Consider the following tree for U = 5 (as an example):


I have written the coded binary path (0s and 1s for lefts and rights) on the leaves backwards.  Those backwards path numbers can be arranged as a sequence of unsigned binary numbers S = (0, 4, 2, 1, 3).  In this case, the sequence has no gaps (missing numbers), but that is not the case in general.  Using OS = sort(S) to denote the ascending totally ordered sequence for S, we can say that after reducing the position by r units, the next (ordered) unit to be picked is OS^-1[S[OS^-1[r]]].  For the above tree, OS = (0,1,2,3,4), so unit 0 goes first, unit 4 goes second, etc.  When there are gaps in OS, the point of using its inverse is clearer.

Essentially, for each trade which is originally U units in size, we can keep track of the number of units sold so far (r) and figure out which participants lose the next t units by evaluating OS^-1 • S • OS^-1 at r, r+1, ..., r+t-1.  The position_split function indicates which participant originally had each of the units (e.g. participant 0 gets units 0 to U[0]-1, assuming U[0]>0).  I have empirically observed that the ordering of sold units using this technique is reasonably fair -- at each point the relative number of units each participant still holds is roughly in line with their initial proportion amounts P[i].


Summary

Splitting the initial trade properly requires a technique which is sensitive to rounding and floating point arithmetic.  It distributes the units of a trade to several participants using their proportion amounts P[i], and it ensures that the right total number of units is distributed.

The reversed binary codes for a partial binary tree provide a deterministic permutation which can be used to fairly sell out a position, so that at any point the participants still hold roughly their proportion of what is left in aggregate.  Knowing only how many units were already sold (r) and the additional number to be sold (t), an algorithm can tell you which participants are losing how many units.  The code to efficiently compute the permutation is a bit involved, so I have not posted it here.  The basics needed to reconstruct the approach are here, however.



Thursday, September 20, 2012

Trailing Stops: Rotating in two dimensions

Trailing stop orders are a computational challenge since they are stateful and react to every price change.  In 2008, I looked into the problem of implementing a data structure which could do better than a naive container at dealing with trailing stop orders.  In this post, I will describe the structure that I came up with, and critique it a little.

Hitching a Ride


A trailing stop order is designed to attempt to lock in profit.  A stop order S (which is less fancy, and should be understood first) is typically used to close an open trade T, opened at price P, by specifying a market price at which the trade T has accumulated its maximum allowable loss.  If T is a long position, then the stop order specifies a price lower than P; otherwise (a short position) it specifies a price higher than P.  A trailing stop order TS is specified similarly, but its trigger price creeps along behind the market price so that it is never further from the market price than it started out.  Using this to control risk for trade T is advantageous since it effectively locks in profit.  If the price moves against T, then the TS price stays put (acting like a watermark).  If you want more explanation of how this works, please have a look at this helpful video.

I should stress that my purpose is to describe how to implement trailing stops, and not to endorse their use under any specific contexts.  Trading is risky, and strategic decisions to use various order types should be made with due caution.


Interface Design


Trailing stops for a particular instrument will enter the system with an insert() method, and be removable using a remove() method.  It is assumed that each of these orders has a unique integer identifier which can be used to refer to it.  Although the data structure should be concerned with the efficiency of these housekeeping methods, the high-frequency operations of deepest concern are the ones related to price movements.

The same structure can be used for buy and sell orders; it is only really important to know whether to use bid or ask prices, and to have an idea of which way is "up" or "down".  Prices are dealt with internally as integers relative to the current market price (in units which are pips or pipettes).  A trailing stop order has a pair of integers which describe its state: (trailing_stop, trailing_amount).  To insert a new order which is 10 units off of market, the pair (10,10) is put into the structure.  The diagram below shows valid values for resting trailing stop orders (trailing amounts are not allowed below 1 and cannot exceed the trailing stop):



Trailing amounts, which track how far the trailing stop order is from the market price, change on the orders as market prices (Px) are applied to the structure.  The two different sides (buy and sell) each have a data structure which reacts differently to market prices:


TS type | Px side | above/below market | Px increases | Px decreases
BUY     | ASK     | above              | price_down() | price_up()
SELL    | BID     | below              | price_up()   | price_down()


If the price moves "down" by a single price unit, we call price_down(), which notionally changes the state of every trailing stop order from (ts,ta) to (ts,ta-1); all orders reaching (ts,0) are removed as triggered.  A naive implementation would need to touch every order to update these records.

If the price moves "up" by a single price unit, we call price_up(), which notionally changes the state of every trailing stop order from (ts,ta) into (ts,min(ts,ta+1)).  Again, a naive implementation would need to touch every order to update all of the trailing amount records, which is quite expensive.


Rotating Array of Arrays


In order to simplify the job of updating the trailing amounts, we can use the trailing amount to classify and index each order.  This means that we can, in many cases, just have an array of orders move its position (changing the effective trailing amounts of all of its contents at once).  Consider the following array of arrays of lists (the inner arrays are connected by dotted lines, and each order with a particular (ts,ta) value is added to the list held at that box):


The outer array is the part we will rotate.  If the top box in the diagram (blue grid) is at position [0,0] of the NxN array, then an order with state (ts,ta) should be at logical array position [N-ts+ta-1, ts-1] (in the above diagram N=5).  The top "triangle" of boxes is unused since those positions do not represent reachable (ts,ta) states.
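
Here is a sketch of that indexing (my own illustration; the Order type and the fixed N are assumptions, and the trigger/merge bodies are elided).  The point is that rotating the outer array is just moving an offset, which re-indexes every inner array at once:


#include <list>

struct Order { int id; };

struct TrailingStopBook {
 static const int N = 5;       // supported price-unit levels
 std::list<Order> grid[N][N];  // [outer][inner] lists of resting orders
 int rot;                      // rotation offset of the outer array

 TrailingStopBook() : rot(0) {}

 // physical outer slot for logical row r (safe for negative offsets)
 int outer(int r) const { return ((r + rot) % N + N) % N; }

 // an order in state (ts,ta) lives at logical position [N-ts+ta-1, ts-1]
 std::list<Order> & cell(int ts, int ta)
 { return grid[outer(N - ts + ta - 1)][ts - 1]; }

 // price_down(): pull off the triggered ta == 1 diagonal, then step rot
 // price_up():   merge each (ts,ts) list up one row, then step rot the
 //               other way; both touch O(N) lists, not every order
};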

Completing a price_up() is only a matter of merging the red boxes at (ts,ts) up into the array above and rotating the whole top-level array (which is done in the usual way with modulus operations and an index variable):


The new empty lists (white boxes) simply appear as a result of the rotation, and the green boxes are not disturbed.  The magenta boxes are an inner array which moves as a result of the outer array rotation.

To do a price_down() operation, a similar trick happens.  The diagonal order list elements (trailing amount 1) boxes (shown as red) are removed, and the array is rotated in the other direction:


The new empty array that holds all the new (ts,ts) orders is shown as empty magenta boxes on the lower edge.  As a practical matter, the number of price unit levels supported (the value of N) has to be a fixed, predetermined value.  As described, the data structure works very well for large numbers of orders, but uses up considerable space when empty (an NxN array).  To mitigate this, the inner arrays (blue boxes linked with dotted lines) could be represented more sparsely with skip-lists or binary trees.  If the remaining top-level array still consumes too much memory, something could be done to make it sparse as well (allowing a larger value of N).

Summary

Using a benchmark which consisted of:

  1. inserting 2 million orders at a spread of (ts,ta) values
  2. price_up() 100 times
  3. price_down() 100 times
  4. insert 200k more random orders
  5. price_down() 100 times
  6. price_up() 100 times
The data structure described above (with sparse BST inner arrays) was 20 times faster than a naive implementation built around the glib hash table.  It is an interesting case of using a frame of reference to get free work done (the rotating array) and of mapping the problem onto a data structure that reduces the number of operations.

If the structure holds evenly distributed (ts,ta)-valued trailing stop orders, price_up() and price_down() are both O(N) for the above structure.  For a vector, list or hash-table, something closer to O(N*N) is observed, as every element gets touched.  I have always been interested to know whether something more space efficient, but with similar properties, could be devised in the future.



Tuesday, September 11, 2012

TIPC: Dual Bearers - Oh my!

The 10GbE cards I have in my cluster have two interfaces (eth1 and eth2), and so far I have only made use of one of them.  TIPC has the ability to internally bond these so that both are used to transfer data.  The benefit is mostly redundancy (more realistic if done with separate cards), plus increased bandwidth.  In order to pull this off I need to change the script I am using to configure the TIPC network on the cluster, after connecting the unused interfaces (highlighted in green):




The network interfaces plugged in here are the Solarflare 10GbE interfaces which I reported on earlier.  Configuring the new interface and adding it to the bearer list is a pretty simple edit to my bash script (run as root):


#!/bin/bash

mussh -h monet010 -h monet011 -c '
modprobe sfc
export SUFFIX=`hostname | sed "s/monet0//g"`
ifconfig eth1 192.168.3.$SUFFIX
ifconfig eth2 192.168.4.$SUFFIX
modprobe tipc
tipc-config -netid=1234 -addr=1.1.$SUFFIX -be=eth:eth1,eth:eth2
lsmod | grep tipc
/home/mark/git/tipcutils/tipc-config/tipc-config -lw=broadcast-link/100
'

Now we can confirm (on each node) that both bearers are there:


root@monet011:/home/mark# tipc-config -nt -b
Type       Lower      Upper      Port Identity              Publication Scope
0          16781322   16781322   <1.1.10:1765777409>        1765777410  cluster
           16781323   16781323   <1.1.11:3091349505>        3091349506  zone
1          1          1          <1.1.11:3091365891>        3091365892  node
Bearers:
eth:eth1
eth:eth2

I am eager to re-run some of my throughput tests, which previously indicated that the NIC was saturated (I was hitting the 10GbE ceiling).  Stay tuned.  Will this prove tasty, or poison?

Thursday, August 30, 2012

Cleanup: OFlux Guard Garbage Collection

OFlux guards with keys drawn from large ranges (e.g. hash strings) can cause the underlying map memory to grow too much.  The values in these guards are pointers, so a buildup of (key,0) pairs in the underlying map is responsible.  Since any new key not already present in the map starts off associated with 0 (the NULL pointer, if you like), simply removing these entries from the underlying map when they are detected corrects the problem.  The timing of the removal in the run-time has to ensure the map is not accessed by two threads at the same time.
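
In rough pseudo-C++ terms, the cleanup amounts to the following rule (a sketch of the idea only, not the actual OFlux runtime code; underlying_map stands in for the guard's internal map):


// after a node function finishes with guard G on key k, and while the
// runtime still holds the guard's serialization for k:
std::map<int, int *>::iterator itr = underlying_map.find(k);
if(itr != underlying_map.end() && itr->second == NULL) {
  underlying_map.erase(itr); // drop the useless (key,0) entry
}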



Garbage Collected Guards


The /gc guard modifier enables this feature, removing an accessed (key,0) pair once the node function is done with it.  The example I describe below demonstrates its usefulness (based on gctest1 from the OFlux repo):


exclusive/gc G (int v) => int *; 
  /* remove the /gc to cause this program to leak memory */

node GenerateKey () 
  => (int key);
node Populate (int key, guard G(key) as g) 
  => (int key); /* populates the rval guard */
node Depopulate (int key, guard G(key) as g) 
  => (); /* depopulates the rval guard */


source GenerateKey -> Populate -> Depopulate;


The plan is to have the source node GenerateKey generate a number, have node Populate populate the G guard with that number as a key, then have node Depopulate depopulate it (by assigning it a value of (int *) 0) on the same integer key.  The following C++ implementation of the node functions carry out this plan:


#include <stdio.h>
#include "OFluxGenerate_gctest1.h"

int 
GenerateKey(const GenerateKey_in *
          , GenerateKey_out * out
          , GenerateKey_atoms *)
{
 static int x = 0;
 out->key = ++x;
 return 0;
}

int 
Populate(const Populate_in *in
       , Populate_out *out
       , Populate_atoms * atoms)
{
 int * & g = atoms->g();
 static int x = 0;
 out->key = in->key;
 if(g == NULL) {
  g = &x;
 }
 return 0;
}

int
Depopulate(const Depopulate_in * in
         , Depopulate_out *
         , Depopulate_atoms * atoms)
{
 int * & g = atoms->g();
 g = NULL;
 return 0;
}

Summary


Building this code without the /gc guard modifier causes the G guard's underlying map to explode in size (eventually running out of memory on a 32-bit system once it hits the 4G mark).  In some cases, the key space is a known finite set that does not grow very much, and there is no need to remove unused (key,0) pairs from the underlying map (so the speed hit of removing things is unnecessary).  Depending on the application, it could be that the value associated with a key will transition from 0 to a real object again in the near future with high probability.  If that is the case, the default setting with no garbage collection is recommended.

Friday, August 17, 2012

Memories Shared Over Coffee while RESTing on Node.js

A shared memory library (written in C++) needs to be exposed via REST as a read-only part of an API.  I accomplished this with a brew of Node.js, expressjs, Coffeescript, and some careful examination of the C++ add-on documentation.  Although I am admittedly pretty new to the javascript side of this code (which is the focus of this post), I found that all of those parts were very well documented on the web.  The C++ stuff is old hat for me at least, and I can describe that another time (comment if you are interested).


Shared Memory API


The data that I am drawing out of shared memory is simple fixed-size records for things called instruments.  Users of this REST API will simply ask for these records with an HTTP GET request to the proper URI whenever they need the most up-to-date information.  The C++ library that I have written exposes some very simple functions and data structures in its header file to allow this:


int
get_pair_id_from_symbol(const char * symbol);

struct InstrumentInfo {
  char display_name[100];
  int pip_location;
  int extra_precision;
  double pip;
};

InstrumentInfo *
getInstrumentInfo(int pair_id);


Things like the name of the shared memory segment and what to do if it needs to be created and populated are all hidden inside of the library.  This is nice, since the application using it just needs to work with the above functions to get its work done.

Instrument Info Add-on


In a file called instrumentaddon.cpp, I define the following (based on the information for v0.8.2 of Node.js, which I am using):


#define BUILDING_NODE_EXTENSION
#include <node.h>
#include "InstrumentsShm.h"

using namespace v8;


Handle<Value>
GetInstrumentInfo(const Arguments & args)
{
  HandleScope scope;
  Local<Function> cb = Local<Function>::Cast(args[1]);

  Local<String> symbol = Local<String>::Cast(args[0]);
  String::AsciiValue ascii_val(symbol);
  int pair_id = get_pair_id_from_symbol(*ascii_val);
  if(!pair_id) {
     // not calling the cb is insult enough
     //throw Exception(String::NewSymbol("unknown pair"));
     return scope.Close(Undefined());
  }
  InstrumentInfo * ii = getInstrumentInfo(pair_id);
  if(!ii) {
    throw -1;
  }
  const int argc  = 1;
  { // display_name first
    Local<Value> argv[argc] =
        { String::New(ii->display_name) };
    cb->Call(Context::GetCurrent()->Global(), argc, argv);
  }
  { // pip_location second
    Local<Value> argv[argc] =
        { Number::New(ii->pip_location) };
    cb->Call(Context::GetCurrent()->Global(), argc, argv);
  }
  { // extra_precision third
    Local<Value> argv[argc] =
        { Number::New(ii->extra_precision) };
    cb->Call(Context::GetCurrent()->Global(), argc, argv);
  }
  { // pip fourth
    Local<Value> argv[argc] =
        { Number::New(ii->pip) };
    cb->Call(Context::GetCurrent()->Global(), argc, argv);
  }
  return scope.Close(Undefined());
}

extern "C" {
void init(Handle<Object> target) 
{
  target->Set(String::NewSymbol("getInstrumentInfo"),
      FunctionTemplate::New(GetInstrumentInfo)->GetFunction());
} 
} // "C"

NODE_MODULE(instrumentaddon, init)


You can see how I am calling on the v8 engine (the javascript engine which Node.js uses) to wrap data values and conform to the add-on API (admittedly this seems like black magic -- and it sort of is).  My error handling is not very good, since I am just trying to prove the concept and get some code working that can be demonstrated.

To build this C++ into a shared object which the Node.js run-time can use, I do the following (actually I have Gnu make do it for me, rather than the default python builder):


g++ -g -O3 -Wall  -D_XOPEN_SOURCE -fPIC -DPIC -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -DEV_MULTIPLICITY=0 -I/usr/local/include/node -I. -Iinclude -Isrc/common   -c instrumentaddon.cpp -o instrumentaddon.nlo
/usr/local/include/node/uv-private/ev.h:574:1: warning: ‘ev_tstamp ev_now()’ defined but not used [-Wunused-function]
/usr/local/include/node/uv-private/ev.h:583:1: warning: ‘int ev_is_default_loop()’ defined but not used [-Wunused-function]
g++  -lm -lrt -lnsl -lpthread -L. -linstr_shm -Xlinker '-rpath=/opt/x/lib'  -g -O3 -Wall  -shared -o instrumentaddon.node instrumentaddon.nlo


The warnings building instrumentaddon.nlo (I chose this extension to associate the specific compiler options required for a Node.js add-on) are innocuous and can be ignored.  The -rpath linker option specifies a search path for the required libinstr_shm.so library (which exports the C++ functions mentioned earlier to access the shared memory segment).

A Little Coffeescript


The Node.js program which uses this add-on to serve the data is quite simple and even looks fairly nice with the help of Coffeescript (which beautifies the necessary syntax) in a file rest.coffee:


express = require 'express'
instrumentaddon = require './instrumentaddon'
app = express()

getinstrumentinfo = (symbol) ->
  info = []
  instrumentaddon.getInstrumentInfo( symbol, (itm) -> 
    info.push(itm) )
  throw new Error 'no instrument info' if info.length == 0
  pip = info[3]
  res =
    "instrument" : symbol
    "display_name" : info[0]
    "pip_location" : info[1]
    "extra_precision" : info[2]
    "pip_str" : "#{pip}"


app.configure( () ->
  app.set 'port', (process.env.PORT || 3000)
  app.set 'views', (__dirname + '/views')
  app.set 'view engine', 'jade'
  app.use (express.favicon())
  app.use (express.logger('dev'))
  app.use (express.bodyParser())
  app.use (express.methodOverride())
  app.use app.router
  app.use (express.static (__dirname + '/public') )
)

app.configure('development', () ->
  app.use(express.errorHandler())
)

###
instrument info route
###

app.get /^\/instruments\/([^_]+)_([^_\/]+)/, (req,res) ->
  res.json ( getinstrumentinfo (req.params[0] + '/' 
   + req.params[1]) )

app.listen 3000

console.log 'Express app started on port 3000'


Like Python, Coffeescript uses indentation levels to demarcate functional blocks and scope. This has the effect of dropping quite a bit of syntax which would otherwise clutter the code. Adding more routes which do other things (more shared memory reads or socket request/response code) is fairly simple since express provides a way of adding more lines that begin with one of app.get, app.put or app.post (followed by a URI regular expression and a handler) to accomplish this.

To convert the rest.coffee source into javascript (rest.js):

coffee --compile --bare --output builds/_Linux_x86_64_production rest.coffee

And run it (assuming that instrumentaddon.node and rest.js end up under the /opt/x/node path):

% NODE_ENV=production \
NODE_PATH=/usr/local/lib/node:/usr/local/lib/node_modules \
LD_LIBRARY_PATH=/opt/x/node:/opt/x/lib \
node /opt/x/node/rest.js

The program listens on non-privileged port 3000 to serve the HTTP REST requests for this new service:


% curl "http://localhost:3000/instruments/EUR_USD"
{"instrument":"EUR/USD","display_name":"EUR/USD","pip_location":-4,"extra_precision":0,"pip_str":"0.0001"}

Summary


Integrating a C++ shared memory library with Node.js is pretty straightforward.  Shared memory provides an ideal interface since it is non-blocking, so we don't need to worry about slowing down the Node.js event loop by calling out to our custom library.  The steps to build and run this application on Linux are described above in detail using the Gnu C++ compiler (g++), the Coffeescript compiler (coffee) and the Node.js run-time (node).  Hopefully these bits are helpful to anyone else who might find themselves attempting something similar.  A word of warning though: Node.js and its libraries are undergoing rapid change at the moment, so any description of how to build an add-on is likely to go out of date soon!

Monday, August 13, 2012

Secondary School: XML Parsed with OCaml

In my previous post, I described how C++ could transfer XML into some predefined types (basic structs).  Changing those types to make the XML reading easier was not an option (in effect that would be cheating) since I assumed that working with pre-existing types is part of the problem.  I will do the same thing with my OCaml version of the problem.  The types will not be laid out in a way that necessarily makes the XML reading any simpler.


Meet the Types


The OCaml types related to reading the school schema (tastyPS.xml is an example input) are as follows:

type person = { name : string }
type teacher = { super : person; subject : string }
type student = person
type clazz =
  { hour : int
  ; teacher : teacher
  ; students : student list }
type school = { classes : clazz list }
type result = { school : school }

Faking Expat


I did not look for a library which can read XML and provide string handlers for nodes (and their attributes); there may indeed be such a library.  Instead, I faked out the behaviour of the C++ Expat library by just writing a long function which operates the node handlers in a similar way:


(** test program simulates the read of TastyPS.xml *)

let readxml' nd_start_fun nd_end_fun () =
  let res1 = nd_start_fun "school" [] in
  let res1_1 =
    let res2 = nd_start_fun "class"
       ["day","Monday"; "hour","10"] in
      let res2_1 =
        let res3 = nd_start_fun "teacher"
          ["name","Mr Gauss"; "subject", "math"]
        in nd_end_fun res3 [] in
      let res2_2 =
        let res3 = nd_start_fun "student" ["name", "Jake"]
        in nd_end_fun res3 [] in
      let res2_3 =
        let res3 = nd_start_fun "student" ["name", "Mark"]
        in nd_end_fun res3 []
      in  nd_end_fun res2 [res2_1;res2_2;res2_3] in
  let res1_2 =
    let res2 = nd_start_fun "class"
        ["day","Tuesday";"hour","11"] in
    let res2_1 =
      let res3 = nd_start_fun "teacher"
        ["name","Mr Shakespeare";"subject","english"]
      in  nd_end_fun res3 [] in
    let res2_2 =
      let res3 = nd_start_fun "student" ["name","Christine"]
      in  nd_end_fun res3 [] in
    let res2_3 =
      let res3 = nd_start_fun "student" ["name","Thom"]
      in nd_end_fun res3 []
    in  nd_end_fun res2 [res2_1;res2_2;res2_3]
  in nd_end_fun res1 [res1_1;res1_2]



This code is not really meant to be elegant; it is just a large expression which makes the same function calls, in the same order, as an Expat-style library for OCaml would.


Some Helper Code



An exception type is useful for when things go wrong:


exception Reader of string

A partial XML type is used to hold the results of partially converting the XML to the pre-defined types above:


type partial_xml = (** type for recursing partial content *)
   | PXStudent of student
   | PXTeacher of teacher
   | PXClass of clazz
   | PXSchool of school

An attribute finding function is useful for traversing the lists of pairs that are thrown at our handler:


let get_attr attr aname =
  try List.assoc aname attr
  with Not_found ->
    raise (Reader ("attribute "^aname^" not found"))


Node Handlers


The start node handler is where all the real work happens; the end node handler is just used to complete the continuation which takes the converted content (a list of partially converted XML) and builds the appropriate type:



let start_fun nname attr =
  let part_class_content content =
    let pcc (tlist,slist) px =
      match px with
        (PXStudent s) -> (tlist,s::slist)
        | (PXTeacher t) -> (t::tlist,slist)
        | _ -> raise 
           (Reader "classes only have teachers & students")
    in  List.fold_left pcc ([],[]) content in
    let part_school_content content =
      let psc px =
        match px with
          (PXClass cls) -> cls
          | _ -> raise 
             (Reader "school can only have classes in it ")
      in  List.map psc content in
    let get_attr = get_attr attr
    in match nname with
      "student" ->
        (fun _ ->
          PXStudent { name = get_attr "name" })
      | "teacher" ->
        (fun _ ->
          PXTeacher { super = { name = get_attr "name" }
                    ; subject = get_attr "subject" })
      | "class" ->
        (fun content ->
           match part_class_content content with
             ([teacher],students) ->
                 PXClass { hour = 
                           (int_of_string (get_attr "hour"))
                         ; teacher = teacher
                         ; students = students }
             | _ -> raise (Reader 
                "each class needs exactly one teacher"))
      | "school" ->
        (fun content ->
          let classes = part_school_content content
          in  PXSchool { classes = classes })
      | _ -> raise (Reader "unrecognized node tag")


The end node handler and the wrapper code are much simpler:


let end_fun sfunres content = sfunres content

let readxml () =
  let px = readxml' start_fun end_fun ()
  in  match px with
    (PXSchool school) -> { school = school }
    | _ -> raise (Reader "top tag is not a school")

let _ = readxml () (* run it *)


I could have made a lookup list to be used with List.assoc (a standard library function which matches a key against a list of key/value pairs and returns the associated value) to deliver the proper continuation (e.g. part_class_content used for the class tag).  This might have been a nicer parallel to the C++ code.  I really like having a code structure with separated handlers for each node.  I think it makes it easier to understand, and gives a fresh set of eyes (someone who might want to add a new node type) a nice pattern to follow.  Building and running this code (in schoolxml.ml) is a simple matter as well:


$ ocamlc -g schoolxml.ml -o s.x
$ ./s.x


Summary


Rewriting this code in OCaml (after writing the C++ version) is informative.  The OCaml code handles more corner cases and raises exceptions properly (I omitted the throw statements in the C++, but left comments for where they should be added).  One very interesting rule is that a class should have exactly one teacher -- not fewer, not more.  The C++ code will likely allow the teacher value to be NULL (an error by omission) but will not allow there to be more than one teacher.  The last observation is that, although the syntax of OCaml is much simpler, the overall parsimony of the comparable code is much more satisfying.  I guess that is why I like functional programming so much.

Thursday, August 9, 2012

Back to School: XML Parsed with C++

Reading an XML file into a hierarchy of data structures is a pretty common problem.  If those data structures have been defined already and do not follow the recursive structure of the XML, some validation is needed to ensure the XML input conforms to the data structure declarations.

School Data Example


Consider the following data structures (from Target.h):


#ifndef TARGET_H
#define TARGET_H

#include <vector>

// forward declarations
struct Person;
struct Student;
struct Teacher;
struct Class;
struct School;
struct Result;

struct Person {
 Person() { name[0] = '\0'; }
 char name[256];
};

struct Student : public Person {
 typedef Class BelongsTo;
 Student() {}
};

struct Teacher : public Person {
 typedef Class BelongsTo;
 Teacher() {}
 char subject[256];
};

struct Class {
 typedef School BelongsTo;
 Class() : teacher(0) {}

 char day[64];
 int hour;
 Teacher * teacher;
 std::vector<Student *> students;
};

struct School {
 typedef Result BelongsTo;
 School() {}

 std::vector<Class *> classes;
};

struct Result {
 typedef void BelongsTo;
 Result() : school(0) {}

 School * school;
};

#endif // TARGET_H

And an example of XML input (from TastyPS.xml):


<school>
 <student name="foo" />
 <class day="Monday" hour="10">
  <teacher name="Mr Gauss" subject="math" />
  <student name="Jake" />
  <student name="Mark" />
 </class>
 <class day="Tuesday" hour="11">
  <teacher name="Mr Shakespeare" subject="english" />
  <student name="Christine" />
  <student name="Thom" />
 </class>
</school>

The goal is to have a function which reads the content of a file and returns a Result * which can be used by other parts of the code.  If the structure of the XML does not match the data structures defined above, we want the reading code to raise a C++ exception.

Top-level interface


The interface for our main program to use for reading the XML file is very simple (in readschoolxml.h):


#ifndef XML_READ_H
#define XML_READ_H

struct Result;

namespace xml {

Result *
readfile(const char * filename);

} // namespace xml

#endif // XML_READ_H


The minimalism of just having a C-like functional interface is something worth appreciating now.  The symbols exposed to the following main program are very limited (no need to over-expose the implementation to the user code in main.cpp):


#include "Target.h"
#include "readschoolxml.h"

int
main()
{
 Result * __attribute__((unused)) res = 
  xml::readfile("TastyPS.xml");
 return 0;
}



Parsing with Expat


Expat provides a C library interface for reading XML and handling every node with its attributes using callbacks (one for the start of the node and one for the end).  Interfacing Expat with a push-down state::Reader stack can be done without referencing any detail of the data structures we are targeting:


#include "readschoolxml.h"
#include "Target.h"
#include "Attribute.h"
#include <expat.h>
#include <string.h>
#include <map>
#include <fstream>

namespace xml {
namespace state {
// implementation stuff linked from elsewhere

#define XML_READER_MAX_LINE 2048

struct Reader; // opaque here

extern Reader * initial();
extern Result * get_initial_obj_and_delete(Reader *);
extern void push(Reader **parent, const char * el,
    const char ** attr);
extern void pop(Reader **child);

} // namespace state


The four functions initial, get_initial_obj_and_delete, push and pop in the xml::state namespace implement the XML reading state which the following Expat callbacks use:


void
startHandler(void *data, const char *el, const char **attr)
{
 state::Reader ** reader = 
  reinterpret_cast<state::Reader **>(data);
 state::push(reader,el,attr);
}

void 
endHandler(void * data, const char * )
{
 state::Reader ** reader = 
  reinterpret_cast<state::Reader **>(data);
 state::pop(reader);
}

// data and comments are not interesting for us here
void dataHandler(void *, const char *, int) {}
void commentHandler(void *, const char *) {}



Finally, the readfile() function can be implemented with I/O calls and Expat parsing:

Result *
readfile(const char * filename)
{
 std::ifstream in(filename);

 if ( !in ) {
  // throw file not opened
 }

 XML_Parser p = XML_ParserCreate(NULL);
 if ( !p ) {
  // throw parser creation failed 
  // (odd problem)
 }
 state::Reader * reader = state::initial();
 XML_SetUserData(p, &reader);
 XML_SetElementHandler(p, startHandler, endHandler);
 XML_SetCharacterDataHandler(p, dataHandler);
 XML_SetCommentHandler(p, commentHandler);

 int done,len;
 char buff[XML_READER_MAX_LINE +1];

 while ( in.getline(buff, XML_READER_MAX_LINE) ) {
  len = strlen(buff);
  done = in.eof();
  if ( XML_Parse(p, buff, len, done) 
    == XML_STATUS_ERROR ) {
   // throw syntax error 
   // parsing a line of XML
  }
 }
 in.close();
 XML_ParserFree(p);
 Result * res = get_initial_obj_and_delete(reader);
 return res;
}

} // namespace xml


All of this code for interfacing with the Expat library is fairly generic. It just sets up a stack-based interface with an opaque type called state::Reader.

Dealing with Attributes



A class called Attribute is used to hold the attribute values and provide type conversions if necessary (since XML attributes are basically strings) in Attribute.h:


#ifndef XML_ATTRIBUTE_H
#define XML_ATTRIBUTE_H

#include <string>
#include <map>
#include <cstdlib>

namespace xml {

class Attribute {
public:
 Attribute() : _v(NULL) {}
 Attribute(const char *v) : _v(v) {}
 Attribute(const Attribute & a) : _v(a._v) {}

 Attribute & operator=(const Attribute & a)
 {
  _v = a._v;
  return *this;
 }
 int intVal() const { return atoi(_v); }
 const char * c_str() const { return _v; }
 bool boolVal() const
 {
  static const std::string t = "true";
  return t == _v;
 }
private:
 const char * _v;
};

Another class called AttributeMap is used to hold a map from constant strings to Attribute values:


class AttributeMap : public std::map<const char *, Attribute> {
public:
 Attribute & getOrThrow(const char * k)
 {
  std::map<const char *, Attribute>::iterator 
   itr = find(k);
  if(itr == end()) {
   // exception thrown here
  }
  return (*itr).second;
 }
 Attribute getOrDefault(const char * k, 
  const char * str_default)
 {
  // ...
 }
};

} // namespace xml

#endif // XML_ATTRIBUTE_H

Push-down State Details



In schoolstate.cpp the XML reader state is described in more detail.  The add<>() template function describes how objects are added to other objects (explicit template specialization is used here to cover the variety of ways that one object is added to another):


#include "Target.h"
#include "Attribute.h"
#include <string.h>

namespace xml {
namespace state {

struct Reader {
 Reader(Reader * n) : next(n) {}
 virtual ~Reader() {}

 Reader * next;
};

void 
pop(Reader **r)
{
 Reader * d = *r;
 *r = d->next;
 delete d;
}

template< typename P
 , typename C
 >
inline void add(P * parent, C * child)
{}

template<> inline void 
add<Result,School>(Result * r, School *s)
{
 // throw if r->school != 0
 r->school = s;
}

template<> inline void 
add<School,Class>(School *s,Class * c)
{
 s->classes.push_back(c);
}
template<> inline void 
add<Class,Student>(Class * c,Student * s)
{
 c->students.push_back(s);
}
template<> inline void 
add<Class,Teacher>(Class * c,Teacher * t)
{
 // throw if c->teacher != 0
 c->teacher = t;
}



The ReaderImpl<> template creates the new Target.h object and adds it to the containing object when the Reader is destructed:

template< typename T
 >
struct ReaderImpl : public Reader {
 ReaderImpl(Reader * n) : Reader(n) , obj(new T()) {}
 virtual ~ReaderImpl()
 {
  ReaderImpl<typename T::BelongsTo> * parent =
   dynamic_cast<ReaderImpl<typename T::BelongsTo> * >(next);
  // throw on !parent (cast failed)
  add<typename T::BelongsTo,T>(parent->obj,obj);
 }

 T * obj;
};

template<> // top-level over-ride
struct ReaderImpl<Result> : public Reader {
 ReaderImpl(Reader * n) 
            : Reader(n) , obj(new Result()) {}
 virtual ~ReaderImpl() {}

 Result *obj;
};

Reader *
initial()
{
 return new ReaderImpl<Result>(0);
}



Some constant strings are useful for understanding the XML content:

namespace vocab {

// attribute values
static const char * monday = "Monday";
static const char * tuesday = "Tuesday";
static const char * wednesday = "Wednesday";
static const char * thursday = "Thursday";
static const char * friday = "Friday";
static const char * saturday = "Saturday";
static const char * sunday = "Sunday";

static const char * math = "math";
static const char * science = "science";
static const char * english = "english";

// attributes:
static const char * day = "day";
static const char * hour = "hour";
static const char * name = "name";
static const char * subject = "subject";

// nodes:
static const char * school="school";
static const char * clazz="class";
static const char * teacher="teacher";
static const char * student="student";

} // namespace vocab



Filling the AttributeMap using the Expat-provided array of attributes and values is fairly simple (and validation of the values happens when they come from a finite set):

void
fillAttributeMap(AttributeMap & map, const char ** attr)
{
 using namespace vocab;
 static const char * days[] =
  { monday, tuesday, wednesday, thursday, 
                  friday, saturday, sunday, 0 };
 static const char * subjects[] =
  { math , english , science , 0 };
 static const struct {
  const char * attr_name;
  const char ** restrict_values;
 } attr_vocab[] = 
  { { day, days }
  , { name, 0 }
  , { hour, 0 }
  , { subject, subjects }
  , { 0, 0 }
 };
 for(size_t i = 0; attr[i]; i += 2) {
  Attribute attrib(attr[i+1]);
  int fd = -1;
  for(size_t j = 0; attr_vocab[j].attr_name; ++j) {
   if(strcmp(attr_vocab[j].attr_name,attr[i]) == 0) {
    fd = j;
    break;
   }
  }
  if(fd < 0) {
   // throw unrecognized attribute
  }
  bool val_fd = (attr_vocab[fd].restrict_values == 0);
  for(size_t j = 0; !val_fd 
   && attr_vocab[fd].restrict_values[j]; ++j) {
   if(strcmp(attr_vocab[fd].restrict_values[j],
             attrib.c_str()) == 0) {
    val_fd = true;
    break;
   }
  }
  if(!val_fd) {
   // throw invalid value for attribute
  }
  map[attr_vocab[fd].attr_name] = attrib;
 }
}


Pushing the XML reader state is now possible with the factory<> template function (using explicit specialization of the C++ function template in order to get the attribute values into the data structures):

template< typename T > Reader * 
factory(Reader *n, AttributeMap &)
{
 return new ReaderImpl<T>(n);
}

template<> Reader * 
factory<Student>(Reader *n, AttributeMap & map)
{
 ReaderImpl<Student> * res = new ReaderImpl<Student>(n);
 strcpy(res->obj->name,map.getOrThrow(vocab::name).c_str());
 return res;
}

template<> Reader * 
factory<Teacher>(Reader *n, AttributeMap & map)
{
 ReaderImpl<Teacher> * res = new ReaderImpl<Teacher>(n);
 strcpy(res->obj->subject,
  map.getOrThrow(vocab::subject).c_str());
 strcpy(res->obj->name, map.getOrThrow(vocab::name).c_str());
 return res;
}

template<> Reader * 
factory<Class>(Reader *n, AttributeMap & map)
{
 ReaderImpl<Class> * res = new ReaderImpl<Class>(n);
 strcpy(res->obj->day, map.getOrThrow(vocab::day).c_str());
 res->obj->hour = map.getOrThrow(vocab::hour).intVal();
 return res;
}


void
push(Reader **parent, const char * el, const char ** attr)
{
 static const struct {
  const char * el_name;
  Reader * (*factory_fun)(Reader *, AttributeMap &);
 } lookup[] =
  { { vocab::school,  factory<School> }
  , { vocab::clazz,   factory<Class> }
  , { vocab::teacher, factory<Teacher> }
  , { vocab::student, factory<Student> }
  , { 0, 0 } };
  
 AttributeMap map;
 fillAttributeMap(map,attr);
 for(size_t i = 0; lookup[i].el_name; ++i) {
  if(strcmp(lookup[i].el_name,el)==0) {
   *parent = (*(lookup[i].factory_fun))(*parent,map);
   return;
  }
 }
 // throw not found
}

Result *
get_initial_obj_and_delete(Reader *r)
{
 ReaderImpl<Result> * ri =
  dynamic_cast<ReaderImpl<Result> *>(r);
 // throw on !ri (cast failed)
 Result * res = ri->obj;
 delete r;
 return res;
}

} // namespace state
} // namespace xml


I have left the exception-throwing parts of the code as comments; they really need to be there for this software to handle errors properly.

Summary


The first revision of the code that this sample is based on had a huge switch statement in it and maintained lots of state variables in order to get the job done.  These variables kept track of the XML state that had been read.  The code was fragile and hard to maintain (other coders easily broke it).  The sample above does more validation, encapsulates the knowledge of the target data structures (and how they are constructed) in one compilation unit, and uses much smaller functions.  The type checking that happens within the C++ template code also helps catch syntax errors for input that does not match what is expected.  I hope to compare this implementation with a comparable one in OCaml at some point in the future.  Static typing, recursive data structures, pattern matching and type inference will likely make for a much simpler implementation of this code.

Tuesday, August 7, 2012

OFlux Plugin Away

Plugin architectures allow for optional functionality to be added to a simple core. The core or kernel of the program provides the bones on which the rest of the program is built. It is not necessary for the kernel to rely on the parts added on top (the plugins) -- in fact it would be bad if that happened. Plugins can depend on more primitive plugins to accomplish their jobs.

Unlike modules, which are re-usable as multiple instances within the program, plugins are intended either to be there (once) or not there at all.  Most web servers accept plugins (e.g. for dynamic scripting language execution or CGIs) which extend the functionality of the core web server program (which parses HTTP headers etc).  This can be a very powerful way of organizing server software generally.

When shipping software to customers with differing needs, plugins allow the end user to customize the code that they are running in a controlled way.  A plugin which is not running is one which does not adversely affect performance, and cannot cause the program to crash.  More critically, turning off functionality which is only suitable in non-production environments (so it never runs on a live system with customers using it) is a great safety feature.


My First Plugin



In order to prepare the way for a plugin Plug, the kernel.flux program you write first needs to make available some abstract nodes for the plugin to hook into:

 
 node S () => (int a);
 node A (int a) => ...;
 node N (int a) => ();

 source S -> A;
 A = N;


By default all of the outputs of node S are consumed by the concrete node N (concrete meaning it has a C++ implementation function) via abstract node A (whose only purpose is to provide a place for a new plugin to hook in). Now we can write a new plugin which routes some cases away from the kernel flow and handles them with its own special code:

 
 include kernel.flux

 plugin Plug
  begin
  external node A (int a) => ...;

  condition isZero (int a) => bool;
  node NForZero (int a) => ();

  A : [isZero] = NForZero;
  end


On the C++ side there is a Plug namespace which holds all of the symbols for the plugin, and it is compiled into a libPlug.so dynamic shared object (loaded dynamically at run time).  The decision to load a particular plugin is based on configuration (by default symbolic links to XML files such as Plug.xml in a particular directory), so it is easy to turn plugins on and off.  The content of a plugin XML file describes the list of required plugins that need to be loaded first, and how the program flow is patched/modified by the new plugin code.
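
Mechanically, loading one of these shared objects comes down to something like the following sketch (the real OFlux loader consults the XML configuration first; the function here is mine):

#include <dlfcn.h>
#include <stdio.h>

// load libPlug.so (or any plugin .so); symbols in the Plug
// namespace become resolvable once this succeeds
void * load_plugin(const char * path)
{
  void * handle = dlopen(path, RTLD_NOW | RTLD_GLOBAL);
  if(!handle) {
    fprintf(stderr, "plugin load failed: %s\n", dlerror());
  }
  return handle;
}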

The effect of the Plug plugin is to divert the flow to node NForZero when the output a of S has isZero(a) evaluate to true. This is really a conditional augmentation of the existing kernel flow.
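
On the C++ side, the isZero condition is a one-liner (the exact signature the OFlux compiler expects is my assumption here):

bool isZero(int a)
{
  return a == 0;
}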


The dot output from compiling the plugin (using the -p compiler option) shows what the plugin added to the flow it is built on top of.  Had Plug depended on other plugins, those would also be highlighted in red colored boxes (and Plug.flux would need include statements for each of them at the top).

Had the conditional isZero been replaced by a *, the new route NForZero would become the new default (N being unreachable after that). This is a way for a plugin to override existing functionality in the kernel program.
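
In Plug.flux that would read (assuming the wildcard sits in the same guard position as a named condition):

 A : [*] = NForZero;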

Another possibility is to add a concurrent successor node to the flow using the special &= operator, which causes a second node (in addition to N) to run on every output from S:


 node M (int a) => ();
 A &= M;

Summary


Plugins provide a method of extending a program with optional functionality.  In the case of OFlux plugins, the functionality can be new parts of the flow which augment a pre-existing flow.  Plugins can use (and therefore depend on) the functionality of other plugins.  This way of coding away from the core, with ever more specialized code addressing finer-grained concerns, is very useful.  It has many benefits such as reducing compile time (of the plugin component), enforcing dependencies, reducing exposure to bugs and increasing performance (by not running code you do not require).

Friday, August 3, 2012

OFlux Multiplying Successor Events

Dynamically increasing the number of output successor events from a given node event execution is a powerful concept. By submitting many events to the OFlux run-time event queue, we can distribute the follow-on work to other run-time worker threads. In a previous post, I described how to (statically) have a node event's output processed concurrently by two separate nodes. Although similar, the functionality of processing the same input with two C++ node functions in the flow is orthogonal to the idea that a node event might produce several outputs.

Producing No Output


If a node function wants to cancel the flow to its successors, it can do that by returning a non-zero result. This -- in effect -- means that the execution of that node function encountered an error. If there is an error handler for that node, it will be called -- but passed the input of the node that threw the error. The error handler node has a chance to re-inspect the input to the failed node function, and take remedial action:


node Foo (const char * type) => (int type_id);
node Oops (const char * type) => ();

handle error Foo -> Oops;

The C++ code for the Foo node might check a static look-up table for a matching entry and return an error when no entry is found (causing no successor events to run, but rather having an Oops node event process the input instead):

int
Foo(const Foo_in *in, Foo_out *out, Foo_atoms *)
{
  static struct { const char * t; int tid; } lookup[] =
    { { "apple", 1 }
    , ...
    , { 0, 0 } };
  int res = -1; // indicates not found - it's an error
  for(size_t i = 0; lookup[i].t; ++i) {
    if(0 == strcmp(lookup[i].t,in->type)) {
      res = 0;
      out->type_id = lookup[i].tid;
      break;
    }
  }
  return res;
}


If no error handler is declared for a node, then no error node event is scheduled to run (meaning the error is ignored).
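
For completeness, here is a sketch of the Oops handler's C++ side (the generated type names follow the Foo pattern above; the body is mine):

#include <stdio.h>

int
Oops(const Oops_in *in, Oops_out *, Oops_atoms *)
{
  // in carries the input of the failed Foo node event
  fprintf(stderr, "no type_id found for \"%s\"\n", in->type);
  return 0; // remedial action taken; do not cascade the error
}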

Producing More Output


In order to have a node produce more than one output structure (leading to many successor events), there is a C++ helper gadget called oflux::PushTool<> which gives the node function access to this capability.



To see it in use, we modify the flow above to have the Foo node "splay" all matching outputs in the lookup table (rather than just the first one) by adding a (non-compulsory) comment:

node Foo(const char * type) => /*splay*/ (int type_id); 

And we change our C++ implementation of Foo as follows:

int
Foo(const Foo_in *in, Foo_out *out, Foo_atoms *)
{
  static struct { const char * t; int tid; } lookup[] =
    { { "apple", 1 }
    , ...
    , { 0, 0 } };
  size_t out_count = 0;
  oflux::PushTool<Foo_out> ptool(out);
  for(size_t i = 0; lookup[i].t; ++i) {
    if(0 == strcmp(lookup[i].t,in->type)) {
      ++out_count;
      ptool->type_id = lookup[i].tid;
      ptool.next();
    }
  }
  return out_count ? 0 : -1;
}

Now Foo could produce several outputs (type_ids) on one input, and each of those will get processed using the flow that follows node Foo. If no matches are found in the lookup, then no flow will operate (no successor node events).

Summary


In some cases where it is advantageous to release multiple outputs from a node (causing more successor events to be created), the run-time can be leveraged to dispatch the processing of those events on multiple threads. More threads doing work means more concurrency. The trade-off is that the successor event work must cost more (in single-thread terms) than the overhead incurred (in the run-time) by doing this. It may be that successor processing happens so quickly in a single thread that having Foo just output a container (e.g. a vector) holding all results is better.
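
To make that last alternative concrete, here is a sketch (the splay-free declaration and the type_ids container field are hypothetical):

node Foo (const char * type) => (std::vector<int> type_ids);

int
Foo(const Foo_in *in, Foo_out *out, Foo_atoms *)
{
  // strcmp from <cstring>; Foo_out::type_ids assumed to be a std::vector<int>
  static struct { const char * t; int tid; } lookup[] =
    { { "apple", 1 }
    , { 0, 0 } };
  for(size_t i = 0; lookup[i].t; ++i) {
    if(0 == strcmp(lookup[i].t, in->type)) {
      out->type_ids.push_back(lookup[i].tid);
    }
  }
  return out->type_ids.empty() ? -1 : 0; // error only when nothing matched
}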

Thursday, August 2, 2012

Comparing 10Gbe Cards

10Gbe network interfaces deliver lower latency as well as higher bandwidth.  Even if you are not close to saturating the 1Gbe cards in your setup, it might be worth considering the next generation of networking technology on the basis of latency alone.  In this post, I will compare a few 10Gbe cards which I have had the privilege to try out.  I do have a favorite card at this point, which I will reveal at the end (hint: it has more to do with the software stack than the hardware specifically).

Wired magazine online had a series of articles earlier in the year revealing that over 60% of the network ports in the data centers managed by internet giants were 10Gbe.  The potential latency for Infiniband (a similar but slightly older technology) is known to be under 5 microseconds.  It is fairly typical to see 1Gbe latencies hover in the 30 to 150 microsecond range (latency is dependent on the size of the packet payload).

In order to push a network interface to handle this much data, a modern computer with a PCIe slot with enough lanes (typically 10Gbe cards use an x8 slot -- so the x16 slot available for a graphics card is sufficient) is required.  Achieving the lower latencies this new hardware is capable of is challenging for the operating system (in my case Linux), since the TCP/IP stack and Berkeley sockets API begin to become a bottleneck.  Almost every manufacturer has attempted to solve this problem in their own way, providing a software work-around which achieves higher performance than what is directly available via the kernel and standard API.


Method


To test a pair of cards, I plugged them into the x16 slots on two cluster nodes and cabled them directly to each other (so no switch in between).  I then configured them for ethernet, assigned IP addresses, and  ran some benchmarks:
 
 # modprobe 
 # service openib start 
 # ifconfig eth1 192.168.3.10
 # iperf -s
 # NPtcp

And on the other node:
 
 # modprobe 
 # service openib start
 # ifconfig eth1 192.168.3.11
 # iperf -c 192.168.3.10
 # NPtcp -h 192.168.3.10

The iperf test mostly checks bandwidth and for me is just a basic sanity test. The more interesting test is netpipe (NPtcp) which does a latency test at various packet sizes.

Testing RDMA latency on a card that provides it is a simple matter of running a bundled utility (-s indicates payload bytes and -t is the number of iterations):


# rdma_lat -c -s32 -t500 192.168.3.10
 2935: Local address:  LID 0000, QPN 000000, PSN 0xb1c951 RKey 0x70001901 VAddr 0x00000001834020
 2935: Remote address: LID 0000, QPN 000000, PSN 0xcbb0d3, RKey 0x001901 VAddr 0x0000000165b020
 
 Latency typical: 0.984693 usec
 Latency best   : 0.929267 usec
 Latency worst  : 15.8892 usec


Mellanox 



This card is widely used and very popular.  Mellanox has considerable experience with Infiniband products, and has been able to produce cards which are capable of transporting TCP/IP traffic on top of Infiniband technology.  For them this is mostly accomplished using kernel drivers which are part of the OFED software stack.  I found that it is best to get a snapshot of this suite of packages from Mellanox directly for one of the supported distributions (all of them, ultimately, variations on Red Hat Linux).  Although Debian Wheezy had OFED packages in its repository, they were not recent enough for one of the newer cards I was trying.  For these reasons, I ended up dual booting my cluster to Oracle Enterprise Linux (OEL 6.1 specifically).  Debian Wheezy was able to run this card as an ethernet interface (using the kernel TCP/IP stack); it's just that fancy things like Infiniband and RDMA were not accessible.

I also managed to test a Mellanox ConnectX3 card, but I found that its performance was not (statistically) discernible from the ConnectX2.  If you told me to figure out which card was in a box from its benchmarks, I would not be able to separate the ConnectX2 and ConnectX3 -- although presumably the new revision does have some advantage which I did not find.


Solar Flare 


Solar Flare makes several models of 10Gbe cards.  The value added by Solar Flare is mostly in their open onload driver technology, which makes use of their alternative network stack running mostly in user space.  This software accesses a so-called virtual NIC interface on the card to speed up network interaction, bypassing the standard kernel TCP/IP stack.  Just like the Mellanox cards, I found that Debian Wheezy could recognize the cards and run them with the Linux kernel TCP stack, but the special drivers (open onload) needed to run on OEL (I hope to attempt to build the sfc kernel driver on Wheezy soon).


Measurements and Summary



Below is a summary of the measurements that I did on these cards using various TCP stacks (vanilla indicates the Linux 3.2.0 kernel TCP stack) and RDMA:


Communication Type     Card                 Distro   K Mod     Mesg bytes   Latency
------------------------------------------------------------------------------------
vanilla TCP            Solar Flare          OEL      sfc       32           17  usec
vanilla TCP            Solar Flare          OEL      sfc       1024         18  usec
vanilla TCP            Mellanox ConnectX3   OEL      mlx4_en   32           12  usec
vanilla TCP            Mellanox ConnectX3   OEL      mlx4_en   1024         16  usec
vanilla TCP            Mellanox ConnectX2   OEL      mlx4_en   32           13  usec
vanilla TCP            Mellanox ConnectX2   OEL      mlx4_en   1024         9   usec
onload userspace TCP   Solar Flare          OEL      sfc       32           2.4 usec
onload userspace TCP   Solar Flare          OEL      sfc       1024         3.6 usec
RDMA                   Mellanox ConnectX3   OEL      mlx4_ib   32           1.0 usec
RDMA                   Mellanox ConnectX3   OEL      mlx4_ib   1024         3.0 usec
RDMA                   Mellanox ConnectX2   OEL      mlx4_ib   32           1.0 usec
RDMA                   Mellanox ConnectX2   OEL      mlx4_ib   1024         3.0 usec


The big surprise in this investigation is open onload (more info can be had from this presentation).  This driver is activated selectively using user-space system call interposition (so you can choose which applications run on it).  It does not require the application to be rewritten, recompiled or rebuilt in any way.  This means, in particular, that closed-source third-party software can use it.  It's this extra flexibility which really has my attention.  Without coding to a fancy/complicated API, a developer can make use of familiar programming tools to create systems with low networking latency.
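
As a usage example, the interposition is as simple as prefixing the launch command with the onload wrapper (the binary name here is made up):

 # onload ./my_server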