Commit 361aa1cc authored by Leo Iannacone

removed tailfd module - no longer needed

parent bfcd9086
language: node_js
node_js:
- 0.6
- 0.8
The MIT License (MIT)
Copyright (c) 2014 Ryan Day
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
{
"name": "watchfd",
"description": "watch events open,change,unlink on all files that are refrenced or become refrenced by path. pause/resumeable",
"version": "0.0.13",
"author": {
"name": "Ryan Day",
"email": "soldair@gmail.com"
},
"keywords": [
"watch",
"descriptors",
"log",
"fs"
],
"main": "./watch.js",
"homepage": "http://github.com/soldair/node-watchfd",
"repository": {
"type": "git",
"url": "git://github.com/soldair/node-watchfd.git"
},
"scripts": {
"test": "tap ./test/unit/*.js"
},
"devDependencies": {
"tap": "*",
"jshint": "0.5.x"
},
"engines": {
"node": ">=0.6.0"
},
"dependencies": {},
"license": "MIT/X11",
"readme": "[![Build Status](https://secure.travis-ci.org/soldair/node-watchfd.png)](http://travis-ci.org/soldair/node-watchfd)\n\n## watchfd\n\nWatch events open,change,unlink on all files that are refrenced or become refrenced by path\n\nprovide events for any file descriptors that are referenced by a watched path, \nor were referenced by a watched path for as long as they are still changing.\nactive is defined by a timeout since last event. file descriptors that become inactive are removed.\n\n\n## install\n\n\tnpm install watchfd\n\n## use\n\n\tvar watchfd = require('watchfd').watch;\n\twatchfd('/some.log',function(cur,prev){\n\t\tconsole.log(prev.size,' changed to ',cur.size);\n\t});\n\n### a use case\n\nan issue with log/file forwarding utilities currently available in npm is that they only watch the file descriptor under the filename. when a log is rotated and a new log is created the server may not stop writing to the old file descriptor immediately. Any data written to that descriptor in this state ends up in /dev/null\n\n\n### argument structure\n\nwatchfd.watch(filename, [options], listener)\n\n- filename\n its really intended that this be a regular file or non existant. i dont know what would happen right now if its a directory.\n- options. supported custom options are\n\n\t```js\n\t{\n\t\"timeout\": 60*60*1000, //defaults to one hour\n\t//how long an inactive file descriptor can remain inactive\n\n\t\"timeoutInterval\":60*5*1000 //every five minutes\n\t// how often to check for inactive file descriptors\n\t}\n\n\t//the options object is also passed directly to watch and watchFile so you may configure\n\n\t{\n\t\"persistent\":true, //defaults to true\n\t//persistent indicates whether the process should continue to run as long as files are being watched\n\n\t\"interval\":0, //defaults 0\n\t//interval indicates how often the target should be polled, in milliseconds. (On Linux systems with inotify, interval is ignored.) \n\t}\n\t```\n\n- callback\n this is bound to the change event of the watcher. its required\n\n\t```js\n\tcallback(cur,prev)\n\t```\n\n cur and prev are instances of fs.Stats\n\n- @returns\n an instance of Watcher\n\n### Watcher methods\n\nWatcher.pause()\n\n- paused, changed and last state is kept for each file descriptor\n - stops file descriptors from timing out.\n - all events except error are paused.\n - unlink, open, change etc will be fired in the correct order after resume. \n no events will be missed but change events will be combined\n\n\nWatcher.resume()\n\n- resumed\n - for each file descriptor pending events are fired in the corect order\n open,change,unlink\n - the change event has the stat from first change event while paused and the most recent so no change is missed.\n\n\nWatcher.paused\n\n - is paused\n - readonly please.\n\n### Watcher events\n\nWatcher.on(event name,call back);\n\n- change\n\t\tfs.Stats cur, fs.Stats prev\n- open\n\t\tfs.Stats cur,{fd:file descriptor,stat:fs.Stats cur}\n- unlink\n fs.Stats cur,{fd:file descriptor,stat:fs.Stats cur}\n- timeout\n fs.Stats cur,{fd:file descriptor,stat:fs.Stats cur}\n\n#### windows support problems\n\n- It uses file inode as a unique id for each descriptor. I know there is a way to get a unique id for a file in windows i just don't know if that would be passed to stat as stat.ino. \n- I use watchFile which is not supported at all on windows but this would be easier to overcome considering i can use a configured polling interval as a stat polling fall back on windows. 
\n- I also don't know windows very well and don't know if windows has the problem this module solves...but i imagine it would\n\n#### notes\n\nI noticed distinct differences in watchFile vs watch api\nfs.watchFile will issue events for a file that is currently referenced by a path\nfs.watch will take a path but issue events whenever that file descriptor is changed even after it's unlinked\n\nWe should probably design servers to listen to SIGHUP and grab new file descriptors for all loggers but even if you used logrotate with copytruncate mode as to not change the file referenced by a path the chance that you will loose data is still there. I feel safer waiting for a file descriptor to be quiet so i know its out of use before i close it in a process that has the ability to read data out of it.\n",
"readmeFilename": "readme.md",
"bugs": {
"url": "https://github.com/soldair/node-watchfd/issues"
},
"_id": "watchfd@0.0.13",
"_from": "watchfd@~0.0.12"
}
[![Build Status](https://secure.travis-ci.org/soldair/node-watchfd.png)](http://travis-ci.org/soldair/node-watchfd)
## watchfd
Watch open, change, and unlink events on all files that are referenced, or become referenced, by a path.
Provides events for any file descriptors that are referenced by a watched path, or that were referenced by a watched path, for as long as they are still active.
Active is defined by a timeout since the last event; file descriptors that become inactive are removed.
## install
npm install watchfd
## use
var watchfd = require('watchfd').watch;
watchfd('/some.log',function(cur,prev){
console.log(prev.size,' changed to ',cur.size);
});
### a use case
An issue with the log/file forwarding utilities currently available on npm is that they only watch the file descriptor currently under the filename. When a log is rotated and a new log is created, the server may not stop writing to the old file descriptor immediately. Any data written to that descriptor in this state effectively ends up in /dev/null.
### argument structure
watchfd.watch(filename, [options], listener)
- filename
  this is really intended to be a regular file or a nonexistent path. The behavior is currently undefined if it is a directory.
- options. Supported custom options are:
```js
{
"timeout": 60*60*1000, //defaults to one hour
//how long an inactive file descriptor can remain inactive
"timeoutInterval":60*5*1000 //every five minutes
// how often to check for inactive file descriptors
}
//the options object is also passed directly to watch and watchFile so you may configure
{
"persistent":true, //defaults to true
//persistent indicates whether the process should continue to run as long as files are being watched
"interval":0, //defaults 0
//interval indicates how often the target should be polled, in milliseconds. (On Linux systems with inotify, interval is ignored.)
}
```
- callback
  this is bound to the change event of the watcher. It is required.
```js
callback(cur,prev)
```
  cur and prev are instances of fs.Stats
- @returns
  an instance of Watcher (see the complete call sketch below)
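Putting the pieces above together, a complete call might look like the following sketch; the path and option values here are illustrative, not required defaults:

```js
var watchfd = require('watchfd');

var watcher = watchfd.watch('/var/log/app.log', {
  timeout: 60 * 60 * 1000,        // drop an inactive descriptor after one hour
  timeoutInterval: 5 * 60 * 1000, // check for inactive descriptors every five minutes
  persistent: true,               // passed through to fs.watch/fs.watchFile
  interval: 0                     // fs.watchFile polling interval (ignored with inotify)
}, function (cur, prev) {
  // required change listener; cur and prev are fs.Stats instances
  console.log('size changed from', prev.size, 'to', cur.size);
});
```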
### Watcher methods
Watcher.pause()
- paused: changed and last state is kept for each file descriptor (a usage sketch follows this list)
  - stops file descriptors from timing out.
  - all events except error are paused.
  - unlink, open, change etc. will be fired in the correct order after resume;
    no events will be missed, but change events will be combined
Watcher.resume()
- resumed
  - for each file descriptor, pending events are fired in the correct order:
    open, change, unlink
  - the change event has the stat from the first change event while paused and the most recent one, so no change is missed.
Watcher.paused
- whether the watcher is currently paused
- read-only, please.
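A minimal pause/resume sketch, assuming a watcher created as in the examples above (the 5 second delay is arbitrary):

```js
var watchfd = require('watchfd');

var watcher = watchfd.watch('/var/log/app.log', function (cur, prev) {
  console.log('change', prev.size, '->', cur.size);
});

// hold events while doing something expensive; nothing is lost,
// change events for the same descriptor are combined
watcher.pause();
console.log(watcher.paused); // true

setTimeout(function () {
  // queued open/change/unlink events are replayed in order
  watcher.resume();
}, 5000);
```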
### Watcher events
Watcher.on(eventName, callback);
- change
  fs.Stats cur, fs.Stats prev
- open
  Number fd, {fd: file descriptor, stat: fs.Stats}
- unlink
  Number fd, {fd: file descriptor, stat: fs.Stats}
- timeout
  Number fd, {fd: file descriptor, stat: fs.Stats}
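The fuzz script shipped with this module wires up every event roughly like this (the file name is illustrative):

```js
var watchfd = require('watchfd');

var watcher = watchfd.watch('some.log', function (cur, prev) {
  console.log('CHANGE  ino:', cur.ino, 'size:', prev.size, '->', cur.size);
});
watcher.on('open', function (fd, data) {
  console.log('OPEN    ino:', data.stat.ino, 'size:', data.stat.size);
});
watcher.on('unlink', function (fd, data) {
  console.log('UNLINK  ino:', data.stat.ino, 'size:', data.stat.size);
});
watcher.on('timeout', function (fd, data) {
  console.log('TIMEOUT ino:', data.stat.ino, 'size:', data.stat.size);
});
```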
#### windows support problems
- It uses the file inode as a unique id for each descriptor. I know there is a way to get a unique id for a file on Windows; I just don't know whether it would be exposed to stat as stat.ino.
- I use watchFile, which is not supported at all on Windows, but this would be easier to overcome since I could use a configured polling interval as a stat-polling fallback on Windows.
- I also don't know Windows very well and don't know whether Windows has the problem this module solves... but I imagine it would.
#### notes
I noticed distinct differences between the watchFile and watch APIs, as the sketch below illustrates.
fs.watchFile will issue events for the file that is currently referenced by a path.
fs.watch takes a path but issues events whenever that file descriptor changes, even after it is unlinked.
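A minimal sketch of that difference, assuming a file named old.log already exists (the exact events vary by platform, so treat this as illustrative rather than normative):

```js
var fs = require('fs');

// fs.watchFile polls the *path*: after old.log is renamed it reports
// stats for whatever file (if any) now lives at that path.
fs.watchFile('old.log', { interval: 100 }, function (cur, prev) {
  console.log('watchFile: size', prev.size, '->', cur.size);
});

// fs.watch starts from the file that is at the path right now; on Linux
// (inotify) it keeps emitting events for that inode even after a rename.
fs.watch('old.log', function (event, filename) {
  console.log('watch:', event, filename);
});

fs.renameSync('old.log', 'old.log.1');
fs.appendFileSync('old.log.1', 'still writing to the rotated file\n');
```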
We should probably design servers to listen for SIGHUP and grab new file descriptors for all loggers, but even if you use logrotate in copytruncate mode, so that the file referenced by a path does not change, the chance that you will lose data is still there. I feel safer waiting for a file descriptor to be quiet, so I know it is out of use, before I close it in a process that has the ability to read data out of it.
#!/bin/sh
node_modules/jshint/bin/hint ./*
hint=$?
if [ $hint != 0 ]; then
echo "< script runner stopped jshint failed >";
exit $hint
else
echo "< jshint passed >";
fi
node_modules/tap/bin/tap.js ./test/unit/*.js
unit=$?
if [ $unit != 0 ]; then
echo "< script runner stopped unit tests failed >";
exit $unit
else
echo "< unit tests passed >";
fi
//
// TODO make this a module.
// expectevent module. allows you to expect an event in a test.
// if you pass an emitter, it will proxy emit to log all events and handle wait timeouts.
//
exports.ExpectEvent = ExpectEvent;
function ExpectEvent(emitter,options){
var self = this;
//
// hijack emit for watching
//
if(emitter && emitter.emit) {
this.emitter = emitter;
var e = emitter.emit;
emitter.emit = function(ev){
//pass all args except event name again to log.
var args = Array.prototype.slice.call(arguments);
self.log(ev,args);
return e.apply(emitter,arguments);
};
//
// add un expect handle to emitter just in case an edge case needs to turn it off from the inside
//
emitter.unexpectevent = function(){
emitter.emit = e;
};
}
options = options ||{};
this.maxLogEntries = options.maxLogEntries||this.maxLogEntries;
}
ExpectEvent.prototype = {
counter:0,
maxLogEntries:1000,
eventLog:[],
expected:{},
emitter:null,
log:function(name,args){
var entry = Array.prototype.slice.call(args);
entry.unshift(name);
this.eventLog.push(entry);
if(this.eventLog.length > this.maxLogEntries) this.eventLog.shift();
for(var i=0,k = Object.keys(this.expected),j=k.length;i<j;++i) {
if(this.expected[k[i]].name == name) {
this.expected[k[i]].cb(this.expected[k[i]],args);
clearTimeout(this.expected[k[i]].timer);
delete this.expected[k[i]];
}
}
},
expect:function(name,cb,timeout){
this.counter++;
//local ref to counter
var self = this,
c = this.counter,
failed = false;
//
//activate timer so we dont wait forever for the event
//
var timer = setTimeout(function(){
var err = new Error('event '+name+' not fired before timeout of '+timeout+' ms');
console.log('delete expected callback ',c);
delete self.expected[c];
cb(err,false);
},timeout);
//
// add expect event watcher on to the watchers list
//
this.expected[this.counter] = {
name:name,
cb:function(arr,eventArgs){
console.log('expect callback ',c,' called');
//just in case its called after it fails. shouldnt happen but who knows.
if(failed) {
console.error('expectEvent warning> callback called after failed. there is probably a bug.');
return;
}
clearTimeout(arr.timer);
cb(false,eventArgs);
},
timer:timer
};
},
getLog:function(){
return this.eventLog;
},
flushLog:function(){
this.eventLog = [];
}
};
var w = require('../../watch.js');
var watcher = w.watch('taco.log',{timeout:5000,timeoutInterval:1000},function(cur,prev){
console.log('CHANGE ','ino: '+cur.ino+', size: '+prev.size+' -> '+cur.size);
});
watcher.on('open',function(fd,data){
console.log('OPEN ','ino: '+data.stat.ino+', size:'+data.stat.size);
});
watcher.on('unlink',function(fd,data){
console.log('UNLINK ','ino: '+data.stat.ino+', size:'+data.stat.size);
});
watcher.on('timeout',function(fd,data){
console.log('TIMEOUT ','ino: '+data.stat.ino+', size:'+data.stat.size);
});
var fs = require('fs');
setInterval(function(){
var ws = fs.createWriteStream('taco.log');
ws.write('party!'+"\n");
},5000);
setInterval(function(){
fs.unlink('taco.log',function(){/* the file may already be gone; ignore errors in this fuzz script */});
console.log(Object.keys(watcher.fds).length,' fds being watched');
},10000);
var ExpectEvent = require(__dirname+'/../lib/expectevent.js').ExpectEvent,
test = require('tap').test,
assert = require('assert'),
fs = require('fs'),
watchfd = require(__dirname+'/../../watch.js'),
logFile = 'test1'+Date.now()+Math.random()+'.log';
test('test that the stuff works =)',function(t){
var changesObservedThroughDefaultListener = 0,
watcher = watchfd.watch(logFile,{timeout:1000,timeoutInterval:200},function(cur,prev){
/*dont need to watch change here*/
changesObservedThroughDefaultListener++;
}),
expect = new ExpectEvent(watcher);
watcher.on('error',function(err){
throw err;
});
//
// enforce some max execution time for watcher test
//
expect.expect('close',function(err,data){
if(err) {
watcher.close();
fs.unlink(logFile);
throw err;
}
},20000);
// file not exists event should be triggered
expect.expect('noent',function(err,data){
if(err) throw err;
},1000);
//cleanup
var cleanup = function(){
fs.unlink(logFile);
if(!changesObservedThroughDefaultListener){
assert.ok(changesObservedThroughDefaultListener,"this test should have triggered the default change handler numerous times");
}
};
process.on('exit',cleanup);
// file descriptors for unlinked events tests
var fd1 = null,
fd2 = null;
var q = {
//
"trigger open. expect that it is fired within six seconds":function(){
expect.expect('open',function(err,stat){
if(err) throw err;
done();
},6000);
fs.open(logFile,'a+',function(err,fd){
assert.ifError(err);
fd1 = fd;
var buf = new Buffer('party rockin');
// watchFile does not seem to fire immediately for regular empty files.
fs.write(fd1,buf,0,buf.length,null,function(err,bytesWritten){
assert.ifError(err);
});
});
},
//
"trigger change expect that it is fired within one second":function(){
expect.expect('change',function(err,data){
if(err) throw err;
//must have file descriptor with change events
assert.ok(data[3].fd,'must have fd with change events');
done();
},1000);
var buf = new Buffer('party floppin');
fs.write(fd1,buf,0,buf.length,null,function(err,bytesWritten){
assert.ifError(err,'can write to file');
});
},
//
"unlink and wait for unlink":function(){
expect.expect('unlink',function(err,data){
if(err) throw err;
done();
},10000);
fs.unlink(logFile,function(err){
assert.ifError(err,'got an error cleaning up files');
});
},
//
"create again wait for open":function(){
expect.expect('open',function(err,data){
if(err) throw err;
done();
},10000);
fs.open(logFile,'w+',function(err,fd){
assert.ifError(err,'error opening test file for writing');
fd2 = fd;
var buf = new Buffer('new party');
fs.write(fd2,buf,0,buf.length,null,function(err,bytesWritten){
assert.ifError(err,'should have written byte to the test file');
});
});
},
//
"write data to unlinked fd and wait for change":function(){
expect.expect('change',function(err,data){
if(err) throw err;
done();
},1000);
var buf = new Buffer('party unlinked');
fs.write(fd1,buf,0,buf.length,null,function(err,bytesWritten){
assert.ifError(err,'should not have an error writing the test log file');
});
},
//
"wait for timeout on fd1":function(){
expect.expect('timeout',function(err,data){
if(err) throw err;
assert.equal(Object.keys(watcher.fds).length,1,'should only have one fd if fd1 timed out');
done();
},2000);
},
"pause and get no events":function(){
expect.expect('change',function(err,data){
if(!err) throw new Error('expected to get an error. events should not have fired!');
done();
},1000);//same wait as the other change listener
watcher.pause();
var buf = new Buffer('paused party');
fs.write(fd2,buf,0,buf.length,null,function(err,bytesWritten){
assert.ifError(err,'should have written bytes to the test file');
});
},
"resume and get events":function(){
expect.expect('change',function(err,data){
console.log("in resume and get results handler!");
if(err) throw err;
done();
},1000);//expect it quickly
watcher.resume();
}
},
lastStart = Date.now(),
done = function(){
var keys = Object.keys(q),
testKey = keys.shift(),
test = q[testKey];
if(test) {
delete q[testKey];
var elapsed = Date.now()-lastStart;
console.log("\telapsed: ",elapsed,'ms');
console.log('starting test : ',testKey);
lastStart = Date.now();
test();
} else complete();
},
complete = function(){
watcher.close();
cleanup();
t.end();
};
done();
});
var util = require('util'),
events = require('events'),
fs = require('fs');
//
//watching is accomplished at the file descriptor level.
//watching a "filename" means you get events on deleted files where applications are still writing to open descriptors they are holding.
//a big thing to note is that if a file is moved and another process starts to write to it these change events will be buffered
//
module.exports = function(filename,options,listener){
return new Watcher(filename,options,listener);
};
module.exports.watch = module.exports;
function Watcher(filename,options,listener){
events.EventEmitter.call(this);
var self = this,
args = this._normalizeArguments(arguments);
// treat missing listener exactly like node does in fs.watchFile
if(typeof args.listener != 'function') {
throw new Error('watch requires a listener function');
}
this.options = args.options||{};
this.file = args.file;
this.fds = {};
//
//how long to keep watching a file descriptor that is deleted/unreferenced and inactive.
//
this.options.timeout = this.options.timeout || 60*60*1000;
//
//this is the interval that the watcher uses to enforce options.timeout.
//
this.options.timeoutInterval = this.options.timeoutInterval || 60*5*1000;
if(this.options.timeout < this.options.timeoutInterval) this.options.timeoutInterval = this.options.timeout;
this.on('change',args.listener);
fs.stat(this.file,function(err,stat) {
if(err) {
if(err.code != 'ENOENT') {
//next tick so we have a chance to bind error
process.nextTick(function(){
//for all other errors we cannot continue.
self.emit('error',err);
});
return;
} else {
self.emit('noent');
}
} else {
self._observeInode(stat);
}
self._watchFile();
self._startTimeout();
});
}
util.inherits(Watcher,events.EventEmitter);
//
// define class members
//
var WatcherMethods = {
//public api methods
close:function(){
for(var inode in this.fds) {
if(this.fds.hasOwnProperty(inode)) {
this._closeFd(inode);
}
}
fs.unwatchFile(this.file);
clearTimeout(this._timeoutInterval);
this.emit('close');
},
//
// ## pause and resume.
//
// - paused, changed and last state is kept for each file descriptor
// - stop file descriptors from timing out.
// - all events except error
// - unlink, open
// - change
// - resumed, the state events are isued then change
// - opens and unlinks are issued for each file descriptor
// - change event for change if any
//
paused:false,
resume:function() {
var self = this;
this.emit = this._emit;
this.paused = false;
Object.keys(this._pausedEvents||{}).forEach(function(key,k){
var events = self._pausedEvents[key];
if(!events) return;
if(events.open) self.emit.apply(self,events.open);
if(events.change) self.emit.apply(self,events.change);
if(events.unlink) self.emit.apply(self,events.unlink);
});
delete this._emit;
delete this._pausedEvents;
},
pause:function(){
var self = this;
if(this.paused) return;
this._pausedEvents = {};
this.paused = true;
//jack emit
this._emit = this.emit;
this.emit = function(ev,cur,prev) {
if(ev == 'error') return this._emit.apply(this,arguments);
if(ev == 'open' || ev == 'unlink') {
if(prev.stat) prev = prev.stat;
cur = prev;
}
if(!cur) return;
if(!self._pausedEvents[cur.ino]) {
self._pausedEvents[cur.ino]= {};
self._pausedEvents[cur.ino]._first = cur;
}
self._pausedEvents[cur.ino][ev] = arguments;
if(ev == 'change') {
//set previous stat to be the first stat after pause
self._pausedEvents[cur.ino][ev][2] = self._pausedEvents[cur.ino]._first;
}
};
},
//------ protected methods -------
//
//this is the path to the last stat i got from the filename im trying to watch.
//used to differentiate "inactive" descriptors from the one currently residing at that file location.
//
_fileStat:null,
//
// the interval used to cleanup inactive file descriptors that no longer refrenced by this.file
//
_timeoutInterval:null,
//
// watchFile watches the stat at path
// i am using watchFile to determine if the file i was originally told to watch is replaced etc.
//
_watchFile:function(){
var self = this,lastInode = null;
//NOTE for windows i could poll with fs.stat at options.interval
fs.watchFile(this.file,this.options,function(cur,prev){
if(!cur.ino && prev.ino) cur.ino = prev.ino;
//i need to know which fd is the active fd under the file path
self._fileStat = cur;
if((!cur.ino && prev.ino) || cur.nlink === 0) {
//no hardlinks left to this file.
//or no inode. its unlinked for sure.
self.emit('unlink',self.fds[cur.ino].fd,self.fds[cur.ino].getData());
} else if(!self.fds[cur.ino]){
self._observeInode(cur);
} else if(cur.size === prev.size){
//sometimes the watch event fires after an unlink with nlink still equal to 1
//i stat to first see if its not there
//by the time stat is done checking the file could have been replaced by a new file
//so i validate the inode also.
fs.stat(self.file,function(err,stat){
var deleted = false;
if(err && err.code === 'ENOENT'){
deleted = true;
} else if(!err) {
if(stat.ino !== cur.ino || cur.nlink === 0) {
deleted = true;
}
}
if(deleted) {
self.emit('unlink',self.fds[cur.ino].fd,self.fds[cur.ino].getData());
}
});
}
});
},
//
// manage open file descriptors to deleted/moved log files.
//
_startTimeout:function(){
//timeouts are not subject to stacking and stuff with process overload
var self = this;
self._timeoutInterval = setTimeout(function fn(){
self._timeoutInterval = setTimeout(fn,self.options.timeoutInterval);
if(self.paused) {
return;
}
if(!self._fileStat) {
return;
}
for(var inode in self.fds){
if(self.fds.hasOwnProperty(inode) && self.fds[inode]) {
// if this is not the current file descriptor referenced by the path
if(inode+'' !== self._fileStat.ino+''){
var fdState = self.fds[inode],
mtime = Date.parse(fdState.stat.mtime);
// i want to wait at least timeout from the time i start watching the fd
if(mtime < fdState.created){
mtime = fdState.created;
}
var sinceChange = Date.now()-mtime;
if(sinceChange > self.options.timeout){
self._closeFd(inode);
self.emit('timeout',fdState.fd,fdState.getData());
}
}
}
}
},self.options.timeoutInterval);
},
//
// start file descriptor based watcher
//
_observeInode:function(stat,cb) {
var self = this;
//prevent assigning multiple watch watchers
if(self.fds[stat.ino]) {
return;
}
var fdState = self.fds[stat.ino] = new WatcherFd(stat),
inode = stat.ino;
fs.open(this.file,'r',function(err,fd){
if(err || !self.fds[inode]){
//file must not exist now. it was deleted pretty quickly.. =/
// or i was ended while i was setting up
self._closeFd(stat.ino);
} else {
fdState.fd = fd;
self.emit('open',fdState.fd,fdState.getData());
fdState.watcher = fs.watch(self.file,function(event,filename) {
fdState.created = Date.now();//time of last event
fs.fstat(fd,function(err,stat){
// ended. while i was setting up
if(!self.fds[inode]) return;
if(!self.fds[stat.ino]){
//between the first change event. and getting the fd. the file was replaced by another
//recreate fdState
fdState = new WatcherFd(stat);
fdState.fd = fd;
fdState.created = Date.now();
//the watcher is already aware of this fd. no need to recreate it.
fdState.watcher = self.fds[inode].watcher;
//issue timeout event for dead before new inode
self.emit('timeout',null, self.fds[inode].getData());
//clean unknown inode
delete self.fds[inode];
self.fds[stat.ino] = fdState;
}
var prev = fdState.stat;
fdState.stat = stat;
self._observeChange(stat,prev);
});
});
// observe change that told us about the fd
process.nextTick(function() {
self._observeChange(stat,stat);
});
}
});
},
//
// clean up
//
_closeFd:function(inode){
if(this.fds[inode]) {
this.fds[inode].close();
delete this.fds[inode];
}
},
//
// change dispatcher - sends WatcherFd data with each change event.
//
_observeChange:function(stat,prev) {
//should always be set.
if(!this.fds[stat.ino]) return;
this.emit('change',stat,prev,this.fds[stat.ino].getData());
},
//
// format arguments for easy reading / access
//
_normalizeArguments:function(args){
if(typeof args[1] == 'function'){
args[2] = args[1];
args[1] = {};
}
return {file:args[0],options:args[1],listener:args[2]};
}
};
extend(Watcher.prototype,WatcherMethods);
function WatcherFd(stat,timeout){
this.stat = stat;
this.timeout = timeout || this.timeout;
this.created = Date.now();
}
WatcherFd.prototype = {
fd:null,
stat:null,
state:null,
watcher:null,
created:null,
getData:function() {
return {fd:this.fd,stat:this.stat};
},
close:function() {
if(this.fd) fs.close(this.fd);
if(this.watcher) this.watcher.close();
clearTimeout(this.timer);
}
};
//---
function extend(o,o2){
for( var i in o2 ) {
if(o2.hasOwnProperty(i)) o[i] = o2[i];
}
}
{
"name": "tailfd",
"description": "Tail a file. This will continue to work even if a file is unlinked rotated or truncated. It is also ok if the path doesnt exist before watching it",
"version": "0.1.6",
"author": {
"name": "Ryan Day",
"email": "soldair@gmail.com"
},
"keywords": [
"watch",
"tail",
"descriptors",
"log",
"fs"
],
"main": "./tail.js",
"homepage": "http://github.com/soldair/node-tailfd",
"repository": {
"type": "git",
"url": "git://github.com/soldair/node-tailfd.git"
},
"scripts": {
"test": "tap test/*.js"
},
"devDependencies": {
"tap": "*",
"jshint": "0.5.x"
},
"engines": {
"node": ">=0.6.0"
},
"dependencies": {
"watchfd": "~0.0.12"
},
"license": "MIT/X11",
"readme": "[![Build Status](https://secure.travis-ci.org/soldair/node-tailfd.png)](http://travis-ci.org/soldair/node-tailfd)\n \n## tailfd\n\nTails a file. it should work great. it will continue to work even if a file is unlinked rotated or truncated. It is also ok if the path doesnt exist before watching it\n\n## Example\n\n```js\n\nvar tail = require('tailfd').tail,\nwatcher = tail('/some.log',function(line,tailInfo){\n //default line listener. optional.\n console.log('line of data> ',line);\n});\n\n```\n\nwhen you are done\n\n```js\nwatcher.close();\n\n```\n\n## install\n\n\tnpm install tailfd\n\n### argument structure\n\ntailfd.tail(filename, [options], listener)\n\n- filename\n - this should be a regular file or non existent. the behavior is undefined in the case of a directory.\n\n- options. supported custom options are\n\n\t```js\n\t{\n\n\t\"start\":undefined, //defaults to the first reported stat.size\n\t//optional. a hard start position in the file for tail to start emitting data events.\n\n\t\"offset\":0,\n\t//optional. offset is negtively applied to the start position\n\n\t\"delimiter\":\"\\n\",\n\t//optional. defaults to newline but can be anything\n\n\t\"maxBytesPerRead\":10240,\n\t// optional. this is how much data will be read off of a file descriptor in one call to fs.read. defaults to 10k.\n\t// the maximum data buffer size for each tail is \n\t// \tmaxBufferPerRead + the last incomplete line from the previous read.length\n\n\t\"readAttempts\":3,\n\t//optional. if fs.read cannot read the offset from a file it will try attempts times before is gives up with a range-unreadable event\n\t// defaults to 3 attempts\n\n\t\"maxLineLength\":102400\n\t// optional. defaults to 1 mb\n\t// if the line excedes this length it's data will be emitted as a line-part event\n // this is a failsafe so that a single line cant eat all of the memory.\n\t// all gathered line-parts are complete with the value of the next line event for that file descriptor.\n\n\t}\n\n\t// the options object is passed to watchfd as well. With watchfd you may configure\n\n\t{\n\n\t\"timeout\": 60*60*1000, //defaults to one hour\n\t//how long an inactive file descriptor can remain inactive before being cleared\n\n\t\"timeoutInterval\":60*5*1000 //every five minutes\n\t// how often to check for inactive file descriptors\n\n\t}\n\n\t//the options object is also passed directly to fs.watch and fs.watchFile so you may configure\n\n\t{\n\t\"persistent\":true, //defaults to true\n\t//persistent indicates whether the process should continue to run as long as files are being watched\n\n\t\"interval\":0, //defaults 0\n\t//interval indicates how often the target should be polled, in milliseconds. (On Linux systems with inotify, interval is ignored.) \n\t}\n\n\t```\n\n- callback\n - this is bound to the line event of the watcher. its optional.\n\n\t```js\n\tcallback(line,tailInfo)\n\n\t```\n\n cur and prev are instances of fs.Stats\n\n- @returns\n TailFD Watcher\n\nWatcher.pause\n- pause data and line events on all underlying descriptors\n\nWatcher.resume\n- get it goin again! =)\n\n\n### events\n\n- line\n\t- String line, Object tailInfo\n- data\n\t- Buffer buffer, Object tailInfo\n- line-part\n\t- String linePart, Object tailInfo\n\t\t- if line length excedes the options.maxLineLength the linePart is emitted. 
\n\t\t This is to prevent cases where unexpected values in logs can eat all of the memory.\n- range-unreadable\n\t- Array errors, Number fromPos,Number toPos,Object tailInfo\n\t\t- After configured readAttempts the offset could still not be read. This range will be skipped\n\n### events inherited from watchfd\n\n- change\n\t- fs.Stats cur, fs.Stats prev\n- open\n\t- fs.Stats cur,{fd:file descriptor,stat:fs.Stats cur}\n- unlink\n\t- fs.Stats cur,{fd:file descriptor,stat:fs.Stats cur}\n- timeout\n\t- fs.Stats cur,{fd:file descriptor,stat:fs.Stats cur}\n\n### tailInfo properties\n\n- stat\n\t- instanceof fs.Stats\n- pos\n\t- current seek position in the file\n- fd\n\t- file descriptor being tailed\n- buf\n\t- string containing the last data fragment from delimiter parsing\n\n\n#### watch file and watch may behave differently on different systems here is the doc for it.\n\n- http://nodejs.org/api/fs.html#fs_fs_writefile_filename_data_encoding_callback\n- http://nodejs.org/api/fs.html#fs_fs_watch_filename_options_listener\n",
"readmeFilename": "readme.md",
"bugs": {
"url": "https://github.com/soldair/node-tailfd/issues"
},
"_id": "tailfd@0.1.6",
"_from": "tailfd@*"
}
[![Build Status](https://secure.travis-ci.org/soldair/node-tailfd.png)](http://travis-ci.org/soldair/node-tailfd)
## tailfd
Tails a file. It should work great: it will continue to work even if the file is unlinked, rotated, or truncated. It is also ok if the path doesn't exist before watching it.
## Example
```js
var tail = require('tailfd').tail,
watcher = tail('/some.log',function(line,tailInfo){
//default line listener. optional.
console.log('line of data> ',line);
});
```
when you are done
```js
watcher.close();
```
## install
npm install tailfd
### argument structure
tailfd.tail(filename, [options], listener)
- filename
  - this should be a regular file or nonexistent. The behavior is undefined in the case of a directory.
- options. supported custom options are
```js
{
"start":undefined, //defaults to the first reported stat.size
//optional. a hard start position in the file for tail to start emitting data events.
"offset":0,
//optional. offset is negtively applied to the start position
"delimiter":"\n",
//optional. defaults to newline but can be anything
"maxBytesPerRead":10240,
// optional. this is how much data will be read off of a file descriptor in one call to fs.read. defaults to 10k.
// the maximum data buffer size for each tail is
// maxBufferPerRead + the last incomplete line from the previous read.length
"readAttempts":3,
//optional. if fs.read cannot read the offset from a file it will try attempts times before is gives up with a range-unreadable event
// defaults to 3 attempts
"maxLineLength":102400
// optional. defaults to 1 mb
// if the line excedes this length it's data will be emitted as a line-part event
// this is a failsafe so that a single line cant eat all of the memory.
// all gathered line-parts are complete with the value of the next line event for that file descriptor.
}
// the options object is passed to watchfd as well. With watchfd you may configure
{
"timeout": 60*60*1000, //defaults to one hour
//how long an inactive file descriptor can remain inactive before being cleared
"timeoutInterval":60*5*1000 //every five minutes
// how often to check for inactive file descriptors
}
//the options object is also passed directly to fs.watch and fs.watchFile so you may configure
{
"persistent":true, //defaults to true
//persistent indicates whether the process should continue to run as long as files are being watched
"interval":0, //defaults 0
//interval indicates how often the target should be polled, in milliseconds. (On Linux systems with inotify, interval is ignored.)
}
```
- callback
  - this is bound to the line event of the watcher. It is optional.
```js
callback(line,tailInfo)
```
  line is a String; tailInfo is described under "tailInfo properties" below
- @returns
  a TailFD Watcher (see the complete call sketch below)
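Putting the options above together, a full call might look like this sketch; the path and values are illustrative (they restate the documented defaults):

```js
var tail = require('tailfd').tail;

var watcher = tail('/var/log/app.log', {
  start: undefined,        // begin at the first reported stat.size
  offset: 0,               // applied negatively to the start position
  delimiter: '\n',
  maxBytesPerRead: 10240,  // bytes per fs.read call
  readAttempts: 3,         // retries before a range-unreadable event
  maxLineLength: 102400,   // longer lines are emitted as line-part events
  timeout: 60 * 60 * 1000, // watchfd: drop inactive descriptors after an hour
  timeoutInterval: 5 * 60 * 1000
}, function (line, tailInfo) {
  console.log('line of data>', line);
});

// later, when you are done:
// watcher.close();
```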
Watcher.pause
- pause data and line events on all underlying descriptors
Watcher.resume
- get it goin again! =)
### events
- line
  - String line, Object tailInfo
- data
  - Buffer buffer, Object tailInfo
- line-part
  - String linePart, Object tailInfo
    - if the line length exceeds options.maxLineLength the linePart is emitted.
      This is to prevent cases where unexpected values in logs can eat all of the memory.
- range-unreadable
  - Array errors, Number fromPos, Number length, Object tailInfo
    - after the configured readAttempts the offset still could not be read; this range will be skipped
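A sketch of listening for the tail-specific events; the handlers just log, and the path is illustrative:

```js
var tail = require('tailfd').tail;

var watcher = tail('/var/log/app.log');

watcher.on('line', function (line, tailInfo) {
  console.log('line from fd', tailInfo.fd, ':', line);
});
watcher.on('line-part', function (part, tailInfo) {
  // an over-long line; the remainder arrives with the next line event for this fd
  console.log('partial line of', part.length, 'bytes');
});
watcher.on('range-unreadable', function (errors, fromPos, length, tailInfo) {
  console.error('skipped', length, 'unreadable bytes at', fromPos, 'after', errors.length, 'attempts');
});
```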
### events inherited from watchfd
- change
  - fs.Stats cur, fs.Stats prev
- open
  - Number fd, {fd: file descriptor, stat: fs.Stats}
- unlink
  - Number fd, {fd: file descriptor, stat: fs.Stats}
- timeout
  - Number fd, {fd: file descriptor, stat: fs.Stats}
### tailInfo properties
- stat
- instanceof fs.Stats
- pos
- current seek position in the file
- fd
- file descriptor being tailed
- buf
- string containing the last data fragment from delimiter parsing
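For example, a line listener might read tailInfo like this; a sketch that assumes the watcher from the earlier examples and uses only the documented fields:

```js
watcher.on('line', function (line, tailInfo) {
  console.log('%d bytes into fd %d (mtime %s): %s',
    tailInfo.pos, tailInfo.fd, tailInfo.stat.mtime, line);
  // tailInfo.buf holds the trailing fragment that has not yet ended in a delimiter
});
```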
#### watchFile and watch may behave differently on different systems; here are the docs:
- http://nodejs.org/api/fs.html#fs_fs_writefile_filename_data_encoding_callback
- http://nodejs.org/api/fs.html#fs_fs_watch_filename_options_listener
var watchfd = require('watchfd'),
EventEmitter = require('events').EventEmitter,
util = require('util'),
fs = require('fs');
module.exports = function(log,options,cb){
var tails = {},
q = [],
watch,
args = Array.prototype.slice.call(arguments);
// load args from args array
log = args.shift();
options = args.shift() || {};
cb = args.pop();
//support optional callback
if(typeof options == 'function') {
if(!cb) cb = options;
options = {};
}
//start watching
var tailer = new TailFd(log,options);
if(cb) {
tailer.on('line',function(line,tailInfo){
cb(line,tailInfo);
});
}
return tailer;
};
module.exports.tail = module.exports;
function TailFd(log,options){
this.startWatching(log,options);
}
util.inherits(TailFd,EventEmitter);
_ext(TailFd.prototype,{
q:[],
tails:{},
watch:null,
startWatching:function(log,options){
var z = this,
first = 1,
noent = 0,
watch = this.watch = watchfd.watch(log,options,function(stat,prev,data){
//
// TODO
// test refactor from stat.ino to +data.fd
//
if(!z.tails[stat.ino]) {
z.tails[stat.ino] = tailDescriptor(data);
z.tails[stat.ino].pos = stat.size;
// if this is the first time i have picked up any file attached to this fd
if (first) {
first = 0;
//apply hard start
if(typeof options.start != 'undefined') {
z.tails[stat.ino].pos = options.start>stat.size?stat.size:options.start;
} else if(noent) {
z.tails[stat.ino].pos = 0;
}
//apply offset
if(options.offset) {
z.tails[stat.ino].pos -= options.offset;
}
//dont let offset take read to invalid range
if(z.tails[stat.ino].pos > stat.size) {
z.tails[stat.ino].pos = stat.size;
} else if (z.tails[stat.ino].pos < 0) {
z.tails[stat.ino].pos = 0;
}
} else {
//new file descriptor at this file path but not the first one.
//an unlink+create or rename operation happened.
//
// TODO
// if A file was moved to this path i should still start from 0.
// i better have flood control for this case because fs.read will seriously read all of that data from the file
//
z.tails[stat.ino].pos = 0;
}
z.tails[stat.ino].linePos = z.tails[stat.ino].pos;
z.tails[stat.ino].fd = data.fd;
}
z.tails[stat.ino].stat = stat;
z.tails[stat.ino].changed = 1;
z.readChangedFile(z.tails[stat.ino]);
});
watch.on('noent',function(){
// if the file didn't exist when the watch started and it's the first time, start should be 0.
noent = 1;
});
watch.on('unlink',function(fd,data){
// watchfd emits unlink as (fd, {fd, stat}); key the tail state by the stat's inode
var stat = data && data.stat;
if(stat && z.tails[stat.ino]) {
z.tails[stat.ino].stat = stat;
z.tails[stat.ino].changed = 1;
}
z.emit.apply(z,arguments);
});
watch.on('timeout',function(fd,data){
// watchfd emits timeout as (fd, {fd, stat}); drop the tail state for that inode
if(data && data.stat && z.tails[data.stat.ino]) {
delete z.tails[data.stat.ino];
//cleanup queue will be in queue process.
}
z.emit.apply(z,arguments);
});
this.on('data',function(buffer,tailInfo){
tailInfo.buf = tailInfo.buf.toString()+buffer.toString();
var lines = tailInfo.buf.split(options.delimiter||"\n");
var b;
tailInfo.buf = lines.pop();
tailInfo.linePos = tailInfo.pos-tailInfo.buf.length;
for(var i=0,j=lines.length;i<j;++i) {
// binary length. =/ not efficient this way but i dont want to emit lines as buffers right now
b = new Buffer(lines[i]+(options.delimiter||"\n"));
if(!tailInfo.linePos) tailInfo.linePos = 0;
if(tailInfo.linePos > tailInfo.pos) {
console.log('i have a bug! tailinfo line position in source file is greater than the position in the source file!');
console.log('linePos:',tailInfo.linePos-b.length,'pos:',tailInfo.pos);
tailInfo.linePos = tailInfo.pos;
}
// copy tailInfo with the line position so events can be handled async and preserve state
z.emit('line',lines[i],_ext({},tailInfo));
}
// in order to make last line length reflect the binary length buf has to be packed into a Buffer.
tailInfo.buf = new Buffer(tailInfo.buf);
if(tailInfo.buf.length >= z.maxLineLength){
z.emit('line-part',tailInfo.buf,tailInfo);
tailInfo.linePos += tailInfo.buf.length;
tailInfo.buf = '';
}
if(tailInfo.linePos < (tailInfo.pos-tailInfo.buf.length)) {
console.log('i have a bug! tailinfo line position in source file is less than the amount of data i should have sent!');
}
});
//10k max per read
this.maxBytesPerRead = options.maxBytesPerRead || 10240;
// 100k max per line (102400 bytes)
this.maxLineLength = options.maxLineLength || 102400;
// 3 read attempts per range
this.readAttempts = options.readAttempts||3;
},
//this emits the data events on the watcher emitter for all fds
readChangedFile:function(tailInfo){
var z = this;
if(tailInfo) {
z.q.push(tailInfo);
}
var ti;
//for all changed fds fire readStream
for(var i = 0;i < z.q.length;++i) {
ti = z.q[i];
if(ti.reading) {
//still reading file
continue;
}
if (!z.tails[ti.stat.ino]) {
//remove timed out file tail from q
z.q.splice(i,1);
--i;
continue;
}
//truncated
if(ti.stat.size < ti.pos) {
ti.pos = 0;
}
var len = ti.stat.size-ti.pos;
//remove from queue because im doing this work.
z.q.splice(i,1);
--i;
z.readTail(ti,len);
}
},
readTail:function(tailInfo,len) {
var z = this;
if(len){
tailInfo.reading = 1;
//retry attempts per range.
var attempts = [];
var readJob = function(len){
fs.read(tailInfo.fd, new Buffer(len), 0, len, tailInfo.pos, function(err,bytesRead,buffer) {
if(err) {
attempts.push(err);
//
// after configured number of attempts emit range-unreadable and move to next
//
if(attempts.length >= (z.readAttempts || 3)) {
z.emit('range-unreadable',attempts,tailInfo.pos,len,tailInfo);
// skip range
tailInfo.pos += len;
attempts = [];
}
done();
return;
}
tailInfo.pos += bytesRead;
attempts = [];
//
// TODO
// provide a stream event for each distinct file descriptor
// i can't stream multiple file descriptors' data through the same stream object because mixing the data would make it meaningless.
// this can't simply emit stream data events here, because to be a stream the above case has to make sense.
//
z.emit('data',buffer,tailInfo);
done();
});
},
done = function(){
//
//if paused i should not continue to buffer data events.
//
if(!len || z.watch.paused){
tailInfo.reading = 0;
if(z.watch.paused && len){
// if i am paused mid read requeue remaining.
z.q.push(tailInfo);
//console.log('requeued remaining read because im paused');
}
return;
}
var toRead = z.maxBytesPerRead;
if(len < toRead) {
toRead = len;
}
len -= toRead;
readJob(toRead);
}
;
done();
}
},
//
// return the total line buffer length from all active tails
//
lineBufferLength:function(){
var z = this;
var l = 0;
Object.keys(z.tails).forEach(function(k) {
l += (z.tails[k].buf||'').length;
});
return l;
},
//
// streamy methods
//
pause:function() {
this.watch.pause();
},
resume:function(){
this.watch.resume();
// i may have been stopped mid read so changes may still need to be read.
this.readChangedFile();
},
destroy:function(){
this.close();
},
destroySoon:function(){
this.close();
},
close:function(){
this.readable = false;
this.emit('close');
this.watch.close();
},
writable:false,
readable:true
});
function tailDescriptor(data){
var o = {
stat:null,
pos:0,
linePos:0,
fd:data.fd,
buf:''
};
return o;
}
function _ext(o,o2){
for(var i in o2) if(o2.hasOwnProperty(i)) o[i] = o2[i];
return o;
}
#!/bin/sh
./node_modules/jshint/bin/hint ./*
hint=$?
if [ $hint != 0 ]; then
echo "< script runner stopped jshint failed >";
exit $hint
else
echo "< jshint passed >";
fi
./node_modules/tap/bin/tap.js ./test/*.js
unit=$?
if [ $unit != 0 ]; then
echo "< script runner stopped unit tests failed >";
exit $unit
else
echo "< unit tests passed >";
fi
var test = require('tap').test,
fs = require('fs'),
tail = require('../tail.js').tail;
function _log(msg){
process.stderr.write(new Buffer(JSON.stringify(msg)+"\n"));
}
var cleanup = [];
test('should be able to tail something',function(t){
var log = './'+Date.now()+'.log';
cleanup.push(log);
var watcher = tail(log,{start:0});
t.ok(typeof watcher == 'object','watcher should be some kind of object');
t.ok(watcher.on,'watcher should have an on method');
var testedChange = 0;
watcher.on('change',function(){
if(!testedChange) {
t.ok(++testedChange,'change event should have been fired');
}
});
var writeDone,
buf = '',
prevpos = 0,
prevdata = '',
timer = setTimeout(function(){
t.fail('hard timeout of 20 seconds reached. something is wrong');
t.end();
watcher.close();
},20000),
len = -1,
checkBuf = function(){
if(len == buf.length) {
watcher.close();
clearTimeout(timer);
t.equals(len,buf.length,'buffer should be expected length');
t.end();
}
};
watcher.on('data',function(buffer,tailInfo){
buf += buffer.toString();
prevdata = buffer.toString();
t.ok(buffer.length,'buffer should have length');
t.notEqual(prevpos,tailInfo.pos,'for another change should not have the same pos in tailed file as before');
t.equal(prevpos+buffer.length,tailInfo.pos,'prev pos + buffer read should be new pos');
prevpos = tailInfo.pos;
if(writeDone){
checkBuf();
}
});
writeLog(log,function(err,l){
len = l;
writeDone = 1;
checkBuf();
});
});
test("should be able to write half lines",function(t){
var log = './'+Date.now()+'-'+Math.random()+'.log';
cleanup.push(log);
var watcher = tail(log),
buf = '',
c = 0,
len = 4,
checkBuf = function(){
watcher.close();
t.equals(len,buf.length,'buffer should be expected length when writing incomplete lines.');
t.equals(buf,'HIHO',' should have written HIHO');
t.end();
}
;
watcher.on('line',function(data){
buf += data.toString();
checkBuf();
});
watcher.on('data',function(){
if(!c) {
ws.write('HO\n');
ws.end();
}
c++;
});
var ws = fs.createWriteStream(log);
ws.write('HI');
});
test('should be able to pause/resume tail',function(t){
var log = './'+Date.now()+'-'+Math.random()+'.log';
cleanup.push(log);
var watcher = tail(log,{start:0}),
buf = '',
timer = setTimeout(function(){
t.fail('hard timeout of 20 seconds reached. something is wrong');
t.end();
watcher.close();
},20000),
c = 0,
len = -1,
checkBuf = function(){
if(len == buf.length) {
clearTimeout(timer);
watcher.close();
t.equals(len,buf.length,'buffer should be expected length');
t.end();
}
}
;
watcher.pause();
setTimeout(function(){
t.equals(c,0,'should not have emitted any data events while paused');
watcher.resume();
},500);
watcher.on('data',function(data){
c++;
buf += data.toString();
checkBuf();
});
writeLog(log,function(err,l){
writeDone = 1;
len = l;
process.nextTick(function(){
checkBuf();
});
});
watcher.on('range-unreadable',function(){
console.log(arguments);
clearTimeout(timer);
t.fail('should not get range unreadable error');
t.end();
watcher.close();
});
});
process.on('exit',function(){
var fs = require('fs');
while(cleanup.length) {
try {
fs.unlinkSync(cleanup.pop());
} catch (e){
console.log('cleanup error');
console.error(e);
}
}
});
function writeLog(log,cb){
var ws = fs.createWriteStream(log,{flags:'w+'}),
loop = 10,
len = 0,
inter;
ws.on('open',function(){
inter = setInterval(function(){
if(!(--loop)) {
clearInterval(inter);
cb(null,len);
return;
}
try{
//_log('writing');
var b = new Buffer(Date.now()+"\n");
len += b.length;
ws.write(b);
} catch (e) {
_log(e+' >> '+e.stack);
}
},10);
});
return inter;
}