constructors/async.js

/**
 * @callback AsyncConstructor
 * @memberof async
 * @returns {async.Async}
 */

/**
 * Module used processing data asynchronous
 * @example
 * Application.require('async').then(function (asyncOperations) {
 *	// @TODO
 * }, console.error);
 * @interface async
 * @returns {async.AsyncConstructor}
 * @see async.Async
 */

/**
 * @class
 * @name Async
 * @memberof async
 */
var async	= function () {
	var life = 0;
	var waited	= 0;
	var received	= 0;
	var responsesId	= [];
	var responses	= {};
	var context = {};
	var app	= new ApplicationPrototype();


	/**
	 * return unique index identifier for an operation
	 * @method index
	 * @memberof async.Async#
	 * @returns {string}
	 */
	app.bind("index", ((function () {
		var index = 0;
		return function () {
			return life + '::' + ( ++index );
		};
	})()), '');


	/**
	 * method used for return result for an operation,
	 * returns `true` if value was accepted.
	 * if operation already obtained a value
	 * then value is not accepted and it returns `false`
	 * @method receive
	 * @memberof async.Async#
	 * @param {string} id obtained from {@link async.Async#index}
	 * @param {any} args
	 * @returns {boolean}
	 */
	app.bind("receive", function (id, args) {
		if (id) {
			if (id.indexOf(life+'::') !== 0) {
				return false;
			}
			responses[id]	= args;
		}
		received += 1;
		if (received == waited) {
			app.emit("done");
		}
		return true;
	}, 'on af st');

	/**
	 * require to wait an additional operation
	 * @method wait
	 * @memberof async.Async#
	 */
	app.bind("wait", function () {
		waited += 1;
	}, 'on af st');

	/**
	 * require to reserve index {@link async.Async#index} for an additional operation
	 * @method reserve
	 * @memberof async.Async#
	 * @returns {string}
	 */
	app.bind("reserve", function () {
		waited += 1;
		return app.index();
	}, 'on af st');

	/**
	 * require to run an operation
	 * @method run
	 * @memberof async.Async#
	 * @param {function():void} func function that should be executed
	 * @param {any[]} args 
	 * @param {object} context
	 * @returns {string}
	 */
	app.bind("run", function (func, args, context) {
		var id = app.index();
		responsesId.push(id);
		waited+=1;
		func.apply(context, args);
		return id;
	}, 'on af st');

	/**
	 * reset operation processing
	 * @method flush
	 * @memberof async.Async#
	 */
	app.bind("flush", function () {
		life		+= 1;
		responses	= {};
		responsesId	= [];
		received	= 0;
		waited		= 0;
	}, 'on af st');

	/**
	 * return how many operations are processing right now
	 * @method processing
	 * @memberof async.Async#
	 * @returns {number}
	 */
	app.bind('processing', function () {
		return ( waited - received );
	}, '');

	/**
	 * return operations' responses
	 * @method responses
	 * @memberof async.Async#
	 * @param {boolean} [returnUnknownResponses=false]
	 * @returns {any[][]}
	 */
	app.bind('responses', function (id) {
		if (id === true) {
			return responsesId.map(function (v) {
				return (v in responses ? responses[v] : undefined);
			});
		}
		return responses;
	});

	/**
	 * return all errors found in responses
	 * @method errors
	 * @memberof async.Async#
	 * @returns {Error[]}
	 */
	app.bind('errors', function () {
		var errs	= [];
		responsesId.forEach(function (id) {
			var r = responses[id];
			if (r instanceof Error) {
				errs.push(r);
			} else {
				var i;
				for (i=0;i<r.length;i++) {
					if (r[i] instanceof Error) {
						errs.push(r[i]);
					}
				}
			}
		});
		return errs;
	});

	/**
	 * register a callback to be called when processing is done 
	 * @method done
	 * @memberof async.Async#
	 * @param {function():void} cb
	 */
	app.bind('done', function (cb) {
		if (typeof(cb) === "function") {
			app.on('done', cb);
			if (received === waited && received) {
				cb.apply(app, []);
			}
		}
	});

	return app;
};

/**
 * @typedef {object} Operation
 * @memberof async.Async
 * @property {async.Async.OperationCallback} [0]
 * @property {async.Async.OperationArgs} [1]
 * @property {async.Async.OperationContext} [2]
 * @property {async.Async.OperationCallbackIndex} [3] 
 */

/**
 * a function that represents the operation itself, it have as argument `next` callback, by default it is first.
 * @typedef {Function} OperationCallback
 * @memberof async.Async
 */

/**
 * list if arguments passed to `OperationCallback`.
 * @typedef {any[]} OperationArgs
 * @memberof async.Async
 */

/**
 * context that should be used in `OperationCallback`. Default value is `{}`.
 * @typedef {object} OperationContext
 * @memberof async.Async
 */

/**
 * index of `next()` callback in list of `OperationCallback`'s arguments. Default value is `0`.
 * @typedef {number} OperationCallbackIndex
 * @memberof async.Async
 */

/**
 * @typedef {async.Async.Operation[]} Operations
 * @memberof async.Async
 */

/**
 * @callback async.processCallback
 * @param {function(Error?): void} next
 * @param {any} item
 * @param {number} index
 * @param {any[]} items
 */

/**
 * @callback async.doneCallback
 * @this async.Async
 */

/**
 * @method flow
 * @memberof async.
 * @param {async.Async.Operations} operations
 * @param {async.doneCallback} cb
 * @param {number} [timeout=0] timeout between operations
 * @returns {async.Async}
 */
async.flow	= function (operations, cb, timeout) {
	if (typeof(operations) !== "undefined" && Array.isArray(operations)) {
		var app = new async();
		app.done(cb);
		var c = {};
		timeout		= timeout || 0;
		var i = 0;
		if (operations.length === 0) {
			app.emit('done');
		}
		var tick	= function () {
			if ( i < operations.length) {
				var op	= operations[i++];
				if (op) {
					var ar	= op[1] || [];
					var ai	= op[3] || 0;
					var id	= app.reserve();
					ar[ai]	= function () {
						app.receive(id, arguments);
					};
					var err;
					setTimeout(function () {
						try {
							op[0].apply(op[2] || c, ar);
						} catch (err) {
							app.receive(id, err);
							app.emit('error', [err]);
						}
					}, timeout);
				}
			}
		};
		app.on('onReceive', function () {
			// console.log("tick");
			tick();
		});
		tick();
		return app;
	}
	return;
};


/**
 * @method waterfall
 * @memberof async.
 * @param {async.Async.Operations} operations
 * @param {async.doneCallback} cb
 * @param {number} [parallel=27] number of operations that can be done in parallel
 * @param {number} [timeout=0] timeout between operations
 * @returns {async.Async}
 */
async.waterfall	= function (ops, cb, parallel, timeout) {
	if (typeof(ops) !== "undefined" && Array.isArray(ops)) {
		var app = new async();
		app.done(cb);
		var c = {};
		timeout		= timeout || 0;
		if (typeof(parallel) !== "number") {
			parallel	= 27;
		}
		var i = 0;
		if (ops.length === 0) {
			app.emit('done');
		}
		var tick	= function () {
			if ( i < ops.length) {
				var op	= ops[i++];
				if (op) {
					var ar	= op[1] || [];
					var ai	= op[3] || 0;
					var id	= app.reserve();
					ar[ai]	= function () {
						app.receive(id, arguments);
					};
					var err;
					setTimeout(function () {
						try {
							op[0].apply(op[2] || c, ar);
						} catch (err) {
							app.receive(id, err);
							app.emit('error', [err]);
						}
					}, timeout);
				}
				return true;
			}
			return false;
		};
		app.on('onReceive', function () {
			// console.log("tick");
			while (( parallel === 0 || parallel > app.processing()) && tick()) {
				// true;
			}
		});
		tick();
		return app;
	}
	return;
};




/**
 * @method map
 * @memberof async.
 * @param {any[]} operations
 * @param {async.processCallback}
 * @param {async.doneCallback} cb
 * @param {number} [timeout=0] timeout between operations
 * @returns {async.Async}
 */
/**
 * @method flow_map
 * @memberof async.
 * @param {any[]} operations
 * @param {async.processCallback}
 * @param {async.doneCallback} cb
 * @param {number} [timeout=0] timeout between operations
 * @returns {async.Async}
 */

async.map = async.flow.map	= function (ops, ev, cb, timeout) {
	if (typeof(ops) !== "undefined" && Array.isArray(ops)) {
		var app = new async();
		var c = {};
		timeout		= timeout || 0;
		var i = 0;
		var ret		= [];
		app.done(function () {
			cb.apply(app,[ret]);
		});
		if (ops.length) {
			ret[ops.length - 1]	= undefined;
		} else {
			app.emit('done');
		}
		var tick	= function () {
			if ( i < ops.length) {
				var op	= ops[i++];
				var id	= app.reserve();
				var ri	= i-1;
				var rr	= function (v, err) {
					if (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					} else {
						ret[ri]	= v;
						app.receive();
					}
				};
				var err;
				setTimeout(function () {
					try {
						ev.apply(ops, [rr, op, ri, ops]);
					} catch (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					}
				}, timeout);
			}
		};
		app.on('onReceive', function () {
			tick();
		});
		tick();
		return app;
	}
	return;
};


/**
 * @method waterfall_map
 * @memberof async.
 * @param {any[]} operations
 * @param {async.processCallback}
 * @param {async.doneCallback} cb
 * @param {number} [parallel=27] number of operations that can be done in parallel
 * @param {number} [timeout=0] timeout between operations
 * @returns {async.Async}
 */

async.waterfall.map	= function (ops, ev, cb, parallel, timeout) {
	if (typeof(ops) !== "undefined" && Array.isArray(ops)) {
		var app = new async();
		var c = {};
		timeout		= timeout || 0;
		if (typeof(parallel) !== "number") {
			parallel	= 27;
		}
		var i = 0;
		var ret		= [];
		app.done(function () {
			cb.apply(app,[ret]);
		});
		if (ops.length) {
			ret[ops.length - 1]	= undefined;
		} else {
			app.emit('done');
		}
		var tick	= function () {
			if ( i < ops.length) {
				var op	= ops[i++];
				var id	= app.reserve();
				var ri	= i-1;
				var rr	= function (v, err) {
					if (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					} else {
						ret[ri]	= v;
						app.receive();
					}
				};
				var err;
				setTimeout(function () {
					try {
						ev.apply(ops, [rr, op, ri, ops]);
					} catch (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					}
				}, timeout);
			}
		};
		app.on('onReceive', function () {
			while (( parallel === 0 || parallel > app.processing()) && tick()) {
				// true;
			}
		});
		tick();
		return app;
	}
	return;
};


async.filter = async.flow.filter	= function (ops, ev, cb, timeout) {
	if (typeof(ops) !== "undefined" && Array.isArray(ops)) {
		var app = new async();
		var c = {};
		timeout		= timeout || 0;
		var i = 0;
		var ret		= [];
		app.done(function () {
			cb.apply(app,[ret]);
		});
		if (ops.length) {
		} else {
			app.emit('done');
		}
		var tick	= function () {
			if ( i < ops.length) {
				var op	= ops[i++];
				var id	= app.reserve();
				var ri	= i-1;
				var rr	= function (v, err) {
					if (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					} else {
						if (v) ret.push(op);
						app.receive();
					}
				};
				var err;
				setTimeout(function () {
					try {
						ev.apply(ops, [rr, op, ri, ops]);
					} catch (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					}
				}, timeout);
			}
		};
		app.on('onReceive', function () {
			tick();
		});
		tick();
		return app;
	}
	return;
};

async.waterfall.filter	= function (ops, ev, cb, parralel, timeout) {
	if (typeof(ops) !== "undefined" && Array.isArray(ops)) {
		var app = new async();
		var c = {};
		timeout		= timeout || 0;
		if (typeof(parralel) !== "number") {
			parralel	= 27;
		}
		var i = 0;
		var ret		= [];
		app.done(function () {
			cb.apply(app,[ret]);
		});
		if (ops.length) {
		} else {
			app.emit('done');
		}
		var tick	= function () {
			if ( i < ops.length) {
				var op	= ops[i++];
				var id	= app.reserve();
				var ri	= i-1;
				var rr	= function (v, err) {
					if (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					} else {
						if (v) ret.push(op);
						app.receive();
					}
				};
				var err;
				setTimeout(function () {
					try {
						ev.apply(ops, [rr, op, ri, ops]);
					} catch (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					}
				}, timeout);
				return true;
			}
			return false;
		};
		app.on('onReceive', function () {
			// debugger;
			while (( parralel === 0 || parralel > app.processing()) && tick()) {
				// true;
			}
		});
		tick();
		return app;
	}
	return;
};



async.forEach = async.flow.forEach	= function (ops, ev, cb, timeout) {
	if (typeof(ops) !== "undefined" && Array.isArray(ops)) {
		var app = new async();
		var c = {};
		timeout		= timeout || 0;
		var i = 0;
		app.done(function () {
			cb.apply(app,[ops]);
		});
		if (ops.length) {
		} else {
			app.emit('done');
		}
		var tick	= function () {
			if ( i < ops.length) {
				var op	= ops[i++];
				var id	= app.reserve();
				var ri	= i-1;
				var rr	= function (err) {
					if (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					} else {
						app.receive();
					}
				};
				var err;
				setTimeout(function () {
					try {
						ev.apply(ops, [rr, op, ri, ops]);
					} catch (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					}
				}, timeout);
				return true;
			}
			return false;
		};
		app.on('onReceive', function () {
			tick();
		});
		tick();
		return app;
	}
	return;
};

async.waterfall.forEach	= function (ops, ev, cb, parralel, timeout) {
	if (typeof(ops) !== "undefined" && Array.isArray(ops)) {
		var app = new async();
		var c = {};
		timeout		= timeout || 0;
		if (typeof(parralel) !== "number") {
			parralel	= 27;
		}
		var i = 0;
		app.done(function () {
			cb.apply(app,[ops]);
		});
		if (ops.length) {
		} else {
			app.emit('done');
		}
		var tick	= function () {
			if ( i < ops.length) {
				var op	= ops[i++];
				var id	= app.reserve();
				var ri	= i-1;
				var rr	= function (err) {
					if (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					} else {
						app.receive();
					}
				};
				var err;
				setTimeout(function () {
					try {
						ev.apply(ops, [rr, op, ri, ops]);
					} catch (err) {
						app.receive(id, err);
						app.emit('error', [err]);
					}
				}, timeout);
				return true;
			}
			return false;
		};
		app.on('onReceive', function () {
			while (( parralel === 0 || parralel > app.processing()) && tick()) {
				// true;
			}
		});
		tick();
		return app;
	}
	return;
};




async.test	= {};
async.test.flow	= function (n) {
	console.clear();
	global.require('async', function (async) {
		if (typeof(n) !== "number") n = 1000;
		var ops = [];
		var i = 0;
		while (i++ < n) {
			ops.push([(function (k) {
				return function (cb) {
					cb(k);
				};
			})(i + 0)]);
		}
		console.log(ops.length);
		var t0 = new Date().valueOf();
		var a1 = async.flow(ops, function () {
			console.log((new Date().valueOf() - t0)/(n * 1000));
		});
		var k=0;
		a1.on('onError', function (err) {
			console.error(err);
		});
		a1.on('onReceive', function () {
			console.log(k++);
		});
		window.a1 = a1;
	});
};

async.test.waterfall	= function (n) {
	console.clear();
	global.require('async', function (async) {
		if (typeof(n) !== "number") n = 1000;
		var ops = [];
		var i = 0;
		while (i++ < n) {
			ops.push([(function (k) {
				return function (cb) {
					cb(k);
				};
			})(i + 0)]);
		}
		console.log(ops.length);
		var t0 = new Date().valueOf();
		var a1 = async.waterfall(ops, function () {
			console.log((new Date().valueOf() - t0)/(n * 1000));
		}, 69);
		var k=0;
		a1.on('onError', function (err) {
			console.error(err);
		});
		a1.on('onReceive', function () {
			console.log(k++);
		});
		window.a1 = a1;
	});
};
async.test.map = async.test.flow.map	= function (n) {
	console.clear();
	global.require('async', function (async) {
		if (typeof(n) !== "number") n = 1000;
		var b = [];
		if (n)
			b[n - 1] = undefined;
		async.flow.map(b, function (cb, v, i, arr) {
			console.log(i);
			cb(Math.floor(Math.random() * 1000));
		}, function (r) {
			console.log("DONE", r);
		});
	});
};

async.test.waterfall.map	= function (n) {
	console.clear();
	global.require('async', function (async) {
		if (typeof(n) !== "number") n = 1000;
		var b = [];
		if (n)
			b[n - 1] = undefined;
		async.waterfall.map(b, function (cb, v, i, arr) {
			console.log(i);
			cb(Math.floor(Math.random() * 1000));
		}, function (r) {
			console.log("DONE", r);
		});
	});
};


async.test.filter = async.test.flow.filter	= function (n) {
	console.clear();
	global.require('async', function (async) {
		if (typeof(n) !== "number") n = 1000;
		var b = [];
		if (n)
			b[n - 1] = undefined;
		async.flow.filter(b, function (cb, v, i, arr) {
			console.log(i);
			cb(Math.random() > 0.5);
		}, function (r) {
			console.log("DONE ", r.length);
		});
	});
};

async.test.waterfall.filter	= function (n) {
	console.clear();
	global.require('async', function (async) {
		if (typeof(n) !== "number") n = 1000;
		var b = [];
		if (n)
			b[n - 1] = undefined;
		async.waterfall.filter(b, function (cb, v, i, arr) {
			console.log(i);
			cb(Math.random() > 0.5);
		}, function (r) {
			console.log("DONE ", r.length);
		});
	});
};

async.test.forEach = async.test.flow.forEach	= function (n) {
	console.clear();
	global.require('async', function (async) {
		if (typeof(n) !== "number") n = 1000;
		var b = [];
		if (n)
			b[n - 1] = undefined;
		async.flow.forEach(b, function (cb, v, i, arr) {
			console.log(i);
			cb();
		}, function (r) {
			console.log("DONE ", r.length);
		});
	});
};

async.test.waterfall.forEach	= function (n) {
	console.clear();
	global.require('async', function (async) {
		if (typeof(n) !== "number") n = 1000;
		var b = [];
		if (n)
			b[n - 1] = undefined;
		async.waterfall.forEach(b, function (cb, v, i, arr) {
			console.log(i);
			cb();
		}, function (r) {
			console.log("DONE ", r.length);
		});
	});
};




module.exports	= async;