RxGo with generics (v1.18+)
go get github.com/PxyUp/rx_go
This attempt to build generic version of Rx(Reactive) library
Observables
- New - create new observable from observer
observer := rx_go.NewObserver[Y]()
observable := rx_go.New(observer)
- From - create new observable from static array
observable := rx_go.From([]int{1, 2, 3})
- NewInterval - return observable from IntervalObserver observer
// Create interval which start from now
interval := rx_go.NewInterval(time.Second, true)
- NewHttp - return Observable from HttpObserver
obs, err := rx_go.NewHttp(http.DefaultClient, req)
- MapTo - create new observable with modified values
rx_go.MapTo[int, string](rx_go.From([]int{1, 2, 3}), func(t int) string {
return fmt.Sprintf("hello %d", t)
}).Subscribe()
- Merge - merging multi observables with same type into single one
rx_go.Merge[int](rx_go.From[int]([]int{1, 2, 3, 7}...), rx_go.From[int]([]int{4, 5, 6}...)).Subscribe()
- FromChannel - create new observable from readable channel
rx_go.FromChannel[int](intChannel).Subscribe()
- Switch - change stream for observable
rx_go.Switch(rx_go.From([]int{1, 2, 3}...), func(value int) *rx_go.Observable[string] {
return rx_go.From(fmt.Sprintf("HELLO %d", value)).Pipe(rx_go.Repeat[string](2))
}).Subscribe()
- Of - create static observable with one value
rx_go.Of("hello").Subscribe()
- Concat - create static observable witch emit single array of all values
rx_go.Concat(rx_go.From([]int{1, 2, 3, 4, 5, 6}...)).Subscribe()
- Reduce - create new observable which return accumulation value from all previous emitted items
rx_go.Reduce(rx_go.From([]int{1, 2, 3, 4, 5, 6}...), func(y string, t int) string {
return y + fmt.Sprintf("%d", t)
}, "").Subscribe()
- Pairwise - create new observable with groups pairs of consecutive emissions together and emits them as an array of two values.
rx_go.Pairwise[int](rx_go.From([]int{1, 2, 3, 4, 5, 6}...)).Subscribe()
- Never - observable which never emiting
rx_go.Never
- ForkJoin - wait for Observables to complete and then combine last values they emitted; complete immediately if an empty array is passed.
rx_go.ForkJoin(rx_go.From([]int{1, 2, 3}...), rx_go.From([]int{4, 5, 6}...), rx_go.From([]int{7, 8, 9}...)).Subscribe()
- Empty - create new Observer which just completed
rx_go.Empty
Methods
- Subscribe - create subscription channel and cancel function
ch, cancel := obs.Subscribe()
- Pipe - function for accept operators
Operators:
- Filter - filter out
obs.Pipe(rx_go.Filter[int](func(value int) bool {
return value > 16
})).Subscribe()
- Map - change value
obs.Pipe(rx_go.Map[int](func(value int) int {
return value * 3
})).Subscribe()
- LastOne - get last one from the stream
obs.Pipe(rx_go.LastOne[int]()).Subscribe()
- FirstOne - get first one from the stream
obs.Pipe(rx_go.FirstOne[int]()).Subscribe()
- Delay - delay before emit next value
obs.Pipe(rx_go.Delay[int](time.Second)).Subscribe()
- Debounce - emit value if in provided amount of time new value was not emitted
obs.Pipe(rx_go.Debounce[int](time.Millisecond*500)).Subscribe()
- Do - execute action on each value
obs.Pipe(
rx_go.Do(func(value int) {
if value == 2 {
cancel()
}
}),
).Subscribe()
- UntilCtx - emit value until context not done
obs.Pipe(
rx_go.UntilCtx[int](ctx),
).Subscribe()
- Distinct - execute value if they different from previous
obs.Pipe(rx_go.Distinct[int]()).Subscribe()
- DistinctWith - same like Distinct but accept function like comparator
obs.Pipe(rx_go.DistinctWith[int](func(a, b int) bool { return a == b })).Subscribe()
- Take - take provided amount from observable
obs.Pipe(rx_go.Take[int](3)).Subscribe()
- Repeat - emit value multiple times
rx_go.From(values...).Pipe(rx_go.Repeat[int](2)).Subscribe()
- AfterCtx - emit value after ctx is done(values not ignored, they are not emitted)
obs.Pipe(
rx_go.AfterCtx[int](ctx),
).Subscribe()
- Skip - that skips the first count items emitted
rx_go.From([]int{1, 2, 3}...).Pipe(rx_go.Skip[int](2)).Subscribe()
- StartWith - start emitting with predefined value
rx_go.From([]int{1}...).Pipe(rx_go.StartWith(2)).Subscribe()
- EndWith - end emitting with predefined value
rx_go.From([]int{1}...).Pipe(rx_go.EndWith(2)).Subscribe()
- SkipUntilCtx - skips items emitted by the Observable until a ctx not done
rx_go.From([]int{1, 2, 3}...).Pipe(
rx_go.SkipUntilCtx[int](ctx),
).Subscribe()
- SkipUntil - skips items emitted by the Observable until a second Observable emits an item(at least one).
rx_go.From([]int{1, 2, 3}...).Pipe(
rx_go.AfterCtx[int](ctx),
rx_go.SkipUntil[int, int](rx_go.Of(1).Pipe(rx_go.Do(func(value int) {
cancelCtx()
}))),
).Subscribe()
- Finally - do action before closing of observer(last value already emitted but observer not completed yet)
rx_go.From([]int{1, 2, 3}...).Pipe(rx_go.Finally[int](func() {
done = true
})).Subscribe()
- ElementAt - emit single value from observable which contains element on this position
rx_go.From([]int{1, 2, 3}...).Pipe(rx_go.ElementAt[int](1)).Subscribe()
- Find - only the first value emitted by the source Observable that meets some condition.
rx_go.From([]int{1, 2, 3}...).Pipe(rx_go.Find(func(t int) bool {
return t == 3
})).Subscribe()
- InitialDelay - emit values with initial delay
rx_go.Of[int](1).Pipe(rx_go.InitialDelay[int](time.Second)).Subscribe()