1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- 'use strict';
- function or(option, alternate, required) {
- var result = option === false ? false : option || alternate;
- if ((required && !result) || (result && typeof result !== 'string')) {
- throw new TypeError(alternate + 'Event must be a string.');
- }
- return result;
- }
- module.exports = function create(Observable) {
- return function (stream, opts) {
- opts = opts || {};
- var complete = false;
- var dataListeners = [];
- var awaited = opts.await;
- var dataEvent = or(opts.dataEvent, 'data', true);
- var errorEvent = or(opts.errorEvent, 'error');
- var endEvent = or(opts.endEvent, 'end');
- function cleanup() {
- complete = true;
- dataListeners.forEach(function (listener) {
- stream.removeListener(dataEvent, listener);
- });
- dataListeners = null;
- }
- var completion = new Promise(function (resolve, reject) {
- function onEnd(result) {
- if (awaited) {
- awaited.then(resolve);
- } else {
- resolve(result);
- }
- }
- if (endEvent) {
- stream.once(endEvent, onEnd);
- } else if (awaited) {
- onEnd();
- }
- if (errorEvent) {
- stream.once(errorEvent, reject);
- }
- if (awaited) {
- awaited.catch(reject);
- }
- }).catch(function (err) {
- cleanup();
- throw err;
- }).then(function (result) {
- cleanup();
- return result;
- });
- return new Observable(function (observer) {
- if (!complete) {
- var onData = function onData(data) {
- observer.next(data);
- };
- stream.on(dataEvent, onData);
- dataListeners.push(onData);
- }
- completion
- .catch(function (err) {
- observer.error(err);
- })
- .then(function (result) {
- observer.complete(result);
- });
- });
- };
- };
|