Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added maxConcurrency argument to flatMap #614

Merged
merged 3 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions reaktive/api/reaktive.api
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ public final class com/badoo/reaktive/observable/FirstOrErrorKt {
}

public final class com/badoo/reaktive/observable/FlatMapCompletableKt {
public static final fun flatMapCompletable (Lcom/badoo/reaktive/observable/Observable;ILkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/completable/Completable;
public static final fun flatMapCompletable (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/completable/Completable;
}

Expand All @@ -699,16 +700,22 @@ public final class com/badoo/reaktive/observable/FlatMapIterableKt {
}

public final class com/badoo/reaktive/observable/FlatMapKt {
public static final fun flatMap (Lcom/badoo/reaktive/observable/Observable;ILkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable;
public static final fun flatMap (Lcom/badoo/reaktive/observable/Observable;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lcom/badoo/reaktive/observable/Observable;
public static final fun flatMap (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable;
public static final fun flatMap (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/FlatMapMaybeKt {
public static final fun flatMapMaybe (Lcom/badoo/reaktive/observable/Observable;ILkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable;
public static final fun flatMapMaybe (Lcom/badoo/reaktive/observable/Observable;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lcom/badoo/reaktive/observable/Observable;
public static final fun flatMapMaybe (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable;
public static final fun flatMapMaybe (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/FlatMapSingleKt {
public static final fun flatMapSingle (Lcom/badoo/reaktive/observable/Observable;ILkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable;
public static final fun flatMapSingle (Lcom/badoo/reaktive/observable/Observable;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lcom/badoo/reaktive/observable/Observable;
public static final fun flatMapSingle (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable;
public static final fun flatMapSingle (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lcom/badoo/reaktive/observable/Observable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,70 @@ import com.badoo.reaktive.base.ValueCallback
import com.badoo.reaktive.base.tryCatch
import com.badoo.reaktive.disposable.CompositeDisposable
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.disposable.addTo
import com.badoo.reaktive.utils.ObjectReference
import com.badoo.reaktive.utils.RefCounter
import com.badoo.reaktive.utils.atomic.AtomicBoolean
import com.badoo.reaktive.utils.atomic.AtomicInt
import com.badoo.reaktive.utils.lock.Lock
import com.badoo.reaktive.utils.lock.synchronized
import com.badoo.reaktive.utils.queue.SharedQueue
import com.badoo.reaktive.utils.use

fun <T, R> Observable<T>.flatMap(mapper: (T) -> Observable<R>): Observable<R> =
observable { emitter ->
val upstreamObserver = FlatMapObserver(emitter.serialize(), mapper)
flatMap(maxConcurrency = Int.MAX_VALUE, mapper = mapper)

fun <T, R> Observable<T>.flatMap(maxConcurrency: Int, mapper: (T) -> Observable<R>): Observable<R> {
require(maxConcurrency > 0) { "maxConcurrency value must be positive" }

return observable { emitter ->
val upstreamObserver = FlatMapObserver(emitter.serialize(), maxConcurrency, mapper)
emitter.setDisposable(upstreamObserver)
subscribe(upstreamObserver)
}
}

fun <T, U, R> Observable<T>.flatMap(mapper: (T) -> Observable<U>, resultSelector: (T, U) -> R): Observable<R> =
flatMap(maxConcurrency = Int.MAX_VALUE, mapper = mapper, resultSelector = resultSelector)

fun <T, U, R> Observable<T>.flatMap(maxConcurrency: Int, mapper: (T) -> Observable<U>, resultSelector: (T, U) -> R): Observable<R> =
flatMap(maxConcurrency = maxConcurrency) { t ->
mapper(t).map { u -> resultSelector(t, u) }
}

private class FlatMapObserver<in T, in R>(
private val callbacks: ObservableCallbacks<R>,
maxConcurrency: Int,
private val mapper: (T) -> Observable<R>
) : CompositeDisposable(), ObservableObserver<T>, ErrorCallback by callbacks {

private val activeSourceCount = AtomicInt(1)

private val queue: FlatMapQueue<Observable<R>>? =
maxConcurrency
.takeIf { it < Int.MAX_VALUE }
?.let { FlatMapQueue(limit = it, callback = ::subscribeInner) }
?.addTo(this)

override fun onSubscribe(disposable: Disposable) {
add(disposable)
}

override fun onNext(value: T) {
activeSourceCount.addAndGet(1)

callbacks.tryCatch({ mapper(value) }) { inner ->
if (queue == null) {
subscribeInner(inner)
} else {
queue.offer(inner)
}
}
}

private fun subscribeInner(inner: Observable<R>) {
callbacks.tryCatch {
mapper(value).subscribe(InnerObserver())
inner.subscribe(InnerObserver())
}
}

Expand All @@ -53,12 +91,61 @@ private class FlatMapObserver<in T, in R>(

override fun onComplete() {
remove(value!!)
queue?.poll()
this@FlatMapObserver.onComplete()
}
}
}

fun <T, U, R> Observable<T>.flatMap(mapper: (T) -> Observable<U>, resultSelector: (T, U) -> R): Observable<R> =
flatMap { t ->
mapper(t).map { u -> resultSelector(t, u) }
private class FlatMapQueue<in T : Any>(
limit: Int,
private val callback: (T) -> Unit
) : Disposable {

private val lock = Lock()
private val count = AtomicInt(limit)
private val queue = SharedQueue<T>()

private val refCounter =
RefCounter {
lock.destroy()
count.value = 0
queue.clear()
}

private val _isDisposed = AtomicBoolean(false)
override val isDisposed: Boolean get() = _isDisposed.value

override fun dispose() {
if (_isDisposed.compareAndSet(expectedValue = false, newValue = true)) {
refCounter.release()
}
}

fun offer(value: T) {
sync {
if (count.value > 0) {
count.value--
value
} else {
queue.offer(value)
null
}
}?.also(callback)
}

fun poll() {
sync {
val next = queue.poll()
if (next == null) {
count.value++
}
next
}?.also(callback)
}

private inline fun <T> sync(block: () -> T): T? =
refCounter.use {
lock.synchronized(block)
}
}
Original file line number Diff line number Diff line change
@@ -1,57 +1,11 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.base.ErrorCallback
import com.badoo.reaktive.base.tryCatch
import com.badoo.reaktive.completable.Completable
import com.badoo.reaktive.completable.CompletableCallbacks
import com.badoo.reaktive.completable.CompletableObserver
import com.badoo.reaktive.completable.completable
import com.badoo.reaktive.completable.serialize
import com.badoo.reaktive.disposable.CompositeDisposable
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.utils.ObjectReference
import com.badoo.reaktive.utils.atomic.AtomicInt
import com.badoo.reaktive.completable.asObservable

fun <T> Observable<T>.flatMapCompletable(mapper: (T) -> Completable): Completable =
completable { emitter ->
val upstreamObserver = FlatMapCompletableObserver(emitter.serialize(), mapper)
emitter.setDisposable(upstreamObserver)
subscribe(upstreamObserver)
}
flatMapCompletable(maxConcurrency = Int.MAX_VALUE, mapper = mapper)

private class FlatMapCompletableObserver<in T>(
private val callbacks: CompletableCallbacks,
private val mapper: (T) -> Completable
) : CompositeDisposable(), ObservableObserver<T>, ErrorCallback by callbacks {

private val activeSourceCount = AtomicInt(1)

override fun onSubscribe(disposable: Disposable) {
add(disposable)
}

override fun onNext(value: T) {
activeSourceCount.addAndGet(1)
callbacks.tryCatch {
mapper(value).subscribe(InnerObserver())
}
}

override fun onComplete() {
if (activeSourceCount.addAndGet(-1) <= 0) {
callbacks.onComplete()
}
}

private inner class InnerObserver : ObjectReference<Disposable?>(null), CompletableObserver, ErrorCallback by callbacks {
override fun onSubscribe(disposable: Disposable) {
value = disposable
add(disposable)
}

override fun onComplete() {
remove(value!!)
this@FlatMapCompletableObserver.onComplete()
}
}
}
fun <T> Observable<T>.flatMapCompletable(maxConcurrency: Int, mapper: (T) -> Completable): Completable =
flatMap(maxConcurrency = maxConcurrency) { mapper(it).asObservable<Nothing>() }
.asCompletable()
Original file line number Diff line number Diff line change
@@ -1,49 +1,19 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.base.CompositeDisposableObserver
import com.badoo.reaktive.base.ErrorCallback
import com.badoo.reaktive.base.Observer
import com.badoo.reaktive.base.tryCatch
import com.badoo.reaktive.completable.CompletableCallbacks
import com.badoo.reaktive.maybe.Maybe
import com.badoo.reaktive.maybe.MaybeObserver
import com.badoo.reaktive.maybe.asObservable
import com.badoo.reaktive.maybe.map
import com.badoo.reaktive.utils.atomic.AtomicInt

fun <T, R> Observable<T>.flatMapMaybe(mapper: (T) -> Maybe<R>): Observable<R> =
observable { emitter ->
val serializedEmitter = emitter.serialize()
flatMapMaybe(maxConcurrency = Int.MAX_VALUE, mapper = mapper)

val upstreamObserver =
object : CompositeDisposableObserver(), ObservableObserver<T>, ErrorCallback by serializedEmitter {
private val activeSourceCount = AtomicInt(1)

private val mappedObserver: MaybeObserver<R> =
object : MaybeObserver<R>, Observer by this, CompletableCallbacks by this {
override fun onSuccess(value: R) {
serializedEmitter.onNext(value)
onComplete()
}
}

override fun onNext(value: T) {
activeSourceCount.addAndGet(1)
serializedEmitter.tryCatch(block = { mapper(value).subscribe(mappedObserver) })
}

override fun onComplete() {
if (activeSourceCount.addAndGet(-1) <= 0) {
serializedEmitter.onComplete()
}
}
}

emitter.setDisposable(upstreamObserver)

subscribe(upstreamObserver)
}
fun <T, R> Observable<T>.flatMapMaybe(maxConcurrency: Int, mapper: (T) -> Maybe<R>): Observable<R> =
flatMap(maxConcurrency = maxConcurrency) { mapper(it).asObservable() }

fun <T, U, R> Observable<T>.flatMapMaybe(mapper: (T) -> Maybe<U>, resultSelector: (T, U) -> R): Observable<R> =
flatMapMaybe { t ->
flatMapMaybe(maxConcurrency = Int.MAX_VALUE, mapper = mapper, resultSelector = resultSelector)

fun <T, U, R> Observable<T>.flatMapMaybe(maxConcurrency: Int, mapper: (T) -> Maybe<U>, resultSelector: (T, U) -> R): Observable<R> =
flatMapMaybe(maxConcurrency = maxConcurrency) { t ->
mapper(t).map { u -> resultSelector(t, u) }
}
Original file line number Diff line number Diff line change
@@ -1,48 +1,19 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.base.CompositeDisposableObserver
import com.badoo.reaktive.base.ErrorCallback
import com.badoo.reaktive.base.Observer
import com.badoo.reaktive.base.tryCatch
import com.badoo.reaktive.single.Single
import com.badoo.reaktive.single.SingleObserver
import com.badoo.reaktive.single.asObservable
import com.badoo.reaktive.single.map
import com.badoo.reaktive.utils.atomic.AtomicInt

fun <T, R> Observable<T>.flatMapSingle(mapper: (T) -> Single<R>): Observable<R> =
observable { emitter ->
val serializedEmitter = emitter.serialize()
flatMapSingle(maxConcurrency = Int.MAX_VALUE, mapper = mapper)

val upstreamObserver =
object : CompositeDisposableObserver(), ObservableObserver<T>, ErrorCallback by serializedEmitter {
private val activeSourceCount = AtomicInt(1)

private val mappedObserver: SingleObserver<R> =
object : SingleObserver<R>, Observer by this, ErrorCallback by serializedEmitter {
override fun onSuccess(value: R) {
serializedEmitter.onNext(value)
onComplete()
}
}

override fun onNext(value: T) {
activeSourceCount.addAndGet(1)
serializedEmitter.tryCatch(block = { mapper(value).subscribe(mappedObserver) })
}

override fun onComplete() {
if (activeSourceCount.addAndGet(-1) <= 0) {
serializedEmitter.onComplete()
}
}
}

emitter.setDisposable(upstreamObserver)

subscribe(upstreamObserver)
}
fun <T, R> Observable<T>.flatMapSingle(maxConcurrency: Int, mapper: (T) -> Single<R>): Observable<R> =
flatMap(maxConcurrency = maxConcurrency) { mapper(it).asObservable() }

fun <T, U, R> Observable<T>.flatMapSingle(mapper: (T) -> Single<U>, resultSelector: (T, U) -> R): Observable<R> =
flatMapSingle { t ->
flatMapSingle(maxConcurrency = Int.MAX_VALUE, mapper = mapper, resultSelector = resultSelector)

fun <T, U, R> Observable<T>.flatMapSingle(maxConcurrency: Int, mapper: (T) -> Single<U>, resultSelector: (T, U) -> R): Observable<R> =
flatMapSingle(maxConcurrency = maxConcurrency) { t ->
mapper(t).map { u -> resultSelector(t, u) }
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.badoo.reaktive.utils

internal inline fun RefCounter.use(block: () -> Unit) {
internal inline fun <T> RefCounter.use(block: () -> T): T? =
if (retain()) {
try {
block()
} finally {
release()
}
} else {
null
}
}
Loading