EventDriven
事件驱动实现库
EventDriven 是一个事件驱动实现库。主要是为了解决复杂的程序架构之间的线程数据安全和线程的复用问题。
实现方式是控制器通过维护一条事件循环线程,通过事件或函数派遣的方式调用,以一种结构化的方式实现多线程之间的控制通信和处理。
典型的使用情况:
-
控制台控制器(Controller) + 工作控制器池(ControllerPool)
-
控制台控制器(Controller) + 独立工作控制器(Controller) * n + 工作控制器池(ControllerPool)
安装
$ pip install EventDriven
项目说明
Controller
工作原理
+------------------------------------+ instance
| AdapterManager |<------------ Adapter
+------------------------------------+
| | | |
| | v |
| | func +-----------------+ EVT
| | +---------| MappingManager |<------------ dispatch
| | | +-----------------+
| | | |
| v v |
+------------------------------------+
| | func
| EventLoop |<------------ submit
| |
+------------------------------------+
^ | process +-----------------------------+
| +--------------> | Session |
| +-----------------------------+
| | def func(*args, **kwargs): |
| | ... |
| +-----------------------------+
| return v
^--------------------------------------------------<
说明
-
调用控制器方法
dispatch
派遣事件EVT(调用控制器方法submit
提交处理函数)。 -
事件映射管理器(MappingManager)
收到事件EVT
后搜索其对应的处理函数(func)
,并将其交由事件处理循环线程(EventLoop)
处理(提交的处理函数直接交由处理)。 -
事件处理循环线程
收到处理函数(func)
后,生成执行会话(Session)
,并调用函数func。
通过方法Adapter
可以为控制器添加适配器
。
ControllerPool
工作原理
+------------------------------------+
| | EVT/func
| event_queue |<--------------- dispatch/submit
| |---------------> Pending
+------------------------------------+ return
| | | |
| | v |
| | func +-----------------+
| | +---------| MappingManager |
| | | +-----------------+
| | | |
| v v |
+------------------------------------+
| | process1 process2
| ClientPool |------------------>----------------------------------->----------- ...
| | | |
+------------------------------------+ v v
^ +-----------------------------+ +-----------------------------+
| | Session1 | | Session2 |
| +-----------------------------+ +-----------------------------+
| | def func1(*args, **kwargs): | | def func2(*args, **kwargs): |
| | ... | | ... |
| +-----------------------------+ +-----------------------------+
| return v v
^---------------------------------------------------<----------------------------------<------------ ...
说明
-
调用方法
dispatch/submit
,将事件(EVT)/处理函数(func)
交由控制器池(ControllerPool)
。 -
控制器池
收到处理事件,将其推入待处理事件队列(event_queue)
。 -
控制器池客户端线程
从待处理事件队列
中取待处理事件
,并交给空闲的客户端控制器
处理,同时返回待决事件对象(Pending)
。
内置适配器
Subprocess(实现控制器支持在子进程操作)
工作原理
+--------------------------------+ +--------------------------------+
| Parent Process | | Child Process |
+--------------------------------+ +--------------------------------+
dispatch | | | <bri_worker> |
--------------------------->v----------------+ +----------------+ |
submit | | Controller | child_channel | Controller | dispatch |
| >---------------->>>>>>>>>>>>>>>>>>>>>>---------------->---------v |
| | | | |
Pending | | | | |
<---------------------------<----------------<<<<<<<<<<<<<<<<<<<<<<---------<------+ | |
| | Subprocess | parent_channel | ^ | |
| | AdapterManager| | | v |
| +----------------+ +--------------------------------+
| | | ControllerPool |
| | | <workers> |
+--------------------------------+ +--------------------------------+
^ |
>---------| v
| +----------------------------+
| | Session |
| +----------------------------+
| | def func(*args, **kwargs): |
| | ... |
^---+----------------------------+
return
说明
-
通过调用
父进程的控制器
方法dispatch/submit
,将事件(EVT)/处理函数(func)
发送给父进程控制器
,同时返回待决事件对象Pending
。 -
父进程控制器
将收到的事件/处理函数
通过子通信通道(child_channel)
发送到子进程
的通信控制器(bri_worker)
。 -
子进程的通信控制器
再次将事件/处理函数
转发到子进程的工作线程池(workers)
。 -
子进程的工作线程池
处理收到的事件/处理函数
,事件的处理会话
结束后将返回的执行结果
转发到子进程的通信控制器
。 -
子进程的通信控制器
将执行返回结果
通过父通信通道(parent_channel)
发送到父进程的控制器
,并通过待决事件对象(Pending)
返回运行结果。
Timer
添加定时信号生成器
+------------------------------------+
| AdapterManager |
| +---------+ |
| | Timer | | x
| | x sec |<--------------- set_timing
| +---------+ |
+--------------------------|---------+
| | |
| EVT_DRI_TIMING | |
| v |
| func +-----------------+
| +-------| MappingManager |
| | +-----------------+
| v |
+------------------------------------+
| |
| EventLoop |
| |
+------------------------------------+
说明
-
定时器(Timer)
通过初始化
时设定或者方法set_timing
设定每隔x
秒发送时钟事件信号(EVT_DRI_TIMING)
给控制器。 -
通过可以通过方法
set_timing
方法使用None
作为事件间隔的参数设定时,将停止定时器。
Pending
事件派遣返回待决事件对象Pending。
+------------------------------------+
| AdapterManager |
| +---------+ |
| | Pending | |
| +---------+ |
+------------------------------------+
| |
| func +-----------------+ EVT
| +---------| MappingManager |<------------ dispatch
| | +-----------------+——————————-—> Pending
| | | return
| v |
+------------------------------------+
| | func
| EventLoop |<------------ submit
| |------------> Pending
+------------------------------------+ return
说明
-
为控制器打补丁,方法
dispatch
和submit
的调用后将返回待决事件(Pending)
对象。 -
Pending对象
通过调用方法pending
来等待事件的处理,并且以列表的形式返回事件的执行结果。
使用例子
事件控制器
from eventdriven import Controller, session
from time import time
def event_func():
runtime = time() - session.start_time
evt_time = time() - session.evt_time
print(runtime)
print(evt_time)
def submit_func(a, b):
runtime = time() - session.start_time
print(runtime)
print(a, b)
# session['self'] 是当前控制器对象。
session['self'].shutdown()
# 定义事件1的处理函数是event_func(), 同时传递全局上下文。
con = Controller(mapping={
1: event_func
}, context={'start_time': time()})
# 启动控制器
con.run()
# 事件驱动,事件1
con.dispatch(1, context={'evt_time': time()})
# 处理函数驱动
con.submit(submit_func, args=('a', 'b'))
# 关闭控制器
con.shutdown()
# 控制器阻塞等待
con.join()
使用事件映射蓝图定义事件处理映射
from eventdriven import Controller, MappingBlueprint
test_model = MappingBlueprint()
EVT_ONE = 'one'
EVT_TWO = 'two'
EVT_THREE = 'three'
@test_model.register(EVT_ONE)
def event_1():
print('1')
@test_model.register(EVT_TWO)
def event_2():
print('2')
@test_model.register(EVT_THREE)
def event_3():
print('3')
con = Controller(mapping=test_model)
con.run()
con.dispatch(EVT_THREE)
con.dispatch(EVT_ONE)
con.dispatch(EVT_TWO)
con.shutdown()
con.join()
使用适配器(Adapter)
from eventdriven import Controller, session
from eventdriven.adapter.subprocess import Subprocess
from multiprocessing import current_process
from eventdriven import Controller, session
from eventdriven.adapter.subprocess import Subprocess
from multiprocessing import current_process
def subprocess_print_dispatch(*args):
print('pid: %s, dishaptch args=%s' % (current_process().pid, args))
def subprocess_print_submit(*args):
print('pid: %s, submit() args=%s' % (current_process().pid, args))
if __name__ == '__main__':
print('main pid: %s' % current_process().pid)
con = Controller()
# 控制台安装子进程适配器,这回使得事件的处理将在控制器新创建的子进程下处理。
# 子进程的事件处理映射通过Subprocess实例传递。
con.Adapter(Subprocess(mapping={'1': subprocess_print_dispatch}, maxsize=10))
# 开启控制器,同时会有适配器启动子进程的初始化工作。
con.run()
con.dispatch('1', args=(1, 2, 3, 4, 5))
con.submit(subprocess_print_submit, args=(1, 2, 3, 4, 5))
for i in range(100):
con.submit(subprocess_print_submit, args=(i,))
# 等待所有任务完全送到子进程,并且子进程下的工作线程取完所有的待处理任务。
con.pending()
# 等待子进程工作线程任务处理完后处于空闲状态。
con.wait_for_idle()
# 关闭子进程,同时也会关闭当前线程的控制器con。
con.shutdown()
# 等待控制器关闭结束。
con.wait()
控制器的事件监听
from eventdriven import Controller, session
def event_track():
print('event_track: %s=> val=%s' % (session['self'].name, session['val']))
def cpy_event():
print('event_track(copy): %s=> val=%s' % (session['self'].name, session['val']))
con1 = Controller(mapping={'cpy': event_track})
con2 = Controller(mapping={'cpy': cpy_event})
con1.run()
con2.run()
# con2监听con1的事件'cpy'
con2.listen(con1, allow=('cpy',))
con1.dispatch('cpy', value='hhhhhhhh', context={'a_ctx': 'ok'})
# 结果:
# event_track: 2771409784584=> val=hhhhhhhh
# event_track(copy): 2771412370056=> val=ForwardingPacket(value='hhhhhhhh', context={'a_ctx': 'ok'})