-
Notifications
You must be signed in to change notification settings - Fork 125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
JavaFuture <=> TwitterFuture bijection #247
Conversation
val converter = new JavaFutureToTwitterFutureConverter() | ||
converter.start() | ||
converter | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you have ideas on how to close the converter properly it'll be most welcome.
oops |
* @param waitTimeMs Time spent sleeping by the thread converting java futures to | ||
* twitter futures when there are no futures to convert in ms | ||
*/ | ||
private[twitter_util] class JavaFutureToTwitterFutureConverter(waitTimeMs: Long = 1000L) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems overly complicated (and induces a lot of latency). How about instead the default is that we use a thread per future? the base implementation would be something like:
import com.twitter.util.{FuturePool, Promise, Future}
import java.util.concurrent.{Future => JFuture}
import scala.util.control.NonFatal
class FuturePoolConverter(mayInterruptWhileRunning: Boolean) {
def apply[A](jfuture: JFuture[A]): Future[A] = {
val f = FuturePool.unboundedPool { jfuture.get() }
val p = Promise.detached(f)
p.setInterruptHandler { case NonFatal(e) =>
if (p.detach()) {
f.raise(e)
jfuture.cancel(mayInterruptWhileRunning)
}
}
p
}
}
A more sophisticated converter could use a Timer explicitly, and do something similar to what the current implementation does.
import com.twitter.util.{FuturePool, Promise, Timer, Try, Throw, Future}
import java.util.concurrent.{Future => JFuture}
import scala.util.control.NonFatal
class FuturePoolConverter(timer: Timer, checkFrequency: Duration, mayInterruptWhileRunning: Boolean) {
def apply[A](jfuture: JFuture[A]): Future[A] = {
val p = Promise[A]()
val task = timer.schedule(checkFrequency) {
if (jfuture.isDone) {
p.updateIfEmpty(Try(jfuture.get()))
task.cancel()
}
}
p.setInterruptHandler { case NonFatal(e) =>
task.cancel()
p.updateIfEmpty(Throw(e))
}
p
}
}
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't using one thread per future be a bit wasteful compared to the current approach?
On the other hand, this approach would simplify the API as there wouldn't be a need for an implicit parameter.
Also, could you explain what you mean by "induces a lot of latency" compared to your second approach?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@BenFradet I think we should still use the implicit parameter.
Yes, it would be wasteful. That's why I also proposed the Timer approach, which I think is quite similar to your approach, and is appropriate for cases where there may be many futures. The overhead will be mostly in memory for threads (if you have many concurrent futures you're waiting for) and in context switches (if you have many futures finishing at the same time).
A timer approach is almost always going to be relatively high latency, because they all work using tick mechanisms. When you block, you move the work to the scheduler, which has the ability to work up a specific thread when there's work to do. This means that a thread can be notified immediately if we're blocking, but if we're waiting for the next tick to check, it will add the latency until the next tick necessarily. This will move the latency of discovering a future has finished from ~microseconds to ~milliseconds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thread per future seems like a bad idea to me.
I'm not crazy about the polling all the time, but maybe the current approach is equivalent to the timer anyway (assuming a well implemented Timer
).
We could do both approaches:
abstract class JavaFutureConverter {
def apply[T](j: JFuture[T]): Future[T]
}
class FuturePoolJConverter(implicit fp: FuturePool) {
...
}
class TimerJConverter(t: Timer, d: ...) {
}
Then the bijection
takes an implicit JavaFutureConverter which in some cases might be FuturePool based, in others timer/polling?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@johnynek yep, that's exactly what I'm proposing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mosesn thanks for the explanation!
): Bijection[TwitterFuture[A], JavaFuture[A]] = { | ||
new AbstractBijection[TwitterFuture[A], JavaFuture[A]] { | ||
override def apply(f: TwitterFuture[A]): JavaFuture[A] = | ||
f.toJavaFuture.asInstanceOf[JavaFuture[A]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since I can't use CompletableFuture
I found this way around, I don't know if there is a better way, feedback welcome.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For that matter, I don't understand why toJavaFuture returns a future of some subtype of A and not just a future of A.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's because \forall B <: A, TwitterFuture[B] <: TwitterFuture[A], but the same isn't true for JFuture, since they encode covariance differently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, thanks for explaining
The failing test doesn't concern this pr:
|
@@ -79,6 +81,22 @@ trait UtilBijections { | |||
} | |||
|
|||
/** | |||
* Injection from java futures to twitter futures | |||
*/ | |||
implicit def twitter2JavaFuture[A]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it might make sense to expose an injection
which doesn't need a converter too, in case folks are only going JFuture => TwitterFuture
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean going TwitterFuture => JavaFuture
?
In any case, I agree.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, yeah, my mistake.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However I'm not sure how I would go about inverting without a converter, ideas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
injections only need to go one way. you can just return None if it's not a ConstFuture.
override def cancel(mayInterruptIfRunning: Boolean): Boolean = false | ||
override def isDone: Boolean = true | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For some reason:
new FutureTask[T](new Callable[T] {
override def call(): T = t
}
and
new FutureTask[T](new Runnable {
override def run(): Unit = ()
}, t)
wouldn't work and take forever
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@BenFradet did you try calling run
on the task after you created it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup, you were right 😳
f.toJavaFuture.asInstanceOf[JavaFuture[A]] | ||
|
||
override def invert(f: JavaFuture[A]): ScalaTry[TwitterFuture[A]] = | ||
Inversion.attemptWhen(f)(_.isDone)(jf => TwitterFuture.value(jf.get())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mosesn I tried this approach, could you tell me what you think? thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't you want TwitterFuture(jf.get)
since .get
can throw?
val p = Promise[T] | ||
val task = timer.schedule(checkFrequency) { | ||
if (javaFuture.isDone) { | ||
p.updateIfEmpty(Try(javaFuture.get())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we not need to unschedule this task once completed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we do, I haven't found out how yet.
If it'd had been a j.u.Timer we would have been able to do:
if (javaFuture.isDone) {
p.updateIfEmpty(Try(javaFuture.get()))
timer.cancel()
timer.purge()
}
But c.t.u.Timer doesn't expose the same interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to call task.cancel()
. you might need to mutate a reference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mosesn I'm not sure to get what you mean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's an example:
var task = null
task = timer.schedule(...) {
...
task.cancel()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could do
lazy val task = timer.schedule(...) { ... task.cancel() }
lazy val can be recursive. I prefer that to the var here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was afraid the first solution wouldn't work / be too awful.
I'll try the second one, didnt know this could be possible, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you'll have to force the task
after though. Something like require(task != null)
to actually schedule it, which is also a bit janky.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indeed :'(
Thank you! |
This implements a bijection between
JavaFuture
s andTwitterFuture
s.It originally came from the storehaus-kafka module because the kafka api uses
JavaFuture
.The idea is to make it available here since it'd fit nicely with the other bijection between
ScalaFuture
s andTwitterFuture
s.credit to @johnynek for the implementation