RxJava compile error in the following case
Problem description:
I am trying to do something like the following with RxJava:
int[] test = {1,2,3,4};
Observable<Integer> findAverage = Observable.fromArray(test);
averageInteger(findAverage).subscribe(System.out::println);
First, in this very simple RxJava code I get a compile error on Observable.fromArray(test) (incompatible bounds).
Second, it seems averageInteger can no longer be found. I am using RxJava version 2.0.8.
Answer
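This is because the declared type of findAverage is wrong. Passing an int[] to fromArray produces an Observable that emits the array itself as a single item, so the type should be: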
Observable<int[]> observable = Observable.fromArray(array);
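To illustrate why the element type comes out as int[], here is a minimal plain-Java sketch that does not use RxJava at all. The class VarargsBoxingDemo and its items method are hypothetical stand-ins for any generic varargs factory of the shape `<T> ... fromArray(T... items)`. The sketch assumes only that the type parameter T cannot be a primitive, which is a general Java rule.

```java
import java.util.Arrays;
import java.util.List;

public class VarargsBoxingDemo {
    // Hypothetical stand-in for a generic varargs factory such as fromArray(T... items).
    @SafeVarargs
    public static <T> List<T> items(T... items) {
        return Arrays.asList(items);
    }

    public static void main(String[] args) {
        int[] primitives = {1, 2, 3, 4};
        Integer[] boxed = {1, 2, 3, 4};

        // T cannot be int, so T is inferred as int[]: the whole array is one item.
        List<int[]> single = items(primitives);
        System.out.println(single.size());

        // With Integer[], T is Integer and every element is its own item.
        List<Integer> four = items(boxed);
        System.out.println(four.size());

        // An existing int[] can be boxed first with the standard stream API.
        Integer[] converted = Arrays.stream(primitives).boxed().toArray(Integer[]::new);
        System.out.println(items(converted).size());
    }
}
```

Under these assumptions, declaring the array as Integer[] (or boxing it first) is what makes each number a separate element.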
A complete example, using the following Gradle dependency: 'com.github.akarnokd:rxjava2-extensions:0.16.4'
List<Integer> integers = Arrays.asList(1, 2, 3, 4);
Observable<Integer> integerObservable = Observable.fromIterable(integers);
Observable<Double> doubleObservable = MathObservable.averageDouble(integerObservable);
doubleObservable.subscribe(System.out::println);
Implemented as an operator:
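import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import org.junit.Test;

// The four numbers average to 2.5.
@Test
public void averageDoubleOperator() throws Exception {
    List<Integer> integers = Arrays.asList(1, 2, 3, 4);
    Observable<Integer> integerObservable = Observable.fromIterable(integers);
    integerObservable.lift(new AvgOperator())
            .test()
            .assertResult(2.5);
}

// An empty source has no average, so the operator signals an error.
@Test
public void averageDoubleOperator_empty() throws Exception {
    List<Integer> integers = Collections.emptyList();
    Observable<Integer> integerObservable = Observable.fromIterable(integers);
    integerObservable.lift(new AvgOperator())
            .test()
            .assertTerminated()
            .assertError(IllegalStateException.class);
}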
// Simple holder for a running count and sum (not referenced by AvgOperator below).
class Avg {
    long count;
    int sum;

    Avg(long count, int sum) {
        this.count = count;
        this.sum = sum;
    }
}

class AvgOperator implements ObservableOperator<Double, Integer> {
    @Override
    public Observer<? super Integer> apply(Observer<? super Double> observer) throws Exception {
        return new Operation(observer);
    }

    // Collects upstream integers and emits their average once upstream completes.
    class Operation implements Observer<Integer> {
        private final Observer<? super Double> observer;
        private long sum;
        private int count;

        Operation(Observer<? super Double> observer) {
            this.observer = observer;
        }

        @Override
        public void onSubscribe(Disposable d) {
            // Pass the Disposable through so downstream can cancel upstream.
            observer.onSubscribe(d);
        }

        @Override
        public void onNext(Integer integer) {
            ++count;
            sum = sum + integer;
        }

        @Override
        public void onError(Throwable t) {
            this.observer.onError(t);
        }

        @Override
        public void onComplete() {
            if (count == 0) {
                this.onError(new IllegalStateException("Average is not defined for 0 values."));
                return;
            }
            // Cast to double so the division is not truncated.
            this.observer.onNext(sum / (double) count);
            this.observer.onComplete();
        }
    }
}
It looks like MathObservable from the RxJava 2 math library is not supported by RxJava 2 itself.
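As mentioned, MathObservable is taken from rxjava2-extensions because it is not supported in RxJava 2 itself. You can copy the implementation of MathObservable or write your own using the reduce operator.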
You need at least 'Integer[] test = {1,2,3,4};' because primitive arrays are not supported by RxJava. – akarnokd
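The suggestion above to write your own average with reduce could be sketched roughly as follows. This is a plain-Java illustration of the fold only. The names ReduceAverageDemo, Avg, add, combine, and average are hypothetical, and it uses java.util.stream rather than RxJava so that it runs without extra dependencies. In RxJava 2 the same seed and accumulator function could, as far as I know, be passed to Observable's `reduce(seed, reducer)`, with the final division done in a following map step.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalDouble;

public class ReduceAverageDemo {
    // Immutable accumulator (hypothetical): running count and sum.
    static final class Avg {
        final long count;
        final long sum;

        Avg(long count, long sum) {
            this.count = count;
            this.sum = sum;
        }

        // Returns a new accumulator that includes one more value.
        Avg add(int value) {
            return new Avg(count + 1, sum + value);
        }

        // Merges two partial accumulators (required by Stream.reduce).
        Avg combine(Avg other) {
            return new Avg(count + other.count, sum + other.sum);
        }

        // Empty when no values were seen, since the average is undefined.
        OptionalDouble result() {
            return count == 0
                    ? OptionalDouble.empty()
                    : OptionalDouble.of(sum / (double) count);
        }
    }

    public static OptionalDouble average(List<Integer> values) {
        return values.stream()
                .reduce(new Avg(0, 0), (acc, v) -> acc.add(v), Avg::combine)
                .result();
    }

    public static void main(String[] args) {
        System.out.println(average(Arrays.asList(1, 2, 3, 4)).getAsDouble()); // prints 2.5
    }
}
```

An immutable accumulator is used here so that a shared seed is never mutated. Returning an empty OptionalDouble for no input is one design choice; the operator in the answer signals an IllegalStateException instead.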