Angular Websocket RxJS/WebSocket主题

问题描述:

在过去,我们使用了几个angular2 websocket,但是我们对它们并不满意,使用它们有几个问题。所以我们决定尝试用我们自己的RxJs来做我们的财富。Angular Websocket RxJS/WebSocket主题

这是我们第一次尝试:

@Injectable() 
export class WebSocketService{ 

    public createWebsocket(url: string): Subject<MessageEvent> { 
     let socket = new WebSocket(url); 

     let observable = Observable.create(
            (observer: Observer<MessageEvent>) => { 
             socket.onmessage = observer.next.bind(observer); 
             socket.onerror = observer.error.bind(observer); 
             socket.onclose = observer.complete.bind(observer); 

             return socket.close.bind(socket); 
            }); 

     let observer = { 
      next: (data: Object) => { 
       if (socket.readyState === WebSocket.OPEN) { 
        socket.send(JSON.stringify(data)); 
       } 
      } 
     }; 

     return Subject.create(observer, observable); 
    } 
} 

插座被打开,工作了一段时间好。几秒钟后,浏览器关闭套接字,并在服务器端收到关闭事件。

这是封闭的原因,我们得到服务器站点: [1006] WebSocket的阅读EOF

有谁可以帮忙吗?还是有人知道如何使用WebSocketSubject?

+0

https://gearheart.io/blog/auto-websocket-reconnection-with-rxjs/ –

+0

@JuliaPassynkova thanx为您的链接,我已经找到了这个解决方案,这很好。这个解决方案的问题是,总是存在关闭Websocket的问题,这个解决方案在关闭连接时没有问题。但它就像是第一次连接,在服务器上,第一次连接会花费很多昂贵的东西。 因此,如果连接永远不会从浏览器端关闭,那将会更好。 所以问题是,为什么浏览器正在关闭websocket? –

+0

由于您似乎已经完成了您自己的websocket实现,请您分享一下您自己的解决方案吗? – BlackHoleGalaxy

我不`吨知道,如果它仍然是相关的,但我没有使用从角的WebSocket连接与https://github.com/ohjames/rxjs-websockets

一些我在那里我打电话的ServerSocketService在组件内部所做的修改的类似的东西(专供连接到基于websocket的端点)即重试机制,我使用ReplaySubject而不是示例中给出的QueuingSubject。

@Injectable() 
export class ServerSocket { 
    // private inputStream: QueueingSubject<string>; 
    private inputStream: ReplaySubject<string>; 
    public messages: Observable<string>; 
    private subscription: Subscription; 
    private websocket: WebSocket; 

    public connect() { 
     if (this.messages) { 
      return; 
     } 
     console.log('inside connect'); 
     // this.inputStream = new QueueingSubject<string>(); 
     this.inputStream = new ReplaySubject(); 

     // Using share() causes a single websocket to be created when the first 
     // observer subscribes. This socket is shared with subsequent observers 
     // and closed when the observer count falls to zero. 
     this.messages = websocketConnect(
      'ws://localhost:9097/echo', 
      this.inputStream 
     ).messages.share(); 


     this.messages.retryWhen(errors => errors.delay(1000)).subscribe(message => { 
      console.log('error', message); 
     }); 
    } 

    public send(message: string): void { 
     // If the websocket is not connected then the QueueingSubject will ensure 
     // that messages are queued and delivered when the websocket reconnects. 
     // A regular Subject can be used to discard messages sent when the websocket 
     // is disconnected. 

     this.inputStream.next(message); 

    } 
} 

然后在组件的OnInit生命周期内进行连接,订阅然后发送消息。一旦组件到达其OnDestroy生命周期,就通过取消订阅来释放资源。

+0

这看起来不错,同时我们已经适应了我们自己的websocket实现,但是如果我们有时间的话我们也会看看 –