chango
go generic channel utilities; inspired by Concurrency in Go
Usage
generators
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
rand.Seed(1)
rand := func() int { return rand.Int() }
for n := range Take(ctx, RepeatFn(ctx, rand), 10) {
fmt.Println(n)
}
Map
mul := func(v int) int { return v * 2 }
add := func(v int) int { return v + 1 }
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
src := Generator(ctx, 1, 2, 3, 4)
pipeline := Map(ctx, Map(ctx, Map(ctx, src, mul), add), mul)
for v := range pipeline {
fmt.Println(v)
}
// Output:
// 6
// 10
// 14
// 18
OrDone
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
rand := func() int { return rand.Int() }
src := RepeatFn(ctx, rand)
for v := range OrDone(ctx, src) {
fmt.Println(v)
}
FanIn
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
rand := func() int { return rand.Intn(5000000) }
randCh := RepeatFn(ctx, rand)
N := runtime.NumCPU()
fanout := make([]<-chan int, N)
for i := 0; i < N; i++ {
fanout[i] = primeFinder(ctx, randCh)
}
var result []int
for p := range Take(ctx, FanIn(ctx, fanout...), 10) {
fmt.Println(p)
}
Tee
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
out1, out2 := Tee(ctx, Take(ctx, Repeat(ctx, 1, 2), 4))
for v := range out1 {
fmt.Printf("out1: %v, out2: %v\n", v, <-out2)
}
// Output:
// out1: 1, out2: 1
// out1: 2, out2: 2
// out1: 1, out2: 1
// out1: 2, out2: 2
Bridge
ch := make(chan (<-chan int))
go func() {
defer close(ch)
for i := 0; i < 10; i++ {
s := make(chan int, i)
ch <- s
s <- i
close(s)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
for v := range Bridge(ctx, ch) {
fmt.Printf("%v ", v)
}
// Output:
// 0 1 2 3 4 5 6 7 8 9
Installation
go get github.com/thara/chango
License
MIT
Author
Tomochika Hara (a.k.a thara)