// download.js
  var stream = require('stream');
  var util = require('util');

  module.exports = GridFSBucketReadStream;
  /**
   * A readable stream that enables you to read buffers from GridFS.
   *
   * Do not instantiate this class directly. Use `openDownloadStream()` instead.
   *
   * @class
   * @param {Collection} chunks Handle for chunks collection
   * @param {Collection} files Handle for files collection
   * @param {Object} readPreference The read preference to use
   * @param {Object} filter The query to use to find the file document
   * @param {Object} [options=null] Optional settings.
   * @param {Number} [options.sort=null] Optional sort for the file find query
   * @param {Number} [options.skip=null] Optional skip for the file find query
   * @param {Number} [options.start=null] Optional 0-based offset in bytes to start streaming from
   * @param {Number} [options.end=null] Optional 0-based offset in bytes to stop streaming before
   * @fires GridFSBucketReadStream#error
   * @fires GridFSBucketReadStream#file
   * @return {GridFSBucketReadStream} a GridFSBucketReadStream instance.
   */
  function GridFSBucketReadStream(chunks, files, readPreference, filter, options) {
    this.s = {
      bytesRead: 0,
      chunks: chunks,
      cursor: null,
      expected: 0,
      files: files,
      filter: filter,
      init: false,
      expectedEnd: 0,
      file: null,
      // Options are documented as optional; fall back to an empty object so
      // start() and end() can assign onto it without a null dereference.
      options: options || {},
      readPreference: readPreference
    };

    stream.Readable.call(this);
  }

  util.inherits(GridFSBucketReadStream, stream.Readable);

  /**
   * An error occurred
   *
   * @event GridFSBucketReadStream#error
   * @type {Error}
   */

  /**
   * Fires when the stream loaded the file document corresponding to the
   * provided id.
   *
   * @event GridFSBucketReadStream#file
   * @type {object}
   */

  /**
   * Emitted when a chunk of data is available to be consumed.
   *
   * @event GridFSBucketReadStream#data
   * @type {object}
   */

  /**
   * Fired when the stream is exhausted (no more data events).
   *
   * @event GridFSBucketReadStream#end
   * @type {object}
   */

  /**
   * Fired when the stream is exhausted and the underlying cursor is killed
   *
   * @event GridFSBucketReadStream#close
   * @type {object}
   */

  /**
   * Reads from the cursor and pushes to the stream. Does nothing once the
   * stream has been marked destroyed; otherwise waits for the file document
   * to be loaded and then reads the next chunk.
   * @method
   */
  GridFSBucketReadStream.prototype._read = function() {
    var _this = this;
    if (this.destroyed) {
      return;
    }
    waitForFile(_this, function() {
      doRead(_this);
    });
  };

  /**
   * Sets the 0-based offset in bytes to start streaming from. Throws
   * an error if this stream has entered flowing mode
   * (e.g. if you've already called `on('data')`)
   * @method
   * @param {Number} start Offset in bytes to start reading at
   * @return {GridFSBucketReadStream}
   */
  GridFSBucketReadStream.prototype.start = function(start) {
    throwIfInitialized(this);
    this.s.options.start = start;
    return this;
  };

  /**
   * Sets the 0-based offset in bytes to stop streaming before. Throws
   * an error if this stream has entered flowing mode
   * (e.g. if you've already called `on('data')`)
   * @method
   * @param {Number} end Offset in bytes to stop reading at
   * @return {GridFSBucketReadStream}
   */
  GridFSBucketReadStream.prototype.end = function(end) {
    throwIfInitialized(this);
    this.s.options.end = end;
    return this;
  };

  /**
   * Marks this stream as aborted (will never push another `data` event)
   * and kills the underlying cursor. Will emit the 'end' event, and then
   * the 'close' event once the cursor is successfully killed.
   *
   * @method
   * @param {GridFSBucket~errorCallback} [callback] called when the cursor is successfully closed or an error occurred.
   * @fires GridFSBucketReadStream#close
   * @fires GridFSBucketReadStream#end
   */
  GridFSBucketReadStream.prototype.abort = function(callback) {
    var _this = this;
    this.push(null);
    this.destroyed = true;
    if (this.s.cursor) {
      this.s.cursor.close(function(error) {
        _this.emit('close');
        callback && callback(error);
      });
    } else {
      if (!this.s.init) {
        // If not initialized, fire close event because we will never
        // get a cursor
        _this.emit('close');
      }
      callback && callback();
    }
  };
  /**
   * Guards the option setters: throws once the stream's `s.init` flag is set,
   * because the options have already been consumed by then.
   *
   * @ignore
   * @param {GridFSBucketReadStream} self Stream whose `s.init` flag is checked
   * @throws {Error} If `self.s.init` is truthy
   */
  function throwIfInitialized(self) {
    if (self.s.init) {
      throw new Error('You cannot change options after the stream has entered ' +
        'flowing mode!');
    }
  }
  /**
   * Fetches the next chunk document from the cursor, validates its sequence
   * number and size against the file document, and pushes its bytes to the
   * stream. Bytes before the requested start offset are dropped from the first
   * chunk read, and bytes at or after the requested end offset are dropped
   * from the last requested chunk.
   *
   * When the cursor is exhausted the stream is ended, the cursor is closed and
   * 'close' is emitted. Failures are reported through __handleError.
   *
   * @ignore
   * @param {GridFSBucketReadStream} _this Stream to read for
   */
  function doRead(_this) {
    if (_this.destroyed) {
      return;
    }

    _this.s.cursor.next(function(error, doc) {
      // The stream may have been aborted while the fetch was in flight.
      if (_this.destroyed) {
        return;
      }
      if (error) {
        return __handleError(_this, error);
      }
      if (!doc) {
        _this.push(null);
        return _this.s.cursor.close(function(error) {
          if (error) {
            return __handleError(_this, error);
          }
          _this.emit('close');
        });
      }

      var bytesRemaining = _this.s.file.length - _this.s.bytesRead;
      var expectedN = _this.s.expected++;
      var expectedLength = Math.min(_this.s.file.chunkSize, bytesRemaining);
      var errmsg;

      if (doc.n > expectedN) {
        errmsg = 'ChunkIsMissing: Got unexpected n: ' + doc.n +
          ', expected: ' + expectedN;
        return __handleError(_this, new Error(errmsg));
      }

      if (doc.n < expectedN) {
        errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n +
          ', expected: ' + expectedN;
        return __handleError(_this, new Error(errmsg));
      }

      if (doc.data.length() !== expectedLength) {
        if (bytesRemaining <= 0) {
          errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n;
          return __handleError(_this, new Error(errmsg));
        }

        errmsg = 'ChunkIsWrongSize: Got unexpected length: ' +
          doc.data.length() + ', expected: ' + expectedLength;
        return __handleError(_this, new Error(errmsg));
      }

      _this.s.bytesRead += doc.data.length();

      if (doc.data.buffer.length === 0) {
        return _this.push(null);
      }

      var buf = doc.data.buffer;
      var sliceStart = null;
      var sliceEnd = null;

      // bytesToSkip only applies to the first chunk read; reset it so later
      // chunks start at offset 0.
      if (_this.s.bytesToSkip != null) {
        sliceStart = _this.s.bytesToSkip;
        _this.s.bytesToSkip = 0;
      }

      // expectedEnd is a chunk count, so the last requested chunk has index
      // expectedEnd - 1. bytesToTrim counts bytes to drop from the end of a
      // full-size chunk, so the slice end is chunkSize - bytesToTrim.
      var isLastRequestedChunk = expectedN === _this.s.expectedEnd - 1;
      if (isLastRequestedChunk && _this.s.bytesToTrim != null) {
        sliceEnd = _this.s.file.chunkSize - _this.s.bytesToTrim;
      }

      if (sliceStart != null || sliceEnd != null) {
        buf = buf.slice(
          sliceStart != null ? sliceStart : 0,
          sliceEnd != null ? sliceEnd : buf.length
        );
      }

      _this.push(buf);
    });
  }
  /**
   * Looks up the file document matching the stream's filter and, once found,
   * opens a cursor over its chunks (sorted by `n`), applies the start/end
   * options and emits 'file'.
   *
   * Lookup failures, a missing file and invalid start/end options are all
   * reported through __handleError rather than thrown from inside the
   * findOne callback.
   *
   * @ignore
   * @param {GridFSBucketReadStream} self Stream to initialize
   */
  function init(self) {
    var findOneOptions = {};
    if (self.s.readPreference) {
      findOneOptions.readPreference = self.s.readPreference;
    }
    if (self.s.options && self.s.options.sort) {
      findOneOptions.sort = self.s.options.sort;
    }
    if (self.s.options && self.s.options.skip) {
      findOneOptions.skip = self.s.options.skip;
    }

    self.s.files.findOne(self.s.filter, findOneOptions, function(error, doc) {
      if (error) {
        return __handleError(self, error);
      }

      if (!doc) {
        var identifier = self.s.filter._id ?
          self.s.filter._id.toString() : self.s.filter.filename;
        var errmsg = 'FileNotFound: file ' + identifier + ' was not found';
        var err = new Error(errmsg);
        err.code = 'ENOENT';
        return __handleError(self, err);
      }

      // If user destroys the stream before we have a cursor, wait
      // until the query is done to say we're 'closed' because we can't
      // cancel a query. Checked before the empty-file case so an aborted
      // stream still gets its 'close' event and is not ended a second time.
      if (self.destroyed) {
        self.emit('close');
        return;
      }

      // If document is empty, kill the stream immediately and don't
      // execute any reads
      if (doc.length <= 0) {
        self.push(null);
        return;
      }

      self.s.cursor = self.s.chunks.find({ files_id: doc._id }).sort({ n: 1 });
      if (self.s.readPreference) {
        self.s.cursor.setReadPreference(self.s.readPreference);
      }

      // Default chunk count; handleEndOption overrides it when an end is set.
      self.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);

      // The option handlers throw on invalid ranges. We are inside an
      // asynchronous callback, so a throw here would be uncaught; surface it
      // as an 'error' event instead.
      try {
        self.s.bytesToSkip = handleStartOption(self, doc, self.s.cursor,
          self.s.options);
        self.s.bytesToTrim = handleEndOption(self, doc, self.s.cursor,
          self.s.options);
      } catch (optionError) {
        return __handleError(self, optionError);
      }

      // Only publish the file once the options validated, so readers never
      // start against a half-configured cursor.
      self.s.file = doc;
      self.emit('file', doc);
    });
  }
  /**
   * Invokes `callback` once the stream's file document is available: right
   * away if it is already loaded, otherwise on the next 'file' event. Starts
   * initialization the first time it is called.
   *
   * @ignore
   * @param {GridFSBucketReadStream} _this Stream to wait on
   * @param {Function} callback Called with no arguments once the file is known
   */
  function waitForFile(_this, callback) {
    if (_this.s.file) {
      return callback();
    }

    // Subscribe before kicking off init so the callback is not lost if the
    // 'file' event is emitted before init() returns.
    _this.once('file', function() {
      callback();
    });

    if (!_this.s.init) {
      _this.s.init = true;
      init(_this);
    }
  }
  /**
   * Applies the `start` option: validates it against the file document, skips
   * the cursor past the whole chunks that precede it, and primes the stream's
   * `bytesRead` / `expected` counters to match.
   *
   * @ignore
   * @param {GridFSBucketReadStream} stream Stream whose counters are primed
   * @param {Object} doc File document (`length` and `chunkSize` are read)
   * @param {Object} cursor Chunks cursor; `skip()` is called on it
   * @param {Object} [options] Stream options (`start` and `end` are read)
   * @return {Number|undefined} Bytes to drop from the front of the first chunk
   *   read, or undefined when no start offset was requested
   * @throws {Error} If start is negative, beyond the file length, or after end
   */
  function handleStartOption(stream, doc, cursor, options) {
    if (!options || options.start == null) {
      return;
    }

    if (options.start > doc.length) {
      throw new Error('Stream start (' + options.start + ') must not be ' +
        'more than the length of the file (' + doc.length + ')');
    }
    if (options.start < 0) {
      throw new Error('Stream start (' + options.start + ') must not be ' +
        'negative');
    }
    if (options.end != null && options.end < options.start) {
      throw new Error('Stream start (' + options.start + ') must not be ' +
        'greater than stream end (' + options.end + ')');
    }

    // Index of the chunk that contains the start offset.
    var firstChunk = Math.floor(options.start / doc.chunkSize);

    cursor.skip(firstChunk);
    stream.s.bytesRead = firstChunk * doc.chunkSize;
    stream.s.expected = firstChunk;

    return options.start - stream.s.bytesRead;
  }
  /**
   * Applies the `end` option: validates it against the file document, limits
   * the cursor to the chunks between the start and end offsets, and records on
   * the stream how many chunks are expected in total.
   *
   * @ignore
   * @param {GridFSBucketReadStream} stream Stream whose `s.expectedEnd` is set
   * @param {Object} doc File document (`length` and `chunkSize` are read)
   * @param {Object} cursor Chunks cursor; `limit()` is called on it
   * @param {Object} [options] Stream options (`start` and `end` are read)
   * @return {Number|undefined} Number of bytes between the end offset and the
   *   end of the full-size chunk containing it, or undefined when no end
   *   offset was requested
   * @throws {Error} If end is negative or beyond the file length
   */
  function handleEndOption(stream, doc, cursor, options) {
    if (!options || options.end == null) {
      return;
    }

    if (options.end > doc.length) {
      throw new Error('Stream end (' + options.end + ') must not be ' +
        'more than the length of the file (' + doc.length + ')');
    }
    // Validate the end offset itself; the start offset is validated by
    // handleStartOption.
    if (options.end < 0) {
      throw new Error('Stream end (' + options.end + ') must not be ' +
        'negative');
    }

    var firstChunk = options.start != null ?
      Math.floor(options.start / doc.chunkSize) :
      0;
    // Number of chunks needed to cover every byte before the end offset.
    var chunkCount = Math.ceil(options.end / doc.chunkSize);

    // NOTE(review): when start and end fall on the same chunk boundary the
    // limit is 0, which the server presumably treats as "no limit" - confirm.
    cursor.limit(chunkCount - firstChunk);
    stream.s.expectedEnd = chunkCount;

    return (chunkCount * doc.chunkSize) - options.end;
  }
  /**
   * Reports a failure by emitting it as an 'error' event on the stream.
   *
   * @ignore
   * @param {GridFSBucketReadStream} _this Stream to emit on
   * @param {Error} error The error to report
   */
  function __handleError(_this, error) {
    _this.emit('error', error);
  }