可观察序列(Observable)

在ReactiveX中观察者订阅到可观察序列(Observable)。当可观察序列(Observable)发送数据项或数据项序列时观察者得到触发。这种模式促进了并发操作,因为观察者等待发送数据项的过程中无需阻塞,而是为观察者创建一个哨兵,在可观察序列发送数据项时自动触发相应的行为。

In ReactiveX an observer subscribes to an Observable. Then that observer reacts to whatever item or sequence of items the Observable emits. This pattern facilitates concurrent operations because it does not need to block while waiting for the Observable to emit objects, but instead it creates a sentry in the form of an observer that stands ready to react appropriately at whatever future time the Observable does so.

本页解释什么是响应模式、可观察序列、观察者,以及观察者如何订阅可观察序列。其他页讲述如何使用各种可观察序列操作将可观察序列进行组合并改变他们的行为。

This page explains what the reactive pattern is and what Observables and observers are (and how observers subscribe to Observables). Other pages show how you use the variety of Observable operators to link Observables together and change their behaviors.

本文按下图进行解释。下图展示了可观察序列及其变换:

This documentation accompanies its explanations with “marble diagrams.” Here is how marble diagrams represent Observables and transformations of Observables:

可观察序列(Observable)

其他信息

背景

在软件开发过程中,你肯定希望你写的代码执行时逐步完成(同步),并多次调用(重用)。在Reactive X中,很多指令并行执行,观察者延时接收计算结果,返回顺序也是无规律的。不再是简单的调用方法,而是以可观察序列的形式定义了一种机制接收和变换计算结果数据。观察者在其上进行订阅,并提前定义好处理结果的算法,操作执行完毕后由哨兵触发算法获取和处理计算结果。

In many software programming tasks, you more or less expect that the instructions you write will execute and complete incrementally, one-at-a-time, in order as you have written them. But in ReactiveX, many instructions may execute in parallel and their results are later captured, in arbitrary order, by “observers.” Rather than callinga method, you define a mechanism for retrieving and transforming the data, in the form of an “Observable,” and then subscribe an observer to it, at which point the previously-defined mechanism fires into action with the observer standing sentry to capture and respond to its emissions whenever they are ready.

这种方式的优势是当有一组不相互依赖的任务,可以同时启动执行,而不必等待一个执行完毕在执行另一个。这样这组任务的总耗时只是这组任务中的最长耗时,而不是每个任务耗时之和。

An advantage of this approach is that when you have a bunch of tasks that are not dependent on each other, you can start them all at the same time rather than waiting for each one to finish before starting the next one — that way, your entire bundle of tasks only takes as long to complete as the longest task in the bundle.

有很多术语描述这种异步的编程和设计模型。本文用如下术语:观察者订阅可观察序列。观察者序列调用观察者方法向观察者发送数据项或发送通知。

There are many terms used to describe this model of asynchronous programming and design. This document will use the following terms: An observer subscribes to an Observable. An Observable emits items or sends notifications to its observers by calling the observers’ methods.

其他文档或上下文中,将观察者叫做订阅者、观看者或响应者。这种模型通常叫做响应者模式。

In other documents and other contexts, what we are calling an “observer” is sometimes called a “subscriber,” “watcher,” or “reactor.” This model in general is often referred to as the “reactor pattern”.

创建观察者Establishing Observers

本页使用近似的伪代码来举例,但ReactiveX有很多语言的实现。

This page uses Groovy-like pseudocode for its examples, but there are ReactiveX implementations in many languages.

一个普通的方法调用描述--不是ReactiveX典型的异步、并行调用方式,如下所示:

  1. 调用一个方法.
  2. 用一个变量存储方法的返回值.
  3. 使用变量及其新的值进行运算.

In an ordinary method call — that is, not the sort of asynchronous, parallel calls typical in ReactiveX — the flow is something like this:

  1. Call a method.
  2. Store the return value from that method in a variable.
  3. Use that variable and its new value to do something useful.

或如下所示:

Or, something like this:

// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal

异步模型大多如下所示描述:

  1. 定义一个方法使用异步调用返回的值进行计算;这个方法是观察者的一部分。
  2. 定义一个可观察序列进行异步调用
  3. 将观察者附加到可观察序列进行订阅(同时也会触发可观察序列的执行)
  4. 代码继续执行;当调用返回,观察者的方法得到异步执行的返回值并调用执行,返回数据由可观察序列发送。

In the asynchronous model the flow goes more like this:

  1. Define a method that does something useful with the return value from the asynchronous call; this method is part of the observer.
  2. Define the asynchronous call itself as an Observable.
  3. Attach the observer to that Observable by subscribing it (this also initiates the actions of the Observable).
  4. Go on with your business; whenever the call returns, the observer’s method will begin to operate on its return value or values — the items emitted by the Observable.

如下所述:

// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business

onNext, onCompleted, 和 onError

Subscribe方法连接观察者和可观察序列。观察者实现如下方法的响应动作:

onNext

可观察序列调用这个方法发送数据项。这个方法的参数是可观察序列要发送的数据项。

onError

可观察序列调用这个方法指示生成期望数据失败,或发送了一些错误。这将不会在调用onNext或onCompleted。onError方法的参数指定了发生的错误对象。

onCompleted

可观察序列调用onNext后调用这个方法结束运行(如果没有发生错误)。

The Subscribe method is how you connect an observer to an Observable. Your observer implements some subset of the following methods:

onNext

An Observable calls this method whenever the Observable emits an item. This method takes as a parameter the item emitted by the Observable.

onError

An Observable calls this method to indicate that it has failed to generate the expected data or has encountered some other error. It will not make further calls to onNext or onCompleted. The onError method takes as its parameter an indication of what caused the error.

onCompleted

An Observable calls this method after it has called onNext for the final time, if it has not encountered any errors.

根据可观察序列的约定,需要调用onNext一次或多次,而后可能会调用onCompleted或onError,但不会同时调用,作为最后的调用结束。按惯例,在本文中调用onNext方法叫做发送数据项,调用onCompleted或onError叫做通知。

By the terms of the Observable contract, it may call onNext zero or more times, and then may follow those calls with a call to either onCompleted or onError but not both, which will be its last call. By convention, in this document, calls to onNext are usually called “emissions” of items, whereas calls to onCompleted or onErrorare called “notifications.”

完整的subscribe调用如下所示:

A more complete subscribe call example looks like this:

def myOnNext     = { item -> /* do something useful with item */ };
def myError      = { throwable -> /* react sensibly to a failed call */ };
def myComplete   = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business

更多见

注销订阅(Unsubscribing)

在有些ReactiveX实现中,有一个特定的观察者接口,Subscribe,实现了一个unsubscribe方法。可以调用这个方法指示订阅者不再关注任一个已订阅的观察者序列。观察者序列可以选择停止发送新的数据项(如果已经没有关联的观察者)。

In some ReactiveX implementations, there is a specialized observer interface, Subscriber, that implements an unsubscribe method. You can call this method to indicate that the Subscriber is no longer interested in any of the Observables it is currently subscribed to. Those Observables can then (if they have no other interested observers) choose to stop generating new items to emit.

注销订阅的结果将推迟反馈到观察者订阅的可观察序列的操作链,可能使每个链接停止发送数据。不保证立即生效,但可观察序列也可能生成并尝试发送数据项,即使没有观察者也会保留这个数据项。

The results of this unsubscription will cascade back through the chain of operators that applies to the Observable that the observer subscribed to, and this will cause each link in the chain to stop emitting items. This is not guaranteed to happen immediately, however, and it is possible for an Observable to generate and attempt to emit items for a while even after no observers remain to observe these emissions.

一些命名约定说明(Some Notes on Naming Conventions)

ReactiveX每种语言实现都有自己的约定。没有固定的命名标准,虽然每种实现间有部分公用标准。

因此在不同的实现上下文命名不尽相同,或有些特定实现中的命名不够自然。

例如onEvent命名规则(如onNext、onCompleted、onError)。有些上下文命名应该暗示方法被注册到事件处理句柄。然而在ReactiveX中,独立命名事件句柄。

Each language-specific implementation of ReactiveX has its own naming quirks. There is no canonical naming standard, though there are many commonalities between implementations.

Furthermore, some of these names have different implications in other contexts, or seem awkward in the idiom of a particular implementing language.

For example there is the onEvent naming pattern (e.g. onNextonCompletedonError). In some contexts such names would indicate methods by means of which event handlers are registered. In ReactiveX, however, they name the event handlers themselves.

热观察者序列和冷观察者序列

观察者序列什么时候发送数据项序列呢?这依赖于观察者序列。热观察者序列创建后就开始发送数据项,以后订阅的观察者可能在序列的中间部位开始观察。而冷观察者序列,创建后进行等待,直到有观察者进行订阅才会发送数据项,可以保证一个观察者能够从头开始观察序列。

在有些ReactiveX实现中,有些观察者序列叫做可连接的观察者序列。观察者序列不会发送数据项,直到调用了Connect方法,无论是否有观察者进行了订阅。

When does an Observable begin emitting its sequence of items? It depends on the Observable. A “hot” Observable may begin emitting items as soon as it is created, and so any observer who later subscribes to that Observable may start observing the sequence somewhere in the middle. A “cold” Observable, on the other hand, waits until an observer subscribes to it before it begins to emit items, and so such an observer is guaranteed to see the whole sequence from the beginning.

In some implementations of ReactiveX, there is also something called a “Connectable” Observable. Such an Observable does not begin emitting items until its Connect method is called, whether or not any observers have subscribed to it.

提供可观察序列操作进行组合Composition via Observable Operators

可观察序列和观察者只是ReactiveX的开始。其自身只是观察者模式的简单扩展,更多用于处理一系列事件而不是单个回调。

真正的威力来自于”响应式扩展“(及ReactiveX)--用于变换、组合、操作观察者序列发送的数据项的操作符。

这些Rx运算符允许您以声明的方式将异步序列组合在一起,具有回调的所有效率优点,但是没有嵌套回调处理程序的缺点,这些回调处理程序通常与异步系统相关联。

Observables and observers are only the start of ReactiveX. By themselves they’d be nothing more than a slight extension of the standard observer pattern, better suited to handling a sequence of events rather than a single callback.

The real power comes with the “reactive extensions” (hence “ReactiveX”) — operators that allow you to transform, combine, manipulate, and work with the sequences of items emitted by Observables.

These Rx operators allow you to compose asynchronous sequences together in a declarative manner with all the efficiency benefits of callbacks but without the drawbacks of nesting callback handlers that are typically associated with asynchronous systems.

本文将这些操作符及其范例进行汇总,内容如下:

This documentation groups information about the various operators and examples of their usage into the following pages:

Creating Observables

CreateDeferEmpty/Never/ThrowFromIntervalJustRangeRepeatStart, and Timer

Transforming Observable Items

BufferFlatMapGroupByMapScan, and Window

Filtering Observables

DebounceDistinctElementAtFilterFirstIgnoreElementsLastSampleSkipSkipLastTake, and TakeLast

Combining Observables

And/Then/WhenCombineLatestJoinMergeStartWithSwitch, and Zip

Error Handling Operators

Catch and Retry

Utility Operators

DelayDoMaterialize/DematerializeObserveOnSerializeSubscribeSubscribeOn,TimeIntervalTimeoutTimestamp, and Using

Conditional and Boolean Operators

AllAmbContainsDefaultIfEmptySequenceEqualSkipUntilSkipWhileTakeUntil, and TakeWhile

Mathematical and Aggregate Operators

AverageConcatCountMaxMinReduce, and Sum

Converting Observables

To

Connectable Observable Operators

ConnectPublishRefCount, and Replay

Backpressure Operators

a variety of operators that enforce particular flow-control policies

这些页面包括关于一些操作符的信息,这些操作符不是ReactiveX核心的一部分,而是在一个或多个特定于语言的实现和/或可选模块中实现的。

These pages include information about some operators that are not part of the core of ReactiveX but are implemented in one or more of language-specific implementations and/or optional modules.

操作链(Chaining Operators)

大多数操作符操作可观察序列并返回可观察序列。这允许您应用这些运算符组成操作链。链中的每个操作符修改了先前操作符的操作可观察序列的结果。

还有其他模式,如Builder模式,其中特定类的各种方法通过方法的操作修改该对象来对同一类的项进行操作。这些模式还允许您以类似的方式链接这些方法。但是,虽然在Builder模式中,方法在链中出现的顺序通常并不重要,但可观察操作符的顺序很重要。

可观察序列操作链不独立地在源自该链的原始可观察序列对象上操作,而是依次执行,每个操作符在链中紧邻之前由操作符生成的可观察序列上操作。

Most operators operate on an Observable and return an Observable. This allows you to apply these operators one after the other, in a chain. Each operator in the chain modifies the Observable that results from the operation of the previous operator.

There are other patterns, like the Builder Pattern, in which a variety of methods of a particular class operate on an item of that same class by modifying that object through the operation of the method. These patterns also allow you to chain the methods in a similar way. But while in the Builder Pattern, the order in which the methods appear in the chain does not usually matter, with the Observable operators order matters.

A chain of Observable operators do not operate independently on the original Observable that originates the chain, but they operate in turn, each one operating on the Observable generated by the operator immediately previous in the chain.