Angular2/Websocket:如何返回传入websocket消息的可观察值

Angular2/Websocket:如何返回传入websocket消息的可观察值

问题描述:

我打算使用Angular2接收websocket传入消息并根据收到的消息更新网页。现在,我正在使用虚拟回声websocket服务并将取代它。Angular2/Websocket:如何返回传入websocket消息的可观察值

从我的理解来看,接收websocket消息的函数必须返回一个由更新网页的处理程序订阅的observable。但我无法弄清楚如何返回一个observable。

代码片段附在下面。 MonitorService创建一个websocket连接并返回一个包含收到消息的observable。

@Injectable() 
export class MonitorService { 

    private actionUrl: string; 
    private headers: Headers; 
    private websocket: any; 
    private receivedMsg: any; 
    constructor(private http: Http, private configuration: AppConfiguration) { 

     this.actionUrl = configuration.BaseUrl + 'monitor/'; 
     this.headers = new Headers(); 
     this.headers.append('Content-Type', 'application/json'); 
     this.headers.append('Accept', 'application/json'); 
    } 

    public GetInstanceStatus =(): Observable<Response> => { 
     this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service 
     this.websocket.onopen = (evt) => { 
      this.websocket.send("Hello World"); 
     }; 

     this.websocket.onmessage = (evt) => { 
      this.receivedMsg = evt; 
     }; 

     return new Observable(this.receivedMsg).share(); 
    } 

} 

下面是另一个组件,它订阅从上面返回的observable并相应地更新网页。

export class InstanceListComponent { 
    private instanceStatus: boolean 
    private instanceName: string 
    private instanceIcon: string 
    constructor(private monitor: MonitorService) { 
    this.monitor.GetInstanceStatus().subscribe((result) => { 
     this.setInstanceProperties(result); 
    }); 
    } 

    setInstanceProperties(res:any) { 
    this.instanceName = res.Instance.toUpperCase(); 
    this.instanceStatus = res.Status; 
    if (res.Status == true) 
    { 
     this.instanceIcon = "images/icon/healthy.svg#Layer_1"; 
    } else { 
     this.instanceIcon = "images/icon/cancel.svg#cancel"; 
    } 
    } 
} 

现在,我遇到了此问题在浏览器控制台 TypeError: this._subscribe is not a function

我把它放在一个plunker,我增加了一个功能,用于发送消息给WebSocket的端点。这里是重要的编辑:

public GetInstanceStatus(): Observable<any>{ 
    this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service 
    this.websocket.onopen = (evt) => { 
     this.websocket.send("Hello World"); 
    }; 
    return Observable.create(observer=>{ 
     this.websocket.onmessage = (evt) => { 
      observer.next(evt); 
     }; 
    }) 
    .share(); 
} 

更新
正如您在您的评论中提到,一个更好的替代方法是使用Observable.fromEvent()

websocket = new WebSocket("ws://echo.websocket.org/"); 
public GetInstanceStatus(): Observable<Event>{ 
    return Observable.fromEvent(this.websocket,'message'); 
} 

plunker exampleObservable.fromEvent();

另外,还可以用它做WebSocketSubject,虽然它看起来并不像它准备好了(如rc.4的):

constructor(){ 
    this.websocket = WebSocketSubject.create("ws://echo.websocket.org/"); 
} 

public sendMessage(text:string){ 
    let msg = {msg:text}; 
    this.websocket.next(JSON.stringify(msg)); 
} 

plunker example

+0

谢谢!并且,事实证明,像这样返回observable也可以工作'return Observable.fromEvent(websocket,'message');'你对它们的区别有什么想法吗? –

+0

@BingLu他们几乎一样。 'fromEvent'方式只是这个事件的一个包装,这正是我在我的答案中所做的。我怀疑是否有任何区别。但是,这是比我的回答更好的方法:D。我会更新它我的答案包括它。谢谢 – Abdulrahman

+0

@Abdulrahman我知道这是很久以前,但我想知道你是否可以更新Plunker来使用'fromEvent'。顺便说一句,不应该'WebSocketSubject'用于流数据? (我只是想知道,我是websockets的新手)。 – brians69

获取的onMessage从套接字数据。

import { Injectable } from '@angular/core'; 
import {Observable} from 'rxjs/Rx'; 


@Injectable() 

export class HpmaDashboardService { 
private socketUrl: any = 'ws://127.0.0.0/util/test/dataserver/ws'; 
private websocket: any;  

public GetAllInstanceStatus(objStr): Observable<any> { 
     this.websocket = new WebSocket(this.socketUrl); 
     this.websocket.onopen = (evt) => { 
     this.websocket.send(JSON.stringify(objStr)); 
     }; 
     return Observable.create(observer => { 
     this.websocket.onmessage = (evt) => { 
      observer.next(evt); 
     }; 
     }).map(res => res.data).share(); 
} 

**Get only single mesage from socket.** 

    public GetSingleInstanceStatus(objStr): Observable<any> { 
     this.websocket = new WebSocket(this.socketUrl); 
     this.websocket.onopen = (evt) => { 
     this.websocket.send(JSON.stringify(objStr)); 
     }; 
     return Observable.create(observer => { 
     this.websocket.onmessage = (evt) => { 
      observer.next(evt); 
      this.websocket.close(); 
     }; 
     }).map(res => res.data).share(); 
    } 

}