用于RxJava的泛型Observable的TestScheduler
问题描述:
我开始使用TestScheduler
。如果我用这个代替用于RxJava的泛型Observable的TestScheduler
@Test
public void test1() throws Exception {
//when
TestScheduler scheduler = new TestScheduler();
TestObserver<Long> subscriber = new TestObserver<>();
//run
Observable
.interval(1L, TimeUnit.SECONDS, scheduler)
.subscribeWith(subscriber);
//check
scheduler.advanceTimeBy(200, TimeUnit.SECONDS);
assertEquals(200, subscriber.valueCount());
}
:
@Test
public void test2() throws Exception {
//when
TestScheduler scheduler = new TestScheduler();
TestObserver<Long> subscriber = new TestObserver<>();
//run
Observable
.interval(1L, TimeUnit.SECONDS)
.observeOn(scheduler)
.subscribeOn(scheduler)
.subscribeWith(subscriber);
//check
scheduler.advanceTimeBy(200, TimeUnit.SECONDS);
assertEquals(200, subscriber.valueCount());
}
测试失败,因为用户没有被调用一切正常,像这样的东西。
我找到的所有例子都使用TestScheduler
和Observable.interval
,并将调度器传递给工厂方法,就像我在第一个例子中那样。 我不能使用这种方法的原因是,在真正的应用程序中,observables并不像这样简单,我无法通过调度程序。 我认为设置Scheduler
就像我在第二个例子中做的那样很好,但看起来不是。
对于更通用的Observable,使用TestScheduler的正确方法是什么?
如果不使用TestScheduler我能成功使用这些方法:如果你想覆盖标准的调度
@Test
public void test3() throws Exception {
//when
Scheduler trampoline = Schedulers.trampoline();
//run
TestObserver<Long> test = Observable
.interval(1L, TimeUnit.SECONDS)
.observeOn(trampoline)
.subscribeOn(trampoline)
.test();
//check
test.await(3100,TimeUnit.MILLISECONDS);
assertEquals(3, test.valueCount());
}
@Test
public void test4() throws Exception {
//when
Scheduler trampoline = Schedulers.trampoline();
//run
TestObserver<Long> test = Observable
.fromArray(1L, 2L, 3L)
.subscribeOn(trampoline)
.observeOn(trampoline)
.test();
//check
assertEquals(3, test.valueCount());
}
@Test
public void test5() throws Exception {
//when
Scheduler trampoline = Schedulers.trampoline();
//run
TestObserver<Long> test = Observable
.fromArray(1L, 2L, 3L)
.subscribeOn(trampoline)
.observeOn(AndroidSchedulers.mainThread())
.test();
//check
test.awaitTerminalEvent();
assertEquals(3, test.valueCount());
}
编辑
没有什么区别,如果我使用
@BeforeClass
public static void setupClass() {
mScheduler = new TestScheduler();
RxAndroidPlugins.setInitMainThreadSchedulerHandler(__ -> mScheduler);
RxJavaPlugins.setIoSchedulerHandler(__ -> mScheduler);
}
@Test
public void test2() throws Exception {
//when
TestObserver<Long> subscriber = new TestObserver<>();
//run
Observable
.interval(1L, TimeUnit.SECONDS)
.observeOn(mScheduler)
.subscribeOn(mScheduler)
.subscribeWith(subscriber);
//check
mScheduler.advanceTimeBy(200, TimeUnit.SECONDS);
assertEquals(200, subscriber.valueCount());
}
答
你可以使用(间隔)。也可以覆盖其他标准调度程序。
RxJavaPlugins.setComputationSchedulerHandler(scheduler -> testScheduler);
testScheduler将是你TestScheduler。设置插件后,您可以使用advanceTime作为test1的
例子:
@Test
// fails because interval schedules on different thread then JUnit-Runner-thread -> fall through
void notWorkingTest1() throws Exception {
TestScheduler scheduler = new TestScheduler();
TestObserver<Long> subscriber = new TestObserver<>();
Observable
.interval(1L, TimeUnit.SECONDS)
.observeOn(scheduler)
.subscribeOn(scheduler)
.subscribeWith(subscriber);
//check
scheduler.advanceTimeBy(200, TimeUnit.SECONDS);
assertEquals(200, subscriber.valueCount());
}
@Test
// not working because interval will not be scheduled on virtual time -> JUnit-Runner-Thread will close because test observable emits on different thread
void notWorkingTest2() throws Exception {
//when
TestScheduler scheduler = new TestScheduler();
//run
TestObserver<Long> test = Observable
.interval(1L, TimeUnit.SECONDS)
.observeOn(scheduler)
.subscribeOn(scheduler)
.test();
scheduler.advanceTimeBy(200, TimeUnit.SECONDS);
test.assertValueCount(200);
}
@Test
// runs sync. -> no JUnit-Runner-thread blocking needed
void workingTest() throws Exception {
TestScheduler scheduler = new TestScheduler();
RxJavaPlugins.setComputationSchedulerHandler(s -> scheduler);
TestObserver<Long> test = Observable
.interval(1L, TimeUnit.SECONDS) // executed on Schedulers.computation()
.observeOn(scheduler)
.subscribeOn(scheduler)
.test();
scheduler.advanceTimeBy(200, TimeUnit.SECONDS);
test.assertValueCount(200);
}
因为我没有使用的计算调度这不能首先的工作。无论如何,即使我重写其他调度程序,结果与我在test2中的结果相同。我编辑了我的问题。 – user6405527
请看一下interval()的文档。在那里您会看到,该间隔使用默认调度程序。默认调度程序是计算。如果用户未被呼叫,则可能使用了另一个调度程序。所以你需要阻止JUnit-Runner-Thread,该方法不会只是通过并停止JVM。 –
我为test2添加了一个示例,它正在工作并附加一些解释。 –