1 var _ = require('underscore'), 2 util = require('util'), 3 events = require('events'), 4 mkdirp = require('mkdirp'), 5 fs = require('fs'), 6 path = require('path'); 7 8 var CLOSED = 0, DRAINING = 1, OPENING = 2, OPEN = 3, OVERFLOW = 4; 9 10 /** 11 * Creates a new FileBuffer. 12 * @class 13 * The default file buffer. 14 * This is a file buffer that also has a memory buffer to which it writes when the file stream is 15 * being read out or if a write to the file failed. 16 * 17 * This class has no maximum size or any limits attached to it yet. 18 * So it is possible for this class to exhaust the memory and/or disk space of the machine using this buffer. 19 * 20 * @param {String} path The path to the file to use as buffer. 21 * @param {Object} extra The extra configuration options. 22 * @author <a href="mailto:ivan@backchat.io">Ivan Porto Carrero</a> 23 */ 24 var FileBuffer = function(path, extra) { 25 26 events.EventEmitter.call(this); 27 this.path = path; 28 var ex = extra||{}; 29 this._memoryBuffer = ex.memoryBuffer || []; 30 this._state = ex.initialState || CLOSED; 31 } 32 33 util.inherits(FileBuffer, events.EventEmitter); 34 35 _.extend(FileBuffer.prototype, /** @lends FileBuffer.prototype */ { 36 /** 37 * Open this file buffer if not already opened. 38 * This method is idempotent. 39 */ 40 open: function() { 41 if (this._state < OPENING) { 42 this._openFile(true); 43 } 44 return this._state < DRAINING; 45 }, 46 /** 47 * Closes the buffer and releases any external resources contained by this buffer. 48 * This method is idempotent. 49 */ 50 close: function() { 51 if (this._state < DRAINING) { 52 var self = this; 53 this._stream.once("close", function() { 54 self.emit("close"); 55 self._state = CLOSED; 56 }); 57 this._stream.end(); 58 } else { 59 this.emit("close"); 60 this._state = CLOSED; 61 } 62 }, 63 /** 64 * Write a message to the buffer. 65 * 66 * @param {String|Object} outMessage The outbound message 67 * @param {Function} [callback] The callback 68 */ 69 write: function (outMessage, callback) { 70 var self = this; 71 try { 72 switch(this._state) { 73 case OPEN: 74 this._flushMemoryBuffer(); 75 process.nextTick(function() { 76 try { 77 self._stream.write(outMessage + "\n"); 78 if (callback) callback(null, true); 79 } catch (e) { 80 if (callback) callback(e, null); 81 } 82 }); 83 break; 84 case CLOSED: 85 this.open(); 86 this.once("open", function() { 87 self.write(outMessage, callback); 88 }); 89 break; 90 default: 91 this._memoryBuffer.push(outMessage); 92 if (callback) callback(null, true); 93 } 94 } catch (e) { 95 if (callback) callback(e, null); 96 } 97 }, 98 /** 99 * Drain the buffer raising data events to process each message in the buffer. 100 * Truncates the buffer after draining. 101 */ 102 drain: function() { 103 var self = this; 104 var jnl = this._stream; 105 jnl.once('close', function() { 106 var rdJnl = fs.createReadStream(self.path, {encoding: 'utf8'}); 107 rdJnl.on('data', function(data) { 108 data.split("\n").forEach(function(line) { 109 var cleaned = (line||"").replace(/(\n|\r)+$/, '').trim(); 110 if (cleaned.length > 0) { 111 self.emit('data', cleaned); 112 } 113 }) 114 }); 115 rdJnl.on('close', function() { 116 self._openFile(false); 117 }); 118 }); 119 jnl.end(); 120 }, 121 /** 122 * @private 123 * Opens the file, creating the path if it doesn't exist yet 124 */ 125 _openFile: function(append) { 126 this._state = OPENING; 127 var bufferDir = path.dirname(this.path); 128 var self = this; 129 mkdirp(bufferDir, function(err, data) { 130 if (err) { 131 self.emit("error", new Error("Can't create the path for the file buffer")); 132 } 133 try { 134 self._stream = fs.createWriteStream(self.path, { flags: append ? 'a' : 'w', encoding: 'utf8'}); 135 self._stream.once('open', function(fd) { 136 self._state = OPEN; 137 if (append) self.emit('open'); 138 self._flushMemoryBuffer(); 139 }); 140 } catch (e) { 141 self.emit("error", e); 142 } 143 }); 144 }, 145 /** 146 * @private 147 * flushes the memory buffer, to file. 148 */ 149 _flushMemoryBuffer: function() { 150 var self = this; 151 if (self._memoryBuffer.length > 0) { 152 self._stream.write(self._memoryBuffer.join("\n") + "\n"); 153 self._memoryBuffer = []; 154 } 155 } 156 }); 157 exports = exports.FileBuffer = FileBuffer;