Skip to content

Commit

Permalink
Added KDocs for Observable operators (part 1)
Browse files Browse the repository at this point in the history
  • Loading branch information
arkivanov committed Oct 1, 2021
1 parent b0c9c85 commit 985d59e
Show file tree
Hide file tree
Showing 44 changed files with 495 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fun <T> Maybe<T>.onErrorResumeNext(nextSupplier: (Throwable) -> Maybe<T>): Maybe
}

/**
* When the [Maybe] signals `onError`, resumes the flow with [next] [Maybe].
* When the [Maybe] signals `onError`, resumes the flow with [next][next] [Maybe].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Maybe.html#onErrorResumeNext-io.reactivex.MaybeSource-).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import com.badoo.reaktive.disposable.plusAssign
import com.badoo.reaktive.utils.ObjectReference
import com.badoo.reaktive.utils.atomic.AtomicBoolean

/**
* Runs multiple [Observable]s and signals events of the first one signalled (disposing the rest).
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#amb-java.lang.Iterable-).
*/
fun <T> Iterable<Observable<T>>.amb(): Observable<T> =
observable { emitter ->
val sources = toList()
Expand All @@ -25,6 +30,11 @@ fun <T> Iterable<Observable<T>>.amb(): Observable<T> =
}
}

/**
* Runs multiple [Observable]s and signals events of the first one signalled (disposing the rest).
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#ambArray-io.reactivex.ObservableSource...-).
*/
fun <T> amb(vararg sources: Observable<T>): Observable<T> = sources.asList().amb()

private class AmbObserver<in T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import com.badoo.reaktive.completable.CompletableCallbacks
import com.badoo.reaktive.completable.completable
import com.badoo.reaktive.disposable.Disposable

/**
* Returns a [Completable] which signals `onComplete` when this [Observable] signals `onComplete`.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#ignoreElements--).
*/
fun Observable<*>.asCompletable(): Completable =
completable { emitter ->
subscribe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ package com.badoo.reaktive.observable
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.utils.atomic.AtomicInt

/**
* Returns an [Observable] that automatically connects (at most once) to this [ConnectableObservable]
* when the [subscriberCount] number of observers subscribe to it.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/observables/ConnectableObservable.html#autoConnect-int-).
*/
fun <T> ConnectableObservable<T>.autoConnect(subscriberCount: Int = 1): Observable<T> {
if (subscriberCount <= 0) {
connect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import com.badoo.reaktive.utils.atomic.AtomicInt
import com.badoo.reaktive.utils.queue.SharedQueue

/**
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#buffer-int-int-)
* Returns an [Observable] that emits buffered [List]s of items it collects from the source [Observable].
* The first buffer is started with the first element emitted by the source [Observable].
* Every subsequent buffer is started every [skip] elements, making overlapping buffers possible.
* Buffers are emitted once the size reaches [count] elements.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#buffer-int-int-).
*/
fun <T> Observable<T>.buffer(count: Int, skip: Int = count): Observable<List<T>> {
require(count > 0) { "Count value must be positive" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ import com.badoo.reaktive.single.Single
import com.badoo.reaktive.single.single
import com.badoo.reaktive.utils.ObjectReference

/**
* Collects elements emitted by the **finite** source [Observable] into a data structure [C]
* and returns a [Single] that emits this structure. The data structure can be mutable or immutable.
* The [accumulator] should either mutate the structure and return the same reference,
* or copy the structure and return a reference to the new copy.
*
* Please be aware that the structure may become [frozen](https://github.com/badoo/Reaktive#kotlin-native-pitfalls) in Kotlin/Native.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#collectInto-U-io.reactivex.functions.BiConsumer-).
*/
fun <T, C> Observable<T>.collect(initialCollection: C, accumulator: (C, T) -> C): Single<C> =
single { emitter ->
subscribe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import com.badoo.reaktive.utils.Uninitialized
import com.badoo.reaktive.utils.atomic.AtomicInt
import com.badoo.reaktive.utils.serializer.serializer

/**
* Combines source [Observable]s by emitting an element that aggregates the latest elements of each of the source [Observable]s
* each time an element is received from any of the source [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#combineLatest-java.lang.Iterable-io.reactivex.functions.Function-).
*/
fun <T, R> Iterable<Observable<T>>.combineLatest(mapper: (List<T>) -> R): Observable<R> =
observable { emitter ->
val sources = toList()
Expand Down Expand Up @@ -95,11 +101,23 @@ private sealed class CombineLatestEvent<out T> {
class OnError(val error: Throwable) : CombineLatestEvent<Nothing>()
}

/**
* Combines source [Observable]s by emitting an element that aggregates the latest elements of each of the source [Observable]s
* each time an element is received from any of the source [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#combineLatest-java.lang.Iterable-io.reactivex.functions.Function-).
*/
fun <T, R> combineLatest(vararg sources: Observable<T>, mapper: (List<T>) -> R): Observable<R> =
sources
.asList()
.combineLatest(mapper)

/**
* Combines source [Observable]s by emitting an element that aggregates the latest elements of each of the source [Observable]s
* each time an element is received from any of the source [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#combineLatest-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.functions.BiFunction-).
*/
fun <T1, T2, R> combineLatest(
source1: Observable<T1>,
source2: Observable<T2>,
Expand All @@ -111,6 +129,12 @@ fun <T1, T2, R> combineLatest(
mapper(values[0] as T1, values[1] as T2)
}

/**
* Combines source [Observable]s by emitting an element that aggregates the latest elements of each of the source [Observable]s
* each time an element is received from any of the source [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#combineLatest-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.functions.Function3-).
*/
fun <T1, T2, T3, R> combineLatest(
source1: Observable<T1>,
source2: Observable<T2>,
Expand All @@ -123,6 +147,12 @@ fun <T1, T2, T3, R> combineLatest(
mapper(values[0] as T1, values[1] as T2, values[2] as T3)
}

/**
* Combines source [Observable]s by emitting an element that aggregates the latest elements of each of the source [Observable]s
* each time an element is received from any of the source [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#combineLatest-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.functions.Function4-).
*/
fun <T1, T2, T3, T4, R> combineLatest(
source1: Observable<T1>,
source2: Observable<T2>,
Expand All @@ -136,6 +166,12 @@ fun <T1, T2, T3, T4, R> combineLatest(
mapper(values[0] as T1, values[1] as T2, values[2] as T3, values[3] as T4)
}

/**
* Combines source [Observable]s by emitting an element that aggregates the latest elements of each of the source [Observable]s
* each time an element is received from any of the source [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#combineLatest-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.functions.Function5-).
*/
fun <T1, T2, T3, T4, T5, R> combineLatest(
source1: Observable<T1>,
source2: Observable<T2>,
Expand All @@ -150,6 +186,12 @@ fun <T1, T2, T3, T4, T5, R> combineLatest(
mapper(values[0] as T1, values[1] as T2, values[2] as T3, values[3] as T4, values[4] as T5)
}

/**
* Combines source [Observable]s by emitting an element that aggregates the latest elements of each of the source [Observable]s
* each time an element is received from any of the source [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#combineLatest-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.functions.Function6-).
*/
fun <T1, T2, T3, T4, T5, T6, R> combineLatest(
source1: Observable<T1>,
source2: Observable<T2>,
Expand All @@ -165,6 +207,12 @@ fun <T1, T2, T3, T4, T5, T6, R> combineLatest(
mapper(values[0] as T1, values[1] as T2, values[2] as T3, values[3] as T4, values[4] as T5, values[5] as T6)
}

/**
* Combines source [Observable]s by emitting an element that aggregates the latest elements of each of the source [Observable]s
* each time an element is received from any of the source [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#combineLatest-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.functions.Function7-).
*/
fun <T1, T2, T3, T4, T5, T6, T7, R> combineLatest(
source1: Observable<T1>,
source2: Observable<T2>,
Expand All @@ -189,6 +237,12 @@ fun <T1, T2, T3, T4, T5, T6, T7, R> combineLatest(
)
}

/**
* Combines source [Observable]s by emitting an element that aggregates the latest elements of each of the source [Observable]s
* each time an element is received from any of the source [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#combineLatest-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.functions.Function8-).
*/
fun <T1, T2, T3, T4, T5, T6, T7, T8, R> combineLatest(
source1: Observable<T1>,
source2: Observable<T2>,
Expand All @@ -215,6 +269,12 @@ fun <T1, T2, T3, T4, T5, T6, T7, T8, R> combineLatest(
)
}

/**
* Combines source [Observable]s by emitting an element that aggregates the latest elements of each of the source [Observable]s
* each time an element is received from any of the source [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#combineLatest-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.ObservableSource-io.reactivex.functions.Function9-).
*/
fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> combineLatest(
source1: Observable<T1>,
source2: Observable<T2>,
Expand Down Expand Up @@ -243,6 +303,12 @@ fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> combineLatest(
)
}

/**
* Combines source [Observable]s by emitting an element that aggregates the latest elements of each of the source [Observable]s
* each time an element is received from any of the source [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#combineLatest-java.lang.Iterable-io.reactivex.functions.Function-).
*/
fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, R> combineLatest(
source1: Observable<T1>,
source2: Observable<T2>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
package com.badoo.reaktive.observable

/**
* Concatenates elements of each [Observable] into a single [Observable] without interleaving them.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#concat-java.lang.Iterable-).
*/
fun <T> Iterable<Observable<T>>.concat(): Observable<T> =
asObservable()
.concatMap { it }

/**
* Concatenates elements of each [Observable] into a single [Observable] without interleaving them.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#concatArray-io.reactivex.ObservableSource...-).
*/
fun <T> concat(vararg sources: Observable<T>): Observable<T> =
sources
.asList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import com.badoo.reaktive.utils.queue.SharedQueue
import com.badoo.reaktive.utils.serializer.Serializer
import com.badoo.reaktive.utils.serializer.serializer

/**
* Returns an [Observable] that applies the [mapper] to every element emitted by the source [Observable]
* and concatenates the returned [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#concatMap-io.reactivex.functions.Function-).
*/
fun <T, R> Observable<T>.concatMap(mapper: (T) -> Observable<R>): Observable<R> =
observable { emitter ->
val upstreamObserver = ConcatMapObserver(emitter.serialize(), mapper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ package com.badoo.reaktive.observable
import com.badoo.reaktive.maybe.Maybe
import com.badoo.reaktive.maybe.asObservable

/**
* Returns an [Observable] that applies the [mapper] to every element emitted by the source [Observable]
* and concatenates the returned [Maybe]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#concatMapMaybe-io.reactivex.functions.Function-).
*/
fun <T, R> Observable<T>.concatMapMaybe(mapper: (T) -> Maybe<R>): Observable<R> =
concatMap {
mapper(it).asObservable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ package com.badoo.reaktive.observable
import com.badoo.reaktive.single.Single
import com.badoo.reaktive.single.asObservable

/**
* Returns an [Observable] that applies the [mapper] to every element emitted by the source [Observable]
* and concatenates the returned [Single]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#concatMapSingle-io.reactivex.functions.Function-).
*/
fun <T, R> Observable<T>.concatMapSingle(mapper: (T) -> Single<R>): Observable<R> =
concatMap {
mapper(it).asObservable()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
package com.badoo.reaktive.observable

/**
* Concatenates both the source and the [other][other] [Observable]s.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#concatWith-io.reactivex.ObservableSource-).
*/
fun <T> Observable<T>.concatWith(other: Observable<T>): Observable<T> = concat(this, other)

/**
* Returns an [Observable] that first emits all elements from the source [Observable] and then the provided [value].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#concatWith-io.reactivex.ObservableSource-).
*/
fun <T> Observable<T>.concatWithValue(value: T): Observable<T> = concat(this, value.toObservable())
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,10 @@ package com.badoo.reaktive.observable

import com.badoo.reaktive.base.Connectable

/**
* Resembles an ordinary [Observable], except that it does not begin emitting items when it is subscribed to,
* but only when its [Connectable.connect] method is called.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/observables/ConnectableObservable.html).
*/
interface ConnectableObservable<out T> : Observable<T>, Connectable
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import com.badoo.reaktive.utils.atomic.AtomicReference
import com.badoo.reaktive.utils.atomic.getAndUpdate
import com.badoo.reaktive.utils.atomic.update

/**
* Returns an [Observable] that mirrors the source [Observable], but drops elements
* that are followed by newer ones before the [timeoutMillis] timeout expires on a specified [Scheduler].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#debounce-long-java.util.concurrent.TimeUnit-io.reactivex.Scheduler-).
*/
fun <T> Observable<T>.debounce(timeoutMillis: Long, scheduler: Scheduler): Observable<T> =
observable { emitter ->
val disposables = CompositeDisposable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import com.badoo.reaktive.disposable.plusAssign
import com.badoo.reaktive.utils.atomic.AtomicReference
import com.badoo.reaktive.utils.atomic.getAndUpdate

/**
* Returns an [Observable] that mirrors the source [Observable], but drops elements
* that are followed by newer ones within a computed debounce duration.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#debounce-io.reactivex.functions.Function-).
*/
fun <T> Observable<T>.debounce(debounceSelector: (T) -> Completable): Observable<T> =
observable { emitter ->
val disposables = CompositeDisposable()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
package com.badoo.reaktive.observable

/**
* Returns an [Observable] that emits the elements emitted from the source [Observable]
* or the specified [defaultValue] if the source [Observable] is empty.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#defaultIfEmpty-T-).
*/
fun <T> Observable<T>.defaultIfEmpty(defaultValue: T): Observable<T> =
switchIfEmpty(observableOf(defaultValue))
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package com.badoo.reaktive.observable

import com.badoo.reaktive.disposable.Disposable

/**
* Calls the [supplier] for each new observer and subscribes to the returned [Observable].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#defer-java.util.concurrent.Callable-).
*/
fun <T> observableDefer(supplier: () -> Observable<T>): Observable<T> =
observable { emitter ->
supplier().subscribe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.disposable.plusAssign
import com.badoo.reaktive.scheduler.Scheduler

/**
* Delays `onNext` and `onComplete` signals from the current [Observable] for the specified time.
* The `onError` signal is not delayed by default, which can be enabled by setting the [delayError] flag.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#delay-long-java.util.concurrent.TimeUnit-io.reactivex.Scheduler-boolean-).
*/
fun <T> Observable<T>.delay(delayMillis: Long, scheduler: Scheduler, delayError: Boolean = false): Observable<T> =
observable { emitter ->
val disposables = CompositeDisposable()
Expand Down
Loading

0 comments on commit 985d59e

Please sign in to comment.