megad2

Simple API for ab-log.ru mega-d


License
BSD-3-Clause
Install
pip install megad2==0.3.4

Documentation

Simple integration for mega-d

Интеграция с устрйоством MegaD-2561 от ab-log.ru Полностью асинхронная (aiohttp) Может работать как микро-сервер (метод start_listen)

Реакция на команды от меги реализована через словарь, в котором ключ - это значение передаваемой команды от меги (InputQuery), а значения - любая функция или асинхронная функция, например:

from logging import basicConfig, DEBUG
import aiohttp

basicConfig(level=DEBUG)

from megad import Mega, InputQuery
import asyncio

async def test_click(t):
   print('test temp', t)

mega = Mega(
   '192.168.0.14/sec/'
   , {
       InputQuery(pt=1, m=1): test_click
   }
)


async def main():
   await mega.start_listen(9191)
   async with aiohttp.request('get', 'http://localhost:9191/?pt=1&m=1') as req:
       await req.text()
   while True:
       await asyncio.sleep(10000)

asyncio.get_event_loop().run_until_complete(main())

Управление реле осуществляется с помощью класса Relay:

from logging import basicConfig, DEBUG

basicConfig(level=DEBUG)

from megad import Mega, Relay
import asyncio


mega = Mega('192.168.0.14/sec/')
relay = Relay(mega=mega, port=11)

async def main():
    await relay.turn_off()
    await relay.turn_on()
    await relay.turn_on_and_off(10)

asyncio.get_event_loop().run_until_complete(main())

В модуле так же реализован класс Servo для управления серво-приводами на двух реле.

from logging import basicConfig, DEBUG

basicConfig(level=DEBUG)

from megad import Mega, Servo, Relay
import asyncio

def print_calibrated():
    print('calibrated')

def print_new_value(v):
    print(f'new value {v}')

mega = Mega('192.168.0.14/sec/')
servo = Servo(dir_rel=Relay(mega, 15)
              , move_rel=Relay(mega, 15)
              , close_time=12
              , calibrated_cb=print_calibrated
              , value_set_cb=print_new_value)

async def main():
    await servo.calibrate()
    await servo.set_value(.5)

asyncio.get_event_loop().run_until_complete(main())

Так же реализован класс OneWire для обновления температуры из шины OneWire При обновлении температуры вызывается колбэк из словаря колбэков, в котором ключ - адрес onewire, значение - колбэк

from logging import basicConfig, DEBUG

basicConfig(level=DEBUG)

from megad import Mega, OneWireBus
import asyncio

async def print_temp(t):
    print('test click', t)

mega = Mega('192.168.0.14/sec/')
ow = OneWireBus(mega, port=14, cb_map={
    'ffd9da801704': print_temp
})

async def main():
    await ow.update()

asyncio.get_event_loop().run_until_complete(main())

Чтобы обновлять температуру периодически можно воспользоваться стандартными методами, например так:

# регулярное обновление

async def reg_update(interval):
    await asyncio.sleep(interval)
    await ow.update()
    asyncio.ensure_future(reg_update(interval))