RxCPP(一)编程模型

数据流计算和rxcpp库

在函数式反应性编程(FRP)中,所有这些主题都以系统的方式结合在一起。
简单地说,响应式编程就是使用异步数据流进行编程。通过对流应用各种操作,我们可以实现不同的计算目标。响应式程序中的主要任务是将数据转换为流,而不管数据的来源是什么。事件流通常称为可观察对象,事件流订阅者称为观察者。在可观察对象和观察者之间,存在流操作符(过滤器/转换)。
由于隐式假设在数据通过操作符传递时数据源不会发生变化,所以在可观察对象和观察者之间可以有多个操作符路径。不变性为无序执行提供了选项,并且可以将调度委托给称为调度程序的特殊软件。因此,可观测对象、观察者、流操作符和调度程序构成了frp模型的主干。
我们主要讨论以下主题:

  • 数据流计算范例
  • rxcpp库的介绍
  • Rx操作符
  • 调度
  • flat/ concatmap的区别
  • 更多重要的操作符

数据流计算范例

传统上,程序员根据控制流来编码他们的程序。这意味着我们将程序编码为一系列小语句(序列、分支、迭代)或函数(包括递归),以及它们的关联状态。我们使用选择(if/else)、迭代(while/for)和递归函数等构造对计算进行编码。为这些类型的程序处理并发性和状态管理确实存在问题,并且会导致一些细微的bug。我们需要围绕共享可变状态放置锁和其他同步原语。在编译器级别,语言编译器将解析源代码以创建抽象语法树(AST),执行类型分析、代码生成和代码生成。事实上,ast是一个信息流图,您可以在其中执行数据流分析(用于数据/寄存器级优化)和控制流分析,以利用处理器级的代码管道优化。即使程序员根据控制流来编码程序,编译器(至少部分)也会根据数据流来查看程序。这里的底线是,在每个程序中都有一个隐式数据流图处于休眠状态。
数据流计算将计算组织为显式图,其中节点是计算,边是数据在节点之间流动的路径。如果我们对节点上的计算施加某些限制(例如通过处理输入数据的副本来保存数据状态,避免使用就地算法),我们就可以利用并行性的机会。调度程序将通过对图数据结构进行拓扑排序来寻找并行的机会。我们将使用流(路径)和流(节点)上的操作构造图。这可以通过声明的方式实现,因为操作符可以被编码为lambdas,它可以执行一些本地计算。函数式编程社区标识了一组基本的标准(函数/流)操作符,如map、reduce、filter和take。在数据流计算框架中有一个条款是将数据转换为流。用于机器学习的tensorflow库就是使用这种范例的一个库。即使图创建并不像在tensorflow中那样完全显式的,但rxcpp库也可以看作是一个数据流计算库。由于函数式编程构造支持延迟计算,所以在构造具有异步数据流和操作的流管道时,我们正在创建一个计算流图。

rxcpp库的介绍

rxcpp库是一个只读的c++库,可以从github repo下载:http://reactive-extensions.github.io/rxcpp/。RxCpp依赖于现代c++结构,如语言级并发性、lambda函数/表达式、函数组合/转换和操作符重载,来实现反应性编程结构。rxcpp库的结构类似于rx.net和rxjava等库。
与任何其他反应性编程框架一样,在编写第一行代码之前,每个人都应该理解一些关键构造。它们是:

  • 可观察对象(可观察到的流)
  • 观察者(订阅观察对象)
  • 操作符(过滤、映射和归约)
  • 调度器

rxcpp是一个只读库,大部分计算都是基于可观测的概念。该库提供了大量原语来创建来自各种数据源的可观察流。数据源可以是范围、stl容器等等。我们可以在可观察对象和它们的消费者之间放置操作符(称为观察者)。由于函数编程构造支持函数的组合,所以我们可以将操作符链作为单个实体放在可观察对象和订阅流的观察者之间。与库关联的调度程序将确保当数据以可观察流的形式可用时,它将通过操作符传递,并且在经过一系列筛选和转换之后,如果有数据存在,将向订阅者发出通知。当调用订阅者中的一个lambda方法时,观察者需要考虑一些事情。观察员可以把注意力集中在他们主要负责的任务上。

Rx操作符

在本节中,我们将编写一些程序来帮助读者理解rxcpp库的编程模型。这些程序的目的是阐明rx概念,它们在本质上大多是琐碎的。代码将足以让程序员将它们合并到生产实现中,只需要稍作调整。数据生成器及其可观察性将基于范围、stl容器等等。

一个简单的可观察/观察者交互

让我们编写一个简单的程序来帮助我们理解rxcpp库的编程模型。在这个特殊的程序中,我们将有一个可观察的流和一个订阅流的观察者。我们将使用range对象生成一系列从1到12的数字。在创建值的范围及其上的可观察值之后,我们将为可观察值附加一个订阅者。当我们执行程序时,它会打印一系列的数字到控制台,并进行额外的测试:First.cpp

//////////
//first.cpp
// g++ -I<PathToRxcpplibfoldersrc> First.cpp
//
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
 //------------- Create an Observable.. a stream of numbers
 auto observable = rxcpp::observable<>::range(1, 12);
     
 //------------ Subscribe (only OnNext and OnCompleted Lambda given
 observable.
subscribe(
           
    [](int v){printf("OnNext: %d\n", v);},
            
    [](){printf("OnCompleted\n");});
}

可观察对象的过滤和映射

下面的程序将帮助我们理解过滤和映射操作符的工作原理,以及使用订阅方法将观察者连接到可观察流的常用机制。filter方法对流的每个项计算谓词,如果计算碰巧产生一个正断言,则该项将出现在输出流中。map操作符对输入流的每个元素应用一个表达式,并帮助生成一个输出队列:

//------------------ Second.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
    auto values = rxcpp::observable<>::range(1, 12).
        filter([](int v) {
        return v % 2 == 0;
            }).map([](int x) {return x * x; });
            values.
                subscribe(
                    [](int v) {printf("OnNext: %d\n", v); },
                    []() {printf("OnCompleted\n"); });
}

从c++容器中流化值

即使rx用于处理随时间变化的数据,我们也可以将stl容器转换为响应流。我们需要使用iterate操作符来进行转换。这有时很方便,并有助于从使用stl的代码库集成代码:

//------------------ STLContainerStream.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
 std::array< int, 3 > a={{1, 2, 3}};
    auto values1 = rxcpp::observable<>::iterate(a);
    values1.
        subscribe(
            [](int v){printf("OnNext: %d\n", v);},
            [](){printf("OnCompleted\n");});
}

从零开始创建可观察对象

到目前为止,我们已经编写了从范围对象或stl容器创建可观察流的程序。让我们看看如何从头创建一个可观察的流:

//------------------ ObserverFromScratch.cpp
#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"
int main() {
    auto ints = rxcpp::observable<>::create<int>([](rxcpp::subscriber<int> s) {
        s.on_next(1);
        s.on_next(4);
        s.on_next(9);
        s.on_completed();
        });
    ints.subscribe([](int v) {printf("OnNext: %d\n", v); },
        []() {printf("OnCompleted\n"); });
}

连接可观察到的流

concat不交错的发射两个或多个Observable的发射物


image.png

Concat操作符连接多个Observable的输出,就好像它们是一个Observable,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,以此类推。
直到前面一个Observable终止,’concat’才会订阅额外的一个Observable。注意:因此,如果你尝试连接一个”热”Observable(这种Observable在创建后立即开始发射数据,即使没有订阅者),’concat’将不会看到也不会发射它之前发射的任何数据。
startwith操作符类似于’concat’,但是它是插入到前面,而不是追加那些Observable的数据到原始Observable发射的数据序列:

image.png

merge操作符也差不多,它结合两个或多个Observable的发射物,但是数据可能交错,而concat不会让多个Observable的发射物交错。

我们可以将两个流连接起来形成一个新的流,这在某些情况下非常方便。让我们通过编写一个简单的程序来看看它是如何工作的:

//------------------ Concat.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
    auto o1 = rxcpp::observable<>::range(1, 3);
    auto o2 = rxcpp::observable<>::range(4, 6);
    auto values = o1.concat(o2);
    values.
        subscribe(
            [](int v) {printf("OnNext: %d\n", v); },
            []() {printf("OnCompleted\n"); });
}

创建一个发射指定值的Observable

Just将单个数据转换为发射那个数据的Observable:


image.png

//------------------ sixth.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
    auto values = rxcpp::observable<>::just(1);
    values.
        subscribe(
            [](int v) {printf("OnNext: %d\n", v); },
            []() {printf("OnCompleted\n"); });
}

Take

只发射前面的N项数据


image.png

使用take操作符让你可以修改Observable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据。


image.png

如果你对一个Observable使用take(n)(或它的同义词limit(n))操作符,而那个Observable发射的数据少于N项,那么take操作生成的Observable不会抛异常或发射onError通知,在完成前它只会发射相同的少量数据。

//------------------ fifth.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
    auto values = rxcpp::observable<>::range(1); // infinite (until overflow) stream of integers
    auto s1 = values.
        take(3).
        map([](int prime) { return std::make_tuple("1:", prime); });
    auto s2 = values.
        take(3).
        map([](int prime) { return std::make_tuple("2:", prime); });
    s1.
        concat(s2).
        subscribe(rxcpp::util::apply_to(
            [](const char* s, int p) {
                printf("%s %d\n", s, p);
            }));
}

其中,apply_to可以替换成一下方式:

        subscribe(
            [](std::tuple<const char*, int> p) {
                printf("%s, %d\n",std::get<0>(p),std::get<1>(p));
            });

从可观察的流取消订阅

下面的程序显示了如何订阅一个可观察流并停止订阅。程序只显示了可用的选项,应该参考文档:

//---------------- Unsubscribe.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    auto subs = rxcpp::composite_subscription();
    auto values = rxcpp::observable<>::range(1, 10);
    values.subscribe(
        subs,
        [&subs](int v) {
            printf("OnNext: %d\n", v);
            if (v == 6)
                subs.unsubscribe(); //-- Stop recieving events
        },
        []() {printf("OnCompleted\n"); });
}

map

对Observable发射的每一项数据应用一个函数,执行变换操作
map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。


image.png

大理石图的顶部显示了两个时间线,这些时间线通过将第二个时间线的内容附加到第一个时间线来组合在一起,形成一个复合时间线。
Map.cpp

//------------------ Map.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
    auto ints = rxcpp::observable<>::range(1, 10).
        map([](int n) {return n * n; });
    ints.subscribe(
        [](int v) {printf("OnNext: %d\n", v); },
        []() {printf("OnCompleted\n"); });
}

map和管道

//------------------ Map_With_Pipe.cpp
#include "rxcpp/rx.hpp"
namespace Rx {
    using namespace rxcpp;
    using namespace rxcpp::sources;
    using namespace rxcpp::operators;
    using namespace rxcpp::util;
}
using namespace Rx;
#include <iostream>
int main() {
    auto ints = rxcpp::observable<>::range(1, 10) |
        map([](int n) {return n * n; });
    ints.subscribe(
        [](int v) {printf("OnNext: %d\n", v); },
        []() {printf("OnCompleted\n"); });
}

tap/filter

tap是一个RxCPP管道操作符,返回与源可观察相同的可观察值,可用于执行副作用,例如记录源可观察值发出的每个值。

//----------- TapExample.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    //---- Create a mapped Observable
    auto ints = rxcpp::observable<>::range(1, 3).
        map([](int n) {return n * n; });
    //---- Apply the tap operator...The Operator 
    //---- will act as a filter/debug operator
    auto values = ints.
        tap(
            [](int v)
            {printf("Tap -       OnNext: %d\n", v); },
            []() {printf("Tap -       OnCompleted\n");
            });
    //------- Do some action
    values.
        subscribe(
            [](int v) {printf("Subscribe - OnNext: %d\n", v); },
            []() {printf("Subscribe - OnCompleted\n"); });
}

rxcpp(流)操作符

面向流处理的一个主要优点是,我们可以将函数式编程原语应用于它们。用rxcpp的话说,处理是使用操作符完成的。它们只是流上的过滤器、转换、聚合和规约。在前面的示例中,我们已经了解了map、filter和take操作符的工作原理。

平均运算符

平均运算符从可观察的流计算值的算术平均值。所支持的其他统计运算符包括:

  • 最小值
  • 最大值
  • 计数
  • 求和

计算原始Observable发射数字的平均值并发射它


image.png

下面的程序只演示了平均操作符。对于前面列表中的其他操作符,模式是相同的Average.cpp

//----------- Average.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    auto values = rxcpp::observable<>::range(1, 20).average();
    values.
        subscribe(
            [](double v) {printf("average: %lf\n", v); },
            []() {printf("OnCompleted\n"); });
}

扫描操作符

扫描操作符对流的每个元素依次应用一个函数,并将该值累积为种子值。下面的程序生成一系列数字的平均值,这些值是在何时累计的。
连续地对数据序列的每一项应用一个函数,然后连续发射结果


image.png

Scan操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulator。


image.png

//----------- Scan.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    int count = 0;
    auto values = rxcpp::observable<>::range(1, 20).
        scan(
            0,
            [&count](int seed, int v) {
                count++;
                return seed + v;
            });
    values.subscribe(
        [&](int v) {printf("Average through Scan: %f\n", (double)v / count); },
        []() {printf("OnCompleted\n"); });
}

通过管道组合运算符

RxCpp库允许您连接或组合运算符以启用运算符组合。 该库允许您使用管道(|)运算符来组合运算符,程序员可以将一个运算符的输出传递给另一个运算符,就好像它们位于UNIX shell的命令行中一样。 这使我们能够理解一段代码的作用。下面的程序使用| 运算符以映射范围。RxCpp样品含有使用管功能的例子很多

//------------------ Map_With_Pipe.cpp
#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"
namespace Rx {
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::operators;
using namespace rxcpp::util;
}
using namespace Rx;
#include <iostream>
int main() {
    auto ints = rxcpp::observable<>::range(1,10) | 
                 map( [] ( int n  ) {return n*n; });
    ints | subscribe(
            [](int v){printf("OnNext: %d\n", v);},
            [](){printf("OnCompleted\n");});
}
//------------------ Map_With_Pipe.cpp
#include "rxcpp/rx.hpp"
namespace Rx {
    using namespace rxcpp;
    using namespace rxcpp::sources;
    using namespace rxcpp::operators;
    using namespace rxcpp::util;
}
using namespace Rx;
#include <iostream>
int main() {
    auto ints = rxcpp::observable<>::range(1, 10) |
        map([](int n) {return n * n; });
    ints.subscribe(
        [](int v) {printf("OnNext: %d\n", v); },
        []() {printf("OnCompleted\n"); });
}

以上的管道操作等价于:

auto ints = rxcpp::observable<>::range(1, 10);
auto intsFromMap = ints.map([](int n) {return n * n; });

调度

我们已经在上一节中了解了Observables,Operators和Observers。我们已经知道,在Observables和Observers之间,我们可以应用标准的Rx运算符来过滤和转换Streams。在函数式编程的情况下,我们编写不可变函数(没有副作用的函数),不可变性的结果是无序执行的可能性。如果我们可以保证永远不会修改对运算符的输入,那么我们评估的顺序无关紧要。由于Rx程序将操纵多个观察者和订阅者,我们可以将选择执行顺序的任务委派给调度程序模块。默认情况下,RxCpp将在我们称为订阅者方法的线程中安排执行。可以使用observe_on和subscriber_on运算符指定不同的线程。此外,一些Observable运算符将Scheduler作为参数,其中执行可以在Scheduler管理的线程中进行。
该RxCpp库支持以下两种类型的调度:

  • ImmediateScheduler
  • EventLoopScheduler

ObserveOn

指定一个观察者在哪个调度器上观察这个Observable


image.png

很多ReactiveX实现都使用调度器 Scheduler来管理多线程环境中Observable的转场。你可以使用ObserveOn操作符指定Observable在一个特定的调度器上发送通知给观察者 (调用观察者的onNext, onCompleted, onError方法)。

image.png

注意:当遇到一个异常时ObserveOn会立即向前传递这个onError终止通知,它不会等待慢速消费的Observable接受任何之前它已经收到但还没有发射的数据项。这可能意味着onError通知会跳到(并吞掉)原始Observable发射的数据项前面,正如图例上展示的。

RxCpp库默认是单线程的。 您可以将其配置为使用某些运算符在多个线程中运行:

//----------ObserveOn.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <thread>
int main() {
    //---------------- Generate a range of values
    //---------------- Apply Square function
    auto values = rxcpp::observable<>::range(1, 4).
        map([](int v) {
        return v * v;
            });
    //------------- Emit the current thread details
    std::cout << "Main Thread id => "
        << std::this_thread::get_id()
        << std::endl;
    //---------- observe_on another thread....
    //---------- make it blocking to 
    values.observe_on(rxcpp::synchronize_new_thread()).
        as_blocking().
        subscribe(
            [](int v) {
                std::cout << "Observable Thread id => "
                    << std::this_thread::get_id()
                    << "  " << v << std::endl; },
            []() { std::cout << "OnCompleted" << std::endl; });
    //------------------ Print the main thread details
    std::cout << "Main Thread id => "
        << std::this_thread::get_id()
        << std::endl;
}

SubscribeOn

SubscribeOn操作符的作用类似,但它是用于指定Observable本身在特定的调度器上执行,它同样会在那个调度器上给观察者发通知。


image.png

ObserveOn操作符的作用类似,但是功能很有限,它指示Observable在一个指定的调度器上给观察者发通知,下面的程序将演示subscribe_on方法的用法:

//---------- SubscribeOn.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <thread>
#include <mutex>
//------ A global mutex for output synch.
std::mutex console_mutex;
//------ Print the Current Thread details
void CTDetails() {
    console_mutex.lock();
    std::cout << "Current Thread id => "
        << std::this_thread::get_id() << std::endl;
    console_mutex.unlock();
}
//---------- a function to Yield control to other threads
void Yield(bool y) {
    if (y) { std::this_thread::yield(); }
}
int main() {
    auto threads = rxcpp::observe_on_event_loop();
    auto values = rxcpp::observable<>::range(1);
    //------------- Schedule it in another thread
    auto s1 = values.
        subscribe_on(threads).
        map([](int prime) { CTDetails(); Yield(true); return std::make_tuple("1:", prime); });
    //-------- Schedule it in Yet another theread
    auto s2 = values.
        subscribe_on(threads).
        map([](int prime) { CTDetails(); Yield(true); return std::make_tuple("2:", prime); });
    s1.merge(s2).
        take(6).as_blocking().
        subscribe(rxcpp::util::apply_to(
            [](const char* s, int p) {
                CTDetails();
                console_mutex.lock();
                printf("%s %d\n", s, p);
                console_mutex.unlock();
            }));
}
//---------- SubscribeOn.cpp
#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"
#include <iostream>
#include <thread>
#include <mutex>
std::mutex console_mutex;
void CTDetails(int val = 0) {
    console_mutex.lock();
    std::cout << "Current Thread id => "
        << std::this_thread::get_id()
        << val
        << std::endl;
    console_mutex.unlock();
}
void Yield(bool y) {
    if (y) { std::this_thread::yield(); }
}
int main() {
    //----------- coordination object
    auto coordination = rxcpp::serialize_new_thread();
    //----------------- retrieve the worker
    auto worker = coordination.create_coordinator().get_worker();
    //-------------- Create an Obsrevable
    auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
        take(5).
        replay(coordination);
    // Subscribe from the beginning
    worker.schedule([&](const rxcpp::schedulers::schedulable&)
        {
            values.subscribe(
                [](long v) {CTDetails(v); },
                []() { CTDetails(); });
        });
    // Wait before subscribing
    worker.schedule(coordination.now() + std::chrono::milliseconds(125),
        [&](const rxcpp::schedulers::schedulable&) {
            values.subscribe(
                [](long v) {CTDetails(v*v); },
                []() { CTDetails(); });
        });
    // Start emitting
    worker.schedule([&](const rxcpp::schedulers::schedulable&) {
        values.connect();
        });
    // Add blocking subscription to see results
    values.as_blocking().subscribe();
}

flat/ concatmap的区别

开发人员之间的一个混淆之处通常集中在flatmap和concatmap操作符上。它们之间的差异非常细微。

flatmap

FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable


image.png

FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。
这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合。
注意:

  • FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。
  • 如果任何一个通过这个flatMap操作产生的单独的Observable调用onError异常终止了,这个Observable自身会立即调用onError并终止。
    如下,flatmap将lambda应用于可观察流并生成一个新的可观察流。生成的流合并在一起以提供输出:
#include "rxcpp/rx.hpp"
#include <iostream>
namespace rxu = rxcpp::util;
#include <array>
#include <string>
//#include <tuple>
int main() {
    std::array< std::string, 4 > a = { {"Praseed", "Peter", "Sanjay","Raju"} };
    auto values = rxcpp::observable<>::iterate(a).flat_map(
        [](std::string v) {
            std::array<std::string, 3> salutation = { { "Mr." ,  "Monsieur" , "Sri" } };
            return rxcpp::observable<>::iterate(salutation);
        },
        [](std::string f, std::string s) {
            return s + " " + f;
        });
    values.subscribe([](std::string f) {
        std::cout << f << std::endl; }, []() {std::cout << "Hello World.." << std::endl; });
}

concatmap

concatmap不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据。


image.png

contact和merge

为了让区别更清楚,让我们看一下两个操作符:concat和merge。让我们来看看流的串联是如何工作的。它基本上是一个接一个地添加流的内容,保持顺序:

#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
    auto o1 = rxcpp::observable<>::range(1, 3);
    auto o2 = rxcpp::observable<>::range(4, 6);
    auto values = o1.concat(o2);
    values.
        subscribe(
            [](int v) {printf("OnNext: %d\n", v); },
            []() {printf("OnCompleted\n"); });
}


image.png

下面的云石图清楚地显示了当我们合并两个可观察到的流时会发生什么。输出队列的内容将是两个流的交叉组合:

//---------------- Merge.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
    auto o1 = rxcpp::observable<>::range(1, 3);
    auto o2 = rxcpp::observable<>::from(4, 5, 6);
    auto values = o1.merge(o2);
    values.subscribe(
        [](int v) {printf("OnNext: %d\n", v); }, []() {printf("OnCompleted\n"); });
}


image.png

flatmap和concatmap或多或少都执行相同的操作。不同之处在于值的组合方式。flatmap使用merge操作符,而concatmap使用concact操作符。merge操作符顺序无关紧要。concat操作符将可观察值一个接一个地追加。这就是为什么按我们期望的顺序得到值。

更多重要的操作符

现在我们了解了反应性编程模型的关键,因为我们讨论了一些基本主题,比如可观察性、观察者、操作符和调度程序。为了更好地编写逻辑,我们应该了解更多的运算符。

tap

我们将探索tap操作符,它有助于查看流的内容:

//----------- TapExample.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    //---- Create a mapped Observable
    auto ints = rxcpp::observable<>::range(1, 3).
        map([](int n) {return n * n; });
    //---- Apply the tap operator...The Operator 
    //---- will act as a filter/debug operator
    auto values = ints.
        tap(
            [](int v)
            {printf("Tap -       OnNext: %d\n", v); },
            []() {printf("Tap -       OnCompleted\n");
            });
    //------- Do some action
    values.
        subscribe(
            [](int v) {printf("Subscribe - OnNext: %d\n", v); },
            []() {printf("Subscribe - OnCompleted\n"); });
}

defer

直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable
defer操作符接受一个你选择的Observable工厂函数作为单个参数。这个函数没有参数,返回一个Observable。
defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。
在某些情况下,等待直到最后一分钟(就是知道订阅发生时)才生成Observable可以确保Observable包含最新的数据。


image.png

当有人试图连接到指定的可观察对象时,我们调用observable_factory lambda:

//----------- DeferExample.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    auto observable_factory = []() {
        return rxcpp::observable<>::range(1, 3).
            map([](int n) {return n * n; });
    };
    auto ints = rxcpp::observable<>::defer(observable_factory);
    ints.
        subscribe(
            [](int v) {printf("OnNext: %d\n", v); },
            []() {printf("OnCompleted\n"); });
    ints.
        subscribe(
            [](int v) {printf("2nd OnNext: %d\n", v); },
            []() {printf("2nd OnCompleted\n"); });
}

buffer

buffer操作符定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。


image.png

buffer操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。Buffer操作符在很多语言特定的实现中有很多种变体,它们在如何缓存这个问题上存在区别。
注意:如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。
Window操作符与Buffer类似,但是它在发射之前把收集到的数据放进单独的Observable,而不是放进一个数据结构。详见:https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Buffer.html
buffer操作符发出一个包含一个可观察对象的非重叠内容的可观察对象,每个可观察对象最多包含count参数指定的项数。这将帮助我们以适合内容的方式处理项目:

//----------- BufferExample.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    auto values = rxcpp::observable<>::range(1, 10).buffer(3);
    values.
        subscribe(
            [](std::vector<int> v) {
                printf("OnNext:{");
                std::for_each(v.begin(), v.end(), [](int a) {
                    printf(" %d", a);
                    });
                printf("}\n");
            },
            []() {printf("OnCompleted\n"); });
}

timer

创建一个Observable,它在一个给定的延迟后发射一个特殊的值。


image.png

这个函数在库中有不同的版本,timer操作符默认在computation调度器上执行。有一个变体可以通过可选参数指定Scheduler:

//----------- TimerExample.cpp
#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"
#include <iostream>
#include <thread>
int main() {
    auto scheduler = rxcpp::observe_on_new_thread();
    auto period = std::chrono::seconds(3);
    auto values = rxcpp::observable<>::timer(period, scheduler).
        finally([]() {
        std::cout << "The final action, thread id: " << std::this_thread::get_id() << std::endl;
            });
    values.
        as_blocking().
        subscribe(
            [](int v) { std::cout << "OnNext: " << v << "thread id: " << std::this_thread::get_id() << std::endl; },
            []() {std::cout << "OnCompleted, thread id: " << std::this_thread::get_id() << std::endl; });
    std::cout << "main thread id: " << std::this_thread::get_id() << std::endl;
}

https://www.jianshu.com/p/3eef06c7d2da

Python量化投资网携手4326手游为资深游戏玩家推荐:《托拉姆物语下载

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
NumPy
0 条回复 A 作者 M 管理员
    所有的伟大,都源于一个勇敢的开始!
欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论