Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JavaFuture <=> TwitterFuture bijection #247

Merged
merged 16 commits into from
May 14, 2016
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
Copyright 2012 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.bijection.twitter_util

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ Future => JFuture }

import com.twitter.util.{ Future, Promise, Try }

import scala.annotation.tailrec

/**
* Utility class for converting Java futures to Twitter's
*
* @param waitTimeMs Time spent sleeping by the thread converting java futures to
* twitter futures when there are no futures to convert in ms
*/
private[twitter_util] class JavaFutureToTwitterFutureConverter(waitTimeMs: Long = 1000L) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems overly complicated (and induces a lot of latency). How about instead the default is that we use a thread per future? the base implementation would be something like:

import com.twitter.util.{FuturePool, Promise, Future}
import java.util.concurrent.{Future => JFuture}
import scala.util.control.NonFatal

class FuturePoolConverter(mayInterruptWhileRunning: Boolean) {
  def apply[A](jfuture: JFuture[A]): Future[A] = {
    val f = FuturePool.unboundedPool { jfuture.get() }
    val p = Promise.detached(f)
    p.setInterruptHandler { case NonFatal(e) =>
      if (p.detach()) {
        f.raise(e)
        jfuture.cancel(mayInterruptWhileRunning)
      }
    }
    p
  }
}

A more sophisticated converter could use a Timer explicitly, and do something similar to what the current implementation does.

import com.twitter.util.{FuturePool, Promise, Timer, Try, Throw, Future}
import java.util.concurrent.{Future => JFuture}
import scala.util.control.NonFatal

class FuturePoolConverter(timer: Timer, checkFrequency: Duration, mayInterruptWhileRunning: Boolean) {
  def apply[A](jfuture: JFuture[A]): Future[A] = {
    val p = Promise[A]()
    val task = timer.schedule(checkFrequency) { 
      if (jfuture.isDone) {
        p.updateIfEmpty(Try(jfuture.get()))
        task.cancel()
      }
    }
    p.setInterruptHandler { case NonFatal(e) =>
      task.cancel()
      p.updateIfEmpty(Throw(e))
    }
    p
  }
}

Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't using one thread per future be a bit wasteful compared to the current approach?
On the other hand, this approach would simplify the API as there wouldn't be a need for an implicit parameter.
Also, could you explain what you mean by "induces a lot of latency" compared to your second approach?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BenFradet I think we should still use the implicit parameter.

Yes, it would be wasteful. That's why I also proposed the Timer approach, which I think is quite similar to your approach, and is appropriate for cases where there may be many futures. The overhead will be mostly in memory for threads (if you have many concurrent futures you're waiting for) and in context switches (if you have many futures finishing at the same time).

A timer approach is almost always going to be relatively high latency, because they all work using tick mechanisms. When you block, you move the work to the scheduler, which has the ability to work up a specific thread when there's work to do. This means that a thread can be notified immediately if we're blocking, but if we're waiting for the next tick to check, it will add the latency until the next tick necessarily. This will move the latency of discovering a future has finished from ~microseconds to ~milliseconds.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thread per future seems like a bad idea to me.

I'm not crazy about the polling all the time, but maybe the current approach is equivalent to the timer anyway (assuming a well implemented Timer).

We could do both approaches:

abstract class JavaFutureConverter {
  def apply[T](j: JFuture[T]): Future[T]
}

class FuturePoolJConverter(implicit fp: FuturePool) {
  ...
}

class TimerJConverter(t: Timer, d: ...) {

}

Then the bijection takes an implicit JavaFutureConverter which in some cases might be FuturePool based, in others timer/polling?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnynek yep, that's exactly what I'm proposing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mosesn thanks for the explanation!


case class Link[T](future: JFuture[T], promise: Promise[T]) {
def maybeUpdate: Boolean = future.isDone && {
promise.update(Try(future.get()))
true
}

def cancel(): Unit = {
promise.setException(new Exception("Promise not completed"))
future.cancel(true)
}
}

sealed abstract class State
case object Closed extends State
case class Open(links: List[Link[_]]) extends State
val EmptyState: State = Open(Nil)

private val pollRun = new Runnable {
override def run(): Unit =
try {
if (!Thread.currentThread().isInterrupted)
loop(list.getAndSet(EmptyState))
} catch {
case e: InterruptedException =>
}

@tailrec
def swapOpen(old: List[Link[_]]): Option[List[Link[_]]] = list.get match {
case Closed => None
case s @ Open(links) =>
if (list.compareAndSet(s, Open(old))) Some(links)
else swapOpen(links)
}

@tailrec
def loop(state: State): Unit = state match {
case s @ Open(links) =>
val notDone = links.filterNot(_.maybeUpdate)
if (links.isEmpty || notDone.nonEmpty) Thread.sleep(waitTimeMs)
swapOpen(notDone) match {
case None => notDone.foreach(_.cancel())
case Some(next) => loop(Open(next))
}
case Closed =>
}
}
private val list = new AtomicReference[State](EmptyState)
private val thread = new Thread(pollRun)

def apply[T](javaFuture: JFuture[T]): Future[T] = {
val promise = new Promise[T]()
poll(Link(javaFuture, promise))
promise
}

def start(): Unit = {
thread.setDaemon(true)
thread.start()
}

def stop(): Unit = {
list.getAndSet(Closed) match {
case Closed => // already closed
case s @ Open(links) => links.foreach(_.cancel())
}
}

private def poll[T](link: Link[T]): Unit = list.get match {
case Closed => link.cancel()
case s @ Open(tail) =>
if (list.compareAndSet(s, Open(link :: tail))) ()
else poll(link)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ limitations under the License.

package com.twitter.bijection.twitter_util

import com.twitter.bijection.{ AbstractBijection, Bijection, ImplicitBijection }
import java.util.concurrent.{ Future => JavaFuture }

import com.twitter.bijection._
import com.twitter.io.Buf
import com.twitter.util.{ Future => TwitterFuture, Try => TwitterTry, Promise => TwitterPromise, Return, Throw, FuturePool }

Expand All @@ -40,7 +42,7 @@ trait UtilBijections {
implicit def futureBijection[A, B](implicit bij: ImplicitBijection[A, B]): Bijection[TwitterFuture[A], TwitterFuture[B]] =
new AbstractBijection[TwitterFuture[A], TwitterFuture[B]] {
override def apply(fa: TwitterFuture[A]) = fa.map(bij(_))
override def invert(fb: TwitterFuture[B]) = fb.map(bij.invert(_))
override def invert(fb: TwitterFuture[B]) = fb.map(bij.invert)
}

/**
Expand All @@ -50,7 +52,7 @@ trait UtilBijections {
implicit def futureScalaBijection[A, B](implicit bij: ImplicitBijection[A, B], executor: ExecutionContext): Bijection[ScalaFuture[A], ScalaFuture[B]] =
new AbstractBijection[ScalaFuture[A], ScalaFuture[B]] {
override def apply(fa: ScalaFuture[A]) = fa.map(bij(_))
override def invert(fb: ScalaFuture[B]) = fb.map(bij.invert(_))
override def invert(fb: ScalaFuture[B]) = fb.map(bij.invert)
}

/**
Expand Down Expand Up @@ -78,6 +80,22 @@ trait UtilBijections {
}
}

/**
* Injection from java futures to twitter futures
*/
implicit def twitter2JavaFuture[A](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might make sense to expose an injection which doesn't need a converter too, in case folks are only going JFuture => TwitterFuture.

Copy link
Contributor Author

@BenFradet BenFradet May 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean going TwitterFuture => JavaFuture?
In any case, I agree.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeah, my mistake.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However I'm not sure how I would go about inverting without a converter, ideas?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

injections only need to go one way. you can just return None if it's not a ConstFuture.

implicit converter: JavaFutureToTwitterFutureConverter
): Bijection[TwitterFuture[A], JavaFuture[A]] = {
new AbstractBijection[TwitterFuture[A], JavaFuture[A]] {
override def apply(f: TwitterFuture[A]): JavaFuture[A] =
f.toJavaFuture.asInstanceOf[JavaFuture[A]]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I can't use CompletableFuture I found this way around, I don't know if there is a better way, feedback welcome.

Copy link
Contributor Author

@BenFradet BenFradet May 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For that matter, I don't understand why toJavaFuture returns a future of some subtype of A and not just a future of A.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's because \forall B <: A, TwitterFuture[B] <: TwitterFuture[A], but the same isn't true for JFuture, since they encode covariance differently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, thanks for explaining


override def invert(f: JavaFuture[A]): TwitterFuture[A] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for { } here

converter(f)
}
}
}

/**
* Bijection between twitter and scala style Trys
*/
Expand All @@ -102,7 +120,7 @@ trait UtilBijections {
implicit def tryBijection[A, B](implicit bij: ImplicitBijection[A, B]): Bijection[TwitterTry[A], TwitterTry[B]] =
new AbstractBijection[TwitterTry[A], TwitterTry[B]] {
override def apply(fa: TwitterTry[A]) = fa.map(bij(_))
override def invert(fb: TwitterTry[B]) = fb.map(bij.invert(_))
override def invert(fb: TwitterTry[B]) = fb.map(bij.invert)
}

/**
Expand All @@ -112,7 +130,7 @@ trait UtilBijections {
implicit def tryScalaBijection[A, B](implicit bij: ImplicitBijection[A, B]): Bijection[ScalaTry[A], ScalaTry[B]] =
new AbstractBijection[ScalaTry[A], ScalaTry[B]] {
override def apply(fa: ScalaTry[A]) = fa.map(bij(_))
override def invert(fb: ScalaTry[B]) = fb.map(bij.invert(_))
override def invert(fb: ScalaTry[B]) = fb.map(bij.invert)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2012 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.bijection.twitter_util

import java.util.concurrent.{ Callable, FutureTask }

import com.twitter.util.Await
import org.scalatest.{ Matchers, WordSpec }

class JavaFutureToTwitterFutureConverterSpec extends WordSpec with Matchers {
trait Fixtures {
val converter = {
val c = new JavaFutureToTwitterFutureConverter
c.start()
c
}
}

"JavaFutureToTwitterFutureConverter" should {
"do a proper round trip between a java future to a twitter future" in new Fixtures {
val jFuture = new FutureTask[Int](new Callable[Int] {
override def call(): Int = 3
})
val tFuture = converter(jFuture)
tFuture.toJavaFuture === jFuture
}
"cancel the future when calling apply after stop" in new Fixtures {
val jFuture = new FutureTask[Int](new Callable[Int] {
override def call(): Int = 3
})
converter.stop()
val tFuture = converter(jFuture)
the[Exception] thrownBy {
Await.result(tFuture)
} should have message "Promise not completed"
jFuture.isCancelled shouldBe true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,72 +16,92 @@ limitations under the License.

package com.twitter.bijection.twitter_util

import com.twitter.bijection.{ CheckProperties, BaseProperties, Bijection }
import com.google.common.util.concurrent.Futures
import com.twitter.bijection.{ CheckProperties, BaseProperties }
import com.twitter.io.Buf
import com.twitter.util.{ Future => TwitterFuture, Try => TwitterTry, Await => TwitterAwait }
import java.lang.{ Integer => JInt, Long => JLong }
import java.util.concurrent.{ Future => JavaFuture }
import org.scalacheck.Arbitrary
import org.scalatest.{ PropSpec, MustMatchers }
import org.scalatest.prop.PropertyChecks
import org.scalatest.BeforeAndAfterAll

import org.scalacheck.Prop.forAll
import scala.concurrent.{ Future => ScalaFuture, Await => ScalaAwait }
import scala.concurrent.duration.Duration
import scala.util.{ Try => ScalaTry }
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global

class UtilBijectionLaws extends CheckProperties with BaseProperties {
class UtilBijectionLaws extends CheckProperties with BaseProperties with BeforeAndAfterAll {
import UtilBijections._

protected def toOption[T](f: TwitterFuture[T]): Option[T] = TwitterTry(TwitterAwait.result(f)).toOption
protected def toOption[T](f: TwitterFuture[T]): Option[T] =
TwitterTry(TwitterAwait.result(f)).toOption

protected def toOption[T](f: ScalaFuture[T]): Option[T] = TwitterTry(ScalaAwait.result(f, Duration.Inf)).toOption
protected def toOption[T](f: ScalaFuture[T]): Option[T] =
TwitterTry(ScalaAwait.result(f, Duration.Inf)).toOption

implicit def futureArb[T: Arbitrary] = arbitraryViaFn[T, TwitterFuture[T]] { TwitterFuture.value(_) }
protected def toOption[T](f: JavaFuture[T]): Option[T] =
TwitterTry(f.get()).toOption

implicit def futureArb[T: Arbitrary] = arbitraryViaFn[T, TwitterFuture[T]] { TwitterFuture.value }
implicit def scalaFutureArb[T: Arbitrary] = arbitraryViaFn[T, ScalaFuture[T]] { future(_) }
implicit def javaFutureArb[T: Arbitrary] =
arbitraryViaFn[T, JavaFuture[T]] { Futures.immediateFuture }
Copy link
Contributor

@mosesn mosesn May 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is worth entailing the dependency. How do you feel about rolling your own with FutureTask?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll look into it.

implicit def tryArb[T: Arbitrary] = arbitraryViaFn[T, TwitterTry[T]] { TwitterTry(_) }
implicit def scalaTryArb[T: Arbitrary] = arbitraryViaFn[T, ScalaTry[T]] { ScalaTry(_) }

implicit val jIntArb = arbitraryViaBijection[Int, JInt]
implicit val jLongArb = arbitraryViaBijection[Long, JLong]
implicit val bufArb: Arbitrary[Buf] = arbitraryViaFn[Array[Byte], Buf](Buf.ByteArray.Owned.apply)

implicit protected def futureEq[T: Equiv]: Equiv[TwitterFuture[T]] = Equiv.fromFunction { (f1, f2) =>
Equiv[Option[T]].equiv(toOption(f1), toOption(f2))
implicit val futureConverter = {
val converter = new JavaFutureToTwitterFutureConverter()
converter.start()
converter
}

implicit protected def scalaFutureEq[T: Equiv]: Equiv[ScalaFuture[T]] = Equiv.fromFunction { (f1, f2) =>
Equiv[Option[T]].equiv(toOption(f1), toOption(f2))
}
override protected def afterAll(): Unit = futureConverter.stop()

implicit protected def futureEq[T: Equiv]: Equiv[TwitterFuture[T]] =
Equiv.fromFunction { (f1, f2) => Equiv[Option[T]].equiv(toOption(f1), toOption(f2)) }

implicit protected def scalaFutureEq[T: Equiv]: Equiv[ScalaFuture[T]] =
Equiv.fromFunction { (f1, f2) => Equiv[Option[T]].equiv(toOption(f1), toOption(f2)) }

implicit protected def javaFutureEq[T: Equiv]: Equiv[JavaFuture[T]] =
Equiv.fromFunction { (f1, f2) => Equiv[Option[T]].equiv(toOption(f1), toOption(f2)) }

type FromMap = Map[Int, Long]
type ToMap = Map[JInt, JLong]

property("round trips com.twitter.util.Future[Map[Int, String]] -> com.twitter.util.Future[JInt, JLong]") {
property("round trips TwitterFuture[Map[Int, String]] -> Twitter.Future[JInt, JLong]") {
isBijection[TwitterFuture[FromMap], TwitterFuture[ToMap]]
}

property("round trips scala.concurrent.Future[Map[Int, String]] -> scala.concurrent.Future[JInt, JLong]") {
property("round trips ScalaFuture[Map[Int, String]] -> ScalaFuture[JInt, JLong]") {
isBijection[ScalaFuture[FromMap], ScalaFuture[ToMap]]
}

property("round trips com.twitter.util.Try[Map[Int, String]] -> com.twitter.util.Try[Map[JInt, JLong]]") {
property("round trips TwitterTry[Map[Int, String]] -> TwitterTry[Map[JInt, JLong]]") {
isBijection[TwitterTry[FromMap], TwitterTry[ToMap]]
}

property("round trips scala.util.Try[Map[Int, String]] -> scala.util.Try[Map[JInt, JLong]]") {
property("round trips ScalaTry[Map[Int, String]] -> ScalaTry[Map[JInt, JLong]]") {
isBijection[ScalaTry[FromMap], ScalaTry[ToMap]]
}

property("round trips com.twitter.util.Try[Map[JInt, JLong]] -> scala.util.Try[Map[JInt, JLong]]") {
property("round trips TwitterTry[Map[JInt, JLong]] -> ScalaTry[Map[JInt, JLong]]") {
isBijection[TwitterTry[ToMap], ScalaTry[ToMap]]
}

property("round trips com.twitter.util.Future[Map[JInt, JLong]] -> scala.concurrent.Future[Map[JInt, JLong]]") {
property("round trips TwitterFuture[Map[JInt, JLong]] -> ScalaFuture[Map[JInt, JLong]]") {
isBijection[TwitterFuture[ToMap], ScalaFuture[ToMap]]
}

property("round trips TwitterFuture[Map[JInt, JLong]] -> JavaFuture[Map[JInt, JLong]]") {
isBijection[TwitterFuture[ToMap], JavaFuture[ToMap]]
}

property("round trips shared com.twitter.io.Buf -> Array[Byte]") {
import Shared.byteArrayBufBijection

Expand Down
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,10 @@ lazy val bijectionJson = module("json").settings(

lazy val bijectionUtil = module("util").settings(
osgiExportAll("com.twitter.bijection.twitter_util"),
libraryDependencies += "com.twitter" %% "util-core" % "6.24.0"
libraryDependencies ++= Seq(
"com.twitter" %% "util-core" % "6.24.0",
"com.google.guava" % "guava" % "14.0" % "test"
)
).dependsOn(bijectionCore % "test->test;compile->compile")

lazy val bijectionFinagleMySql = module("finagle-mysql").settings(
Expand Down