forked from primus/primus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspark.js
515 lines (448 loc) · 15.3 KB
/
spark.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
'use strict';
var ParserError = require('./errors').ParserError
, log = require('diagnostics')('primus:spark')
, parse = require('querystring').parse
, forwarded = require('forwarded-for')
, nanoid = require('nanoid').nanoid
, Ultron = require('ultron')
, fuse = require('fusing')
, u2028 = /\u2028/g
, u2029 = /\u2029/g;
/**
 * A Spark wraps a single real-time connection. It stores the connection
 * details it is given as properties and then runs every registered
 * initialise hook.
 *
 * @constructor
 * @param {Primus} primus Reference to the Primus server. (Set using .bind)
 * @param {Object} headers The request headers for this connection.
 * @param {Object} address The object that holds the remoteAddress and port.
 * @param {Object|String} query The query string of the request, a string is parsed.
 * @param {String} id An optional id of the socket, or we will generate one.
 * @param {Request} request The HTTP Request instance that initialised the spark.
 * @param {Mixed} socket Reference to the transformer socket.
 * @api public
 */
function Spark(primus, headers, address, query, id, request, socket) {
  this.fuse();

  const define = this.writable;
  const generateId = primus.options.idGenerator;

  //
  // A configured id generator always wins, otherwise use the supplied id and
  // only fall back to a freshly generated one when none was given.
  //
  if (generateId) id = generateId();
  else if (!id) id = nanoid();

  if (!query) query = {};
  if (!headers) headers = {};
  if (!address) address = {};
  if (!request) request = headers['primus::req::backup'];

  define('id', id);                     // Unique id for socket.
  define('primus', primus);             // References to Primus.
  define('remote', address);            // The remote address location.
  define('headers', headers);           // The request headers.
  define('request', request);           // Reference to an HTTP request.
  define('socket', socket);             // Reference to the transformer's socket
  define('writable', true);             // Silly stream compatibility.
  define('readable', true);             // Silly stream compatibility.
  define('queue', []);                  // Data queue for data events.
  define('query', query);               // The query string.
  define('ultron', new Ultron(this));   // Our event listening cleanup.
  define('alive', true);                // Flag used to detect zombie sparks.

  //
  // A query that arrived as a raw string still needs to be parsed.
  //
  if (typeof this.query === 'string') this.query = parse(this.query);

  //
  // Run every initialise hook with the spark as its context.
  //
  this.__initialise.forEach((hook) => {
    hook.call(this);
  });
}
//
// NOTE(review): `fuse` comes from the `fusing` module. It presumably makes
// Spark inherit from Stream and provides the `readable`, `writable` and `get`
// definition helpers used throughout the rest of this file -- confirm against
// the fusing documentation.
//
fuse(Spark, require('stream'), { merge: false, mixin: false });
//
// Internal readyStates, used to prevent writes against closed sockets.
//
Spark.OPENING = 1; // Only here for primus.js readyState number compatibility.
Spark.CLOSED = 2; // The connection is closed.
Spark.OPEN = 3; // The connection is open.
//
// Expose `readyState` through an accessor so a `readyStateChange` event is
// emitted every time a different state is assigned. This way plugins can
// correctly act on state changes. Assigning the current state again is a
// no-op.
//
Spark.readable('readyState', {
  get() {
    return this.__readyState;
  },
  set(next) {
    if (next !== this.__readyState) {
      this.__readyState = next;
      this.emit('readyStateChange');
    }

    return next;
  }
}, true);

//
// Backing property for the accessor above, it starts out as OPEN.
//
Spark.writable('__readyState', Spark.OPEN);
//
// Lazy parse interface for IP address information. As not everybody is
// interested in this, we defer parsing until it's actually needed.
//
// A `forwarded` value that is already present on the request takes
// precedence. The request lookup is guarded because a spark can be
// constructed without a request, in which case we go straight to `forwarded`
// with the remote address, the headers and the Primus whitelist.
//
Spark.get('address', function address() {
  const request = this.request;

  return (request && request.forwarded)
    || forwarded(this.remote, this.headers, this.primus.whitelist);
});
/**
 * Checks if the given event is an emitted event by Primus. An event is
 * reserved when it is prefixed with `incoming::` or `outgoing::`, or when it
 * is listed in `reserved.events`.
 *
 * @param {String} evt The event name.
 * @returns {Boolean}
 * @api public
 */
Spark.readable('reserved', function reserved(evt) {
  //
  // Only consider own keys of the events table. The `in` operator also walks
  // the prototype chain, which would flag names such as `toString` or
  // `constructor` as reserved.
  //
  return (/^(incoming|outgoing)::/).test(evt)
    || Object.prototype.hasOwnProperty.call(reserved.events, evt);
});

/**
 * The actual events that are used by the Spark.
 *
 * @type {Object}
 * @api public
 */
Spark.prototype.reserved.events = {
  readyStateChange: 1,
  heartbeat: 1,
  error: 1,
  data: 1,
  end: 1
};
/**
 * Allows initialise hooks to be added without overriding our default
 * initialiser. Assigning a function appends it to the `__initialise` array,
 * anything that is not a function is ignored. Reading the property returns
 * the hook that was added last. If you are feeling adventurous and really
 * want to hack it up, you can remove hooks from the `__initialise` array.
 *
 * @returns {Function} The last added initialise hook.
 * @api public
 */
Spark.readable('initialise', {
  get() {
    const hooks = this.__initialise;

    return hooks[hooks.length - 1];
  },
  set(hook) {
    if (typeof hook !== 'function') return;

    this.__initialise.push(hook);
  }
}, true);
/**
 * Send a heartbeat to the client.
 *
 * The `alive` flag is cleared every time a ping is sent. If it is still
 * cleared when the next heartbeat is due we assume the connection is dead (no
 * response to our last ping) and end the spark instead of pinging again.
 *
 * This is intentionally writable so it can be overwritten for custom heartbeat
 * policies.
 *
 * @returns {undefined}
 * @api public
 */
Spark.writable('heartbeat', function heartbeat() {
  if (!this.alive) {
    //
    // Set the `reconnect` option to `true` so we don't send a
    // `primus::server::close` packet to an already broken connection.
    //
    this.end(undefined, { reconnect: true });
    return;
  }

  const timestamp = Date.now();

  this.alive = false;
  this.emit('outgoing::ping', timestamp);
  this._write('primus::ping::' + timestamp);
});
/**
 * The default initialise hooks. The single default hook attaches the
 * `incoming::*` and `end` listeners and announces the new connection on the
 * next tick.
 *
 * @type {Array}
 * @api private
 */
Spark.readable('__initialise', [function initialise() {
  const spark = this;
  const primus = spark.primus;
  const ultron = spark.ultron;

  //
  // Prevent double initialization of the spark. If we already have an
  // `incoming::data` handler we assume that all other cases are handled as well.
  //
  if (spark.listeners('incoming::data').length) {
    return log('already has incoming::data listeners, bailing out');
  }

  //
  // We've received new data from our client, decode and emit it.
  //
  function onData(raw) {
    primus.decoder.call(spark, raw, function decoded(err, data) {
      //
      // Do a "safe" emit('error') when we fail to parse a message. We don't
      // want to throw here as listening to errors should be optional.
      //
      if (err) {
        log('failed to decode the incoming data for %s', spark.id);
        return new ParserError('Failed to decode incoming data: '+ err.message, spark, err);
      }

      //
      // "primus::" prefixed protocol messages are handled by `protocol`,
      // everything else goes through the incoming message transformers.
      //
      if (!spark.protocol(data)) {
        spark.transforms(primus, spark, 'incoming', data, raw);
      }
    });
  }

  //
  // We've received a pong event. This is fired upon receipt of a
  // `primus::pong::<timestamp>` message.
  //
  function onPong() {
    spark.alive = true;
    spark.emit('heartbeat');
  }

  //
  // The client has disconnected. The socket is closed, sending data over it
  // will throw an error, so end with `reconnect` set to skip the close packet.
  //
  function onEnd() {
    log('transformer closed connection for %s', spark.id);
    spark.end(undefined, { reconnect: true });
  }

  //
  // Ensure that the error we emit is always an Error instance. There are
  // transformers that used to emit only strings. A string is not an Error.
  //
  function onError(err) {
    const failure = typeof err === 'string' ? new Error(err) : err;

    if (spark.listeners('error').length) spark.emit('error', failure);
    spark.primus.emit('log', 'error', failure);

    log('transformer received error `%s` for %s', failure.message, spark.id);
    spark.end();
  }

  //
  // End is triggered by both incoming and outgoing events.
  //
  function onClosed() {
    primus.emit('disconnection', spark);
  }

  ultron.on('incoming::data', onData);
  ultron.on('incoming::pong', onPong);
  ultron.on('incoming::end', onEnd);
  ultron.on('incoming::error', onError);
  ultron.on('end', onClosed);

  //
  // Announce a new connection. This allows the transformers to change or listen
  // to events before we announce it.
  //
  process.nextTick(function announce() {
    primus.asyncemit('connection', spark, function announced(err) {
      if (err) {
        spark.emit('incoming::error', err);
        return;
      }

      //
      // Flush the packets that were queued while the `connection` listeners
      // were running, then drop the queue so new data is emitted directly.
      //
      if (spark.queue) {
        spark.queue.forEach(function flush(packet) {
          spark.emit('data', packet.data, packet.raw);
        });
      }

      spark.queue = null;
    });
  });
}]);
/**
 * Execute the set of message transformers from Primus on the incoming or
 * outgoing message.
 * This function and its content should be in sync with Primus#transforms in
 * primus.js.
 *
 * @param {Primus} primus Reference to the Primus instance with message transformers.
 * @param {Spark|Primus} connection Connection that receives or sends data.
 * @param {String} type The type of message, 'incoming' or 'outgoing'.
 * @param {Mixed} data The data to send or that has been received.
 * @param {String} raw The raw encoded data.
 * @returns {Spark}
 * @api public
 */
Spark.readable('transforms', function transforms(primus, connection, type, data, raw) {
  const packet = { data: data, raw: raw };
  const chain = primus.transformers[type];

  //
  // Called once every transformer has passed the packet along.
  //
  function deliver() {
    if ('incoming' !== type) {
      connection._write(packet.data);
      return;
    }

    //
    // This is a pretty bad edge case, it's possible that the async version of
    // the `connection` event listener takes so long that we cannot assign
    // `data` handlers and we are already receiving data as the connection is
    // already established. In this edge case we need to queue the data and
    // pass it to the data event once we're listening.
    //
    if (connection.queue) {
      connection.queue.push(packet);
      return;
    }

    //
    // We always emit 2 arguments for the data event, the first argument is the
    // parsed data and the second argument is the raw string that we received.
    // This allows you, for example, to do some validation on the parsed data
    // and then save the raw string in your database without the stringify
    // overhead.
    //
    connection.emit('data', packet.data, packet.raw);
  }

  //
  // Iterate in series over the message transformers so we can allow optional
  // asynchronous execution of message transformers which could for example
  // retrieve additional data from the server, do extra decoding or even
  // message validation.
  //
  function step(position) {
    const transformer = chain[position];

    if (!transformer) {
      deliver();
      return;
    }

    //
    // A transformer that declares a single argument is synchronous. When it
    // returns false the message is being handled by the transformer and we
    // should not continue with it.
    //
    if (transformer.length === 1) {
      if (transformer.call(connection, packet) !== false) step(position + 1);
      return;
    }

    //
    // Asynchronous transformer: an error is emitted on the connection and a
    // `false` second argument stops the message.
    //
    transformer.call(connection, packet, function finished(err, arg) {
      if (err) return connection.emit('error', err);
      if (arg !== false) step(position + 1);
    });
  }

  step(0);

  return this;
});
/**
 * Really dead simple protocol parser. Every string message that is prefixed
 * with `primus::` is treated as a possible protocol message in the form of
 * `primus::<command>::<value>`. Only the `pong` and `id` commands are
 * supported.
 *
 * @param {String} msg The data.
 * @returns {Boolean} Is a protocol message.
 * @api private
 */
Spark.readable('protocol', function protocol(msg) {
  if (typeof msg !== 'string' || !msg.startsWith('primus::')) return false;

  //
  // The command starts right after the 8 characters of the `primus::` prefix
  // and the value follows the `::` that terminates the command.
  //
  const separator = msg.indexOf(':', 8);
  const command = msg.slice(8, separator);
  const value = msg.slice(separator + 2);

  if (command === 'pong') {
    this.emit('incoming::pong', +value);
  } else if (command === 'id') {
    this._write('primus::id::'+ this.id);
  } else {
    //
    // Unknown protocol, somebody is probably sending `primus::` prefixed
    // messages.
    //
    log('message `%s` was prefixed with primus:: but not supported', msg);
    return false;
  }

  log('processed a primus protocol message `%s`', msg);
  return true;
});
/**
 * Send a new message to a given spark. The data is passed through the
 * outgoing message transformers before it is written.
 *
 * @param {Mixed} data The data that needs to be written.
 * @returns {Boolean} `false` when the spark is already closed, `true` otherwise.
 * @api public
 */
Spark.readable('write', function write(data) {
  //
  // The connection is closed, refuse the write.
  //
  if (this.readyState === Spark.CLOSED) {
    log('attempted to write but readyState was already set to CLOSED for %s', this.id);
    return false;
  }

  this.transforms(this.primus, this, 'outgoing', data);
  return true;
});
/**
 * The actual message writer. Encodes the data with the Primus encoder and
 * emits the result as `outgoing::data`. Message transformers are not applied
 * here.
 *
 * @param {Mixed} data The message that needs to be written.
 * @returns {Boolean} `false` when the spark is already closed, `true` otherwise.
 * @api private
 */
Spark.readable('_write', function _write(data) {
  //
  // The connection is closed, normally this would already be done in the
  // `spark.write` method, but as `_write` is used internally, we should also
  // add the same check here to prevent potential crashes by writing to a dead
  // socket.
  //
  if (this.readyState === Spark.CLOSED) {
    log('attempted to _write but readyState was already set to CLOSED for %s', this.id);
    return false;
  }

  this.primus.encoder.call(this, data, (err, packet) => {
    //
    // Do a "safe" emit('error') when we fail to parse a message. We don't
    // want to throw here as listening to errors should be optional.
    //
    if (err) return new ParserError('Failed to encode outgoing data: '+ err.message, this, err);
    if (!packet) return log('nothing to write, bailing out for %s', this.id);

    let payload = packet;

    //
    // Hack 1: \u2028 and \u2029 are allowed inside a JSON string, but JavaScript
    // defines them as newline separators. Unescaped control characters are not
    // allowed inside JSON strings, so this causes an error at parse time. We
    // work around this issue by escaping these characters. This can cause
    // errors with JSONP requests or if the string is just evaluated.
    //
    if (typeof payload === 'string') {
      if (payload.includes('\u2028')) payload = payload.replace(u2028, '\\u2028');
      if (payload.includes('\u2029')) payload = payload.replace(u2029, '\\u2029');
    }

    this.emit('outgoing::data', payload);
  });

  return true;
});
/**
 * End the connection. Calling this on a spark that is already closed does
 * nothing.
 *
 * Options:
 * - reconnect (boolean) Trigger client-side reconnect.
 *
 * @param {Mixed} data Optional closing data.
 * @param {Object} options End instructions.
 * @returns {Spark}
 * @api public
 */
Spark.readable('end', function end(data, options) {
  if (this.readyState === Spark.CLOSED) return this;

  const settings = options || {};

  if (undefined !== data) this.write(data);

  //
  // If we want to trigger a reconnect do not send
  // `primus::server::close`, otherwise bypass the .write method
  // as this message should not be transformed.
  //
  if (!settings.reconnect) this._write('primus::server::close');

  //
  // This seems redundant but there are cases where the above writes
  // can trigger another `end` call. An example is with Engine.IO
  // when calling `end` on the client and `end` on the spark right
  // after. The `end` call on the spark comes before the `incoming::end`
  // event and the result is an attempt of writing to a closed socket.
  // When this happens Engine.IO closes the connection and without
  // this check the following instructions could be executed twice.
  //
  if (this.readyState === Spark.CLOSED) return this;

  log('emitting final events for spark %s', this.id);

  this.readyState = Spark.CLOSED;
  this.emit('outgoing::end');
  this.emit('end');

  //
  // Remove the listeners that were assigned through ultron and release the
  // references we no longer need.
  //
  this.ultron.destroy();
  this.queue = null;
  this.ultron = null;

  return this;
});
//
// Expose the Spark constructor as the export of this module.
//
module.exports = Spark;