pool.js 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237
  1. "use strict";
  2. var inherits = require('util').inherits,
  3. EventEmitter = require('events').EventEmitter,
  4. Connection = require('./connection'),
  5. MongoError = require('../error'),
  6. Logger = require('./logger'),
  7. f = require('util').format,
  8. Query = require('./commands').Query,
  9. CommandResult = require('./command_result'),
  10. assign = require('../topologies/shared').assign;
  11. var MongoCR = require('../auth/mongocr')
  12. , X509 = require('../auth/x509')
  13. , Plain = require('../auth/plain')
  14. , GSSAPI = require('../auth/gssapi')
  15. , SSPI = require('../auth/sspi')
  16. , ScramSHA1 = require('../auth/scram');
  // Names of the pool lifecycle states
  var DISCONNECTED = 'disconnected',
      CONNECTING = 'connecting',
      CONNECTED = 'connected',
      DESTROYING = 'destroying',
      DESTROYED = 'destroyed';

  // Counter handed out (and incremented) as the id of each new Pool
  var _id = 0;
  /**
   * Creates a new Pool instance
   * @class
   * @param {object} options Pool options; `options.bson` is mandatory
   * @param {object} options.bson BSON parser exposing `serialize` and `deserialize` functions
   * @param {object} [options.authProviders] Map of mechanism name to auth provider; when omitted the pool creates mongocr, x509, plain, gssapi, sspi and scram-sha-1 providers
   * @param {string} [options.host='localhost'] The server host
   * @param {number} [options.port=27017] The server port
   * @param {number} [options.size=5] Max server connection pool size
   * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
   * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
   * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
   * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
   * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
   * @param {boolean} [options.noDelay=true] TCP Connection no delay
   * @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
   * @param {number} [options.socketTimeout=30000] TCP Socket timeout setting
   * @param {number} [options.monitoringSocketTimeout] TCP Socket timeout setting for replicaset monitoring socket (the pool itself sets no default for it)
   * @param {boolean} [options.ssl=false] Use SSL for connection
   * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identity during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
   * @param {Buffer} [options.ca] SSL Certificate store binary buffer
   * @param {Buffer} [options.cert] SSL Certificate binary buffer
   * @param {Buffer} [options.key] SSL Key file binary buffer
   * @param {string} [options.passPhrase] SSL Certificate pass phrase
   * @param {boolean} [options.rejectUnauthorized=false] Reject unauthorized server certificates
   * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
   * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
   * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
   * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
   * @throws {Error} "must pass in valid bson parser" when options or a usable options.bson is missing
   * @fires Pool#connect
   * @fires Pool#close
   * @fires Pool#error
   * @fires Pool#timeout
   * @fires Pool#parseError
   * @return {Pool} A pool instance
   */
  var Pool = function(options) {
    // Validate first so a missing options object yields the descriptive error
    // (instead of a TypeError on `options.bson`) and no pool id is consumed
    if(!options || !options.bson
      || typeof options.bson.serialize != 'function'
      || typeof options.bson.deserialize != 'function') {
      throw new Error("must pass in valid bson parser");
    }

    // Add event listener
    EventEmitter.call(this);

    // Merge the caller's options over the defaults
    this.options = assign({
      // Host and port settings
      host: 'localhost',
      port: 27017,
      // Pool default max size
      size: 5,
      // socket settings
      connectionTimeout: 30000,
      socketTimeout: 30000,
      keepAlive: true,
      keepAliveInitialDelay: 0,
      noDelay: true,
      // SSL Settings
      ssl: false, checkServerIdentity: true,
      ca: null, cert: null, key: null, passPhrase: null,
      rejectUnauthorized: false,
      promoteLongs: true,
      promoteValues: true,
      promoteBuffers: false,
      // Reconnection options
      reconnect: true,
      reconnectInterval: 1000,
      reconnectTries: 30,
      // Enable domains
      domainsEnabled: false
    }, options);

    // Identification information
    this.id = _id++;
    // Current reconnect retries
    this.retriesLeft = this.options.reconnectTries;
    this.reconnectId = null;

    // Logger instance
    this.logger = Logger('Pool', options);
    // Pool state
    this.state = DISCONNECTED;
    // Connections, grouped by what they are currently doing
    this.availableConnections = [];
    this.inUseConnections = [];
    this.connectingConnections = [];
    // Currently executing
    this.executing = false;
    // Operation work queue
    this.queue = [];

    // All the authProviders (caller supplied, or one of each built-in mechanism)
    this.authProviders = options.authProviders || {
        'mongocr': new MongoCR(options.bson), 'x509': new X509(options.bson)
      , 'plain': new Plain(options.bson), 'gssapi': new GSSAPI(options.bson)
      , 'sspi': new SSPI(options.bson), 'scram-sha-1': new ScramSHA1(options.bson)
    };

    // Are we currently authenticating / logging out
    this.authenticating = false;
    this.loggingout = false;
    this.nonAuthenticatedConnections = [];
    this.authenticatingTimestamp = null;
    // Number of consecutive timeouts caught
    this.numberOfConsecutiveTimeouts = 0;
    // Current pool Index
    this.connectionIndex = 0;
  };
  inherits(Pool, EventEmitter);

  // Expose a few option values as enumerable, getter-only properties of the prototype
  ['size', 'connectionTimeout', 'socketTimeout'].forEach(function(optionName) {
    Object.defineProperty(Pool.prototype, optionName, {
      enumerable: true,
      get: function() { return this.options[optionName]; }
    });
  });
  /**
   * Switch the pool to `newState` when that is a legal move from its current
   * state; otherwise leave the state untouched and log an error.
   * @param {Pool} self The pool whose state is changed
   * @param {string} newState The requested state
   */
  function stateTransition(self, newState) {
    // Target states reachable from each current state
    var transitions = {
      'disconnected': [CONNECTING, DESTROYING, DISCONNECTED],
      'connecting': [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED],
      'connected': [CONNECTED, DISCONNECTED, DESTROYING],
      'destroying': [DESTROYING, DESTROYED],
      'destroyed': [DESTROYED]
    };

    var allowed = transitions[self.state];

    // Unknown current state or disallowed target: report and keep the state
    if(!allowed || allowed.indexOf(newState) == -1) {
      self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
        , self.id, self.state, newState, allowed));
      return;
    }

    self.state = newState;
  }
  /**
   * Authenticate a single connection with the mechanism described by `auth`.
   * @param {Pool} pool The pool owning the auth providers
   * @param {Array} auth [mechanism, db, ...mechanism specific arguments]
   * @param {Connection} connection The connection to authenticate
   * @param {function} cb Called with null right away when no mechanism is given, otherwise handed to the provider
   * @throws {MongoError} When the mechanism has no registered provider
   */
  function authenticate(pool, auth, connection, cb) {
    var mechanism = auth[0];
    // No credentials supplied, nothing to do
    if(mechanism === undefined) return cb(null);

    var db = auth[1];
    var provider = pool.authProviders[mechanism];
    if(!provider) {
      throw new MongoError(f('authMechanism %s not supported', mechanism));
    }

    // provider.auth(writeFn, [connection], db, ...extraArgs, cb)
    var providerArgs = [write(pool), [connection], db];
    Array.prototype.push.apply(providerArgs, auth.slice(2));
    providerArgs.push(cb);
    provider.auth.apply(provider, providerArgs);
  }
  /**
   * Build the write function handed to the auth providers; it sends a command
   * straight down a given connection instead of going through the pool queue.
   * @param {Pool} self The pool the writer belongs to
   * @return {function} function(connection, command, callback)
   */
  function write(self) {
    return function(connection, command, callback) {
      // Stop any authentication traffic once the pool is being torn down
      var poolGone = self.state == DESTROYED || self.state == DESTROYING;
      if(poolGone) {
        return callback(new MongoError('pool destroyed'));
      }

      // Register the callback under the command's requestId
      var workItem = {
        cb: callback, command: true, requestId: command.requestId
      };
      connection.workItems.push(workItem);

      // Serialize the command and send it
      connection.write(command.toBin());
    };
  }
  /**
   * Re-apply the credentials held by every auth provider of the pool to one
   * connection, one provider after the other.
   * @param {Pool} pool The pool owning the auth providers
   * @param {Connection} connection The connection to re-authenticate
   * @param {function} cb Called without arguments once all providers ran, or with the first error
   */
  function reauthenticate(pool, connection, cb) {
    // Provider names still to process; they are consumed from the end
    var pendingNames = Object.keys(pool.authProviders);

    function runNext() {
      // Every provider has been applied
      if(pendingNames.length == 0) return cb();

      var provider = pool.authProviders[pendingNames.pop()];
      provider.reauthenticate(write(pool), [connection], function(err) {
        // Abort the chain on the first failure
        if(err) return cb(err);
        runNext();
      });
    }

    runNext();
  }
  /**
   * Build the listener the pool attaches to a connection for a failure event.
   * The listener runs with the failing connection as `this` and acts at most
   * once per connection.
   * @param {Pool} self The pool owning the connection
   * @param {string} event The event name the listener is registered for
   * @return {function} function(err) listener
   */
  function connectionFailureHandler(self, event) {
    return function(err) {
      var connection = this;

      // Several failure events can fire for one socket; handle only the first
      if(connection._connectionFailHandled) return;
      connection._connectionFailHandled = true;

      // Tear the connection down and drop it from the pool
      connection.destroy();
      removeConnection(self, connection);

      // Fail every operation that was waiting on this connection
      var pending;
      while(connection.workItems.length > 0) {
        pending = connection.workItems.shift();
        if(pending.cb) pending.cb(err);
      }

      if(event == 'timeout') {
        self.numberOfConsecutiveTimeouts = self.numberOfConsecutiveTimeouts + 1;

        // More than reconnectTries timeouts in a row: assume a tcp sink hole,
        // force close the pool and emit close
        if(self.numberOfConsecutiveTimeouts > self.options.reconnectTries) {
          self.numberOfConsecutiveTimeouts = 0;
          self.destroy(true);
          return self.emit('close', self);
        }
      }

      // No sockets left, propagate the event to the pool's listeners
      if(self.socketCount() == 0) {
        var tearingDown = self.state == DESTROYED || self.state == DESTROYING;
        if(!tearingDown) {
          stateTransition(self, DISCONNECTED);
        }

        // 'error' is re-labelled as 'close' so the low level error handler
        // in node is never triggered
        if(event == 'error') event = 'close';
        self.emit(event, err);
      }

      // Schedule a reconnect unless one is pending or reconnecting is disabled
      if(!self.reconnectId && self.options.reconnect) {
        self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
      }
    };
  }
  /**
   * Build the timer callback that tries to open one new connection for a pool
   * that lost its sockets.
   * @param {Pool} self The pool to reconnect
   * @return {function} Zero argument function suitable for setTimeout
   */
  function attemptReconnect(self) {
    // Listener factory for failures of the socket being opened: count the
    // attempt and either give up or schedule the next one
    function _connectionFailureHandler(self) {
      return function() {
        if(this._connectionFailHandled) return;
        this._connectionFailHandled = true;
        // Destroy the connection
        this.destroy();
        // Count down the number of reconnects
        self.retriesLeft = self.retriesLeft - 1;

        // Attempts remain, schedule another try
        if(self.retriesLeft != 0) {
          self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
          return;
        }

        // Out of attempts: destroy the pool and report the failure
        self.destroy();
        self.emit('reconnectFailed'
          , new MongoError(f('failed to reconnect after %s attempts with interval %s ms', self.options.reconnectTries, self.options.reconnectInterval)));
      };
    }

    // Listener factory for a successful connect of the new socket
    function _connectHandler(self) {
      return function() {
        var connection = this;

        // Pool was destroyed while connecting, discard the socket
        if(self.state == DESTROYED || self.state == DESTROYING) {
          return connection.destroy();
        }

        // Drop the temporary reconnect listeners
        handlers.forEach(function(event) {
          connection.removeAllListeners(event);
        });

        // No reconnect timer is pending any more
        self.reconnectId = null;

        // Install the regular pool failure handlers
        connection.on('error', connectionFailureHandler(self, 'error'));
        connection.on('close', connectionFailureHandler(self, 'close'));
        connection.on('timeout', connectionFailureHandler(self, 'timeout'));
        connection.on('parseError', connectionFailureHandler(self, 'parseError'));

        // Re-apply stored credentials, then put the connection to work
        reauthenticate(self, connection, function() {
          // Reset retries
          self.retriesLeft = self.options.reconnectTries;
          // Push to available connections
          self.availableConnections.push(connection);
          // Emit reconnect event
          self.emit('reconnect', self);
          // Trigger execute to start everything up again
          _execute(self)();
        });
      };
    }

    return function() {
      self.emit('attemptReconnect', self);

      // Nothing to do for a pool that is going away
      if(self.state == DESTROYED || self.state == DESTROYING) return;

      // Already connected again, just clear the timer id
      if(self.isConnected()) {
        self.reconnectId = null;
        return;
      }

      // Open a fresh connection with the temporary listeners attached
      var connection = new Connection(messageHandler(self), self.options);
      connection.on('close', _connectionFailureHandler(self, 'close'));
      connection.on('error', _connectionFailureHandler(self, 'error'));
      connection.on('timeout', _connectionFailureHandler(self, 'timeout'));
      connection.on('parseError', _connectionFailureHandler(self, 'parseError'));
      connection.on('connect', _connectHandler(self));
      connection.connect();
    };
  }
  /**
   * Move a connection from one list to the end of another; does nothing when
   * the connection is not in the source list.
   * @param {Connection} connection The connection to move
   * @param {Array} from List the connection is removed from
   * @param {Array} to List the connection is appended to
   */
  function moveConnectionBetween(connection, from, to) {
    var position = from.indexOf(connection);
    if(position == -1) return;

    from.splice(position, 1);
    to.push(connection);
  }
  /**
   * Build the handler a Connection calls for every message it receives. The
   * handler matches the message to the pending work item with the same
   * requestId, releases the connection back to the pool (re-authenticating
   * straggler connections first) and hands the parsed result to the work
   * item's callback.
   * @param {Pool} self The pool the connections belong to
   * @return {function} function(message, connection)
   */
  function messageHandler(self) {
    // Invoke an operation callback, deferred to the next tick unless domains are enabled
    function handleOperationCallback(self, cb, err, result) {
      // No domain enabled
      if(!self.options.domainsEnabled) {
        return process.nextTick(function() {
          return cb(err, result);
        });
      }

      // Domain enabled just call the callback
      cb(err, result);
    }

    // Re-authenticate connections that missed an auth round, then release
    // `connection` from the in-use list to the available list
    function authenticateStragglers(self, connection, callback) {
      // Get any non authenticated connections
      var connections = self.nonAuthenticatedConnections.slice(0);
      var nonAuthenticatedConnections = self.nonAuthenticatedConnections;
      self.nonAuthenticatedConnections = [];

      // Establish if the connection need to be authenticated
      // Add to authentication list if
      // 1. we were in an authentication process when the operation was executed
      // 2. our current authentication timestamp is from the workItem one, meaning an auth has happened
      if(connection.workItems.length == 1 && (connection.workItems[0].authenticating == true
        || (typeof connection.workItems[0].authenticatingTimestamp == 'number'
          && connection.workItems[0].authenticatingTimestamp != self.authenticatingTimestamp))) {
        // Add connection to the list
        connections.push(connection);
      }

      // No connections need to be re-authenticated
      if(connections.length == 0) {
        // Release the connection back to the pool
        moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
        // Finish
        return callback();
      }

      // Apply re-authentication to all connections before releasing back to pool
      var connectionCount = connections.length;
      // Authenticate all connections
      for(var i = 0; i < connectionCount; i++) {
        reauthenticate(self, connections[i], function() {
          connectionCount = connectionCount - 1;

          if(connectionCount == 0) {
            // Put non authenticated connections in available connections
            self.availableConnections = self.availableConnections.concat(nonAuthenticatedConnections);
            // Release the connection back to the pool
            moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
            // Return
            callback();
          }
        });
      }
    }

    return function(message, connection) {
      // workItem to execute
      var workItem = null;

      // Locate the workItem waiting for this response
      for(var i = 0; i < connection.workItems.length; i++) {
        if(connection.workItems[i].requestId == message.responseTo) {
          workItem = connection.workItems[i];
          // Remove from list of workItems; stop here because continuing the
          // scan after a splice would skip the element that moved into slot i
          connection.workItems.splice(i, 1);
          break;
        }
      }

      // Reset timeout counter
      self.numberOfConsecutiveTimeouts = 0;

      // Reset the connection timeout if we modified it for this operation
      if(workItem && workItem.socketTimeout) {
        connection.resetSocketTimeout();
      }

      // Log if debug enabled
      if(self.logger.isDebug()) {
        self.logger.debug(f('message [%s] received from %s:%s'
          , message.raw.toString('hex'), self.options.host, self.options.port));
      }

      authenticateStragglers(self, connection, function() {
        // Keep executing, ensure current message handler does not stop execution
        if(!self.executing) {
          process.nextTick(function() {
            _execute(self)();
          });
        }

        // A response nobody is waiting for has no callback to dispatch to;
        // drop it instead of dereferencing a null work item
        if(!workItem) {
          if(self.logger.isDebug()) {
            self.logger.debug(f('message with responseTo [%s] received from %s:%s matches no pending operation'
              , message.responseTo, self.options.host, self.options.port));
          }
          return;
        }

        // Immediately released operations expect no result
        if(workItem.immediateRelease) return;

        try {
          // Parse the message according to the provided options
          message.parse(workItem);
        } catch(err) {
          return handleOperationCallback(self, workItem.cb, MongoError.create(err));
        }

        // Establish if we have an error
        if(workItem.command && message.documents[0] && (message.documents[0].ok == 0 || message.documents[0]['$err']
          || message.documents[0]['errmsg'] || message.documents[0]['code'])) {
          return handleOperationCallback(self, workItem.cb, MongoError.create(message.documents[0]));
        }

        // Add the connection details
        message.hashedName = connection.hashedName;

        // Return the documents
        handleOperationCallback(self, workItem.cb, null, new CommandResult(workItem.fullResult ? message : message.documents[0], connection, message));
      });
    };
  }
  /**
   * Return the socket count of the pool: available plus in-use connections.
   * Connections that are still connecting are not counted.
   * @method
   * @return {Number} The number of sockets counted.
   */
  Pool.prototype.socketCount = function() {
    var idle = this.availableConnections.length;
    var busy = this.inUseConnections.length;
    return idle + busy;
  };
  /**
   * Return all pool connections: available first, then in-use, then connecting
   * @method
   * @return {Connection[]} A new array holding the pool connections
   */
  Pool.prototype.allConnections = function() {
    return this.availableConnections.concat(
      this.inUseConnections,
      this.connectingConnections
    );
  };
  /**
   * Get a pool connection: the first entry of allConnections()
   * @method
   * @return {Connection} The connection, or undefined when the pool holds none
   */
  Pool.prototype.get = function() {
    var connections = this.allConnections();
    return connections[0];
  };
  /**
   * Is the pool connected
   * @method
   * @return {boolean} true when an available or in-use connection reports it is connected, or when there are none of those but an authentication is in progress
   */
  Pool.prototype.isConnected = function() {
    // A pool that is destroyed or being destroyed is never connected
    if(this.state == DESTROYED || this.state == DESTROYING) {
      return false;
    }

    var candidates = this.availableConnections.concat(this.inUseConnections);

    // Any live socket is enough
    var hasLiveSocket = candidates.some(function(connection) {
      return connection.isConnected();
    });
    if(hasLiveSocket) return true;

    // Might be authenticating, but we are still connected
    return candidates.length == 0 && this.authenticating ? true : false;
  };
  /**
   * Was the pool destroyed (or is it currently being destroyed)
   * @method
   * @return {boolean}
   */
  Pool.prototype.isDestroyed = function() {
    var current = this.state;
    return current == DESTROYING || current == DESTROYED;
  };
  /**
   * Is the pool in the disconnected state
   * @method
   * @return {boolean}
   */
  Pool.prototype.isDisconnected = function() {
    var current = this.state;
    return current == DISCONNECTED;
  };
  /**
   * Connect pool: opens the first connection, applies stored credentials and
   * then the credentials passed as arguments (mechanism, db, ...), and emits
   * 'connect' once the connection is available.
   * @method
   * @throws {MongoError} When the pool is not in the disconnected state
   */
  Pool.prototype.connect = function() {
    if(this.state != DISCONNECTED) {
      throw new MongoError('connection in unlawful state ' + this.state);
    }

    var self = this;
    // Transition to connecting state
    stateTransition(this, CONNECTING);
    // Arguments are forwarded to authenticate()
    var args = Array.prototype.slice.call(arguments, 0);
    // Create the connection and track it as connecting
    var connection = new Connection(messageHandler(self), this.options);
    this.connectingConnections.push(connection);

    // True once destroy() was called on the pool
    function poolIsGone() {
      return self.state == DESTROYED || self.state == DESTROYING;
    }

    // Destroy the pool and surface the error to the pool's listeners
    function failWith(err) {
      self.destroy();
      return self.emit('error', err);
    }

    connection.once('connect', function(connection) {
      if(poolIsGone()) return self.destroy();

      // Apply any stored credentials
      reauthenticate(self, connection, function(err) {
        if(poolIsGone()) return self.destroy();
        if(err) return failWith(err);

        // Apply the credentials given to connect()
        authenticate(self, args, connection, function(err) {
          if(poolIsGone()) return self.destroy();
          if(err) return failWith(err);

          // Set connected mode
          stateTransition(self, CONNECTED);
          // Move the active connection
          moveConnectionBetween(connection, self.connectingConnections, self.availableConnections);
          // Emit the connect event
          self.emit('connect', self);
        });
      });
    });

    // Add failure handlers
    ['error', 'close', 'timeout', 'parseError'].forEach(function(name) {
      connection.once(name, connectionFailureHandler(self, name));
    });

    try {
      connection.connect();
    } catch(err) {
      // SSL or something threw on connect
      self.emit('error', err);
    }
  };
  /**
   * Authenticate using a specified mechanism
   * @method
   * @param {string} mechanism The Auth mechanism we are invoking
   * @param {string} db The db we are invoking the mechanism against
   * @param {...object} param Parameters for the specific mechanism
   * @param {authResultCallback} callback A callback function (always the last argument)
   * @throws {MongoError} When the mechanism has no provider and is not 'default'
   */
  Pool.prototype.auth = function(mechanism) {
    var self = this;
    var args = Array.prototype.slice.call(arguments, 0);
    // The last argument is the callback; the rest goes to authenticate()
    var callback = args.pop();

    // If we don't have the mechanism fail
    if(self.authProviders[mechanism] == null && mechanism != 'default') {
      throw new MongoError(f("auth provider %s does not exist", mechanism));
    }

    // Signal that we are authenticating a new set of credentials
    this.authenticating = true;
    this.authenticatingTimestamp = new Date().getTime();

    // Authenticate all live connections
    function authenticateLiveConnections(self, args, cb) {
      // Get the current viable connections
      var connections = self.availableConnections;
      // Allow nothing else to use the connections while we authenticate them
      self.availableConnections = [];

      var connectionsCount = connections.length;
      var error = null;

      // No connections available: finish through `cb` (not the user callback)
      // so the caller's completion logic resets the authenticating flag
      if(connectionsCount == 0) return cb(null);

      // Authenticate the connections
      for(var i = 0; i < connections.length; i++) {
        authenticate(self, args, connections[i], function(err) {
          connectionsCount = connectionsCount - 1;
          // Store the error
          if(err) error = err;

          // Processed all connections
          if(connectionsCount == 0) {
            // Auth finished
            self.authenticating = false;
            // Add the connections back to available connections
            self.availableConnections = self.availableConnections.concat(connections);

            // We had an error, return it
            if(error) {
              // Log the error
              if(self.logger.isError()) {
                self.logger.error(f('[%s] failed to authenticate against server %s:%s'
                  , self.id, self.options.host, self.options.port));
              }

              return cb(error);
            }

            cb(null);
          }
        });
      }
    }

    // Poll until a logout in progress has finished
    function waitForLogout(self, cb) {
      if(!self.loggingout) return cb();
      setTimeout(function() {
        waitForLogout(self, cb);
      }, 1);
    }

    // Wait for logout to finish
    waitForLogout(self, function() {
      // Authenticate all live connections
      authenticateLiveConnections(self, args, function(err) {
        // Credentials correctly stored in auth provider if successful
        // Any new connections will now reauthenticate correctly
        self.authenticating = false;
        // Return after authentication connections
        callback(err);
      });
    });
  };
  /**
   * Logout all users against a database by sending a logout command over
   * every available and in-use connection.
   * @method
   * @param {string} dbName The database name
   * @param {authResultCallback} callback A callback function, called once with the last error seen (or null)
   * @throws {MongoError} When dbName is not a string or callback is not a function
   */
  Pool.prototype.logout = function(dbName, callback) {
    var self = this;
    if(typeof dbName != 'string') {
      throw new MongoError('logout method requires a db name as first argument');
    }

    if(typeof callback != 'function') {
      throw new MongoError('logout method requires a callback');
    }

    // Indicate logout in process
    this.loggingout = true;

    // Get all relevant connections
    var connections = self.availableConnections.concat(self.inUseConnections);
    var count = connections.length;
    // Store any error
    var error = null;

    // Nothing to log out from: finish right away, otherwise the callback
    // would never fire and `loggingout` would stay set, stalling later auth()
    if(count == 0) {
      self.loggingout = false;
      return callback(null);
    }

    // Send logout command over all the connections
    for(var i = 0; i < connections.length; i++) {
      write(self)(connections[i], new Query(this.options.bson
        , f('%s.$cmd', dbName)
        , {logout:1}, {numberToSkip: 0, numberToReturn: 1}), function(err) {
        count = count - 1;
        if(err) error = err;

        // Last response received
        if(count == 0) {
          self.loggingout = false;
          callback(error);
        }
      });
    }
  };
  /**
   * Unref the pool: calls unref() on every available, in-use and connecting connection
   * @method
   */
  Pool.prototype.unref = function() {
    var known = this.availableConnections.concat(
      this.inUseConnections,
      this.connectingConnections
    );

    for(var i = 0; i < known.length; i++) {
      known[i].unref();
    }
  };
  // Connection events whose listeners are removed before a connection is destroyed
  var events = ['error', 'close', 'timeout', 'parseError', 'connect'];

  /**
   * Destroy the given connections, empty every connection list of the pool and
   * move the pool to the destroyed state.
   * @param {Pool} self The pool being destroyed
   * @param {Connection[]} connections The connections to destroy
   */
  function destroy(self, connections) {
    // Detach all listeners for the known events from one connection
    function stripListeners(connection) {
      events.forEach(function(name) {
        connection.removeAllListeners(name);
      });
    }

    for(var i = 0; i < connections.length; i++) {
      stripListeners(connections[i]);
      connections[i].destroy();
    }

    // Zero out all connections
    self.connectingConnections = [];
    self.nonAuthenticatedConnections = [];
    self.availableConnections = [];
    self.inUseConnections = [];

    // Set state to destroyed
    stateTransition(self, DESTROYED);
  }
  705. /**
  706. * Destroy pool
  707. * @method
  708. */
  709. Pool.prototype.destroy = function(force) {
  710. var self = this;
  711. // Do not try again if the pool is already dead
  712. if(this.state == DESTROYED || self.state == DESTROYING) return;
  713. // Set state to destroyed
  714. stateTransition(this, DESTROYING);
  715. // Are we force closing
  716. if(force) {
  717. // Get all the known connections
  718. var connections = self.availableConnections
  719. .concat(self.inUseConnections)
  720. .concat(self.nonAuthenticatedConnections)
  721. .concat(self.connectingConnections);
  722. return destroy(self, connections);
  723. }
  724. // Wait for the operations to drain before we close the pool
  725. function checkStatus() {
  726. if(self.queue.length == 0) {
  727. // Get all the known connections
  728. var connections = self.availableConnections
  729. .concat(self.inUseConnections)
  730. .concat(self.nonAuthenticatedConnections)
  731. .concat(self.connectingConnections);
  732. // Check if we have any in flight operations
  733. for(var i = 0; i < connections.length; i++) {
  734. // There is an operation still in flight, reschedule a
  735. // check waiting for it to drain
  736. if(connections[i].workItems.length > 0) {
  737. return setTimeout(checkStatus, 1);
  738. }
  739. }
  740. destroy(self, connections);
  741. } else {
  742. setTimeout(checkStatus, 1);
  743. }
  744. }
  745. // Initiate drain of operations
  746. checkStatus();
  747. }
  748. /**
  749. * Write a message to MongoDB
  750. * @method
  751. * @return {Connection}
  752. */
  753. Pool.prototype.write = function(commands, options, cb) {
  754. var self = this;
  755. // Ensure we have a callback
  756. if(typeof options == 'function') {
  757. cb = options;
  758. }
  759. // Always have options
  760. options = options || {};
  761. // Pool was destroyed error out
  762. if(this.state == DESTROYED || this.state == DESTROYING) {
  763. // Callback with an error
  764. if(cb) {
  765. try {
  766. cb(new MongoError('pool destroyed'));
  767. } catch(err) {
  768. process.nextTick(function() {
  769. throw err;
  770. });
  771. }
  772. }
  773. return;
  774. }
  775. if(this.options.domainsEnabled
  776. && process.domain && typeof cb === "function") {
  777. // if we have a domain bind to it
  778. var oldCb = cb;
  779. cb = process.domain.bind(function() {
  780. // v8 - argumentsToArray one-liner
  781. var args = new Array(arguments.length); for(var i = 0; i < arguments.length; i++) { args[i] = arguments[i]; }
  782. // bounce off event loop so domain switch takes place
  783. process.nextTick(function() {
  784. oldCb.apply(null, args);
  785. });
  786. });
  787. }
  788. // Do we have an operation
  789. var operation = {
  790. cb: cb, raw: false, promoteLongs: true, promoteValues: true, promoteBuffers: false, fullResult: false
  791. };
  792. var buffer = null
  793. if(Array.isArray(commands)) {
  794. buffer = [];
  795. for(var i = 0; i < commands.length; i++) {
  796. buffer.push(commands[i].toBin());
  797. }
  798. // Get the requestId
  799. operation.requestId = commands[commands.length - 1].requestId;
  800. } else {
  801. operation.requestId = commands.requestId;
  802. buffer = commands.toBin();
  803. }
  804. // Set the buffers
  805. operation.buffer = buffer;
  806. // Set the options for the parsing
  807. operation.promoteLongs = typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true;
  808. operation.promoteValues = typeof options.promoteValues == 'boolean' ? options.promoteValues : true;
  809. operation.promoteBuffers = typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers : false;
  810. operation.raw = typeof options.raw == 'boolean' ? options.raw : false;
  811. operation.immediateRelease = typeof options.immediateRelease == 'boolean' ? options.immediateRelease : false;
  812. operation.documentsReturnedIn = options.documentsReturnedIn;
  813. operation.command = typeof options.command == 'boolean' ? options.command : false;
  814. operation.fullResult = typeof options.fullResult == 'boolean' ? options.fullResult : false;
  815. operation.noResponse = typeof options.noResponse == 'boolean' ? options.noResponse : false;
  816. // operation.requestId = options.requestId;
  817. // Optional per operation socketTimeout
  818. operation.socketTimeout = options.socketTimeout;
  819. operation.monitoring = options.monitoring;
  820. // Custom socket Timeout
  821. if(options.socketTimeout) {
  822. operation.socketTimeout = options.socketTimeout;
  823. }
  824. // We need to have a callback function unless the message returns no response
  825. if(!(typeof cb == 'function') && !options.noResponse) {
  826. throw new MongoError('write method must provide a callback');
  827. }
  828. // If we have a monitoring operation schedule as the very first operation
  829. // Otherwise add to back of queue
  830. if(options.monitoring) {
  831. this.queue.unshift(operation);
  832. } else {
  833. this.queue.push(operation);
  834. }
  835. // Attempt to execute the operation
  836. if(!self.executing) {
  837. process.nextTick(function() {
  838. _execute(self)();
  839. });
  840. }
  841. }
  842. // Remove connection method
  843. function remove(connection, connections) {
  844. for(var i = 0; i < connections.length; i++) {
  845. if(connections[i] === connection) {
  846. connections.splice(i, 1);
  847. return true;
  848. }
  849. }
  850. }
  851. function removeConnection(self, connection) {
  852. if(remove(connection, self.availableConnections)) return;
  853. if(remove(connection, self.inUseConnections)) return;
  854. if(remove(connection, self.connectingConnections)) return;
  855. if(remove(connection, self.nonAuthenticatedConnections)) return;
  856. }
  857. // All event handlers
  858. var handlers = ["close", "message", "error", "timeout", "parseError", "connect"];
  859. function _createConnection(self) {
  860. var connection = new Connection(messageHandler(self), self.options);
  861. // Push the connection
  862. self.connectingConnections.push(connection);
  863. // Handle any errors
  864. var tempErrorHandler = function(_connection) {
  865. return function() {
  866. // Destroy the connection
  867. _connection.destroy();
  868. // Remove the connection from the connectingConnections list
  869. removeConnection(self, _connection);
  870. // Start reconnection attempts
  871. if(!self.reconnectId && self.options.reconnect) {
  872. self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
  873. }
  874. }
  875. }
  876. // Handle successful connection
  877. var tempConnectHandler = function(_connection) {
  878. return function() {
  879. // Destroyed state return
  880. if(self.state == DESTROYED || self.state == DESTROYING) {
  881. // Remove the connection from the list
  882. removeConnection(self, _connection);
  883. return _connection.destroy();
  884. }
  885. // Destroy all event emitters
  886. handlers.forEach(function(e) {
  887. _connection.removeAllListeners(e);
  888. });
  889. // Add the final handlers
  890. _connection.once('close', connectionFailureHandler(self, 'close'));
  891. _connection.once('error', connectionFailureHandler(self, 'error'));
  892. _connection.once('timeout', connectionFailureHandler(self, 'timeout'));
  893. _connection.once('parseError', connectionFailureHandler(self, 'parseError'));
  894. // Signal
  895. reauthenticate(self, _connection, function(err) {
  896. if(self.state == DESTROYED || self.state == DESTROYING) {
  897. return _connection.destroy();
  898. }
  899. // Remove the connection from the connectingConnections list
  900. removeConnection(self, _connection);
  901. // Handle error
  902. if(err) {
  903. return _connection.destroy();
  904. }
  905. // If we are authenticating at the moment
  906. // Do not automatially put in available connections
  907. // As we need to apply the credentials first
  908. if(self.authenticating) {
  909. self.nonAuthenticatedConnections.push(_connection);
  910. } else {
  911. // Push to available
  912. self.availableConnections.push(_connection);
  913. // Execute any work waiting
  914. _execute(self)();
  915. }
  916. });
  917. }
  918. }
  919. // Add all handlers
  920. connection.once('close', tempErrorHandler(connection));
  921. connection.once('error', tempErrorHandler(connection));
  922. connection.once('timeout', tempErrorHandler(connection));
  923. connection.once('parseError', tempErrorHandler(connection));
  924. connection.once('connect', tempConnectHandler(connection));
  925. // Start connection
  926. connection.connect();
  927. }
  928. function flushMonitoringOperations(queue) {
  929. for(var i = 0; i < queue.length; i++) {
  930. if(queue[i].monitoring) {
  931. var workItem = queue[i];
  932. queue.splice(i, 1);
  933. workItem.cb(new MongoError({ message: 'no connection available for monitoring', driver:true }));
  934. }
  935. }
  936. }
  937. function _execute(self) {
  938. return function() {
  939. if(self.state == DESTROYED) return;
  940. // Already executing, skip
  941. if(self.executing) return;
  942. // Set pool as executing
  943. self.executing = true;
  944. // Wait for auth to clear before continuing
  945. function waitForAuth(cb) {
  946. if(!self.authenticating) return cb();
  947. // Wait for a milisecond and try again
  948. setTimeout(function() {
  949. waitForAuth(cb);
  950. }, 1);
  951. }
  952. // Block on any auth in process
  953. waitForAuth(function() {
  954. // As long as we have available connections
  955. while(true) {
  956. // Total availble connections
  957. var totalConnections = self.availableConnections.length
  958. + self.connectingConnections.length
  959. + self.inUseConnections.length;
  960. // No available connections available, flush any monitoring ops
  961. if(self.availableConnections.length == 0) {
  962. // Flush any monitoring operations
  963. flushMonitoringOperations(self.queue);
  964. break;
  965. }
  966. // No queue break
  967. if(self.queue.length == 0) {
  968. break;
  969. }
  970. // Get a connection
  971. var connection = self.availableConnections[self.connectionIndex++ % self.availableConnections.length];
  972. // Is the connection connected
  973. if(connection.isConnected()) {
  974. // Get the next work item
  975. var workItem = self.queue.shift();
  976. // Get actual binary commands
  977. var buffer = workItem.buffer;
  978. // Set current status of authentication process
  979. workItem.authenticating = self.authenticating;
  980. workItem.authenticatingTimestamp = self.authenticatingTimestamp;
  981. // If we are monitoring take the connection of the availableConnections
  982. if (workItem.monitoring) {
  983. moveConnectionBetween(connection, self.availableConnections, self.inUseConnections);
  984. }
  985. // Track the executing commands on the mongo server
  986. // as long as there is an expected response
  987. if (! workItem.noResponse) {
  988. connection.workItems.push(workItem);
  989. }
  990. // We have a custom socketTimeout
  991. if(!workItem.immediateRelease && typeof workItem.socketTimeout == 'number') {
  992. connection.setSocketTimeout(workItem.socketTimeout);
  993. }
  994. // Put operation on the wire
  995. if(Array.isArray(buffer)) {
  996. for(var i = 0; i < buffer.length; i++) {
  997. connection.write(buffer[i])
  998. }
  999. } else {
  1000. connection.write(buffer);
  1001. }
  1002. if(workItem.immediateRelease && self.authenticating) {
  1003. self.nonAuthenticatedConnections.push(connection);
  1004. }
  1005. // Have we not reached the max connection size yet
  1006. if(totalConnections < self.options.size
  1007. && self.queue.length > 0) {
  1008. // Create a new connection
  1009. _createConnection(self);
  1010. }
  1011. } else {
  1012. // Remove the disconnected connection
  1013. removeConnection(self, connection);
  1014. // Flush any monitoring operations in the queue, failing fast
  1015. flushMonitoringOperations(self.queue);
  1016. }
  1017. }
  1018. });
  1019. self.executing = false;
  1020. }
  1021. }
// Make the execution loop factory available on the constructor for testing
Pool._execute = _execute;
  1024. /**
  1025. * A server connect event, used to verify that the connection is up and running
  1026. *
  1027. * @event Pool#connect
  1028. * @type {Pool}
  1029. */
  1030. /**
  1031. * A server reconnect event, used to verify that pool reconnected.
  1032. *
  1033. * @event Pool#reconnect
  1034. * @type {Pool}
  1035. */
  1036. /**
  1037. * The server connection closed, all pool connections closed
  1038. *
  1039. * @event Pool#close
  1040. * @type {Pool}
  1041. */
  1042. /**
  1043. * The server connection caused an error, all pool connections closed
  1044. *
  1045. * @event Pool#error
  1046. * @type {Pool}
  1047. */
  1048. /**
  1049. * The server connection timed out, all pool connections closed
  1050. *
  1051. * @event Pool#timeout
  1052. * @type {Pool}
  1053. */
  1054. /**
  1055. * The driver experienced an invalid message, all pool connections closed
  1056. *
  1057. * @event Pool#parseError
  1058. * @type {Pool}
  1059. */
  1060. /**
  1061. * The driver attempted to reconnect
  1062. *
  1063. * @event Pool#attemptReconnect
  1064. * @type {Pool}
  1065. */
  1066. /**
  1067. * The driver exhausted all reconnect attempts
  1068. *
  1069. * @event Pool#reconnectFailed
  1070. * @type {Pool}
  1071. */
  1072. module.exports = Pool;