framework for asynchronous programming and goroutine management and safe use of channels


Keywords
channel, concurrency, concurrent, golang, goroutine, goroutine-management, goroutine-pool, parallel, queue, queue-workers, worker-pool
License
MIT
Install
go get github.com/octu0/chanque

Documentation

chanque

MIT License GoDoc Go Report Card Releases

chanque provides simple framework for asynchronous programming and goroutine management and safe use of channel.

Installation

$ go get github.com/octu0/chanque

Usage

Queue

Queue implementation.
It provides blocking and non-blocking methods, as well as panic handling of channels.

func main() {
	que1 := chanque.NewQueue(10)
	defer que1.Close()

	go func() {
		for {
			val := que1.Dequeue()
			fmt.Println(val.(string))
		}
	}()
	if ok := que1.Enqueue("hello"); ok {
		fmt.Println("enqueue")
	}

	que2 := chanque.NewQueue(10,
		QueuePanicHandler(func(pt chanque.PanicType, rcv interface{}) {
			fmt.Println("panic occurred", rcv.(error))
		}),
	)
	defer que2.Close()
	if ok := que2.EnqueueNB("world w/ non-blocking enqueue"); ok {
		fmt.Println("enqueue")
	}
}

Executor

WorkerPool implementation,
which limits the number of concurrent executions of goroutines and creates goroutines as needed,
and can also be used as goroutine resource management.

func main() {
	// minWorker 1 maxWorker 2
	exec := chanque.NewExecutor(1, 2)
	defer exec.Release()

	exec.Submit(func() {
		fmt.Println("job1")
		time.Sleep(1 * time.Second)
	})
	exec.Submit(func() {
		fmt.Println("job2")
		time.Sleep(1 * time.Second)
	})

	// Blocking because it became the maximum number of workers,
	// executing when there are no more workers running
	exec.Submit(func() {
		fmt.Println("job3")
	})

	// Generate goroutines on demand up to the maximum number of workers.
	// Submit does not block up to the size of MaxCapacity
	// Workers that are not running are recycled to minWorker at the time of ReduceInterval.
	exec2 := chanque.NewExecutor(10, 50,
		chanque.ExecutorMaxCapacicy(1000),
		chanque.ExecutorReducderInterval(60*time.Second),
	)
	defer exec2.Release()

	for i := 0; i < 100; i += 1 {
		exec2.Submit(func(i id) func() {
			return func() {
				fmt.Println("heavy process", id)
				time.Sleep(100 * time.Millisecond)
				fmt.Println("done process", id)
			}
		}(i))
	}

	// On-demand tune min/max worker size
	exec.TuneMaxWorker(10)
	exec.TuneMinWorker(5)
}

Worker

Worker implementation for asynchronous execution, register WorkerHandler and execute it with Enqueue parameter.
Enqueue of parameter is blocked while WorkerHandler is running.
There is also a BufferWorker implementation that non-blocking enqueue during asynchronous execution.

func main() {
	handler := func(param interface{}) {
		if s, ok := param.(string); ok {
			fmt.Println(s)
		}
		time.Sleep(1 * time.Second)
	}

	// DefaultWorker executes in order, waiting for the previous one
	w1 := chanque.NewDefaultWorker(handler)
	defer w1.Shutdown()

	go func() {
		w1.Enqueue("hello")
		w1.Enqueue("world") // blocking during 1 sec
	}()

	w2 := chanque.NewBufferWorker(handler)
	defer w2.Shutdown()

	go func() {
		w2.Enqueue("hello")
		w2.Enqueue("world") // non-blocking
	}()

	// BufferWorker provides helpers for performing sequential operations
	// by using PreHook and PostHook to perform the operations collectively.
	pre := func() {
		db.Begin()
	}
	post := func() {
		db.Commit()
	}
	hnd := func(param interface{}) {
		db.Insert(param.(string))
	}
	w3 := chanque.NewBufferWorker(hnd,
		WorkerPreHook(pre),
		WorkerPostHook(post),
	)
	for i := 0; i < 100; i += 1 {
		w3.Enqueue(strconv.Itoa(i))
	}
	w3.ShutdownAndWait()
}

Parallel

Parallel provides for executing in parallel and acquiring the execution result.
extended implementation of Worker.

func main() {
	executor := chanque.NewExecutor(10, 100)
	defer executor.Release()

	para := chanque.NewParallel(
		executor,
		chanque.Parallelism(2),
	)
	para.Queue(func() (interface{}, error) {
		return "#1 result", nil
	})
	para.Queue(func() (interface{}, error) {
		return "#2 result", nil
	})
	para.Queue(func() (interface{}, error) {
		return nil, errors.New("#3 error")
	})

	future := para.Submit()
	for _, r := range future.Result() {
		if r.Value() != nil {
			println("result:", r.Value().(string))
		}
		if r.Err() != nil {
			println("error:", r.Err().Error())
		}
	}
}

Retry

Retry provides function retry based on the exponential backoff algorithm.

func main() {
	retry := chanque.NewRetry(
		chanque.RetryMax(10),
		chanque.RetryBackoffIntervalMin(100*time.Millisecond),
		chanque.RetryBackoffIntervalMax(30*time.Second),
	)
	future := retry.Retry(func(ctx context.Context) (interface{}, error) {
		req, _ := http.NewRequest("GET", url, nil)
		client := &http.Client{}
		resp, err := client.Do(req)
		if err != nil {
			return nil, err
		}
		...
		return ioutil.ReadAll(resp.Body), nil
	})
	r := future.Result()
	if err := r.Err(); err != nil {
		panic(err.Error())
	}
	fmt.Printf("GET resp = %s", r.Value().([]byte))
}

Wait

Wait provides wait handling like sync.WaitGroup and context.Done.
Provides implementations for patterns that run concurrently, wait for multiple processes, wait for responses, and many other use cases.

func one() {
	w := WaitOne()
	defer w.Cancel()

	go func(w *Wait) {
		defer w.Done()

		fmt.Println("heavy process")
	}(w)

	w.Wait()
}

func any() {
	w := WaitN(10)
	defer w.Cancel()

	for i := 0; i < 10; i += 1 {
		go func(w *Wait) {
			defer w.Done()

			fmt.Println("N proc")
		}(w)
	}
	w.Wait()
}

func sequencial() {
	w1, := WaitOne()
	defer w1.Cancel()
	go Preprocess(w1)

	w2 := WaitOne()
	defer w2.Cancel()
	go Preprocess(w2)

	ws := WaitSeq(w1, w2)
	defer ws.Cancel()

	// Wait for A.Done() -> B.Done() -> ... N.Done() ordered
	ws.Wait()
}

func rendezvous() {
	wr := WaitRendez(2)
	defer wr.Cancel()

	go func() {
		if err := wr.Wait(); err != nil {
			fmt.Println("timeout or cancel")
			return
		}
		fmt.Println("run sync")
	}()
	go func() {
		if err := wr.Wait(); err != nil {
			fmt.Println("timeout or cancel")
			return
		}
		fmt.Println("run sync")
	}()
}

func req() {
	wreq := WaitReq()
	defer wreq.Cancel()

	go func() {
		if err := wreq.Req("hello world"); err != nil {
			fmt.Println("timeout or cancel")
		}
		fmt.Println("send req")
	}()

	v, err := wreq.Wait()
	if err != nil {
		fmt.Println("timeout or cancel")
	}
	fmt.Println(v.(string)) // => "hello world"
}

func reqreply() {
	wrr := WaitReqReply()
	go func() {
		v, err := wrr.Req("hello")
		if err != nil {
			fmt.Println("timeout or cancel")
		}
		fmt.Println(v.(string)) // => "hello world2"
	}()
	go func() {
		err := wrr.Reply(func(v interface{}) (interface{}, err) {
			s := v.(string)
			return s + " world2", nil
		})
		if err != nil {
			fmt.Println("timeout or cancel")
		}
	}()
}

Loop

Loop provides safe termination of an infinite loop by goroutine.
You can use callbacks with Queue and time.Ticker.

func newloop() {
	e := NewExecutor(1, 10)

	queue := NewQueue(0)

	loop := NewLoop(e)
	loop.SetDequeue(func(val interface{}, ok bool) chanque.LoopNext {
		if ok != true {
			// queue closed
			return chanque.LoopNextBreak
		}
		println("queue=", val.(string))
		return chanque.LoopNextContinue
	}, queue)

	loop.ExecuteTimeout(10 * time.Second)

	go func() {
		queue.Enqueue("hello1")
		queue.Enqueue("hello2")
		time.Sleep(1 * time.Second)
		queue.EnqueueNB("world") // Enqueue / EnqueueNB / EnqueueRetry
	}()
	go func() {
		time.Sleep(1 * time.Second)
		loop.Stop() // done for loop
	}()
}

Pipeline

Pipeline provides sequential asynchronous input and output. Execute func combination asynchronously

func main() {
	calcFn := func(parameter interface{}) (interface{}, error) {
		// heavy process
		time.Sleep(1 * time.Second)

		if val, ok := parameter.(int); ok {
			return val * 2, nil
		}
		return -1, fmt.Errorf("invalid parameter")
	}
	outFn := func(result interface{}, err error) {
		if err != nil {
			fmt.Fatal(err)
			return
		}

		fmt.Println("value =", parameter.(int))
	}

	pipe := chanque.NewPipeline(calcFn, outFn)
	pipe.Enqueue(10)
	pipe.Enqueue(20)
	pipe.Enqueue(30)
	pipe.ShutdownAndWait()
}

Documentation

https://godoc.org/github.com/octu0/chanque

Benchmark

go func() vs Executor

$ go test -v -run=BenchmarkExecutor -bench=BenchmarkExecutor -benchmem  ./
goos: darwin
goarch: amd64
pkg: github.com/octu0/chanque
BenchmarkExecutor/goroutine-8         	 1000000	      2306 ns/op	     544 B/op	       2 allocs/op
BenchmarkExecutor/executor/100-1000-8 	  952410	      1252 ns/op	      16 B/op	       1 allocs/op
BenchmarkExecutor/executor/1000-5000-8    795402	      1327 ns/op	      18 B/op	       1 allocs/op
--- BENCH: BenchmarkExecutor
    executor_test.go:19: goroutine           	TotalAlloc=546437344	StackInUse=1996357632
    executor_test.go:19: executor/100-1000   	TotalAlloc=25966144	StackInUse=-1993277440
    executor_test.go:19: executor/1000-5000  	TotalAlloc=16092752	StackInUse=7012352
PASS
ok  	github.com/octu0/chanque	6.935s

License

MIT, see LICENSE file for details.