EventDriven

Event Driven Controller.


License
MIT
Install
pip install EventDriven==0.1.3

Documentation

EventDriven

事件驱动实现库

Build Status Build Status

EventDriven 是一个事件驱动实现库。主要是为了解决复杂的程序架构之间的线程数据安全和线程的复用问题。

实现方式是控制器通过维护一条事件循环线程,通过事件或函数派遣的方式调用,以一种结构化的方式实现多线程之间的控制通信和处理。

典型的使用情况:

  1. 控制台控制器(Controller) + 工作控制器池(ControllerPool)

  2. 控制台控制器(Controller) + 独立工作控制器(Controller) * n + 工作控制器池(ControllerPool)

安装

$ pip install EventDriven

项目说明

Controller

工作原理

+------------------------------------+   instance
|          AdapterManager            |<------------ Adapter
+------------------------------------+
|   |                     |          |
|   |                     v          |
|   |       func   +-----------------+     EVT
|   |    +---------|  MappingManager |<------------ dispatch
|   |    |         +-----------------+
|   |    |                           |
|   v    v                           |
+------------------------------------+
|                                    |     func
|           EventLoop                |<------------ submit
|                                    |
+------------------------------------+
    ^                    |    process     +-----------------------------+
    |                    +--------------> | Session                     |
    |                                     +-----------------------------+
    |                                     | def func(*args, **kwargs):  |
    |                                     |    ...                      |
    |                                     +-----------------------------+
    |                 return                           v
    ^--------------------------------------------------<

说明

  • 调用控制器方法 dispatch 派遣事件EVT(调用控制器方法submit 提交处理函数)。

  • 事件映射管理器(MappingManager)收到事件EVT后搜索其对应的处理函数(func),并将其交由事件处理循环线程(EventLoop)处理(提交的处理函数直接交由处理)。

  • 事件处理循环线程收到处理函数(func)后,生成执行会话(Session),并调用函数func。

通过方法Adapter可以为控制器添加适配器

ControllerPool

工作原理

+------------------------------------+
|                                    |     EVT/func
|           event_queue              |<--------------- dispatch/submit
|                                    |---------------> Pending
+------------------------------------+     return
|   |                     |          |     
|   |                     v          |
|   |       func   +-----------------+
|   |    +---------|  MappingManager |
|   |    |         +-----------------+
|   |    |                           |
|   v    v                           |
+------------------------------------+
|                                    |     process1                       process2
|           ClientPool               |------------------>----------------------------------->----------- ...
|                                    |                  |                                   |
+------------------------------------+                  v                                   v
    ^                                     +-----------------------------+    +-----------------------------+
    |                                     | Session1                    |    | Session2                    |
    |                                     +-----------------------------+    +-----------------------------+
    |                                     | def func1(*args, **kwargs): |    | def func2(*args, **kwargs): |
    |                                     |    ...                      |    |    ...                      |
    |                                     +-----------------------------+    +-----------------------------+
    |                 return                            v                                  v
    ^---------------------------------------------------<----------------------------------<------------ ...

说明

  • 调用方法dispatch/submit,将事件(EVT)/处理函数(func)交由控制器池(ControllerPool)

  • 控制器池收到处理事件,将其推入待处理事件队列(event_queue)

  • 控制器池客户端线程待处理事件队列中取待处理事件,并交给空闲的客户端控制器处理,同时返回待决事件对象(Pending)

内置适配器

Subprocess(实现控制器支持在子进程操作)

工作原理

            +--------------------------------+                    +--------------------------------+
            |  Parent Process                |                    |  Child Process                 |
            +--------------------------------+                    +--------------------------------+
  dispatch  |                                |                    |   <bri_worker>                 |
--------------------------->v----------------+                    +----------------+               |
   submit   |               |   Controller   |    child_channel   |   Controller   |  dispatch     |
            |               >---------------->>>>>>>>>>>>>>>>>>>>>>---------------->---------v     |
            |               |                                                      |         |     |
   Pending  |               |                                                      |         |     |
<---------------------------<----------------<<<<<<<<<<<<<<<<<<<<<<---------<------+         |     |
            |               |   Subprocess   |   parent_channel   |         ^                |     |
            |               |  AdapterManager|                    |         |                v     |
            |               +----------------+                    +--------------------------------+
            |                                |                    |         ControllerPool         |
            |                                |                    |           <workers>            |
            +--------------------------------+                    +--------------------------------+
                                                                            ^               |
                                                                  >---------|               v
                                                                  |   +----------------------------+
                                                                  |   | Session                    |
                                                                  |   +----------------------------+
                                                                  |   | def func(*args, **kwargs): |
                                                                  |   |    ...                     |
                                                                  ^---+----------------------------+
                                                                   return

说明

  • 通过调用父进程的控制器方法dispatch/submit,将事件(EVT)/处理函数(func)发送给父进程控制器,同时返回待决事件对象Pending

  • 父进程控制器将收到的事件/处理函数通过子通信通道(child_channel)发送到子进程通信控制器(bri_worker)

  • 子进程的通信控制器再次将事件/处理函数转发到子进程的工作线程池(workers)

  • 子进程的工作线程池处理收到的事件/处理函数,事件的处理会话结束后将返回的执行结果转发到子进程的通信控制器

  • 子进程的通信控制器执行返回结果通过父通信通道(parent_channel)发送到父进程的控制器,并通过待决事件对象(Pending)返回运行结果。

Timer

添加定时信号生成器

+------------------------------------+
|          AdapterManager            |
|                      +---------+   |
|                      |  Timer  |   |     x  
|                      |  x sec  |<--------------- set_timing
|                      +---------+   |
+--------------------------|---------+
|                          |         |
|           EVT_DRI_TIMING |         |
|                          v         |
|            func  +-----------------+
|          +-------|  MappingManager |
|          |       +-----------------+
|          v                         |
+------------------------------------+
|                                    |
|            EventLoop               |
|                                    |
+------------------------------------+

说明

  • 定时器(Timer)通过初始化时设定或者方法set_timing设定每隔x秒发送时钟事件信号(EVT_DRI_TIMING)给控制器。

  • 通过可以通过方法set_timing方法使用None作为事件间隔的参数设定时,将停止定时器。

Pending

事件派遣返回待决事件对象Pending。

+------------------------------------+
|          AdapterManager            |
|                     +---------+    |
|                     | Pending |    |
|                     +---------+    |
+------------------------------------+
|                                    |
|           func   +-----------------+     EVT
|        +---------|  MappingManager |<------------ dispatch
|        |         +-----------------+——————————-—> Pending
|        |                           |   return
|        v                           |
+------------------------------------+
|                                    |     func
|           EventLoop                |<------------ submit
|                                    |------------> Pending
+------------------------------------+   return

说明

  • 为控制器打补丁,方法dispatchsubmit的调用后将返回待决事件(Pending)对象。

  • Pending对象通过调用方法pending来等待事件的处理,并且以列表的形式返回事件的执行结果。

使用例子

事件控制器

from eventdriven import Controller, session
from time import time

def event_func():
    runtime = time() - session.start_time
    evt_time = time() - session.evt_time
    print(runtime)
    print(evt_time)

def submit_func(a, b):
    runtime = time() - session.start_time
    print(runtime)
    print(a, b)
    # session['self'] 是当前控制器对象。
    session['self'].shutdown()

# 定义事件1的处理函数是event_func(), 同时传递全局上下文。
con = Controller(mapping={
    1: event_func
}, context={'start_time': time()})
# 启动控制器
con.run()
# 事件驱动,事件1
con.dispatch(1, context={'evt_time': time()})
# 处理函数驱动
con.submit(submit_func, args=('a', 'b'))
# 关闭控制器
con.shutdown()
# 控制器阻塞等待
con.join()

使用事件映射蓝图定义事件处理映射

from eventdriven import Controller, MappingBlueprint
test_model = MappingBlueprint()

EVT_ONE = 'one'
EVT_TWO = 'two'
EVT_THREE = 'three'

@test_model.register(EVT_ONE)
def event_1():
    print('1')

@test_model.register(EVT_TWO)
def event_2():
    print('2')

@test_model.register(EVT_THREE)
def event_3():
    print('3')

con = Controller(mapping=test_model)

con.run()

con.dispatch(EVT_THREE)
con.dispatch(EVT_ONE)
con.dispatch(EVT_TWO)
con.shutdown()
con.join()

使用适配器(Adapter)

from eventdriven import Controller, session
from eventdriven.adapter.subprocess import Subprocess

from multiprocessing import current_process

from eventdriven import Controller, session
from eventdriven.adapter.subprocess import Subprocess
from multiprocessing import current_process


def subprocess_print_dispatch(*args):
    print('pid: %s, dishaptch args=%s' % (current_process().pid, args))


def subprocess_print_submit(*args):
    print('pid: %s, submit() args=%s' % (current_process().pid, args))


if __name__ == '__main__':
    print('main pid: %s' % current_process().pid)
    con = Controller()
    
    # 控制台安装子进程适配器,这回使得事件的处理将在控制器新创建的子进程下处理。
    # 子进程的事件处理映射通过Subprocess实例传递。
    con.Adapter(Subprocess(mapping={'1': subprocess_print_dispatch}, maxsize=10))
    
    # 开启控制器,同时会有适配器启动子进程的初始化工作。
    con.run()
    
    con.dispatch('1', args=(1, 2, 3, 4, 5))
    con.submit(subprocess_print_submit, args=(1, 2, 3, 4, 5))
    for i in range(100):
        con.submit(subprocess_print_submit, args=(i,))
    
    # 等待所有任务完全送到子进程,并且子进程下的工作线程取完所有的待处理任务。
    con.pending()
    # 等待子进程工作线程任务处理完后处于空闲状态。
    con.wait_for_idle()
    # 关闭子进程,同时也会关闭当前线程的控制器con。
    con.shutdown()
    # 等待控制器关闭结束。
    con.wait()

控制器的事件监听

from eventdriven import Controller, session

def event_track():
    print('event_track: %s=> val=%s' % (session['self'].name, session['val']))


def cpy_event():
    print('event_track(copy): %s=> val=%s' % (session['self'].name, session['val']))


con1 = Controller(mapping={'cpy': event_track})
con2 = Controller(mapping={'cpy': cpy_event})

con1.run()
con2.run()
# con2监听con1的事件'cpy'
con2.listen(con1, allow=('cpy',))

con1.dispatch('cpy', value='hhhhhhhh', context={'a_ctx': 'ok'})


# 结果:
# event_track: 2771409784584=> val=hhhhhhhh
# event_track(copy): 2771412370056=> val=ForwardingPacket(value='hhhhhhhh', context={'a_ctx': 'ok'})