oocas

Out-of-core data processing framework for pandas


License: BSD-3-Clause

oocas is an out-of-core data processing framework built for pandas. Its main purpose is local ETL of large datasets by sequentially processing smaller files.

Installation

pip install oocas
conda install -c conda-forge oocas 

Usage

The data, specified by a list of file paths, are read, transformed, and written or returned. In a transform substep, data can be cached to be available in later transform substeps. A processing pipeline is built out of callable components.

  • Process
  • Read, FileRead, ParquetRead, ParquetMetadataRead, MultiRead
  • Transform, CacheTransform
  • Cache, IndexCache, TimeIndexCache
  • Write, FileWrite, ParquetWrite, MultiWrite

High-level components (Process, Read, Transform, Cache, Write) support plugin callables in addition to a variety of other useful arguments. Low-level components inherit this behavior by passing a predefined method this way.
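Conceptually, a process iterates over file paths and threads each file through its read, transform, and write callables. A minimal sketch of this idea in plain Python (an illustration of the component model only, not the actual oocas implementation):

```python
# Conceptual sketch of a sequential out-of-core pipeline: each
# path is read, transformed, and the (written) result collected.
# oocas' real Process class adds caching, progress bars, etc.
def process(paths, read, transform=lambda x: x, write=lambda x: x):
    results = []
    for path in paths:
        results.append(write(transform(read(path))))
    return results

# Plain callables stand in for oocas components here.
out = process([1, 2, 3], read=lambda p: p * 10,
              transform=lambda x: x + 1)
```

Because every stage is just a callable, lambdas, functions, and the library's component classes are interchangeable.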

Examples

Examples can be found in the docs.

Read files to a dataframe

import pandas as pd
import oocas as oc

paths = oc.find_paths('data/raw')
sqp = oc.Process(oc.ParquetRead(), progress_bar=False)
df = pd.concat(sqp(paths), axis=0)

Random sampling

import numpy as np
import pandas as pd
import oocas as oc

sqp = oc.Process(
    oc.ParquetMetadataRead(),
    lambda x: x.num_rows,
    progress_bar=False)

nrows = sum(sqp(paths))
sample_idxs = np.random.choice(nrows, 10**3, replace=False)

def sample(df, cache):
    adjusted_idxs = sample_idxs - cache.data
    cache(cache.data + len(df))
    return df.iloc[[
        idx for idx in adjusted_idxs
        if 0 <= idx < len(df)
    ]].copy()

sqp = oc.Process(
    oc.ParquetRead(),
    oc.CacheTransform(sample, cache=0))

sample_df = pd.concat(sqp(paths), axis=0)
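The cache here stores the running row offset of the current file within the whole dataset, so global sample indices can be mapped to local positions in each file. The offset logic can be checked with plain pandas, independent of oocas (chunk sizes and indices below are made up for the illustration):

```python
import numpy as np
import pandas as pd

# Three chunks of 4, 4, and 2 rows; the global sample indices
# 2, 5, and 9 fall into chunks 0, 1, and 2 respectively.
chunks = [pd.DataFrame({'a': range(start, start + n)})
          for start, n in [(0, 4), (4, 4), (8, 2)]]
sample_idxs = np.array([2, 5, 9])

offset = 0                    # plays the role of cache.data
parts = []
for df in chunks:
    adjusted_idxs = sample_idxs - offset
    offset += len(df)         # plays the role of cache(...)
    parts.append(df.iloc[[idx for idx in adjusted_idxs
                          if 0 <= idx < len(df)]])

sample_df = pd.concat(parts)  # rows with a == 2, 5, 9
```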

Time series moving average

import pandas as pd
import oocas as oc

def moving_average(df, cache):
    nrows = len(df)
    df = pd.concat([cache.data, df])
    cache(df)
    return df.rolling(cache.lookback).mean().iloc[-nrows:]

sqp = oc.Process(
    oc.ParquetRead(),
    oc.CacheTransform(
        moving_average,
        oc.TimeIndexCache('1H')),
    oc.ParquetWrite(
        path_part_replace=('raw', 'mavg'),
        mkdirs=True))

mavg_paths = sqp(paths)
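The TimeIndexCache carries a trailing window of rows ('1H' here) into the next file, so rolling windows that span a file boundary still see enough history. The idea can be demonstrated with plain pandas by processing a frame in two chunks and comparing against a single-pass rolling mean (a standalone sketch; oocas trims the cached data to the lookback for you):

```python
import pandas as pd

idx = pd.date_range('2024-01-01', periods=8, freq='15min')
full = pd.DataFrame({'x': range(8)}, index=idx)
lookback = '1h'

tail, outputs = None, []
for chunk in (full.iloc[:4], full.iloc[4:]):
    nrows = len(chunk)
    df = chunk if tail is None else pd.concat([tail, chunk])
    # Keep only the trailing lookback window for the next chunk,
    # mimicking what a time-index cache retains.
    tail = df[df.index > df.index[-1] - pd.Timedelta(lookback)]
    outputs.append(df.rolling(lookback).mean().iloc[-nrows:])

chunked = pd.concat(outputs)
single_pass = full.rolling(lookback).mean()  # identical result
```

Because every window ending in a chunk sees the same points it would see in a single pass, the chunked output matches the single-pass output exactly.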

N-to-1 file write

import oocas as oc

pickle_multi_to_one_write = oc.FileWrite(
    lambda x, path, **kwargs: x.to_pickle(path, **kwargs),
    path_transform=lambda x: x[0],
    path_part_replace=('raw', 'pickle'),
    name_transform=lambda x: '_'.join(x.split('_')[1:]),
    suffix='.pkl.gz',
    mkdirs=True,
    overwrite=True,
    compression='infer')
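The keyword arguments above derive the single output path from the list of input paths: path_transform picks the first input path, path_part_replace swaps the 'raw' directory part for 'pickle', name_transform drops the prefix before the first underscore, and suffix sets the new file extension. How such transforms could compose can be sketched with pathlib (hypothetical paths; the exact order and semantics oocas applies are documented in the library):

```python
from pathlib import Path

# Hypothetical input paths, just for illustration.
paths = [Path('data/raw/part_2021_01.parquet'),
         Path('data/raw/part_2021_02.parquet')]

path = paths[0]                                  # path_transform
path = Path(str(path).replace('raw', 'pickle'))  # path_part_replace
name = '_'.join(path.stem.split('_')[1:])        # name_transform
out = path.parent / (name + '.pkl.gz')           # suffix
```

Under these assumptions, 'data/raw/part_2021_01.parquet' would map to 'data/pickle/2021_01.pkl.gz'.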