// apm.js (19 KB)
  // Node.js core helpers used to make Instrumentation an event emitter.
  var EventEmitter = require('events').EventEmitter,
    inherits = require('util').inherits;

  // Driver classes; each one may expose a `define` object that describes its
  // instrumentation points.
  var AggregationCursor = require('./aggregation_cursor'),
    CommandCursor = require('./command_cursor'),
    OrderedBulkOperation = require('./bulk/ordered').OrderedBulkOperation,
    UnorderedBulkOperation = require('./bulk/unordered').UnorderedBulkOperation,
    GridStore = require('./gridfs/grid_store'),
    Cursor = require('./cursor'),
    Collection = require('./collection'),
    Db = require('./db');
  /**
   * Default operation id generator: hands out sequential integers starting at 1.
   * The counter is stored on the object itself, so everything that uses this
   * default shares one sequence.
   */
  var basicOperationIdGenerator = {
    // Next id that will be handed out.
    operationId: 1,

    /**
     * Returns the current id and advances the counter by one.
     * @returns {number} the id to use for the next operation
     */
    next: function() {
      return this.operationId++;
    }
  };
  /**
   * Default timestamp generator based on the wall clock.
   */
  var basicTimestampGenerator = {
    /**
     * @returns {number} the current time in milliseconds since the Unix epoch
     */
    current: function() {
      return Date.now();
    },

    /**
     * @param {number} start value previously returned by `current()`
     * @param {number} end value previously returned by `current()`
     * @returns {number} elapsed time, `end - start`
     */
    duration: function(start, end) {
      return end - start;
    }
  };
  // Commands whose payloads and replies must not be exposed to listeners.
  // Entries are stored in lower case because every lookup in this file is done
  // with `commandName.toLowerCase()`; mixed-case entries would never match.
  var senstiveCommands = ['authenticate', 'saslstart', 'saslcontinue', 'getnonce',
    'createuser', 'updateuser', 'copydbgetnonce', 'copydbsaslstart', 'copydb'];
  28. var Instrumentation = function(core, options, callback) {
  29. options = options || {};
  30. // Optional id generators
  31. var operationIdGenerator = options.operationIdGenerator || basicOperationIdGenerator;
  32. // Optional timestamp generator
  33. var timestampGenerator = options.timestampGenerator || basicTimestampGenerator;
  34. // Extend with event emitter functionality
  35. EventEmitter.call(this);
  36. // Contains all the instrumentation overloads
  37. this.overloads = [];
  38. // ---------------------------------------------------------
  39. //
  40. // Instrument prototype
  41. //
  42. // ---------------------------------------------------------
  43. var instrumentPrototype = function(callback) {
  44. var instrumentations = []
  45. // Classes to support
  46. var classes = [GridStore, OrderedBulkOperation, UnorderedBulkOperation,
  47. CommandCursor, AggregationCursor, Cursor, Collection, Db];
  48. // Add instrumentations to the available list
  49. for(var i = 0; i < classes.length; i++) {
  50. if(classes[i].define) {
  51. instrumentations.push(classes[i].define.generate());
  52. }
  53. }
  54. // Return the list of instrumentation points
  55. callback(null, instrumentations);
  56. }
  57. // Did the user want to instrument the prototype
  58. if(typeof callback == 'function') {
  59. instrumentPrototype(callback);
  60. }
  61. // ---------------------------------------------------------
  62. //
  63. // Server
  64. //
  65. // ---------------------------------------------------------
  66. // Reference
  67. var self = this;
  68. // Names of methods we need to wrap
  69. var methods = ['command', 'insert', 'update', 'remove'];
  70. // Prototype
  71. var proto = core.Server.prototype;
  72. // Core server method we are going to wrap
  73. methods.forEach(function(x) {
  74. var func = proto[x];
  75. // Add to overloaded methods
  76. self.overloads.push({proto: proto, name:x, func:func});
  77. // The actual prototype
  78. proto[x] = function() {
  79. var requestId = core.Query.nextRequestId();
  80. // Get the aruments
  81. var args = Array.prototype.slice.call(arguments, 0);
  82. var ns = args[0];
  83. var commandObj = args[1];
  84. var options = args[2] || {};
  85. var keys = Object.keys(commandObj);
  86. var commandName = keys[0];
  87. var db = ns.split('.')[0];
  88. // Get the collection
  89. var col = ns.split('.');
  90. col.shift();
  91. col = col.join('.');
  92. // Do we have a legacy insert/update/remove command
  93. if(x == 'insert') { //} && !this.lastIsMaster().maxWireVersion) {
  94. commandName = 'insert';
  95. // Re-write the command
  96. commandObj = {
  97. insert: col, documents: commandObj
  98. }
  99. if(options.writeConcern && Object.keys(options.writeConcern).length > 0) {
  100. commandObj.writeConcern = options.writeConcern;
  101. }
  102. commandObj.ordered = options.ordered != undefined ? options.ordered : true;
  103. } else if(x == 'update') { // && !this.lastIsMaster().maxWireVersion) {
  104. commandName = 'update';
  105. // Re-write the command
  106. commandObj = {
  107. update: col, updates: commandObj
  108. }
  109. if(options.writeConcern && Object.keys(options.writeConcern).length > 0) {
  110. commandObj.writeConcern = options.writeConcern;
  111. }
  112. commandObj.ordered = options.ordered != undefined ? options.ordered : true;
  113. } else if(x == 'remove') { //&& !this.lastIsMaster().maxWireVersion) {
  114. commandName = 'delete';
  115. // Re-write the command
  116. commandObj = {
  117. delete: col, deletes: commandObj
  118. }
  119. if(options.writeConcern && Object.keys(options.writeConcern).length > 0) {
  120. commandObj.writeConcern = options.writeConcern;
  121. }
  122. commandObj.ordered = options.ordered != undefined ? options.ordered : true;
  123. }
  124. // Get the callback
  125. var callback = args.pop();
  126. // Set current callback operation id from the current context or create
  127. // a new one
  128. var ourOpId = callback.operationId || operationIdGenerator.next();
  129. // Get a connection reference for this server instance
  130. var connection = this.s.pool.get()
  131. // Emit the start event for the command
  132. var command = {
  133. // Returns the command.
  134. command: commandObj,
  135. // Returns the database name.
  136. databaseName: db,
  137. // Returns the command name.
  138. commandName: commandName,
  139. // Returns the driver generated request id.
  140. requestId: requestId,
  141. // Returns the driver generated operation id.
  142. // This is used to link events together such as bulk write operations. OPTIONAL.
  143. operationId: ourOpId,
  144. // Returns the connection id for the command. For languages that do not have this,
  145. // this MUST return the driver equivalent which MUST include the server address and port.
  146. // The name of this field is flexible to match the object that is returned from the driver.
  147. connectionId: connection
  148. };
  149. // Filter out any sensitive commands
  150. if(senstiveCommands.indexOf(commandName.toLowerCase())) {
  151. command.commandObj = {};
  152. command.commandObj[commandName] = true;
  153. }
  154. // Emit the started event
  155. self.emit('started', command)
  156. // Start time
  157. var startTime = timestampGenerator.current();
  158. // Push our handler callback
  159. args.push(function(err, r) {
  160. var endTime = timestampGenerator.current();
  161. var command = {
  162. duration: timestampGenerator.duration(startTime, endTime),
  163. commandName: commandName,
  164. requestId: requestId,
  165. operationId: ourOpId,
  166. connectionId: connection
  167. };
  168. // If we have an error
  169. if(err || (r && r.result && r.result.ok == 0)) {
  170. command.failure = err || r.result.writeErrors || r.result;
  171. // Filter out any sensitive commands
  172. if(senstiveCommands.indexOf(commandName.toLowerCase())) {
  173. command.failure = {};
  174. }
  175. self.emit('failed', command);
  176. } else if(commandObj && commandObj.writeConcern
  177. && commandObj.writeConcern.w == 0) {
  178. // If we have write concern 0
  179. command.reply = {ok:1};
  180. self.emit('succeeded', command);
  181. } else {
  182. command.reply = r && r.result ? r.result : r;
  183. // Filter out any sensitive commands
  184. if(senstiveCommands.indexOf(commandName.toLowerCase()) != -1) {
  185. command.reply = {};
  186. }
  187. self.emit('succeeded', command);
  188. }
  189. // Return to caller
  190. callback(err, r);
  191. });
  192. // Apply the call
  193. func.apply(this, args);
  194. }
  195. });
  196. // ---------------------------------------------------------
  197. //
  198. // Bulk Operations
  199. //
  200. // ---------------------------------------------------------
  201. // Inject ourselves into the Bulk methods
  202. methods = ['execute'];
  203. var prototypes = [
  204. require('./bulk/ordered').Bulk.prototype,
  205. require('./bulk/unordered').Bulk.prototype
  206. ]
  207. prototypes.forEach(function(proto) {
  208. // Core server method we are going to wrap
  209. methods.forEach(function(x) {
  210. var func = proto[x];
  211. // Add to overloaded methods
  212. self.overloads.push({proto: proto, name:x, func:func});
  213. // The actual prototype
  214. proto[x] = function() {
  215. // Get the aruments
  216. var args = Array.prototype.slice.call(arguments, 0);
  217. // Set an operation Id on the bulk object
  218. this.operationId = operationIdGenerator.next();
  219. // Get the callback
  220. var callback = args.pop();
  221. // If we have a callback use this
  222. if(typeof callback == 'function') {
  223. args.push(function(err, r) {
  224. // Return to caller
  225. callback(err, r);
  226. });
  227. // Apply the call
  228. func.apply(this, args);
  229. } else {
  230. return func.apply(this, args);
  231. }
  232. }
  233. });
  234. });
  235. // ---------------------------------------------------------
  236. //
  237. // Cursor
  238. //
  239. // ---------------------------------------------------------
  240. // Inject ourselves into the Cursor methods
  241. methods = ['_find', '_getmore', '_killcursor'];
  242. prototypes = [
  243. require('./cursor').prototype,
  244. require('./command_cursor').prototype,
  245. require('./aggregation_cursor').prototype
  246. ]
  247. // Command name translation
  248. var commandTranslation = {
  249. '_find': 'find', '_getmore': 'getMore', '_killcursor': 'killCursors', '_explain': 'explain'
  250. }
  251. prototypes.forEach(function(proto) {
  252. // Core server method we are going to wrap
  253. methods.forEach(function(x) {
  254. var func = proto[x];
  255. // Add to overloaded methods
  256. self.overloads.push({proto: proto, name:x, func:func});
  257. // The actual prototype
  258. proto[x] = function() {
  259. var cursor = this;
  260. var requestId = core.Query.nextRequestId();
  261. var ourOpId = operationIdGenerator.next();
  262. var parts = this.ns.split('.');
  263. var db = parts[0];
  264. // Get the collection
  265. parts.shift();
  266. var collection = parts.join('.');
  267. // Set the command
  268. var command = this.query;
  269. var cmd = this.s.cmd;
  270. // If we have a find method, set the operationId on the cursor
  271. if(x == '_find') {
  272. cursor.operationId = ourOpId;
  273. }
  274. // Do we have a find command rewrite it
  275. if(x == '_getmore') {
  276. command = {
  277. getMore: this.cursorState.cursorId,
  278. collection: collection,
  279. batchSize: cmd.batchSize
  280. }
  281. if(cmd.maxTimeMS) command.maxTimeMS = cmd.maxTimeMS;
  282. } else if(x == '_killcursor') {
  283. command = {
  284. killCursors: collection,
  285. cursors: [this.cursorState.cursorId]
  286. }
  287. } else if(cmd.find) {
  288. command = {
  289. find: collection, filter: cmd.query
  290. }
  291. if(cmd.sort) command.sort = cmd.sort;
  292. if(cmd.fields) command.projection = cmd.fields;
  293. if(cmd.limit && cmd.limit < 0) {
  294. command.limit = Math.abs(cmd.limit);
  295. command.singleBatch = true;
  296. } else if(cmd.limit) {
  297. command.limit = Math.abs(cmd.limit);
  298. }
  299. // Options
  300. if(cmd.skip) command.skip = cmd.skip;
  301. if(cmd.hint) command.hint = cmd.hint;
  302. if(cmd.batchSize) command.batchSize = cmd.batchSize;
  303. if(typeof cmd.returnKey == 'boolean') command.returnKey = cmd.returnKey;
  304. if(cmd.comment) command.comment = cmd.comment;
  305. if(cmd.min) command.min = cmd.min;
  306. if(cmd.max) command.max = cmd.max;
  307. if(cmd.maxScan) command.maxScan = cmd.maxScan;
  308. if(cmd.maxTimeMS) command.maxTimeMS = cmd.maxTimeMS;
  309. // Flags
  310. if(typeof cmd.awaitData == 'boolean') command.awaitData = cmd.awaitData;
  311. if(typeof cmd.snapshot == 'boolean') command.snapshot = cmd.snapshot;
  312. if(typeof cmd.tailable == 'boolean') command.tailable = cmd.tailable;
  313. if(typeof cmd.oplogReplay == 'boolean') command.oplogReplay = cmd.oplogReplay;
  314. if(typeof cmd.noCursorTimeout == 'boolean') command.noCursorTimeout = cmd.noCursorTimeout;
  315. if(typeof cmd.partial == 'boolean') command.partial = cmd.partial;
  316. if(typeof cmd.showDiskLoc == 'boolean') command.showRecordId = cmd.showDiskLoc;
  317. // Read Concern
  318. if(cmd.readConcern) command.readConcern = cmd.readConcern;
  319. // Override method
  320. if(cmd.explain) command.explain = cmd.explain;
  321. if(cmd.exhaust) command.exhaust = cmd.exhaust;
  322. // If we have a explain flag
  323. if(cmd.explain) {
  324. // Create fake explain command
  325. command = {
  326. explain: command,
  327. verbosity: 'allPlansExecution'
  328. }
  329. // Set readConcern on the command if available
  330. if(cmd.readConcern) command.readConcern = cmd.readConcern
  331. // Set up the _explain name for the command
  332. x = '_explain';
  333. }
  334. } else {
  335. command = cmd;
  336. }
  337. // Set up the connection
  338. var connectionId = null;
  339. // Set local connection
  340. if(this.connection) connectionId = this.connection;
  341. if(!connectionId && this.server && this.server.getConnection) connectionId = this.server.getConnection();
  342. // Get the command Name
  343. var commandName = x == '_find' ? Object.keys(command)[0] : commandTranslation[x];
  344. // Emit the start event for the command
  345. command = {
  346. // Returns the command.
  347. command: command,
  348. // Returns the database name.
  349. databaseName: db,
  350. // Returns the command name.
  351. commandName: commandName,
  352. // Returns the driver generated request id.
  353. requestId: requestId,
  354. // Returns the driver generated operation id.
  355. // This is used to link events together such as bulk write operations. OPTIONAL.
  356. operationId: this.operationId,
  357. // Returns the connection id for the command. For languages that do not have this,
  358. // this MUST return the driver equivalent which MUST include the server address and port.
  359. // The name of this field is flexible to match the object that is returned from the driver.
  360. connectionId: connectionId
  361. };
  362. // Get the aruments
  363. var args = Array.prototype.slice.call(arguments, 0);
  364. // Get the callback
  365. var callback = args.pop();
  366. // We do not have a callback but a Promise
  367. if(typeof callback == 'function' || command.commandName == 'killCursors') {
  368. var startTime = timestampGenerator.current();
  369. // Emit the started event
  370. self.emit('started', command)
  371. // Emit succeeded event with killcursor if we have a legacy protocol
  372. if(command.commandName == 'killCursors'
  373. && this.server.lastIsMaster()
  374. && this.server.lastIsMaster().maxWireVersion < 4) {
  375. // Emit the succeeded command
  376. command = {
  377. duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
  378. commandName: commandName,
  379. requestId: requestId,
  380. operationId: cursor.operationId,
  381. connectionId: cursor.server.getConnection(),
  382. reply: [{ok:1}]
  383. };
  384. // Apply callback to the list of args
  385. args.push(callback);
  386. // Apply the call
  387. func.apply(this, args);
  388. // Emit the command
  389. return self.emit('succeeded', command)
  390. }
  391. // Add our callback handler
  392. args.push(function(err, r) {
  393. if(err) {
  394. // Command
  395. var command = {
  396. duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
  397. commandName: commandName,
  398. requestId: requestId,
  399. operationId: ourOpId,
  400. connectionId: cursor.server.getConnection(),
  401. failure: err };
  402. // Emit the command
  403. self.emit('failed', command)
  404. } else {
  405. // Do we have a getMore
  406. if(commandName.toLowerCase() == 'getmore' && r == null) {
  407. r = {
  408. cursor: {
  409. id: cursor.cursorState.cursorId,
  410. ns: cursor.ns,
  411. nextBatch: cursor.cursorState.documents
  412. }, ok:1
  413. }
  414. } else if(commandName.toLowerCase() == 'find' && r == null) {
  415. r = {
  416. cursor: {
  417. id: cursor.cursorState.cursorId,
  418. ns: cursor.ns,
  419. firstBatch: cursor.cursorState.documents
  420. }, ok:1
  421. }
  422. } else if(commandName.toLowerCase() == 'killcursors' && r == null) {
  423. r = {
  424. cursorsUnknown:[cursor.cursorState.lastCursorId],
  425. ok:1
  426. }
  427. }
  428. // cursor id is zero, we can issue success command
  429. command = {
  430. duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
  431. commandName: commandName,
  432. requestId: requestId,
  433. operationId: cursor.operationId,
  434. connectionId: cursor.server.getConnection(),
  435. reply: r && r.result ? r.result : r
  436. };
  437. // Emit the command
  438. self.emit('succeeded', command)
  439. }
  440. // Return
  441. if(!callback) return;
  442. // Return to caller
  443. callback(err, r);
  444. });
  445. // Apply the call
  446. func.apply(this, args);
  447. } else {
  448. // Assume promise, push back the missing value
  449. args.push(callback);
  450. // Get the promise
  451. var promise = func.apply(this, args);
  452. // Return a new promise
  453. return new cursor.s.promiseLibrary(function(resolve, reject) {
  454. var startTime = timestampGenerator.current();
  455. // Emit the started event
  456. self.emit('started', command)
  457. // Execute the function
  458. promise.then(function() {
  459. // cursor id is zero, we can issue success command
  460. var command = {
  461. duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
  462. commandName: commandName,
  463. requestId: requestId,
  464. operationId: cursor.operationId,
  465. connectionId: cursor.server.getConnection(),
  466. reply: cursor.cursorState.documents
  467. };
  468. // Emit the command
  469. self.emit('succeeded', command)
  470. }).catch(function(err) {
  471. // Command
  472. var command = {
  473. duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
  474. commandName: commandName,
  475. requestId: requestId,
  476. operationId: ourOpId,
  477. connectionId: cursor.server.getConnection(),
  478. failure: err };
  479. // Emit the command
  480. self.emit('failed', command)
  481. // reject the promise
  482. reject(err);
  483. });
  484. });
  485. }
  486. }
  487. });
  488. });
  489. }
  490. inherits(Instrumentation, EventEmitter);
  491. Instrumentation.prototype.uninstrument = function() {
  492. for(var i = 0; i < this.overloads.length; i++) {
  493. var obj = this.overloads[i];
  494. obj.proto[obj.name] = obj.func;
  495. }
  496. // Remove all listeners
  497. this.removeAllListeners('started');
  498. this.removeAllListeners('succeeded');
  499. this.removeAllListeners('failed');
  500. }
  501. module.exports = Instrumentation;