Generic Concurrency Patterns Library
Reference (generated by gomarkdoc)
import "github.com/butuzov/harmony"Package harmony provides generic concurrency patterns library, created for educational proposes by it's author. It provides next patterns:
BridgeFanInFeatureOrDone/OrWithDone/OrWithContextPipelineQueueTeeWorkerPool
Example (Fastest Sqrt)
What SQRT funtion is faster? Complex example that shows the combination of few patterns Queue, Tee, FanIn patterns.
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
"log"
"math/rand"
"time"
)
func main() {
// the fastert square root cracker....
type Report struct {
Method string
Value uint64
}
var (
// Babylonian method
sqrtBabylonian = func(n uint64) Report {
var (
o = float64(n) // Original value as float64
x = float64(n) // x of binary search
y = 1.0 // y of binary search
e = 1e-5 // error
)
for x-y > e {
x = (x + y) / 2
y = o / x
// fmt.Printf("y=%f, x=%f.", y, x)
}
return Report{"Babylonian", uint64(x)}
}
// Bakhshali method
sqrtBakhshali = func(n uint64) Report {
iterate := func(x float64) float64 {
a := (float64(n) - x*x) / (2 * x)
xa := x + a
return xa - ((a * a) / (2 * xa))
}
var (
o = float64(n)
x = float64(n) / 2.0
e = 1e-5
)
for x*x-o > e {
x = iterate(x)
}
return Report{"Bakhshali", uint64(x)}
}
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch, _ := harmony.FututeWithContext(ctx, func() uint64 {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
v := r.Uint64()
fmt.Printf("Initial number: %d.", v)
return v
})
if ch1, ch2, err := harmony.TeeWithContext(ctx, ch); err == nil {
log.Printf("err: %v", err)
return
} else {
chRep1, _ := harmony.PipelineWithContext(ctx, ch1, 1, sqrtBabylonian)
chRep2, _ := harmony.PipelineWithContext(ctx, ch2, 1, sqrtBakhshali)
chRep1, _ = harmony.OrDoneWithContext(ctx, chRep1)
chRep2, _ = harmony.OrDoneWithContext(ctx, chRep2)
out, _ := harmony.FanInWithContext(ctx, chRep1, chRep2)
fmt.Printf("Result is :%v", <-out)
}
}- Variables
- func Bridge[T any](done <-chan struct{}, incoming <-chan (<-chan T)) (<-chan T, error)
- func BridgeWithContext[T any](ctx context.Context, incoming <-chan (<-chan T)) (<-chan T, error)
- func FanIn[T any](done <-chan struct{}, ch1, ch2 <-chan T, channels ...<-chan T) (<-chan T, error)
- func FanInWithContext[T any](ctx context.Context, ch1, ch2 <-chan T, channels ...<-chan T) (<-chan T, error)
- func Futute[T any](done <-chan struct{}, futureFn func() T) (<-chan T, error)
- func FututeWithContext[T any](ctx context.Context, futureFn func() T) (<-chan T, error)
- func OrDone[T any](done <-chan struct{}, incoming <-chan T) (<-chan T, error)
- func OrDoneWithContext[T any](ctx context.Context, incoming <-chan T) (<-chan T, error)
- func Pipeline[T1, T2 any](done <-chan struct{}, incomingCh <-chan T1, totalWorkers int, workerFn func(T1) T2) (<-chan T2, error)
- func PipelineWithContext[T1, T2 any](ctx context.Context, incomingCh <-chan T1, totalWorkers int, workerFn func(T1) T2) (<-chan T2, error)
- func Queue[T any](done <-chan struct{}, genFn func() T) (<-chan T, error)
- func QueueWithContext[T any](ctx context.Context, genFn func() T) (<-chan T, error)
- func Tee[T any](done <-chan struct{}, incoming <-chan T) (<-chan T, <-chan T, error)
- func TeeWithContext[T any](ctx context.Context, incoming <-chan T) (<-chan T, <-chan T, error)
- func WorkerPool[T any](done <-chan struct{}, jobQueue chan T, maxWorkers int, workFunc func(T)) error
- func WorkerPoolWithContext[T any](ctx context.Context, jobQueue chan T, maxWorkers int, workFunc func(T)) error
var ErrContext = errors.New("harmony: nil Context")var ErrDone = errors.New("harmony: nil done chant")func Bridge[T any](done <-chan struct{}, incoming <-chan (<-chan T)) (<-chan T, error)Bridge will return chan of generic type T used a pipe for the values received from the sequence of channels or ErrDone. Close received channel .one you got fromincoming. in order to switch for a new one. Goroutines exists on close of incoming or done chan closed.
func BridgeWithContext[T any](ctx context.Context, incoming <-chan (<-chan T)) (<-chan T, error)BridgeWithContext will return chan of generic type T used a pipe for the values received from the sequence of channels or ErrContext. Close received channel .one you got fromincoming. in order to switch for a new one. Goroutines exists on close of incoming or context canceled.
func FanIn[T any](done <-chan struct{}, ch1, ch2 <-chan T, channels ...<-chan T) (<-chan T, error)FanIn returns unbuffered channel of generic type T which serves as delivery pipeline for the values received from at least 2 incoming channels, it's closed once all of the incoming channels closed or done is closed
func FanInWithContext[T any](ctx context.Context, ch1, ch2 <-chan T, channels ...<-chan T) (<-chan T, error)FanInWithContext returns unbuffered channel of generic type T which serves as delivery pipeline for the values received from at least 2 incoming channels, it's closed once all of the incoming channels closed or context cancelled.
Example
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
)
func main() {
// return channel that generate
filler := func(start, stop int) chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := start; i <= stop; i++ {
ch <- i
}
}()
return ch
}
ch1 := filler(10, 12)
ch2 := filler(12, 14)
ch3 := filler(15, 16)
ctx := context.Background()
if ch, err := harmony.FanInWithContext(ctx, ch1, ch2, ch3); err != nil {
for val := range ch {
fmt.Println(val)
}
}
}func Futute[T any](done <-chan struct{}, futureFn func() T) (<-chan T, error)Futute.T any. will return buffered channel of size 1 and generic type T, which will eventually contain the results of the execution `futureFn``, or be closed in case if context cancelled.
func FututeWithContext[T any](ctx context.Context, futureFn func() T) (<-chan T, error)FututeWithContext.T any. will return buffered channel of size 1 and generic type T, which will eventually contain the results of the execution `futureFn``, or be closed in case if context cancelled.
Example
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
)
func main() {
// Requests random dogs picture from dog.ceo (dog as service)
ctx := context.Background()
a, _ := harmony.FututeWithContext(ctx, func() int { return 1 })
b, _ := harmony.FututeWithContext(ctx, func() int { return 0 })
fmt.Println(<-a, <-b)
}1 0
Example (Dogs_as_service)
FututeWithContext is shows creation of two "futures" that are used in our "rate our dogs" startup.
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/butuzov/harmony"
"io/ioutil"
"log"
"net/http"
"time"
)
func main() {
// Requests random dogs picture from dog.ceo (dog as service)
getRandomDogPicture := func() string {
var data struct {
Message string "json:'message'"
}
const API_URL = "https://dog.ceo/api/breeds/image/random"
ctx := context.Background()
if req, err := http.NewRequestWithContext(ctx, http.MethodGet, API_URL, nil); err != nil {
log.Println(fmt.Errorf("request: %w", err))
return ""
} else if res, err := http.DefaultClient.Do(req); err != nil {
log.Println(fmt.Errorf("request: %w", err))
return ""
} else {
defer res.Body.Close()
if body, err := ioutil.ReadAll(res.Body); err != nil {
log.Println(fmt.Errorf("reading body: %w", err))
return ""
} else if err := json.Unmarshal(body, &data); err != nil {
log.Println(fmt.Errorf("unmarshal: %w", err))
return ""
}
}
return data.Message
}
a, _ := harmony.FututeWithContext(context.Background(), func() string {
return getRandomDogPicture()
})
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()
b, _ := harmony.FututeWithContext(ctx, func() string {
return getRandomDogPicture()
})
fmt.Printf("Rate My Dog: ..a) %s..b) %s.", <-a, <-b)
}func OrDone[T any](done <-chan struct{}, incoming <-chan T) (<-chan T, error)OrDone will return a new unbuffered channel of type T that serves as a pipeline for the incoming channel. Channel is closed once the context is canceled or the incoming channel is closed. This is variation or the pattern that usually called OrWithDone orCancel.
func OrDoneWithContext[T any](ctx context.Context, incoming <-chan T) (<-chan T, error)OrDoneWithContext will return a new unbuffered channel of type T that serves as a pipeline for the incoming channel. Channel is closed once the context is canceled or the incoming channel is closed. This is variation or the pattern that usually called OrWithDone orCancel.
func Pipeline[T1, T2 any](done <-chan struct{}, incomingCh <-chan T1, totalWorkers int, workerFn func(T1) T2) (<-chan T2, error)Pipeline returns the channel of generic type T2 that can serve as a pipeline for the next stage. It's implemented in almost same manner as a WorkerPool and allows to specify number of workers that going to proseed values received from the incoming channel. Outgoing channel is going to be closed once the incoming chan is closed or context canceld.
func PipelineWithContext[T1, T2 any](ctx context.Context, incomingCh <-chan T1, totalWorkers int, workerFn func(T1) T2) (<-chan T2, error)PipelineWithContext returns the channel of generic type T2 that can serve as a pipeline for the next stage. It's implemented in almost same manner as a WorkerPool and allows to specify number of workers that going to proseed values received from the incoming channel. Outgoing channel is going to be closed once the incoming chan is closed or context canceld.
Example (0rimes)
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
"log"
"math"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
var (
incomingCh = make(chan uint64)
isPrime = func(n uint64) bool {
for i := uint64(2); i < (n/2)+1; i++ {
if n%i == 0 {
return false
}
}
return true
}
)
var results []uint64
workerFunc := func(n uint64) uint64 {
if isPrime(n) {
return n
}
return 0
}
// Producer: Initial numbers
go func() {
for i := uint64(0); i < math.MaxUint64; i++ {
incomingCh <- i
}
}()
if ch, err := harmony.PipelineWithContext(ctx, incomingCh, 100, workerFunc); err != nil {
log.Printf("Error: %v", err)
} else {
for val := range ch {
if val == 0 {
continue
}
results = append(results, val)
}
fmt.Println(results)
}
}func Queue[T any](done <-chan struct{}, genFn func() T) (<-chan T, error)Queue returns an unbuffered channel that is populated by func genFn. Chan is closed once context is Done. It's similar to Future pattern, but doesn't have a limit to just one result.
func QueueWithContext[T any](ctx context.Context, genFn func() T) (<-chan T, error)QueueWithContext returns an unbuffered channel that is populated by func genFn. Chan is closed once context is Done. It's similar to Future pattern, but doesn't have a limit to just one result.
Example
Generate fibonacci sequence
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
"log"
)
func main() {
// fin returns function that returns Fibonacci sequence up to n element,
// it returns 0 after limit reached.
fib := func(limit int) func() int {
a, b, nTh := 0, 1, 1
return func() int {
if nTh > limit {
return 0
}
nTh++
a, b = b, a+b
return a
}
}
first10FibNumbers := make([]int, 10)
incoming, err := harmony.QueueWithContext(context.Background(), fib(10))
if err != nil {
log.Printf("err: %v", err)
return
}
for i := 0; i < cap(first10FibNumbers); i++ {
first10FibNumbers[i] = <-incoming
}
fmt.Println(first10FibNumbers)
}[1 1 2 3 5 8 13 21 34 55]
func Tee[T any](done <-chan struct{}, incoming <-chan T) (<-chan T, <-chan T, error)Tee will return two channels of generic type T used to fan
-out data from the incoming channel. Channels needs to be read in order next iteration over incoming chanel happen.
func TeeWithContext[T any](ctx context.Context, incoming <-chan T) (<-chan T, <-chan T, error)TeeWithContext will return two channels of generic type T used to fan
-out data from the incoming channel. Channels needs to be read in order next iteration over incoming chanel happen.
func WorkerPool[T any](done <-chan struct{}, jobQueue chan T, maxWorkers int, workFunc func(T)) errorWorkerPool accepts channel of generic type T which is used to serve jobs to max workersTotal workers. Goroutines stop: Once channel closed and drained, or done is closed
func WorkerPoolWithContext[T any](ctx context.Context, jobQueue chan T, maxWorkers int, workFunc func(T)) errorWorkerPoolWithContext accepts channel of generic type T which is used to serve jobs to max workersTotal workers. Goroutines stop: Once channel closed and drained, or context cancelled.
Example (0rimes)
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
"math"
"runtime"
"sync"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
var (
primesCh = make(chan uint64)
incomingCh = make(chan uint64)
isPrime = func(n uint64) bool {
for i := uint64(2); i < (n/2)+1; i++ {
if n%i == 0 {
return false
}
}
return true
}
totalWorkers = runtime.NumCPU() - 1
)
// Producer: Initial numbers
go func() {
for i := uint64(0); i < math.MaxUint64; i++ {
incomingCh <- i
}
}()
// Consumers Worker Pool: checking primes of incoming numbers.
harmony.WorkerPoolWithContext(ctx, incomingCh, totalWorkers, func(n uint64) {
if !isPrime(n) {
return
}
primesCh <- n
})
var results []uint64
var mu sync.RWMutex
go func() {
for n := range primesCh {
mu.Lock()
results = append(results, n)
mu.Unlock()
}
}()
<-ctx.Done()
mu.RLock()
fmt.Println(results)
mu.RUnlock()
}talkBryan C. Mills - Rethinking Classical Concurrency Patterns +slides+comments+notesbookKatherine Cox-Buday - Concurrency In GotalkRob Pike - Concurrency is not Parallelism +slidesblogGo Concurrency Patterns: ContextblogGo Concurrency Patterns: Pipelines and cancellationtalkSameer Ajmani - Advanced Go Concurrency Patterns +slidestalkRob Pike - Go Concurrency Patterns +slidesblogGo Concurrency Patterns: Timing out, moving on