RxGo with generics (v1.18+)

go get github.com/PxyUp/rx_go

This attempt to build generic version of Rx(Reactive) library

Observables

  1. New – create new observable from observer

observer := rx_go.NewObserver[Y]()
observable := rx_go.New(observer)
  1. From – create new observable from static array
observable := rx_go.From([]int{1, 2, 3})
  1. NewInterval – return observable from IntervalObserver observer

// Create interval which start from now
interval := rx_go.NewInterval(time.Second, true)
  1. NewHttp – return Observable from HttpObserver
obs, err := rx_go.NewHttp(http.DefaultClient, req)
  1. MapTo – create new observable with modified values

rx_go.MapTo[int, string](rx_go.From([]int{1, 2, 3}), func(t int) string {
	return fmt.Sprintf("hello %d", t)
}).Subscribe()
  1. Merge – merging multi observables with same type into single one
rx_go.Merge[int](rx_go.From[int]([]int{1, 2, 3, 7}...), rx_go.From[int]([]int{4, 5, 6}...)).Subscribe()
  1. FromChannel – create new observable from readable channel
rx_go.FromChannel[int](intChannel).Subscribe()
  1. Switch – change stream for observable

rx_go.Switch(rx_go.From([]int{1, 2, 3}...), func(value int) *rx_go.Observable[string] {
	return rx_go.From(fmt.Sprintf("HELLO %d", value)).Pipe(rx_go.Repeat[string](2))
}).Subscribe()
  1. Of – create static observable with one value
rx_go.Of("hello").Subscribe()
  1. Concat – create static observable witch emit single array of all values
rx_go.Concat(rx_go.From([]int{1, 2, 3, 4, 5, 6}...)).Subscribe()
  1. Reduce – create new observable which return accumulation value from all previous emitted items

rx_go.Reduce(rx_go.From([]int{1, 2, 3, 4, 5, 6}...), func(y string, t int) string {
	return y + fmt.Sprintf("%d", t)
}, "").Subscribe()

Methods

  1. Subscribe – create subscription channel and cancel function
ch, cancel := obs.Subscribe()
  1. Pipe – function for accept operators

Operators:

  1. Filter – filter out

obs.Pipe(rx_go.Filter[int](func(value int) bool {
	return value > 16
})).Subscribe()
  1. Map – change value

obs.Pipe(rx_go.Map[int](func(value int) int {
	return value * 3
})).Subscribe()
  1. LastOne – get last one from the stream
obs.Pipe(rx_go.LastOne[int]()).Subscribe()
  1. FirstOne – get first one from the stream
obs.Pipe(rx_go.FirstOne[int]()).Subscribe()
  1. Delay – delay before emit next value
obs.Pipe(rx_go.Delay[int](time.Second)).Subscribe()
  1. Debounce – emit value if in provided amount of time new value was not emitted
obs.Pipe(rx_go.Debounce[int](time.Millisecond*500)).Subscribe()
  1. Do – execute action on each value

obs.Pipe(
    rx_go.Do(func(value int) {
        if value == 2 {
            cancel()
        }
    }),
).Subscribe()
  1. UntilCtx – emit value until context not done

obs.Pipe(
    rx_go.UntilCtx[int](ctx),
).Subscribe()
  1. Distinct – execute value if they different from previous
obs.Pipe(rx_go.Distinct[int]()).Subscribe()
  1. DistinctWith – same like Distinct but accept function like comparator
obs.Pipe(rx_go.DistinctWith[int](func(a, b int) bool { return a == b })).Subscribe()
  1. Take – take provided amount from observable
obs.Pipe(rx_go.Take[int](3)).Subscribe()
  1. Repeat – emit value multiple times
rx_go.From(values...).Pipe(rx_go.Repeat[int](2)).Subscribe()
  1. AfterCtx – emit value after ctx is done, all value before is ignored

obs.Pipe(
    rx_go.AfterCtx[int](ctx),
).Subscribe()
  1. Skip – that skips the first count items emitted
rx_go.From([]int{1, 2, 3}...).Pipe(rx_go.Skip[int](2)).Subscribe()

GitHub

View Github