使用一个主题通过

使用一个主题通过

问题描述:

完整地传播不同的事件流我需要通过一个主题代理所有不同的事件流。使用一个主题通过

我想出了这个代码:

var mySubject, 
    getObservable; 

getObservable = function (subject, eventName) { 
    return subject 
     .asObservable() 
     .filter(function (x) { 
      return x.EventName === eventName; 
     }) 
     .flatMap(function (x) { 
      if (x.Type === 'onNext') { 
       return Rx.Observable.return(x.Data); 
      } 

      if (x.Type === 'onError') { 
       return Rx.Observable.throw(x.Data); 
      } 

      return Rx.Observable.empty(); 
     }); 
}; 

mySubject = new Rx.Subject(); 

getObservable(mySubject, 'foo') 
    .subscribe(function(x){ 
     console.log('foo onNext ' + x); 
    }, function(x){ 
     console.log('foo onError ' + x); 
    }, function(){ 
     console.log('foo onComplete'); 
    }); 

getObservable(mySubject, 'bar') 
    .subscribe(function(x){ 
     console.log('bar onNext ' + x); 
    }, function(x){ 
     console.log('bar onError ' + x); 
    }, function(){ 
     console.log('bar onComplete'); 
    }); 

mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5}); 
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'}); 

mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5}); 
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'}); 

得到输出:

foo onNext 5 

bar onNext 5 
bar onError Error message 

预期输出:

foo onNext 5 
foo onCompleted 

bar onNext 5 
bar onError Error message 

对于bar事件,这就像一个魅力:onNext将会传播d并且一旦错误发生,onError函数被调用并且事件流结束。但是,我无法让它为onComplete工作。

每当一个完整的通知提出我看到,Rx.Observable.empty()被调用,但不会导致订户onComplete处理程序被调用。相反,它调用它的onNext处理程序。

getObservable函数返回一个observable,该observable订阅通过subject发送的eventName事件。

let getObservable = function (subject, eventName) { 
    return Rx.Observable.create(function (observer) { 
     subject 
      .asObservable() 
      .filter(function(x) { 
       return x.EventName === eventName; 
      }) 
      .map(function(x) { 
       if (x.Type === 'onNext') { 
        observer.onNext(x.Data); 
       } 

       if (x.Type === 'onError') { 
        observer.onError(x.Data); 
       } 

       if (x.Type === 'onCompleted') { 
        observer.onCompleted(); 
       } 

       return x; 
      }) 
      .subscribe(); 
    }); 
}; 

这是使用来自原始质询数据的工作示例:

var mySubject, 
 
    getObservable; 
 

 
getObservable = function (subject, eventName) { 
 
    return Rx.Observable.create(function (observer) { 
 
     subject 
 
      .asObservable() 
 
      .filter(function(x) { 
 
       return x.EventName === eventName; 
 
      }) 
 
      .map(function(x) { 
 
       if (x.Type === 'onNext') { 
 
        observer.onNext(x.Data); 
 
       } 
 

 
       if (x.Type === 'onError') { 
 
        observer.onError(x.Data); 
 
       } 
 
       
 
       if (x.Type === 'onCompleted') { 
 
        observer.onCompleted(); 
 
       } 
 
       
 
       return x; 
 
      }) 
 
      .subscribe(); 
 
    }); 
 
}; 
 

 
mySubject = new Rx.Subject(); 
 

 
getObservable(mySubject, 'foo') 
 
    .subscribe(function(x){ 
 
     console.log('SomethingHappened onNext ' + x); 
 
    }, function(x){ 
 
     console.log('SomethingHappened onError ' + x); 
 
    }, function(){ 
 
     console.log('SomethingHappened onComplete'); 
 
    }); 
 

 

 
getObservable(mySubject, 'bar') 
 
    .subscribe(function(x){ 
 
     console.log('DataUpdated onNext ' + x); 
 
    }, function(x){ 
 
     console.log('DataUpdated onError ' + x); 
 
    }, function(){ 
 
     console.log('DataUpdated onComplete'); 
 
    }); 
 

 
mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5}); 
 
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'}); 
 

 
mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5}); 
 
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'});
<script src='https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js'></script>

+0

伟大的工作迈克。不过,我不明白为什么我最初的尝试不起作用。 – Christoph 2012-03-06 13:30:26

在.NET Observable.SelectMany使用Observable.Merge合并到一起流引导到一个复合观察。恕我直言Observable.Merge只有在任何合并的观察结束时才会完成。

例如http://theburningmonk.com/2010/02/rx-framework-iobservable-merge/

这可能是问题的原因。

+0

是的,可以。但是你的解决方案也适用。考虑将其重构为我的更新版本,因为你的函数作为dispose函数返回一个空函数,而我的更新版本将返回dispose函数。另外,它不使用选择,但订阅做的工作。 (选择应该是免费的) – Christoph 2012-03-07 09:30:52

+0

是的,有道理。把SignalR和RxJs结合起来的好主意。我会玩弄时间。 – 2012-03-07 13:03:40

+0

如果你对它感兴趣,你可以在这里查看https://github.com/cburgdorf/SignalR.Reactive – Christoph 2012-03-07 20:20:29