replset.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. "use strict";
  2. var EventEmitter = require('events').EventEmitter
  3. , inherits = require('util').inherits
  4. , f = require('util').format
  5. , Server = require('./server')
  6. , Cursor = require('./cursor')
  7. , AggregationCursor = require('./aggregation_cursor')
  8. , CommandCursor = require('./command_cursor')
  9. , ReadPreference = require('./read_preference')
  10. , MongoError = require('mongodb-core').MongoError
  11. , ServerCapabilities = require('./topology_base').ServerCapabilities
  12. , Store = require('./topology_base').Store
  13. , Define = require('./metadata')
  14. , CReplSet = require('mongodb-core').ReplSet
  15. , CoreReadPreference = require('mongodb-core').ReadPreference
  16. , MAX_JS_INT = require('./utils').MAX_JS_INT
  17. , translateOptions = require('./utils').translateOptions
  18. , filterOptions = require('./utils').filterOptions
  19. , mergeOptions = require('./utils').mergeOptions
  20. , os = require('os');
  21. /**
  22. * @fileOverview The **ReplSet** class is a class that represents a Replicaset topology and is
  23. * used to construct connections.
  24. *
  25. * **ReplSet Should not be used, use MongoClient.connect**
  26. * @example
  27. * var Db = require('mongodb').Db,
  28. * ReplSet = require('mongodb').ReplSet,
  29. * Server = require('mongodb').Server,
  30. * test = require('assert');
  31. * // Connect using ReplSet
  32. * var server = new Server('localhost', 27017);
  33. * var db = new Db('test', new ReplSet([server]));
  34. * db.open(function(err, db) {
  35. * // Get an additional db
  36. * db.close();
  37. * });
  38. */
  39. // Allowed parameters
  40. var legalOptionNames = ['ha', 'haInterval', 'replicaSet', 'rs_name', 'secondaryAcceptableLatencyMS'
  41. , 'connectWithNoPrimary', 'poolSize', 'ssl', 'checkServerIdentity', 'sslValidate'
  42. , 'sslCA', 'sslCert', 'sslKey', 'sslPass', 'socketOptions', 'bufferMaxEntries'
  43. , 'store', 'auto_reconnect', 'autoReconnect', 'emitError'
  44. , 'keepAlive', 'noDelay', 'connectTimeoutMS', 'socketTimeoutMS', 'strategy', 'debug'
  45. , 'loggerLevel', 'logger', 'reconnectTries', 'appname', 'domainsEnabled'
  46. , 'servername', 'promoteLongs', 'promoteValues', 'promoteBuffers'];
  47. // Get package.json variable
  48. var driverVersion = require(__dirname + '/../package.json').version;
  49. var nodejsversion = f('Node.js %s, %s', process.version, os.endianness());
  50. var type = os.type();
  51. var name = process.platform;
  52. var architecture = process.arch;
  53. var release = os.release();
  54. /**
  55. * Creates a new ReplSet instance
  56. * @class
  57. * @deprecated
  58. * @param {Server[]} servers A seedlist of servers participating in the replicaset.
  59. * @param {object} [options=null] Optional settings.
  60. * @param {booelan} [options.ha=true] Turn on high availability monitoring.
  61. * @param {number} [options.haInterval=10000] Time between each replicaset status check.
  62. * @param {string} [options.replicaSet] The name of the replicaset to connect to.
  63. * @param {number} [options.secondaryAcceptableLatencyMS=15] Sets the range of servers to pick when using NEAREST (lowest ping ms + the latency fence, ex: range of 1 to (1 + 15) ms)
  64. * @param {boolean} [options.connectWithNoPrimary=false] Sets if the driver should connect even if no primary is available
  65. * @param {number} [options.poolSize=5] Number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons.
  66. * @param {boolean} [options.ssl=false] Use ssl connection (needs to have a mongod server with ssl support)
  67. * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
  68. * @param {object} [options.sslValidate=true] Validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher)
  69. * @param {array} [options.sslCA=null] Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher)
  70. * @param {(Buffer|string)} [options.sslCert=null] String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  71. * @param {(Buffer|string)} [options.sslKey=null] String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  72. * @param {(Buffer|string)} [options.sslPass=null] String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher)
  73. * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
  74. * @param {object} [options.socketOptions=null] Socket options
  75. * @param {boolean} [options.socketOptions.noDelay=true] TCP Socket NoDelay option.
  76. * @param {number} [options.socketOptions.keepAlive=0] TCP KeepAlive on the socket with a X ms delay before start.
  77. * @param {number} [options.socketOptions.connectTimeoutMS=10000] TCP Connection timeout setting
  78. * @param {number} [options.socketOptions.socketTimeoutMS=0] TCP Socket timeout setting
  79. * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
  80. * @fires ReplSet#connect
  81. * @fires ReplSet#ha
  82. * @fires ReplSet#joined
  83. * @fires ReplSet#left
  84. * @fires ReplSet#fullsetup
  85. * @fires ReplSet#open
  86. * @fires ReplSet#close
  87. * @fires ReplSet#error
  88. * @fires ReplSet#timeout
  89. * @fires ReplSet#parseError
  90. * @property {string} parserType the parser type used (c++ or js).
  91. * @return {ReplSet} a ReplSet instance.
  92. */
  93. var ReplSet = function(servers, options) {
  94. if(!(this instanceof ReplSet)) return new ReplSet(servers, options);
  95. options = options || {};
  96. var self = this;
  97. // Set up event emitter
  98. EventEmitter.call(this);
  99. // Filter the options
  100. options = filterOptions(options, legalOptionNames);
  101. // Ensure all the instances are Server
  102. for(var i = 0; i < servers.length; i++) {
  103. if(!(servers[i] instanceof Server)) {
  104. throw MongoError.create({message: "all seed list instances must be of the Server type", driver:true});
  105. }
  106. }
  107. // Stored options
  108. var storeOptions = {
  109. force: false
  110. , bufferMaxEntries: typeof options.bufferMaxEntries == 'number' ? options.bufferMaxEntries : MAX_JS_INT
  111. }
  112. // Shared global store
  113. var store = options.store || new Store(self, storeOptions);
  114. // Build seed list
  115. var seedlist = servers.map(function(x) {
  116. return {host: x.host, port: x.port}
  117. });
  118. // Clone options
  119. var clonedOptions = mergeOptions({}, {
  120. disconnectHandler: store,
  121. cursorFactory: Cursor,
  122. reconnect: false,
  123. emitError: typeof options.emitError == 'boolean' ? options.emitError : true,
  124. size: typeof options.poolSize == 'number' ? options.poolSize : 5
  125. });
  126. // Translate any SSL options and other connectivity options
  127. clonedOptions = translateOptions(clonedOptions, options);
  128. // Socket options
  129. var socketOptions = options.socketOptions && Object.keys(options.socketOptions).length > 0
  130. ? options.socketOptions : options;
  131. // Translate all the options to the mongodb-core ones
  132. clonedOptions = translateOptions(clonedOptions, socketOptions);
  133. if(typeof clonedOptions.keepAlive == 'number') {
  134. clonedOptions.keepAliveInitialDelay = clonedOptions.keepAlive;
  135. clonedOptions.keepAlive = clonedOptions.keepAlive > 0;
  136. }
  137. // Client info
  138. this.clientInfo = {
  139. driver: {
  140. name: "nodejs",
  141. version: driverVersion
  142. },
  143. os: {
  144. type: type,
  145. name: name,
  146. architecture: architecture,
  147. version: release
  148. },
  149. platform: nodejsversion
  150. }
  151. // Build default client information
  152. clonedOptions.clientInfo = this.clientInfo;
  153. // Do we have an application specific string
  154. if(options.appname) {
  155. clonedOptions.clientInfo.application = { name: options.appname };
  156. }
  157. // Create the ReplSet
  158. var replset = new CReplSet(seedlist, clonedOptions);
  159. // Listen to reconnect event
  160. replset.on('reconnect', function() {
  161. self.emit('reconnect');
  162. store.execute();
  163. });
  164. // Internal state
  165. this.s = {
  166. // Replicaset
  167. replset: replset
  168. // Server capabilities
  169. , sCapabilities: null
  170. // Debug tag
  171. , tag: options.tag
  172. // Store options
  173. , storeOptions: storeOptions
  174. // Cloned options
  175. , clonedOptions: clonedOptions
  176. // Store
  177. , store: store
  178. // Options
  179. , options: options
  180. }
  181. // Debug
  182. if(clonedOptions.debug) {
  183. // Last ismaster
  184. Object.defineProperty(this, 'replset', {
  185. enumerable:true, get: function() { return replset; }
  186. });
  187. }
  188. }
  189. /**
  190. * @ignore
  191. */
  192. inherits(ReplSet, EventEmitter);
  193. // Last ismaster
  194. Object.defineProperty(ReplSet.prototype, 'isMasterDoc', {
  195. enumerable:true, get: function() { return this.s.replset.lastIsMaster(); }
  196. });
  197. Object.defineProperty(ReplSet.prototype, 'parserType', {
  198. enumerable:true, get: function() {
  199. return this.s.replset.parserType;
  200. }
  201. });
  202. // BSON property
  203. Object.defineProperty(ReplSet.prototype, 'bson', {
  204. enumerable: true, get: function() {
  205. return this.s.replset.s.bson;
  206. }
  207. });
  208. Object.defineProperty(ReplSet.prototype, 'haInterval', {
  209. enumerable:true, get: function() { return this.s.replset.s.haInterval; }
  210. });
  211. var define = ReplSet.define = new Define('ReplSet', ReplSet, false);
  212. // Ensure the right read Preference object
  213. var translateReadPreference = function(options) {
  214. if(typeof options.readPreference == 'string') {
  215. options.readPreference = new CoreReadPreference(options.readPreference);
  216. } else if(options.readPreference instanceof ReadPreference) {
  217. options.readPreference = new CoreReadPreference(options.readPreference.mode
  218. , options.readPreference.tags, {maxStalenessSeconds: options.readPreference.maxStalenessSeconds});
  219. }
  220. return options;
  221. }
  222. // Connect method
  223. ReplSet.prototype.connect = function(db, _options, callback) {
  224. var self = this;
  225. if('function' === typeof _options) callback = _options, _options = {};
  226. if(_options == null) _options = {};
  227. if(!('function' === typeof callback)) callback = null;
  228. self.s.options = _options;
  229. // Update bufferMaxEntries
  230. self.s.storeOptions.bufferMaxEntries = db.bufferMaxEntries;
  231. // Actual handler
  232. var errorHandler = function(event) {
  233. return function(err) {
  234. if(event != 'error') {
  235. self.emit(event, err);
  236. }
  237. }
  238. }
  239. // Connect handler
  240. var connectHandler = function() {
  241. // Clear out all the current handlers left over
  242. ["timeout", "error", "close", 'serverOpening', 'serverDescriptionChanged', 'serverHeartbeatStarted',
  243. 'serverHeartbeatSucceeded', 'serverHeartbeatFailed', 'serverClosed', 'topologyOpening',
  244. 'topologyClosed', 'topologyDescriptionChanged'].forEach(function(e) {
  245. self.s.replset.removeAllListeners(e);
  246. });
  247. // Set up listeners
  248. self.s.replset.once('timeout', errorHandler('timeout'));
  249. self.s.replset.once('error', errorHandler('error'));
  250. self.s.replset.once('close', errorHandler('close'));
  251. // relay the event
  252. var relay = function(event) {
  253. return function(t, server) {
  254. self.emit(event, t, server);
  255. }
  256. }
  257. // Replset events relay
  258. var replsetRelay = function(event) {
  259. return function(t, server) {
  260. self.emit(event, t, server.lastIsMaster(), server);
  261. }
  262. }
  263. // Relay ha
  264. var relayHa = function(t, state) {
  265. self.emit('ha', t, state);
  266. if(t == 'start') {
  267. self.emit('ha_connect', t, state);
  268. } else if(t == 'end') {
  269. self.emit('ha_ismaster', t, state);
  270. }
  271. }
  272. // Set up serverConfig listeners
  273. self.s.replset.on('joined', replsetRelay('joined'));
  274. self.s.replset.on('left', relay('left'));
  275. self.s.replset.on('ping', relay('ping'));
  276. self.s.replset.on('ha', relayHa);
  277. // Set up SDAM listeners
  278. self.s.replset.on('serverDescriptionChanged', relay('serverDescriptionChanged'));
  279. self.s.replset.on('serverHeartbeatStarted', relay('serverHeartbeatStarted'));
  280. self.s.replset.on('serverHeartbeatSucceeded', relay('serverHeartbeatSucceeded'));
  281. self.s.replset.on('serverHeartbeatFailed', relay('serverHeartbeatFailed'));
  282. self.s.replset.on('serverOpening', relay('serverOpening'));
  283. self.s.replset.on('serverClosed', relay('serverClosed'));
  284. self.s.replset.on('topologyOpening', relay('topologyOpening'));
  285. self.s.replset.on('topologyClosed', relay('topologyClosed'));
  286. self.s.replset.on('topologyDescriptionChanged', relay('topologyDescriptionChanged'));
  287. self.s.replset.on('fullsetup', function() {
  288. self.emit('fullsetup', null, self);
  289. });
  290. self.s.replset.on('all', function() {
  291. self.emit('all', null, self);
  292. });
  293. // Emit open event
  294. self.emit('open', null, self);
  295. // Return correctly
  296. try {
  297. callback(null, self);
  298. } catch(err) {
  299. process.nextTick(function() { throw err; })
  300. }
  301. }
  302. // Error handler
  303. var connectErrorHandler = function() {
  304. return function(err) {
  305. ['timeout', 'error', 'close'].forEach(function(e) {
  306. self.s.replset.removeListener(e, connectErrorHandler);
  307. });
  308. self.s.replset.removeListener('connect', connectErrorHandler);
  309. // Destroy the replset
  310. self.s.replset.destroy();
  311. // Try to callback
  312. try {
  313. callback(err);
  314. } catch(err) {
  315. if(!self.s.replset.isConnected())
  316. process.nextTick(function() { throw err; })
  317. }
  318. }
  319. }
  320. // Set up listeners
  321. self.s.replset.once('timeout', connectErrorHandler('timeout'));
  322. self.s.replset.once('error', connectErrorHandler('error'));
  323. self.s.replset.once('close', connectErrorHandler('close'));
  324. self.s.replset.once('connect', connectHandler);
  325. // Start connection
  326. self.s.replset.connect(_options);
  327. }
  328. // Server capabilities
  329. ReplSet.prototype.capabilities = function() {
  330. if(this.s.sCapabilities) return this.s.sCapabilities;
  331. if(this.s.replset.lastIsMaster() == null) return null;
  332. this.s.sCapabilities = new ServerCapabilities(this.s.replset.lastIsMaster());
  333. return this.s.sCapabilities;
  334. }
  335. define.classMethod('capabilities', {callback: false, promise:false, returns: [ServerCapabilities]});
  336. // Command
  337. ReplSet.prototype.command = function(ns, cmd, options, callback) {
  338. options = translateReadPreference(options);
  339. this.s.replset.command(ns, cmd, options, callback);
  340. }
  341. define.classMethod('command', {callback: true, promise:false});
  342. // Insert
  343. ReplSet.prototype.insert = function(ns, ops, options, callback) {
  344. this.s.replset.insert(ns, ops, options, callback);
  345. }
  346. define.classMethod('insert', {callback: true, promise:false});
  347. // Update
  348. ReplSet.prototype.update = function(ns, ops, options, callback) {
  349. this.s.replset.update(ns, ops, options, callback);
  350. }
  351. define.classMethod('update', {callback: true, promise:false});
  352. // Remove
  353. ReplSet.prototype.remove = function(ns, ops, options, callback) {
  354. this.s.replset.remove(ns, ops, options, callback);
  355. }
  356. define.classMethod('remove', {callback: true, promise:false});
  357. // Destroyed
  358. ReplSet.prototype.isDestroyed = function() {
  359. return this.s.replset.isDestroyed();
  360. }
  361. // IsConnected
  362. ReplSet.prototype.isConnected = function(options) {
  363. options = options || {};
  364. // If we passed in a readPreference, translate to
  365. // a CoreReadPreference instance
  366. if(options.readPreference) {
  367. options.readPreference = translateReadPreference(options.readPreference);
  368. }
  369. return this.s.replset.isConnected(options);
  370. }
  371. define.classMethod('isConnected', {callback: false, promise:false, returns: [Boolean]});
  372. // Insert
  373. ReplSet.prototype.cursor = function(ns, cmd, options) {
  374. options = translateReadPreference(options);
  375. options.disconnectHandler = this.s.store;
  376. return this.s.replset.cursor(ns, cmd, options);
  377. }
  378. define.classMethod('cursor', {callback: false, promise:false, returns: [Cursor, AggregationCursor, CommandCursor]});
  379. ReplSet.prototype.lastIsMaster = function() {
  380. return this.s.replset.lastIsMaster();
  381. }
  382. ReplSet.prototype.close = function(forceClosed) {
  383. var self = this;
  384. this.s.replset.destroy();
  385. // We need to wash out all stored processes
  386. if(forceClosed == true) {
  387. this.s.storeOptions.force = forceClosed;
  388. this.s.store.flush();
  389. }
  390. var events = ['timeout', 'error', 'close', 'joined', 'left'];
  391. events.forEach(function(e) {
  392. self.removeAllListeners(e);
  393. });
  394. }
  395. define.classMethod('close', {callback: false, promise:false});
  396. ReplSet.prototype.auth = function() {
  397. var args = Array.prototype.slice.call(arguments, 0);
  398. this.s.replset.auth.apply(this.s.replset, args);
  399. }
  400. define.classMethod('auth', {callback: true, promise:false});
  401. ReplSet.prototype.logout = function() {
  402. var args = Array.prototype.slice.call(arguments, 0);
  403. this.s.replset.logout.apply(this.s.replset, args);
  404. }
  405. define.classMethod('logout', {callback: true, promise:false});
  406. /**
  407. * All raw connections
  408. * @method
  409. * @return {array}
  410. */
  411. ReplSet.prototype.connections = function() {
  412. return this.s.replset.connections();
  413. }
  414. define.classMethod('connections', {callback: false, promise:false, returns:[Array]});
  415. /**
  416. * A replset connect event, used to verify that the connection is up and running
  417. *
  418. * @event ReplSet#connect
  419. * @type {ReplSet}
  420. */
  421. /**
  422. * The replset high availability event
  423. *
  424. * @event ReplSet#ha
  425. * @type {function}
  426. * @param {string} type The stage in the high availability event (start|end)
  427. * @param {boolean} data.norepeat This is a repeating high availability process or a single execution only
  428. * @param {number} data.id The id for this high availability request
  429. * @param {object} data.state An object containing the information about the current replicaset
  430. */
  431. /**
  432. * A server member left the replicaset
  433. *
  434. * @event ReplSet#left
  435. * @type {function}
  436. * @param {string} type The type of member that left (primary|secondary|arbiter)
  437. * @param {Server} server The server object that left
  438. */
  439. /**
  440. * A server member joined the replicaset
  441. *
  442. * @event ReplSet#joined
  443. * @type {function}
  444. * @param {string} type The type of member that joined (primary|secondary|arbiter)
  445. * @param {Server} server The server object that joined
  446. */
  447. /**
  448. * ReplSet open event, emitted when replicaset can start processing commands.
  449. *
  450. * @event ReplSet#open
  451. * @type {Replset}
  452. */
  453. /**
  454. * ReplSet fullsetup event, emitted when all servers in the topology have been connected to.
  455. *
  456. * @event ReplSet#fullsetup
  457. * @type {Replset}
  458. */
  459. /**
  460. * ReplSet close event
  461. *
  462. * @event ReplSet#close
  463. * @type {object}
  464. */
  465. /**
  466. * ReplSet error event, emitted if there is an error listener.
  467. *
  468. * @event ReplSet#error
  469. * @type {MongoError}
  470. */
  471. /**
  472. * ReplSet timeout event
  473. *
  474. * @event ReplSet#timeout
  475. * @type {object}
  476. */
  477. /**
  478. * ReplSet parseError event
  479. *
  480. * @event ReplSet#parseError
  481. * @type {object}
  482. */
  483. module.exports = ReplSet;