var contrib = require("ndn-js-contrib")
, ndn = contrib.ndn, os = require("os")
, debug = {}
, Manager = require("./Manager.js");
debug.debug = require("debug")("Forwarder");
/**Forwarder constructor
*@constructor
*@param {Object} options - an object with options
*@returns {forwarder} an NDN Subject
*
*/
var Forwarder = function Forwarder (options){
options = options || {};
this.ndn = contrib.ndn;
this.contrib = contrib;
this.nameTree = new contrib.NameTree();
this.fib = new contrib.FIB(this.nameTree);
this.listeners = new contrib.FIB(new contrib.NameTree());
this.listenerCallbacks = [];
this.pit = new contrib.PIT(this.nameTree);
this.maxMemory = options.mem || os.freemem() * 0.5;
this.cache = new contrib.ContentStore(this.nameTree,null,this.maxMemory);
this.interfaces = new contrib.Interfaces(this);
var transports = Object.keys(contrib.Transports);
options.TCPServerTransport = options.tcp;
options.WebSocketServerTransport = options.ws;
this.remoteInfo = {
ipv4: options.ipv4 || "0.0.0.0",
domain : options.domain || "localhost",
iceServers : options.iceServers || []
};
for (var i = 0; i < transports.length; i++){
if (contrib.Transports[transports[i]].defineListener){
if (contrib.Transports[transports[i]].prototype.name === "TCPServerTransport"){
this.remoteInfo.tcp = {
port: options.tcp || 8484
, name : "TCPServerTransport"
};
} else if (contrib.Transports[transports[i]].prototype.name === "WebSocketServerTransport"){
this.remoteInfo.ws = {
port: options.ws || 8585
, name : "WebSocketServerTransport"
};
} else if (contrib.Transports[transports[i]].prototype.name === "WebSocketTransport"){
this.remoteInfo.ws = {
name: "WebSocketTransport"
};
}
//console.log(options[contrib.Transports[transports[i]].prototype.name], contrib.Transports[transports[i]].prototype.name);
contrib.Transports[transports[i]].defineListener(this, options[contrib.Transports[transports[i]].prototype.name]);
}
this.interfaces.installTransport(contrib.Transports[transports[i]]);
}
Manager(this);
return this;
};
Forwarder.ndn = contrib.ndn;
Forwarder.contrib = contrib;
/** add a connection listener along a given prefix.
*@param {String} prefix - the prefix to listen along
*@param {Number} maxConnections - maximum number of simultaneous connections for this prefix
*@param {Function} onNewFace - a function called for each new face, containing the numerical faceID
*@param {Function} onFaceClosed - a function called every time a face on this prefix is closed
*/
Forwarder.prototype.addConnectionListener = function(){}
require("./node/ConnectionListeners.js")(Forwarder);
require("./node/createSuffix.js")(Forwarder);
/**handle an incoming interest from the interfaces module
*@param {Buffer} element the raw interest packet
*@param {Number} faceID the Integer faceID of the face which recieved the Interest
*@returns {this} for chaining
*/
Forwarder.prototype.handleInterest = function(element, faceID, skipListen){
var override = {}
, Self = this
, interest = new ndn.Interest();
interest.wireDecode(element);
debug.debug("handleInterest %s from %s", interest.toUri(), faceID);
if(this.pit.checkDuplicate(interest)){
debug.debug("interest is duplicate, discontinue forwarding");
return this;
} else {
debug.debug("interest is not duplicate, insert into PIT");
Self.pit.insertPitEntry(element, interest, faceID);
}
function Closure(skipListen, skipForward, listeners){
debug.debug("handleInterest closure");
listeners = listeners || Self.listeners.findAllFibEntries(interest.name);
function unblock(){
var Self = this;
return function unblock(skipListen){
Closure(skipListen, false, listeners);
};
}
function iterateListeners(){
if (listeners.hasNext){
var listener = listeners.next();
debug.debug("handleInterest iterate listeners, current =", listener.nextHops);
var blockingCallback, connectionCallback, nonBlockingCallbacks = [];
for (var i = 0; i < listener.nextHops.length; i++){
if (Self.listenerCallbacks[listener.nextHops[i].faceID].blocking){
blockingCallback = Self.listenerCallbacks[listener.nextHops[i].faceID].callback;
block = true;
} else if (Self.listenerCallbacks[listener.nextHops[i].faceID].connection){
connectionCallback = Self.listenerCallbacks[listener.nextHops[i].faceID].callback;
} else{
debug.debug("inserting nonBlockingCallback into queue: %o",Self.listenerCallbacks[listener.nextHops[i].faceID].toString());
nonBlockingCallbacks.push(Self.listenerCallbacks[listener.nextHops[i].faceID]);
}
}
//console.log("blocking", blockingCallback);
if (connectionCallback){
debug.debug("connection Listener triggering at %s", interest.name.toUri());
connectionCallback(interest, faceID, function(skip){
debug.debug("connection listener unblocked");
if (!skip && nonBlockingCallbacks.length > 0){
debug.debug("progressing to nonBlocks");
for (var p = 0; p < nonBlockingCallbacks.length; p++){
debug.debug("calling non blocker: %s", nonBlockingCallbacks[p].callback.toString());
nonBlockingCallbacks[p].callback(interest, faceID);
}
}
if (blockingCallback){
debug.debug("executing non connection, yet blocking callback");
blockingCallback(interest, faceID, new unblock());
} else {
debug.debug("unblocking completely");
var un = new unblock();
un(skip);
}
});
return;
} else if (nonBlockingCallbacks.length > 0){
for (var j = 0; j < nonBlockingCallbacks.length; j++){
//console.log("nonBlocking", nonBlockingCallbacks[j]);
nonBlockingCallbacks[j].callback(interest, faceID);
}
}
if (blockingCallback){
blockingCallback(interest, faceID, new unblock());
} else if (listeners.hasNext) {
iterateListeners();
} else {
forward();
}
} else {
forward();
}
}
if (!skipListen){
iterateListeners();
} else {
forward();
}
function forward(){
var cacheHit = Self.cache.check(interest);
if (cacheHit){
Self.interfaces.dispatch(cacheHit, 0 | (1<<faceID));
} else if (!override.skipForward){
var nextHopFlag = Self.fib.findAllNextHops(interest.name, faceID);
debug.debug("nextHopFLag for %s is %s", interest.name.toUri(),nextHopFlag);
if (nextHopFlag){
Self.interfaces.dispatch(element, nextHopFlag);
}
}
}
return Self;
}
return Closure(skipListen);
/*else {
var inFace = this.interfaces.Faces[faceID], toCheck, matched;
console.log("cleanup")
for (var i = 0; i < inFace.prefixes.length; i++){
console.log("createName toCheck")
toCheck = new ndn.Name(inFace.prefixes[i]);
console.log(toCheck)
if (toCheck.match(interest.name)){
matched = inFace.prefixes[i]
this.unregisterPrefix(matched, faceID);
break;
}
}
console.log("first loop completed")
for (var j = 0; j < inFace.prefixes.length; j++){
if ( matched === inFace.prefixes[j]){
inFace.prefixes.splice(i, 1);
}
}
if(inFace.prefixes.length === 0){
console.log("remove idle connection")
this.removeConnection(faceID);
}
}*/
};
/** main algorithm for incoming data packets. In order; check and dispatch matching PitEntries, insert into cache, return.
*@param {Buffer} element the raw data packet
*@param {faceID} the numerical faceID that the packet arrived on
*@returns {Forwarder} for chaining
*/
Forwarder.prototype.handleData = function(element, faceID){
var data = new ndn.Data();
data.wireDecode(element);
debug.debug("handle data % from face ID %s", data.name.toUri(), faceID);
var pitMatch = this.pit.lookup(data);
if (pitMatch.faces){
debug.debug("found matching pitEntries for faceFlag %s", pitMatch.faces);
this.interfaces.dispatch(element, pitMatch.faces);
}
if (pitMatch.pitEntries.length > 0) {
this.cache.insert(element, data);
for (var i = 0; i < pitMatch.pitEntries.length; i++){
if (pitMatch.pitEntries[i].callback){
debug.debug("excecuting pitEntry callback for %s", pitMatch.pitEntries[i].interest.toUri());
debug.debug("with data %s", data.name.toUri());
pitMatch.pitEntries[i].callback(data, pitMatch.pitEntries[i].interest);
}
debug.debug("consuming pitEntry for %s", pitMatch.pitEntries[i].interest.toUri());
pitMatch.pitEntries[i].consume(true);
}
}
return this;
};
/** add a nameSpace Listener to the Forwarder. the listener will be triggered via the same semantics as forwarding entries
*@param {String | option} nameSpace the uri of the namespace to listen on, or an options object containing that uri under the .prefix property
*so far only a boolean '.blocking' property, to tell whether to interupt normal forwarding
*@param {function} callback
*/
Forwarder.prototype.addListener = function(nameSpace, callback) {
this.listenerCallbacks = this.listenerCallbacks || [];
var prefix
, options
, Self = this;
if (typeof nameSpace === "string"){
prefix = nameSpace;
options = {};
} else {
prefix = nameSpace.prefix;
options = nameSpace;
}
prefix = new ndn.Name(prefix);
prefix = prefix.toUri();
debug.debug("addListener at %s", prefix);
var listenerID = this.listenerCallbacks.length
, isNew = true;
if(options.connection){
var connectionReplaced = false;
if (this.listenerCallbacks.length > 0){
for (var i = 0; i < this.listenerCallbacks.length; i++){
//console.log("loop", i, prefix, this.listenerCallbacks[i]);
if (this.listenerCallbacks[i].prefix === prefix && this.listenerCallbacks[i].connection){
this.listenerCallbacks[i].callback = callback;
connectionReplaced = true;
isNew = false;
debug.debug("replaced existing connection listener");
//console.log("found");
break;
}
}
}
if (!connectionReplaced){
//console.log("not found");
this.listenerCallbacks.push({
connection : true
, callback : callback
, listenerID : listenerID
, prefix : prefix
});
debug.debug("added new connectionListener");
//console.log("pushed");
}
} else if(options.blocking){
debug.debug("listener is blocking");
var blockingReplaced = false;
if (this.listenerCallbacks.length > 0){
for (var j = 0; j < this.listenerCallbacks.length; j++){
//console.log("loop", j, prefix, this.listenerCallbacks[j]);
if (this.listenerCallbacks[j].prefix === prefix && this.listenerCallbacks[j].blocking){
this.listenerCallbacks[j].callback = callback;
blockingReplaced = true;
isNew = false;
//console.log("found");
break;
}
}
}
if (!blockingReplaced){
//console.log("not found");
this.listenerCallbacks.push({
blocking : true
, callback : callback
, listenerID : listenerID
, prefix : prefix
});
console.log("pushed");
}
} else {
this.listenerCallbacks.push({
blocking: false
, callback: callback
, listenerID: listenerID
, prefix : prefix
});
}
if (isNew){
this.listeners.addEntry(prefix, listenerID);
}
return this;
};
/** Remove ALL listeners on a given namespace (but NOT all prefixes) ie, two listeners on /a/b and one on /a: .removeListeners("/a/b") will remove both on /a/b and leave the one on /a
*@param {String} prefix the nameSpace uri to remove listeners on
*@returns {this} for chaining
*/
Forwarder.prototype.removeListeners = function(prefix){
prefix = new ndn.Name(prefix);
this.listenerCallbacks = this.listenerCallbacks || [];
if (this.listenerCallbacks.length === 0){
return this;
}
var listenerEntry = this.listeners.lookup(prefix);
while(listenerEntry.nextHops.length){
var hopEntry = listenerEntry.nextHops.pop();
this.listenerCallbacks[hopEntry.faceID].callback = null;
}
return this;
};
/** set maximum number of connections for the forwarder (default unset)
*@param {Number} maximum the maximum number of simultaneous connections
*@returns {this} for chaining
*/
Forwarder.prototype.setMaxConnections = function(maximum){
this.maxConnections = maximum;
return this;
};
/** add a connection
*@param {String | Object} urlOrObject - Either a url string representing the endpoint protocol, ip/domain, and port (e.g "ws://localhost:8585"), or an object (e.g messageChannel port or RTC datachannel)
*@param {Function} onFace - callback function which receives the faceID of the newly constructed Face
*@param {Function} onFaceClosed - callback function
*/
Forwarder.prototype.addConnection = function(){};
require("./node/addConnection.js")(Forwarder);
/** remove a connection, and purge any registered Prefixes from the FIB
*@param {Number} faceID Numerical faceID of the connection to remove
*@returns {this} Forwarder for chaining
*/
Forwarder.prototype.removeConnection = function(faceID) {
debug.debug("removeConnection begin loop", faceID);
if(this.interfaces.Faces[faceID]){
while ( this.interfaces.Faces[faceID].prefixes.length > 0){
this.fib
.lookup(this.interfaces.Faces[faceID].prefixes.pop())
.removeNextHop({
faceID: faceID
});
}
debug.debug("removeConnection loop complete");
this.interfaces.Faces[faceID].close();
this.interfaces.Faces[faceID].closeByTransport();
this.interfaces.Faces[faceID].onclose();
this.connectionCount--;
return this;
}
};
/** request a connection 'In-band' over NDN; rather than provide an IP/DNS endpoint,
* provide a NDN prefix, and be connected with any other forwarder that is listening for connections along that prefix
*@param {String} prefix - the uri encoded prefix to register the connection along
*@param {Function} onFace - a callback function called upon face construction, which gets the numerical faceID as the only argument
*@param {Function} onFaceClosed - a callback function once the face is closed
*/
Forwarder.prototype.requestConnection = function(prefix, onFace, onFaceClosed){
var Self = this;
Self.createConnectionRequestSuffix(function(suffix, responseCB){
var name = new ndn.Name(prefix);
name.append(suffix.value);
var interest = new ndn.Interest(name);
interest.setInterestLifetimeMilliseconds(16000);
var element = interest.wireEncode().buffer;
var inst = new ndn.Interest();
try{
inst.wireDecode(element);
} catch(e){
debug.debug("wire decode error:", e.message);
}
Self.pit.insertPitEntry(element, inst, function(data, interest){
debug.debug("triggered PitEntry callback for", interest.name.toUri());
debug.debug("with data: %s", data);
if(data){
try{
debug.debug("PitEntry callback for .requestConnection");
if (responseCB){
var json = JSON.parse(data.content.toString());
responseCB(json);
}
Self.addRegisteredPrefix(prefix, Self.interfaces.Faces.length -1 );
onFace(null, Self.interfaces.Faces.length -1);
} catch(e){
debug.debug(e.message);
}
} else {
debug.debug("triggered PitEntry callback after %s ms timeout", interest.getInterestLifetimeMilliseconds());
onFace(new Error("connection request timeout"));
}
});
var faceFlag = Self.fib.findAllNextHops(prefix);
try{
Self.interfaces.dispatch(element, faceFlag);
} catch(e){
debug.debug(e.message);
}
}, function(connectionInfo){
debug.debug("connectionInfo callback in connection request");
Self.addConnection(connectionInfo, function(id){
debug.debug("connection added in connectioninfoCallback, got faceID %s", id);
Self.addRegisteredPrefix(prefix, id);
onFace(null, id);
debug.debug("completed onOpen");
}, function(id){
Self.removeConnection(id);
onFaceClosed(id);
});
});
return this;
};
/** add a registered prefix for interest forwarding into the fib
*@param {String} prefix - the uri encoded prefix for the forwarding entry
*@param {Number} faceID - the numerical faceID of the face to add the prefix to
*@returns {this} for chaining
*/
Forwarder.prototype.addRegisteredPrefix = function(prefix, faceID){
this.fib.addEntry(prefix, faceID);
this.interfaces.Faces[faceID].prefixes = this.interfaces.Faces[faceID].prefixes || [];
this.interfaces.Faces[faceID].prefixes.push(prefix);
return this;
};
/** request a remote forwarder to add a registered prefix for this forwarder
*@param {String} prefix - the uri encoded prefix for the forwarding entry
*@param {Number} faceID - the numerical faceID of the face to make the request
*@returns {this} for chaining
*/
Forwarder.prototype.registerPrefix = function(prefix, faceID){
var name = new ndn.Name("marx/fib/add-nexthop");
name.append(new ndn.Name(prefix));
var interest = new ndn.Interest(name);
this.interfaces.Faces[faceID].send(interest.wireEncode().buffer);
return this;
};
/** remove a registered prefix for a remote face
*@param {String} prefix - the uri encoded prefix for the forwarding entry
*@param {Number} faceID - the numerical faceID of the face remove the prefix from
*@returns {this} for chaining
*/
Forwarder.prototype.unregisterPrefix = function(prefix, faceID){
var name = new ndn.Name("marx/fib/remove-nexthop");
name.append(new ndn.Name(prefix));
var interest = new ndn.Interest(name);
this.interfaces.Faces[faceID].send(interest.wireEncode().buffer);
return this;
};
module.exports = Forwarder;