
thi.ng/fabric

./assets/fabric-hot-1280x720-orange.jpg

Modular, Signal/Collect inspired compute graph infrastructure for Clojure & Clojurescript. Written in a Literate Programming format.


About the project

Leiningen coordinates

[thi.ng/fabric "0.0.388"]

Overview

General Compute Graph concepts

These libraries are built around the model of a directed, potentially cyclic graph, in which vertices hold state and edges are functions which send (possibly transformed) state values to other vertices in the graph. All processing is done in two phases, both of which are score guided and fully customizable:

  • Signaling: the operations carried out by edge functions. Each edge function accepts its source vertex's state value, transforms it and places the result in a signal map/queue of its target vertex. A vertex only signals if its (user-defined or default) scoring function exceeds a (likewise user-definable) threshold.
  • Collecting: each vertex can define its own collection function, responsible for processing any uncollected signal values and producing a new state value. As with signaling, a vertex only collects if its collection score exceeds a threshold. This allows users to implement different collection behaviors, e.g. semi-lazy, batched, or only after a certain number of uncollected signals has been received (see the sketch below).
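
For orientation, here is a minimal, hypothetical sketch of both function shapes, using only API calls which appear in the examples further below (f/collect-pure, the ::f/collect-fn vertex option, f/connect-to!); scoring & threshold options are left at their defaults:

(require '[thi.ng.fabric.core :as f])

;; a signal function receives the source vertex and an optional per-edge
;; argument; its return value is placed in the target vertex's signal map
(defn signal-double
  [vertex _] (* 2 @vertex))

;; a pure collect function receives the vertex's current value and the seq
;; of uncollected signals and returns the vertex's new value
(def collect-max
  (f/collect-pure
   (fn [val uncollected] (reduce max val uncollected))))

;; both are attached when building the graph, e.g.
;; (f/add-vertex! g 1 {::f/collect-fn collect-max})
;; (f/connect-to! src dest signal-double nil)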

There are many different ways to execute these two operational phases (e.g. ordered vs. unordered, interleaved, thresholded, eager, async etc.), and the core library provides three different (and fully configurable) execution contexts to tailor the execution model to different use cases. For some problem domains, choosing one model over another can yield major (500-1000%) speedups (e.g. see the vertex coloring example below).

Common to all supplied execution models/contexts is the notion of graph convergence (a term borrowed from the original Signal/Collect papers, linked below). Since both signal and collect operations are score guided, the executors can keep track of the remaining active work set. A graph is called converged once no vertices are left which should emit signals or can collect. Because the model explicitly allows cycles (recursion) in the graph, users can also specify halt thresholds to forcefully terminate execution after a certain number of operations, potentially leaving the graph in a non-converged state (which for some use cases is a valid result).
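
As a small, hedged illustration of convergence in the presence of a cycle (assuming the default scoring only re-activates vertices whose values actually changed), two mutually connected vertices propagating the maximum value stop producing work once both hold the same value:

(require '[thi.ng.fabric.core :as f])

(def g   (f/compute-graph))
(def ctx (f/sync-execution-context {:graph g}))

(def collect-max
  (f/collect-pure (fn [val uncollected] (reduce max val uncollected))))

(def a (f/add-vertex! g 1 {::f/collect-fn collect-max}))
(def b (f/add-vertex! g 5 {::f/collect-fn collect-max}))

;; a deliberate cycle: each vertex forwards its value to the other
(f/connect-to! a b f/signal-forward nil)
(f/connect-to! b a f/signal-forward nil)

(f/execute! ctx)
;; once neither vertex produces a changed value there is no remaining
;; active work set and the context reports convergence
[@a @b] ;; expected: [5 5]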

The Signal/Collect model provides a flexible and elegant metaphor for many problem domains. To avoid a monolithic, one-size-fits-all approach and to provide some thematic focus, the project is structured into several modules (all described & linked below).

Project features

  • Modular architecture (see Modules section below)
  • Easy-to-use API, each module exposes both high & low-level functionality
  • Protocol based implementation for extensibility
  • Literate programming format w/ extensive docs, diagrams & examples
  • Over 150 tests

fabric-core

  • Customizable vertex, edge, graph & execution context types
  • 3 types of graph execution contexts built-in
  • Sync, async, parallel & continuous processing (and mixtures thereof)
  • Partial or stepwise execution
  • Activity scoring system (customizable)
  • Automatic graph convergence detection (no more outstanding actions)
  • Computation of minimum active work set
  • Built-in graphs & vertices are mutable (via atoms)
  • Edges are functions and all signal/collect actions are purely functional too
  • Support for graph modification logging (persistence, event sourcing etc.)
  • Self-modifiable graphs during execution (add/remove/modify vertices/edges)

fabric-facts

  • Semantic knowledge graph (triple or quad based)
  • Fully featured, efficient query engine using query trees
    • Graph pattern matching w/ variables
    • Sub-queries (joins, optional joins, unions, negation)
    • Filters / pre-filters
    • Grouping (based on vars or expressions)
    • Computed vars result injection
    • Aggregation
    • Bounded length (variable) path queries (with min/max limits)
    • Pervasive re-use of intermediate results
  • Optional map based query & expression DSL (EDN serializable)
  • Query & rule based fact inferencing (additions & retractions)
  • Fact conversion/transformation/indexing
  • Conversion of nested Clojure maps to triples
  • N-Triples parser w/ extensible object literal conversions

fabric-ld

  • Linked Data server components & ready-to-go default setup (using Stuart Sierra’s component lib)
  • HTTP API to manipulate & query fact graphs
  • EDN/N-Triples facts import via URIs (w/ 303 redirect handling)
  • Async request handlers & graph processing (default config uses Aleph server)
  • Registered queries/rules w/ auto-updating results
  • OWL-RL inference rules (sub-set)
  • Multiple response formats (EDN, JSON, JSON-LD, SPARQL JSON, CSV)

See the READMEs of the individual modules below for their full feature sets…

Modules

  • fabric-core - Core compute graph functionality
  • fabric-facts - Semantic knowledge graph, query DSL, cached query trees, inferencing, I/O, parsing
  • fabric-ld - Component based Linked Data HTTP server & query endpoint setup (WIP)
  • fabric-redis - Redis backend (WIP, not yet included)

Status

ALPHA quality, in active development.

Example usage

Currently, the included test cases and examples of each module are the best demonstration of usage patterns and cover most of the implemented functionality:

General compute graph examples

Single-source shortest path

Calculate the minimum distance from a single node to all other (connected) nodes in the graph. Also see the benchmarks below.

Only these two custom signal & collect functions are needed (in addition to setting up the graph):

;; edge signal fn: forwards the source vertex's distance + edge weight;
;; returns nil (i.e. no signal) while the source itself is still unreached
(defn signal-sssp
  [vertex dist] (if-let [v @vertex] (+ dist v)))

;; vertex collect fn: new distance = minimum of the current value (if any)
;; and all uncollected signals
(def collect-sssp
  (f/collect-pure
   (fn [val uncollected]
     (if val (reduce min val uncollected) (reduce min uncollected)))))
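
As a sketch of how these functions are wired into a (tiny, hypothetical) graph: collect-sssp is attached via the ::f/collect-fn vertex option and each edge weight is passed as the extra signal argument of f/connect-to! (the same mechanism described in the spreadsheet example further below):

(require '[thi.ng.fabric.core :as f])

(def g   (f/compute-graph))
(def ctx (f/sync-execution-context {:graph g}))

;; the source starts at distance 0, all other vertices at nil (= unreached)
(def s (f/add-vertex! g 0   {::f/collect-fn collect-sssp}))
(def a (f/add-vertex! g nil {::f/collect-fn collect-sssp}))
(def b (f/add-vertex! g nil {::f/collect-fn collect-sssp}))

;; the last arg of connect-to! is handed to signal-sssp as the edge weight
(f/connect-to! s a signal-sssp 1)
(f/connect-to! s b signal-sssp 4)
(f/connect-to! a b signal-sssp 2)

(f/execute! ctx)
[@s @a @b] ;; expected: [0 1 3]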

./assets/sssp.jpg

(Click the image for a larger version showing the edge weights/distances)

source

Vertex coloring in random graphs

Assign each vertex a color from a limited palette such that no two neighbors share the same color. This is one of the examples used for the benchmarks below. Note the drastic performance difference between execution models (synchronous vs. eager). If the number of available colors is reduced further, the synchronous approach will not converge at all.
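
A heavily simplified, hypothetical sketch of the general idea (the linked source below is the authoritative version): every vertex forwards its current color to all neighbors and only re-picks a random color when a collision is detected among the newly received signals.

(require '[thi.ng.fabric.core :as f])

(def num-colors 4)

;; keep the current color unless one of the newly received neighbor colors
;; collides with it, in which case pick a new color at random
(def collect-color
  (f/collect-pure
   (fn [color uncollected]
     (if (some #{color} uncollected)
       (rand-int num-colors)
       color))))

;; vertices start with a random color and are connected in both directions
;; using f/signal-forward, e.g.
;; (f/add-vertex! g (rand-int num-colors) {::f/collect-fn collect-color})
;; (f/connect-to! a b f/signal-forward nil)
;; (f/connect-to! b a f/signal-forward nil)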

./assets/vcolor.jpg

source

Transitive closure (type inheritance example)

Given a graph encoding a tree structure (e.g. a type hierarchy), compute all super types for each node. This example uses a tiny fragment of the top-level classes of the phylogenetic Tree of Life.

./assets/transclosure.jpg

Apart from adding the data to the graph, the only user code required to compute the closure(s) is this custom collection function, used for each vertex:

(def collect-transitive-closure
  (f/collect-pure
   ;; merge all uncollected super-type sets into the vertex's current set
   (fn [val uncollected]
     (reduce into val uncollected))))
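
A hypothetical wiring sketch: each vertex holds the set of super types known so far (seeded with its own ID) and edges point from super type to sub type, so that closures propagate downwards through the hierarchy:

(require '[thi.ng.fabric.core :as f])

(def g   (f/compute-graph))
(def ctx (f/sync-execution-context {:graph g}))

(defn type-vertex
  [id] (f/add-vertex! g #{id} {::f/collect-fn collect-transitive-closure}))

(def life     (type-vertex :life))
(def animalia (type-vertex :animalia))
(def chordata (type-vertex :chordata))

;; super type -> sub type, forwarding the accumulated closure set
(f/connect-to! life animalia f/signal-forward nil)
(f/connect-to! animalia chordata f/signal-forward nil)

(f/execute! ctx)
@chordata ;; expected: #{:life :animalia :chordata}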

source

fabric-facts query visualization

The fabric-facts module provides a map based DSL to specify fact queries based on graph pattern matching (similar to SPARQL). The module also provides a visualizer to help document & debug complex queries. E.g. the following query can be visualized (using Graphviz) like this:

;; 1) match transitive mother relationships (w/ depth 2-4)
;; 2) exclude rels to descendant "ex:P100"
;; 3) match names for entities
;; 4) optionally match descendant's DOB (only if before given date)
;; 5) collect all DOBs into a new result var
;; 6) inject new result var ?res using string formatting
;; 7) pre-bind/restrict possible values for ?p
;; 8) order, group and select vars
(->> '{:q         [{:path [?p ["foaf:mother"] ?d] :min 2 :max 4}
                   {:minus [[?p "foaf:mother" "ex:P100"]]}
                   {:where [[?p "foaf:name" ?pname]]}
                   {:where [[?d "foaf:name" ?dname]]}
                   {:optional [[?d "ex:dob" ?dob]]
                    :filter   (< ?dob #inst "2000-01-01")}]
       :aggregate {?birthdays (agg-collect ?dob)}
       :bind      {?rel (str ?pname " -> " ?dname)}
       :values    {?p #{"ex:P1" "ex:P2"}}
       :group-by  ?p
       :order     ?d
       :select    [?p ?d ?rel ?birthdays]}
     (query->graphviz)
     (spit "qviz-ex01.dot"))

./assets/qviz-ex01.png

fabric-ld server example

Excerpt of an example interaction with a fabric-ld server from the command line (e.g. using httpie instead of curl for brevity):

# import facts from URI (currently EDN or N-Triples only)
http -f POST :8000/facts uri=http://schema.org/version/2.0/schema.nt
# {:body "adding 9023 facts from http://schema.org/version/2.0/schema.nt"}

# add facts in EDN format about this project
# EDN maps offer a concise way to declare multiple relationships for
# a common subject and are automatically resolved to triples
# facts can also be given as EDN triple vectors
http -f POST :8000/facts \
     facts='{"http://thi.ng/fabric" \
             {"rdf:type" "foaf:Project" \
              "schema:isPartOf" "http://thi.ng" \
              "dcterms:creator" "people:toxi"}}'
# {:body "adding 3 facts"}

# register query: find all projects and all their facts, group by project
http -f POST :8000/queries \
     id=projects \
     q='{:q [{:where [[?prj "rdf:type" "foaf:Project"] [?prj ?p ?o]]}] \
         :order ?p \
         :group-by ?prj \
         :select [?p ?o]}'
# {:id "projects", :body "New query registered"}

# get query results
http :8000/queries/projects
# {:id "projects2", :count 1, :total 1, :offset 0,
#  :spec {:q [{:where [[?prj "rdf:type" "foaf:Project"] [?prj ?p ?o]]}],
#         :order ?p,
#         :group-by ?prj,
#         :select [?p ?o]},
#  :body {"http://thi.ng/fabric"
#         [{?p "http://purl.org/dc/terms/creator", ?o "people:toxi"}
#          {?p "http://schema.org/isPartOf", ?o "http://thi.ng"}
#          {?p "http://www.w3.org/1999/02/22-rdf-syntax-ns#type", ?o "http://xmlns.com/foaf/0.1/Project"}]}}

more examples

fabric-facts query trees & rule based inference

Note: This example is somewhat out of date in that it hardly uses the more user-friendly and powerful query DSL and instead largely defines queries via low-level forms. For a better overview please consult the thi.ng.fabric.facts.dsl namespace in the facts module, as well as the fabric-ld module.

This example (and module) uses a compute graph to store facts, fact queries and inference rules (based on queries) which produce new facts. Queries and inference rules are expressed as vertex trees, taking part in the normal graph execution. The example demonstrates how easy it is to use this architecture to build custom rule engines or to use it for Linked Data applications (the query engine features are heavily inspired by W3C SPARQL, albeit using Clojure syntax and without RDF restrictions). Whenever facts are added to or removed from the graph (either manually or via inference), all query results are updated (somewhat comparable to stored procedures in SQL). For complex queries, intermediate query vertices act as result caches and can limit or avoid redundant work.

(require '[thi.ng.fabric.core :as f])
(require '[thi.ng.fabric.facts.core :as ff])
(require '[thi.ng.fabric.facts.dsl :as dsl])

;; turn off verbose logging

#?(:clj (taoensso.timbre/set-level! :warn))

;; Initial facts

(def facts
  '[[toxi author fabric]
    [toxi parent noah]
    [ingo parent toxi]
    [fabric type project]
    [fabric url "http://thi.ng/fabric"]
    [author domain person]
    [author range creative-work]
    [parent sub-prop-of ancestor]
    [ancestor type transitive-prop]
    [ancestor domain person]
    [ancestor range person]])

;; Rule specs (all join queries) and their production functions
;; The production fns take 3 args: the graph, the inference vertex
;; (for context, though here unused) and a single query result
;; from which a new fact is inferred...

(def inference-rules
  {;; infer type of subject for any property which declares a domain
   :domain     {:match '[[?a ?prop nil] [?prop domain ?d]]
                :infer (fn [g _ {:syms [?a ?prop ?d]}] (ff/add-fact! g [?a 'type ?d]))}
   ;; infer type of object for any property which declares a range
   :range      {:match '[[nil ?prop ?a] [?prop range ?r]]
                :infer (fn [g _ {:syms [?a ?prop ?r]}] (ff/add-fact! g [?a 'type ?r]))}
   ;; infer transitive property relationships
   ;; e.g. ?a ancestor ?b and ?b ancestor ?c => ?a ancestor ?c
   :transitive {:match '[[?a ?prop ?b] [?b ?prop ?c] [?prop type transitive-prop]]
                :infer (fn [g _ {:syms [?a ?prop ?c]}] (ff/add-fact! g [?a ?prop ?c]))}
   ;; infer super property relationships from sub-properties
   ;; e.g. parent is a specialization/sub-prop of ancestor
   :sub-prop   {:match '[[?a ?prop ?b] [?prop sub-prop-of ?super]]
                :infer (fn [g _ {:syms [?a ?super ?b]}] (ff/add-fact! g [?a ?super ?b]))}})

;; Create empty knowledge graph using default compute graph as backend

(def g (ff/fact-graph))

;; Setup execution context (with default opts)

(def ctx (f/sync-execution-context {:graph g}))

;; Define a query returning *all* facts (nil catches any subject, pred, object)

(def all (ff/add-query! g [nil nil nil] {}))

;; Add facts & rules to graph

(run! #(ff/add-fact! g %) facts)
(run!
 (fn [[id r]] (ff/add-rule! g {:id id :patterns (:match r) :production (:infer r)}))
 inference-rules)

;; Execute!

(f/execute! ctx)

;; The `all` var (defined above) closes over all of its related vertices.
;; Its query result can be obtained by dereferencing...

(prn (sort @all))
;; ([ancestor domain person]
;;  [ancestor range person]
;;  [ancestor type transitive-prop]
;;  [author domain person]
;;  [author range creative-work]
;;  [fabric type creative-work]         ;; inferred
;;  [fabric type project]
;;  [fabric url "http://thi.ng/fabric"]
;;  [ingo ancestor noah]                ;; inferred
;;  [ingo ancestor toxi]                ;; inferred
;;  [ingo parent toxi]
;;  [ingo type person]                  ;; inferred
;;  [noah type person]                  ;; inferred
;;  [parent sub-prop-of ancestor]
;;  [toxi ancestor noah]                ;; inferred
;;  [toxi author fabric]
;;  [toxi parent noah]
;;  [toxi type person])                 ;; inferred

;; Add another query, this time with a variable binding (?p)

(def people (ff/add-param-query! g '[?p type person] {}))
(f/execute! ctx)

(prn @people)
;; #{{?p toxi} {?p noah} {?p ingo}}

;; complex queries can be defined more easily via query specs:
;; here, a filtered join query (reusing the people query above) to only select
;; people with age < 20 and inject a new result var ?num (aggregated result count)

(def children
  (dsl/add-query-from-spec!
   g '{:q         [{:where  [[?p age ?age] [?p type person]]
                    :filter (< ?age 20)}]
       :aggregate {?num (agg-count)}}))

(ff/add-fact! g '[noah age 13])
(ff/add-fact! g '[toxi age 40])

(f/execute! ctx)
(prn @children)
;; #{{?p noah ?age 13 ?num 1}}

;; join queries can also be constructed directly:
;; a query returning all creative-works w/ their authors & urls
(def projects
  (ff/add-query-join!
   g '[[?prj type creative-work] [?prj url ?uri] [?auth author ?prj]] {}))

(f/execute! ctx)
(prn @projects)
;; #{{?prj fabric, ?uri "http://thi.ng/fabric", ?auth toxi}}

full source

Spreadsheet like computation

(require '[thi.ng.fabric.core :as f])
(require '[clojure.core.async :refer [go-loop <! chan]])

(def g (f/compute-graph))
(def result-chan (chan))
(def ctx (f/async-execution-context {:graph g :result result-chan}))

;; Define vertex collection functions
(defn collect-with
  [f] (fn [v] (f/set-value! v (reduce f (vals (f/signal-map v))))))

(def collect-sum     (collect-with +))
(def collect-product (collect-with *))

;; Define some data vertices with values to sum

(def A1 (f/add-vertex! g 100.00 {}))
(def A2 (f/add-vertex! g 200.00 {}))
(def A3 (f/add-vertex! g 300.00 {}))

;; Another one with VAT rate

(def VAT (f/add-vertex! g 1.2 {}))

;; This vertex will produce the net price (the sum of A1 + A2 + A3, see below...)

(def NET (f/add-vertex! g nil {::f/collect-fn collect-sum}))

;; And this one the total (net * vat)

(def TOTAL (f/add-vertex! g nil {::f/collect-fn collect-product}))

;; Connect everything together:
;; The direction is always src -> dest
;; signal-forward simply sends a vertex's value as its signal
;; The nil arg is there because signal-forward doesn't use the extra signal
;; argument, but other signal functions can make use of it (see the SSSP example above)

(run! (fn [[a b]] (f/connect-to! a b f/signal-forward nil))
      [[A1 NET] [A2 NET] [A3 NET] [NET TOTAL] [VAT TOTAL]])

;; Every time the graph converges into a stable/unchanging state,
;; the async execution context will pause & push a result map onto the provided channel.
;; Here we simply print it out along with the values of the NET & TOTAL vertices

(go-loop []
  (when-let [res (<! result-chan)]
    (prn :result res)
    (prn :net @NET)
    (prn :total @TOTAL)
    (recur)))

;; Kick off computation

(f/execute! ctx)
;; :result {:collections 3, :signals 5, :type :converged, :runtime 1.9211129999999998, :time-per-op 0.24013912499999998}
;; :net 600.0
;; :total 720.0

;; Modify A1 and notify the exec context. Calling notify! on an active
;; async context is idempotent, i.e. multiple invocations will only
;; trigger 1 additional execution loop (until the graph has converged
;; again). Calling notify! on a converged graph/context is almost free
;; and will bail out quickly, since no active vertices will be found

(f/set-value! A1 1000)
(f/notify! ctx)
;; :result {:collections 2, :signals 2, :type :converged, :runtime 0.6149319999999999, :time-per-op 0.15373299999999998}
;; :net 1500.0
;; :total 1800.0

;; Add another data vertex to include in results

(f/add-edge! g (f/add-vertex! g 400 {}) NET f/signal-forward nil)
(f/notify! ctx)
;; :result {:collections 2, :signals 2, :type :converged, :runtime 0.563244, :time-per-op 0.140811}
;; :net 1900.0
;; :total 2280.0

Motivation & use cases

Alternative programming model

Some algorithms can be expressed more simply using the Signal/Collect model and allow for parallel execution.

Pervasive caching, easy introspection, algorithm visualization

Intermediate results can be easily reused by multiple consumers and retrieved through vertex inspection. Compute graphs can also be visualized, allowing for the development of visual programming tools.

Self-organizing workflows

Think of self-updating visualizations when inputs change. Using custom collection thresholds, this model can also be extended to only operate once a minimum number of new signals has been received (throttling) or after a certain period of time since the last computation.

Reactive programming (WIP)

Truly standalone reactive contexts across CLJ/CLJS, i.e. no mandatory integration with or reliance on React.js or the DOM rendering cycle

(Semantic) knowledge graphs, queries & reasoning

  • Use compute graphs as an auto-updating database: attached queries & inference rules automatically add or retract facts whenever certain patterns are matched. Query results are cached in vertices and available immediately. This also allows intermediate sub-query results to be shared between similar queries, leading to better performance (see the sketch below).
  • Implement Rete-style rule engines, using vertices for pervasive intermediate result caching and custom signal & scoring functions to inhibit obsolete work in the graph (see the fabric-facts module)
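
A brief, hedged sketch of the auto-updating behavior, re-using only the fact graph API shown in the inference example above (names are illustrative):

(require '[thi.ng.fabric.core :as f])
(require '[thi.ng.fabric.facts.core :as ff])

(def g   (ff/fact-graph))
(def ctx (f/sync-execution-context {:graph g}))

;; attach a query; its result vertex stays registered & cached
(def people (ff/add-param-query! g '[?p type person] {}))

(ff/add-fact! g '[toxi type person])
(f/execute! ctx)
@people ;; => #{{?p toxi}}

;; facts added later automatically update the registered query result
(ff/add-fact! g '[noah type person])
(f/execute! ctx)
@people ;; => #{{?p toxi} {?p noah}}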

Single machine or distributed processing

Distributed, multi-environment graphs are not yet implemented (WIP); only in-memory graphs are currently supported.

Toolkit w/ Clojurescript support

  • Provide same functionality for both Clojure & Clojurescript

Differences to Map-Reduce and dataflow DAGs

  • Ordered OR unordered processing
  • Scored / thresholded processing
    • custom scoring & signal functions can inhibit signals to neighboring vertices
    • collection functions can ignore or delay processing of incoming signals
  • Individual mapping functions for each edge
  • Cycles (recursion) allowed and needed for some problems
  • Vertices only hold state; processing is done in edges (i.e. a single value can be transformed in parallel via different signal functions from the same vertex, see the sketch below)
  • Graph processing stops after a user-defined threshold or when the graph has converged (i.e. no more active signals or outstanding collections)
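
A short, hypothetical sketch of the per-edge mapping point above: one source vertex feeds two consumers through two different edge (signal) functions. All API calls are taken from the examples above; the vertex names and collect function are illustrative only.

(require '[thi.ng.fabric.core :as f])

(def g   (f/compute-graph))
(def ctx (f/sync-execution-context {:graph g}))

;; trivial collect fn: just adopt the latest received signal value
(def collect-latest
  (f/collect-pure (fn [val uncollected] (or (last uncollected) val))))

(def src     (f/add-vertex! g 10  {}))
(def doubled (f/add-vertex! g nil {::f/collect-fn collect-latest}))
(def squared (f/add-vertex! g nil {::f/collect-fn collect-latest}))

;; two edges originating from the same vertex, each with its own mapping fn
(f/connect-to! src doubled (fn [v _] (* 2 @v)) nil)
(f/connect-to! src squared (fn [v _] (let [x @v] (* x x))) nil)

(f/execute! ctx)
[@doubled @squared] ;; expected: [20 100]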

Benchmarks

Different graph execution models can have a drastic impact on performance. The fabric-core module therefore provides a number of configurations to experiment with different models (with only very minor code changes), but it's also possible for users to provide their own execution contexts.
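
In practice, swapping models usually just means constructing a different execution context over the same graph. Only the two context constructors already used in this document are shown in the sketch below; the third built-in context and its options are covered in the fabric-core module.

(require '[thi.ng.fabric.core :as f])
(require '[clojure.core.async :refer [chan]])

(def g (f/compute-graph))
;; ... add vertices & edges as usual ...

;; synchronous, two-phase signal/collect execution
(def ctx-sync  (f/sync-execution-context {:graph g}))

;; async/eager execution, pushing result maps onto a core.async channel
;; (cf. the spreadsheet example above)
(def ctx-async (f/async-execution-context {:graph g :result (chan)}))

;; the same graph can then be run with either context
(f/execute! ctx-sync)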

Single-source shortest path

10,000 vertices, 30,000 paths, max. length 3 hops

Measurements taken with Intel i7-4930K CPU @ 3.40GHz, 16GB RAM, Java 8

            Synch (naive)  Probabilistic  Prob. (eager)  Eager async  Two-pass
Factor      1.33           1.22           1.49           1            1.32
Total (ms)  102            111            91             136          103
Signals     38532          44594          43239          38532        38532
Colls       13650          21914          15073          13650        13650

Vertex coloring

1000 vertices, max. 180 colors, 5% edge probability

Measurements taken with Intel i7-4930K CPU @ 3.40GHz, 16GB RAM, Java 8

            Synch (naive)  Probabilistic  Prob. (eager)  Eager async  Two-pass
Factor      1              3.64           5.81           5.71         3.41
Total (ms)  1285           353            221            225          376
Signals     416960         143001         126665         128017       194062
Colls       36362          8474           3936           4235         6530

TODO add charts

Resources / related work

Project definition

Injected properties

The following value is tangled into the <<project-name>> placeholder used in the Leiningen project file below:

thi.ng/fabric

Building this project

This project is written in a literate programming format and requires Emacs & Org-mode to generate usable source code. Assuming both tools are installed, the easiest way to generate a working project is via command line (make sure emacs is on your path or else edit its path in tangle.sh):

git clone https://github.com/thi-ng/fabric.git
cd fabric
# tangle complete project
./tangle-all.sh
# or single module
./tangle-module.sh core
# or individual file(s)
./tangle.sh fabric-core/src/core.org ...

Tangling is the process of extracting & combining source blocks from .org files into an actual working project/source tree. Once tangling is complete, you can cd into the generated project directory (babel) and then use lein as usual.

Testing

The generated project.clj file of each module defines an alias to trigger a complete build & test run for both the CLJ & CLJS versions.

cd <module-name>/babel
lein cleantest

To build the Clojurescript version simply run lein cljsbuild test from the same directory. A small HTML harness for the resulting JS file is also located in that folder (babel/index.html), allowing for further experimentation in the browser.

Working with the REPL

Editing code blocks or files in Org-mode and then re-loading & testing the changes is quite trivial. Simply launch a REPL (via lein or Emacs) as usual. Every time you've made changes to an .org file, re-tangle it from Emacs (C-c C-v t) or via tangle.sh, then reload the namespace in the REPL via (require 'thi.ng.fabric... :reload) or similar.

Leiningen project file

This project file only acts as a meta-project definition, adding dependencies to all currently existing modules.

(defproject <<project-name>> "<<conf-version()>>"
  :description  "Signal/Collect inspired compute graph infrastructure"
  :url          "<<conf-project-url>>"
  :license      {:name "Apache Software License 2.0"
                 :url "http://www.apache.org/licenses/LICENSE-2.0"
                 :distribution :repo}
  :scm          {:name "git"
                 :url "<<conf-project-url>>"}

  :min-lein-version "2.4.0"

  :dependencies [[thi.ng/fabric-core "<<conf-version()>>"]
                 [thi.ng/fabric-facts "<<conf-version()>>"]
                 [thi.ng/fabric-ld "<<conf-version()>>"]]

  :pom-addition [:developers [:developer
                              [:name "Karsten Schmidt"]
                              [:url "http://thi.ng/fabric"]
                              [:timezone "0"]]])

Release history

Version Released Description
0.0.388 2015-11-16 refactor LD default graph import handling & route/handler injection
0.0.386 2015-11-14 update LD system config opts, bugfix query registry startup
0.0.382 2015-11-12 LD handler updates, support for custom handlers & middlewares
0.0.376 2015-09-15 query visualization/validation/prefixes, pname caching, DSL fixes, factlog update
0.0.338 2015-09-12 LD module updates, add response types, request validation, DSL bugfixes, more docs
0.0.312 2015-09-08 no code changes, only include ld module as dependency to main fabric artefact
0.0.310 2015-09-08 add fabric-ld module, add alias indexing, large updates to docs
0.0.262 2015-09-05 add optional collect-final!, bugfix group-by qvar selection, refactor DSL
0.0.247 2015-09-04 :bind, :values query opts, negation queries, index sel optimization, DSL updates
0.0.231 2015-09-02 bugfixes, add sub-query opts, DSL extension, map->facts bnode handling
0.0.214 2015-08-29 DSL/aggregation updates, map->facts conversion, fact transforms, more tests
0.0.204 2015-08-28 major query DSL extension, aggregation, FactVertex type
0.0.189 2015-08-27 fact query DSL, query refactoring, doc updates
0.0.167 2015-08-24 fact transformers, path queries, fact query & inference refactoring
0.0.144 2015-08-20 fact query optimizations, N-Triples parser, quad fact support
0.0.131 2015-08-17 fact query updates & recursive removal
0.0.123 2015-08-16 async ctx updates & fact query index caching
0.0.116 2015-08-15 more reusable fact queries
0.0.108 2015-08-14 1st public release

Contributors

Name Role Website
Karsten Schmidt initiator & principal developer http://thi.ng/

License

Copyright © 2015 Karsten Schmidt

This project is open source and licensed under the Apache Software License 2.0.