data-flow

A system to transform data consistently and easily


License
MIT
Install
pip install data-flow==0.3

Documentation

DataFlow

DataFlow is a framework to process your data easily. DataFlow lets you create a flow to transform your data.

How to use

First, create a flow to manage your transformation steps. Then apply every transformation function you need, chaining the steps with the >> operator and finishing with a Promise function to get the result.

from dataflow import Flow, Enum, Promise

Flow.from_enumerable([1, 2, 3, 4])\
>> Enum.map(lambda x: x * 2)\
>> Promise.as_list

# [2, 4, 6, 8]
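To make the >> chaining concrete, here is a toy model of the pattern. It is a hypothetical sketch, not DataFlow's implementation: it assumes, as the "How can I create my own modules?" section describes, that every step is a function that receives the flow's data and returns a new Flow, while a terminal step (like Promise.as_list) returns a plain value. The names double_all and as_list are made up for the illustration.

```python
class Flow:
    """Toy stand-in for a flow: it only wraps the current data."""

    def __init__(self, data):
        self.data = data

    @classmethod
    def from_enumerable(cls, iterable):
        return cls(list(iterable))

    def __rshift__(self, step):
        # `flow >> step` hands the wrapped data to the step. A transformation
        # step returns a new Flow, so chaining continues; a terminal step
        # returns a plain value, which ends the chain.
        return step(self.data)


def double_all(data):
    # Hypothetical transformation step, similar in spirit to Enum.map(lambda x: x * 2)
    return Flow([x * 2 for x in data])


def as_list(data):
    # Hypothetical terminal step, similar in spirit to Promise.as_list
    return list(data)


# `>>` is left-associative, so this runs as (flow >> double_all) >> as_list
result = Flow.from_enumerable([1, 2, 3, 4]) >> double_all >> as_list
print(result)  # [2, 4, 6, 8]
```

Because Python evaluates >> left to right, each step only ever sees the data produced by the step before it.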

Async Support

Since v0.3, DataFlow supports async/await on Python 3.5 and later. Supporting Python 3.5 forced me to drop laziness in AsyncEnum; it will come back from Python 3.6 onwards.

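from asyncio import get_event_loop
from dataflow import Flow, Enum, AsyncEnum, Promise

loop = get_event_loop()

# A coroutine used as an asynchronous transformation step
async def by_two(n):
    return n * 2

# Synchronous (Enum) and asynchronous (AsyncEnum) steps can be mixed;
# the event loop is passed when the flow is created
Flow.from_enumerable([1, 2, 3, 4], loop)\
>> Enum.map(lambda x: x * 2)\
>> AsyncEnum.map(by_two)\
>> Enum.map(lambda x: x * 2)\
>> AsyncEnum.map(by_two)\
>> Promise.as_list

# [16, 32, 48, 64]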
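The sketch below reproduces the arithmetic of the async example above in plain asyncio, without DataFlow. It is an illustration, not how DataFlow runs flows: it assumes every step processes the whole collection before the next one starts, in line with the note that AsyncEnum is not lazy on Python 3.5. The helper async_map is hypothetical; it runs one coroutine per element with asyncio.gather. The sketch uses asyncio.run, which needs Python 3.7 or later.

```python
import asyncio


async def by_two(n):
    return n * 2


async def async_map(coro_fn, data):
    # Hypothetical eager async map: schedule the coroutine for every element
    # and wait for all results; asyncio.gather keeps the input order.
    return await asyncio.gather(*(coro_fn(x) for x in data))


async def pipeline(data):
    data = [x * 2 for x in data]          # synchronous step (like Enum.map)
    data = await async_map(by_two, data)  # asynchronous step (like AsyncEnum.map)
    data = [x * 2 for x in data]
    data = await async_map(by_two, data)
    return list(data)


result = asyncio.run(pipeline([1, 2, 3, 4]))
print(result)  # [16, 32, 48, 64]
```

Four doubling steps multiply every element by 16, so four inputs give four outputs.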

Enum module

The Enum module contains functions that work with enumerables (iterables of elements).

Enum.map

This function takes a function and applies it to every element of your flow.

Flow.from_enumerable([1, 2, 3])\
>> Enum.map(lambda x: x * 2)\
>> Promise.as_list

# [2, 4, 6]

Enum.flat_map

This function takes a function that returns a list for each element, applies it to every element, and flattens the results into a single list.

Flow.from_enumerable([1, 2, 3])\
>> Enum.flat_map(lambda x: [x, x*2])\
>> Promise.as_list

# [1, 2, 2, 4, 3, 6]
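If Enum.flat_map behaves like a map followed by a one-level flatten (an assumption based on the description above), the example is equivalent to this plain-Python version built on itertools.chain.from_iterable. The flat_map helper here is hypothetical, not DataFlow's code.

```python
from itertools import chain


def flat_map(fn, iterable):
    # Apply fn to each element, then flatten the resulting iterables one level
    return list(chain.from_iterable(fn(x) for x in iterable))


result = flat_map(lambda x: [x, x * 2], [1, 2, 3])
print(result)  # [1, 2, 2, 4, 3, 6]
```

Each input element contributes its pair in order, which is why the output interleaves the originals and their doubles.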

Enum.filter

This function takes a predicate function and keeps only the elements of your flow for which it returns True.

Flow.from_enumerable([1, 2, 3])\
>> Enum.filter(lambda x: x >= 2)\
>> Promise.as_list

# [2, 3]

Enum.grouper

This function takes an integer and gathers your data into groups of that length. If the last group cannot be completed, it is padded with a fill value, which defaults to None.

Flow.from_enumerable([1, 2, 3, 4, 5])\
>> Enum.grouper(2)\
>> Promise.as_list

# [[1, 2], [3, 4], [5, None]]
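This grouping behaviour matches the well-known grouper recipe from the Python itertools documentation. The sketch below reproduces the example with itertools.zip_longest; the fillvalue parameter name comes from that recipe and is an assumption, since DataFlow's own signature may name it differently.

```python
from itertools import zip_longest


def grouper(iterable, n, fillvalue=None):
    # n references to the same iterator make zip_longest consume n items per
    # group; the last group is padded with fillvalue when the input runs out
    args = [iter(iterable)] * n
    return [list(group) for group in zip_longest(*args, fillvalue=fillvalue)]


print(grouper([1, 2, 3, 4, 5], 2))     # [[1, 2], [3, 4], [5, None]]
print(grouper([1, 2, 3, 4, 5], 2, 0))  # [[1, 2], [3, 4], [5, 0]]
```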

Enum.reduce

This function takes a function that receives the accumulated value and the current element and returns the new accumulated value. You can pass the initial accumulated value as the second argument; it defaults to an empty list.

Flow.from_enumerable([1, 2, 3])\
>> Enum.reduce(lambda acc, data: acc + data, 0)\
>> Promise.as_int

# 6
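In plain Python the same computation is functools.reduce, shown here only as an illustration (DataFlow's own implementation may differ). The second part shows why the initial value matters: an empty-list start suits an accumulator that builds a list, while summing numbers needs the explicit 0, because adding a number to a list raises TypeError.

```python
from functools import reduce

data = [1, 2, 3]

# acc starts at 0 and each step returns the new accumulated value:
# 0 + 1 = 1, 1 + 2 = 3, 3 + 3 = 6
total = reduce(lambda acc, x: acc + x, data, 0)
print(total)  # 6

# With an empty list as the initial value, the accumulator can build a new list
doubled = reduce(lambda acc, x: acc + [x * 2], data, [])
print(doubled)  # [2, 4, 6]
```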

Enum.dropwhile

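This function takes a condition and drops elements from the start of your flow while the condition returns True. From the first element for which it returns False, all remaining elements are kept.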

Flow.from_enumerable([1, 2, 3, 4])\
>> Enum.dropwhile(lambda x: x < 3)\
>> Promise.as_list

# [3, 4]

Enum.takewhile

This function is the counterpart of Enum.dropwhile: it takes elements from the start of your flow while the condition returns True and stops at the first element for which it returns False.

Flow.from_enumerable([1, 2, 3, 4])\
>> Enum.takewhile(lambda x: x < 3)\
>> Promise.as_list

# [1, 2]
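The two examples above use an increasing sequence, which hides an important detail. Assuming Enum.dropwhile and Enum.takewhile follow the semantics of their itertools namesakes (which their names suggest), both stop checking the condition at the first element that fails it, so neither acts as a filter. The sketch below uses itertools directly on an input that drops back below 3.

```python
from itertools import dropwhile, takewhile

data = [1, 2, 3, 1]

# Drops 1 and 2, then keeps everything from the first failing element on,
# including the trailing 1 that would satisfy x < 3 again
print(list(dropwhile(lambda x: x < 3, data)))  # [3, 1]

# Takes 1 and 2, then stops for good at 3; the trailing 1 is never reached
print(list(takewhile(lambda x: x < 3, data)))  # [1, 2]

# A filter, by contrast, tests every element
print([x for x in data if x < 3])  # [1, 2, 1]
```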

Enum.sort

This function sorts the elements of your flow. You can pass a key function to sort by and set reverse=True to sort in descending order.

from operator import itemgetter

Flow.from_enumerable([('A', 1), ('B', 0), ('C', 3), ('D', 2)])\
>> Enum.sort(key=itemgetter(1), reverse=True)\
>> Promise.as_list

# [('C', 3), ('D', 2), ('A', 1), ('B', 0)]

AsyncEnum

The AsyncEnum module lets you transform data with asynchronous functions (coroutines). It implements these functions:

  • AsyncEnum.map
  • AsyncEnum.flat_map
  • AsyncEnum.filter
  • AsyncEnum.reduce
  • AsyncEnum.dropwhile
  • AsyncEnum.takewhile
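To illustrate what an asynchronous step has to do, here is a plain-asyncio sketch of a filter whose predicate is a coroutine. The helpers async_filter and is_even are hypothetical and not part of DataFlow: the sketch evaluates the predicate for all elements concurrently with asyncio.gather, then keeps the elements whose result was truthy. It uses asyncio.run, which needs Python 3.7 or later.

```python
import asyncio


async def is_even(n):
    # Hypothetical async predicate; imagine an I/O-bound check here
    await asyncio.sleep(0)
    return n % 2 == 0


async def async_filter(predicate, data):
    data = list(data)
    # Run the predicate for every element concurrently; gather keeps input order
    keep = await asyncio.gather(*(predicate(x) for x in data))
    return [x for x, ok in zip(data, keep) if ok]


result = asyncio.run(async_filter(is_even, [1, 2, 3, 4]))
print(result)  # [2, 4]
```

Pairing each element with its predicate result through zip keeps the original order even though the checks run concurrently.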

String module

In the String module you can find functions to manipulate strings.

String.split

It splits the strings in your flow into smaller pieces. By default, it splits on whitespace.

Flow.from_enumerable("One Two Three Four Five Six Seven")\
>> String.split(" ")\
>> Enum.filter(lambda x: len(x) >= 4)\
>> Promise.as_list

# ["Three", "Four", "Five", "Seven"]

String.join

It joins a list of strings into a single string. You can choose the separator string; it defaults to an empty string.

Flow.from_enumerable("Hello world!")\
>> String.split(" ")\
>> String.join("-")\
>> Promise.resolve

# Hello-world!
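As a cross-check of the expected output, the two String examples compute the same results as these plain str.split and str.join calls. This only mirrors the results; it is not how the String module is implemented.

```python
sentence = "One Two Three Four Five Six Seven"

# Split on spaces, then keep words with at least four characters
long_words = [word for word in sentence.split(" ") if len(word) >= 4]
print(long_words)  # ['Three', 'Four', 'Five', 'Seven']

# Split on spaces, then join the pieces back with "-"
print("-".join("Hello world!".split(" ")))  # Hello-world!
```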

Promise module

This module is responsible for returning the data of a flow as a usable value or data structure.

Promise.as_list

Returns the data from the flow as a list.

Promise.as_int

Returns the data from the flow as an integer.

Promise.as_float

Returns the data from the flow as a float.

Promise.resolve

Returns the data from the flow without any casting.

Promise.for_each

This function takes a function as an argument and calls it on every element of your flow. It does not return anything.

How can I create my own modules?

Create a Python class whose methods are all static. A method used directly in a flow, like per_two, must take the flow's data as its argument and return a new Flow. A method that needs parameters, like multiply, must return such a function.

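from dataflow import Flow, Promise

class MyModule:
    # Used directly in a flow: receives the data and returns a new Flow
    @staticmethod
    def per_two(data):
        new_data = [element * 2 for element in data]
        return Flow(new_data)

    # Takes a parameter, so it returns the function that receives the data
    @staticmethod
    def multiply(n):
        def _multiplier(data):
            new_data = [element * n for element in data]
            return Flow(new_data)
        return _multiplier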


Flow.from_enumerable([1, 2, 3])\
>> MyModule.per_two\
>> MyModule.multiply(2)\
>> Promise.as_list

# [4, 8, 12]

With async/await support, the returned function can be a coroutine function:

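from dataflow import Flow

# Hypothetical coroutine standing in for any asynchronous operation
async def awesome_multiplier(datum, n):
    return datum * n

class MyAsyncModule:
    @staticmethod
    def multiply(n):
        async def _multiply(data):
            # await inside a list comprehension requires Python 3.6+
            data = [await awesome_multiplier(datum, n) for datum in data]
            return Flow(data)
        return _multiply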