Tuesday, July 31, 2012

OFlux Modules

Code re-use is a good thing.  Developers want to have encapsulated functionality which they can easily turn into libraries so that those parts can be re-used for other programming projects.  OFlux does this using its modules system which takes its inspiration from Standard ML (or OCaml for that matter).  Standard ML extends modules a bit with an idea called functors for making modules (and their signatures -- like interfaces) generic.  OFlux does not yet support functors, but I have toyed with the idea from time to time.

Writing an OFlux module Bar involves creating the following within C++ namespace Bar:

  • a Bar::ModuleConfig structure to hold its "self" object state (each instance has its own object, which encapsulates that instance's state).  Static modules do not have a ModuleConfig structure -- so they are stateless in a way.  A rough sketch of such a structure appears after this list.
  • in a file called Bar.flux (and after including needed OFlux source) the module is declared using the syntax module Bar begin ... end.  All of the usual things go inside of this block (nodes, guards, flow, instances of other modules).  A module is not a recursive concept, so you can't instantiate it within itself.
  • to instantiate the module in another OFlux source location the syntax is instance Bar barinst;
  • Unless a node N (within the module) is declared with special keyword mutable, it will implicitly acquire the module instance's readwrite self guard for read access.  This provides to the C++ node function Bar::N a way to access the Bar::ModuleConfig object for that instance.  Mutable nodes acquire the self guard for write instead of read, and therefore they can change the internal instance state.
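As a rough illustration of the first point, the C++ side of a module Bar might hold its per-instance state like the following minimal sketch (the ModuleConfig name and the namespace convention come from OFlux; the fields are invented):

#include <string>

namespace Bar {

struct ModuleConfig {           // one of these per "instance Bar ...;"
        std::string label;      // read by ordinary nodes (self guard, read access)
        long counter;           // updated only by nodes declared mutable
};

} // namespace Bar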
The semantics of instantiating a module are that the content of the module is notionally copied into the current scope of the program, with each of its contents (nodes, guards, etc.) prefixed with the instance name and a "." character.  In order to allow the guards inside of a module to unify with outer guards (conflating them to one guard rather than two), the syntax where guard1 = instguard1, guard2 = instguard2 is appended to the instantiation.

There is no notion of inheritance between modules, since composition is done by inclusion (via instantiation).  So a developer can build modules that use other modules (e.g. we include Bar.flux in a new module which will contain an instance (or more) of Bar within itself).  Inheritance is a problematic abstraction, and the rule that inclusion should be used (indicating "has a" relationships) guided the design of this language feature.

To see an example of how an OFlux module is written, please have a look at this example in the Github repo and the top-level for that example.

Monday, July 30, 2012

xplot.org on Time Series Events

Visualizing events in a distributed system is tough.  There are good solutions for experimentation on a single machine, but once the software lives on multiple machines issues crop up.  The number one issue is timing synchronization.  Out-of-the-box NTP-based synchronization is not accurate enough (I have found) to deliver the precision needed here.


Precise Time Synchronization



Success (on Linux) involved installing and running ptpd2 (PTP stands for Precision Time Protocol) available on sourceforge.

On the master (fileserver machine) I keep ptpd2 running with:

 ./ptpd2 -W -b eth0

And on the slave compute nodes I just keep them running in sync with the master using:

 ./ptpd2 -c -g -B

Together, these processes keep the cluster time synchronization locked to around 1 microsecond - which is sufficient for my immediate needs.


Enter xplot.org



When looking into a TCP/IP issue, I discovered tcptrace, which is a program for analyzing the output of tcpdump or snoop (on Solaris).  Several visualizations of a TCP/IP session captured by those sniffing tools are available, and (most importantly) they are visualized within a tool called xplot.org.  This tool is an X windows program which can efficiently display large time series data sets in a 2D graph.  Unfortunately, its text-based data format is only really documented by reviewing its C source code.  I intend to share what I have learned about the format so that (hopefully) others can skip reading the source.

Most of the commands are issued on a single line, with the exception of commands that take a text argument.  Commands that display text in the graph require the subsequent line for the text content.  Most drawing commands look like:


 commandname x-coordinate1 y-coordinate1 [ x-coordinate2 y-coordinate2 ] [ color ]


Configuration of a time series graph that uses UTC timestamps (in seconds, with microseconds expressed after the decimal point) is done using:


timeval double

Three text commands let us label the axes (xlabel and ylabel) and title the graph itself:

title
Results from Tue Apr 24 11:45:55 2012 - run Tue Apr 24 11:48:00 2012
xlabel
Wall clock time
ylabel
Event id

Adding some lines is easy with the line command:


line 1335282355.550356 1 1335282355.550356 2 yellow
line 1335282355.550356 2 1335282355.620854 2 yellow
line 1335282355.620854 2 1335282355.620854 3 yellow
line 1335282355.620854 3 1335282355.691357 3 yellow
line 1335282355.691357 3 1335282355.691357 4 yellow

Adding some more random things:


dtick 1335282355.623986 2.004608 blue
line 1335282355.623986 2.004608 1335282355.624049 2.004608 blue
diamond 1335282355.624049 2.004608 green
diamond 1335282355.624101 2.005530 green
line 1335282355.624049 2.004608 1335282355.624101 2.005530 green
line 1335282355.624049 2.004608 1335282355.624514 2.002304 gray20
box 1335282355.624514 2.002304 gray20

Finally a look at how xplot.org paints all of this data in its viewer:


 Zooming in on the green diamond (left button on my mouse):


Zooming in once more:


Finally we can see the event tree that I am depicting here (the Y-axis is mostly used for layout of the tree), and the X-axis shows accurate timing information:


The middle button (or wheel button) on my mouse helps to pan in one direction or another, and a single left-click is used to pop back through the zoom-ins you have done recently.


Summary



If you have lots of data to process and are comfortable generating an xplot file using your favourite tool (programs like awk and Perl are useful for transformations like this), then you may get some mileage out of xplot.org for visualizing moderate amounts of time series data (100s of megabytes) using a modest Linux computer (e.g. a netbook).  More serious hardware is definitely capable of more.  I have not found an in-browser viewer which is close to being as fast as the native xplot.org program.
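As an illustration of how little is involved, here is a toy C++ generator (the Event values are invented) that emits the commands described above -- the timeval double header, the labels, and a yellow staircase of line segments joining successive events:

#include <cstdio>

struct Event { double when; int id; };   // UTC seconds.microseconds, event id

int main()
{
    const Event ev[] = { { 1335282355.550356, 1 },
                         { 1335282355.620854, 2 },
                         { 1335282355.691357, 3 } };
    const int n = sizeof(ev)/sizeof(ev[0]);

    printf("timeval double\n");
    printf("title\nGenerated event staircase\n");
    printf("xlabel\nWall clock time\n");
    printf("ylabel\nEvent id\n");
    for(int i = 0; i + 1 < n; ++i) {
        // vertical step at ev[i].when, then a horizontal run to ev[i+1].when
        printf("line %f %d %f %d yellow\n",
               ev[i].when, ev[i].id, ev[i].when, ev[i].id + 1);
        printf("line %f %d %f %d yellow\n",
               ev[i].when, ev[i].id + 1, ev[i+1].when, ev[i].id + 1);
    }
    return 0;
}

Piping its output to a file gives something xplot.org will open directly.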

Friday, July 27, 2012

Enumeration on the stack

Allocating objects in C++ on the heap is not free. Although there are many implementations of allocators that are very fast (and particularly good for multi-threaded applications), it is still preferable to avoid allocating too many things on the heap. In Java programs there is not much choice in the matter, since the use of the heap is kind of an endemic habit. Fortunately garbage collection in Java has gotten a whole lot better. In this post I re-visit the enumeration interface I described earlier and try to provide two implementations of it (one uses the program stack only, and the other relies on the heap).


Enumerators Again



In Enumerator.h I describe the same generic enumerator interface, and then I offer some convenience C macros for calling a function which creates the enumerator and then traversing it:


template< typename AnyT >
struct Enumerator {
  virtual bool next(const AnyT * &) = 0;
  virtual ~Enumerator() {}
};

The stack macros within Enumerator.h are as follows (note the use of "placement new" and the explicit call to the virtual destructor):

#define stack_for_each_start(EFUN,Tp,X) \
 { /*new scope*/ \
  char _enum_buff[EFUN##__storage_size]; \
  typedef Tp _enum_Tp; \
  Enumerator<Tp> * _enum##X = EFUN(_enum_buff); \
  Enumerator<Tp> * _enum = _enum##X; /*for cleanup*/ \
  const Tp * X = 0; \
  while((_enum##X) && (_enum##X)->next(X)) {

#define stack_for_each_end \
 } if(_enum) { (_enum)->~Enumerator<_enum_Tp>(); } }  

So that is reasonably ugly (most C macros are), but the beauty on the application side is coming -- be patient! Here are the heap-allocating macros:

#define heap_for_each_start(EFUN,Tp,X) \
 { /*new scope*/ \
  Enumerator<Tp> * _enum##X = EFUN(); \
  Enumerator<Tp> * _enum = _enum##X; /*for cleanup*/ \
  const Tp * X = 0; \
  while((_enum##X) &&(_enum##X)->next(X)) {

#define heap_for_each_end \
 } delete (_enum); }


Our Library



In order to write our application library header file app.h, we want to minimize the amount of junk we expose to the user application code. Here we declare a single function app::get_all_ts() which will return the enumerator and use existing storage if it is provided as an argument (which is our way of allocating on the stack):

#include "enumerator.h"
#include <cstdlib>

namespace app {
  struct T { int id; char name[40]; }; // T defined here
  Enumerator<T> * get_all_ts(void * storage = 0);
  extern const size_t get_all_ts__storage_size;
} // namespace app

The app::get_all_ts__storage_size symbol is provided so that we know how much storage the underlying enumerator implementation needs.  The implementation of the library code behind app::get_all_ts hides almost all the detail of the collection being enumerated, and how the enumerator implementation works:


#include "app.h"
#include <new>

namespace app {

T t1 = { 1, "sam" };
T t2 = { 2, "bill" };
T * tarray[] = { &t1, &t2, 0 };

struct AllEnumeratorImpl : public Enumerator<T> {
 AllEnumeratorImpl() : at(&(tarray[0])) {}
 virtual bool next(const T * & tp) {
  if((tp = *at)) {
   ++at;
   return true;
  }
  return false;
 }
private:
 T ** at;
};

Enumerator<T> * 
get_all_ts(void * storage)
{
 return storage
  ? new(storage) AllEnumeratorImpl()
  : new AllEnumeratorImpl();
}

const size_t get_all_ts__storage_size = 
        sizeof(AllEnumeratorImpl);

} // namespace app

My simple C array tarray of objects is just an example. A real library would have a more interesting collection with less static contents.

User Code



Finally we can use these mechanics to do a traversal in the ex.cpp user code:


#include "app.h"
#include <cstdio>

int
main()
{
  stack_for_each_start(app::get_all_ts, app::T, tptr)
    printf("%d %s\n", tptr->id, tptr->name);
  stack_for_each_end
  return 0;
}

Building and running the ex.cpp code we get:

 % g++ -g app.cpp ex.cpp -o ex
 % ./ex
 1 sam
 2 bill
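
The heap_ variants of the macros can be swapped in with only the prefix changed.  A sketch of that alternative main() (everything else stays the same):

#include "app.h"
#include <cstdio>

int
main()
{
  heap_for_each_start(app::get_all_ts, app::T, tptr)
    printf("%d %s\n", tptr->id, tptr->name);
  heap_for_each_end
  return 0;
}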


Checking for Leaks



Using the heap_ versions of the macros available in Enumerator.h, the same result is obtained. The difference in Valgrind output between the two implementations is a bit interesting (first for the stack macros):

% valgrind --tool=memcheck --leak-check=full ./ex
==29306== Memcheck, a memory error detector.
==29306== Copyright (C) 2002-2008, and GNU GPL'd, by Julian Seward et al.
==29306== Using LibVEX rev 1884, a library for dynamic binary translation.
==29306== Copyright (C) 2004-2008, and GNU GPL'd, by OpenWorks LLP.
==29306== Using valgrind-3.4.1-Debian, a dynamic binary instrumentation framework.
==29306== Copyright (C) 2000-2008, and GNU GPL'd, by Julian Seward et al.
==29306== For more details, rerun with: -v
==29306== 
1 sam
2 bill
==29306== 
==29306== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 17 from 1)
==29306== malloc/free: in use at exit: 0 bytes in 0 blocks.
==29306== malloc/free: 0 allocs, 0 frees, 0 bytes allocated.
==29306== For counts of detected errors, rerun with: -v
==29306== All heap blocks were freed -- no leaks are possible. 

Comparing this with the heap implementation which does see one alloc happen:

% valgrind --tool=memcheck --leak-check=full ./ex
==29288== Memcheck, a memory error detector.
==29288== Copyright (C) 2002-2008, and GNU GPL'd, by Julian Seward et al.
==29288== Using LibVEX rev 1884, a library for dynamic binary translation.
==29288== Copyright (C) 2004-2008, and GNU GPL'd, by OpenWorks LLP.
==29288== Using valgrind-3.4.1-Debian, a dynamic binary instrumentation framework.
==29288== Copyright (C) 2000-2008, and GNU GPL'd, by Julian Seward et al.
==29288== For more details, rerun with: -v
==29288== 
1 sam
2 bill
==29288== 
==29288== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 17 from 1)
==29288== malloc/free: in use at exit: 0 bytes in 0 blocks.
==29288== malloc/free: 1 allocs, 1 frees, 8 bytes allocated.
==29288== For counts of detected errors, rerun with: -v
==29288== All heap blocks were freed -- no leaks are possible.

At this point we also know that neither method leaks memory, or segmentation faults. So they are pretty equivalent to the user program.


Performance



To compare performance I made a new sample ex_timing.cpp with no I/O but iterated 10 million times:

#include "app.h"
#include <cstdio>

int
main()
{
  for(long i = 0; i < 10000000; ++i) {
    heap_for_each_start(app::get_all_ts, app::T, tptr)
    heap_for_each_end
  }
  return 0;
}

I ran each version (stack and heap) 3 times and picked the middle user timing to get this table (these used compiler optimization -O3):

 macro prefix   user time for 10 million iterations
 stack          292 ms
 heap           960 ms


The stack version is over three times faster!

Summary


For local things like enumerators, the usual method of encapsulating the functionality had us returning a heap-allocated object which the user code had to delete (via our macro) when it was done.  This was quite functional, but it has a performance penalty relative to what can be done with the stack.  The trouble with the stack is that the memory for the enumerator object has to be "allocated" prior to the call into the library function which returns the enumerator -- and this forced us to expose the size of the implementation enumerator somehow.  I think that is an acceptable cost (some minor symbol pollution) in order to get a performance benefit.  Of course, I wish for a way to do this which does not involve using C macros.

Thursday, July 26, 2012

OFlux: Building a Flow

Most of a .flux file's content will be node declarations and connecting flow.  There are two types of nodes: abstract nodes which serve as helpful connection points and do not have C++ code associated with them, and concrete nodes which are implemented via a C++ function.  In this post, I will describe the major features of the OFlux language in some detail.  First off is something programming language people call "choice".

Routing with Conditions


Suppose we have a source node Src which blocks on some input and produces an output Foo * foo.   The source node is declared as follows (its input set is empty -- which is necessary for a source node):


 node Src () => (Foo * foo);
 source Src;

As written, the oflux compiler will complain about this input since the flow rooted at Src, with only one node in it, does not end with a node that has a () output set.  Adding a line containing terminate Src will silence the complaint (in effect saying "we know what we are doing, don't complain").  What we really want to accomplish is to apply a condition isFooEnough() to every foo that comes out of Src.  This condition will -- in reality -- be implemented within our C++ code using a function with prototype bool isFooEnough(Foo *):


 condition isFooEnough(Foo *) => bool;


Suppose we want to implement separate nodes ConsumeFooEnough and ConsumeFooLacking as successors depending on the outcome of the isFooEnough() test.  Needless to say, it is assumed that isFooEnough has no side-effects on its argument or the global state of the program, since multiple calls to such a condition might occur when the logic gets more complicated.  Once consumed, we will dispose of foo with a node called DisposeFoo.  The abstract node ConsumeFoo is used below only to help describe the decision being made:


 node ConsumeFooEnough (Foo * foo) => (Foo *);
 node ConsumeFooLacking (Foo * foo) => (Foo *);
 node DisposeFoo (Foo * foo) => ();
 node abstract ConsumeFoo (Foo * foo) => ...;
 ConsumeFoo: [isFooEnough] = ConsumeFooEnough -> DisposeFoo;
 ConsumeFoo: [*] = ConsumeFooLacking -> DisposeFoo;
 source Src -> ConsumeFoo; /* changing the original */

Much like pattern matching in a language like OCaml or Scala, routing is done using the first rule which matches the input.  So if isFooEnough(foo) returns true, then the routing to ConsumeFooEnough happens, and otherwise the path to ConsumeFooLacking is chosen.  This syntax survives from the original Flux language design.  The ellipsis (...) is used on the output declaration of ConsumeFoo to indicate to the compiler that we do not care to specify the output set; if we did, only () would be acceptable since other options would not unify with the output set of DisposeFoo.  Now that a basic flow is described, the application programmer only has to implement the following concrete node functions in their C++ code to create a runnable program: Src, ConsumeFooEnough, ConsumeFooLacking, and DisposeFoo.  If it is not necessary to have DisposeFoo as a separate node, I would recommend that it just be implemented as a function which is called inside of the ConsumeFooEnough/ConsumeFooLacking C++ functions.  The only reason to have a separate node (which implies a separate event and a run-time scheduling of that event -- a non-negligible cost) is if DisposeFoo is interacting with resources in the program (please see my posting on OFlux guards).
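
To give a sense of the C++ side, the condition and source node might look roughly like the following sketch (Foo's member and the helper wait_for_next_foo() are invented; the generated Src_in/Src_out/Src_atoms types and the prototypes follow the conventions described in the next section):

 #include "OFluxGenerate.h"  // declares Src_in/Src_out/Src_atoms and the prototypes

 bool
 isFooEnough(Foo * foo)
 {
         return foo && foo->amount > 0;   // no side-effects, as required
 }

 int
 Src(const Src_in *, Src_out * out, Src_atoms *)
 {
         out->foo = wait_for_next_foo();  // block until the next Foo arrives
         return 0;                        // 0 indicates success
 }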

Running oflux


To compile the content above in a file called web.flux we issue the command:


 % ./oflux web.flux
OFlux v1.00-6-gc59d671 on web.flux

This causes the following output to be created locally:

  • web.dot: a description of the OFlux flow which can be turned into a pretty picture showing types, conditions and nodes (blue are abstract).  To create a picture with graphviz, run dot -Tpng web.dot -o web.png:

  • web.xml: the XML description of the OFlux flow which will be loaded at run-time to make the program run (edits to this file can cause changes to the flow without having to recompile. For brevity only the Src node is shown as it has the most interesting entry.):


<flow name="web.flux" ofluxversion="v1.00-6-gc59d671">
 <node name="Src" function="Src" source="true" door="false" iserrhandler="false"
 detached="false" external="false" inputunionhash="cbd4d4a285d623ee19470f7d5d68e
1c1a765263c3c9242a8d61b222d49c7e64c" outputunionhash="6e63ce559f96ef9b1f68aa4370
2f1343c9638715464dea7d140e6b4d068d9688">
  <successorlist>
   <successor name="0">
    <case nodetarget="ConsumeFooEnough">
     <condition name="isFooEnough" argno="1" isnegated="false" unionhash="6e63ce
559f96ef9b1f68aa43702f1343c9638715464dea7d140e6b4d068d9688"/>
    </case>
    <case nodetarget="ConsumeFooLacking">
     <condition name="isFooEnough" argno="1" isnegated="true" unionhash="6e63ce5
59f96ef9b1f68aa43702f1343c9638715464dea7d140e6b4d068d9688"/>
    </case>
   </successor>
   <successor name="erste">
    <case nodetarget="Src"/>
   </successor>
  </successorlist>
 </node>
 ... 
</flow>


  • OFluxGenerate.h: A header file declaring the needed node and conditional functions which includes the application's mImpl.h header file (which defines the types used).  This header should be included in the application's .cpp source.  It should not be necessary to understand all of the mechanics inside of the OFluxGenerate.h file.  Using the node declarations inside of web.flux, each node N gives rise to types N_in, N_out and N_atoms which are needed for the C++ prototype int N(const N_in *, N_out *, N_atoms *).
  • OFluxGenerate.cpp: The generated C++ code needed to bind the OFlux program to the run-time.  This is where the static tables used to look-up symbols read from the XML file (web.xml) live.
Building a complete application means OFlux compiling/C++ compiling/linking/running with these outputs.  Errors can crop up at any of those stages, so it is typical to have a tool like GNU make build the whole project -- even verifying that the XML file is loadable (properly gets its symbols from the OFluxGenerate.cpp code).


Some Limitations and Philosophy


In the above example there are plenty of things for the oflux compiler to check for us.  When a node is connected to another as a successor, the compiler does an asymmetric unification of the output set with the input set.  This means that each input should exist as an output from the predecessor.  Generally this is done using the name of the argument (so matching foo with foo in my example), but if there is a mis-match in naming an attempt is made to unify using types.

If the names of the formal parameters do not match there are consequences in the generated code -- a C union is necessary to give the field two names (this is trouble if any of the argument types is non-POD, and that happens quite a bit with C++ code).  When unification fails, the offending node and its argument are indicated by the oflux compiler (the first error causes the compilation to halt).


OFlux adds another dimension to the task of programming a server.  A new server design will have nodes and flow to go with it.  My consistent impression working with developers who are new to the tool is that many, many nodes end up being defined (much like functions are used in C++).  Making a flow very long (lots of nodes from source to any sink) can be quite detrimental to performance if most of the nodes are doing less work than the overhead to execute them.  My best advice is to try an initial flow with the absolute least number of nodes possible, and then investigate refining that program into more nodes in a step-wise manner.  Keep it as simple as possible!


Routing to Many Places


Occasionally concurrency is needed in the flow since an output is consumed by multiple nodes, and it's inefficient to have them each process that input (a foo perhaps) sequentially.  The OFlux run-time keeps a node event (which holds its output data) alive using reference counting, so it is possible to keep a node event around long enough to be processed by multiple successor node events.

Suppose we wanted to modify our example above to have DisposeFoo be an abstract node which aliases two nodes we want to run at the same time (ReclaimFoo and CommunicateFoo).  This could be done as follows (replacing the C++ function implementation we have for DisposeFoo with implementations for ReclaimFoo and CommunicateFoo):


 node abstract DisposeFoo (Foo *foo) => ();
 node CommunicateFoo(Foo * foo) => ();
 node ReclaimFoo(Foo * foo) => ();
 DisposeFoo = CommunicateFoo & ReclaimFoo;

Using this technique, the completion of a ConsumeFooEnough event will cause two new events to be created: one for CommunicateFoo and another for ReclaimFoo.  If these nodes were detached or had a shimmed system call in them, they could both dispatch at the same time and run concurrently.


Summary



Using the basic composition syntax within OFlux, an application developer can describe the top-level flow of events in their program without having to explicitly manage a thread pool.  The language offers three main types of basic composition:


  1. sequential (using ->)
  2. concurrent (using &)
  3. choice (using : [ condition ] = rule matching)

Wednesday, July 25, 2012

OCaml: Hash Anything

Tasty


OCaml has generic marshalling code within it, and it has MD5 digest code.  In this post I will simply compose the two of them to achieve a hash function which works on any object (a handy magical thing to have):


 % ocaml
        Objective Caml version 3.10.2

 # Marshal.to_string;;
 - : 'a -> Marshal.extern_flags list -> string = <fun>
 # Digest.string;;
 - : string -> Digest.t = <fun>
 # Digest.to_hex;;
 - : Digest.t -> string = <fun>

By just entering the component functions into the ocaml REPL, you can see the type of each of them (in fact, ocaml returns symbolname : type = value on each evaluation).  Note that ;; is needed to get it to evaluate, since the REPL takes multi-line input (OCaml does not require indentation for scoping -- a "feature" that some languages make use of):


 # let hash x =
    let m_x = Marshal.to_string x [] in
    let d_m_x = Digest.string m_x
    in  Digest.to_hex d_m_x;;
 val hash : 'a -> string = <fun>

Now we can use this hash on all kinds of things (with any type):


 # hash [1;2;3];;
 - : string = "64cb37afbe72effe97fb4f089a82f9b2"
 # hash 4.5966;;
 - : string = "5eed266efb8593218f1cb1cacf1f8d89"
 # hash "Ocaml rocks!";;
 - : string = "f078e0f37bce6c8a83112f9461bf7544"

It is almost too simple.  Writing a meta-programming monster hash function in C++ is much less enjoyable.  Some have criticized the OCaml library and its incompleteness (compared to Java perhaps) as a reason for its narrower adoption.  Now that OCaml-java is a project, that excuse has less weight.  Hopefully some of the parametric polymorphism available to OCaml's bundled libraries finds its way into that project.  As usual, choosing the proper tool for any job is an important step in getting it done.

Tuesday, July 24, 2012

TIPC: Replication Log Example

Tasty


Losing customer requests is terrible, at least for the applications that I work on.  Some applications can survive request loss by pushing the problem to the customer -- having them sort out the consistency problem and re-submit the request if necessary.  Here, I want to describe a solution I am working on to avoid request loss, leveraging the reliability of replication within a cluster to accomplish this goal.

The service of a user's request involves interacting with (possibly changing) the state of the world (persistent data on the server).  Each request received by the server needs to immediately be persisted reliably, so that only in the event of a catastrophic failure do we lose that request.  This means that we replicate that request to multiple nodes within the cluster or attempt to preserve it on a reliable storage device/appliance.

After investigating reliable storage with low latency, I came to the conclusion that I/O devices like these are very expensive and (the cheaper ones, at least) tend to be installed in a single computer.  If you lose that computer, asking your server facility to pull a card out and put it into another machine does not lead to a zero down-time solution.  An expensive appliance sitting on your network can accomplish the goal of persistence, but then you have already opened the door to communicating on the network to do persistence.  You might as well persist by replicating the data to multiple nodes in your cluster.


Persistence by Cluster Replication



Consider the following picture, where a service G has received the user request and replicated it to several instances of another service S, which just stores and acknowledges the data:



Each of the pink boxes is a separate machine/node in your cluster with a separate power supply and UPS (hopefully) so that you can be reasonably certain that a minor disaster only affects one node.  In the terms used by Greg Lindahl in his talk on replication (which goes into how it is just a way better option than investing in RAID once you start to think about clustered architecture) we are achieving R3 replication.

The strategy is to receive the customer request in G, annotate it with some immediately available state into a message (e.g. timestamps, sequence numbers -- back away from that database handle, hot-shot!) and send it to S with an increasing message sequence number.  S receives this stream of incoming messages i, (i+1), (i+2), ... , and "stores" them into a sequence of memory mapped archive files which are each just N-element message arrays, by placing message i into mapped file (i/N) at position (i mod N).

By using memory mapped files we get two excellent side-effects.  First, the virtual memory system of the node running S will eventually permanently persist the mapped file to disk for us on its own.  Second, we have the speed of a direct in-memory recv() into that mapped memory (assuming that the mapping of new archive files into memory is somehow done for us by another thread).  In order to get the multicast send/recv done, I am going to use TIPC and its reliable multicast socket which I discussed in an earlier posting.
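
For concreteness, here is a rough sketch (the file naming, the message layout and the value of N are all invented) of how S could place message i into mapped archive file (i/N) at offset (i mod N):

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>

struct Msg { unsigned long seq; unsigned valid; char payload[1012]; };
static const size_t N = 65536;                   // messages per archive file

Msg *                                            // map archive file number file_index
map_archive(unsigned long file_index)
{
        char fname[64];
        snprintf(fname, sizeof(fname), "archive.%06lu", file_index);
        int fd = open(fname, O_RDWR | O_CREAT, 0644);
        if(fd < 0) return 0;
        if(ftruncate(fd, N * sizeof(Msg)) != 0) { close(fd); return 0; }
        void * base = mmap(0, N * sizeof(Msg), PROT_READ | PROT_WRITE
                        , MAP_SHARED, fd, 0);
        close(fd);                               // the mapping outlives the fd
        return base == MAP_FAILED ? 0 : static_cast<Msg *>(base);
}

void                                             // archive is the mapping for file (incoming.seq/N)
store(Msg * archive, const Msg & incoming)
{
        Msg & slot = archive[incoming.seq % N];
        memcpy(&slot, &incoming, sizeof(Msg));
        slot.valid = 1;                          // the extra "valid data" word
}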

Once the message is properly received, an extra word in the S log is set to indicate "valid data", and an acknowledgement (containing the sequence number) is multicast on a separate channel back to G.  Ideally, G has a thread for processing customer requests and sending S messages, a rotating buffer to keep those "in the process of persistence" messages, and a second thread which deals with acknowledgements (implementing an "ack" policy which indicates how much persistence is enough).  Having ensured persistence by the time the policy is fulfilled in this second thread, G is free to send some further acknowledgement to the customer ("we have it!").

Losing an S


Consider what happens when we lose a node that runs an S:


In this case we degrade to R2 replication, and we start to get nervous.  Fortunately our cluster resource manager (a.k.a Pacemaker for my setup) notices this and starts up another S somewhere else:



The fresh S has empty archives however, so it needs to immediately start listening to G and asking the other Ss to fill it in on what has been going on.  This is done by implementing a second thread in S dedicated to peer-to-peer replay of existing archive data for the benefit of the newcomer.

Losing a G


If G restarts, or needs to be run fresh on a new node because the node it was running on failed, then it will need to come up and ask the network of Ss what the last message sequence number was (so that it can start generating new messages which do not overlap or have gaps with that).  A fourth TIPC reliable multicast channel is dedicated to serving that information.  G waits a specified period (I think 1 second) to receive as many answers to that question as possible and uses the maximum response sequence number to begin its new stream.

It is possible to lose a request when G is lost, but that request will not have been acknowledged.  Given the low latency characteristics of G, any customer traffic that is lost is more likely to be traffic which had yet to be received by G on the front side.

Benefits


There are a number of benefits to this scheme:

  1. With low latency we are certain to reliably persist the customer's request
  2. No relational databases are disturbed which might have unpredictable latency profiles (adversely affecting performance)
  3. Replication scales well in a multi-cell sense (sharding) on the cluster
  4. Each node with an S running could have an API to walk the memory mapped archive files independently (without disrupting S) and (possibly) do logarithmic (in space) searches on the data stored there
  5. Minimal indexing is maintained -- it's just a binary log of same-sized messages we are replicating -- so it does not affect the performance.
  6. Less reliance on non-volatile storage and its failure characteristics (spinning rust as they call it), in favour of RAM-based storage
  7. Leveraging the efficiency of virtual memory to choose the moment and method to "sync" data to non-volatile storage.
TIPC's reliable multicast really shines for this problem, since it enables us to have services at the cluster level (the detail of which node they run on is not something the programmer needs to delve into), and the ability to reliably send message-oriented packets over multicast to just the nodes which are listening is a big win.

Monday, July 23, 2012

OFlux: Detached Nodes

Tasty



Previously, I described the anti-pattern of blocking while locking, and also how it is that the OFlux run-time escapes this pitfall.  If a node in your program tends to do two or more blocking system calls, each one will be intercepted by the shim mechanism to give up the run-time's main mutex.  The context switching in a case like that could be optimized if the node does not cause side-effects within the rest of the program (mostly modifying non-local state).  The optimization is to enter the node C++ function as if it were one big system call (releasing the main run-time mutex for the duration of the function's execution).  This saves context switching on the mutex and conceivably increases the concurrency in the program (node events for these detached nodes are now able to run independently of the run-time more often).  Here is how we augment the basic state diagram for each run-time thread to accommodate this idea:



For nodes that are declared detached, the ability to run in the new mode (dotted line box on the right side) is available when the run-time sees that there are enough threads to allow this.  These two dotted-boxes indicate the states where the thread is not holding the main run-time mutex.

Example: Sleeping Beauty and the Seven Dwarves


Within a working copy of the OFlux Github repo, you can create a new directory called src/examples/dwarves with the following ex-contents.mk make file:

$(info Reading ex-contents.mk $(COMPONENT_DIR))

OFLUX_PROJECT_NAME:=dwarves

include $(SRCDIR)/Mk/oflux_example.mk

$(OFLUX_PROJECT_NAME)_OFLUX_CXXFLAGS+= -DHASINIT -DHASDEINIT

The dwarves.flux file describes the flow:

node SnowWhite () => (int apple_id);
node Dwarf (int apple_id) => ();
source SnowWhite -> Dwarf;

The C++ code for these nodes is pretty simple. Every 0.10 ms SnowWhite sends out an apple, and a Dwarf picks it up and does ten 0.10 ms sleeps in order to consume it (in mImpl_dwarves.cpp):

#include "OFluxGenerate_dwarves.h"
#include "OFluxRunTimeAbstract.h"
#include <sys/time.h>
#include <unistd.h>
#include <cstdlib>

long dwarf_count = 0;
extern oflux::shared_ptr<oflux::runtimeabstract> theRT;

int
SnowWhite(const SnowWhite_in *
        , SnowWhite_out * out
        , SnowWhite_atoms *)
{
        static int apples = 0;
        out->apple_id = apples++;
        if(apples>10000) {
                theRT->hard_kill();
        }
        usleep(100);
        return 0;
}

int
Dwarf(    const Dwarf_in * in
        , Dwarf_out *
        , Dwarf_atoms *)
{
        __sync_fetch_and_add(&dwarf_count,1);
        for(size_t i = 0;i < 10; ++i) {
                usleep(100);
        }
        return 0;
}

I have also added code to produce statistics when the program exits:
struct timeval tv_start;

void
deinit()
{
        struct timeval tv_end;
        gettimeofday(&tv_end,0);
        double total_time = tv_end.tv_sec-tv_start.tv_sec
                + (tv_end.tv_usec - tv_start.tv_usec)
                   /1000000.00;
        double dps = dwarf_count / total_time;
        printf("ran %lf seconds, dispatched %lf "
               "dwarves per second\n"
                , total_time
                , dps);
}

void
init(int argc,char * argv[])
{
        atexit(deinit);
        gettimeofday(&tv_start,0);
}

As is, the dwarves.flux flow will produce the following output on my Asus 1000HE netbook (which has 2 hardware contexts and 1 core):

 # ./builds/_Linux_i686_production/run-dwarves.sh \
   2> /dev/null  | grep ran
ran 5.109480 seconds, dispatched 1957.146324 dwarves per second

Detaching Dwarves


But if we make the Dwarf node detached (which I claim will likely be of benefit, since each shimmed usleep call will no longer have to give up and re-acquire the run-time mutex):

node SnowWhite () => (int apple_id);
node detached Dwarf (int apple_id) => ();
source SnowWhite -> Dwarf;

Re-running the test, we can see that we are running a little faster:

 # ./builds/_Linux_i686_production/run-dwarves.sh \
   2> /dev/null  | grep ran
ran 3.468819 seconds, dispatched 2882.825538 dwarves per second

So detaching nodes can pay off handsomely if it is safe to do so, since it reduces the cycling in and out of the main run-time mutex.  It is unsafe to do this if there is something about the node's source code which makes it unsafe (e.g. mutating non-local state).  Detached nodes are also useful when making calls to 3rd party libraries which (themselves) have mutexes -- in order to avoid a deadlock with the run-time mutex.

Friday, July 20, 2012

TIPC: communication on a cluster

Tasty.


Inter-process communication on a cluster is an interesting niche.  10Gbe networking hardware is not just capable of better bandwidth, but also has lower latency.  Using the network as an inter-process communication mechanism within a cluster makes sense since two processes which need to talk may not be running on the same node.  In fact, it is likely a very good idea in some instances (e.g. replication) that such processes do not run on the same cluster node.

Most software people have some experience with using TCP/IP since it's kind of running the show on the internet (HTTP for instance).  Less common is the use of UDP, which provides fewer guarantees (no reliable delivery) but has the added flexibility of being connection-less, so it can be quite good for broadcasting information.  Both of these options are somewhat legacy, and other alternatives have been imagined specifically for running on single-hop networks in the context of clusters (so LAN distances).

TIPC (transparent inter-process communication) is one such option, invented by an engineer at Ericsson Canada named Jon Paul Maloy.  It is now an open source project with several contributors.

Alternatives


There are alternatives to TIPC which are similar (I only know Spread -- as of version 4.0.0 -- very well from experience) and may be worth investigating as well:

  • RDS (Reliable Datagram Sockets)
  • PGM (Pragmatic General Multicast)
  • QSM (Quicksilver Scalable Multicast)
  • Spread Toolkit

Spread Toolkit is a user-space solution which consists of a daemon running on every node, a library for applications to talk to it, messaging semantics based on groups (similar to the concept of IRC actually), and an inter-daemon protocol implemented as a token ring over UDP.  In the documentation of Spread there is some fairly clear language that it is not meant to be used for real-time applications.  Unfortunately, it found its way into some real-time applications that I care about.  One particular pitfall was running the token ring through several servers and firewalls with different endian-ness (Sparc and x86) and different speed characteristics.  The combination was somewhat of a disaster from a performance perspective, since the inter-daemon flow control code seemed to work better when the nodes in the Spread network were more equal.

Linux Installation of TIPC and Version Determination


TIPC was mainly developed on Linux as a loadable kernel module.  You can install it on Debian (my cluster uses Wheezy) with:

 # sudo apt-get install tipcutils

You can see what version of TIPC you are running using this command:

 # kvers=`uname -r` strings \
  `find /lib/modules/$kvers/ -name tipc.ko` \
   | grep ^version=
version=1.6.4
version=2.0.0

A grep for TIPC in /var/log/syslog is also a good idea after you have the kernel module loaded.  In my case TIPC 2.0.0 was running on the 3.2.0 x86_64 Linux kernel.  Some notes I have on TIPC versions:


  • 1.6.x: had a native API as well as berkeley sockets (via AF_TIPC)
  • 1.7.x: uncommon to find in a linux distro, was ported to Solaris (and I tried that out)
  • 2.0.x: found commonly with Linux kernels past 2.6.30; the native API was dropped

I think there may be code out there for a 2.1.x release, but I have not tried it.  The main admin interface to TIPC is the tipc-config command, which can report statistics, list open ports, configure bearers, and set window sizes.  A gotcha I found was that I had to build it from source in order to get it to properly configure the window size on the broadcast link.


Node Addressing and Scope


Nodes in the cluster are addressed using Z.C.N syntax for the triple consisting of the zone (cluster of clusters) Z, cluster C, and node N.  If you are like me and just have the one cluster, you live in 1.1.N for all of your nodes (the limit is 4095 of those as well, in contrast to IP's limit of 255).  When opening a socket the API allows the programmer to select the scope of the socket to be node, cluster or zone.  This allows you to specify a local service that is running somewhere in a cluster, but which is accessible to all nodes there.  The same is accomplished with VIP management when using TCP.

Socket Types


The socket types that are supported are:


  • SOCK_STREAM: byte-oriented reliable stream much like TCP
  • SOCK_DGRAM: message-oriented unreliable datagram multicast like UDP
  • SOCK_RDM: message-oriented reliable datagram multicast
  • SOCK_SEQPACKET: message-oriented reliable stream


Message-oriented communication is nicer to program with in some ways, since you don't have to deal with partial messages being sent or received (only whole messages do these things).  The largest message size (or cumulative message size with scatter-gather calls like recvmsg/sendmsg) is 66000 bytes.  This number is important to keep in mind since it has an effect on things like the effective MAX_IOV.

SOCK_STREAM is interesting as well, since several nodes in the cluster can bind to it to serve the same port, which causes TIPC to balance traffic between those service end points.  You can see how some choices have been made to gear TIPC up for highly available services.

The socket type I found to be the most immediately interesting was SOCK_RDM (message-oriented, reliable multicast).  Being able to send the same message to multiple end points at the same time seems really powerful.  The reliable part requires some explanation.  From my read of various forum postings, the message is reliably delivered to another node when it is acknowledged by that node.  If the message cannot be delivered, it is possible to set things up so that the first 1024 bytes of that message are returned to the sending socket (letting the sender application know that it failed to be received).  The best practice for being sure that an application has received a message is to have that application send an explicit acknowledgement message back to the sender.  For more information please consult the older or newer documentation.
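
As a rough illustration (the service type 999, the instance range, and the payload are invented; error handling is omitted), a SOCK_RDM receiver bound with cluster scope, plus a multicast send to the same name sequence, might look like this:

#include <linux/tipc.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstring>
#include <cstdio>

int main()
{
        int rx = socket(AF_TIPC, SOCK_RDM, 0);
        struct sockaddr_tipc bind_addr;
        memset(&bind_addr, 0, sizeof(bind_addr));
        bind_addr.family = AF_TIPC;
        bind_addr.addrtype = TIPC_ADDR_NAMESEQ;   // listen on {999, 0..99}
        bind_addr.scope = TIPC_CLUSTER_SCOPE;
        bind_addr.addr.nameseq.type = 999;
        bind_addr.addr.nameseq.lower = 0;
        bind_addr.addr.nameseq.upper = 99;
        bind(rx, (struct sockaddr *)&bind_addr, sizeof(bind_addr));

        int tx = socket(AF_TIPC, SOCK_RDM, 0);
        struct sockaddr_tipc to;
        memset(&to, 0, sizeof(to));
        to.family = AF_TIPC;
        to.addrtype = TIPC_ADDR_MCAST;            // multicast to every binder
        to.addr.nameseq.type = 999;
        to.addr.nameseq.lower = 42;               // instance range {42..42}
        to.addr.nameseq.upper = 42;
        const char msg[] = "hello cluster";
        sendto(tx, msg, sizeof(msg), 0, (struct sockaddr *)&to, sizeof(to));

        char buf[128];
        ssize_t n = recv(rx, buf, sizeof(buf), 0);  // a whole message or nothing
        printf("received %zd bytes\n", n);
        close(rx);
        close(tx);
        return 0;
}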

Configuration


I get the module loaded using the following bash script on my cluster (the node machines are node010 and node011 -- which are DRBL-assigned names based on their IP addresses in the 192.168.1.x range):

#!/bin/bash

mussh -h node010 -h node011 -c '
modprobe sfc
export SUFFIX=`hostname | sed 's/node0//g'`
ifconfig eth1 192.168.3.$SUFFIX
modprobe tipc
tipc-config -netid=1234 -addr=1.1.$SUFFIX -be=eth:eth1
lsmod | grep tipc
/home/mark/git/tipcutils/tipc-config/tipc-config -lw=broadcast-link/100
'

Here I am running the TIPC network through 10Gbe eth1 interfaces using the sfc kernel driver. For example cluster node node010 has 1Gbe address 192.168.1.10 on eth0, 10Gbe address 192.168.3.10 on eth1, and this latter interface is a TIPC bearer where the node is known uniquely as 1.1.10.  I find that this scheme is easy to remember.

Functional Addressing


I already mentioned that you can open a socket with cluster scope, which makes it easily accessible to the whole cluster (client sockets connect to the port without specifying a specific node address).  This is very neat since you can easily set up services within the cluster as if it were one big happy computer (ideal for dealing with a cluster).

The concept of functional addressing of ports also allows you to have subranges of services within a port, which allows messages to be filtered based on how the socket is set up.  Suppose a server opens a {portname,lower,higher} (where lower is less than or equal to higher); it is saying in effect that it is interested in messages addressed to {portname,lower}, {portname,lower+1}, ..., {portname,higher}.  Suppose we had a mini-blogging service running on this where portname is 999 and the instance receives messages for a particular user id -- so listening to {portname,instance} gives you a single user's feed.  A bit of a contrived example, but you get the idea.

Name Subscriptions


Another mechanism exists for programs to subscribe for notification of major events, like a program binding to a particular portname and instance range, or a node going down completely.  I have not really played with this functionality very much, but it seems like a very useful way of coding fail-safe scenarios (e.g. not sending a message if there is nobody bound and listening).

Summary


TIPC is a really neat idea for communication between processes running in a cluster.  It makes use of several useful abstractions and conveniences which are not available with TCP directly.  It also does away with much of the complication that TCP needs to deal with for connections which have higher latency (a WAN context for instance).  I would be very excited to learn that TIPC got the attention of the kernel-bypass work that seems to be useful for speeding up networking over 10Gbe network cards.  If this happens, I get the best of both worlds (a nice cluster API and very low latency).


Thursday, July 19, 2012

LVS balancing clustered REST service

Tasty.


Node.js (a single threaded event loop) and LVS (Linux Virtual Server, a freely available layer 4 IP routing mechanism that is part of the Linux kernel) seem to be a natural fit.  The main advantage of node.js is that you can quickly write your server side code in javascript or coffeescript and get it up and running within the V8 javascript engine on top of node.js.  My main aim is to try out LVS and see how it does its job on my cluster:



The 4 physical machines on my 1Gbe commodity switch are commodity machines running Debian Wheezy.  Test is a Dell with an E8400 dual core chip.  Master is an i5 Dell, and its RealTek 1Gbe chip is doing all the routing work over its VIP (192.168.1.100 in my case).  I installed ldirectord onto the cluster by simply installing it on the Master (whose binaries are accessible to the nodes via NFS):

 # sudo apt-get install ldirectord

It is configured as follows (in /etc/ha.d/ldirectord.cf) :

checktimeout = 5
negotiatetimeout = 30
checkinterval = 10
failurecount = 1
autoreload = no
logfile = "local0"
fork = yes
quiescent = no
cleanstop = no
virtual=192.168.1.100:3000
    real=192.168.1.10:3000 gate
    real=192.168.1.11:3000 gate
    service=http
    request="hello"
    receive="Up"
    scheduler=lc
    protocol=tcp
    checktype=negotiate

I have removed the comments from the above file to make it shorter. The LVS load balancing scheduler choice was lc which stands for "least connections". There are about a dozen alternatives which are interesting.

Next, I used Pacemaker+Corosync to manage the VIPs:

 # crm configure edit

To make our REST service run on the Node1 and Node2 machines (pacemaker is managing the Master, Node1, and Node2 as a single cluster), we add the following lines:

 
 primitive Rest ocf:heartbeat:anything \
        params binfile="/opt/scripts/rest.sh" \
        user="rapt01" logfile="/tmp/rest.out" \
        errlogfile="/tmp/rest.err" \
        op start interval="0" timeout="20" \
        op stop interval="0" timeout="30" \
        op monitor interval="20"
 clone Rest-clone Rest \
        meta ordered="false" clone-max="2" \
        target-role="Started"
 location Rest-loc Rest-clone -inf: Master


To configure VIPs and ldirectord under pacemaker we add the lines:


 primitive RestVip ocf:heartbeat:IPaddr2 \
   op monitor interval="60" timeout="20" \
   params ip="192.168.1.100" lvs_support="true"
 primitive RestVip-lo ocf:heartbeat:IPaddr2 \
   op monitor interval="60" timeout="20" \
   params ip="192.168.1.100" nic="lo" cidr_netmask="32"
 primitive ldirectord ocf:heartbeat:ldirectord \
   op monitor interval="20" timeout="10"
 clone RestVip-lo-clone RestVip-lo \
        meta interleave="true" clone-max="2" target-role="Started"
 colocation rest-vip-coloc inf: ldirectord RestVip
 colocation rest-vip-lo-coloc -inf: RestVip RestVip-lo-clone

Finally I also tweaked the arp settings, which I believe steers clear of the "arp problem":

Adding NAT forwarding to Master's /etc/sysctl.conf and running sysctl -p:

  net.ipv4.ip_forward=1
  net.ipv6.conf.all.forwarding=1

And on each of Node1/Node2 a similar edit to /etc/sysctl.conf (with reload) for arp:

  net.ipv4.conf.all.arp_ignore=1
  net.ipv4.conf.all.arp_announce=2

Testing.


Using Apache bench:

 # ab -c 1000 -n 10000 http://192.168.1.100:3000/something

Nice output like this is observed from our Test machine:
This is ApacheBench, Version 2.3 <$Revision: 655654 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 192.168.1.100 (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests


Server Software:        
Server Hostname:        192.168.1.100
Server Port:            3000

Document Path:          /something
Document Length:        167 bytes

Concurrency Level:      1000
Time taken for tests:   1.648 seconds
Complete requests:      10000
Failed requests:        0
Write errors:           0
Total transferred:      3330000 bytes
HTML transferred:       1670000 bytes
Requests per second:    6068.91 [#/sec] (mean)
Time per request:       164.774 [ms] (mean)
Time per request:       0.165 [ms] (mean, across all concurrent requests)
Transfer rate:          1973.58 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0   51 216.3      0    1004
Processing:     1   45  35.2     41     658
Waiting:        0   45  35.2     41     658
Total:          1   96 223.8     43    1270

Percentage of the requests served within a certain time (ms)
  50%     43
  66%     56
  75%     64
  80%     69
  90%    110
  95%    367
  98%   1055
  99%   1066
 100%   1270 (longest request)

Results.


Using Apache Bench with 1000 concurrent connections and 10000 requests, I obtained some data indicating how well LVS performs. My node.js code is a bit disappointing I think, since it is not really handling what I think of as high load. I should spare node.js judgment until I have a chance to learn it better and the techniques used to make it scale (other than LVS).


 address   keep-alive on?   Requests/sec (avg.)   mean latency (ms)   95th percentile latency (ms)
 VIP       no               6068.91               164.774             367
 Node1     no               3393.52               294.679             1040
 VIP       yes              8639.80               115.743             126
 Node1     yes              4447.24               224.858             225


So latency is roughly halved, and throughput is roughly doubled when we load balance to both Node1 and Node2 (versus just using Node1). The HTTP keep-alive does improve throughput as expected.  My only remaining uneasiness is that node.js, though easy to code, is not performing as well as I had hoped in terms of both latency and throughput.  I may look into something a bit lower level.

Pacemaker + Corosync Play Clue

Tasty.



Clue was a popular board game (and possibly still is) where possible suspects, locations and murder weapons were guessed at in order to solve a crime (which meant finding the correct combination of all three).  In the spirit of this board game, I would like to explain how it is that resources (processes, virtual IP addresses, services) are assigned to cluster nodes by a cluster manager (Pacemaker) according to some easily expressible constraints.  The analogy is that cluster nodes are a bit like rooms, and resources are a bit like people or murder weapons.

Pacemaker is a subproject of Linux-HA (high availability Linux) which is used to manage resources on a cluster and determine what to do about failures in the cluster (crashed processes and/or failed nodes).  Given a set of constraints (or clues, if you like) it figures out a solution which satisfies those constraints (or comes closest to doing so).  Pacemaker can run on two reliable message buses: Corosync and Heartbeat (the latter is older than the former).  I chose to install Corosync as the message bus, since it seemed to be the new thing -- not always a good reason to pick something though.


Mansion Layout


Our cluster has 3 rooms at the moment:
  1. Kitchen (used as a fileserver mostly) 
  2. Bedroom1 (first compute node)
  3. Bedroom2 (second compute node)
In terms that Pacemaker understands we declare those nodes by editing the configuration using:

 # crm configure edit

And then,

 node Kitchen \
 attributes standby="off"
 node Bedroom1 \
 attributes standby="off"
 node Bedroom2 \
 attributes standby="off"

Characters


A simple service that is maintained by running a process called ProfPlum is added to the cluster with the following lines in the Pacemaker configuration:

 primitive AProfPlum ocf:heartbeat:anything \
 params binfile="/opt/clue/bin/ProfPlum" \
        user="clue" logfile="/tmp/prof.out" \
        errlogfile="/tmp/prof.err" \
 op start interval="0" timeout="20" \
 op stop interval="0" timeout="30" \
 op monitor interval="20"

The ocf:heartbeat:anything part refers to a script which implements a primitive capable of running a simple process resource.  There is a large bundle of similar scripts for other cluster resources and commonly used packages (e.g. IP addresses, databases, file systems). As is, we can start and stop this resource using:

 # crm resource [start | stop | restart] AProfPlum

Locations.


In order to tell Pacemaker that we want to bind this resource to a particular node (in this case Kitchen) we add to the Pacemaker configuration:


 location AProfPlum-loc AProfPlum inf: Kitchen 

We could have just said things a little differently by specifying where we don't want AProfPlum to run using:

 location AProfPlum-not-bedroom1-loc AProfPlum -inf: Bedroom1
 location AProfPlum-not-bedroom2-loc AProfPlum -inf: Bedroom2

If we create a similar simple process resource called AColonelMustard to manage a program called ColonelMustard, we can keep AColonelMustard and AProfPlum in the same room using this snippet of Pacemaker config:

 colocation APlum-and-Must-coloc inf: AProfPlum AColonelMustard

Using -inf: instead in the colocation line would have specified that they be kept separated instead of together.

Many Peacocks


Suppose we have a special service MissPeacock (managed by a primitive AMissPeacock) that we want to run on both Bedroom1 and Bedroom2; we can accomplish this with a clone line and a location constraint that keeps the clones out of the Kitchen:

 
 clone AMissPeacock-clone AMissPeacock \
 meta ordered="false" clone-max="2" \
        target-role="Started"
 location AMissPeacock-loc AMissPeacock-clone -inf: Kitchen

House of Cards


To check the status of the cluster we type:

 # crm status

or monitor it persistently:

 # crm_mon

To take a node (e.g. Bedroom1) out of service in order to do maintenance on it, we request:

 # crm node standby Bedroom1

and bring it back online:

 # crm node online Bedroom1

Summary


Pacemaker+Corosync provide a declarative way to describe the services and resources that need to be maintained in the cluster.  Using a resource list and constraints, resources are distributed amongst the available nodes.  Cluster nodes and resources are also easily taken down or restored.  There are many tools which can do these tricks on a single box, but on a cluster it is quite something to see.

To learn more about Pacemaker, there are nice documents explaining webserver installations for SUSE Linux and from Cluster Labs.

Tuesday, July 17, 2012

Anti-pattern: Lock and Block

Poison.


Never hold a lock and then block waiting for I/O. Just do that one thing, and you are mostly out of the woods. Even if your "multi-thread" program ultimately serializes to the equivalent of one thread because of your synchronization choices, doing that one thing (not locking while blocking) should keep your application at above average awesome.

Threads.


So how does locking work?  Why is it done? What is a thread anyway?

So many questions for a neophyte C++ developer to ask.  A thread is an execution context (stack, program counter, thread local storage) within a process which shares the address space (so global variables, binary code, and text sections) with other threads in that process.  The kernel schedules each runnable thread in the program on the physical cores that the machine has.  A core?  They told you how many of those you've got when you bought your computer -- I know it sounded esoteric at the time.  Each core is capable of moving a thread along through the code, changing the state of the program.  As time progresses, machines have more cores, and software is written to have more threads which make use of that added hardware/machinery.

As time progresses (downwards in the below diagram) two threads spend time on a core (shown in blue), time off the core (shown in green), and time waiting for a blocking system call to return:


Threads progress through these phases: doing serious number crunching, waiting for system calls to return, waiting for mutexes/semaphores to be released by other threads, and just waiting their turn for time on a core.  Because of this, having more threads than cores makes sense.  Unless all a thread does is number crunching in local memory, it does not spend all of its time on a core.

Locking.


When threads want to share memory (say, a structure like a hash table, a vector, or a queue), the textbook way of making that safe is a lock of some sort (usually a mutex).  Sharing is unsafe otherwise because partial state changes on that memory, when interleaved, might transition the structure into a state that should be unreachable or invalid.  Read and write operations on structures shared by threads use locks to serialize the operations (allowing uninterrupted access for the duration) and keep things safe:
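
As a minimal sketch of that textbook approach (the queue and the function names here are made up for illustration), a mutex serializes short, purely in-memory operations on a shared structure:

  #include <mutex>
  #include <queue>

  static std::queue<int> work_queue;      // structure shared between threads
  static std::mutex      work_queue_lock; // serializes access to it

  void push_work(int item)
  {
      // acquire the lock for the scope of this function only
      std::lock_guard<std::mutex> hold(work_queue_lock);
      work_queue.push(item);              // short, purely in-memory critical section
  }

  bool pop_work(int & item)
  {
      std::lock_guard<std::mutex> hold(work_queue_lock);
      if (work_queue.empty()) {
          return false;
      }
      item = work_queue.front();
      work_queue.pop();
      return true;
  }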



Poison.



Serialization costs something.  All this co-ordination has a bit of overhead, and it slows the threads down as they hit the brakes more often to wait for access to the shared structures.  If the time under the lock only covers some very fast number crunching or local memory (cache) access, then things aren't so bad: the critical section is so short that the penalty is not onerous.  Things get epically awful if there are blocking system calls happening in a thread while those locks are held:


This can be quite terrible for the threads waiting for their turn with the lock: since they collectively drop to 0 percent CPU utilization, I/O ends up dictating the speed of progress.  This is not what you want.
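
To make the anti-pattern concrete, here is a small sketch (the file descriptor and the shared string are invented for illustration).  The first version holds the mutex across a blocking read(); the second does the I/O before taking the lock:

  #include <mutex>
  #include <string>
  #include <unistd.h>

  static std::mutex  state_lock;
  static std::string last_message; // shared state the lock protects

  // BAD: the lock is held while read() blocks on I/O
  void poisoned_receive(int fd)
  {
      std::lock_guard<std::mutex> hold(state_lock);
      char buf[512];
      ssize_t n = read(fd, buf, sizeof(buf)); // blocking system call under the lock
      if (n > 0) {
          last_message.assign(buf, n);
      }
  }

  // BETTER: block without the lock, then take it only to update memory
  void receive(int fd)
  {
      char buf[512];
      ssize_t n = read(fd, buf, sizeof(buf));
      if (n > 0) {
          std::lock_guard<std::mutex> hold(state_lock);
          last_message.assign(buf, n);        // short critical section
      }
  }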

Tasty.


In the OFlux run-time, blocking system calls are shimmed via interposition so that the main run-time mutex is not held for the duration of those system calls.  It avoids the "big faux pas" by design:
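
As a rough illustration of the interposition technique in general (this is not the actual OFlux shim, and the run-time hooks named below are hypothetical), a shared object loaded with LD_PRELOAD can wrap read() so that the run-time's lock is released around the blocking call:

  #include <dlfcn.h>   // dlsym, RTLD_NEXT (RTLD_NEXT needs _GNU_SOURCE on Linux)
  #include <unistd.h>

  // hypothetical hooks into the run-time's scheduler lock
  extern void runtime_release_before_blocking();
  extern void runtime_reacquire_after_blocking();

  extern "C" ssize_t read(int fd, void * buf, size_t count)
  {
      typedef ssize_t (*read_fn)(int, void *, size_t);
      static read_fn real_read =
          (read_fn) dlsym(RTLD_NEXT, "read"); // the libc read() we shadow

      runtime_release_before_blocking();      // let other events run meanwhile
      ssize_t res = real_read(fd, buf, count);
      runtime_reacquire_after_blocking();
      return res;
  }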


Other event-based systems do similar tricks to avoid the same problem.  Hopefully this post helps explain what the problem is with blocking while locking, and how life is better (by design) using a run-time which side-steps the issue completely.

[O]Flux: En garde!

Tasty.


Even event-based programs tend to have some state.  It's best if that state is minimal, shared as narrowly as possible, accessed as briefly as possible, and viewed with suspicion when evaluating performance bottlenecks.  Message passing systems like Akka or Erlang tend to avoid this problem by locating state within the one thread which runs the Actor that owns that state.  That has some advantages for that model, but it lacks some flexibility when we consider that machines have lots of memory these days and that memory is meant to be shared (safely of course).

The way this problem was solved in Flux was with atomic constraints.  In OFlux these were generalized to guards (at first there were only exclusive guards).  Suppose we had an object foo of type Foo which we wanted to allow multiple node functions to access -- but only one event at a time.  We would declare an exclusive guard to do this job in our OFlux code as follows:


  exclusive aFooGuard () => Foo *;

A node Bar would be granted access to the underlying Foo * & using a declaration like:

  node Bar (InputType1 anInput1,..., guard aFooGuard()) 
     => (OutputType1 anOutput1,...);

At the C++ level our implementation of Bar will have an argument atoms which can be used to get our Foo * & with atoms->aFooGuard().  A node event which has a guard in its declaration like this is said to acquire the guard before entry, and it releases the guard after the node event has executed.  Initially this will be a reference to a NULL pointer, but since it is a reference we can change that (allocate a new Foo).  Without the "right hand value" part to store a Foo *, you have basically what Flux used for atomic constraints.
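
A rough sketch of how such a node body might look in C++ follows; the structure names (Bar_in, Bar_out, Bar_atoms) and the function signature are stand-ins for what the OFlux tooling generates, not the real generated names:

  #include <cstddef>

  // minimal stand-ins for generated types, for illustration only
  struct Foo       { int counter; Foo() : counter(0) {} };
  struct Bar_in    { int anInput1; };
  struct Bar_out   { int anOutput1; };
  struct Bar_atoms { Foo * rhv; Foo * & aFooGuard() { return rhv; } };

  int Bar(const Bar_in * in, Bar_out * out, Bar_atoms * atoms)
  {
      Foo * & foo = atoms->aFooGuard(); // reference to the guard's "right hand value"
      if (foo == NULL) {
          foo = new Foo();              // first acquisition: populate the slot lazily
      }
      foo->counter += in->anInput1;     // exclusive access for the duration of this event
      out->anOutput1 = foo->counter;
      return 0;
  }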

Furthermore, OFlux extends atomic constraints by allowing some data to be stored and even de-referenced via an index to get at that data.  Consider what happens if we add a couple of keys to aFooGuard (which has a map data structure inside of it to keep track of these "right hand values"):


  exclusive aFooGuard(int k1, long k2) => Foo *;

Now when we acquire the guard with a node declaration we have a chance to use some of the input arguments of the node to formulate key values:


  node Bar (InputType1 anInput1,InputType2 anInput2, 
      ...,
      guard aFooGuard(inthash(anInput1),longhash(anInput2))) 
   => (OutputType1 anOutput1,...);


The C++ functions inthash() and longhash() are not necessary; they just illustrate how we could use code buried in our C++ application to massage the inputs into appropriate keys.  The semantics of an exclusive guard is that a Bar event gets exclusive access to the "right hand value" at the particular key combination it is using.  For instance, one Bar event might hold aFooGuard(0,0) while another simultaneously works with aFooGuard(0,1).  Finer grain keys lead to more possibilities when considering which events could run concurrently.

The decision to grab a guard does not need to be made universally for a particular node. It is possible to grab the guard conditionally on its input values:


    node Bar (InputType1 anInput1,InputType2 anInput2,...,
        guard aFooGuard(inthash(anInput1),longhash(anInput2))
            if test(anInput1)) 
     => (OutputType1 anOutput1,...);

Only if test(anInput1) returns true will the guard be acquired.  The Bar C++ function is made aware of whether the guard was actually acquired and is held via the convenience property atoms->have_aFooGuard().
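
Continuing the hedged sketch from above (again, the generated names are assumptions), the node body would consult that property before touching the guarded data:

  int Bar(const Bar_in * in, Bar_out * out, Bar_atoms * atoms)
  {
      if (atoms->have_aFooGuard()) {
          Foo * & foo = atoms->aFooGuard();
          foo->counter += in->anInput1;  // safe: the guard is held
          out->anOutput1 = foo->counter;
      } else {
          out->anOutput1 = 0;            // test(anInput1) was false; guard not held
      }
      return 0;
  }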

Guards can also be given a precedence, which indicates to the run-time the order in which they should be acquired.  The run-time will use that ordering (complaining at compile time if the ordering is not well-formed, i.e. a cycle is detected in the relation) when processing node events.  All the required guards for an event need to be acquired before it can be allowed to execute.


  exclusive aBazGuard Foo * => Baz *;

  precedence aFooGuard < aBazGuard;

When grabbing multiple guards it is possible to use the "right hand value" of one guard as a key to another guard.  This sets up an implicit precedence (one that does not need to be made explicit by the programmer), and it makes the second acquisition conditional on the first having a non-NULL "right hand value":

    node Bar (InputType1 anInput1,InputType2 anInput2,...,
        guard aFooGuard(inthash(anInput1),longhash(anInput2))
            as foo if test(anInput1),
        guard aBazGuard(foo) as baz) 
     => (OutputType1 anOutput1,...);  

The syntax "as foo" lets us give a local name to the acquired guard so that it can be referenced in the key of the subsequent aBazGuard acquisition.  By necessity, we have to acquire aBazGuard(foo) after foo, and only when foo is not NULL.

OFlux supports (via  macros) the ability to pre-populate a guard with data before the run-time is allowed to start.  It is also possible to use similar macros to walk the guard's entire contents (all keys and values) in circumstances where you can be sure that it is safe.

Exclusive guards are just one of the acquisition semantics available within the zoo of guards.  Other semantics are available and they affect the number of node events which can hold them (at the same key-set values):


 Guard Type (keyword) | Pre-populated? | Semantics
 exclusive            | not needed     | at most 1 node event per unique key combination
 free                 | probably       | no restrictions; useful for read-only cases like configuration
 readwrite            | not needed     | read: only other readers allowed;
                      |                | write: only one node event (no readers);
                      |                | upgradeable: read when non-NULL, write otherwise
 pool                 | required       | from a pre-populated pool for each key-set; as many node
                      |                | events can run as there are items in the pool

The readwrite guard is special in that a mode (read/write/upgradeable) needs to be specified in the node declaration in order to control the type of acquisition that is done.  A read-only acquisition provides const, non-reference access to the "right hand value".

Summary


Guards provide an abstraction to co-ordinate control of resources (shared data, or other things) within an OFlux program.  They support a variety of concurrent access semantics, can be acquired conditionally, can be pre-populated when initializing the program, and use keys much like hash tables do.  They have evolved over the history of the OFlux tool from practical use-cases that application developers had at various times.