connection.js 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582
  1. "use strict";
  2. var inherits = require('util').inherits
  3. , EventEmitter = require('events').EventEmitter
  4. , net = require('net')
  5. , tls = require('tls')
  6. , crypto = require('crypto')
  7. , f = require('util').format
  8. , debugOptions = require('./utils').debugOptions
  9. , Response = require('./commands').Response
  10. , MongoError = require('../error')
  11. , Logger = require('./logger');
  12. var _id = 0;
  13. var debugFields = ['host', 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay'
  14. , 'connectionTimeout', 'socketTimeout', 'singleBufferSerializtion', 'ssl', 'ca', 'cert'
  15. , 'rejectUnauthorized', 'promoteLongs', 'promoteValues', 'promoteBuffers', 'checkServerIdentity'];
  16. var connectionAccounting = false;
  17. var connections = {};
  18. /**
  19. * Creates a new Connection instance
  20. * @class
  21. * @param {string} options.host The server host
  22. * @param {number} options.port The server port
  23. * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
  24. * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
  25. * @param {boolean} [options.noDelay=true] TCP Connection no delay
  26. * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
  27. * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
  28. * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
  29. * @param {boolean} [options.ssl=false] Use SSL for connection
  30. * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
  31. * @param {Buffer} [options.ca] SSL Certificate store binary buffer
  32. * @param {Buffer} [options.cert] SSL Certificate binary buffer
  33. * @param {Buffer} [options.key] SSL Key file binary buffer
  34. * @param {string} [options.passphrase] SSL Certificate pass phrase
  35. * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
  36. * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
  37. * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
  38. * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
  39. * @fires Connection#connect
  40. * @fires Connection#close
  41. * @fires Connection#error
  42. * @fires Connection#timeout
  43. * @fires Connection#parseError
  44. * @return {Connection} A cursor instance
  45. */
  46. var Connection = function(messageHandler, options) {
  47. // Add event listener
  48. EventEmitter.call(this);
  49. // Set empty if no options passed
  50. this.options = options || {};
  51. // Identification information
  52. this.id = _id++;
  53. // Logger instance
  54. this.logger = Logger('Connection', options);
  55. // No bson parser passed in
  56. if(!options.bson) throw new Error("must pass in valid bson parser");
  57. // Get bson parser
  58. this.bson = options.bson;
  59. // Grouping tag used for debugging purposes
  60. this.tag = options.tag;
  61. // Message handler
  62. this.messageHandler = messageHandler;
  63. // Max BSON message size
  64. this.maxBsonMessageSize = options.maxBsonMessageSize || (1024 * 1024 * 16 * 4);
  65. // Debug information
  66. if(this.logger.isDebug()) this.logger.debug(f('creating connection %s with options [%s]', this.id, JSON.stringify(debugOptions(debugFields, options))));
  67. // Default options
  68. this.port = options.port || 27017;
  69. this.host = options.host || 'localhost';
  70. this.keepAlive = typeof options.keepAlive == 'boolean' ? options.keepAlive : true;
  71. this.keepAliveInitialDelay = options.keepAliveInitialDelay || 0;
  72. this.noDelay = typeof options.noDelay == 'boolean' ? options.noDelay : true;
  73. this.connectionTimeout = options.connectionTimeout || 0;
  74. this.socketTimeout = options.socketTimeout || 0;
  75. // If connection was destroyed
  76. this.destroyed = false;
  77. // Check if we have a domain socket
  78. this.domainSocket = this.host.indexOf('\/') != -1;
  79. // Serialize commands using function
  80. this.singleBufferSerializtion = typeof options.singleBufferSerializtion == 'boolean' ? options.singleBufferSerializtion : true;
  81. this.serializationFunction = this.singleBufferSerializtion ? 'toBinUnified' : 'toBin';
  82. // SSL options
  83. this.ca = options.ca || null;
  84. this.cert = options.cert || null;
  85. this.key = options.key || null;
  86. this.passphrase = options.passphrase || null;
  87. this.ssl = typeof options.ssl == 'boolean' ? options.ssl : false;
  88. this.rejectUnauthorized = typeof options.rejectUnauthorized == 'boolean' ? options.rejectUnauthorized : true;
  89. this.checkServerIdentity = typeof options.checkServerIdentity == 'boolean'
  90. || typeof options.checkServerIdentity == 'function' ? options.checkServerIdentity : true;
  91. // If ssl not enabled
  92. if(!this.ssl) this.rejectUnauthorized = false;
  93. // Response options
  94. this.responseOptions = {
  95. promoteLongs: typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true,
  96. promoteValues: typeof options.promoteValues == 'boolean' ? options.promoteValues : true,
  97. promoteBuffers: typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers: false
  98. }
  99. // Flushing
  100. this.flushing = false;
  101. this.queue = [];
  102. // Internal state
  103. this.connection = null;
  104. this.writeStream = null;
  105. // Create hash method
  106. var hash = crypto.createHash('sha1');
  107. hash.update(f('%s:%s', this.host, this.port));
  108. // Create a hash name
  109. this.hashedName = hash.digest('hex');
  110. // All operations in flight on the connection
  111. this.workItems = [];
  112. }
  113. inherits(Connection, EventEmitter);
  114. Connection.prototype.setSocketTimeout = function(value) {
  115. if(this.connection) {
  116. this.connection.setTimeout(value);
  117. }
  118. }
  119. Connection.prototype.resetSocketTimeout = function() {
  120. if(this.connection) {
  121. this.connection.setTimeout(this.socketTimeout);
  122. }
  123. }
  124. Connection.enableConnectionAccounting = function() {
  125. connectionAccounting = true;
  126. connections = {};
  127. }
  128. Connection.disableConnectionAccounting = function() {
  129. connectionAccounting = false;
  130. }
  131. Connection.connections = function() {
  132. return connections;
  133. }
  134. function deleteConnection(id) {
  135. delete connections[id];
  136. }
  137. function addConnection(id, connection) {
  138. connections[id] = connection;
  139. }
  140. //
  141. // Connection handlers
  142. var errorHandler = function(self) {
  143. return function(err) {
  144. if(connectionAccounting) deleteConnection(self.id);
  145. // Debug information
  146. if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] errored out with [%s]', self.id, self.host, self.port, JSON.stringify(err)));
  147. // Emit the error
  148. if(self.listeners('error').length > 0) self.emit("error", MongoError.create(err), self);
  149. }
  150. }
  151. var timeoutHandler = function(self) {
  152. return function() {
  153. if(connectionAccounting) deleteConnection(self.id);
  154. // Debug information
  155. if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] timed out', self.id, self.host, self.port));
  156. // Emit timeout error
  157. self.emit("timeout"
  158. , MongoError.create(f("connection %s to %s:%s timed out", self.id, self.host, self.port))
  159. , self);
  160. }
  161. }
  162. var closeHandler = function(self) {
  163. return function(hadError) {
  164. if(connectionAccounting) deleteConnection(self.id);
  165. // Debug information
  166. if(self.logger.isDebug()) self.logger.debug(f('connection %s with for [%s:%s] closed', self.id, self.host, self.port));
  167. // Emit close event
  168. if(!hadError) {
  169. self.emit("close"
  170. , MongoError.create(f("connection %s to %s:%s closed", self.id, self.host, self.port))
  171. , self);
  172. }
  173. }
  174. }
  175. var dataHandler = function(self) {
  176. return function(data) {
  177. // Parse until we are done with the data
  178. while(data.length > 0) {
  179. // If we still have bytes to read on the current message
  180. if(self.bytesRead > 0 && self.sizeOfMessage > 0) {
  181. // Calculate the amount of remaining bytes
  182. var remainingBytesToRead = self.sizeOfMessage - self.bytesRead;
  183. // Check if the current chunk contains the rest of the message
  184. if(remainingBytesToRead > data.length) {
  185. // Copy the new data into the exiting buffer (should have been allocated when we know the message size)
  186. data.copy(self.buffer, self.bytesRead);
  187. // Adjust the number of bytes read so it point to the correct index in the buffer
  188. self.bytesRead = self.bytesRead + data.length;
  189. // Reset state of buffer
  190. data = new Buffer(0);
  191. } else {
  192. // Copy the missing part of the data into our current buffer
  193. data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead);
  194. // Slice the overflow into a new buffer that we will then re-parse
  195. data = data.slice(remainingBytesToRead);
  196. // Emit current complete message
  197. try {
  198. var emitBuffer = self.buffer;
  199. // Reset state of buffer
  200. self.buffer = null;
  201. self.sizeOfMessage = 0;
  202. self.bytesRead = 0;
  203. self.stubBuffer = null;
  204. // Emit the buffer
  205. self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
  206. } catch(err) {
  207. var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{
  208. sizeOfMessage:self.sizeOfMessage,
  209. bytesRead:self.bytesRead,
  210. stubBuffer:self.stubBuffer}};
  211. // We got a parse Error fire it off then keep going
  212. self.emit("parseError", errorObject, self);
  213. }
  214. }
  215. } else {
  216. // Stub buffer is kept in case we don't get enough bytes to determine the
  217. // size of the message (< 4 bytes)
  218. if(self.stubBuffer != null && self.stubBuffer.length > 0) {
  219. // If we have enough bytes to determine the message size let's do it
  220. if(self.stubBuffer.length + data.length > 4) {
  221. // Prepad the data
  222. var newData = new Buffer(self.stubBuffer.length + data.length);
  223. self.stubBuffer.copy(newData, 0);
  224. data.copy(newData, self.stubBuffer.length);
  225. // Reassign for parsing
  226. data = newData;
  227. // Reset state of buffer
  228. self.buffer = null;
  229. self.sizeOfMessage = 0;
  230. self.bytesRead = 0;
  231. self.stubBuffer = null;
  232. } else {
  233. // Add the the bytes to the stub buffer
  234. var newStubBuffer = new Buffer(self.stubBuffer.length + data.length);
  235. // Copy existing stub buffer
  236. self.stubBuffer.copy(newStubBuffer, 0);
  237. // Copy missing part of the data
  238. data.copy(newStubBuffer, self.stubBuffer.length);
  239. // Exit parsing loop
  240. data = new Buffer(0);
  241. }
  242. } else {
  243. if(data.length > 4) {
  244. // Retrieve the message size
  245. // var sizeOfMessage = data.readUInt32LE(0);
  246. var sizeOfMessage = data[0] | data[1] << 8 | data[2] << 16 | data[3] << 24;
  247. // If we have a negative sizeOfMessage emit error and return
  248. if(sizeOfMessage < 0 || sizeOfMessage > self.maxBsonMessageSize) {
  249. errorObject = {err:"socketHandler", trace:'', bin:self.buffer, parseState:{
  250. sizeOfMessage: sizeOfMessage,
  251. bytesRead: self.bytesRead,
  252. stubBuffer: self.stubBuffer}};
  253. // We got a parse Error fire it off then keep going
  254. self.emit("parseError", errorObject, self);
  255. return;
  256. }
  257. // Ensure that the size of message is larger than 0 and less than the max allowed
  258. if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage > data.length) {
  259. self.buffer = new Buffer(sizeOfMessage);
  260. // Copy all the data into the buffer
  261. data.copy(self.buffer, 0);
  262. // Update bytes read
  263. self.bytesRead = data.length;
  264. // Update sizeOfMessage
  265. self.sizeOfMessage = sizeOfMessage;
  266. // Ensure stub buffer is null
  267. self.stubBuffer = null;
  268. // Exit parsing loop
  269. data = new Buffer(0);
  270. } else if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage == data.length) {
  271. try {
  272. emitBuffer = data;
  273. // Reset state of buffer
  274. self.buffer = null;
  275. self.sizeOfMessage = 0;
  276. self.bytesRead = 0;
  277. self.stubBuffer = null;
  278. // Exit parsing loop
  279. data = new Buffer(0);
  280. // Emit the message
  281. self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
  282. } catch (err) {
  283. self.emit("parseError", err, self);
  284. }
  285. } else if(sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonMessageSize) {
  286. errorObject = {err:"socketHandler", trace:null, bin:data, parseState:{
  287. sizeOfMessage:sizeOfMessage,
  288. bytesRead:0,
  289. buffer:null,
  290. stubBuffer:null}};
  291. // We got a parse Error fire it off then keep going
  292. self.emit("parseError", errorObject, self);
  293. // Clear out the state of the parser
  294. self.buffer = null;
  295. self.sizeOfMessage = 0;
  296. self.bytesRead = 0;
  297. self.stubBuffer = null;
  298. // Exit parsing loop
  299. data = new Buffer(0);
  300. } else {
  301. emitBuffer = data.slice(0, sizeOfMessage);
  302. // Reset state of buffer
  303. self.buffer = null;
  304. self.sizeOfMessage = 0;
  305. self.bytesRead = 0;
  306. self.stubBuffer = null;
  307. // Copy rest of message
  308. data = data.slice(sizeOfMessage);
  309. // Emit the message
  310. self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
  311. }
  312. } else {
  313. // Create a buffer that contains the space for the non-complete message
  314. self.stubBuffer = new Buffer(data.length)
  315. // Copy the data to the stub buffer
  316. data.copy(self.stubBuffer, 0);
  317. // Exit parsing loop
  318. data = new Buffer(0);
  319. }
  320. }
  321. }
  322. }
  323. }
  324. }
  325. // List of socket level valid ssl options
  326. var legalSslSocketOptions = ['pfx', 'key', 'passphrase', 'cert', 'ca', 'ciphers'
  327. , 'NPNProtocols', 'ALPNProtocols', 'servername'
  328. , 'secureProtocol', 'secureContext', 'session'
  329. , 'minDHSize'];
  330. function merge(options1, options2) {
  331. // Merge in any allowed ssl options
  332. for(var name in options2) {
  333. if(options2[name] != null && legalSslSocketOptions.indexOf(name) != -1) {
  334. options1[name] = options2[name];
  335. }
  336. }
  337. }
  338. /**
  339. * Connect
  340. * @method
  341. */
  342. Connection.prototype.connect = function(_options) {
  343. var self = this;
  344. _options = _options || {};
  345. // Set the connections
  346. if(connectionAccounting) addConnection(this.id, this);
  347. // Check if we are overriding the promoteLongs
  348. if(typeof _options.promoteLongs == 'boolean') {
  349. self.responseOptions.promoteLongs = _options.promoteLongs;
  350. self.responseOptions.promoteValues = _options.promoteValues;
  351. self.responseOptions.promoteBuffers = _options.promoteBuffers;
  352. }
  353. // Create new connection instance
  354. self.connection = self.domainSocket
  355. ? net.createConnection(self.host)
  356. : net.createConnection(self.port, self.host);
  357. // Set the options for the connection
  358. self.connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
  359. self.connection.setTimeout(self.connectionTimeout);
  360. self.connection.setNoDelay(self.noDelay);
  361. // If we have ssl enabled
  362. if(self.ssl) {
  363. var sslOptions = {
  364. socket: self.connection
  365. , rejectUnauthorized: self.rejectUnauthorized
  366. }
  367. // Merge in options
  368. merge(sslOptions, this.options);
  369. merge(sslOptions, _options);
  370. // Set options for ssl
  371. if(self.ca) sslOptions.ca = self.ca;
  372. if(self.cert) sslOptions.cert = self.cert;
  373. if(self.key) sslOptions.key = self.key;
  374. if(self.passphrase) sslOptions.passphrase = self.passphrase;
  375. // Override checkServerIdentity behavior
  376. if(self.checkServerIdentity == false) {
  377. // Skip the identiy check by retuning undefined as per node documents
  378. // https://nodejs.org/api/tls.html#tls_tls_connect_options_callback
  379. sslOptions.checkServerIdentity = function() {
  380. return undefined;
  381. }
  382. } else if(typeof self.checkServerIdentity == 'function') {
  383. sslOptions.checkServerIdentity = self.checkServerIdentity;
  384. }
  385. // Set default sni servername to be the same as host
  386. if(sslOptions.servername == null) {
  387. sslOptions.servername = self.host;
  388. }
  389. // Attempt SSL connection
  390. self.connection = tls.connect(self.port, self.host, sslOptions, function() {
  391. // Error on auth or skip
  392. if(self.connection.authorizationError && self.rejectUnauthorized) {
  393. return self.emit("error", self.connection.authorizationError, self, {ssl:true});
  394. }
  395. // Set socket timeout instead of connection timeout
  396. self.connection.setTimeout(self.socketTimeout);
  397. // We are done emit connect
  398. self.emit('connect', self);
  399. });
  400. self.connection.setTimeout(self.connectionTimeout);
  401. } else {
  402. self.connection.on('connect', function() {
  403. // Set socket timeout instead of connection timeout
  404. self.connection.setTimeout(self.socketTimeout);
  405. // Emit connect event
  406. self.emit('connect', self);
  407. });
  408. }
  409. // Add handlers for events
  410. self.connection.once('error', errorHandler(self));
  411. self.connection.once('timeout', timeoutHandler(self));
  412. self.connection.once('close', closeHandler(self));
  413. self.connection.on('data', dataHandler(self));
  414. }
  415. /**
  416. * Unref this connection
  417. * @method
  418. * @return {boolean}
  419. */
  420. Connection.prototype.unref = function() {
  421. if (this.connection) this.connection.unref();
  422. else {
  423. var self = this;
  424. this.once('connect', function() {
  425. self.connection.unref();
  426. });
  427. }
  428. }
  429. /**
  430. * Destroy connection
  431. * @method
  432. */
  433. Connection.prototype.destroy = function() {
  434. // Set the connections
  435. if(connectionAccounting) deleteConnection(this.id);
  436. if(this.connection) {
  437. this.connection.end();
  438. this.connection.destroy();
  439. }
  440. this.destroyed = true;
  441. }
  442. /**
  443. * Write to connection
  444. * @method
  445. * @param {Command} command Command to write out need to implement toBin and toBinUnified
  446. */
  447. Connection.prototype.write = function(buffer) {
  448. var i;
  449. // Debug Log
  450. if(this.logger.isDebug()) {
  451. if(!Array.isArray(buffer)) {
  452. this.logger.debug(f('writing buffer [%s] to %s:%s', buffer.toString('hex'), this.host, this.port));
  453. } else {
  454. for(i = 0; i < buffer.length; i++)
  455. this.logger.debug(f('writing buffer [%s] to %s:%s', buffer[i].toString('hex'), this.host, this.port));
  456. }
  457. }
  458. // Write out the command
  459. if(!Array.isArray(buffer)) return this.connection.write(buffer, 'binary');
  460. // Iterate over all buffers and write them in order to the socket
  461. for(i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary');
  462. }
  463. /**
  464. * Return id of connection as a string
  465. * @method
  466. * @return {string}
  467. */
  468. Connection.prototype.toString = function() {
  469. return "" + this.id;
  470. }
  471. /**
  472. * Return json object of connection
  473. * @method
  474. * @return {object}
  475. */
  476. Connection.prototype.toJSON = function() {
  477. return {id: this.id, host: this.host, port: this.port};
  478. }
  479. /**
  480. * Is the connection connected
  481. * @method
  482. * @return {boolean}
  483. */
  484. Connection.prototype.isConnected = function() {
  485. if(this.destroyed) return false;
  486. return !this.connection.destroyed && this.connection.writable;
  487. }
  488. /**
  489. * A server connect event, used to verify that the connection is up and running
  490. *
  491. * @event Connection#connect
  492. * @type {Connection}
  493. */
  494. /**
  495. * The server connection closed, all pool connections closed
  496. *
  497. * @event Connection#close
  498. * @type {Connection}
  499. */
  500. /**
  501. * The server connection caused an error, all pool connections closed
  502. *
  503. * @event Connection#error
  504. * @type {Connection}
  505. */
  506. /**
  507. * The server connection timed out, all pool connections closed
  508. *
  509. * @event Connection#timeout
  510. * @type {Connection}
  511. */
  512. /**
  513. * The driver experienced an invalid message, all pool connections closed
  514. *
  515. * @event Connection#parseError
  516. * @type {Connection}
  517. */
  518. module.exports = Connection;