RxJs使用WebSocket可观察

RxJs使用WebSocket可观察

问题描述:

我的角度应用程序使用websocket与后端进行通信。RxJs使用WebSocket可观察

在我的测试案例中,我有2个客户端组件。 Observable timer按预期打印两个不同的客户端ID。

每个ngOnInit()还会打印其客户端的ID。

现在由于某种原因,websocketService.observeClient()的订阅被称为每个消息2次,但this.client.id总是打印第二个客户端的值。

我的继承人客户端组件

@Component({ 
... 
}) 
export class ClientComponent implements OnInit { 

    @Input() client: Client; 

    constructor(public websocketService: WebsocketService) { 
    Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id)); 
    } 

    ngOnInit() { 

    console.log(this.client.id); 
    this.websocketService.observeClient().subscribe(data => { 
     console.log('message', this.client.id); 
    }); 

    } 

} 

而且我的WebSocket服务

@Injectable() 
export class WebsocketService { 

    private observable: Observable<MessageEvent>; 
    private observer: Subject<Message>; 

    constructor() { 

    const socket = new WebSocket('ws://localhost:9091'); 

    this.observable = Observable.create(
     (observer: Observer<MessageEvent>) => { 
     socket.onmessage = observer.next.bind(observer); 
     socket.onerror = observer.error.bind(observer); 
     socket.onclose = observer.complete.bind(observer); 
     return socket.close.bind(socket); 
     } 
    ); 

    this.observer = Subject.create({ 
     next: (data: Message) => { 
     if (socket.readyState === WebSocket.OPEN) { 
      socket.send(JSON.stringify(data)); 
     } 
     } 
    }); 

    } 

    observeClient(): Observable<MessageEvent> { 
    return this.observable; 
    } 

} 

编辑

好,据我已经阅读它的事实,做观测量是单播对象,我必须使用主题,但我不知道如何创建主题。

+0

你确定这个问题是不是在你的'providers'配置?从你的描述看来,你希望每个客户端组件都有自己的'WebsocketService'实例。 – martin

+0

不应该有一个注入websocketService连接到后端 – Pascal

您需要使用share运营商在订户之间共享它。

this.observable = Observable.create(
    (observer: Observer<MessageEvent>) => { 
     socket.onmessage = observer.next.bind(observer); 
     socket.onerror = observer.error.bind(observer); 
     socket.onclose = observer.complete.bind(observer); 
     return socket.close.bind(socket); 
    } 
).share(); 

此外,请确保此服务是单身。

文件:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/share.md

由于rxjs 5,你可以使用内置的WebSocket功能,为您创建的主题。当您在错误发生后重新订阅流时,它也会重新连接。请参考这个答案:

https://*.com/a/44067972/552203

TLDR:

let subject = Observable.webSocket('ws://localhost:8081'); 
subject 
    .retry() 
    .subscribe(
     (msg) => console.log('message received: ' + msg), 
     (err) => console.log(err), 
    () => console.log('complete') 
    ); 
subject.next(JSON.stringify({ op: 'hello' }));