1 var WebSocket = require('faye-websocket'), 2 _ = require('underscore'), 3 util = require('util'), 4 events = require('events'), 5 FileBuffer = require("./filebuffer").FileBuffer, 6 WireFormat = require("./wireformat").WireFormat; 7 fs = require('fs'), 8 Uri = require('url'); 9 10 11 var RECONNECT_SCHEDULE = {min: 1, max: 300}; 12 var BUFFER_PATH = './logs/buffer.log'; 13 var EVENT_NAMES = { 14 connected: "open", 15 receive: "message", 16 disconnected: "close" 17 } 18 19 DISCONNECTED=0 20 DISCONNECTING=1 21 RECONNECTING=2 22 CONNECTING=3 23 CONNECTED=4 24 /** 25 * possible values: open, reconnecting, ack_failed, data, error, disconnect. 26 * @name HookupClient#on 27 * 28 * @event 29 * @param {String} name The name of the event, possible values: open, reconnecting, ack_failed, data, error, disconnect. 30 * @param {Object} [args] The event args 31 */ 32 33 /** 34 * Creates a new hookup client 35 * @class 36 * The Hookup client provides a client for the hookup server, it doesn't lock you in to using a specific message format. 37 * The default implementation uses websockets to communicate with the server. 38 * You can opt-in or opt-out of every feature in the hookup client through configuration. 39 * 40 * Usage of the simple hookup client: 41 * 42 * {{ 43 * var myHookup = new Hookup({uri: "ws://localhost:8080/thesocket"}); 44 * myHookup.on("open", function() { 45 * console.log("The websocket to"+myHookup.uri+" is connected.")'' 46 * }); 47 * myHookup.on("close", function() { 48 * console.log("The websocket to"+myHookup.uri+" disconnected.")'' 49 * }); 50 * myHookup.on("data", function(data) { 51 * if (data.type == "text") { 52 * console.log("RECV: "+data.content); 53 * myHookup.send("ECHO: "+data.content); 54 * } 55 * }); 56 * }} 57 * 58 * @augments EventEmitter 59 * 60 * @param {String|Object} options 61 * @param {String} options.uri The uri to connect to. 62 * @param {Object} [options.reconnectSchedule = { min: 1, min: 300 }] 63 * The schedule (an object with a min and max property) to use for backing-off. 64 * @param {WireFormat} [options.wireFormat = new WireFormat] The wire format to use with this client. 65 * @param {FileBuffer} [options.buffer = undefined] The buffer to use if you want this client to fallback to buffering. 66 * @property {Object} reconnectSchedule The schedule (an object with a min and max property) to use for backing-off when reconnecting. 67 * @property {String} uri The uri this client connects to. 68 */ 69 var HookupClient = function (options) { 70 events.EventEmitter.call(this); 71 if (typeof options === 'string') { 72 options = { uri: options } 73 } 74 if (!options.uri || options.uri.trim().length == 0) throw new Error('`uri` option is required.'); 75 this.uri = options.uri; 76 this._uri = Uri.parse(this.uri.replace(/^http/i, 'ws')); 77 if (!this._uri.host || !((this._uri.protocol||"").match(/^ws/))) throw new Error("Invalid uri supplied.") 78 delete options['uri']; 79 80 this.reconnectSchedule = options.reconnectSchedule || RECONNECT_SCHEDULE; 81 82 this._quiet = options.quiet; 83 this._state = DISCONNECTED; 84 this._handlers = options.handlers || []; 85 this._wireFormat = options.wireFormat || new WireFormat(); 86 this._expectedAcks = {}; 87 this._ackCounter = 0; 88 // this option `raiseAckEvents` is only useful in tests for the acking itself. 89 // it raises an event when an ack is received or an ack request is prepared 90 // it serves no other purpose than that, ack_failed events are raised independently from this option. 91 this._raiseAckEvents = options.raiseAckEvents === true; 92 93 if (options.buffer || options.buffered) { 94 this._buffer = options.buffer || new FileBuffer(options.bufferPath||BUFFER_PATH); 95 this._buffer.on('data', this.send); 96 } 97 } 98 99 util.inherits(HookupClient, events.EventEmitter); 100 101 /** 102 * The default reconnection schedule 103 * This is an object with 2 properties, min and max. 104 * The values entered are seconds 105 * 106 * @returns {Number[]} The minimum and maximum wait for a reconnection attempt 107 * @constant 108 */ 109 HookupClient.RECONNECT_SCHEDULE = RECONNECT_SCHEDULE 110 /** 111 * The default path for the file buffer to write to. 112 * 113 * @returns {String} The path to the file. 114 * @constant 115 */ 116 HookupClient.BUFFER_PATH = BUFFER_PATH 117 118 _.extend(HookupClient.prototype, /** @lends HookupClient.prototype */ { 119 /** 120 * Connect to the server 121 */ 122 connect: function() { 123 if (this.state != CONNECTING && !this.isConnected()) this._establishConnection(); 124 }, 125 /** 126 * Disconnect from the socket, perform closing handshake if necessary 127 */ 128 close: function() { 129 this._skipReconnect = true; 130 this._client.close(); 131 }, 132 /** 133 * Send a message over the current connection, buffer the message if not connected and the client is 134 * configured to buffer messages. 135 * 136 * @param {String|Object} message The message to send 137 */ 138 send: function(message) { 139 var m = this._prepareForSend(message); 140 if (this.isConnected()) { 141 this._client.send(m); 142 } else if(this.isBuffered()) 143 this._buffer.write(m); 144 }, 145 /** 146 * Send a message over the current connection and request that this message will be acked upon receipt by the server. 147 * Buffers the message if not connected and the client is configured to buffer messages. 148 * 149 * @param {String|Object} message The message to send 150 * @param {Object} [options.timeout = 5000] The timeout for the ack in milliseconds 151 */ 152 sendAcked: function(message, options) { 153 var timeout = (options||{})['timeout']||5000; 154 this.send({type: "needs_ack", content: message, timeout: timeout}); 155 }, 156 /** 157 * A flag indicating connection status of the client. 158 * 159 * @returns true when the client is connected. 160 */ 161 isConnected: function() { 162 return this._state == CONNECTED; 163 }, 164 /** 165 * A flag indicating if this client should fallback to buffering upon disconnection. 166 */ 167 isBuffered: function() { 168 return !!this._buffer; 169 }, 170 _establishConnection: function() { 171 if (this._scheduledReconnect) { 172 clearTimeout(this._scheduledReconnect); 173 this._scheduledReconnect = null; 174 } 175 if (!this.isConnected()) { 176 console.log(this._wireFormat); 177 this._client = client = new WebSocket.Client(this.uri, [this._wireFormat.name]); 178 var self = this; 179 180 client.onopen = function(evt) { 181 console.info("connected to " + self.uri); 182 self._connected(); 183 } 184 client.onclose = function(evt) { 185 self._state = self._skipReconnect ? DISCONNECTING : RECONNECTING 186 self.emit(self._state == DISCONNECTING ? "disconnected" : "reconnecting"); 187 self._reconnect(); 188 } 189 client.onerror = function(evt) { 190 if (!self.quiet) console.error("Couldn't connect to " + self.uri); 191 self.emit("error", evt.data); 192 } 193 client.onmessage = function(evt) { 194 var inMsg = self._preprocessInMessage(evt.data); 195 if (inMsg) 196 self.emit("data", inMsg); 197 } 198 } 199 }, 200 _reconnect: function () { 201 if (this._state == DISCONNECTING) { 202 this._doDisconnect(); 203 } else { 204 this._notifiedOfReconnect = true; 205 if (this._reconnectIn && this._reconnectIn > 0 && this._reconnectIn < this.reconnectSchedule.max) { 206 var curr = this._reconnectIn; 207 var max = this.reconnectSchedule.max; 208 var next = curr < max ? curr : max; 209 if (this.reconnectSchedule.maxRetries && this.reconnectSchedule.maxRetries > 0) { 210 if (this._reconnectsLeft <= 0) 211 this.emit('error', new Error("Exhausted the retry schedule. The server at "+this.uri+" is just not there.")); 212 else 213 --this._reconnectsLeft; 214 } 215 this._doReconnect(next); 216 } else { 217 this._doDisconnect(); 218 } 219 } 220 }, 221 _doDisconnect: function() { 222 this._state = DISCONNECTED; 223 this.emit('close'); 224 if (this._buffer) this.buffer.close(); 225 }, 226 _doReconnect: function (retryIn) { 227 if (!this._scheduledReconnect) { 228 var secs = "second" + (retryIn == 1 ? "" : "s"); 229 var out = retryIn < 1 ? retryIn * 1000 : retryIn; 230 console.info("connection lost, reconnecting in "+out+" "+(retryIn < 1 ? "millis" : secs)); 231 var self = this; 232 this._scheduledReconnect = setTimeout(function() { self._establishConnection() }, retryIn * 1000); 233 this._reconnectIn = this._reconnectIn * 2; 234 } 235 }, 236 _connected: function() { 237 if (this._buffer) this._buffer.drain(); 238 this._reconnectIn = this.reconnectSchedule.min; 239 this._reconnectsLeft = 0; 240 this._notifiedOfReconnect = false; 241 this._skipReconnect = false; 242 this._state = CONNECTED; 243 this.emit("open"); 244 }, 245 _preprocessInMessage: function(msg) { 246 var message = this._wireFormat.parseMessage(msg); 247 if (message.type === "ack_request") { 248 this.send({type: "ack", id: message.id}); 249 } 250 if (message.type === "ack") { 251 var timeout = this._expectedAcks[message.id]; 252 if (timeout) clearTimeout(timeout); 253 delete this._expectedAcks[message.id]; 254 if (this._raiseAckEvents) this.emit("ack", message); 255 return null; 256 } 257 return this._wireFormat.unwrapContent(message); 258 }, 259 _prepareForSend: function(message) { 260 var out = this._wireFormat.renderMessage(message); 261 var self = this; 262 if (message.type === "needs_ack") { 263 var ackReq = { 264 content: this._wireFormat.buildMessage(message.content), 265 type: "ack_request", 266 id: ++this._ackCounter 267 }; 268 this._expectedAcks[ackReq.id] = setTimeout(function (){ self.emit('ack_failed', message.content) }, message.timeout); 269 out = this._wireFormat.renderMessage(ackReq); 270 if (this._raiseAckEvents) this.emit("ack_request", ackReq); 271 } 272 return out; 273 } 274 }); 275 276 module.exports.HookupClient = HookupClient; 277 // exports.WireFormat = WireFormat; 278 // exports.FileBuffer = FileBuffer; 279