# sdataflow

A small and simple language within the project sblog.
## Install

```
pip install sdataflow
```
## Concepts

`sdataflow` provides:
- A small and simple language to define the relations of entities. An *entity* is a logic unit defined by the user (i.e. a data processing function); it generates some kind of *outcome* as a response to some kind of input outcome (which might be generated by another entity). The relations of entities form a *dataflow*.
- A command-line program `sdataflow` that generates an HTML file for debugging.
- A scheduler that automatically runs entities and ships outcomes to their destinations.
## Language

### Tutorial
Let's start with the simplest case (a one-to-one relation):

```
A --> B
```

where entity `B` accepts the outcome of `A` as its input.
To define a one-to-more or more-to-one relation:

```
# one-to-more
A --> B
A --> C
A --> D

# more-to-one
B --> A
C --> A
D --> A
```
where in the one-to-more case, copies of the outcome of `A` would be passed to `B`, `C` and `D`. In the more-to-one case, the outcomes of `B`, `C` and `D` would be passed to `A`.
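The delivery semantics of both cases can be sketched in plain Python (an illustration only, with hypothetical entities and data; the scheduler performs this delivery for you):

```python
import copy

# Hypothetical outcome of entity A: a list of (name, value) tuples.
outcome_of_a = [('A', 1), ('A', 2)]

# one-to-more: B, C and D each receive their own copy of A's outcome.
inboxes = {name: copy.deepcopy(outcome_of_a) for name in ('B', 'C', 'D')}

# more-to-one: A receives the combined outcomes of B, C and D.
upstream = {'B': [('B', 1)], 'C': [('C', 2)], 'D': [('D', 3)]}
inbox_of_a = [item for items in upstream.values() for item in items]
# inbox_of_a == [('B', 1), ('C', 2), ('D', 3)]
```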
And here is the form of *outcome dispatching*, a mechanism for sending different kinds of outcome of an entity to different destinations. For instance, entity `A` generates two kinds of outcome, say `[type1]` and `[type2]`, and passes outcomes of `[type1]` to `B` and outcomes of `[type2]` to `C`:
```
# one way.
A --> [type1]
A --> [type2]
[type1] --> B
[type2] --> C

# another way.
A --[type1]--> B
A --[type2]--> C
```
where an identifier embraced in brackets (i.e. `[type1]`) represents the name of an outcome. In contrast to the form of outcome dispatching, `A --> B` would simply pass the outcome of `A`, with the default name `A` (the name of the entity that generates the outcome), to entity `B`. Essentially, the form above (a statement containing brackets) overrides the name of the outcome and acts like a filter for outcome dispatching.
Outcomes can be used to define one-to-more and more-to-one relations as well, in the same way as discussed above:

```
# one-to-more example.
A --> [type1]
A --> [type2]
[type1] --> B
[type1] --> C
[type2] --> D
[type2] --> E

# more-to-one example.
A --> [type1]
B --> [type1]
[type1] --> C
```
After loading the user-defined dataflow, two steps of analysis are applied:

- Build a DAG of the dataflow. Abort if an error is detected (i.e. a syntax error or a cyclic path).
- Apply a topological sort to the DAG to get the linear ordering of entity invocation.
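These two steps can be sketched in plain Python. The code below is a minimal illustration of Kahn's algorithm over a hypothetical edge list, not the package's actual implementation:

```python
from collections import defaultdict, deque

def linear_ordering(edges):
    """Topologically sort a dataflow DAG given as (src, dst) edges,
    raising an error when a cyclic path is detected."""
    successors = defaultdict(list)
    in_degree = defaultdict(int)
    nodes = set()
    for src, dst in edges:
        successors[src].append(dst)
        in_degree[dst] += 1
        nodes.update((src, dst))
    # Root nodes: entities with no incoming edge.
    queue = deque(n for n in nodes if in_degree[n] == 0)
    ordering = []
    while queue:
        node = queue.popleft()
        ordering.append(node)
        for nxt in successors[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                queue.append(nxt)
    if len(ordering) != len(nodes):
        raise ValueError('cyclic path detected in dataflow')
    return ordering

# 'A --> B' style statements reduced to plain edges.
print(linear_ordering([('A', 'B'), ('A', 'C'), ('B', 'D'), ('C', 'D')]))
```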
### Lexical Rules

```
ARROW          : re.escape('-->')
DOUBLE_HYPHENS : re.escape('--')
BRACKET_LEFT   : re.escape('[')
BRACKET_RIGHT  : re.escape(']')
ID             : r'\w+'
```

The effect of the above rules is equivalent to passing them to Python's `re` module with the flag `UNICODE` set.
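As an illustration of how these rules behave when fed to the `re` module (a sketch only, not the package's actual lexer):

```python
import re

# Token rules from above, tried in order (longest match first,
# so ARROW wins over DOUBLE_HYPHENS).
TOKEN_RULES = [
    ('ARROW', re.escape('-->')),
    ('DOUBLE_HYPHENS', re.escape('--')),
    ('BRACKET_LEFT', re.escape('[')),
    ('BRACKET_RIGHT', re.escape(']')),
    ('ID', r'\w+'),
]
PATTERN = re.compile(
    '|'.join('(?P<%s>%s)' % rule for rule in TOKEN_RULES),
    re.UNICODE,
)

def tokenize(text):
    return [(m.lastgroup, m.group()) for m in PATTERN.finditer(text)]

print(tokenize('A --[type1]--> B'))
```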
### CFGs

```
start : stats

stats : stats single_stat
      | empty

single_stat : entity_to_entity
            | entity_to_outcome
            | outcome_to_entity

entity_to_entity : ID general_arrow ID

general_arrow : ARROW
              | DOUBLE_HYPHENS outcome ARROW

outcome : BRACKET_LEFT ID BRACKET_RIGHT

entity_to_outcome : ID ARROW outcome

outcome_to_entity : outcome ARROW ID
```
## Command-line program

After installing `sdataflow` through `pip`, the user can invoke the command-line program `sdataflow`. Its synopsis is simple:
```
Usage:
    sdataflow [-m <mode>] <file>

Options:
    -m <mode>  <mode> could be 1, 2 or 3. `-m1` is the default mode; `-m2`
               prints outcomes as link text; `-m3` ignores outcomes.
```
The user passes the file path of a dataflow definition to `sdataflow`; the program then parses the file, analyses the dataflow and finally generates an HTML file. Use a browser to open that HTML file (based on the project mermaid), and you get a graphic representation of your dataflow!
An example is given for illustration:

```
$ cat example.sd
A --[odd]--> B
A --[even]--> C
B --> D
C --> D

$ sdataflow example.sd
$ ls
example.html example.sd
```
Use a browser to open `example.html`:
Besides, `sdataflow` currently provides three different modes for graph generation:

```
$ sdataflow -m1 example.sd
$ sdataflow -m2 example.sd
$ sdataflow -m3 example.sd
```
`-m1` mode is equivalent to omitting the `-m` option; `-m2` presents an outcome as the text of a link instead of as a node; `-m3` simply ignores outcomes.

`-m2` mode of the above example:

`-m3` mode of the above example:
## API

### Form of Callback

As mentioned above, an entity stands for a user-defined logic unit. Hence, after defining the relations of entities in the language discussed above, the user should define a set of callbacks, one for each entity in the definition.
A callback is a callable (a function, generator, or bound method) that returns `None` (i.e. a function with no `return` statement), or an iterable object whose elements are `(key, value)` tuples, with the key being the name of an outcome and the value a user-defined object. The argument list of such a callable can be:

- An empty list, meaning that the callback accepts no data.
- A one-element list.
Code fragment for illustration:

```python
# normal function returning `None`, with empty argument list.
def func1():
    pass

# normal function returning `None`, with one-element argument list.
def func2(items):
    for name_of_outcome, obj in items:
        # do something.
        pass

# normal function returning elements, with one-element argument list.
def func3(items):
    # ignore `items`.
    data = [('some outcome name', i) for i in range(10)]
    return data

# generator yielding elements, with one-element argument list.
def gen1(items):
    # ignore `items`.
    for i in range(10):
        yield 'some outcome name', i

class ExampleClass(object):

    @classmethod
    def method1(cls):
        pass

    @classmethod
    def method2(cls, items):
        pass

    def method3(self):
        pass

    def method4(self, items):
        pass

# class bound method, with empty argument list.
ExampleClass.method1
# class bound method, with one-element argument list.
ExampleClass.method2

example_instance = ExampleClass()
# instance bound method, with empty argument list.
example_instance.method3
# instance bound method, with one-element argument list.
example_instance.method4
```
Note that the name of an outcome is the string embraced in brackets (not including the brackets).
### All In One Interface

`sdataflow` provides a class `sdataflow.DataflowHandler` to parse `doc` (a string representing the relations of entities), register callbacks and schedule the execution of callbacks.
```
class DataflowHandler

    __init__(self, doc, name_callback_mapping=None)

        `doc`: unicode or utf-8 encoded binary data.
        `name_callback_mapping`: a dict of (`name`, `callback`) pairs. `name`
        could be unicode or utf-8 encoded binary data. `callback` is a function
        or generator. `name_callback_mapping` could be `None`, since callbacks
        can be registered by a function decorator (see next section).

    run(self)

        Automatically execute all registered callbacks.
```
Example:

```python
from sdataflow import DataflowHandler
from sdataflow.callback import create_data_wrapper

doc = ('A --[odd]--> B '
       'A --[even]--> C '
       'B --> D '
       'C --> D ')

def a():
    odd = create_data_wrapper('odd')
    even = create_data_wrapper('even')
    for i in range(1, 10):
        if i % 2 == 0:
            yield even(i)
        else:
            yield odd(i)

def b(items):
    default = create_data_wrapper('B')
    # remove 1.
    for outcome_name, number in items:
        if number == 1:
            continue
        yield default(number)

def c(items):
    default = create_data_wrapper('C')
    # remove 2.
    for outcome_name, number in items:
        if number == 2:
            continue
        yield default(number)

def d(items):
    numbers = {i for _, i in items}
    assert set(range(3, 10)) == numbers

name_callback_mapping = {
    'A': a,
    'B': b,
    'C': c,
    'D': d,
}

# parse `doc`, register `a`, `b`, `c`, `d`.
handler = DataflowHandler(doc, name_callback_mapping)
# execute callbacks.
handler.run()
```
In the above example, `A` generates the numbers 1 to 9, of which the odd numbers (1, 3, 5, 7, 9) are sent to `B` and the even numbers (2, 4, 6, 8) are sent to `C`. Then `B` removes the number 1 and sends the rest (3, 5, 7, 9) to `D`, while `C` removes the number 2 and sends the rest (4, 6, 8) to `D`. Finally, `D` receives the outcomes of both `B` and `C`, and makes sure the combined set equals `set(range(3, 10))`.
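Judging from the example, `create_data_wrapper` appears to pair an outcome name with each value; a minimal sketch of that behaviour (an assumption based on the usage above, not the package's actual source):

```python
def create_data_wrapper(outcome_name):
    """Return a wrapper that tags each value with `outcome_name`,
    producing the (key, value) tuples a callback should yield."""
    def wrapper(value):
        return (outcome_name, value)
    return wrapper

odd = create_data_wrapper('odd')
print(odd(1))  # ('odd', 1)
```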
### Use Decorator To Register Normal Function

`sdataflow.callback.register_callback` is a function decorator with the signature:

```
register_callback(entity_name, *outcome_names)
```

where `entity_name` could be a unicode or utf-8 encoded binary string, indicating the entity to which the function should be registered. If `outcome_names` is given, the decorator injects several `sdataflow.callback.create_data_wrapper` generated data wrappers into the function being decorated.
Example:

```python
@register_callback('A')
def zero_arg():
    return 0

@register_callback('C')
def one_arg(items):
    return 1

DataflowHandler(doc)
```
where `zero_arg` is registered to entity `A` and `one_arg` is registered to entity `C`. Note that, as mentioned above, the second parameter of `DataflowHandler` can be omitted.
When the name of a decorator-registered callback conflicts with a name in `name_callback_mapping` (the second parameter of `DataflowHandler`), the callback in `name_callback_mapping` will be accepted and the callback registered by the function decorator will be discarded. For example:
```python
@register_callback('A')
def zero_arg():
    return 0

@register_callback('C')
def should_not_be_registered(items):
    return 1

def one_arg(items):
    return 42

DataflowHandler(doc, {'C': one_arg})
```
where `one_arg` will be registered instead of `should_not_be_registered`.
Example of function injection:

```python
@register_callback('A', 'type1', 'type2')
def func():
    return func.type1(1), func.type2(2)

assert (
    ('type1', 1),
    ('type2', 2),
) == func()
```
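One plausible way such injection works is for the decorator to attach one data wrapper per outcome name as an attribute of the decorated function. The sketch below is an assumption for illustration, not the package's actual implementation:

```python
def register_callback(entity_name, *outcome_names):
    """Simplified stand-in for the real decorator: attach one
    name-tagging wrapper per outcome name to the function."""
    def decorator(func):
        for name in outcome_names:
            # e.g. func.type1(1) -> ('type1', 1)
            setattr(func, name, lambda value, _name=name: (_name, value))
        # (registration with the entity is omitted in this sketch.)
        return func
    return decorator

@register_callback('A', 'type1', 'type2')
def func():
    return func.type1(1), func.type2(2)

print(func())  # (('type1', 1), ('type2', 2))
```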
Be careful when applying `register_callback` to things other than plain functions; say you want to register a class method:

```python
class Example(object):

    # wrong, `classmethod` is not bound.
    @register_callback('A')
    @classmethod
    def func(cls):
        pass

# try the following code instead.
register_callback('A')(Example.func)
```
### `sdataflow` Language Pure Interface

`sdataflow.lang.parse` can be used to parse the definition of a dataflow:

```
parse(doc)

    input: `doc` with type of six.binary_type or six.text_type.
    output: linear ordering and root nodes of dataflow.
```

`parse` returns a 2-tuple: the first element is a list giving the linear ordering of the dataflow, and the second element is a list of the root nodes of the forest.