如何确保Observable流处理所有请求

问题描述:

我正在使用RxJS制作简单的网络服务器,但我不确定如何处理它。如何确保Observable流处理所有请求

如果不是我的第一个map我使用一个switchMap,服务器会调用处理程序,它返回一个可观察的。但是如果在第一个请求还在接收数据时又有另一个请求进来,那么服务器会在第一个请求完成之前切换到新的请求,并且第一个请求会简单地被丢弃,是否正确?另一方面,如果我使用concatMap,它将调用处理程序,但它不会消耗更多的连接,直到第一个连接收到所有数据并继续。这是没有意义的,因为服务器在处理快速连接(即GET请求)之前将闲置等待慢速连接(即,POST文件上载)。

我应该使用什么操作符?

Observable.fromEvent(server, 'request', (req, res) => { return { req: req, res: res } }) 
    //should this be a concatMap or switchMap or what? 
    .map(state => { 
     state.data = []; 
     return Observable.fromEvent(state.req, 'data') 
      .takeUntil(Observable.fromEvent(state.req, 'end')) 
      .reduce((n, e) => { n.push(e); return n; }, []) 
      .map(e => { 
       var buf = Buffer.concat(e).toString('utf8'); 
       try { 
        state.req.body = JSON.parse(state.data); 
       } catch (e) { 
        state.req.body = data; 
       } 
       return state; 
      }); 
    }) 
    .do(obs => { 
     //I want to do the next thing here, but first I 
     //have to consume the Observable, which doesn't make sense 
     state.do(state => {}) 
    }) 
+1

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-mergeMap – cartant

+0

@cartant,如果你作出这样的回答,我会接受它。否则,我会做出回答:) –

mergeMap是你想要的操作。

concatMap不同,在连接下一个之前,它不会等待每个inner observable完成。 (事实上​​,concatMap相当于mergeMap with the concurrency parameter set to 1。)