1 var _ = require('underscore'),
  2     util = require('util'),
  3     events = require('events'),
  4     mkdirp = require('mkdirp'),
  5     fs = require('fs'),
  6     path = require('path');
  8 var CLOSED = 0, DRAINING = 1, OPENING = 2, OPEN = 3, OVERFLOW = 4;
 10 /**
 11  * Creates a new FileBuffer.
 12  * @class
 13  * The default file buffer.
 14  * This is a file buffer that also has a memory buffer to which it writes when the file stream is
 15  * being read out or if a write to the file failed.
 16  *
 17  * This class has no maximum size or any limits attached to it yet.
 18  * So it is possible for this class to exhaust the memory and/or disk space of the machine using this buffer.
 19  *
 20  * @param {String} path The path to the file to use as buffer.
 21  * @param {Object} extra The extra configuration options.
 22  * @author <a href="mailto:ivan@backchat.io">Ivan Porto Carrero</a>
 23  */
 24 var FileBuffer = function(path, extra) {
 26   events.EventEmitter.call(this);
 27   this.path = path;
 28   var ex = extra||{};
 29   this._memoryBuffer = ex.memoryBuffer || [];
 30   this._state = ex.initialState || CLOSED;
 31 }
 33 util.inherits(FileBuffer, events.EventEmitter);
 35 _.extend(FileBuffer.prototype, /** @lends FileBuffer.prototype */ {
 36   /**
 37    * Open this file buffer if not already opened.
 38    * This method is idempotent.
 39    */
 40   open: function() {
 41     if (this._state < OPENING) {
 42       this._openFile(true);
 43     }
 44     return this._state < DRAINING;
 45   },
 46   /**
 47    * Closes the buffer and releases any external resources contained by this buffer.
 48    * This method is idempotent.
 49    */
 50   close: function() {
 51     if (this._state < DRAINING) {
 52       var self = this;
 53       this._stream.once("close", function() { 
 54         self.emit("close");
 55         self._state = CLOSED;
 56       });
 57       this._stream.end();
 58     } else {
 59       this.emit("close");
 60       this._state = CLOSED;
 61     }
 62   },
 63   /**
 64    * Write a message to the buffer.
 65    *
 66    * @param {String|Object} outMessage The outbound message
 67    * @param {Function} [callback] The callback 
 68    */
 69   write: function (outMessage, callback) {
 70     var self = this;
 71     try {
 72       switch(this._state) {
 73         case OPEN:
 74           this._flushMemoryBuffer();
 75           process.nextTick(function() {
 76             try {
 77               self._stream.write(outMessage + "\n");
 78               if (callback) callback(null, true);
 79             } catch (e) {
 80               if (callback) callback(e, null);
 81             }
 82           });
 83           break;
 84         case CLOSED:
 85           this.open();
 86           this.once("open", function() {
 87             self.write(outMessage, callback);
 88           });
 89           break;
 90         default:
 91           this._memoryBuffer.push(outMessage);
 92           if (callback) callback(null, true);
 93       }
 94     } catch (e) {
 95       if (callback) callback(e, null);
 96     }
 97   },
 98   /**
 99    * Drain the buffer raising data events to process each message in the buffer.
100    * Truncates the buffer after draining.
101    */
102   drain: function() {
103     var self = this;
104     var jnl = this._stream;
105     jnl.once('close', function() {
106       var rdJnl = fs.createReadStream(self.path, {encoding: 'utf8'});
107       rdJnl.on('data', function(data) {
108         data.split("\n").forEach(function(line) {
109           var cleaned = (line||"").replace(/(\n|\r)+$/, '').trim();
110           if (cleaned.length > 0) {
111             self.emit('data', cleaned);
112           }
113         })
114       });
115       rdJnl.on('close', function() {
116         self._openFile(false);
117       });
118     });
119     jnl.end();
120   },
121   /**
122    * @private
123    * Opens the file, creating the path if it doesn't exist yet
124    */
125   _openFile: function(append) {
126     this._state = OPENING;
127     var bufferDir = path.dirname(this.path);
128     var self = this;
129     mkdirp(bufferDir, function(err, data) {
130       if (err) {
131         self.emit("error", new Error("Can't create the path for the file buffer"));
132       }
133       try {
134         self._stream = fs.createWriteStream(self.path, { flags: append ? 'a' : 'w', encoding: 'utf8'});
135         self._stream.once('open', function(fd) { 
136           self._state = OPEN;
137           if (append) self.emit('open');
138           self._flushMemoryBuffer();
139         });
140       } catch (e) {
141         self.emit("error", e);
142       }
143     });
144   },
145   /**
146    * @private
147    * flushes the memory buffer, to file.
148    */
149   _flushMemoryBuffer: function() {
150     var self = this;
151     if (self._memoryBuffer.length > 0) {
152       self._stream.write(self._memoryBuffer.join("\n") + "\n");
153       self._memoryBuffer = [];
154     }
155   }
156 });
157 exports = exports.FileBuffer = FileBuffer;