"use strict" var inherits = require('util').inherits, f = require('util').format, EventEmitter = require('events').EventEmitter, BasicCursor = require('../cursor'), Logger = require('../connection/logger'), retrieveBSON = require('../connection/utils').retrieveBSON, MongoError = require('../error'), Server = require('./server'), assign = require('./shared').assign, clone = require('./shared').clone, createClientInfo = require('./shared').createClientInfo; var BSON = retrieveBSON(); /** * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is * used to construct connections. * * @example * var Mongos = require('mongodb-core').Mongos * , ReadPreference = require('mongodb-core').ReadPreference * , assert = require('assert'); * * var server = new Mongos([{host: 'localhost', port: 30000}]); * // Wait for the connection event * server.on('connect', function(server) { * server.destroy(); * }); * * // Start connecting * server.connect(); */ var MongoCR = require('../auth/mongocr') , X509 = require('../auth/x509') , Plain = require('../auth/plain') , GSSAPI = require('../auth/gssapi') , SSPI = require('../auth/sspi') , ScramSHA1 = require('../auth/scram'); // // States var DISCONNECTED = 'disconnected'; var CONNECTING = 'connecting'; var CONNECTED = 'connected'; var DESTROYED = 'destroyed'; function stateTransition(self, newState) { var legalTransitions = { 'disconnected': [CONNECTING, DESTROYED, DISCONNECTED], 'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED], 'connected': [CONNECTED, DISCONNECTED, DESTROYED], 'destroyed': [DESTROYED] } // Get current state var legalStates = legalTransitions[self.state]; if(legalStates && legalStates.indexOf(newState) != -1) { self.state = newState; } else { self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]' , self.id, self.state, newState, legalStates)); } } // // ReplSet instance id var id = 1; var handlers = ['connect', 'close', 'error', 'timeout', 'parseError']; /** * Creates a new Mongos instance * @class * @param {array} seedlist A list of seeds for the replicaset * @param {number} [options.haInterval=5000] The High availability period for replicaset inquiry * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors * @param {number} [options.size=5] Server connection pool size * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection * @param {boolean} [options.noDelay=true] TCP Connection no delay * @param {number} [options.connectionTimeout=1000] TCP Connection timeout setting * @param {number} [options.socketTimeout=0] TCP Socket timeout setting * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed * @param {boolean} [options.ssl=false] Use SSL for connection * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function. * @param {Buffer} [options.ca] SSL Certificate store binary buffer * @param {Buffer} [options.cert] SSL Certificate binary buffer * @param {Buffer} [options.key] SSL Key file binary buffer * @param {string} [options.passphrase] SSL Certificate pass phrase * @param {string} [options.servername=null] String containing the server name requested via TLS SNI. * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types. * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers. * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit. * @return {Mongos} A cursor instance * @fires Mongos#connect * @fires Mongos#reconnect * @fires Mongos#joined * @fires Mongos#left * @fires Mongos#failed * @fires Mongos#fullsetup * @fires Mongos#all * @fires Mongos#serverHeartbeatStarted * @fires Mongos#serverHeartbeatSucceeded * @fires Mongos#serverHeartbeatFailed * @fires Mongos#topologyOpening * @fires Mongos#topologyClosed * @fires Mongos#topologyDescriptionChanged * @property {string} type the topology type. * @property {string} parserType the parser type used (c++ or js). */ var Mongos = function(seedlist, options) { options = options || {}; // Get replSet Id this.id = id++; // Internal state this.s = { options: assign({}, options), // BSON instance bson: options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128, BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey, BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]), // Factory overrides Cursor: options.cursorFactory || BasicCursor, // Logger instance logger: Logger('Mongos', options), // Seedlist seedlist: seedlist, // Ha interval haInterval: options.haInterval ? options.haInterval : 10000, // Disconnect handler disconnectHandler: options.disconnectHandler, // Server selection index index: 0, // Connect function options passed in connectOptions: {}, // Are we running in debug mode debug: typeof options.debug == 'boolean' ? options.debug : false, // localThresholdMS localThresholdMS: options.localThresholdMS || 15, // Client info clientInfo: createClientInfo(options) } // Set the client info this.s.options.clientInfo = createClientInfo(options); // Log info warning if the socketTimeout < haInterval as it will cause // a lot of recycled connections to happen. if(this.s.logger.isWarn() && this.s.options.socketTimeout != 0 && this.s.options.socketTimeout < this.s.haInterval) { this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts' , this.s.options.socketTimeout, this.s.haInterval)); } // All the authProviders this.authProviders = options.authProviders || { 'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson) , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson) , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson) } // Disconnected state this.state = DISCONNECTED; // Current proxies we are connecting to this.connectingProxies = []; // Currently connected proxies this.connectedProxies = []; // Disconnected proxies this.disconnectedProxies = []; // Are we authenticating this.authenticating = false; // Index of proxy to run operations against this.index = 0; // High availability timeout id this.haTimeoutId = null; // Last ismaster this.ismaster = null; // Add event listener EventEmitter.call(this); } inherits(Mongos, EventEmitter); Object.defineProperty(Mongos.prototype, 'type', { enumerable:true, get: function() { return 'mongos'; } }); Object.defineProperty(Mongos.prototype, 'parserType', { enumerable:true, get: function() { return BSON.native ? "c++" : "js"; } }); /** * Emit event if it exists * @method */ function emitSDAMEvent(self, event, description) { if(self.listeners(event).length > 0) { self.emit(event, description); } } /** * Initiate server connect * @method * @param {array} [options.auth=null] Array of auth options to apply on connect */ Mongos.prototype.connect = function(options) { var self = this; // Add any connect level options to the internal state this.s.connectOptions = options || {}; // Set connecting state stateTransition(this, CONNECTING); // Create server instances var servers = this.s.seedlist.map(function(x) { return new Server(assign({}, self.s.options, x, { authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true }, { clientInfo: clone(self.s.clientInfo) })); }); // Emit the topology opening event emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id }); // Start all server connections connectProxies(self, servers); } function handleEvent(self) { return function() { if(self.state == DESTROYED) return; // Move to list of disconnectedProxies moveServerFrom(self.connectedProxies, self.disconnectedProxies, this); // Emit the left signal self.emit('left', 'mongos', this); } } function handleInitialConnectEvent(self, event) { return function() { // Destroy the instance if(self.state == DESTROYED) { // Move from connectingProxies moveServerFrom(self.connectingProxies, self.disconnectedProxies, this); return this.destroy(); } // Check the type of server if(event == 'connect') { // Get last known ismaster self.ismaster = this.lastIsMaster(); // Is this not a proxy, remove t if(self.ismaster.msg == 'isdbgrid') { // Add to the connectd list for(var i = 0; i < self.connectedProxies.length; i++) { if(self.connectedProxies[i].name == this.name) { // Move from connectingProxies moveServerFrom(self.connectingProxies, self.disconnectedProxies, this); this.destroy(); return self.emit('failed', this); } } // Remove the handlers for(i = 0; i < handlers.length; i++) { this.removeAllListeners(handlers[i]); } // Add stable state handlers this.on('error', handleEvent(self, 'error')); this.on('close', handleEvent(self, 'close')); this.on('timeout', handleEvent(self, 'timeout')); this.on('parseError', handleEvent(self, 'parseError')); // Move from connecting proxies connected moveServerFrom(self.connectingProxies, self.connectedProxies, this); // Emit the joined event self.emit('joined', 'mongos', this); } else { // Print warning if we did not find a mongos proxy if(self.s.logger.isWarn()) { var message = 'expected mongos proxy, but found replicaset member mongod for server %s'; // We have a standalone server if(!self.ismaster.hosts) { message = 'expected mongos proxy, but found standalone mongod for server %s'; } self.s.logger.warn(f(message, this.name)); } // This is not a mongos proxy, remove it completely removeProxyFrom(self.connectingProxies, this); // Emit the left event self.emit('left', 'server', this); // Emit failed event self.emit('failed', this); } } else { moveServerFrom(self.connectingProxies, self.disconnectedProxies, this); // Emit the left event self.emit('left', 'mongos', this); // Emit failed event self.emit('failed', this); } // Trigger topologyMonitor if(self.connectingProxies.length == 0) { // Emit connected if we are connected if(self.connectedProxies.length > 0) { // Set the state to connected stateTransition(self, CONNECTED); // Emit the connect event self.emit('connect', self); self.emit('fullsetup', self); self.emit('all', self); } else if(self.disconnectedProxies.length == 0) { // Print warning if we did not find a mongos proxy if(self.s.logger.isWarn()) { self.s.logger.warn(f('no mongos proxies found in seed list, did you mean to connect to a replicaset')); } // Emit the error that no proxies were found return self.emit('error', new MongoError('no mongos proxies found in seed list')); } // Topology monitor topologyMonitor(self, {firstConnect:true}); } }; } function connectProxies(self, servers) { // Update connectingProxies self.connectingProxies = self.connectingProxies.concat(servers); // Index used to interleaf the server connects, avoiding // runtime issues on io constrained vm's var timeoutInterval = 0; function connect(server, timeoutInterval) { setTimeout(function() { // Add event handlers server.once('close', handleInitialConnectEvent(self, 'close')); server.once('timeout', handleInitialConnectEvent(self, 'timeout')); server.once('parseError', handleInitialConnectEvent(self, 'parseError')); server.once('error', handleInitialConnectEvent(self, 'error')); server.once('connect', handleInitialConnectEvent(self, 'connect')); // SDAM Monitoring events server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); // Start connection server.connect(self.s.connectOptions); }, timeoutInterval); } // Start all the servers while(servers.length > 0) { connect(servers.shift(), timeoutInterval++); } } function pickProxy(self) { // Get the currently connected Proxies var connectedProxies = self.connectedProxies.slice(0); // Set lower bound var lowerBoundLatency = Number.MAX_VALUE; // Determine the lower bound for the Proxies for(var i = 0; i < connectedProxies.length; i++) { if(connectedProxies[i].lastIsMasterMS < lowerBoundLatency) { lowerBoundLatency = connectedProxies[i].lastIsMasterMS; } } // Filter out the possible servers connectedProxies = connectedProxies.filter(function(server) { if((server.lastIsMasterMS <= (lowerBoundLatency + self.s.localThresholdMS)) && server.isConnected()) { return true; } }); // We have no connectedProxies pick first of the connected ones if(connectedProxies.length == 0) { return self.connectedProxies[0]; } // Get proxy var proxy = connectedProxies[self.index % connectedProxies.length]; // Update the index self.index = (self.index + 1) % connectedProxies.length; // Return the proxy return proxy; } function moveServerFrom(from, to, proxy) { for(var i = 0; i < from.length; i++) { if(from[i].name == proxy.name) { from.splice(i, 1); } } for(i = 0; i < to.length; i++) { if(to[i].name == proxy.name) { to.splice(i, 1); } } to.push(proxy); } function removeProxyFrom(from, proxy) { for(var i = 0; i < from.length; i++) { if(from[i].name == proxy.name) { from.splice(i, 1); } } } function reconnectProxies(self, proxies, callback) { // Count lefts var count = proxies.length; // Handle events var _handleEvent = function(self, event) { return function() { var _self = this; count = count - 1; // Destroyed if(self.state == DESTROYED) { moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self); return this.destroy(); } if(event == 'connect' && !self.authenticating) { // Destroyed if(self.state == DESTROYED) { moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self); return _self.destroy(); } // Remove the handlers for(var i = 0; i < handlers.length; i++) { _self.removeAllListeners(handlers[i]); } // Add stable state handlers _self.on('error', handleEvent(self, 'error')); _self.on('close', handleEvent(self, 'close')); _self.on('timeout', handleEvent(self, 'timeout')); _self.on('parseError', handleEvent(self, 'parseError')); // Move to the connected servers moveServerFrom(self.disconnectedProxies, self.connectedProxies, _self); // Emit joined event self.emit('joined', 'mongos', _self); } else if(event == 'connect' && self.authenticating) { // Move from connectingProxies moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self); this.destroy(); } // Are we done finish up callback if(count == 0) { callback(); } } } // No new servers if(count == 0) { return callback(); } // Execute method function execute(_server, i) { setTimeout(function() { // Destroyed if(self.state == DESTROYED) { return; } // Create a new server instance var server = new Server(assign({}, self.s.options, { host: _server.name.split(':')[0], port: parseInt(_server.name.split(':')[1], 10) }, { authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true }, { clientInfo: clone(self.s.clientInfo) })); // Add temp handlers server.once('connect', _handleEvent(self, 'connect')); server.once('close', _handleEvent(self, 'close')); server.once('timeout', _handleEvent(self, 'timeout')); server.once('error', _handleEvent(self, 'error')); server.once('parseError', _handleEvent(self, 'parseError')); // SDAM Monitoring events server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); server.connect(self.s.connectOptions); }, i); } // Create new instances for(var i = 0; i < proxies.length; i++) { execute(proxies[i], i); } } function topologyMonitor(self, options) { options = options || {}; // Set momitoring timeout self.haTimeoutId = setTimeout(function() { if(self.state == DESTROYED) return; // If we have a primary and a disconnect handler, execute // buffered operations if(self.isConnected() && self.s.disconnectHandler) { self.s.disconnectHandler.execute(); } // Get the connectingServers var proxies = self.connectedProxies.slice(0); // Get the count var count = proxies.length; // If the count is zero schedule a new fast function pingServer(_self, _server, cb) { // Measure running time var start = new Date().getTime(); // Emit the server heartbeat start emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name }); // Execute ismaster _server.command('admin.$cmd', { ismaster:true }, { monitoring: true, socketTimeout: self.s.options.connectionTimeout || 2000, }, function(err, r) { if(self.state == DESTROYED) { // Move from connectingProxies moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server); _server.destroy(); return cb(err, r); } // Calculate latency var latencyMS = new Date().getTime() - start; // We had an error, remove it from the state if(err) { // Emit the server heartbeat failure emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name }); // Move from connected proxies to disconnected proxies moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server); } else { // Update the server ismaster _server.ismaster = r.result; _server.lastIsMasterMS = latencyMS; // Server heart beat event emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name }); } cb(err, r); }); } // No proxies initiate monitor again if(proxies.length == 0) { // Emit close event if any listeners registered if(self.listeners("close").length > 0 && self.state == CONNECTING) { self.emit('error', new MongoError('no mongos proxy available')); } else { self.emit('close', self); } // Attempt to connect to any unknown servers return reconnectProxies(self, self.disconnectedProxies, function() { if(self.state == DESTROYED) return; // Are we connected ? emit connect event if(self.state == CONNECTING && options.firstConnect) { self.emit('connect', self); self.emit('fullsetup', self); self.emit('all', self); } else if(self.isConnected()) { self.emit('reconnect', self); } else if(!self.isConnected() && self.listeners("close").length > 0) { self.emit('close', self); } // Perform topology monitor topologyMonitor(self); }); } // Ping all servers for(var i = 0; i < proxies.length; i++) { pingServer(self, proxies[i], function() { count = count - 1; if(count == 0) { if(self.state == DESTROYED) return; // Attempt to connect to any unknown servers reconnectProxies(self, self.disconnectedProxies, function() { if(self.state == DESTROYED) return; // Perform topology monitor topologyMonitor(self); }); } }); } }, self.s.haInterval); } /** * Returns the last known ismaster document for this server * @method * @return {object} */ Mongos.prototype.lastIsMaster = function() { return this.ismaster; } /** * Unref all connections belong to this server * @method */ Mongos.prototype.unref = function() { // Transition state stateTransition(this, DISCONNECTED); // Get all proxies var proxies = this.connectedProxies.concat(this.connectingProxies); proxies.forEach(function(x) { x.unref(); }); clearTimeout(this.haTimeoutId); } /** * Destroy the server connection * @param {boolean} [options.force=false] Force destroy the pool * @method */ Mongos.prototype.destroy = function(options) { // Transition state stateTransition(this, DESTROYED); // Get all proxies var proxies = this.connectedProxies.concat(this.connectingProxies); // Clear out any monitoring process if(this.haTimeoutId) clearTimeout(this.haTimeoutId); // Destroy all connecting servers proxies.forEach(function(x) { x.destroy(options); }); // Emit toplogy closing event emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id }); } /** * Figure out if the server is connected * @method * @return {boolean} */ Mongos.prototype.isConnected = function() { return this.connectedProxies.length > 0; } /** * Figure out if the server instance was destroyed by calling destroy * @method * @return {boolean} */ Mongos.prototype.isDestroyed = function() { return this.state == DESTROYED; } // // Operations // // Execute write operation var executeWriteOperation = function(self, op, ns, ops, options, callback) { if(typeof options == 'function') callback = options, options = {}, options = options || {}; // Ensure we have no options options = options || {}; // Pick a server var server = pickProxy(self); // No server found error out if(!server) return callback(new MongoError('no mongos proxy available')); // Execute the command server[op](ns, ops, options, callback); } /** * Insert one or more documents * @method * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) * @param {array} ops An array of documents to insert * @param {boolean} [options.ordered=true] Execute in order or out of order * @param {object} [options.writeConcern={}] Write concern for the operation * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. * @param {opResultCallback} callback A callback function */ Mongos.prototype.insert = function(ns, ops, options, callback) { if(typeof options == 'function') callback = options, options = {}, options = options || {}; if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); // Not connected but we have a disconnecthandler if(!this.isConnected() && this.s.disconnectHandler != null) { return this.s.disconnectHandler.add('insert', ns, ops, options, callback); } // No mongos proxy available if(!this.isConnected()) { return callback(new MongoError('no mongos proxy available')); } // Execute write operation executeWriteOperation(this, 'insert', ns, ops, options, callback); } /** * Perform one or more update operations * @method * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) * @param {array} ops An array of updates * @param {boolean} [options.ordered=true] Execute in order or out of order * @param {object} [options.writeConcern={}] Write concern for the operation * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. * @param {opResultCallback} callback A callback function */ Mongos.prototype.update = function(ns, ops, options, callback) { if(typeof options == 'function') callback = options, options = {}, options = options || {}; if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); // Not connected but we have a disconnecthandler if(!this.isConnected() && this.s.disconnectHandler != null) { return this.s.disconnectHandler.add('update', ns, ops, options, callback); } // No mongos proxy available if(!this.isConnected()) { return callback(new MongoError('no mongos proxy available')); } // Execute write operation executeWriteOperation(this, 'update', ns, ops, options, callback); } /** * Perform one or more remove operations * @method * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) * @param {array} ops An array of removes * @param {boolean} [options.ordered=true] Execute in order or out of order * @param {object} [options.writeConcern={}] Write concern for the operation * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. * @param {opResultCallback} callback A callback function */ Mongos.prototype.remove = function(ns, ops, options, callback) { if(typeof options == 'function') callback = options, options = {}, options = options || {}; if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); // Not connected but we have a disconnecthandler if(!this.isConnected() && this.s.disconnectHandler != null) { return this.s.disconnectHandler.add('remove', ns, ops, options, callback); } // No mongos proxy available if(!this.isConnected()) { return callback(new MongoError('no mongos proxy available')); } // Execute write operation executeWriteOperation(this, 'remove', ns, ops, options, callback); } /** * Execute a command * @method * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) * @param {object} cmd The command hash * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it * @param {Connection} [options.connection] Specify connection object to execute command against * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. * @param {opResultCallback} callback A callback function */ Mongos.prototype.command = function(ns, cmd, options, callback) { if(typeof options == 'function') callback = options, options = {}, options = options || {}; if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); var self = this; // Pick a proxy var server = pickProxy(self); // Topology is not connected, save the call in the provided store to be // Executed at some point when the handler deems it's reconnected if((server == null || !server.isConnected()) && this.s.disconnectHandler != null) { return this.s.disconnectHandler.add('command', ns, cmd, options, callback); } // No server returned we had an error if(server == null) { return callback(new MongoError('no mongos proxy available')); } // Execute the command server.command(ns, cmd, options, callback); } /** * Perform one or more remove operations * @method * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId * @param {object} [options.batchSize=0] Batchsize for the operation * @param {array} [options.documents=[]] Initial documents list for cursor * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. * @param {opResultCallback} callback A callback function */ Mongos.prototype.cursor = function(ns, cmd, cursorOptions) { cursorOptions = cursorOptions || {}; var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor; return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options); } /** * Authenticate using a specified mechanism * @method * @param {string} mechanism The Auth mechanism we are invoking * @param {string} db The db we are invoking the mechanism against * @param {...object} param Parameters for the specific mechanism * @param {authResultCallback} callback A callback function */ Mongos.prototype.auth = function(mechanism, db) { var allArgs = Array.prototype.slice.call(arguments, 0).slice(0); var self = this; var args = Array.prototype.slice.call(arguments, 2); var callback = args.pop(); // If we don't have the mechanism fail if(this.authProviders[mechanism] == null && mechanism != 'default') { return callback(new MongoError(f("auth provider %s does not exist", mechanism))); } // Are we already authenticating, throw if(this.authenticating) { return callback(new MongoError('authentication or logout allready in process')); } // Topology is not connected, save the call in the provided store to be // Executed at some point when the handler deems it's reconnected if(!self.isConnected() && self.s.disconnectHandler != null) { return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback); } // Set to authenticating this.authenticating = true; // All errors var errors = []; // Get all the servers var servers = this.connectedProxies.slice(0); // No servers return if(servers.length == 0) { this.authenticating = false; callback(null, true); } // Authenticate function auth(server) { // Arguments without a callback var argsWithoutCallback = [mechanism, db].concat(args.slice(0)); // Create arguments var finalArguments = argsWithoutCallback.concat([function(err) { count = count - 1; // Save all the errors if(err) errors.push({name: server.name, err: err}); // We are done if(count == 0) { // Auth is done self.authenticating = false; // Return the auth error if(errors.length) return callback(MongoError.create({ message: 'authentication fail', errors: errors }), false); // Successfully authenticated session callback(null, self); } }]); // Execute the auth only against non arbiter servers if(!server.lastIsMaster().arbiterOnly) { server.auth.apply(server, finalArguments); } } // Get total count var count = servers.length; // Authenticate against all servers while(servers.length > 0) { auth(servers.shift()); } } /** * Logout from a database * @method * @param {string} db The db we are logging out from * @param {authResultCallback} callback A callback function */ Mongos.prototype.logout = function(dbName, callback) { var self = this; // Are we authenticating or logging out, throw if(this.authenticating) { throw new MongoError('authentication or logout allready in process'); } // Ensure no new members are processed while logging out this.authenticating = true; // Remove from all auth providers (avoid any reaplication of the auth details) var providers = Object.keys(this.authProviders); for(var i = 0; i < providers.length; i++) { this.authProviders[providers[i]].logout(dbName); } // Now logout all the servers var servers = this.connectedProxies.slice(0); var count = servers.length; if(count == 0) return callback(); var errors = []; function logoutServer(_server, cb) { _server.logout(dbName, function(err) { if(err) errors.push({name: _server.name, err: err}); cb(); }); } // Execute logout on all server instances for(i = 0; i < servers.length; i++) { logoutServer(servers[i], function() { count = count - 1; if(count == 0) { // Do not block new operations self.authenticating = false; // If we have one or more errors if(errors.length) return callback(MongoError.create({ message: f('logout failed against db %s', dbName), errors: errors }), false); // No errors callback(); } }) } } /** * Get server * @method * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it * @return {Server} */ Mongos.prototype.getServer = function() { var server = pickProxy(this); if(this.s.debug) this.emit('pickedServer', null, server); return server; } /** * All raw connections * @method * @return {Connection[]} */ Mongos.prototype.connections = function() { var connections = []; for(var i = 0; i < this.connectedProxies.length; i++) { connections = connections.concat(this.connectedProxies[i].connections()); } return connections; } /** * A mongos connect event, used to verify that the connection is up and running * * @event Mongos#connect * @type {Mongos} */ /** * A mongos reconnect event, used to verify that the mongos topology has reconnected * * @event Mongos#reconnect * @type {Mongos} */ /** * A mongos fullsetup event, used to signal that all topology members have been contacted. * * @event Mongos#fullsetup * @type {Mongos} */ /** * A mongos all event, used to signal that all topology members have been contacted. * * @event Mongos#all * @type {Mongos} */ /** * A server member left the mongos list * * @event Mongos#left * @type {Mongos} * @param {string} type The type of member that left (mongos) * @param {Server} server The server object that left */ /** * A server member joined the mongos list * * @event Mongos#joined * @type {Mongos} * @param {string} type The type of member that left (mongos) * @param {Server} server The server object that joined */ /** * A server opening SDAM monitoring event * * @event Mongos#serverOpening * @type {object} */ /** * A server closed SDAM monitoring event * * @event Mongos#serverClosed * @type {object} */ /** * A server description SDAM change monitoring event * * @event Mongos#serverDescriptionChanged * @type {object} */ /** * A topology open SDAM event * * @event Mongos#topologyOpening * @type {object} */ /** * A topology closed SDAM event * * @event Mongos#topologyClosed * @type {object} */ /** * A topology structure SDAM change event * * @event Mongos#topologyDescriptionChanged * @type {object} */ /** * A topology serverHeartbeatStarted SDAM event * * @event Mongos#serverHeartbeatStarted * @type {object} */ /** * A topology serverHeartbeatFailed SDAM event * * @event Mongos#serverHeartbeatFailed * @type {object} */ /** * A topology serverHeartbeatSucceeded SDAM change event * * @event Mongos#serverHeartbeatSucceeded * @type {object} */ module.exports = Mongos;