123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- import { Observable } from '../Observable';
- import { tryCatch } from '../util/tryCatch';
- import { errorObject } from '../util/errorObject';
- import { AsyncSubject } from '../AsyncSubject';
/**
 * Observable that invokes a function expecting an error-first callback as
 * its last argument and relays the callback's outcome to subscribers.
 *
 * The wrapped function runs only when no subject has been cached on the
 * instance yet; the AsyncSubject created for that run is stored on
 * `this.subject` and every later subscription is attached to it.
 */
export class BoundNodeCallbackObservable extends Observable {
  /**
   * @param {Function} callbackFunc Function called with `args` plus a trailing handler.
   * @param {Function} [selector] Optional mapper applied to the handler's non-error arguments.
   * @param {Array} args Arguments forwarded to `callbackFunc` ahead of the handler.
   * @param {*} context `this` value used when calling `callbackFunc`.
   * @param {Object} [scheduler] When set, work is routed through `scheduler.schedule`.
   */
  constructor(callbackFunc, selector, args, context, scheduler) {
    super();
    this.callbackFunc = callbackFunc;
    this.selector = selector;
    this.args = args;
    this.context = context;
    this.scheduler = scheduler;
  }

  /**
   * Builds a factory: each call of the returned function yields a new
   * BoundNodeCallbackObservable bound to that call's arguments and `this`.
   *
   * @param {Function} func Function to wrap.
   * @param {Function} [selector] Optional result mapper.
   * @param {Object} [scheduler] Optional scheduler.
   * @returns {Function} Factory producing observables.
   */
  static create(func, selector = undefined, scheduler) {
    // Must stay a regular function: the caller's `this` becomes the context.
    return function (...callArgs) {
      return new BoundNodeCallbackObservable(func, selector, callArgs, this, scheduler);
    };
  }

  /**
   * Attaches `subscriber` to the cached subject, invoking the wrapped
   * function first if no subject exists yet. With a scheduler, the whole
   * operation is handed to `dispatch` instead.
   *
   * @param {Object} subscriber Subscriber to attach.
   * @returns {*} Whatever `subject.subscribe` or `scheduler.schedule` returns.
   */
  _subscribe(subscriber) {
    const { callbackFunc, args, scheduler } = this;

    if (scheduler) {
      return scheduler.schedule(dispatch, 0, { source: this, subscriber, context: this.context });
    }

    let subject = this.subject;
    if (!subject) {
      subject = this.subject = new AsyncSubject();

      const handler = function handlerFn(...results) {
        // Selector and subject are looked up on the owning observable at call time.
        const { selector, subject: target } = handlerFn.source;
        const [err, ...values] = results;

        if (err) {
          target.error(err);
          return;
        }

        let emitted;
        if (selector) {
          const selected = tryCatch(selector).apply(this, values);
          if (selected === errorObject) {
            // The selector threw; forward the captured exception.
            target.error(errorObject.e);
            return;
          }
          emitted = selected;
        } else {
          // Zero or one value is emitted bare; several are emitted as an array.
          emitted = values.length <= 1 ? values[0] : values;
        }

        target.next(emitted);
        target.complete();
      };

      handler.source = this;

      const outcome = tryCatch(callbackFunc).apply(this.context, args.concat(handler));
      if (outcome === errorObject) {
        // The wrapped function threw synchronously.
        subject.error(errorObject.e);
      }
    }

    return subject.subscribe(subscriber);
  }
}
/**
 * Scheduler work function used when the observable has a scheduler.
 *
 * If `source.subject` is not set yet, creates an AsyncSubject, stores it on
 * `source`, and calls the wrapped function with an error-first handler; every
 * outcome is forwarded through a further `scheduler.schedule` call rather
 * than delivered inline. Finally subscribes `subscriber` to the subject.
 *
 * `this` must expose `add()`; each scheduled piece of work and the final
 * subscription are registered on it (presumably the scheduler's action).
 *
 * @param {{source: Object, subscriber: Object, context: *}} state Scheduled state.
 */
function dispatch(state) {
  const action = this;
  const { source, subscriber, context } = state;
  const { callbackFunc, args, scheduler } = source;

  let subject = source.subject;
  if (!subject) {
    subject = source.subject = new AsyncSubject();

    // Schedules `work` with `payload` and registers the result on the action.
    const enqueue = (work, payload) => {
      action.add(scheduler.schedule(work, 0, payload));
    };

    const handler = function handlerFn(...results) {
      // Selector and subject are looked up on the owning observable at call time.
      const { selector, subject: target } = handlerFn.source;
      const [err, ...values] = results;

      if (err) {
        enqueue(dispatchError, { err, subject: target });
        return;
      }

      if (!selector) {
        // Zero or one value is emitted bare; several are emitted as an array.
        const value = values.length <= 1 ? values[0] : values;
        enqueue(dispatchNext, { value, subject: target });
        return;
      }

      const selected = tryCatch(selector).apply(this, values);
      if (selected === errorObject) {
        // The selector threw; forward the captured exception.
        enqueue(dispatchError, { err: errorObject.e, subject: target });
      } else {
        enqueue(dispatchNext, { value: selected, subject: target });
      }
    };

    handler.source = source;

    const outcome = tryCatch(callbackFunc).apply(context, args.concat(handler));
    if (outcome === errorObject) {
      // The wrapped function threw synchronously.
      enqueue(dispatchError, { err: errorObject.e, subject });
    }
  }

  action.add(subject.subscribe(subscriber));
}
/**
 * Scheduler work function: pushes `arg.value` into `arg.subject` and then
 * completes the subject.
 *
 * @param {{value: *, subject: Object}} arg Value to emit and the subject receiving it.
 */
function dispatchNext(arg) {
  const emitted = arg.value;
  const target = arg.subject;
  target.next(emitted);
  target.complete();
}
/**
 * Scheduler work function: signals `arg.err` as an error on `arg.subject`.
 *
 * @param {{err: *, subject: Object}} arg Error to report and the subject receiving it.
 */
function dispatchError(arg) {
  const failure = arg.err;
  const target = arg.subject;
  target.error(failure);
}
|