反应式编程是一种编程范例,用于处理数据流和变化的传播.这意味着当一个组件发出数据流时,更改将通过响应式编程库传播到其他组件.变化的传播将持续到达最终接收器.事件驱动和反应式编程之间的区别在于事件驱动编程围绕事件而反应式编程围绕数据.
ReactiveX或RX用于反应式编程
ReactiveX或Raective Extension是最着名的反应式编程实现. ReactiveX的工作取决于以下两个类 :
Observable类
此类是数据流或事件的来源,它包装传入数据,以便数据可以从一个线程传递到另一个线程.在某些观察者订阅数据之前,它不会提供数据.
Observer类
此类使用 observable发出的数据流.可以有多个具有可观察性的观察者,每个观察者将接收发射的每个数据项.观察者可以通过订阅observable : 来接收三种类型的事件;
on_next()事件 : 这意味着数据流中有一个元素.
on_completed()事件 : 它意味着排放结束,不再有物品到来.
on_error()事件 : 它还意味着发射结束,但是如果 observable 引发错误.
RxPY - Python用于反应式编程的模块
RxPY是一个可用于反应式编程的Python模块.我们需要确保安装该模块.以下命令可用于安装RxPY模块 :
pip install RxPY
示例
以下是一个Python脚本,它使用 RxPY 模块及其类 Observable 和观察用于反应式编程.基本上有两个类和减号;
get_strings() : 从观察者那里获取字符串.
PrintObserver() : 用于从观察者打印字符串.它使用观察者类的所有三个事件.它还使用subscribe()类.
from rx import Observable, Observerdef get_strings(observer): observer.on_next("Ram") observer.on_next("Mohan") observer.on_next("Shyam") observer.on_completed()class PrintObserver(Observer): def on_next(self, value): print("Received {0}".format(value)) def on_completed(self): print("Finished") def on_error(self, error): print("Error: {0}".format(error))source = Observable.create(get_strings)source.subscribe(PrintObserver())
输出
Received RamReceived MohanReceived ShyamFinished
用于反应式编程的PyFunctional库
PyFunctional 是另一个可用于反应式编程的Python库.它使我们能够使用Python编程语言创建功能程序.它很有用,因为它允许我们使用链式函数运算符创建数据管道.
RxPY和PyFunctional之间的区别
这两个库都用于反应式编程并以类似的方式处理流,但两者之间的主要区别取决于数据的处理. RxPY 处理系统中的数据和事件,而 PyFunctional 专注于使用函数式编程范例转换数据.
安装PyFunctional Module
我们需要在使用之前安装此模块.它可以在pip命令的帮助下安装如下 :
pip install pyfunctional
示例
以下示例使用 PyFunctional 模块及其 seq 类作为流对象我们可以迭代和操纵.在这个程序中,它使用lamda函数映射序列,该函数将每个值加倍,然后过滤x大于4的值,最后将序列缩减为所有剩余值的总和.
from functional import seqresult = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)print ("Result: {}".format(result))
输出
Result: 6