index.js 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. 'use strict';
  2. function or(option, alternate, required) {
  3. var result = option === false ? false : option || alternate;
  4. if ((required && !result) || (result && typeof result !== 'string')) {
  5. throw new TypeError(alternate + 'Event must be a string.');
  6. }
  7. return result;
  8. }
  9. module.exports = function create(Observable) {
  10. return function (stream, opts) {
  11. opts = opts || {};
  12. var complete = false;
  13. var dataListeners = [];
  14. var awaited = opts.await;
  15. var dataEvent = or(opts.dataEvent, 'data', true);
  16. var errorEvent = or(opts.errorEvent, 'error');
  17. var endEvent = or(opts.endEvent, 'end');
  18. function cleanup() {
  19. complete = true;
  20. dataListeners.forEach(function (listener) {
  21. stream.removeListener(dataEvent, listener);
  22. });
  23. dataListeners = null;
  24. }
  25. var completion = new Promise(function (resolve, reject) {
  26. function onEnd(result) {
  27. if (awaited) {
  28. awaited.then(resolve);
  29. } else {
  30. resolve(result);
  31. }
  32. }
  33. if (endEvent) {
  34. stream.once(endEvent, onEnd);
  35. } else if (awaited) {
  36. onEnd();
  37. }
  38. if (errorEvent) {
  39. stream.once(errorEvent, reject);
  40. }
  41. if (awaited) {
  42. awaited.catch(reject);
  43. }
  44. }).catch(function (err) {
  45. cleanup();
  46. throw err;
  47. }).then(function (result) {
  48. cleanup();
  49. return result;
  50. });
  51. return new Observable(function (observer) {
  52. if (!complete) {
  53. var onData = function onData(data) {
  54. observer.next(data);
  55. };
  56. stream.on(dataEvent, onData);
  57. dataListeners.push(onData);
  58. }
  59. completion
  60. .catch(function (err) {
  61. observer.error(err);
  62. })
  63. .then(function (result) {
  64. observer.complete(result);
  65. });
  66. });
  67. };
  68. };