Flowcept captures and queries workflow provenance at runtime with minimal code changes and low overhead. It unifies data from diverse tools and workflows across the Edge–Cloud–HPC continuum and provides ML-aware capture, provenance for MCP agents, telemetry, extensible adapters, and flexible storage.
The easiest way to capture provenance from plain Python functions, with no external services needed:
- Install and initialize settings
# Make sure you activate your Python environment (e.g., conda, venv) first
pip install flowcept
flowcept --init-settings
This generates a minimal settings file in ~/.flowcept/settings.yaml.
- Run the minimal example
Save the following script as quickstart.py and run `python quickstart.py`.
"""
A minimal example of Flowcept's instrumentation using @decorators.
This example needs no DB, broker, or external service.
"""
import json
from flowcept import Flowcept, flowcept_task
from flowcept.instrumentation.flowcept_decorator import flowcept
@flowcept_task(output_names="o1")
def sum_one(i1):
return i1 + 1
@flowcept_task(output_names="o2")
def mult_two(o1):
return o1 * 2
@flowcept
def main():
n = 3
o1 = sum_one(n)
o2 = mult_two(o1)
print("Final output", o2)
if __name__ == "__main__":
main()
prov_messages = Flowcept.read_messages_file()
assert len(prov_messages) == 2
print(json.dumps(prov_messages, indent=2))
This creates a provenance file, `flowcept_messages.jsonl`. In it, you will see two provenance messages, one for each executed function:
```json
[
  {
    "activity_id": "sum_one",
    "workflow_id": "fe546706-ef46-4482-8f70-3af664a7131b",
    "campaign_id": "76088532-3bef-4343-831e-d8a5d9156174",
    "used": {
      "i1": 3
    },
    "started_at": 1757171258.637908,
    "hostname": "my_laptop",
    "task_id": "1757171258.637908",
    "status": "FINISHED",
    "ended_at": 1757171258.6379142,
    "generated": {
      "o1": 4
    },
    "type": "task"
  },
  {
    "activity_id": "mult_two",
    "workflow_id": "fe546706-ef46-4482-8f70-3af664a7131b",
    "campaign_id": "76088532-3bef-4343-831e-d8a5d9156174",
    "used": {
      "o1": 4
    },
    "started_at": 1757171258.637933,
    "hostname": "my_laptop",
    "task_id": "1757171258.637933",
    "status": "FINISHED",
    "ended_at": 1757171258.6379352,
    "generated": {
      "o2": 8
    },
    "type": "task"
  }
]
```
For online querying using databases, MCP agents, and Grafana; telemetry; adapters (MLflow, Dask, TensorBoard); PyTorch and MCP instrumentation; HPC optimization; federated runs; and more, see the Jupyter Notebooks, the Examples directory, and the complete documentation.
- Overview
- Features
- Installation
- Setup and the Settings File
- Running with Containers
- Examples
- Data Persistence
- Performance Tuning
- AMD GPU Setup
- Further Documentation
Flowcept captures and queries workflow provenance at runtime with minimal code changes and low data capture overhead, unifying data from diverse tools and workflows.
Designed for scenarios involving critical data from multiple, federated workflows in the Edge-Cloud-HPC continuum, Flowcept supports end-to-end monitoring, analysis, querying, and enhanced support for Machine Learning (ML) and for agentic workflows.
- Automatic workflow provenance capture with minimal intrusion
- Adapters for MLflow, Dask, TensorBoard; easy to add more
- Optional explicit instrumentation via decorators
- ML-aware capture, from workflow to epoch and layer granularity
- Agentic workflows: MCP agent-aware provenance capture
- Low overhead, suitable for HPC and highly distributed setups
- Telemetry capture for CPU, GPU, memory, linked to dataflow
- Pluggable MQ and storage backends (Redis, Kafka, MongoDB, LMDB)
- W3C PROV adherence
Explore Jupyter Notebooks and Examples for usage.
Flowcept can be installed in multiple ways, depending on your needs.
To install Flowcept with its basic dependencies from PyPI, run:
pip install flowcept
This installs the minimal Flowcept package, not including MongoDB, Redis, MCP, or any adapter-specific dependencies.
Flowcept integrates with several tools and services, but you should only install what you actually need.
Good practice is to cherry-pick the extras relevant to your workflow instead of installing them all.
pip install flowcept[mongo] # MongoDB support
pip install flowcept[mlflow] # MLflow adapter
pip install flowcept[dask] # Dask adapter
pip install flowcept[tensorboard] # TensorBoard adapter
pip install flowcept[kafka] # Kafka message queue
pip install flowcept[nvidia] # NVIDIA GPU runtime capture
pip install flowcept[telemetry] # CPU/GPU/memory telemetry capture
pip install flowcept[lmdb] # LMDB lightweight database
pip install flowcept[mqtt] # MQTT support
pip install flowcept[llm_agent] # MCP agent, LangChain, Streamlit integration: needed either for MCP capture or for the Flowcept Agent.
pip install flowcept[llm_google] # Google GenAI + Flowcept agent support
pip install flowcept[analytics] # Extra analytics (seaborn, plotly, scipy)
pip install flowcept[dev] # Developer dependencies (docs, tests, lint, etc.)
pip install flowcept[extras]
The `extras` group is a convenience shortcut that bundles the most common runtime dependencies. It is intended for users who want a fairly complete, but not maximal, Flowcept environment. You might choose `flowcept[extras]` if:
- You want Flowcept to run out-of-the-box with Redis, telemetry, and MongoDB.
- You prefer not to install each extra one by one.
Flowcept provides a combined `all` extra, but installing everything into a single environment is not recommended for regular users. Many of these dependencies are unrelated and should not be mixed in the same runtime. This option is intended only for Flowcept developers who need to test across all adapters and integrations.
pip install flowcept[all]
To install Flowcept from the source repository:
git clone https://github.com/ORNL/flowcept.git
cd flowcept
pip install .
You can then install specific optional dependencies, as above:
pip install .[optional_dependency_name]
This follows the same pattern as the optional-dependency installs above, allowing for a customized installation from source.
The Quickstart example works with just `pip install flowcept`; no extra setup is required.
For online queries or distributed capture, Flowcept relies on two optional components:
- Message Queue (MQ) — message broker / pub-sub / data stream
- Database (DB) — persistent storage for historical queries
- Required for anything beyond the Quickstart.
- Flowcept publishes provenance data to the MQ during workflow runs.
- Developers can subscribe with custom consumers (see this example; a generic subscriber sketch follows the list of supported MQs below).
- You can monitor or print messages in motion with `flowcept --stream-messages --print`.
Supported MQs:
- Redis → default, lightweight, works on Linux, macOS, Windows, and HPC (tested on Frontier and Summit)
- Kafka → for distributed environments or if Kafka is already in your stack
- Mofka → optimized for HPC runs
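For illustration, here is a generic subscriber sketch using redis-py directly. This is not Flowcept's consumer API: it assumes the default Redis MQ on localhost:6379, and the channel name is hypothetical, so check your settings.yaml and the linked example for the real values.

```python
import json

import redis

r = redis.Redis(host="localhost", port=6379)
ps = r.pubsub()
ps.subscribe("flowcept_channel")  # hypothetical channel name; see settings.yaml

for msg in ps.listen():
    if msg["type"] == "message":
        # Each message carries provenance events serialized as JSON
        print(json.dumps(json.loads(msg["data"]), indent=2))
```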
- Optional, but required for:
- Persisting provenance beyond MQ memory/disk buffers
- Running complex analytical queries on historical data
Supported DBs:
- MongoDB → default, efficient bulk writes + rich query support
- LMDB → lightweight, no external service, basic query capabilities
- Without a DB:
  - Provenance remains in the MQ only (persistence not guaranteed)
  - Complex historical queries are unavailable
- Flowcept’s architecture is modular: other MQs and DBs (graph, relational, etc.) can be added in the future
- Deployment examples for MQ and DB are provided in the deployment directory
Flowcept uses external services for message queues (MQ) and databases (DB). You can start them with Docker Compose, plain containers, or directly on your host.
We provide a Makefile with shortcuts:

- Redis only (no DB): `make services` (LMDB can be used in this setup as a lightweight DB)
- Redis + MongoDB: `make services-mongo`
- Kafka + MongoDB: `make services-kafka`
- Mofka only (no DB): `make services-mofka`

To customize, edit the YAML files in deployment and run `docker compose -f deployment/<compose-file>.yml up -d`.

See the deployment/ compose files for expected images and configurations. You can adapt them to your environment and use standard `docker pull` / `docker run` / `docker exec` commands.
- Install binaries for the service you need:
  - On macOS, install with Homebrew. Example for Redis: `brew install redis && brew services start redis`
  - On Linux, use your distro's package manager (e.g., `apt`, `dnf`, `yum`).
  - If non-root (typically the case when deploying these services locally on an HPC system), find the binaries built for your OS/hardware architecture, download them to a directory where you have read/write permission, and run them from there (a sketch follows this list).
  - On Windows, use WSL with a Linux distro.
- Start the services normally (`redis-server`, `mongod`, `kafka-server-start.sh`, etc.).
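As an illustration of the non-root HPC case above, a minimal sketch; all paths are hypothetical, so adjust them to wherever you placed the binaries:

```bash
# Assumes Redis binaries were downloaded and extracted into ~/redis,
# a directory where the user has read/write permission
~/redis/src/redis-server --port 6379
```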
Flowcept uses a settings file for configuration.
- To create a minimal settings file (recommended), run: `flowcept --init-settings` → creates `~/.flowcept/settings.yaml`
- To create a full settings file with all options, run: `flowcept --init-settings --full` → creates `~/.flowcept/settings.yaml`

The settings file lets you configure:
- Message queue and database routes, ports, and paths
- MCP agent ports and LLM API keys
- Buffer sizes and flush settings
- Telemetry capture settings
- Instrumentation and PyTorch details
- Log levels
- Data observability adapters
- And more (see example file)
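For illustration, a hypothetical excerpt of such a settings file. The key names come from the Performance Tuning section below, but the values are arbitrary examples; consult the generated sample file for the real structure and defaults.

```yaml
# Hypothetical excerpt of ~/.flowcept/settings.yaml -- values are examples only
mq:
  buffer_size: 50                # how many messages to buffer before publishing
  insertion_buffer_time_secs: 5  # max time a message waits in the buffer
project:
  replace_non_json_serializable: false
```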
Flowcept looks for its settings in the following order:

1. `~/.flowcept/settings.yaml`, created by running `flowcept --init-settings`
2. The `FLOWCEPT_SETTINGS_PATH` environment variable: if set, Flowcept uses the settings file it points to (see the example below)
3. The default sample file, used if neither of the above is found
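For example, to point Flowcept at a custom settings file via the environment variable (the path below is just an example):

```bash
# Flowcept will load its settings from this file instead of ~/.flowcept/settings.yaml
export FLOWCEPT_SETTINGS_PATH=/path/to/my_settings.yaml
python my_workflow.py
```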
See the Jupyter Notebooks and Examples directory for utilization examples.
| Category | Supported Options |
|---|---|
| Data Observability Adapters | MLflow, Dask, TensorBoard |
| Instrumentation and Decorators | `@flowcept`: encapsulates a function (e.g., a main function) as a workflow.<br>`@flowcept_task`: encapsulates a function as a task.<br>`@telemetry_flowcept_task`: same as `@flowcept_task`, but optimized for telemetry capture.<br>`@lightweight_flowcept_task`: same as `@flowcept_task`, but very lightweight, optimized for HPC workloads.<br>Loop, PyTorch Model, and MCP Agent instrumentation. |
| Context Manager | `with Flowcept(): # Workflow code` Similar to the `@flowcept` decorator, but more flexible for instrumenting code blocks that aren't encapsulated in a single function and for workflows whose code is scattered across multiple files. |
| Custom Task Creation | `FlowceptTask(activity_id=<id>, used=<inputs>, generated=<outputs>, ...)` for fully customizable task instrumentation. Publishes directly to the MQ either via context management (`with FlowceptTask(...)`) or by calling `send()`. Requires a prior `Flowcept().start()` (or a surrounding `with Flowcept()` context). See example. |
| Message Queues (MQ) | Disabled (offline mode: provenance events stay in an in-memory buffer, not accessible to external processes).<br>Redis → default, lightweight, easy to run anywhere.<br>Kafka → for distributed, production setups.<br>Mofka → optimized for HPC runs.<br>Setup example: docker compose |
| Databases | Disabled → Flowcept runs in ephemeral mode (data only in the MQ, no persistence).<br>MongoDB → default, rich queries and efficient bulk writes.<br>LMDB → lightweight, file-based, no external service, basic query support. |
| Querying and Monitoring | Grafana → dashboarding via the MongoDB connector.<br>MCP Flowcept Agent → LLM-based querying of provenance data. |
| Custom Consumer | You can implement your own consumer to monitor or query the provenance stream in real time. Useful for custom analytics, monitoring, debugging, or persisting the data in a different data model (e.g., graph). See example. |
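To make the Context Manager and Custom Task Creation rows concrete, here is a minimal sketch. The import location of FlowceptTask is an assumption; check the linked examples for the exact module.

```python
from flowcept import Flowcept, FlowceptTask  # FlowceptTask import path is an assumption

with Flowcept():  # FlowceptTask requires a started Flowcept (see the table above)
    t = FlowceptTask(
        activity_id="normalize",  # hypothetical activity name
        used={"x": 3},            # task inputs
        generated={"y": 1.0},     # task outputs
    )
    t.send()  # publish the task message directly to the MQ
```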
In the settings.yaml file, many variables may impact interception efficiency. Please be mindful of the following parameters:
- `mq`: `buffer_size` and `insertion_buffer_time_secs`. `buffer_size: 1` is really bad for performance, but it gives the MQ the most up-to-date information possible.
- `log`: set both stream and file log levels to `disable`.
- `telemetry_capture`: the more metrics you enable, the more overhead you get. For GPUs, you can turn specific metrics on or off.
- `instrumentation`: configures whether every single granular step of the model training process is captured. Disable very granular model inspection and prefer the more lightweight methods. There are commented instructions in the settings.yaml sample file.
Other things to consider:

```yaml
project:
  replace_non_json_serializable: false  # assumes all captured data are JSON serializable
  db_flush_mode: offline                # disables runtime analysis in the database
mq:
  chunk_size: -1  # disables chunking of messages sent to the MQ; use only if the compute nodes' main memory is large enough
```
Other variables may add overhead depending on the adapter. For instance, in Dask, timestamp creation by workers adds interception overhead. As the software evolves, other overhead-related variables may appear that are not yet documented in this README. If you are running extensive performance evaluation experiments with this software, please reach out to us (e.g., create an issue in the repository) for hints on reducing its overhead.
This section is only important if you want to enable GPU runtime data capture and the GPU is from AMD. NVIDIA GPUs don't need this step.
For AMD GPUs, we rely on the official AMD ROCm library to capture GPU data. Unfortunately, this library is not available as a PyPI or conda package, so you must install it manually. See the instructions at https://rocm.docs.amd.com/projects/amdsmi/en/latest/
Here is a summary:
- Install the AMD drivers on the machine (check whether they are already available under /opt/rocm-*).
- Suppose the installation is at /opt/rocm-6.2.0. Make sure it has a share/amd_smi subdirectory containing a pyproject.toml or setup.py.
- Copy amd_smi to your home directory: `cp -r /opt/rocm-6.2.0/share/amd_smi ~`
- `cd ~/amd_smi`
- In your Python environment, run `pip install .`
The current code is compatible with amdsmi==24.7.1+0012a68, which was installed using Frontier's /opt/rocm-6.3.1/share/amd_smi.
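After installing, a quick way to check that amdsmi can see your GPUs is a sketch like the one below; the function names follow the upstream AMD SMI Python API and should be verified against its documentation.

```python
# Verification sketch -- function names are from the upstream AMD SMI Python API
from amdsmi import amdsmi_init, amdsmi_get_processor_handles, amdsmi_shut_down

amdsmi_init()
try:
    devices = amdsmi_get_processor_handles()
    print(f"Found {len(devices)} AMD GPU device(s)")
finally:
    amdsmi_shut_down()
```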
Some unit tests use torch==2.2.2, torchtext==0.17.2, and torchvision==0.17.2. They are only needed to run some tests and will be installed if you run `pip install flowcept[ml_dev]` or `pip install flowcept[all]`. If you want to use Flowcept with Torch, adapt the torch dependencies to your project's requirements.
Full documentation is available on Read the Docs.
If you used Flowcept in your research, consider citing our paper.
Towards Lightweight Data Integration using Multi-workflow Provenance and Data Observability
R. Souza, T. Skluzacek, S. Wilkinson, M. Ziatdinov, and R. da Silva
19th IEEE International Conference on e-Science, 2023.
Bibtex:
@inproceedings{souza2023towards,
author = {Souza, Renan and Skluzacek, Tyler J and Wilkinson, Sean R and Ziatdinov, Maxim and da Silva, Rafael Ferreira},
booktitle = {IEEE International Conference on e-Science},
doi = {10.1109/e-Science58273.2023.10254822},
link = {https://doi.org/10.1109/e-Science58273.2023.10254822},
pdf = {https://arxiv.org/pdf/2308.09004.pdf},
title = {Towards Lightweight Data Integration using Multi-workflow Provenance and Data Observability},
year = {2023}
}
Refer to Contributing for adding new adapters or contributing to the codebase.
Please note that this is research software. We encourage you to give it a try and use it with your own stack. We are continuously working on improving documentation and adding more examples and notebooks. If you are interested in using Flowcept in your own scientific project, we can give you a jump start if you reach out to us. Feel free to create an issue, start a new discussion thread, or drop us an email (we trust you'll find a way to reach out to us 😉).
This research uses resources of the Oak Ridge Leadership Computing Facility at the Oak Ridge National Laboratory, which is supported by the Office of Science of the U.S. Department of Energy under Contract No. DE-AC05-00OR22725.