mongos.js 36 KB


  1. "use strict"
  2. var inherits = require('util').inherits,
  3. f = require('util').format,
  4. EventEmitter = require('events').EventEmitter,
  5. BasicCursor = require('../cursor'),
  6. Logger = require('../connection/logger'),
  7. retrieveBSON = require('../connection/utils').retrieveBSON,
  8. MongoError = require('../error'),
  9. Server = require('./server'),
  10. assign = require('./shared').assign,
  11. clone = require('./shared').clone,
  12. createClientInfo = require('./shared').createClientInfo;
  13. var BSON = retrieveBSON();
  14. /**
  15. * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
  16. * used to construct connections.
  17. *
  18. * @example
  19. * var Mongos = require('mongodb-core').Mongos
  20. * , ReadPreference = require('mongodb-core').ReadPreference
  21. * , assert = require('assert');
  22. *
  23. * var server = new Mongos([{host: 'localhost', port: 30000}]);
  24. * // Wait for the connection event
  25. * server.on('connect', function(server) {
  26. * server.destroy();
  27. * });
  28. *
  29. * // Start connecting
  30. * server.connect();
  31. */
  32. var MongoCR = require('../auth/mongocr')
  33. , X509 = require('../auth/x509')
  34. , Plain = require('../auth/plain')
  35. , GSSAPI = require('../auth/gssapi')
  36. , SSPI = require('../auth/sspi')
  37. , ScramSHA1 = require('../auth/scram');
  38. //
  39. // States
  40. var DISCONNECTED = 'disconnected';
  41. var CONNECTING = 'connecting';
  42. var CONNECTED = 'connected';
  43. var DESTROYED = 'destroyed';
  44. function stateTransition(self, newState) {
  45. var legalTransitions = {
  46. 'disconnected': [CONNECTING, DESTROYED, DISCONNECTED],
  47. 'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
  48. 'connected': [CONNECTED, DISCONNECTED, DESTROYED],
  49. 'destroyed': [DESTROYED]
  50. }
  51. // Get current state
  52. var legalStates = legalTransitions[self.state];
  53. if(legalStates && legalStates.indexOf(newState) != -1) {
  54. self.state = newState;
  55. } else {
  56. self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
  57. , self.id, self.state, newState, legalStates));
  58. }
  59. }
  60. //
  61. // ReplSet instance id
  62. var id = 1;
  63. var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
  64. /**
  65. * Creates a new Mongos instance
  66. * @class
  67. * @param {array} seedlist A list of seeds for the replicaset
  68. * @param {number} [options.haInterval=5000] The High availability period for replicaset inquiry
  69. * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
  70. * @param {number} [options.size=5] Server connection pool size
  71. * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
  72. * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
  73. * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
  74. * @param {boolean} [options.noDelay=true] TCP Connection no delay
  75. * @param {number} [options.connectionTimeout=1000] TCP Connection timeout setting
  76. * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
  77. * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
  78. * @param {boolean} [options.ssl=false] Use SSL for connection
  79. * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
  80. * @param {Buffer} [options.ca] SSL Certificate store binary buffer
  81. * @param {Buffer} [options.cert] SSL Certificate binary buffer
  82. * @param {Buffer} [options.key] SSL Key file binary buffer
  83. * @param {string} [options.passphrase] SSL Certificate pass phrase
  84. * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
  85. * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
  86. * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
  87. * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
  88. * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
  89. * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
  90. * @return {Mongos} A cursor instance
  91. * @fires Mongos#connect
  92. * @fires Mongos#reconnect
  93. * @fires Mongos#joined
  94. * @fires Mongos#left
  95. * @fires Mongos#failed
  96. * @fires Mongos#fullsetup
  97. * @fires Mongos#all
  98. * @fires Mongos#serverHeartbeatStarted
  99. * @fires Mongos#serverHeartbeatSucceeded
  100. * @fires Mongos#serverHeartbeatFailed
  101. * @fires Mongos#topologyOpening
  102. * @fires Mongos#topologyClosed
  103. * @fires Mongos#topologyDescriptionChanged
  104. * @property {string} type the topology type.
  105. * @property {string} parserType the parser type used (c++ or js).
  106. */
  107. var Mongos = function(seedlist, options) {
  108. options = options || {};
  109. // Get replSet Id
  110. this.id = id++;
  111. // Internal state
  112. this.s = {
  113. options: assign({}, options),
  114. // BSON instance
  115. bson: options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128,
  116. BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey,
  117. BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]),
  118. // Factory overrides
  119. Cursor: options.cursorFactory || BasicCursor,
  120. // Logger instance
  121. logger: Logger('Mongos', options),
  122. // Seedlist
  123. seedlist: seedlist,
  124. // Ha interval
  125. haInterval: options.haInterval ? options.haInterval : 10000,
  126. // Disconnect handler
  127. disconnectHandler: options.disconnectHandler,
  128. // Server selection index
  129. index: 0,
  130. // Connect function options passed in
  131. connectOptions: {},
  132. // Are we running in debug mode
  133. debug: typeof options.debug == 'boolean' ? options.debug : false,
  134. // localThresholdMS
  135. localThresholdMS: options.localThresholdMS || 15,
  136. // Client info
  137. clientInfo: createClientInfo(options)
  138. }
  139. // Set the client info
  140. this.s.options.clientInfo = createClientInfo(options);
  141. // Log info warning if the socketTimeout < haInterval as it will cause
  142. // a lot of recycled connections to happen.
  143. if(this.s.logger.isWarn()
  144. && this.s.options.socketTimeout != 0
  145. && this.s.options.socketTimeout < this.s.haInterval) {
  146. this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts'
  147. , this.s.options.socketTimeout, this.s.haInterval));
  148. }
  149. // All the authProviders
  150. this.authProviders = options.authProviders || {
  151. 'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson)
  152. , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson)
  153. , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson)
  154. }
  155. // Disconnected state
  156. this.state = DISCONNECTED;
  157. // Current proxies we are connecting to
  158. this.connectingProxies = [];
  159. // Currently connected proxies
  160. this.connectedProxies = [];
  161. // Disconnected proxies
  162. this.disconnectedProxies = [];
  163. // Are we authenticating
  164. this.authenticating = false;
  165. // Index of proxy to run operations against
  166. this.index = 0;
  167. // High availability timeout id
  168. this.haTimeoutId = null;
  169. // Last ismaster
  170. this.ismaster = null;
  171. // Add event listener
  172. EventEmitter.call(this);
  173. }
  174. inherits(Mongos, EventEmitter);
  175. Object.defineProperty(Mongos.prototype, 'type', {
  176. enumerable:true, get: function() { return 'mongos'; }
  177. });
  178. Object.defineProperty(Mongos.prototype, 'parserType', {
  179. enumerable:true, get: function() {
  180. return BSON.native ? "c++" : "js";
  181. }
  182. });
  183. /**
  184. * Emit event if it exists
  185. * @method
  186. */
  187. function emitSDAMEvent(self, event, description) {
  188. if(self.listeners(event).length > 0) {
  189. self.emit(event, description);
  190. }
  191. }
  192. /**
  193. * Initiate server connect
  194. * @method
  195. * @param {array} [options.auth=null] Array of auth options to apply on connect
  196. */
  197. Mongos.prototype.connect = function(options) {
  198. var self = this;
  199. // Add any connect level options to the internal state
  200. this.s.connectOptions = options || {};
  201. // Set connecting state
  202. stateTransition(this, CONNECTING);
  203. // Create server instances
  204. var servers = this.s.seedlist.map(function(x) {
  205. return new Server(assign({}, self.s.options, x, {
  206. authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true
  207. }, {
  208. clientInfo: clone(self.s.clientInfo)
  209. }));
  210. });
  211. // Emit the topology opening event
  212. emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id });
  213. // Start all server connections
  214. connectProxies(self, servers);
  215. }
  216. function handleEvent(self) {
  217. return function() {
  218. if(self.state == DESTROYED) return;
  219. // Move to list of disconnectedProxies
  220. moveServerFrom(self.connectedProxies, self.disconnectedProxies, this);
  221. // Emit the left signal
  222. self.emit('left', 'mongos', this);
  223. }
  224. }
  225. function handleInitialConnectEvent(self, event) {
  226. return function() {
  227. // Destroy the instance
  228. if(self.state == DESTROYED) {
  229. // Move from connectingProxies
  230. moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
  231. return this.destroy();
  232. }
  233. // Check the type of server
  234. if(event == 'connect') {
  235. // Get last known ismaster
  236. self.ismaster = this.lastIsMaster();
  237. // Is this not a proxy, remove t
  238. if(self.ismaster.msg == 'isdbgrid') {
  239. // Add to the connectd list
  240. for(var i = 0; i < self.connectedProxies.length; i++) {
  241. if(self.connectedProxies[i].name == this.name) {
  242. // Move from connectingProxies
  243. moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
  244. this.destroy();
  245. return self.emit('failed', this);
  246. }
  247. }
  248. // Remove the handlers
  249. for(i = 0; i < handlers.length; i++) {
  250. this.removeAllListeners(handlers[i]);
  251. }
  252. // Add stable state handlers
  253. this.on('error', handleEvent(self, 'error'));
  254. this.on('close', handleEvent(self, 'close'));
  255. this.on('timeout', handleEvent(self, 'timeout'));
  256. this.on('parseError', handleEvent(self, 'parseError'));
  257. // Move from connecting proxies connected
  258. moveServerFrom(self.connectingProxies, self.connectedProxies, this);
  259. // Emit the joined event
  260. self.emit('joined', 'mongos', this);
  261. } else {
  262. // Print warning if we did not find a mongos proxy
  263. if(self.s.logger.isWarn()) {
  264. var message = 'expected mongos proxy, but found replicaset member mongod for server %s';
  265. // We have a standalone server
  266. if(!self.ismaster.hosts) {
  267. message = 'expected mongos proxy, but found standalone mongod for server %s';
  268. }
  269. self.s.logger.warn(f(message, this.name));
  270. }
  271. // This is not a mongos proxy, remove it completely
  272. removeProxyFrom(self.connectingProxies, this);
  273. // Emit the left event
  274. self.emit('left', 'server', this);
  275. // Emit failed event
  276. self.emit('failed', this);
  277. }
  278. } else {
  279. moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
  280. // Emit the left event
  281. self.emit('left', 'mongos', this);
  282. // Emit failed event
  283. self.emit('failed', this);
  284. }
  285. // Trigger topologyMonitor
  286. if(self.connectingProxies.length == 0) {
  287. // Emit connected if we are connected
  288. if(self.connectedProxies.length > 0) {
  289. // Set the state to connected
  290. stateTransition(self, CONNECTED);
  291. // Emit the connect event
  292. self.emit('connect', self);
  293. self.emit('fullsetup', self);
  294. self.emit('all', self);
  295. } else if(self.disconnectedProxies.length == 0) {
  296. // Print warning if we did not find a mongos proxy
  297. if(self.s.logger.isWarn()) {
  298. self.s.logger.warn(f('no mongos proxies found in seed list, did you mean to connect to a replicaset'));
  299. }
  300. // Emit the error that no proxies were found
  301. return self.emit('error', new MongoError('no mongos proxies found in seed list'));
  302. }
  303. // Topology monitor
  304. topologyMonitor(self, {firstConnect:true});
  305. }
  306. };
  307. }
  308. function connectProxies(self, servers) {
  309. // Update connectingProxies
  310. self.connectingProxies = self.connectingProxies.concat(servers);
  311. // Index used to interleaf the server connects, avoiding
  312. // runtime issues on io constrained vm's
  313. var timeoutInterval = 0;
  314. function connect(server, timeoutInterval) {
  315. setTimeout(function() {
  316. // Add event handlers
  317. server.once('close', handleInitialConnectEvent(self, 'close'));
  318. server.once('timeout', handleInitialConnectEvent(self, 'timeout'));
  319. server.once('parseError', handleInitialConnectEvent(self, 'parseError'));
  320. server.once('error', handleInitialConnectEvent(self, 'error'));
  321. server.once('connect', handleInitialConnectEvent(self, 'connect'));
  322. // SDAM Monitoring events
  323. server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
  324. server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
  325. server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
  326. // Start connection
  327. server.connect(self.s.connectOptions);
  328. }, timeoutInterval);
  329. }
  330. // Start all the servers
  331. while(servers.length > 0) {
  332. connect(servers.shift(), timeoutInterval++);
  333. }
  334. }
  335. function pickProxy(self) {
  336. // Get the currently connected Proxies
  337. var connectedProxies = self.connectedProxies.slice(0);
  338. // Set lower bound
  339. var lowerBoundLatency = Number.MAX_VALUE;
  340. // Determine the lower bound for the Proxies
  341. for(var i = 0; i < connectedProxies.length; i++) {
  342. if(connectedProxies[i].lastIsMasterMS < lowerBoundLatency) {
  343. lowerBoundLatency = connectedProxies[i].lastIsMasterMS;
  344. }
  345. }
  346. // Filter out the possible servers
  347. connectedProxies = connectedProxies.filter(function(server) {
  348. if((server.lastIsMasterMS <= (lowerBoundLatency + self.s.localThresholdMS))
  349. && server.isConnected()) {
  350. return true;
  351. }
  352. });
  353. // We have no connectedProxies pick first of the connected ones
  354. if(connectedProxies.length == 0) {
  355. return self.connectedProxies[0];
  356. }
  357. // Get proxy
  358. var proxy = connectedProxies[self.index % connectedProxies.length];
  359. // Update the index
  360. self.index = (self.index + 1) % connectedProxies.length;
  361. // Return the proxy
  362. return proxy;
  363. }
  364. function moveServerFrom(from, to, proxy) {
  365. for(var i = 0; i < from.length; i++) {
  366. if(from[i].name == proxy.name) {
  367. from.splice(i, 1);
  368. }
  369. }
  370. for(i = 0; i < to.length; i++) {
  371. if(to[i].name == proxy.name) {
  372. to.splice(i, 1);
  373. }
  374. }
  375. to.push(proxy);
  376. }
  377. function removeProxyFrom(from, proxy) {
  378. for(var i = 0; i < from.length; i++) {
  379. if(from[i].name == proxy.name) {
  380. from.splice(i, 1);
  381. }
  382. }
  383. }
  384. function reconnectProxies(self, proxies, callback) {
  385. // Count lefts
  386. var count = proxies.length;
  387. // Handle events
  388. var _handleEvent = function(self, event) {
  389. return function() {
  390. var _self = this;
  391. count = count - 1;
  392. // Destroyed
  393. if(self.state == DESTROYED) {
  394. moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
  395. return this.destroy();
  396. }
  397. if(event == 'connect' && !self.authenticating) {
  398. // Destroyed
  399. if(self.state == DESTROYED) {
  400. moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
  401. return _self.destroy();
  402. }
  403. // Remove the handlers
  404. for(var i = 0; i < handlers.length; i++) {
  405. _self.removeAllListeners(handlers[i]);
  406. }
  407. // Add stable state handlers
  408. _self.on('error', handleEvent(self, 'error'));
  409. _self.on('close', handleEvent(self, 'close'));
  410. _self.on('timeout', handleEvent(self, 'timeout'));
  411. _self.on('parseError', handleEvent(self, 'parseError'));
  412. // Move to the connected servers
  413. moveServerFrom(self.disconnectedProxies, self.connectedProxies, _self);
  414. // Emit joined event
  415. self.emit('joined', 'mongos', _self);
  416. } else if(event == 'connect' && self.authenticating) {
  417. // Move from connectingProxies
  418. moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
  419. this.destroy();
  420. }
  421. // Are we done finish up callback
  422. if(count == 0) {
  423. callback();
  424. }
  425. }
  426. }
  427. // No new servers
  428. if(count == 0) {
  429. return callback();
  430. }
  431. // Execute method
  432. function execute(_server, i) {
  433. setTimeout(function() {
  434. // Destroyed
  435. if(self.state == DESTROYED) {
  436. return;
  437. }
  438. // Create a new server instance
  439. var server = new Server(assign({}, self.s.options, {
  440. host: _server.name.split(':')[0],
  441. port: parseInt(_server.name.split(':')[1], 10)
  442. }, {
  443. authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
  444. }, {
  445. clientInfo: clone(self.s.clientInfo)
  446. }));
  447. // Add temp handlers
  448. server.once('connect', _handleEvent(self, 'connect'));
  449. server.once('close', _handleEvent(self, 'close'));
  450. server.once('timeout', _handleEvent(self, 'timeout'));
  451. server.once('error', _handleEvent(self, 'error'));
  452. server.once('parseError', _handleEvent(self, 'parseError'));
  453. // SDAM Monitoring events
  454. server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
  455. server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
  456. server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
  457. server.connect(self.s.connectOptions);
  458. }, i);
  459. }
  460. // Create new instances
  461. for(var i = 0; i < proxies.length; i++) {
  462. execute(proxies[i], i);
  463. }
  464. }
  465. function topologyMonitor(self, options) {
  466. options = options || {};
  467. // Set momitoring timeout
  468. self.haTimeoutId = setTimeout(function() {
  469. if(self.state == DESTROYED) return;
  470. // If we have a primary and a disconnect handler, execute
  471. // buffered operations
  472. if(self.isConnected() && self.s.disconnectHandler) {
  473. self.s.disconnectHandler.execute();
  474. }
  475. // Get the connectingServers
  476. var proxies = self.connectedProxies.slice(0);
  477. // Get the count
  478. var count = proxies.length;
  479. // If the count is zero schedule a new fast
  480. function pingServer(_self, _server, cb) {
  481. // Measure running time
  482. var start = new Date().getTime();
  483. // Emit the server heartbeat start
  484. emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name });
  485. // Execute ismaster
  486. _server.command('admin.$cmd', {
  487. ismaster:true
  488. }, {
  489. monitoring: true,
  490. socketTimeout: self.s.options.connectionTimeout || 2000,
  491. }, function(err, r) {
  492. if(self.state == DESTROYED) {
  493. // Move from connectingProxies
  494. moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server);
  495. _server.destroy();
  496. return cb(err, r);
  497. }
  498. // Calculate latency
  499. var latencyMS = new Date().getTime() - start;
  500. // We had an error, remove it from the state
  501. if(err) {
  502. // Emit the server heartbeat failure
  503. emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name });
  504. // Move from connected proxies to disconnected proxies
  505. moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server);
  506. } else {
  507. // Update the server ismaster
  508. _server.ismaster = r.result;
  509. _server.lastIsMasterMS = latencyMS;
  510. // Server heart beat event
  511. emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name });
  512. }
  513. cb(err, r);
  514. });
  515. }
  516. // No proxies initiate monitor again
  517. if(proxies.length == 0) {
  518. // Emit close event if any listeners registered
  519. if(self.listeners("close").length > 0 && self.state == CONNECTING) {
  520. self.emit('error', new MongoError('no mongos proxy available'));
  521. } else {
  522. self.emit('close', self);
  523. }
  524. // Attempt to connect to any unknown servers
  525. return reconnectProxies(self, self.disconnectedProxies, function() {
  526. if(self.state == DESTROYED) return;
  527. // Are we connected ? emit connect event
  528. if(self.state == CONNECTING && options.firstConnect) {
  529. self.emit('connect', self);
  530. self.emit('fullsetup', self);
  531. self.emit('all', self);
  532. } else if(self.isConnected()) {
  533. self.emit('reconnect', self);
  534. } else if(!self.isConnected() && self.listeners("close").length > 0) {
  535. self.emit('close', self);
  536. }
  537. // Perform topology monitor
  538. topologyMonitor(self);
  539. });
  540. }
  541. // Ping all servers
  542. for(var i = 0; i < proxies.length; i++) {
  543. pingServer(self, proxies[i], function() {
  544. count = count - 1;
  545. if(count == 0) {
  546. if(self.state == DESTROYED) return;
  547. // Attempt to connect to any unknown servers
  548. reconnectProxies(self, self.disconnectedProxies, function() {
  549. if(self.state == DESTROYED) return;
  550. // Perform topology monitor
  551. topologyMonitor(self);
  552. });
  553. }
  554. });
  555. }
  556. }, self.s.haInterval);
  557. }
  558. /**
  559. * Returns the last known ismaster document for this server
  560. * @method
  561. * @return {object}
  562. */
  563. Mongos.prototype.lastIsMaster = function() {
  564. return this.ismaster;
  565. }
  566. /**
  567. * Unref all connections belong to this server
  568. * @method
  569. */
  570. Mongos.prototype.unref = function() {
  571. // Transition state
  572. stateTransition(this, DISCONNECTED);
  573. // Get all proxies
  574. var proxies = this.connectedProxies.concat(this.connectingProxies);
  575. proxies.forEach(function(x) {
  576. x.unref();
  577. });
  578. clearTimeout(this.haTimeoutId);
  579. }
  580. /**
  581. * Destroy the server connection
  582. * @param {boolean} [options.force=false] Force destroy the pool
  583. * @method
  584. */
  585. Mongos.prototype.destroy = function(options) {
  586. // Transition state
  587. stateTransition(this, DESTROYED);
  588. // Get all proxies
  589. var proxies = this.connectedProxies.concat(this.connectingProxies);
  590. // Clear out any monitoring process
  591. if(this.haTimeoutId) clearTimeout(this.haTimeoutId);
  592. // Destroy all connecting servers
  593. proxies.forEach(function(x) {
  594. x.destroy(options);
  595. });
  596. // Emit toplogy closing event
  597. emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
  598. }
  599. /**
  600. * Figure out if the server is connected
  601. * @method
  602. * @return {boolean}
  603. */
  604. Mongos.prototype.isConnected = function() {
  605. return this.connectedProxies.length > 0;
  606. }
  607. /**
  608. * Figure out if the server instance was destroyed by calling destroy
  609. * @method
  610. * @return {boolean}
  611. */
  612. Mongos.prototype.isDestroyed = function() {
  613. return this.state == DESTROYED;
  614. }
  615. //
  616. // Operations
  617. //
  618. // Execute write operation
  619. var executeWriteOperation = function(self, op, ns, ops, options, callback) {
  620. if(typeof options == 'function') callback = options, options = {}, options = options || {};
  621. // Ensure we have no options
  622. options = options || {};
  623. // Pick a server
  624. var server = pickProxy(self);
  625. // No server found error out
  626. if(!server) return callback(new MongoError('no mongos proxy available'));
  627. // Execute the command
  628. server[op](ns, ops, options, callback);
  629. }
  630. /**
  631. * Insert one or more documents
  632. * @method
  633. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  634. * @param {array} ops An array of documents to insert
  635. * @param {boolean} [options.ordered=true] Execute in order or out of order
  636. * @param {object} [options.writeConcern={}] Write concern for the operation
  637. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  638. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  639. * @param {opResultCallback} callback A callback function
  640. */
  641. Mongos.prototype.insert = function(ns, ops, options, callback) {
  642. if(typeof options == 'function') callback = options, options = {}, options = options || {};
  643. if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
  644. // Not connected but we have a disconnecthandler
  645. if(!this.isConnected() && this.s.disconnectHandler != null) {
  646. return this.s.disconnectHandler.add('insert', ns, ops, options, callback);
  647. }
  648. // No mongos proxy available
  649. if(!this.isConnected()) {
  650. return callback(new MongoError('no mongos proxy available'));
  651. }
  652. // Execute write operation
  653. executeWriteOperation(this, 'insert', ns, ops, options, callback);
  654. }
  655. /**
  656. * Perform one or more update operations
  657. * @method
  658. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  659. * @param {array} ops An array of updates
  660. * @param {boolean} [options.ordered=true] Execute in order or out of order
  661. * @param {object} [options.writeConcern={}] Write concern for the operation
  662. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  663. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  664. * @param {opResultCallback} callback A callback function
  665. */
  666. Mongos.prototype.update = function(ns, ops, options, callback) {
  667. if(typeof options == 'function') callback = options, options = {}, options = options || {};
  668. if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
  669. // Not connected but we have a disconnecthandler
  670. if(!this.isConnected() && this.s.disconnectHandler != null) {
  671. return this.s.disconnectHandler.add('update', ns, ops, options, callback);
  672. }
  673. // No mongos proxy available
  674. if(!this.isConnected()) {
  675. return callback(new MongoError('no mongos proxy available'));
  676. }
  677. // Execute write operation
  678. executeWriteOperation(this, 'update', ns, ops, options, callback);
  679. }
  680. /**
  681. * Perform one or more remove operations
  682. * @method
  683. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  684. * @param {array} ops An array of removes
  685. * @param {boolean} [options.ordered=true] Execute in order or out of order
  686. * @param {object} [options.writeConcern={}] Write concern for the operation
  687. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  688. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  689. * @param {opResultCallback} callback A callback function
  690. */
  691. Mongos.prototype.remove = function(ns, ops, options, callback) {
  692. if(typeof options == 'function') callback = options, options = {}, options = options || {};
  693. if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
  694. // Not connected but we have a disconnecthandler
  695. if(!this.isConnected() && this.s.disconnectHandler != null) {
  696. return this.s.disconnectHandler.add('remove', ns, ops, options, callback);
  697. }
  698. // No mongos proxy available
  699. if(!this.isConnected()) {
  700. return callback(new MongoError('no mongos proxy available'));
  701. }
  702. // Execute write operation
  703. executeWriteOperation(this, 'remove', ns, ops, options, callback);
  704. }
  705. /**
  706. * Execute a command
  707. * @method
  708. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  709. * @param {object} cmd The command hash
  710. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  711. * @param {Connection} [options.connection] Specify connection object to execute command against
  712. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  713. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  714. * @param {opResultCallback} callback A callback function
  715. */
  716. Mongos.prototype.command = function(ns, cmd, options, callback) {
  717. if(typeof options == 'function') callback = options, options = {}, options = options || {};
  718. if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
  719. var self = this;
  720. // Pick a proxy
  721. var server = pickProxy(self);
  722. // Topology is not connected, save the call in the provided store to be
  723. // Executed at some point when the handler deems it's reconnected
  724. if((server == null || !server.isConnected()) && this.s.disconnectHandler != null) {
  725. return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
  726. }
  727. // No server returned we had an error
  728. if(server == null) {
  729. return callback(new MongoError('no mongos proxy available'));
  730. }
  731. // Execute the command
  732. server.command(ns, cmd, options, callback);
  733. }
  734. /**
  735. * Perform one or more remove operations
  736. * @method
  737. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  738. * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
  739. * @param {object} [options.batchSize=0] Batchsize for the operation
  740. * @param {array} [options.documents=[]] Initial documents list for cursor
  741. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  742. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  743. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  744. * @param {opResultCallback} callback A callback function
  745. */
  746. Mongos.prototype.cursor = function(ns, cmd, cursorOptions) {
  747. cursorOptions = cursorOptions || {};
  748. var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor;
  749. return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options);
  750. }
  751. /**
  752. * Authenticate using a specified mechanism
  753. * @method
  754. * @param {string} mechanism The Auth mechanism we are invoking
  755. * @param {string} db The db we are invoking the mechanism against
  756. * @param {...object} param Parameters for the specific mechanism
  757. * @param {authResultCallback} callback A callback function
  758. */
  759. Mongos.prototype.auth = function(mechanism, db) {
  760. var allArgs = Array.prototype.slice.call(arguments, 0).slice(0);
  761. var self = this;
  762. var args = Array.prototype.slice.call(arguments, 2);
  763. var callback = args.pop();
  764. // If we don't have the mechanism fail
  765. if(this.authProviders[mechanism] == null && mechanism != 'default') {
  766. return callback(new MongoError(f("auth provider %s does not exist", mechanism)));
  767. }
  768. // Are we already authenticating, throw
  769. if(this.authenticating) {
  770. return callback(new MongoError('authentication or logout allready in process'));
  771. }
  772. // Topology is not connected, save the call in the provided store to be
  773. // Executed at some point when the handler deems it's reconnected
  774. if(!self.isConnected() && self.s.disconnectHandler != null) {
  775. return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
  776. }
  777. // Set to authenticating
  778. this.authenticating = true;
  779. // All errors
  780. var errors = [];
  781. // Get all the servers
  782. var servers = this.connectedProxies.slice(0);
  783. // No servers return
  784. if(servers.length == 0) {
  785. this.authenticating = false;
  786. callback(null, true);
  787. }
  788. // Authenticate
  789. function auth(server) {
  790. // Arguments without a callback
  791. var argsWithoutCallback = [mechanism, db].concat(args.slice(0));
  792. // Create arguments
  793. var finalArguments = argsWithoutCallback.concat([function(err) {
  794. count = count - 1;
  795. // Save all the errors
  796. if(err) errors.push({name: server.name, err: err});
  797. // We are done
  798. if(count == 0) {
  799. // Auth is done
  800. self.authenticating = false;
  801. // Return the auth error
  802. if(errors.length) return callback(MongoError.create({
  803. message: 'authentication fail', errors: errors
  804. }), false);
  805. // Successfully authenticated session
  806. callback(null, self);
  807. }
  808. }]);
  809. // Execute the auth only against non arbiter servers
  810. if(!server.lastIsMaster().arbiterOnly) {
  811. server.auth.apply(server, finalArguments);
  812. }
  813. }
  814. // Get total count
  815. var count = servers.length;
  816. // Authenticate against all servers
  817. while(servers.length > 0) {
  818. auth(servers.shift());
  819. }
  820. }
  821. /**
  822. * Logout from a database
  823. * @method
  824. * @param {string} db The db we are logging out from
  825. * @param {authResultCallback} callback A callback function
  826. */
  827. Mongos.prototype.logout = function(dbName, callback) {
  828. var self = this;
  829. // Are we authenticating or logging out, throw
  830. if(this.authenticating) {
  831. throw new MongoError('authentication or logout allready in process');
  832. }
  833. // Ensure no new members are processed while logging out
  834. this.authenticating = true;
  835. // Remove from all auth providers (avoid any reaplication of the auth details)
  836. var providers = Object.keys(this.authProviders);
  837. for(var i = 0; i < providers.length; i++) {
  838. this.authProviders[providers[i]].logout(dbName);
  839. }
  840. // Now logout all the servers
  841. var servers = this.connectedProxies.slice(0);
  842. var count = servers.length;
  843. if(count == 0) return callback();
  844. var errors = [];
  845. function logoutServer(_server, cb) {
  846. _server.logout(dbName, function(err) {
  847. if(err) errors.push({name: _server.name, err: err});
  848. cb();
  849. });
  850. }
  851. // Execute logout on all server instances
  852. for(i = 0; i < servers.length; i++) {
  853. logoutServer(servers[i], function() {
  854. count = count - 1;
  855. if(count == 0) {
  856. // Do not block new operations
  857. self.authenticating = false;
  858. // If we have one or more errors
  859. if(errors.length) return callback(MongoError.create({
  860. message: f('logout failed against db %s', dbName), errors: errors
  861. }), false);
  862. // No errors
  863. callback();
  864. }
  865. })
  866. }
  867. }
  868. /**
  869. * Get server
  870. * @method
  871. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  872. * @return {Server}
  873. */
  874. Mongos.prototype.getServer = function() {
  875. var server = pickProxy(this);
  876. if(this.s.debug) this.emit('pickedServer', null, server);
  877. return server;
  878. }
  879. /**
  880. * All raw connections
  881. * @method
  882. * @return {Connection[]}
  883. */
  884. Mongos.prototype.connections = function() {
  885. var connections = [];
  886. for(var i = 0; i < this.connectedProxies.length; i++) {
  887. connections = connections.concat(this.connectedProxies[i].connections());
  888. }
  889. return connections;
  890. }
  891. /**
  892. * A mongos connect event, used to verify that the connection is up and running
  893. *
  894. * @event Mongos#connect
  895. * @type {Mongos}
  896. */
  897. /**
  898. * A mongos reconnect event, used to verify that the mongos topology has reconnected
  899. *
  900. * @event Mongos#reconnect
  901. * @type {Mongos}
  902. */
  903. /**
  904. * A mongos fullsetup event, used to signal that all topology members have been contacted.
  905. *
  906. * @event Mongos#fullsetup
  907. * @type {Mongos}
  908. */
  909. /**
  910. * A mongos all event, used to signal that all topology members have been contacted.
  911. *
  912. * @event Mongos#all
  913. * @type {Mongos}
  914. */
  915. /**
  916. * A server member left the mongos list
  917. *
  918. * @event Mongos#left
  919. * @type {Mongos}
  920. * @param {string} type The type of member that left (mongos)
  921. * @param {Server} server The server object that left
  922. */
  923. /**
  924. * A server member joined the mongos list
  925. *
  926. * @event Mongos#joined
  927. * @type {Mongos}
  928. * @param {string} type The type of member that left (mongos)
  929. * @param {Server} server The server object that joined
  930. */
  931. /**
  932. * A server opening SDAM monitoring event
  933. *
  934. * @event Mongos#serverOpening
  935. * @type {object}
  936. */
  937. /**
  938. * A server closed SDAM monitoring event
  939. *
  940. * @event Mongos#serverClosed
  941. * @type {object}
  942. */
  943. /**
  944. * A server description SDAM change monitoring event
  945. *
  946. * @event Mongos#serverDescriptionChanged
  947. * @type {object}
  948. */
  949. /**
  950. * A topology open SDAM event
  951. *
  952. * @event Mongos#topologyOpening
  953. * @type {object}
  954. */
  955. /**
  956. * A topology closed SDAM event
  957. *
  958. * @event Mongos#topologyClosed
  959. * @type {object}
  960. */
  961. /**
  962. * A topology structure SDAM change event
  963. *
  964. * @event Mongos#topologyDescriptionChanged
  965. * @type {object}
  966. */
  967. /**
  968. * A topology serverHeartbeatStarted SDAM event
  969. *
  970. * @event Mongos#serverHeartbeatStarted
  971. * @type {object}
  972. */
  973. /**
  974. * A topology serverHeartbeatFailed SDAM event
  975. *
  976. * @event Mongos#serverHeartbeatFailed
  977. * @type {object}
  978. */
  979. /**
  980. * A topology serverHeartbeatSucceeded SDAM change event
  981. *
  982. * @event Mongos#serverHeartbeatSucceeded
  983. * @type {object}
  984. */
  985. module.exports = Mongos;