Friday, August 10, 2012

Async Mutex – Control flow in asynchronous functionality

 

Hi All,

In this post I am going to continue talking about asynchronous execution. Having read about async, one might think that mutual exclusion and flow-control structures are no longer needed, but this is not the case. Async execution needs similar structures, with an async twist. In this post I am going to present a simple case where an async mutex is needed, and show how to use one.

The scenario is quite simple. Through async calls you acquire some data structure, calculate a value and add it to the data structure (which affects all subsequent calculations). In my example I would like to calculate the first 7 elements of the Fibonacci series. For the sake of argument I will assume that each operation must be performed asynchronously.

(Sorry C# guys, this code is in JavaScript).

var data = [0, 1];

// Reads the last two numbers in the array and hands them to the callback asynchronously
function retrieveData(callback) {
    var x = data[data.length - 1];
    var y = data[data.length - 2];
    setTimeout(function() {
        callback(x, y);
    }, 0);
}

// The "logic": adds the two numbers and returns the sum asynchronously
function calculateSum(x, y, callback) {
    var sum = x + y;
    setTimeout(function() {
        callback(sum);
    }, 0);
}

// Appends the new number to the array, then calls back asynchronously
function addSum(sum, callback) {
    data.push(sum);
    setTimeout(callback, 0);
}

// Adds count Fibonacci elements to data
function fibonacci(count) {
    var i;
    for (i = 0; i < count; i++) {
        retrieveData(function(x, y) {
            calculateSum(x, y, function(sum) {
                addSum(sum, function() {
                    console.log(data);
                });
            });
        });
    }
}

So what do we have here? The next Fibonacci number is calculated through three async calls. The first, retrieveData, takes the last two numbers in the array and passes them asynchronously to a callback. Think of it as retrieving a record from a remote repository or database. Next, calculateSum does the “logic”: it adds the two numbers and passes the sum asynchronously to its callback. Finally, addSum appends the sum to the original array and, again asynchronously, calls its own callback. When I run this code on V8 I get the following result:


fibonacci(5);


5 x [0, 1, 1, 1, 1, 1, 1]
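Before tracing the loop by hand, it helps to instrument it. The sketch below repeats the three helpers and the first version of fibonacci with one addition: retrieveData records each pair it reads in a `reads` array. `reads` is a name introduced here for illustration only. The two checks at the end run synchronously, right after fibonacci returns and before any timer fires.

```javascript
var data = [0, 1];
var reads = []; // hypothetical instrumentation: one [x, y] pair per retrieveData call

function retrieveData(callback) {
  var x = data[data.length - 1];
  var y = data[data.length - 2];
  reads.push([x, y]); // recorded at call time, not when the callback runs
  setTimeout(function () { callback(x, y); }, 0);
}

function calculateSum(x, y, callback) {
  var sum = x + y;
  setTimeout(function () { callback(sum); }, 0);
}

function addSum(sum, callback) {
  data.push(sum);
  setTimeout(callback, 0);
}

function fibonacci(count) {
  for (var i = 0; i < count; i++) {
    retrieveData(function (x, y) {
      calculateSum(x, y, function (sum) {
        addSum(sum, function () {}); // logging omitted in this sketch
      });
    });
  }
}

fibonacci(5);

// fibonacci has returned, yet no sum has been pushed so far,
// while every call has already read its pair.
console.log(data);  // data is still [0, 1]
console.log(reads); // five copies of [1, 0]
```

Every read happens before the first push, so reordering the later callbacks cannot fix the result. The reads themselves have to wait.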


The output is the array [0, 1, 1, 1, 1, 1, 1], logged 5 times. But why? Let's look at the loop and trace what happens. The loop runs 5 times without interruption (our code runs on a single thread). In each iteration, retrieveData immediately reads the last two values of the array and schedules its callback on the event loop. So all five executions read the same two values, 0 and 1, before fibonacci even returns. After it returns, the five retrieveData callbacks run in a row. Then the five calculateSum callbacks run, and each of them calls addSum, which pushes a 1 onto the array. Only then do the five console.log calls run. The end result is now clear. What we need is a way to let the functions run asynchronously while allowing only one retrieve-calculate-add sequence to be in progress at a time. To this end I have implemented an async mutex:

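// Namespace object used by the post; created here if it does not exist yet
var App = App || {};

App.AsyncMutex = function() {
    this.queue = [];        // callbacks waiting to enter
    this.ownerToken = null; // token of the current owner, or null when free
    this.token = 0;         // last token handed out
};

// Runs callback(token) immediately if the mutex is free; otherwise queues it
App.AsyncMutex.prototype.enter = function(callback) {
    if (this.ownerToken !== null) {
        this.queue.push(callback);
    } else {
        this.token++;
        this.ownerToken = this.token;
        callback(this.token);
    }
};

// Releases the mutex; the caller must pass the token it received in enter
App.AsyncMutex.prototype.leave = function(token) {
    if (this.ownerToken === null || this.ownerToken !== token) {
        throw new Error("Owner token mismatch. Expected " + this.ownerToken + " but received " + token);
    }

    var callback;

    if (this.queue.length > 0) {
        // Hand ownership straight to the next queued caller, with a fresh token
        this.token++;
        this.ownerToken = this.token;
        callback = this.queue.shift();
        callback(this.token);
    } else {
        this.ownerToken = null;
    }
};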




The async mutex implementation is very simple. To protect some code, wrap it in a call to mutex.enter. The callback receives a token. When you are done with the protected code, call mutex.leave(token). The mutex ensures that only one caller is inside the protected code at a time, and it adds all the others to a queue. When the current owner leaves, the mutex immediately runs the next queued callback with a new token.
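The handover can be checked without any timers, because enter and leave invoke their callbacks synchronously. The sketch below repeats AsyncMutex so that it runs on its own. The callbacks "A" and "B", the `events` array and the `mismatch` variable are invented for this example.

```javascript
var App = {};

App.AsyncMutex = function () {
  this.queue = [];
  this.ownerToken = null;
  this.token = 0;
};

App.AsyncMutex.prototype.enter = function (callback) {
  if (this.ownerToken !== null) {
    this.queue.push(callback);
  } else {
    this.token++;
    this.ownerToken = this.token;
    callback(this.token);
  }
};

App.AsyncMutex.prototype.leave = function (token) {
  if (this.ownerToken === null || this.ownerToken !== token) {
    throw new Error("Owner token mismatch. Expected " + this.ownerToken + " but received " + token);
  }
  if (this.queue.length > 0) {
    this.token++;
    this.ownerToken = this.token;
    this.queue.shift()(this.token);
  } else {
    this.ownerToken = null;
  }
};

var mux = new App.AsyncMutex();
var events = [];
var tokenA;

// A enters immediately and keeps the mutex (it does not call leave yet).
mux.enter(function (token) {
  tokenA = token;
  events.push("A entered with token " + token);
});

// B has to wait in the queue because A still owns the mutex.
mux.enter(function (token) {
  events.push("B entered with token " + token);
  mux.leave(token);
});

// Leaving with the wrong token is rejected.
var mismatch = null;
try {
  mux.leave(42);
} catch (e) {
  mismatch = e.message; // "Owner token mismatch. Expected 1 but received 42"
}

// A leaves; B runs right away with token 2 and releases the mutex.
mux.leave(tokenA);

console.log(events);
```

Because leave calls the next waiter synchronously, a queue of waiters that each leave synchronously (as B does here) nests one call per waiter on the stack. In the Fibonacci example this does not build up, because every protected sequence ends inside a timer callback.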


To use the mutex we need only a tiny code change:

function fibonacci(count) {
    var i, mux = new App.AsyncMutex();
    for (i = 0; i < count; i++) {
        mux.enter(function(token) {
            retrieveData(function(x, y) {
                calculateSum(x, y, function(sum) {
                    addSum(sum, function() {
                        console.log(data);
                        // The protected sequence is done: let the next iteration in
                        mux.leave(token);
                    });
                });
            });
        });
    }
}

Run fibonacci(5) again and the result is:
[0, 1, 1] 

[0, 1, 1, 2]

[0, 1, 1, 2, 3]

[0, 1, 1, 2, 3, 5]

[0, 1, 1, 2, 3, 5, 8]

Exactly as we want it to be.
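For comparison, the same serialization can be expressed with Promises, which were not yet built into JavaScript when this post was written. The sketch below is a swapped-in technique, not the AsyncMutex above, and it assumes an ES2017+ runtime such as a current Node.js. `PromiseMutex`, `run` and `delay` are hypothetical names. The mutex keeps the promise of the last queued task, and each new task is chained after it, so no tokens are needed.

```javascript
// Hypothetical promise-based mutex: tasks are chained one after another.
function PromiseMutex() {
  this.tail = Promise.resolve();
}

// Queues task (a function returning a promise) behind all earlier tasks.
PromiseMutex.prototype.run = function (task) {
  var result = this.tail.then(function () {
    return task();
  });
  // A failed task must not block the tasks queued after it.
  this.tail = result.catch(function () {});
  return result;
};

var data = [0, 1];

// Stand-in for an asynchronous operation: resolves with value on a later tick.
function delay(value) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(value); }, 0);
  });
}

// One retrieve / calculate / add sequence, as in the post.
async function step() {
  var pair = await delay([data[data.length - 1], data[data.length - 2]]);
  var sum = await delay(pair[0] + pair[1]);
  data.push(sum);
  await delay();
  console.log(data);
}

function fibonacci(count) {
  var mux = new PromiseMutex();
  var runs = [];
  for (var i = 0; i < count; i++) {
    runs.push(mux.run(step));
  }
  return Promise.all(runs);
}

var finished = fibonacci(5);
```

This logs the same five arrays as the AsyncMutex version. The design difference is that ownership is implicit. A task holds the lock until the promise it returns settles, so there is no token to pass around and no way to release the lock with the wrong one.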

 

 

Thank you for reading.

Boris.

 


3 comments:

  1. I really liked it, and I have a suggestion: notice that the mutex will usually cover the entire async flow, from top to bottom, so in fact there is only one function that we want to lock.
    I suggest something in Underscore style, like:


    var atomicFunction = _.atomic(function () {
        db.find(someQuery, function(err, doc) {
            calcSomething();
            db.update(update, function(err, numAffected) {
            });
        });
    });

    for (var i = 0; i < count; i++) {
        atomicFunction();
    }


    where _.atomic hides the token handling of your implementation inside a closure.
    Note that if you want to call leave() somewhere in the middle of the function, this is impossible, and you need your original implementation.

    1. Hi Ofer,

      This is a nice idea indeed. How would you suggest returning control to the _.atomic function, so that it frees the local resources once all the internal functions are done?

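One possible answer to the question above, sketched under the assumption that the wrapped function is willing to take one extra argument: pass it a `done` callback, and let `done` play the role of leave. `atomic` below is a hypothetical helper written for this example, not an Underscore function. Instead of reusing App.AsyncMutex, it keeps its own queue so that the snippet runs on its own.

```javascript
// Hypothetical `atomic` helper (not part of Underscore): calls to the returned
// function run one at a time. fn receives its own arguments plus a `done`
// callback as the last argument, and must call done when its async work ends.
function atomic(fn) {
  var queue = [];   // argument lists of calls still waiting to run
  var busy = false; // true while one call is in progress

  function runNext() {
    if (queue.length === 0) {
      busy = false;
      return;
    }
    busy = true;
    var args = queue.shift();
    var finished = false;
    fn.apply(null, args.concat(function done() {
      if (finished) return; // ignore repeated calls to done
      finished = true;
      runNext(); // control returns here: start the next queued call, if any
    }));
  }

  return function () {
    queue.push(Array.prototype.slice.call(arguments));
    if (!busy) runNext();
  };
}

// Usage: setTimeout stands in for the db.find / db.update calls.
var log = [];
var atomicStep = atomic(function (name, done) {
  log.push(name + " start");
  setTimeout(function () {
    log.push(name + " end");
    done();
  }, 0);
});

atomicStep("a");
atomicStep("b");
atomicStep("c");
// Right now only "a start" has been logged; "b" and "c" are queued.
```

In Ofer's example, done would be called inside the db.update callback. Because the function decides where to call done, it can also release the lock before its last step if it wants to.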