topology_base.js 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. "use strict";
  2. var MongoError = require('mongodb-core').MongoError
  3. , f = require('util').format;
  4. // The store of ops
  5. var Store = function(topology, storeOptions) {
  6. var self = this;
  7. var storedOps = [];
  8. storeOptions = storeOptions || {force:false, bufferMaxEntries: -1}
  9. // Internal state
  10. this.s = {
  11. storedOps: storedOps
  12. , storeOptions: storeOptions
  13. , topology: topology
  14. }
  15. Object.defineProperty(this, 'length', {
  16. enumerable:true, get: function() { return self.s.storedOps.length; }
  17. });
  18. }
  19. Store.prototype.add = function(opType, ns, ops, options, callback) {
  20. if(this.s.storeOptions.force) {
  21. return callback(MongoError.create({message: "db closed by application", driver:true}));
  22. }
  23. if(this.s.storeOptions.bufferMaxEntries == 0) {
  24. return callback(MongoError.create({message: f("no connection available for operation and number of stored operation > %s", this.s.storeOptions.bufferMaxEntries), driver:true }));
  25. }
  26. if(this.s.storeOptions.bufferMaxEntries > 0 && this.s.storedOps.length > this.s.storeOptions.bufferMaxEntries) {
  27. while(this.s.storedOps.length > 0) {
  28. var op = this.s.storedOps.shift();
  29. op.c(MongoError.create({message: f("no connection available for operation and number of stored operation > %s", this.s.storeOptions.bufferMaxEntries), driver:true }));
  30. }
  31. return;
  32. }
  33. this.s.storedOps.push({t: opType, n: ns, o: ops, op: options, c: callback})
  34. }
  35. Store.prototype.addObjectAndMethod = function(opType, object, method, params, callback) {
  36. if(this.s.storeOptions.force) {
  37. return callback(MongoError.create({message: "db closed by application", driver:true }));
  38. }
  39. if(this.s.storeOptions.bufferMaxEntries == 0) {
  40. return callback(MongoError.create({message: f("no connection available for operation and number of stored operation > %s", this.s.storeOptions.bufferMaxEntries), driver:true }));
  41. }
  42. if(this.s.storeOptions.bufferMaxEntries > 0 && this.s.storedOps.length > this.s.storeOptions.bufferMaxEntries) {
  43. while(this.s.storedOps.length > 0) {
  44. var op = this.s.storedOps.shift();
  45. op.c(MongoError.create({message: f("no connection available for operation and number of stored operation > %s", this.s.storeOptions.bufferMaxEntries), driver:true }));
  46. }
  47. return;
  48. }
  49. this.s.storedOps.push({t: opType, m: method, o: object, p: params, c: callback})
  50. }
  51. Store.prototype.flush = function(err) {
  52. while(this.s.storedOps.length > 0) {
  53. this.s.storedOps.shift().c(err || MongoError.create({message: f("no connection available for operation"), driver:true }));
  54. }
  55. }
  56. var primaryOptions = ['primary', 'primaryPreferred', 'nearest', 'secondaryPreferred'];
  57. var secondaryOptions = ['secondary', 'secondaryPreferred'];
  58. Store.prototype.execute = function(options) {
  59. options = options || {};
  60. // Get current ops
  61. var ops = this.s.storedOps;
  62. // Reset the ops
  63. this.s.storedOps = [];
  64. // Unpack options
  65. var executePrimary = typeof options.executePrimary === 'boolean'
  66. ? options.executePrimary : true;
  67. var executeSecondary = typeof options.executeSecondary === 'boolean'
  68. ? options.executeSecondary : true;
  69. // Execute all the stored ops
  70. while(ops.length > 0) {
  71. var op = ops.shift();
  72. if(op.t == 'cursor') {
  73. if(executePrimary && executeSecondary) {
  74. op.o[op.m].apply(op.o, op.p);
  75. } else if(executePrimary && op.o.options
  76. && op.o.options.readPreference
  77. && primaryOptions.indexOf(op.o.options.readPreference.mode) != -1) {
  78. op.o[op.m].apply(op.o, op.p);
  79. } else if(!executePrimary && executeSecondary && op.o.options
  80. && op.o.options.readPreference
  81. && secondaryOptions.indexOf(op.o.options.readPreference.mode) != -1) {
  82. op.o[op.m].apply(op.o, op.p);
  83. }
  84. } else if(op.t == 'auth') {
  85. this.s.topology[op.t].apply(this.s.topology, op.o);
  86. } else {
  87. if(executePrimary && executeSecondary) {
  88. this.s.topology[op.t](op.n, op.o, op.op, op.c);
  89. } else if(executePrimary && op.op && op.op.readPreference
  90. && primaryOptions.indexOf(op.op.readPreference.mode) != -1) {
  91. this.s.topology[op.t](op.n, op.o, op.op, op.c);
  92. } else if(!executePrimary && executeSecondary && op.op && op.op.readPreference
  93. && secondaryOptions.indexOf(op.op.readPreference.mode) != -1) {
  94. this.s.topology[op.t](op.n, op.o, op.op, op.c);
  95. }
  96. }
  97. }
  98. }
  99. Store.prototype.all = function() {
  100. return this.s.storedOps;
  101. }
  102. // Server capabilities
  103. var ServerCapabilities = function(ismaster) {
  104. var setup_get_property = function(object, name, value) {
  105. Object.defineProperty(object, name, {
  106. enumerable: true
  107. , get: function () { return value; }
  108. });
  109. }
  110. // Capabilities
  111. var aggregationCursor = false;
  112. var writeCommands = false;
  113. var textSearch = false;
  114. var authCommands = false;
  115. var listCollections = false;
  116. var listIndexes = false;
  117. var maxNumberOfDocsInBatch = ismaster.maxWriteBatchSize || 1000;
  118. var commandsTakeWriteConcern = false;
  119. var commandsTakeCollation = false;
  120. if(ismaster.minWireVersion >= 0) {
  121. textSearch = true;
  122. }
  123. if(ismaster.maxWireVersion >= 1) {
  124. aggregationCursor = true;
  125. authCommands = true;
  126. }
  127. if(ismaster.maxWireVersion >= 2) {
  128. writeCommands = true;
  129. }
  130. if(ismaster.maxWireVersion >= 3) {
  131. listCollections = true;
  132. listIndexes = true;
  133. }
  134. if(ismaster.maxWireVersion >= 5) {
  135. commandsTakeWriteConcern = true;
  136. commandsTakeCollation = true;
  137. }
  138. // If no min or max wire version set to 0
  139. if(ismaster.minWireVersion == null) {
  140. ismaster.minWireVersion = 0;
  141. }
  142. if(ismaster.maxWireVersion == null) {
  143. ismaster.maxWireVersion = 0;
  144. }
  145. // Map up read only parameters
  146. setup_get_property(this, "hasAggregationCursor", aggregationCursor);
  147. setup_get_property(this, "hasWriteCommands", writeCommands);
  148. setup_get_property(this, "hasTextSearch", textSearch);
  149. setup_get_property(this, "hasAuthCommands", authCommands);
  150. setup_get_property(this, "hasListCollectionsCommand", listCollections);
  151. setup_get_property(this, "hasListIndexesCommand", listIndexes);
  152. setup_get_property(this, "minWireVersion", ismaster.minWireVersion);
  153. setup_get_property(this, "maxWireVersion", ismaster.maxWireVersion);
  154. setup_get_property(this, "maxNumberOfDocsInBatch", maxNumberOfDocsInBatch);
  155. setup_get_property(this, "commandsTakeWriteConcern", commandsTakeWriteConcern);
  156. setup_get_property(this, "commandsTakeCollation", commandsTakeCollation);
  157. }
  158. exports.Store = Store;
  159. exports.ServerCapabilities = ServerCapabilities;