如何创建可在单项和批处理模式之间切换的Rx(RxJS)流?

问题描述:

我有一个Rx流,它是来自特定组件的传出更改源。如何创建可在单项和批处理模式之间切换的Rx(RxJS)流?

偶尔我希望能够在数据流进入批处理模式使项目传递给onNext累加并仅在批处理模式下退出传递。

通常我将通过流传递单项目:

stream.onNext(1); 
stream.onNext(2); 

有传递到项目onNext和项之间的1对1映射在接收订阅,所以上面的代码片段结果两次调用订阅与值1和2

批处理模式我寻找可能的工作是这样的:

stream.enterBatchMode(); 
stream.onNext(1); 
stream.onNext(2); 
stream.exitBatchMode(); 

在这种情况下我想订阅到只能与单一串联阵列调用一次[1,2]。

只是为了重申,我只有时需要批处理模式,在其他时间我通过非连接的项目。

我该如何用Rx实现这种行为?

注意:以前我通过在下一个上传递数组,虽然这主要是因为在个人模式和批处理模式下类型保持不变。

例如:

stream.onNext([1, 2, 3]); 
stream.onNext([4, 5, 6]); 

订阅接收[1,2,3]然后[4,5,6],但是,当在批处理模式订阅接收到连接结果[1,2,3,4 ,5,6]。

所以,当你看它这样它更像unconcatenated级联模式(而不是个别批量模式)。但我认为这个问题非常相似。

+2

我有点今天时间按下,而不是最热的'rxjs',但这里是'system.reactive'类似的问题可能会激发一种方法来解决这个问题:http://*.com/questions/23431018/how-can-i-alternately-buffer-and-flow-a-live-data-stream-in-rx/23431077#23431077 - 如果有人幻想翻译挑战。 – 2014-11-24 14:07:35

+0

如果您确切地知道何时在“OnNext”调用之间调用“开始”和“停止”批处理模式,那么为什么不在将它们传递给'OnNext'之前自己简单地连接数组? (好吧,我想你刚刚给出了一个简单的例子?) – 2014-11-24 15:31:09

+0

这是一个简化的例子。我想要连接的onNext将从多个其他上下文中调用,并打算从callstack中的更高级别进入和退出批处理模式。 – 2014-11-24 23:02:11

James的评论可能会提供您正在寻找的答案,但我想提供一个简单的替代方案,您可能也会喜欢。

我把你的问题看成是这样的:“我如何传递A型或B型的值?”

回答:定义与“不是”语义的类型,它包含任一类型A的数据或类型B.

var valueOrBuffer = function(value, buffer) 
{ 
    this.Value = value; 
    this.Buffer = buffer; 
}; 
var createValue = function(value) { return new valueOrBuffer(value); } 
var createBuffer = function(buffer) { return new valueOrBuffer(undefined, buffer); } 

stream.OnNext(createValue(1)); 
stream.OnNext(createValue(2)); 
stream.OnNext(createValue(3)); 

stream.OnNext(createBuffer([4, 5, 6])); 

没有必要切换。您的观察员可能不得不改变一点:

stream.Subscribe(function(v) { 
    if (v.Value) 
    onNextValue(v.Value); 
    else 
    onNextBuffer(v.Buffer); 
}); 
+0

在重读您的问题的细节之后,您似乎并没有问“单项”与“批”作为问题的标题。 – 2014-11-24 15:30:33

+0

我会稍微改写我的问题。 – 2014-11-24 23:04:05

+0

这是行不通的,因为调用onNext的代码并不知道它所处的模式。只有代码更高的调用堆栈才有这方面的知识。 – 2014-11-25 00:29:37

一个解决方案...的种类。在未能通过Window函数获得任何东西(来自James的链接示例)后,我提出了以下解决方案,尽管它使用C#,但我不知道如何将其转换为RxJS和Javascript。

如果有人可以更好地使用内置的Rx功能,请这样做,我会非常感谢一个更好的解决方案。

我已经实施了BatchedObservable类,它是类型T的观察者和可观察到的IEnumerable类型Ť的。在正常模式下,它将每个T包装在传递的数组中。在批处理模式下,它将累积T s,直到退出批处理模式,然后它将通过所收集的批处理列表T s。

public class BatchedObservable<T> : IObservable<IEnumerable<T>>, IObserver<T> 
    { 
     bool batching = false; 
     List<T> batchedItems; 
     List<IObserver<IEnumerable<T>>> observers = new List<IObserver<IEnumerable<T>>>(); 

     public void EnterBatchMode() 
     { 
      batching = true; 
      batchedItems = new List<T>(); 
     } 

     public void ExitBatchMode() 
     { 
      batching = false; 
      if (batchedItems.Count > 0) 
      { 
       foreach (var observer in observers) 
       { 
        observer.OnNext(batchedItems); 
       } 
      } 
      batchedItems = null; 
     } 


     public IDisposable Subscribe(IObserver<IEnumerable<T>> observer) 
     { 
      observers.Add(observer); 

      return Disposable.Create(() 
       => observers.Remove(observer) 
      ); 
     } 

     public void OnCompleted() 
     { 
      foreach (var observer in observers) 
      { 
       observer.OnCompleted(); 
      } 
     } 

     public void OnError(Exception error) 
     { 
      foreach (var observer in observers) 
      { 
       observer.OnError(error); 
      } 
     } 

     public void OnNext(T value) 
     { 
      if (batching) 
      { 
       batchedItems.Add(value); 
      } 
      else 
      { 
       foreach (var observer in observers) 
       { 
        observer.OnNext(new T[] { value }); 
       } 
      } 
     } 
    } 

用法如下。运行时,按任意键切换批处理模式。

static void Main(string[] args) 
    { 
     var batched = new BatchedObservable<long>(); 

     var dataStream = Observable.Interval(TimeSpan.FromSeconds(1)); 
     dataStream.Subscribe(batched); 

     batched.Subscribe(PrintArray); 

     var batchModeEnabled = false; 
     while (true) 
     { 
      Console.ReadKey(); 

      batchModeEnabled = !batchModeEnabled; 
      if (batchModeEnabled) 
      { 
       batched.EnterBatchMode(); 
      } 
      else 
      { 
       batched.ExitBatchMode(); 
      } 
     } 
    } 

    private static void PrintArray<T>(IEnumerable<T> a) 
    { 
     Console.WriteLine("[" + string.Join(", ", a.Select(i => i.ToString()).ToArray()) + "]"); 
    } 

这里是詹姆斯世界的C#解决方案的精神转换为JavaScript,并针对您的具体问题量身定制。

var source = new Rx.Subject(); 
 
source.batchMode = new Rx.BehaviorSubject(false); 
 
source.enterBatchMode = function() { this.batchMode.onNext(true); }; 
 
source.exitBatchMode = function() { this.batchMode.onNext(false); }; 
 
var stream = source 
 
    .window(source.batchMode 
 
    .distinctUntilChanged() 
 
    .skipWhile(function(mode) { return !mode; })) 
 
    .map(function(window, i) { 
 
    if ((i % 2) === 0) { 
 
     // even windows are unbatched windows 
 
     return window; 
 
    } 
 

 
    // odd windows are batched 
 
    // collect the entries in an array 
 
    // (e.g. an array of arrays) 
 
    // then use Array.concat() to join 
 
    // them together 
 
    return window 
 
     .toArray() 
 
     .filter(function(array) { return array.length > 0; }) 
 
     .map(function(array) { 
 
     return Array.prototype.concat.apply([], array); 
 
     }); 
 
    }) 
 
    .concatAll(); 
 

 

 
stream.subscribe(function(value) { 
 
    console.log("received ", JSON.stringify(value)); 
 
}); 
 

 
source.onNext([1, 2, 3]); 
 

 
source.enterBatchMode(); 
 
source.onNext([4]); 
 
source.onNext([5, 6]); 
 
source.onNext([7, 8, 9]); 
 
source.exitBatchMode(); 
 

 
source.onNext([10, 11]); 
 

 
source.enterBatchMode(); 
 
source.exitBatchMode(); 
 

 
source.onNext([12]); 
 

 
source.enterBatchMode(); 
 
source.onNext([13]); 
 
source.onNext([14, 15]); 
 
source.exitBatchMode();
<script src="https://getfirebug.com/firebug-lite-debug.js"></script> 
 
<script src="https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js"></script>