RxJs使用WebSocket可观察
问题描述:
我的角度应用程序使用websocket与后端进行通信。RxJs使用WebSocket可观察
在我的测试案例中,我有2个客户端组件。 Observable timer按预期打印两个不同的客户端ID。
每个ngOnInit()还会打印其客户端的ID。
现在由于某种原因,websocketService.observeClient()的订阅被称为每个消息2次,但this.client.id
总是打印第二个客户端的值。
我的继承人客户端组件
@Component({
...
})
export class ClientComponent implements OnInit {
@Input() client: Client;
constructor(public websocketService: WebsocketService) {
Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id));
}
ngOnInit() {
console.log(this.client.id);
this.websocketService.observeClient().subscribe(data => {
console.log('message', this.client.id);
});
}
}
而且我的WebSocket服务
@Injectable()
export class WebsocketService {
private observable: Observable<MessageEvent>;
private observer: Subject<Message>;
constructor() {
const socket = new WebSocket('ws://localhost:9091');
this.observable = Observable.create(
(observer: Observer<MessageEvent>) => {
socket.onmessage = observer.next.bind(observer);
socket.onerror = observer.error.bind(observer);
socket.onclose = observer.complete.bind(observer);
return socket.close.bind(socket);
}
);
this.observer = Subject.create({
next: (data: Message) => {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify(data));
}
}
});
}
observeClient(): Observable<MessageEvent> {
return this.observable;
}
}
编辑
好,据我已经阅读它的事实,做观测量是单播对象,我必须使用主题,但我不知道如何创建主题。
答
您需要使用share
运营商在订户之间共享它。
this.observable = Observable.create(
(observer: Observer<MessageEvent>) => {
socket.onmessage = observer.next.bind(observer);
socket.onerror = observer.error.bind(observer);
socket.onclose = observer.complete.bind(observer);
return socket.close.bind(socket);
}
).share();
此外,请确保此服务是单身。
文件:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/share.md
答
由于rxjs 5,你可以使用内置的WebSocket功能,为您创建的主题。当您在错误发生后重新订阅流时,它也会重新连接。请参考这个答案:
https://*.com/a/44067972/552203
TLDR:
let subject = Observable.webSocket('ws://localhost:8081');
subject
.retry()
.subscribe(
(msg) => console.log('message received: ' + msg),
(err) => console.log(err),
() => console.log('complete')
);
subject.next(JSON.stringify({ op: 'hello' }));
你确定这个问题是不是在你的'providers'配置?从你的描述看来,你希望每个客户端组件都有自己的'WebsocketService'实例。 – martin
不应该有一个注入websocketService连接到后端 – Pascal