server.js 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855
  1. "use strict"
  2. var inherits = require('util').inherits,
  3. require_optional = require('require_optional'),
  4. f = require('util').format,
  5. EventEmitter = require('events').EventEmitter,
  6. ReadPreference = require('./read_preference'),
  7. Logger = require('../connection/logger'),
  8. debugOptions = require('../connection/utils').debugOptions,
  9. retrieveBSON = require('../connection/utils').retrieveBSON,
  10. Pool = require('../connection/pool'),
  11. Query = require('../connection/commands').Query,
  12. MongoError = require('../error'),
  13. PreTwoSixWireProtocolSupport = require('../wireprotocol/2_4_support'),
  14. TwoSixWireProtocolSupport = require('../wireprotocol/2_6_support'),
  15. ThreeTwoWireProtocolSupport = require('../wireprotocol/3_2_support'),
  16. BasicCursor = require('../cursor'),
  17. sdam = require('./shared'),
  18. assign = require('./shared').assign,
  19. createClientInfo = require('./shared').createClientInfo;
  // Whitelist of option names handed to debugOptions() when a command is
  // logged at debug level. The strings are matched against option keys at
  // runtime, so they are kept verbatim (including 'singleBufferSerializtion').
  var debugFields = [
    // Reconnect / lifecycle
    'reconnect', 'reconnectTries', 'reconnectInterval', 'emitError', 'cursorFactory',
    // Addressing and pool
    'host', 'port', 'size',
    // Socket behaviour
    'keepAlive', 'keepAliveInitialDelay', 'noDelay', 'connectionTimeout',
    'checkServerIdentity', 'socketTimeout', 'singleBufferSerializtion',
    // SSL
    'ssl', 'ca', 'cert', 'key', 'rejectUnauthorized',
    // BSON promotion
    'promoteLongs', 'promoteValues', 'promoteBuffers',
    // TLS SNI
    'servername'
  ];
  // BSON implementation resolved once for the whole module
  var BSON = retrieveBSON();
  // Counter used to hand out a unique id to every Server instance
  var id = 0;
  // When true, connected servers are tracked in `servers` (keyed by instance id)
  var serverAccounting = false;
  // Registry of tracked servers; only maintained while accounting is enabled
  var servers = {};
  /**
   * Creates a new Server instance (a single-server topology backed by one connection pool).
   * The pool itself is only created when `connect` is called.
   * @class
   * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
   * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
   * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
   * @param {boolean} [options.monitoring=true] Enable the server state monitoring (calling ismaster at monitoringInterval)
   * @param {number} [options.monitoringInterval=5000] The interval of calling ismaster when monitoring is enabled.
   * @param {boolean} [options.inTopology=false] Set when the server is managed by an enclosing topology
   * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
   * @param {object} [options.bson] BSON instance to use instead of creating a new one
   * @param {object} [options.disconnectHandler] Store that receives operations issued while disconnected
   * @param {string} options.host The server host
   * @param {number} options.port The server port
   * @param {number} [options.size=5] Server connection pool size
   * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
   * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
   * @param {boolean} [options.noDelay=true] TCP Connection no delay
   * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
   * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
   * @param {boolean} [options.ssl=false] Use SSL for connection
   * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
   * @param {Buffer} [options.ca] SSL Certificate store binary buffer
   * @param {Buffer} [options.cert] SSL Certificate binary buffer
   * @param {Buffer} [options.key] SSL Key file binary buffer
   * @param {string} [options.passphrase] SSL Certificate pass phrase
   * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
   * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
   * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
   * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
   * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
   * @param {string} [options.appname=null] Application name, passed in on ismaster call and logged in mongod server logs. Maximum size 128 bytes.
   * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
   * @return {Server} A server instance
   * @fires Server#connect
   * @fires Server#close
   * @fires Server#error
   * @fires Server#timeout
   * @fires Server#parseError
   * @fires Server#reconnect
   * @fires Server#reconnectFailed
   * @fires Server#serverHeartbeatStarted
   * @fires Server#serverHeartbeatSucceeded
   * @fires Server#serverHeartbeatFailed
   * @fires Server#topologyOpening
   * @fires Server#topologyClosed
   * @fires Server#topologyDescriptionChanged
   * @property {string} type the topology type.
   * @property {string} parserType the parser type used (c++ or js).
   */
  var Server = function(options) {
    options = options || {};

    // Initialise the EventEmitter part of this instance
    EventEmitter.call(this);

    // Hand out the next instance id
    this.id = id++;

    // Resolve the typed options, falling back to defaults on a type mismatch
    var monitoring = true;
    if(typeof options.monitoring == 'boolean') monitoring = options.monitoring;

    var inTopology = false;
    if(typeof options.inTopology == 'boolean') inTopology = options.inTopology;

    var monitoringInterval = 5000;
    if(typeof options.monitoringInterval == 'number') monitoringInterval = options.monitoringInterval;

    // Internal state
    var state = {};
    state.options = options;
    state.logger = Logger('Server', options);
    // Cursor class used by cursor() unless overridden per call
    state.Cursor = options.cursorFactory || BasicCursor;
    // Reuse a supplied BSON instance or build one knowing every BSON type
    state.bson = options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128,
      BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey,
      BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]);
    // The pool is created lazily by connect()
    state.pool = null;
    // Optional store for operations issued while disconnected
    state.disconnectHandler = options.disconnectHandler;
    // Whether to keep calling ismaster every monitoringInterval milliseconds
    state.monitoring = monitoring;
    state.inTopology = inTopology;
    state.monitoringInterval = monitoringInterval;
    // -1 means "no owning topology id assigned"
    state.topologyId = -1;
    this.s = state;

    // Last ismaster document received (null until the first reply)
    this.ismaster = null;
    // Round trip time of the last ismaster call in milliseconds
    this.lastIsMasterMS = -1;
    // Timer handle of the scheduled monitoring run
    this.monitoringProcessId = null;
    // True until the first connect attempt has succeeded or failed
    // (property name spelling is relied upon elsewhere in this file)
    this.initalConnect = true;
    // Start with the oldest wire protocol handler; replaced after the first ismaster
    this.wireProtocolHandler = new PreTwoSixWireProtocolSupport();
    // Reported by the `type` getter; switched to 'mongos' when a proxy is detected
    this._type = 'server';
    // Client metadata sent along with the initial ismaster
    this.clientInfo = createClientInfo(options);

    // Max staleness bookkeeping
    this.lastUpdateTime = 0;
    this.lastWriteDate = 0;
    this.staleness = 0;
  }

  inherits(Server, EventEmitter);
  // Read-only `type` property exposing the internal `_type` value
  Object.defineProperty(Server.prototype, 'type', {
    enumerable: true,
    get: function() {
      var currentType = this._type;
      return currentType;
    }
  });
  // Read-only `parserType` property: "c++" when the resolved BSON module
  // flags itself as native, "js" otherwise
  Object.defineProperty(Server.prototype, 'parserType', {
    enumerable: true,
    get: function() {
      if(BSON.native) return "c++";
      return "js";
    }
  });
  /**
   * Turn on server accounting and start from an empty registry.
   * While enabled, connect() registers the instance and destroy() removes it.
   */
  Server.enableServerAccounting = function() {
    servers = {};
    serverAccounting = true;
  }
  /**
   * Turn off server accounting. The registry content is left untouched.
   */
  Server.disableServerAccounting = function() {
    serverAccounting = false;
  }
  /**
   * Return the accounting registry (the live object, keyed by server instance id).
   * @return {object}
   */
  Server.servers = function() {
    var registry = servers;
    return registry;
  }
  // Read-only `name` property in the form "<host>:<port>", built from the options
  Object.defineProperty(Server.prototype, 'name', {
    enumerable: true,
    get: function() {
      var serverOptions = this.s.options;
      return serverOptions.host + ":" + serverOptions.port;
    }
  });
  /**
   * Pick the wire protocol handler matching the server's advertised maxWireVersion.
   * @param {Server} self The server instance (not used by the selection itself)
   * @param {object} ismaster The ismaster document returned by the server
   * @return {object} A new wire protocol handler instance
   */
  function configureWireProtocolHandler(self, ismaster) {
    var maxWireVersion = ismaster.maxWireVersion;

    // maxWireVersion >= 4: 3.2 handler layered on top of the 2.6 handler
    if(maxWireVersion >= 4) {
      var legacyHandler = new TwoSixWireProtocolSupport();
      return new ThreeTwoWireProtocolSupport(legacyHandler);
    }

    // maxWireVersion >= 2: 2.6 handler, anything else (including a missing
    // value): 2.4 or earlier handler
    return maxWireVersion >= 2
      ? new TwoSixWireProtocolSupport()
      : new PreTwoSixWireProtocolSupport();
  }
  /**
   * Deal with an operation issued while the pool is not connected.
   * The operation is either parked in the configured disconnect handler or
   * failed right away through the callback.
   * @param {Server} self The server instance
   * @param {string} type Operation type ('command', 'insert', ...)
   * @param {string} ns Namespace of the operation
   * @param {object} cmd Operation payload
   * @param {object} options Operation options; `monitoring` operations are never parked
   * @param {function} callback Operation callback
   * @return {boolean|undefined} true when the operation was handled here, undefined when the caller should proceed
   */
  function disconnectHandler(self, type, ns, cmd, options, callback) {
    // Connected: nothing to do, the caller executes the operation itself
    if(self.s.pool.isConnected()) return;

    var store = self.s.disconnectHandler;
    if(store != null && !options.monitoring) {
      // Park the call until the handler decides the topology is back
      store.add(type, ns, cmd, options, callback);
    } else {
      // No store (or a monitoring operation): fail immediately
      callback(MongoError.create(f("no connection available to server %s", self.name)));
    }

    return true;
  }
  /**
   * Build the function executed on every monitoring tick.
   * A tick emits 'monitoring', sends an ismaster to the pool, records the
   * round trip time and the reply, and schedules the following tick. The
   * chain stops once the pool is destroyed.
   * @param {Server} self The server instance being monitored
   * @return {function} The tick function (takes no arguments)
   */
  function monitoringProcess(self) {
    return function() {
      // A destroyed pool ends the monitoring chain
      if(self.s.pool.isDestroyed()) return;

      // Let listeners know a monitoring run is starting
      self.emit('monitoring', self);

      // ismaster against admin.$cmd, single document reply
      var ismasterQuery = new Query(self.s.bson, 'admin.$cmd', {ismaster:true}, {
        numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true
      });

      var startedAt = Date.now();

      // Reply handler; an error simply leaves the previous ismaster in place
      function onReply(err, result) {
        self.lastIsMasterMS = Date.now() - startedAt;
        // Pool destroyed while the call was in flight: do not reschedule
        if(self.s.pool.isDestroyed()) return;
        if(result) self.ismaster = result.result;
        self.monitoringProcessId = setTimeout(monitoringProcess(self), self.s.monitoringInterval);
      }

      // The connection timeout (or 2000ms) bounds the wait for the reply
      self.s.pool.write(ismasterQuery, {
        socketTimeout: self.s.options.connectionTimeout || 2000
      }, onReply);
    }
  }
  /**
   * Build the listener the server attaches to its pool for one pool event.
   *
   * - 'connect': runs the initial ismaster handshake, configures the wire
   *   protocol handler, optionally starts monitoring, emits the SDAM
   *   description events and finally emits 'connect' with the server.
   * - 'error', 'parseError', 'close', 'timeout', 'reconnect',
   *   'attemptReconnect', 'reconnectFailed': forwarded to the server's own
   *   listeners (see the individual rules below).
   * - any other event name is ignored.
   *
   * @param {Server} self The server owning the pool
   * @param {string} event Name of the pool event this listener is attached to
   * @return {function} Listener taking the optional error emitted by the pool
   */
  var eventHandler = function(self, event) {
    return function(err) {
      // Log information of received information if in info mode
      if(self.s.logger.isInfo()) {
        var object = err instanceof MongoError ? JSON.stringify(err) : {}
        self.s.logger.info(f('server %s fired event %s out with message %s'
          , self.name, event, object));
      }

      // Handle connect event
      if(event == 'connect') {
        // Issue an ismaster command at connect, carrying the client metadata
        var queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true };
        var query = new Query(self.s.bson, 'admin.$cmd', {ismaster:true, client: self.clientInfo}, queryOptions);
        // Get start time
        var start = new Date().getTime();
        // Execute the ismaster query
        self.s.pool.write(query, {
          socketTimeout: self.s.options.connectionTimeout || 2000,
        }, function(err, result) {
          // Set initial lastIsMasterMS
          self.lastIsMasterMS = new Date().getTime() - start;

          // Handshake failed: tear the server down and surface the error,
          // but only if someone listens (an unhandled 'error' would throw)
          if(err) {
            self.destroy();
            if(self.listeners('error').length > 0) self.emit('error', err);
            return;
          }

          // Ensure no error emitted after initial connect when reconnecting
          self.initalConnect = false;
          // Save the ismaster
          self.ismaster = result.result;

          // It's a proxy change the type so
          // the wireprotocol will send $readPreference
          if(self.ismaster.msg == 'isdbgrid') {
            self._type = 'mongos';
          }

          // Add the correct wire protocol handler
          self.wireProtocolHandler = configureWireProtocolHandler(self, self.ismaster);

          // Have we defined self monitoring
          if(self.s.monitoring) {
            self.monitoringProcessId = setTimeout(monitoringProcess(self), self.s.monitoringInterval);
          }

          // Emit server description changed if something listening
          sdam.emitServerDescriptionChanged(self, {
            address: self.name, arbiters: [], hosts: [], passives: [], type: !self.s.inTopology ? 'Standalone' : sdam.getTopologyType(self)
          });

          // Emit topology description changed if something listening
          sdam.emitTopologyDescriptionChanged(self, {
            topologyType: 'Single', servers: [{address: self.name, arbiters: [], hosts: [], passives: [], type: 'Standalone'}]
          });

          // Log the ismaster if available
          if(self.s.logger.isInfo()) {
            self.s.logger.info(f('server %s connected with ismaster [%s]', self.name, JSON.stringify(self.ismaster)));
          }

          // Emit connect
          self.emit('connect', self);
        });
      } else if(event == 'error' || event == 'parseError'
        || event == 'close' || event == 'timeout' || event == 'reconnect'
        || event == 'attemptReconnect' || event == 'reconnectFailed') {
        // The last operand used to be the bare string 'reconnectFailed', which
        // is always truthy and made this branch match every event name.

        // Remove server instance from accounting
        if(serverAccounting && ['close', 'timeout', 'error', 'parseError', 'reconnectFailed'].indexOf(event) != -1) {
          // NOTE(review): 'topologyOpening' is emitted while the server is being
          // removed from accounting - confirm this is intended and not a
          // mistaken 'topologyClosed'.
          if(!self.s.inTopology) {
            self.emit('topologyOpening', { topologyId: self.id });
          }

          delete servers[self.id];
        }

        // Reconnect failed return error
        if(event == 'reconnectFailed') {
          self.emit('reconnectFailed', err);
          // Emit error if any listeners
          if(self.listeners('error').length > 0) {
            self.emit('error', err);
          }
          // Terminate
          return;
        }

        // On first connect fail: report a dedicated error exactly once
        if(self.s.pool.state == 'disconnected' && self.initalConnect && ['close', 'timeout', 'error', 'parseError'].indexOf(event) != -1) {
          self.initalConnect = false;
          return self.emit('error', new MongoError(f('failed to connect to server [%s] on first connect', self.name)));
        }

        // Reconnect event, emit the server
        if(event == 'reconnect') {
          return self.emit(event, self);
        }

        // Emit the event
        self.emit(event, err);
      }
    }
  }
  /**
   * Initiate server connect.
   * Creates a fresh pool from the server options merged with `options`, wires
   * the pool events to the server, emits the opening events and starts
   * connecting.
   * @method
   * @param {object} [options] Options merged over the server options for this pool
   * @param {array} [options.auth=null] Array of auth options to apply on connect
   * @throws {MongoError} If a pool exists that is neither disconnected nor destroyed
   */
  Server.prototype.connect = function(options) {
    var self = this;
    options = options || {};

    // Register the instance when accounting is enabled
    if(serverAccounting) servers[this.id] = this;

    // Do not allow connect to be called on anything that's not disconnected
    if(self.s.pool && !self.s.pool.isDisconnected() && !self.s.pool.isDestroyed()) {
      // Report the pool state; `self.s.state` is never set and always printed "undefined"
      throw MongoError.create(f('server instance in invalid state %s', self.s.pool.state));
    }

    // Create a pool
    self.s.pool = new Pool(assign(self.s.options, options, {bson: this.s.bson}));

    // Route every pool event through its eventHandler listener
    ['close', 'error', 'timeout', 'parseError', 'connect', 'reconnect', 'reconnectFailed'].forEach(function(event) {
      self.s.pool.on(event, eventHandler(self, event));
    });

    // Emit topology opening event if not in topology
    if(!self.s.inTopology) {
      this.emit('topologyOpening', { topologyId: self.id });
    }

    // Emit opening server event; fall back to the server id when no topology id is set
    self.emit('serverOpening', {
      topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.id,
      address: self.name
    });

    // Connect with optional auth settings
    if(options.auth) {
      self.s.pool.connect.apply(self.s.pool, options.auth);
    } else {
      self.s.pool.connect();
    }
  }
  /**
   * Get the server description: topology type and address, plus the
   * hosts/arbiters/passives/setName fields of the last ismaster when present.
   * @method
   * @return {object}
   */
  Server.prototype.getDescription = function() {
    var lastReply = this.ismaster || {};
    var description = {
      type: sdam.getTopologyType(this),
      address: this.name
    };

    // Copy over only the fields the last ismaster actually carried
    ['hosts', 'arbiters', 'passives', 'setName'].forEach(function(field) {
      if(lastReply[field]) description[field] = lastReply[field];
    });

    return description;
  }
  /**
   * Returns the last known ismaster document for this server
   * (null until the first ismaster reply has been stored).
   * @method
   * @return {object}
   */
  Server.prototype.lastIsMaster = function() {
    var lastReply = this.ismaster;
    return lastReply;
  }
  /**
   * Unref all connections belonging to this server (delegates to the pool).
   * @method
   */
  Server.prototype.unref = function() {
    var pool = this.s.pool;
    pool.unref();
  }
  /**
   * Figure out if the server is connected.
   * A server without a pool (connect not called yet) reports false.
   * @method
   * @return {boolean}
   */
  Server.prototype.isConnected = function() {
    var pool = this.s.pool;
    return pool ? pool.isConnected() : false;
  }
  /**
   * Figure out if the server instance was destroyed by calling destroy.
   * A server without a pool (connect not called yet) reports false.
   * @method
   * @return {boolean}
   */
  Server.prototype.isDestroyed = function() {
    var pool = this.s.pool;
    return pool ? pool.isDestroyed() : false;
  }
  /**
   * Check that the server is able to accept an operation.
   * @param {Server} self The server instance
   * @return {MongoError|undefined} An error when no pool exists or the pool was destroyed, undefined otherwise
   */
  function basicWriteValidations(self) {
    var pool = self.s.pool;

    if(!pool) {
      return MongoError.create('server instance is not connected');
    }

    if(pool.isDestroyed()) {
      return MongoError.create('server instance pool was destroyed');
    }
  }
  /**
   * Validate a read operation before it is executed.
   * @param {Server} self The server instance
   * @param {object} options Operation options
   * @param {ReadPreference} [options.readPreference] Read preference to validate
   * @return {MongoError|undefined} The error reported by basicWriteValidations, undefined when the server is usable
   * @throws {Error} If options.readPreference is set but is not a ReadPreference instance
   */
  function basicReadValidations(self, options) {
    // A wrongly typed read preference is a programming error: throw, exactly as before
    if(options.readPreference && !(options.readPreference instanceof ReadPreference)) {
      throw new Error("readPreference must be an instance of ReadPreference");
    }

    // Hand the connection state error back to the caller; the result used to be
    // dropped, so callers never saw "not connected" / "pool was destroyed"
    return basicWriteValidations(self, options);
  }
  /**
   * Execute a command
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {object} cmd The command hash
   * @param {object} [options] Optional settings; may be omitted, in which case the third argument is the callback
   * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
   * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
   * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
   * @param {Boolean} [options.fullResult=false] Return the full envelope instead of just the result document.
   * @param {opResultCallback} callback A callback function
   */
  Server.prototype.command = function(ns, cmd, options, callback) {
    var self = this;

    // Support command(ns, cmd, callback)
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }
    // Default the options for every call shape. This used to be chained with the
    // comma operator inside the `if` above and never ran for null/undefined options.
    options = options || {};

    var result = basicReadValidations(self, options);
    if(result) return callback(result);

    // Debug log
    if(self.s.logger.isDebug()) self.s.logger.debug(f('executing command [%s] against %s', JSON.stringify({
      ns: ns, cmd: cmd, options: debugOptions(debugFields, options)
    }), self.name));

    // If we are not connected or have a disconnectHandler specified
    if(disconnectHandler(self, 'command', ns, cmd, options, callback)) return;

    // Collation is rejected for servers reporting maxWireVersion below 5
    if(this.ismaster && this.ismaster.maxWireVersion < 5 && cmd.collation) {
      return callback(new MongoError(f('server %s does not support collation', this.name)));
    }

    // Query options
    var queryOptions = {
      numberToSkip: 0,
      numberToReturn: -1,
      checkKeys: typeof options.checkKeys == 'boolean' ? options.checkKeys: false,
      serializeFunctions: typeof options.serializeFunctions == 'boolean' ? options.serializeFunctions : false,
      ignoreUndefined: typeof options.ignoreUndefined == 'boolean' ? options.ignoreUndefined : false
    };

    // Create a query instance
    var query = new Query(self.s.bson, ns, cmd, queryOptions);
    // Set slave OK of the query
    query.slaveOk = options.readPreference ? options.readPreference.slaveOk() : false;

    // Write options
    var writeOptions = {
      raw: typeof options.raw == 'boolean' ? options.raw : false,
      promoteLongs: typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true,
      promoteValues: typeof options.promoteValues == 'boolean' ? options.promoteValues : true,
      promoteBuffers: typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers : false,
      command: true,
      monitoring: typeof options.monitoring == 'boolean' ? options.monitoring : false,
      fullResult: typeof options.fullResult == 'boolean' ? options.fullResult : false,
      requestId: query.requestId,
      socketTimeout: typeof options.socketTimeout == 'number' ? options.socketTimeout : null,
    };

    // Write the operation to the pool
    self.s.pool.write(query, writeOptions, callback);
  }
  /**
   * Insert one or more documents
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of documents to insert (a single document is wrapped in an array)
   * @param {object} [options] Optional settings; may be omitted, in which case the third argument is the callback
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
   * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
   * @param {opResultCallback} callback A callback function
   */
  Server.prototype.insert = function(ns, ops, options, callback) {
    var self = this;

    // Support insert(ns, ops, callback)
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }
    // Default the options for every call shape. This used to be chained with the
    // comma operator inside the `if` above and never ran for null/undefined options.
    options = options || {};

    var result = basicWriteValidations(self, options);
    if(result) return callback(result);

    // If we are not connected or have a disconnectHandler specified
    if(disconnectHandler(self, 'insert', ns, ops, options, callback)) return;

    // Setup the docs as an array
    ops = Array.isArray(ops) ? ops : [ops];

    // Execute write
    return self.wireProtocolHandler.insert(self.s.pool, self.ismaster, ns, self.s.bson, ops, options, callback);
  }
  /**
   * Perform one or more update operations
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of updates (a single update is wrapped in an array)
   * @param {object} [options] Optional settings; may be omitted, in which case the third argument is the callback
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
   * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
   * @param {opResultCallback} callback A callback function
   */
  Server.prototype.update = function(ns, ops, options, callback) {
    var self = this;

    // Support update(ns, ops, callback)
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }
    // Default the options for every call shape. This used to be chained with the
    // comma operator inside the `if` above and never ran for null/undefined options.
    options = options || {};

    var result = basicWriteValidations(self, options);
    if(result) return callback(result);

    // If we are not connected or have a disconnectHandler specified
    if(disconnectHandler(self, 'update', ns, ops, options, callback)) return;

    // Collation is rejected for servers reporting maxWireVersion below 5
    if(this.ismaster && this.ismaster.maxWireVersion < 5 && options.collation) {
      return callback(new MongoError(f('server %s does not support collation', this.name)));
    }

    // Setup the docs as an array
    ops = Array.isArray(ops) ? ops : [ops];

    // Execute write
    return self.wireProtocolHandler.update(self.s.pool, self.ismaster, ns, self.s.bson, ops, options, callback);
  }
  /**
   * Perform one or more remove operations
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of removes (a single remove is wrapped in an array)
   * @param {object} [options] Optional settings; may be omitted, in which case the third argument is the callback
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
   * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
   * @param {opResultCallback} callback A callback function
   */
  Server.prototype.remove = function(ns, ops, options, callback) {
    var self = this;

    // Support remove(ns, ops, callback)
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }
    // Default the options for every call shape. This used to be chained with the
    // comma operator inside the `if` above and never ran for null/undefined options.
    options = options || {};

    var result = basicWriteValidations(self, options);
    if(result) return callback(result);

    // If we are not connected or have a disconnectHandler specified
    if(disconnectHandler(self, 'remove', ns, ops, options, callback)) return;

    // Collation is rejected for servers reporting maxWireVersion below 5
    if(this.ismaster && this.ismaster.maxWireVersion < 5 && options.collation) {
      return callback(new MongoError(f('server %s does not support collation', this.name)));
    }

    // Setup the docs as an array
    ops = Array.isArray(ops) ? ops : [ops];

    // Execute write
    return self.wireProtocolHandler.remove(self.s.pool, self.ismaster, ns, self.s.bson, ops, options, callback);
  }
  /**
   * Get a new cursor. The cursor is only constructed here; no callback is involved.
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {object|Long} cmd Can be either a command returning a cursor or a cursorId
   * @param {object} [cursorOptions] Options handed to the cursor
   * @param {function} [cursorOptions.cursorFactory] Cursor class overriding the server's default for this call
   * @param {object} [cursorOptions.batchSize=0] Batchsize for the operation
   * @param {array} [cursorOptions.documents=[]] Initial documents list for cursor
   * @param {ReadPreference} [cursorOptions.readPreference] Specify read preference if command supports it
   * @param {Boolean} [cursorOptions.serializeFunctions=false] Specify if functions on an object should be serialized.
   * @param {Boolean} [cursorOptions.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
   * @return {Cursor} A new cursor instance bound to this server
   */
  Server.prototype.cursor = function(ns, cmd, cursorOptions) {
    cursorOptions = cursorOptions || {};
    var state = this.s;

    // A per-call factory wins over the server wide cursor class
    var CursorType = cursorOptions.cursorFactory
      ? cursorOptions.cursorFactory
      : state.Cursor;

    return new CursorType(state.bson, ns, cmd, cursorOptions, this, state.options);
  }
  /**
   * Logout from a database (delegates to the pool).
   * @method
   * @param {string} dbName The db we are logging out from
   * @param {authResultCallback} callback A callback function
   */
  Server.prototype.logout = function(dbName, callback) {
    var pool = this.s.pool;
    pool.logout(dbName, callback);
  }
  /**
   * Authenticate using a specified mechanism.
   * The last argument passed is treated as the callback.
   * @method
   * @param {string} mechanism The Auth mechanism we are invoking ('default' picks one from the server's maxWireVersion)
   * @param {string} db The db we are invoking the mechanism against
   * @param {...object} param Parameters for the specific mechanism
   * @param {authResultCallback} callback A callback function
   */
  Server.prototype.auth = function(mechanism, db) {
    var self = this;

    // 'default' resolves to scram-sha-1 when the last ismaster reports
    // maxWireVersion >= 3, otherwise to mongocr
    if(mechanism == 'default') {
      var supportsScram = self.ismaster && self.ismaster.maxWireVersion >= 3;
      mechanism = supportsScram ? 'scram-sha-1' : 'mongocr';
    }

    // Copy all call arguments and substitute the resolved mechanism
    var args = Array.prototype.slice.call(arguments, 0);
    args[0] = mechanism;

    // The callback is the trailing argument
    var callback = args[args.length - 1];

    // If we are not connected or have a disconnectHandler specified
    if(disconnectHandler(self, 'auth', db, args, {}, callback)) return;

    // Do not authenticate if we are an arbiter
    var lastReply = this.lastIsMaster();
    if(lastReply && lastReply.arbiterOnly) {
      return callback(null, true);
    }

    // Apply the arguments to the pool
    self.s.pool.auth.apply(self.s.pool, args);
  }
  /**
   * Compare two server instances by name ("host:port").
   * @method
   * @param {Server|string} server Server, or server name, to compare equality against
   * @return {boolean} false for null/undefined or for values without a name
   */
  Server.prototype.equals = function(server) {
    if(typeof server == 'string') return this.name == server;
    // Guard against null/undefined, which used to throw on `server.name`
    if(server && server.name) return this.name == server.name;
    return false;
  }
  /**
   * All raw connections currently held by the pool.
   * @method
   * @return {Connection[]}
   */
  Server.prototype.connections = function() {
    var pool = this.s.pool;
    return pool.allConnections();
  }
  /**
   * Get server. A single server always selects itself.
   * @method
   * @return {Server}
   */
  Server.prototype.getServer = function() {
    var selected = this;
    return selected;
  }
  /**
   * Get connection (delegates to the pool's `get`).
   * @method
   * @return {Connection}
   */
  Server.prototype.getConnection = function() {
    var pool = this.s.pool;
    return pool.get();
  }
  // Pool events whose listeners are detached from the pool on destroy
  var listeners = ['close', 'error', 'timeout', 'parseError', 'connect'];

  /**
   * Destroy the server connection.
   * Removes the server from accounting, stops monitoring, optionally emits
   * 'close' / 'destroy', detaches the pool listeners, emits the SDAM closed
   * events (only when someone listens) and destroys the pool.
   * Calling destroy on a server that was never connected only performs the
   * accounting and monitoring cleanup.
   * @method
   * @param {boolean} [options.emitClose=false] Emit close event on destroy
   * @param {boolean} [options.emitDestroy=false] Emit destroy event on destroy
   * @param {boolean} [options.force=false] Force destroy the pool
   */
  Server.prototype.destroy = function(options) {
    options = options || {};
    var self = this;

    // Remove the instance from accounting
    if(serverAccounting) delete servers[this.id];

    // Destroy the monitoring process if any
    if(this.monitoringProcessId) {
      clearTimeout(this.monitoringProcessId);
    }

    // No pool: connect() was never called, so there is nothing to close or
    // detach. This used to throw a TypeError on the null pool below.
    if(!self.s.pool) return;

    // Emit close event
    if(options.emitClose) {
      self.emit('close', self);
    }

    // Emit destroy event
    if(options.emitDestroy) {
      self.emit('destroy', self);
    }

    // Detach the server's listeners from the pool
    listeners.forEach(function(event) {
      self.s.pool.removeAllListeners(event);
    });

    // Emit server closed event; fall back to the server id when no topology id is set
    if(self.listeners('serverClosed').length > 0) self.emit('serverClosed', {
      topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.id, address: self.name
    });

    // Emit topology closed event if not in topology
    if(self.listeners('topologyClosed').length > 0 && !self.s.inTopology) {
      self.emit('topologyClosed', { topologyId: self.id });
    }

    if(self.s.logger.isDebug()) {
      self.s.logger.debug(f('destroy called on server %s', self.name));
    }

    // Destroy the pool
    this.s.pool.destroy(options.force);
  }
  675. /**
  676. * A server connect event, used to verify that the connection is up and running
  677. *
  678. * @event Server#connect
  679. * @type {Server}
  680. */
  681. /**
  682. * A server reconnect event, used to verify that the server topology has reconnected
  683. *
  684. * @event Server#reconnect
  685. * @type {Server}
  686. */
  687. /**
  688. * A server opening SDAM monitoring event
  689. *
  690. * @event Server#serverOpening
  691. * @type {object}
  692. */
  693. /**
  694. * A server closed SDAM monitoring event
  695. *
  696. * @event Server#serverClosed
  697. * @type {object}
  698. */
  699. /**
  700. * A server description SDAM change monitoring event
  701. *
  702. * @event Server#serverDescriptionChanged
  703. * @type {object}
  704. */
  705. /**
  706. * A topology open SDAM event
  707. *
  708. * @event Server#topologyOpening
  709. * @type {object}
  710. */
  711. /**
  712. * A topology closed SDAM event
  713. *
  714. * @event Server#topologyClosed
  715. * @type {object}
  716. */
  717. /**
  718. * A topology structure SDAM change event
  719. *
  720. * @event Server#topologyDescriptionChanged
  721. * @type {object}
  722. */
  723. /**
  724. * Server reconnect failed
  725. *
  726. * @event Server#reconnectFailed
  727. * @type {Error}
  728. */
  729. /**
  730. * Server connection pool closed
  731. *
  732. * @event Server#close
  733. * @type {object}
  734. */
  735. /**
  736. * Server connection pool caused an error
  737. *
  738. * @event Server#error
  739. * @type {Error}
  740. */
  741. /**
  742. * Server destroyed was called
  743. *
  744. * @event Server#destroy
  745. * @type {Server}
  746. */
  747. module.exports = Server;