1 var WebSocket = require('faye-websocket'),
  2     _ = require('underscore'),
  3     util = require('util'),
  4     events = require('events'),
  5     FileBuffer = require("./filebuffer").FileBuffer,
  6     WireFormat = require("./wireformat").WireFormat;
  7     fs = require('fs'),
  8     Uri = require('url');
  9 
 10 
 11 var RECONNECT_SCHEDULE = {min: 1, max: 300};
 12 var BUFFER_PATH = './logs/buffer.log';
 13 var EVENT_NAMES = {
 14   connected: "open",
 15   receive: "message",
 16   disconnected: "close"
 17 }
 18 
 19 DISCONNECTED=0
 20 DISCONNECTING=1
 21 RECONNECTING=2
 22 CONNECTING=3
 23 CONNECTED=4
 24 /**
 25  * possible values: open, reconnecting, ack_failed, data, error, disconnect.
 26  * @name HookupClient#on 
 27  * 
 28  * @event 
 29  * @param {String} name The name of the event, possible values: open, reconnecting, ack_failed, data, error, disconnect.
 30  * @param {Object} [args] The event args 
 31  */
 32 
 33 /**
 34  * Creates a new hookup client 
 35  * @class
 36  * The Hookup client provides a client for the hookup server, it doesn't lock you in to using a specific message format.
 37  * The default implementation uses websockets to communicate with the server.
 38  * You can opt-in or opt-out of every feature in the hookup client through configuration.
 39  * 
 40  * Usage of the simple hookup client:
 41  *
 42  * {{
 43  *   var myHookup = new Hookup({uri: "ws://localhost:8080/thesocket"});
 44  *   myHookup.on("open", function() {
 45  *     console.log("The websocket to"+myHookup.uri+" is connected.")''
 46  *   }); 
 47  *   myHookup.on("close", function() {
 48  *     console.log("The websocket to"+myHookup.uri+" disconnected.")''
 49  *   }); 
 50  *   myHookup.on("data", function(data) {
 51  *     if (data.type == "text") {
 52  *       console.log("RECV: "+data.content);
 53  *       myHookup.send("ECHO: "+data.content);
 54  *     }
 55  *   });
 56  * }}
 57  *
 58  * @augments EventEmitter
 59  *
 60  * @param {String|Object} options
 61  * @param {String} options.uri The uri to connect to.
 62  * @param {Object} [options.reconnectSchedule = { min: 1, min: 300 }]  
 63  *                 The schedule (an object with a min and max property) to use for backing-off.
 64  * @param {WireFormat} [options.wireFormat = new WireFormat] The wire format to use with this client.
 65  * @param {FileBuffer} [options.buffer = undefined] The buffer to use if you want this client to fallback to buffering. 
 66  * @property {Object} reconnectSchedule The schedule (an object with a min and max property) to use for backing-off when reconnecting.
 67  * @property {String} uri The uri this client connects to.
 68  */
 69 var HookupClient = function (options) {
 70   events.EventEmitter.call(this);
 71   if (typeof options === 'string') {
 72     options = { uri: options }
 73   }
 74   if (!options.uri || options.uri.trim().length == 0) throw new Error('`uri` option is required.');
 75   this.uri = options.uri;
 76   this._uri = Uri.parse(this.uri.replace(/^http/i, 'ws'));
 77   if (!this._uri.host || !((this._uri.protocol||"").match(/^ws/))) throw new Error("Invalid uri supplied.")
 78   delete options['uri'];
 79   
 80   this.reconnectSchedule = options.reconnectSchedule || RECONNECT_SCHEDULE;
 81 
 82   this._quiet = options.quiet;
 83   this._state = DISCONNECTED;
 84   this._handlers = options.handlers || [];
 85   this._wireFormat = options.wireFormat || new WireFormat();
 86   this._expectedAcks = {};
 87   this._ackCounter = 0;
 88   // this option `raiseAckEvents` is only useful in tests for the acking itself.
 89   // it raises an event when an ack is received or an ack request is prepared
 90   // it serves no other purpose than that, ack_failed events are raised independently from this option.
 91   this._raiseAckEvents = options.raiseAckEvents === true; 
 92 
 93   if (options.buffer || options.buffered) {
 94     this._buffer = options.buffer || new FileBuffer(options.bufferPath||BUFFER_PATH);
 95     this._buffer.on('data', this.send);
 96   }
 97 }
 98 
 99 util.inherits(HookupClient, events.EventEmitter);
100 
101 /**
102  * The default reconnection schedule
103  * This is an object with 2 properties, min and max.
104  * The values entered are seconds
105  *
106  * @returns {Number[]} The minimum and maximum wait for a reconnection attempt
107  * @constant
108  */
109 HookupClient.RECONNECT_SCHEDULE = RECONNECT_SCHEDULE
110 /**
111  * The default path for the file buffer to write to.
112  *
113  * @returns {String} The path to the file.
114  * @constant
115  */
116 HookupClient.BUFFER_PATH = BUFFER_PATH
117 
118 _.extend(HookupClient.prototype, /** @lends HookupClient.prototype */ {
119   /**
120    * Connect to the server
121    */
122   connect: function() {
123     if (this.state != CONNECTING && !this.isConnected()) this._establishConnection();
124   },
125   /**
126    * Disconnect from the socket, perform closing handshake if necessary
127    */
128   close: function() {
129     this._skipReconnect = true;
130     this._client.close();
131   },
132   /**
133    * Send a message over the current connection, buffer the message if not connected and the client is 
134    * configured to buffer messages.
135    *
136    * @param {String|Object} message The message to send
137    */
138   send: function(message) {
139     var m = this._prepareForSend(message);
140     if (this.isConnected()) {
141       this._client.send(m);
142     } else if(this.isBuffered()) 
143       this._buffer.write(m);
144   },
145   /**
146    * Send a message over the current connection and request that this message will be acked upon receipt by the server.
147    * Buffers the message if not connected and the client is configured to buffer messages.
148    *
149    * @param {String|Object} message The message to send
150    * @param {Object} [options.timeout = 5000] The timeout for the ack in milliseconds
151    */
152   sendAcked: function(message, options) {
153     var timeout = (options||{})['timeout']||5000;
154     this.send({type: "needs_ack", content: message, timeout: timeout});
155   },
156   /**
157    * A flag indicating connection status of the client.
158    *
159    * @returns true when the client is connected.
160    */
161   isConnected: function() {
162     return this._state == CONNECTED;
163   },
164   /**
165    * A flag indicating if this client should fallback to buffering upon disconnection.
166    */
167   isBuffered: function() {
168     return !!this._buffer;
169   },
170   _establishConnection: function() {
171     if (this._scheduledReconnect) {
172       clearTimeout(this._scheduledReconnect);
173       this._scheduledReconnect = null;
174     }
175     if (!this.isConnected()) {
176       console.log(this._wireFormat);
177       this._client = client = new WebSocket.Client(this.uri, [this._wireFormat.name]);
178       var self = this;
179 
180       client.onopen = function(evt) {
181         console.info("connected to " + self.uri);
182         self._connected();
183       }
184       client.onclose = function(evt) {
185         self._state = self._skipReconnect ? DISCONNECTING : RECONNECTING
186         self.emit(self._state == DISCONNECTING ? "disconnected" : "reconnecting");
187         self._reconnect();
188       }
189       client.onerror = function(evt) {
190         if (!self.quiet) console.error("Couldn't connect to " + self.uri);
191         self.emit("error", evt.data);
192       }
193       client.onmessage = function(evt) {
194         var inMsg = self._preprocessInMessage(evt.data);
195         if (inMsg)
196           self.emit("data", inMsg);
197       }
198     }
199   },
200   _reconnect: function () {
201     if (this._state == DISCONNECTING) {
202       this._doDisconnect();
203     } else {
204       this._notifiedOfReconnect = true;
205       if (this._reconnectIn && this._reconnectIn > 0 && this._reconnectIn < this.reconnectSchedule.max) {
206         var curr = this._reconnectIn;
207         var max = this.reconnectSchedule.max;
208         var next = curr < max ? curr : max;
209         if (this.reconnectSchedule.maxRetries && this.reconnectSchedule.maxRetries > 0) {
210           if (this._reconnectsLeft <= 0)
211             this.emit('error', new Error("Exhausted the retry schedule. The server at "+this.uri+" is just not there."));
212           else
213             --this._reconnectsLeft;
214         } 
215         this._doReconnect(next);
216       } else {
217         this._doDisconnect();    
218       }
219     }
220   },
221   _doDisconnect: function() {
222     this._state = DISCONNECTED;
223     this.emit('close');
224     if (this._buffer) this.buffer.close();
225   },
226   _doReconnect: function (retryIn) {
227     if (!this._scheduledReconnect) {
228       var secs = "second" + (retryIn == 1 ? "" : "s");
229       var out = retryIn < 1 ? retryIn * 1000 : retryIn;
230       console.info("connection lost, reconnecting in "+out+" "+(retryIn < 1 ? "millis" : secs));
231       var self = this;
232       this._scheduledReconnect = setTimeout(function() { self._establishConnection() }, retryIn * 1000);
233       this._reconnectIn = this._reconnectIn * 2;
234     }
235   },
236   _connected: function() {
237     if (this._buffer) this._buffer.drain();
238     this._reconnectIn = this.reconnectSchedule.min;
239     this._reconnectsLeft = 0;
240     this._notifiedOfReconnect = false;
241     this._skipReconnect = false;
242     this._state = CONNECTED;
243     this.emit("open");
244   },
245   _preprocessInMessage: function(msg) {
246     var message = this._wireFormat.parseMessage(msg);
247     if (message.type === "ack_request") {
248       this.send({type: "ack", id: message.id});
249     }
250     if (message.type === "ack") {
251       var timeout = this._expectedAcks[message.id]; 
252       if (timeout) clearTimeout(timeout);
253       delete this._expectedAcks[message.id];
254       if (this._raiseAckEvents) this.emit("ack", message);
255       return null;
256     }
257     return this._wireFormat.unwrapContent(message);
258   },
259   _prepareForSend: function(message) {
260     var out = this._wireFormat.renderMessage(message);
261     var self = this;
262     if (message.type === "needs_ack") {
263       var ackReq = {
264         content: this._wireFormat.buildMessage(message.content),
265         type: "ack_request",
266         id: ++this._ackCounter
267       };
268       this._expectedAcks[ackReq.id] = setTimeout(function (){ self.emit('ack_failed', message.content) }, message.timeout);
269       out = this._wireFormat.renderMessage(ackReq);
270       if (this._raiseAckEvents) this.emit("ack_request", ackReq);
271     }
272     return out;
273   }
274 });
275 
276 module.exports.HookupClient = HookupClient;
277 // exports.WireFormat = WireFormat;
278 // exports.FileBuffer = FileBuffer;
279