使用RxCpp构造观察者/可观察模式

使用RxCpp构造观察者/可观察模式

问题描述:

我试图在Rx-cpp中实现observer/observable模式。这些是Rx.Net中的一个非常有趣的tutorial,有人可以这样做。使用RxCpp构造观察者/可观察模式

在这个C#例子,有具体的interfaces,我们必须覆盖:

public interface IObserver<in T> 
{ 
    void OnCompleted(); 
    void OnError(Exception error); 
    void OnNext(T value); 
} 


public interface IObservable<out T> 
{ 
    IDisposable Subscribe(IObserver<T> observer); 
} 

据我了解,在Rx-cpp没有这样的便利。那么,是否有可能向我提供一些头部示例(myObservable.h/myObserver.h),类似于上面的interfaces,我可以用它来指导定义相同的通信模式?

任何帮助,高度赞赏, 谢谢!

编辑1: 感谢@zentrunix,我试图做一个面向类的通信。到目前为止,我已经有了可观察模式的代码。我想要的是定义一个观察者列表,这些观察者列表将被添加到可观察值中,当调用这些观察者时,应该通知这些观察者。但是,缺少一部分。

  1. 我怎样才能subscribe()这些观察员(Rx::subscribers<int>)当myObservable::Subscribe()函数被调用。
  2. 另外我怎样才能unsubscribe()
  3. 最后,如何在多个onNext观察员中看到相应的o.subscribe(onNext, onEnd);?是否有可能构建一个相应的类? (再次受到here的启发)
  4. 对不起,但这是一个有意义的组织吗?到目前为止,我正在与tutorial中提供的体系结构一起工作,这就是我沉迷于此任务的原因。我发现它是一种参与RxCpp的方式。任何意见非常感谢。 (同样对不起我的无知。)

    class myObservable { 
    
    private: 
    
    std::shared_ptr<std::list<rxcpp::subscriber<int>>> observers; 
    
    public: 
    
    myObservable() { observers = std::make_shared<std::list<Rx::subscriber<int>>>(); }; 
    
    Rx::observable<int> Attach(std::shared_ptr<rxcpp::subscriber<int>> out) { 
    
        return Rx::observable<>::create<int>([&, out]() { 
         auto it = observers->insert(observers->end(), *out); 
         it->add([=]() { 
          observers->erase(it); 
         }); 
        }); 
    
    }; 
    
    void OnNext(int sendItem) { 
    
        for (Rx::subscriber<int> observer : *observers) { 
         (observer).on_next(sendItem); 
        } 
    } 
    
    void Disposer(Rx::subscriber<int> out) { 
    
        observers->erase(std::remove(observers->begin(), observers->end(), &out), observers->end()); 
    }; 
    }; 
    
+0

我想要做的是构造两个继承'RxCpp''observer'和'observable'函数的类。在'Rx.Net'示例中,这由'class myIObserver:IObserver '完成。这不是如何将类转换为“虚拟”,而是如何构造相应地执行与“SubjectObserver:IObserver ”和“SubjectObservable:IObservable ”相同功能的“myObserver.h”和“myObservable.h” 。感谢他的兴趣。 – Thoth

+0

我附加'c#'''interfaces'的原因是因为我应该实现相同的函数('OnComplete()','OnNext(T value)'等),其'虚拟'我没有找到。 – Thoth

+0

我认为你的问题需要重新表述。另外,我刚刚查看了一些RxCpp源代码(例如https://github.com/Reactive-Extensions/RxCpp/blob/master/Rx/v2/src/rxcpp/rx-observer.hpp),而不是看到任何看起来像界面的东西。我想你可能会以完全错误的方式来解决这个问题。 – Rook

在下面RxCpp一个很简单的例子。 尽管有(至少)一个警告:典型的RxCpp代码大量使用lambda,我不太喜欢它。

我也试图找到在互联网上的文档和教程,但找不到任何。我对关于线程模型的解释特别感兴趣。

如果您愿意通过代码和Doxygen文档进行沟通,RxCpp GitHub站点中有很多示例。

#include <iostream> 
#include <exception> 

#include "rxcpp/rx.hpp" 
namespace rx = rxcpp; 

static void onNext(int n) { std::cout << "* " << n << "\n"; } 
static void onEnd() { std::cout << "* end\n"; } 

static void onError(std::exception_ptr ep) 
{ 
    try { std::rethrow_exception(ep); } 
    catch (std::exception& e) { std::cout << "* exception " << e.what() << '\n'; } 
} 

static void observableImpl(rx::subscriber<int> s) 
{ 
    s.on_next(1); 
    s.on_next(2); 
    s.on_completed(); 
} 

int main() 
{ 
    auto o = rxcpp::observable<>::create<int>(observableImpl); 
    std::cout << "*\n"; 
    o.subscribe(onNext, onEnd); 
}