Skip to content

Commit

Permalink
create parallel.js, seed for new package
Browse files Browse the repository at this point in the history
  • Loading branch information
luciotato committed Dec 5, 2013
1 parent 55822c0 commit 73965d9
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 97 deletions.
4 changes: 0 additions & 4 deletions samples/blogServer/blogTemplate.html
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,4 @@
{{ content }}
</body>

<button onclick=launch()>Launch Ajax Requests</button>
<div id=parallel>
</div>

</html>
4 changes: 3 additions & 1 deletion samples/blogServer/run
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
node --harmony --debug waitfor-demo
node -v
echo NODE version should be at least 0.11.6
node --harmony --debug server
108 changes: 108 additions & 0 deletions samples/blogServer/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
var fs = require('fs');
var http = require('http');
var dns = require('dns');
var wait = require('../../waitfor');

//------------------------------
// TEST APP - dummy blog server
//------------------------------

function translateAsync(text, callback){
//let's asume this fn make api calls for translation, etc.
// and can take a while
setTimeout(function(){
if (Math.random()>0.8) return callback(new Error('ocassional error, translate server down, catch it! - try again'));
return callback(null,'<i>Latin:</i>'+text);
}
,200)
}

//------------------------------
function formatPost(post, callback){

var lines=post.split('\n');

var title = lines.shift();
var subTitle = lines.shift();

var result = '<h1>'+ title +'</h1>'
+ '<h2>'+ subTitle +'</h2>';

result += '<p>'+lines.join('</p><p>')+'</p>';

return result;
}

//------------------------------
function composeTemplate(css, template){
var composed=template.replace('{{ css }}', css);
return composed;
}

//------------------------------
function applyTemplate(template, content){
return template.replace('{{ content }}', content);
}

// Get client IP address from request object ---
function getClientAddress(req) {
return (req.headers['x-forwarded-for'] || '').split(',')[0]
|| req.connection.remoteAddress;
};

//------------------------------
// REQUEST HANDLER (generator, sequential)
function* handler(req,res){ // function* => generator

try {

console.log('request for',req.url, ' from ',getClientAddress(req));
switch(req.url){

case "/": //blog server

res.writeHead(200, {'Content-Type': 'text/html'});
var start = new Date().getTime();
//console.log(start);

//read css (wait.for syntax)
var css = yield wait.for(fs.readFile,'style.css','utf8');

//compose template
var template = composeTemplate ( css, yield wait.for(fs.readFile,'blogTemplate.html','utf8') );

//read post (fancy syntax)
var content = yield [fs.readFile,'blogPost.txt','utf8'];

//translate post (log operation,api call, normally a callback hell)
var translated = yield wait.for(translateAsync,content);

res.write(applyTemplate(template, formatPost(translated)));

//another async call
res.write('google ips: '+JSON.stringify(yield wait.for(dns.resolve4, 'google.com')));

return res.end();

default:
response.statusCode = 404;
return res.end();
}

}
catch(err){
res.end('async ERROR catched: '+err.message);
}
}

//----------------
// Main

var server = http.createServer(
function(req, res){
console.log('req!');
wait.launchFiber(handler,req,res); //handle requests in a "fiber"(generator), allowing sequential programming by yield
}).listen(8000);

console.log('server started on port', 8000);

20 changes: 12 additions & 8 deletions test/parallel-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,24 @@ wait.helper.fiberForItem = function*(asyncItemFn,item,inx,result,finalCallback){
}
};

//async parallel map
parallel.map = function(arr,asyncItemFn,finalCallback){
// -------------
// parallel.map
// -------------
// mapFn will be started for each item
// when all the functions complete, finalCallback will be called
//
// parallel.map can be waited.for, as in:
// mappedArr = yield wait.for(parallel.map, arr, translateFn);
//
parallel.map = function(arr,mapFn,finalCallback){
//
// asyncItemFn = function(item,callback) -> callback(err,data) returns item transformed
//
// can be called with yield, as in:
// mappedArr = yield wait.for(parallel.map,arr,testFn);
// mapFn = function*(item,index) -> returns item transformed
//
var result={arr:[],count:0,expected:arr.length};
if (result.expected===0) return finalCallback(null,result.arr);

for (var i = 0; i < arr.length; i++) {
wait.launchFiber(wait.helper.fiberForItem
,asyncItemFn,arr[i],i,result,finalCallback);
wait.launchFiber(mapFn,arr[i],i,result,finalCallback);
};

};
Expand Down
151 changes: 151 additions & 0 deletions test/parallel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/* parallel-ES6 - based on ES6-harmony generators
- Sequential programming for node.js - end of callback hell
- Copyright 2013 Lucio Tato
-- WARNING: bleeding edge --
This lib is based on ECMAScript 6,
the next version of the javascript standard, code-named "Harmony".
(release target date December 2013).
This lib also uses bleeding edge v8 Harmony features, so you’ll need to
use the latest -unstable- nodejs version wich today is v0.11.6
and also pass the --harmony flag when executing node.
node --harmony parallel-demo.js
check the complementing lib wait.for-ES6 at http://github.com/luciotato/waitfor-ES6
*/
"use strict";

var Parallel = {

// parallel.runFiber
// runs fiber until completion. (a fiber is a generator yielding async fns and params)
// On completion, callback.
//
// same usage:
//
// parallel.runfiber(processData(myData),
// function(err,data){
// console.log('process ended, result:',data)
// });
//
runFiber: function(generator,finalCallback){

// helper function, call the async, using theCallback as callback
function callTheAsync(FunctionAndArgs,theCallback){
// prepare arguments
var argsOnly=Array.prototype.slice.call(FunctionAndArgs,1); // remove function from args & convert to Array
argsOnly.push(theCallback); //add callback to arguments
// start the asyncFn
FunctionAndArgs[0].apply(null, argsOnly); // call the asyncFn, (strict: this=null)
}

// create a closure to act as callback for the yielded async calls in this generator
// this callback will RESUME the generator, returning 'data' as the result of the 'yield' keyword
generator.defaultCallback=function(err,data){

// on callback:
// console.log('on callback, err:',err,'data:',data);

// if err, schedule a throw inside the generator, and exit.
if (err) return generator.throw(err);

// data:
// return data as the result of the yield keyword,
// and RESUME the generator. (we store the result of resuming the generator in 'nextPart')
var nextPart = generator.next(data); // generator.next => RESUME generator, data:->result of 'yield'

//after the next part of the generator runs...
if (nextPart.done) {//the generator function has finished (executed "return")
finalCallback && finalCallback();
return; //it was the last part, nothing more to do
}

// else...
// not finished yet. The generator paused on another yield,
// with another call to: wait.for(asyncFn,arg1,arg2...)
// so in nextPart.value we have wait.for's arguments
// Let's call the indicated async
callTheAsync(nextPart.value, thisIterator.defaultCallback);
// the async function callback will be handled by this same closure (thisIterator.defaultCallback)
// repeating the loop until nextPart.done
};

// starts the generator loop
generator.defaultCallback();

}


// -------------
// parallel.map
// -------------
// generatorFn will be run for each item
// when all the generators complete, finalCallback will be called
//
// parallel.map can be waited.for, as in:
// mappedArr = yield wait.for(parallel.map, arr, translateFn);
//
,map : function(arr, fiberFn, finalCallback){
//
// fiberFn = function*(item,index,arr) -> returns item mapped
//
var result={arr:[],count:0,expected:arr.length};
if (result.expected===0) return finalCallback(null,result.arr); //early exit

var taskJoiner=function(inx,data,result){
//console.log('result arrived',inx,data);
result.arr[inx]=data;
result.count++;
if (result.count>=result.expected) { // all results arrived
finalCallback(null,result.arr) ; // final callback OK
}
};

//main
for (var i = 0; i < result.expected; i++) {
Parallel.runFiber( fiber(arr[i],i,arr)
//as callback, create a closure to contain the index and store result data
,function(err,data){
if (err) return finalCallback(err,i); //err
taskJoiner(i,data,result);
});
};

}

// ----------------------
// parallel filter
// ----------------------
,filter : function(arr, filterGeneratorFn, finalCallback){
//
// filterGeneratorFn = function*(item,index,arr) -> returns true/false
//
// usage:
//
// parallel.filter(myArr, filterFn,
// function(err,data){
// console.log('process ended, filtered result arr:', data)
// });
//
// parallel.filter can be waited with wait.for, as in:
//
// filteredArr = yield wait.for(parallel.filter,arr,filterFn);
//
// main: parallel call filterGeneratorFn on each item, store result (true/false)
Parallel.map( arr, filterGeneratorFn
,function(err,testResults){ //when all the filters finish
if (err) return finalCallback(err);
// create an array for each item where filterGeneratorFn returned true
var filteredArr=[];
for (var i = 0; i < arr.length; i++) {
if (testResults[i]) filteredArr.push(arr[i]);
};
finalCallback(null,filteredArr);
});
}

};

module.exports = Parallel; //export
Loading

0 comments on commit 73965d9

Please sign in to comment.