RxJS:可观察对象和单观察者的递归列表
我一直在观察对象的递归链中遇到一些麻烦。RxJS:可观察对象和单观察者的递归列表
我与RxJS,这是目前在1.0.10621版本,包含最基本的Rx功能,在其中Rx jQuery的协同工作。
让我为我的问题介绍一个示例场景: 我正在轮询包含特定关键字的tweets/updates的Twitter search API(JSON响应)。响应还包括一个“refresh_url”,用于生成后续请求。对后续请求的响应将再次包含一个新的refresh_url等。
Rx.jQuery允许我使Twitter搜索API调用一个可观察事件,该事件会产生一个onNext,然后完成。我到目前为止所尝试的是让onNext处理程序记住refresh_url,并在onCompleted处理程序中使用它来为下一个请求生成一个新的observable和相应的观察者。这样,一个可观察+观察者对无限期地跟随另一个。
这种方法的问题是:当前人尚未处理完毕
随访观察的/观察者已经活着。
我必须做很多讨厌的簿记保持对目前生活的观察者,其中有可能实际上是两个有效的参考。 (一个在onCompleted中,另一个在其生命周期中的其他地方)当然,这个引用需要取消订阅/处理观察者。 一个替代簿记的方法是通过“仍然运行?” - 布尔值来实现副作用,就像我在我的例子中所做的那样。
示例代码:
running = true;
twitterUrl = "http://search.twitter.com/search.json";
twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
twitterMaxId = 0; //actually twitter ignores its since_id parameter
newTweetObserver = function() {
return Rx.Observer.create(
function (tweet) {
if (tweet.id > twitterMaxId) {
twitterMaxId = tweet.id;
displayTweet(tweet);
}
}
);
}
createTwitterObserver = function() {
twitterObserver = Rx.Observer.create(
function (response) {
if (response.textStatus == "success") {
var data = response.data;
if (data.error == undefined) {
twitterQuery = data.refresh_url;
var tweetObservable;
tweetObservable = Rx.Observable.fromArray(data.results.reverse());
tweetObservable.subscribe(newTweetObserver());
}
}
},
function(error) { alert(error); },
function() {
//create and listen to new observer that includes a delay
if (running) {
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
twitterObservable.subscribe(createTwitterObserver());
}
}
);
return twitterObserver;
}
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
twitterObservable.subscribe(createTwitterObserver());
不要被观测/观察员从请求鸣叫双层上当。 我的例子主要关注第一层:从Twitter请求数据。如果在解决这个问题时第二层(将响应转换为推文)可以与第一层一起使用,那就太棒了;但我认为这是完全不同的事情。目前。
埃里克·梅蒂赫尔指出展开操作者给我(见下例),并建议Join patterns作为替代。
var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
, i => (i == 10 ? Observable.Empty<int>() // terminate
: new[]{i+1}.ToObservable() // recurse
)
);
ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
这应该是可复制到LINQPad。它假设单身可观测量并产生一个最终观察者。
所以我的问题是:我怎样才能在RxJS最好的扩展技巧?
编辑:
扩展运算符可能可以实现,如this thread所示。但需要generators(我只有JS < 1.6)。
不幸的是RxJS 2.0.20304-beta没有实现Extend方法。
因此,我将尝试解决您的问题,与您的问题稍有不同,并采取一些*度,您可以更轻松地解决问题。
所以1件事我不能告诉的是,你尝试以下步骤
- 获取第一鸣叫列表,其中包含下一个URL
- 一旦鸣叫名单收到,onNext当前观测并获得下一组鸣叫
- 做这个无限期
还是有用户操作(获取更多/滚动到底部)。无论哪种方式,它都是同样的问题。我可能会错误地阅读你的问题。这是答案。
function getMyTweets(headUrl) {
return Rx.Observable.create(function(observer) {
innerRequest(headUrl);
function innerRequest(url) {
var next = '';
// Some magic get ajax function
Rx.get(url).subscribe(function(res) {
observer.onNext(res);
next = res.refresh_url;
},
function() {
// Some sweet handling code
// Perhaps get head?
},
function() {
innerRequest(next);
});
}
});
}
这可能不是您要求的答案。如果没有,对不起!
编辑:查看代码后,它看起来像要将结果视为数组并观察它。
// From the results perform a select then a merge (if ordering does not matter).
getMyTweets('url')
.selectMany(function(data) {
return Rx.Observable.fromArray(data.results.reverse());
});
// Ensures ordering
getMyTweets('url')
.select(function(data) {
return Rx.Observable.fromArray(data.results.reverse());
})
.concat();
我得出结论,解决这个问题的确是[Expand operator](http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/2746e373- bf43-4381-834c-8cc182704ae9),该版本尚未在RxJS中实施至版本2.0.20304-beta。 – derabbink 2012-05-25 11:26:56
扩展正是你所需要的。我意识到这篇文章已经过时了,但可以将您需要的运营商从未来版本的Rx移植到您自己的Rx版本中。它可能需要一些操作,但代码库并没有从v1之前大幅改变。 – 2014-06-10 17:14:33
此外,升级Rx(如果可能)是一个很好的决定。最新版本中有许多错误修复和改进。 – 2014-06-10 17:15:08