如何确保Observable流处理所有请求
问题描述:
我正在使用RxJS制作简单的网络服务器,但我不确定如何处理它。如何确保Observable流处理所有请求
如果不是我的第一个map我使用一个switchMap,服务器会调用处理程序,它返回一个可观察的。但是如果在第一个请求还在接收数据时又有另一个请求进来,那么服务器会在第一个请求完成之前切换到新的请求,并且第一个请求会简单地被丢弃,是否正确?另一方面,如果我使用concatMap,它将调用处理程序,但它不会消耗更多的连接,直到第一个连接收到所有数据并继续。这是没有意义的,因为服务器在处理快速连接(即GET请求)之前将闲置等待慢速连接(即,POST文件上载)。
我应该使用什么操作符?
Observable.fromEvent(server, 'request', (req, res) => { return { req: req, res: res } })
//should this be a concatMap or switchMap or what?
.map(state => {
state.data = [];
return Observable.fromEvent(state.req, 'data')
.takeUntil(Observable.fromEvent(state.req, 'end'))
.reduce((n, e) => { n.push(e); return n; }, [])
.map(e => {
var buf = Buffer.concat(e).toString('utf8');
try {
state.req.body = JSON.parse(state.data);
} catch (e) {
state.req.body = data;
}
return state;
});
})
.do(obs => {
//I want to do the next thing here, but first I
//have to consume the Observable, which doesn't make sense
state.do(state => {})
})
答
mergeMap
是你想要的操作。
与concatMap
不同,在连接下一个之前,它不会等待每个inner observable完成。 (事实上,concatMap
相当于mergeMap
with the concurrency
parameter set to 1
。)
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-mergeMap – cartant
@cartant,如果你作出这样的回答,我会接受它。否则,我会做出回答:) –