Angular Websocket RxJS/WebSocket主题
问题描述:
在过去,我们使用了几个angular2 websocket,但是我们对它们并不满意,使用它们有几个问题。所以我们决定尝试用我们自己的RxJs来做我们的财富。Angular Websocket RxJS/WebSocket主题
这是我们第一次尝试:
@Injectable()
export class WebSocketService{
public createWebsocket(url: string): Subject<MessageEvent> {
let socket = new WebSocket(url);
let observable = Observable.create(
(observer: Observer<MessageEvent>) => {
socket.onmessage = observer.next.bind(observer);
socket.onerror = observer.error.bind(observer);
socket.onclose = observer.complete.bind(observer);
return socket.close.bind(socket);
});
let observer = {
next: (data: Object) => {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify(data));
}
}
};
return Subject.create(observer, observable);
}
}
插座被打开,工作了一段时间好。几秒钟后,浏览器关闭套接字,并在服务器端收到关闭事件。
这是封闭的原因,我们得到服务器站点: [1006] WebSocket的阅读EOF
有谁可以帮忙吗?还是有人知道如何使用WebSocketSubject?
答
我不`吨知道,如果它仍然是相关的,但我没有使用从角的WebSocket连接与https://github.com/ohjames/rxjs-websockets
一些我在那里我打电话的ServerSocketService在组件内部所做的修改的类似的东西(专供连接到基于websocket的端点)即重试机制,我使用ReplaySubject而不是示例中给出的QueuingSubject。
@Injectable()
export class ServerSocket {
// private inputStream: QueueingSubject<string>;
private inputStream: ReplaySubject<string>;
public messages: Observable<string>;
private subscription: Subscription;
private websocket: WebSocket;
public connect() {
if (this.messages) {
return;
}
console.log('inside connect');
// this.inputStream = new QueueingSubject<string>();
this.inputStream = new ReplaySubject();
// Using share() causes a single websocket to be created when the first
// observer subscribes. This socket is shared with subsequent observers
// and closed when the observer count falls to zero.
this.messages = websocketConnect(
'ws://localhost:9097/echo',
this.inputStream
).messages.share();
this.messages.retryWhen(errors => errors.delay(1000)).subscribe(message => {
console.log('error', message);
});
}
public send(message: string): void {
// If the websocket is not connected then the QueueingSubject will ensure
// that messages are queued and delivered when the websocket reconnects.
// A regular Subject can be used to discard messages sent when the websocket
// is disconnected.
this.inputStream.next(message);
}
}
然后在组件的OnInit生命周期内进行连接,订阅然后发送消息。一旦组件到达其OnDestroy生命周期,就通过取消订阅来释放资源。
+0
这看起来不错,同时我们已经适应了我们自己的websocket实现,但是如果我们有时间的话我们也会看看 –
https://gearheart.io/blog/auto-websocket-reconnection-with-rxjs/ –
@JuliaPassynkova thanx为您的链接,我已经找到了这个解决方案,这很好。这个解决方案的问题是,总是存在关闭Websocket的问题,这个解决方案在关闭连接时没有问题。但它就像是第一次连接,在服务器上,第一次连接会花费很多昂贵的东西。 因此,如果连接永远不会从浏览器端关闭,那将会更好。 所以问题是,为什么浏览器正在关闭websocket? –
由于您似乎已经完成了您自己的websocket实现,请您分享一下您自己的解决方案吗? – BlackHoleGalaxy