rxjs/Redux的观察者:observable.retry重新建立连接
我在我建立一个应用程序使用elixir phoenix websocket和我有一个看起来像这样的史诗:rxjs/Redux的观察者:observable.retry重新建立连接
const socketObservable = Observable.create((observer: Object) => {
const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: {
token: readSession(),
} });
socket.connect();
socket.onOpen(() =>
observer.next({ type: SOCKET_CONNECTED, socket }),
);
socket.onError((error) =>
observer.error({ type: WEBSOCKET_ERROR, error }),
);
return() => {
// socket.disconnect();
};
});
const connectToSocket = (
action$: Object,
) => action$.ofType(CONNECT_TO_SOCKET)
.switchMap(() =>
socketObservable
.catch((error) => Observable.of(error)),
)
.retry();
export default connectToSocket;
我想当网络连接通过发出{ type: WEBSOCKET_ERROR, error }
而消失时通知用户,并在通过发出{ type: SOCKET_CONNECTED, socket }
重新建立连接时删除通知。那么我得到了第一部分工作,但是当重新连接发生时,{ type: SOCKET_CONNECTED, socket }
从不发送。使用终极版,传奇,我可以使用下面的代码,使这项工作:
const connectToSocket =(): Object =>
eventChannel((emitter: (Object) => mixed) => {
const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: {
token: readSession(),
} });
socket.connect();
socket.onOpen(() => emitter({ socket }));
socket.onError((error) => {
emitter({ error });
});
return() => {
// socket.disconnect();
};
});
export function* callConnectToSocket(): Generator<IOEffect, *, *> {
const chan = yield call(connectToSocket);
while (true) {
const { socket, error } = yield take(chan);
if (socket) {
yield put({ type: SOCKET_CONNECTED, socket });
} else {
yield put({ error, type: WEBSOCKET_ERROR });
}
}
}
export function* watchConnectToSocket(): Generator<IOEffect, *, *> {
yield takeLatest(CONNECT_TO_SOCKET, callConnectToSocket);
}
对于rxjs代码,我以为在链的末端套结.retry()
应该触发我的源代码的重试观察到,如果en错误按照documentation for rxjs Observable.retry发出,但可能我并不真正了解retry
应该做什么或者如何正确使用它。可能有人可以帮助实现我想要的。
对于retry
算子生效,其源观察值必须产生一个错误。而且在您的示例中,似乎错误通知从未达到retry
,因为它被从错误中恢复的catch
运算符吞噬。
要使其工作,你可以尝试让catch
运营商返回一个可观察的,首先发出一个动作,然后产生一个错误:
const connectToSocket = action$ =>
actions$.ofType(CONNECT_TO_SOCKET)
.switchMap(() => socketObservable
.catch(error => Observable.of(error).concat(Observable.throw(error)))
)
.retry();
更新:
我认为这是值得一提的是,Rx
遵循语法next* (complete|error)?
,这意味着next()
呼叫在同一观察者上的error()
之后将不起作用。因此,如果socket
从错误中恢复并且在执行onError
后执行onOpen
回调,SOCKET_CONNECTED
通知将不会到达使用者。
这可以通过两种替代error
与next
通知或重新发生错误socketObservable
每一次,这意味着一个新的socket
实例将被创建来处理可能(但是这可能不是你想要的)。
下面是一个可运行的代码示例演示如何retry
可能的工作:
const { createStore, applyMiddleware } = Redux;
const { createEpicMiddleware } = ReduxObservable;
const socketObservable = Rx.Observable.create(observer => {
const t1 = setTimeout(() => observer.next({ type: "SOCKET_CONNECTED" }), 200);
const t2 = setTimeout(() => observer.error({ type: "SOCKET_ERROR" }), 400);
return() => {
clearTimeout(t1);
clearTimeout(t2);
};
})
const connectToSocket = action$ => action$
.do(action => console.log(action))
.ofType("CONNECT_TO_SOCKET")
.switchMap(() => socketObservable
.catch(error => Rx.Observable.of(error).concat(Rx.Observable.throw(error)))
// make 2 attempts to re-connect, i.e. restart socketObservable
.retry(2)
)
// recover in case if both attempts to reconnect have failed
.retry();
const store = createStore(
(state, action) => state,
applyMiddleware(createEpicMiddleware(connectToSocket)));
// dispatch CONNECT_TO_SOCKET two times
Rx.Observable.interval(2000)
.take(2)
.subscribe(x => store.dispatch({ type: "CONNECT_TO_SOCKET" }));
<script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script>
<script src="https://unpkg.com/[email protected]/dist/redux.min.js"></script>
<script src="https://unpkg.com/[email protected]/dist/redux-observable.min.js"></script>
那么我终于放弃了在连接丢失时抛出错误并将此行更改为observer.error({ type: WEBSOCKET_ERROR, error })
至observer.next({ type: WEBSOCKET_ERROR, error })
。但我仍然想知道我在做什么错误retry
。任何帮助原代码将不胜感激。
谢谢@Sergey Karavaev。我已经尝试了您的建议,但没有奏效。 – samba6
只是想澄清一下:你想让'retry'重新建立套接字连接还是从错误中恢复?在第一种情况下,你可能试着将'retry'调用移动到内部observable上,紧接着'catch'后面。 –
我还有一个想法:你是否期望你的'socketObservable'在产生'WEBSOCKET_ERROR'之后产生'SOCKET_CONNECTED'通知?即即使在同一套接字实例上调用了'onError'之后,onOpen'回调函数可能会被调用吗?如果是这样,那么这是不可能的,因为'Rx'不允许这样做。 'Rx'遵循语法:'next *(complete | error)?',这意味着在同一个流内的'error'之后不会有其他通知。因此,无论您是仅使用“next”通知(如最终所做的那样),还是在收到“error”后重新创建'socketObservable'。 –