Influxed: Influx query language and ORM


Keywords
Influx, InfluxDb, Query, Language, QL, ORM
Licenses
GPL-3.0/GPL-3.0+
Install
pip install influxed==0.1.5.2

Documentation

Source code and development can be followed in this repository

Feel free to open issues or share your experience :)

Installation

Navigate to the project folder in a terminal

Run

python setup.py install

Or

pip install influxed

and you're good to go.

Connect to server:

from influxed.orm import engine
engine.add_server('http://influxserverurlorip:port', 'username', 'password', reflect=True)

Or secure:

from influxed.orm import engine
engine.add_server('https://influxserverurlorip:port', 'username', 'password', reflect=True)

Or with asyncio:

from influxed.orm import engine
await engine.add_server('https://influxserverurlorip:port', 'username', 'password', reflect=True, isAsync=True)
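
Note that `await` is only valid inside a coroutine, so the asyncio variant has to run inside an `async def` that is driven by an event loop. A minimal sketch of that wiring follows. It uses a hypothetical stand-in coroutine, `fake_add_server`, which is not part of influxed, so that the snippet runs without a server; in real code the awaited call would be `engine.add_server(..., isAsync=True)` as shown above.

```python
import asyncio


async def fake_add_server(url, username, password, reflect=True):
    """Hypothetical stand-in for an awaitable add_server call."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return {"url": url, "user": username, "reflect": reflect}


async def main():
    # `await` must live inside a coroutine such as this one
    return await fake_add_server(
        "https://influxserverurlorip:port", "username", "password"
    )


result = asyncio.run(main())
print(result["url"])
```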

Example explorer

Show databases

engine.server.ls()

Show measurements

engine.server.[database_name].ls()

Show fields

engine.server.[database_name].[measurement_name].ls()

If the name of a measurement or a database is not a valid Python identifier,

it can be accessed by indexing instead

Example

engine.server['1Db']['measurement one'].ls()
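
Whether a name needs indexing can be checked up front. Here is a small sketch using only the standard library (the helper name `needs_indexing` is made up for illustration): a name works with attribute access only if it is a valid Python identifier and not a reserved keyword.

```python
import keyword


def needs_indexing(name):
    """Return True when `name` cannot be used as a Python attribute name."""
    return not name.isidentifier() or keyword.iskeyword(name)


for name in ["meter_usage", "1Db", "measurement one", "class"]:
    access = "indexing" if needs_indexing(name) else "attribute access"
    print(f"{name!r}: {access}")
```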

Select data from a measurement:

engine.server.database.meter_usage.query.select('field1', 'field2').all()

Another way is to select it from the field itself

engine.server.database.meter_usage.field1.query.all()

Limit selection:

engine.server.database.meter_usage.field1.query.first(5)  # or
engine.server.database.meter_usage.field1.query.last(7)  # or
engine.server.database.meter_usage.field1.query.limit(2).all()

Filtering:

col = engine.server.database.meter_usage.field1
col.query.filter( # Everything between 5 and 6
    col > 5,
    col <= 6
).all()
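
Expressions such as `col > 5` can be passed to `filter` because Python lets a class overload its comparison operators. The following is a simplified illustration of that general technique, not influxed's actual implementation; the `Column` class and the `where` helper are hypothetical.

```python
class Column:
    """Hypothetical column that turns comparisons into query fragments."""

    def __init__(self, name):
        self.name = name

    def _compare(self, op, value):
        return f'"{self.name}" {op} {value!r}'

    def __gt__(self, value):
        return self._compare(">", value)

    def __ge__(self, value):
        return self._compare(">=", value)

    def __lt__(self, value):
        return self._compare("<", value)

    def __le__(self, value):
        return self._compare("<=", value)


def where(*conditions):
    """Join all conditions with AND into a single WHERE clause."""
    return "WHERE " + " AND ".join(conditions)


col = Column("field1")
print(where(col > 5, col <= 6))
```

The comparison never evaluates to a boolean here; it returns a description of the comparison that can be rendered into a query later.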

Filtering by time

import datetime as dt
from influxed.ifql import time

engine.server.database.meter_usage.field1.query.filter(
    time > dt.datetime.now()
).all()

Filtering using OR statement

import datetime as dt
from influxed.ifql import OPERATORS, time

col = engine.server.database.meter_usage.field1
col.query.filter(
    OPERATORS.or_(
        time > dt.datetime.now(), # or
        col > 2,
    )
).all()

Aggregators:

col = engine.server.database.meter_usage.field1
col.query.min.all() # Minimum value
col.query.max.all() # Maximum value
col.query.mean.all() # Mean value
col.query.sum.all() # Summed Value
col.query.std.all() # Standard deviation

Group by function

from influxed.ifql import time

col = engine.server.database.meter_usage.field1

col.query.min.group_by(
    time('2d') # Group into buckets of 2 days and take the minimum value
).all()

# Available interval selectors:
# week = 'w'
# day = 'd'
# hour = 'h'
# minute = 'm'
# sec = 's'
# millisec = 'ms'
# microsec ='u'
# nanosec = 'ns'
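
To make the bucket semantics concrete, here is a plain-Python sketch of what grouping by a time interval and taking the minimum amounts to. It assumes buckets are aligned to the Unix epoch and covers only the `w`, `d`, `h`, `m` and `s` units; `parse_interval` and `min_per_bucket` are illustrative names, not influxed functions.

```python
from datetime import datetime, timedelta, timezone

UNITS = {"w": "weeks", "d": "days", "h": "hours", "m": "minutes", "s": "seconds"}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_interval(text):
    """Turn an interval such as '2d' into a timedelta."""
    return timedelta(**{UNITS[text[-1]]: int(text[:-1])})


def min_per_bucket(points, interval):
    """Group (timestamp, value) pairs into epoch-aligned buckets; keep the minimum."""
    size = parse_interval(interval)
    buckets = {}
    for ts, value in points:
        # Floor the timestamp to the start of its bucket
        start = EPOCH + ((ts - EPOCH) // size) * size
        buckets[start] = min(value, buckets.get(start, value))
    return buckets


points = [
    (datetime(2016, 1, 1, 6, tzinfo=timezone.utc), 4.0),
    (datetime(2016, 1, 2, 6, tzinfo=timezone.utc), 2.5),
    (datetime(2016, 1, 3, 6, tzinfo=timezone.utc), 9.0),
]
for start, value in sorted(min_per_bucket(points, "2d").items()):
    print(start.date(), value)
```

Because the buckets are aligned to the epoch rather than to the first data point, a bucket can start before the earliest timestamp in the data.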

Let's spice things up:


engine.server.dap.meterusage.query.select(
    '871694831000088656',
    '871690910000005079'
).filter(
    time > dt.datetime(2016,1,1),
    time > dt.datetime(2016,2,1),
).group_by(
    time('1d')
).sum().fill(0).all()

# Will give you:
# 'SELECT SUM("871694831000088656"), SUM("871690910000005079") FROM meterusage WHERE time > \'2016-01-01 00:00:00.000\' AND time > \'2016-02-01 00:00:00.000\'  GROUP BY time(1d) FILL(0)'

With algebraic arithmetic:

a = engine.server.database.meter_usage.field1
b = engine.server.database.meter_usage.field2
engine.query.select(((a.sum()+2)/b)*42).filter( # field1 above 5 and field2 at most 6
    a > 5,
    b <= 6
).all()
# Will give you:
# SELECT ((SUM(field1) + 2) / field2) * 42 WHERE field1 > 5 AND field2 <= 6

Test

Run

    python -m unittest discover -p '*_test.py' -s src -t .

Debugging

One can always call .format() on any statement after .query to see the raw query it produces. Additional debug information can be obtained by attaching a logger:

import logging
logger = logging.getLogger()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.setLevel('DEBUG')

Available logging channels:
- Transport layer = InfluxedClient
- Transport layer retry logic = InfluxedRetryStrat
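
If DEBUG on the root logger is too noisy, the standard `logging` module allows a level to be set per channel. A sketch, assuming the channel names listed above are used verbatim as logger names (this has not been checked against the influxed source):

```python
import logging

logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Assumption: the channels are ordinary loggers with these exact names
transport = logging.getLogger("InfluxedClient")
retry = logging.getLogger("InfluxedRetryStrat")

transport.setLevel(logging.DEBUG)  # verbose output for the transport layer
retry.setLevel(logging.ERROR)      # only errors from the retry logic
```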

Code coverage:

 coverage run --source=src/influxed -m xmlrunner -o test-reports discover -s ./src -p '*_test.py'
 coverage xml
 sonar-scanner

Lastly, if you find bugs or have feature requests, feel free to open an issue.