Skip to content

Commit

Permalink
[jvm-packages] Exposed baseMargin (#2450)
Browse files Browse the repository at this point in the history
* Disabled excessive Spark logging in tests

* Fixed a singature of XGBoostModel.predict

Prior to this commit XGBoostModel.predict produced an RDD with
an array of predictions for each partition, effectively changing
the shape wrt the input RDD. A more natural contract for prediction
API is that given an RDD it returns a new RDD with the same number
of elements. This allows the users to easily match inputs with
predictions.

This commit removes one layer of nesting in XGBoostModel.predict output.
Even though the change is clearly non-backward compatible, I still
think it is well justified.

* Removed boxing in XGBoost.fromDenseToSparseLabeledPoints

* Inlined XGBoost.repartitionData

An if is more explicit than an opaque method name.

* Moved XGBoost.convertBoosterToXGBoostModel to XGBoostModel

* Check the input dimension in DMatrix.setBaseMargin

Prior to this commit providing an array of incorrect dimensions would
have resulted in memory corruption. Maybe backport this to C++?

* Reduced nesting in XGBoost.buildDistributedBoosters

* Ensured consistent naming of the params map

* Cleaned up DataBatch to make it easier to comprehend

* Made scalastyle happy

* Added baseMargin to XGBoost.train and trainWithRDD

* Deprecated XGBoost.train

It is ambiguous and work only for RDDs.

* Addressed review comments

* Revert "Fixed a singature of XGBoostModel.predict"

This reverts commit 06bd5dc.

* Addressed more review comments

* Fixed NullPointerException in buildDistributedBoosters
  • Loading branch information
superbobry authored and CodingCat committed Jun 30, 2017
1 parent 6b28717 commit d535340
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object SparkWithRDD {
"eta" -> 0.1f,
"max_depth" -> 2,
"objective" -> "binary:logistic").toMap
val xgboostModel = XGBoost.train(trainRDD, paramMap, numRound, nWorkers = args(1).toInt,
val xgboostModel = XGBoost.trainWithRDD(trainRDD, paramMap, numRound, nWorkers = args(1).toInt,
useExternalMemory = true)
xgboostModel.booster.predict(new DMatrix(testSet))
// save model to HDFS path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package ml.dmlc.xgboost4j.scala.spark

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, XGBoostError, DMatrix => JDMatrix, RabitTracker => PyRabitTracker}
import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
Expand All @@ -30,7 +29,6 @@ import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.{SparkContext, TaskContext}
import scala.concurrent.duration.{Duration, FiniteDuration, MILLISECONDS}

object TrackerConf {
def apply(): TrackerConf = TrackerConf(0L, "python")
Expand All @@ -53,97 +51,86 @@ case class TrackerConf(workerConnectionTimeout: Long, trackerImpl: String)
object XGBoost extends Serializable {
private val logger = LogFactory.getLog("XGBoostSpark")

private def convertBoosterToXGBoostModel(booster: Booster, isClassification: Boolean):
XGBoostModel = {
if (!isClassification) {
new XGBoostRegressionModel(booster)
} else {
new XGBoostClassificationModel(booster)
}
}

private def fromDenseToSparseLabeledPoints(
denseLabeledPoints: Iterator[MLLabeledPoint],
missing: Float): Iterator[MLLabeledPoint] = {
if (!missing.isNaN) {
val sparseLabeledPoints = new ListBuffer[MLLabeledPoint]
for (labelPoint <- denseLabeledPoints) {
val dVector = labelPoint.features.toDense
val indices = new ListBuffer[Int]
val values = new ListBuffer[Double]
for (i <- dVector.values.indices) {
if (dVector.values(i) != missing) {
denseLabeledPoints.map { case MLLabeledPoint(label, features) =>
val dFeatures = features.toDense
val indices = new mutable.ArrayBuilder.ofInt()
val values = new mutable.ArrayBuilder.ofDouble()
for (i <- dFeatures.values.indices) {
if (dFeatures.values(i) != missing) {
indices += i
values += dVector.values(i)
values += dFeatures.values(i)
}
}
val sparseVector = new SparseVector(dVector.values.length, indices.toArray,
values.toArray)
sparseLabeledPoints += MLLabeledPoint(labelPoint.label, sparseVector)
val sFeatures = new SparseVector(dFeatures.values.length, indices.result(),
values.result())
MLLabeledPoint(label, sFeatures)
}
sparseLabeledPoints.iterator
} else {
denseLabeledPoints
}
}

private def repartitionData(trainingData: RDD[MLLabeledPoint], numWorkers: Int):
RDD[MLLabeledPoint] = {
if (numWorkers != trainingData.partitions.length) {
logger.info(s"repartitioning training set to $numWorkers partitions")
trainingData.repartition(numWorkers)
} else {
trainingData
}
}

private[spark] def buildDistributedBoosters(
trainingSet: RDD[MLLabeledPoint],
xgBoostConfMap: Map[String, Any],
params: Map[String, Any],
rabitEnv: java.util.Map[String, String],
numWorkers: Int, round: Int, obj: ObjectiveTrait, eval: EvalTrait,
useExternalMemory: Boolean, missing: Float = Float.NaN): RDD[Booster] = {
numWorkers: Int,
round: Int,
obj: ObjectiveTrait,
eval: EvalTrait,
useExternalMemory: Boolean,
missing: Float,
baseMargin: RDD[Float]): RDD[Booster] = {
import DataUtils._
val partitionedTrainingSet = repartitionData(trainingSet, numWorkers)

val partitionedTrainingSet = if (trainingSet.getNumPartitions != numWorkers) {
logger.info(s"repartitioning training set to $numWorkers partitions")
trainingSet.repartition(numWorkers)
} else {
trainingSet
}
val partitionedBaseMargin = Option(baseMargin)
.getOrElse(trainingSet.sparkContext.emptyRDD)
.repartition(partitionedTrainingSet.getNumPartitions)
val appName = partitionedTrainingSet.context.appName
// to workaround the empty partitions in training dataset,
// this might not be the best efficient implementation, see
// (https://github.com/dmlc/xgboost/issues/1277)
partitionedTrainingSet.mapPartitions {
trainingSamples =>
rabitEnv.put("DMLC_TASK_ID", TaskContext.getPartitionId().toString)
Rabit.init(rabitEnv)
var booster: Booster = null
if (trainingSamples.hasNext) {
val cacheFileName: String = {
if (useExternalMemory) {
s"$appName-${TaskContext.get().stageId()}-" +
s"dtrain_cache-${TaskContext.getPartitionId()}"
} else {
null
}
}
val partitionItr = fromDenseToSparseLabeledPoints(trainingSamples, missing)
val trainingSet = new DMatrix(new JDMatrix(partitionItr, cacheFileName))
try {
if (xgBoostConfMap.contains("groupData") && xgBoostConfMap("groupData") != null) {
trainingSet.setGroup(xgBoostConfMap("groupData").asInstanceOf[Seq[Seq[Int]]](
TaskContext.getPartitionId()).toArray)
}
booster = SXGBoost.train(trainingSet, xgBoostConfMap, round,
watches = new mutable.HashMap[String, DMatrix] {
put("train", trainingSet)
}.toMap, obj, eval)
Rabit.shutdown()
} finally {
trainingSet.delete()
}
} else {
Rabit.shutdown()
throw new XGBoostError(s"detect the empty partition in training dataset, partition ID:" +
s" ${TaskContext.getPartitionId().toString}")
partitionedTrainingSet.zipPartitions(partitionedBaseMargin) { (trainingSamples, baseMargin) =>
if (trainingSamples.isEmpty) {
throw new XGBoostError(
s"detected an empty partition in the training data, partition ID:" +
s" ${TaskContext.getPartitionId()}")
}
val cacheFileName = if (useExternalMemory) {
s"$appName-${TaskContext.get().stageId()}-" +
s"dtrain_cache-${TaskContext.getPartitionId()}"
} else {
null
}
rabitEnv.put("DMLC_TASK_ID", TaskContext.getPartitionId().toString)
Rabit.init(rabitEnv)
val partitionItr = fromDenseToSparseLabeledPoints(trainingSamples, missing)
val trainingMatrix = new DMatrix(new JDMatrix(partitionItr, cacheFileName))
try {
if (params.contains("groupData") && params("groupData") != null) {
trainingMatrix.setGroup(params("groupData").asInstanceOf[Seq[Seq[Int]]](
TaskContext.getPartitionId()).toArray)
}
if (baseMargin.nonEmpty) {
trainingMatrix.setBaseMargin(baseMargin.toArray)
}
val booster = SXGBoost.train(trainingMatrix, params, round,
watches = Map("train" -> trainingMatrix), obj, eval)
Iterator(booster)
} finally {
Rabit.shutdown()
trainingMatrix.delete()
}
}.cache()
}

Expand Down Expand Up @@ -191,8 +178,8 @@ object XGBoost extends Serializable {
fit(trainingData)
}

private[spark] def isClassificationTask(paramsMap: Map[String, Any]): Boolean = {
val objective = paramsMap.getOrElse("objective", paramsMap.getOrElse("obj_type", null))
private[spark] def isClassificationTask(params: Map[String, Any]): Boolean = {
val objective = params.getOrElse("objective", params.getOrElse("obj_type", null))
objective != null && {
val objStr = objective.toString
objStr == "classification" || (!objStr.startsWith("reg:") && objStr != "count:poisson" &&
Expand All @@ -212,18 +199,26 @@ object XGBoost extends Serializable {
* @param useExternalMemory indicate whether to use external memory cache, by setting this flag as
* true, the user may save the RAM cost for running XGBoost within Spark
* @param missing the value represented the missing value in the dataset
* @param baseMargin initial prediction for boosting.
* @throws ml.dmlc.xgboost4j.java.XGBoostError when the model training is failed
* @return XGBoostModel when successful training
*/
@deprecated("Use XGBoost.trainWithRDD instead.")
def train(
trainingData: RDD[MLLabeledPoint], params: Map[String, Any], round: Int,
nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
useExternalMemory: Boolean = false, missing: Float = Float.NaN): XGBoostModel = {
require(nWorkers > 0, "you must specify more than 0 workers")
trainWithRDD(trainingData, params, round, nWorkers, obj, eval, useExternalMemory, missing)
trainingData: RDD[MLLabeledPoint],
params: Map[String, Any],
round: Int,
nWorkers: Int,
obj: ObjectiveTrait = null,
eval: EvalTrait = null,
useExternalMemory: Boolean = false,
missing: Float = Float.NaN,
baseMargin: RDD[Float] = null): XGBoostModel = {
trainWithRDD(trainingData, params, round, nWorkers, obj, eval, useExternalMemory,
missing, baseMargin)
}

private def overrideParamMapAccordingtoTaskCPUs(
private def overrideParamsAccordingToTaskCPUs(
params: Map[String, Any],
sc: SparkContext): Map[String, Any] = {
val coresPerTask = sc.getConf.get("spark.task.cpus", "1").toInt
Expand Down Expand Up @@ -262,14 +257,21 @@ object XGBoost extends Serializable {
* @param useExternalMemory indicate whether to use external memory cache, by setting this flag as
* true, the user may save the RAM cost for running XGBoost within Spark
* @param missing the value represented the missing value in the dataset
* @param baseMargin initial prediction for boosting.
* @throws ml.dmlc.xgboost4j.java.XGBoostError when the model training is failed
* @return XGBoostModel when successful training
*/
@throws(classOf[XGBoostError])
def trainWithRDD(
trainingData: RDD[MLLabeledPoint], params: Map[String, Any], round: Int,
nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
useExternalMemory: Boolean = false, missing: Float = Float.NaN): XGBoostModel = {
trainingData: RDD[MLLabeledPoint],
params: Map[String, Any],
round: Int,
nWorkers: Int,
obj: ObjectiveTrait = null,
eval: EvalTrait = null,
useExternalMemory: Boolean = false,
missing: Float = Float.NaN,
baseMargin: RDD[Float] = null): XGBoostModel = {
if (params.contains("tree_method")) {
require(params("tree_method") != "hist", "xgboost4j-spark does not support fast histogram" +
" for now")
Expand All @@ -288,9 +290,10 @@ object XGBoost extends Serializable {
}
val tracker = startTracker(nWorkers, trackerConf)
try {
val overridedConfMap = overrideParamMapAccordingtoTaskCPUs(params, trainingData.sparkContext)
val boosters = buildDistributedBoosters(trainingData, overridedConfMap,
tracker.getWorkerEnvs, nWorkers, round, obj, eval, useExternalMemory, missing)
val overriddenParams = overrideParamsAccordingToTaskCPUs(params, trainingData.sparkContext)
val boosters = buildDistributedBoosters(trainingData, overriddenParams,
tracker.getWorkerEnvs, nWorkers, round, obj, eval, useExternalMemory, missing,
baseMargin)
val sparkJobThread = new Thread() {
override def run() {
// force the job
Expand All @@ -302,7 +305,7 @@ object XGBoost extends Serializable {
val isClsTask = isClassificationTask(params)
val trackerReturnVal = tracker.waitFor(0L)
logger.info(s"Rabit returns with exit code $trackerReturnVal")
postTrackerReturnProcessing(trackerReturnVal, boosters, overridedConfMap, sparkJobThread,
postTrackerReturnProcessing(trackerReturnVal, boosters, overriddenParams, sparkJobThread,
isClsTask)
} finally {
tracker.stop()
Expand All @@ -311,11 +314,10 @@ object XGBoost extends Serializable {

private def postTrackerReturnProcessing(
trackerReturnVal: Int, distributedBoosters: RDD[Booster],
configMap: Map[String, Any], sparkJobThread: Thread, isClassificationTask: Boolean):
params: Map[String, Any], sparkJobThread: Thread, isClassificationTask: Boolean):
XGBoostModel = {
if (trackerReturnVal == 0) {
val xgboostModel = convertBoosterToXGBoostModel(distributedBoosters.first(),
isClassificationTask)
val xgboostModel = XGBoostModel(distributedBoosters.first(), isClassificationTask)
distributedBoosters.unpersist(false)
xgboostModel
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,15 @@ abstract class XGBoostModel(protected var _booster: Booster)
case (null, _) => {
val predStr = broadcastBooster.value.evalSet(Array(dMatrix), Array(evalName), iter)
val Array(evName, predNumeric) = predStr.split(":")
Rabit.shutdown()
Iterator(Some(evName, predNumeric.toFloat))
}
case _ => {
val predictions = broadcastBooster.value.predict(dMatrix)
Rabit.shutdown()
Iterator(Some((evalName, evalFunc.eval(predictions, dMatrix))))
}
}
} finally {
Rabit.shutdown()
dMatrix.delete()
}
} else {
Expand Down Expand Up @@ -170,10 +169,9 @@ abstract class XGBoostModel(protected var _booster: Booster)
}
val dMatrix = new DMatrix(flatSampleArray, numRows, numColumns, missingValue)
try {
val res = broadcastBooster.value.predict(dMatrix)
Rabit.shutdown()
Iterator(res)
Iterator(broadcastBooster.value.predict(dMatrix))
} finally {
Rabit.shutdown()
dMatrix.delete()
}
}
Expand All @@ -185,13 +183,16 @@ abstract class XGBoostModel(protected var _booster: Booster)
*
* @param testSet test set represented as RDD
* @param useExternalCache whether to use external cache for the test set
* @param outputMargin whether to output raw untransformed margin value
*/
def predict(testSet: RDD[MLVector], useExternalCache: Boolean = false):
RDD[Array[Array[Float]]] = {
def predict(
testSet: RDD[MLVector],
useExternalCache: Boolean = false,
outputMargin: Boolean = false): RDD[Array[Array[Float]]] = {
val broadcastBooster = testSet.sparkContext.broadcast(_booster)
val appName = testSet.context.appName
testSet.mapPartitions { testSamples =>
if (testSamples.hasNext) {
if (testSamples.nonEmpty) {
import DataUtils._
val rabitEnv = Array("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString).toMap
Rabit.init(rabitEnv.asJava)
Expand All @@ -204,10 +205,9 @@ abstract class XGBoostModel(protected var _booster: Booster)
}
val dMatrix = new DMatrix(new JDMatrix(testSamples, cacheFileName))
try {
val res = broadcastBooster.value.predict(dMatrix)
Rabit.shutdown()
Iterator(res)
Iterator(broadcastBooster.value.predict(dMatrix))
} finally {
Rabit.shutdown()
dMatrix.delete()
}
} else {
Expand Down Expand Up @@ -334,6 +334,13 @@ abstract class XGBoostModel(protected var _booster: Booster)
}

object XGBoostModel extends MLReadable[XGBoostModel] {
private[spark] def apply(booster: Booster, isClassification: Boolean): XGBoostModel = {
if (!isClassification) {
new XGBoostRegressionModel(booster)
} else {
new XGBoostClassificationModel(booster)
}
}

override def read: MLReader[XGBoostModel] = new XGBoostModelModelReader

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
log4j.logger.org.apache.spark=ERROR
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
trait SharedSparkContext extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
with Serializable {

@transient protected implicit var sc: SparkContext = null
@transient protected implicit var sc: SparkContext = _

override def beforeAll() {
// build SparkContext
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("XGBoostSuite").
set("spark.driver.memory", "512m")
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("XGBoostSuite")
.set("spark.driver.memory", "512m")
.set("spark.ui.enabled", "false")

sc = new SparkContext(sparkConf)
sc.setLogLevel("ERROR")
}

override def afterAll() {
if (sc != null) {
sc.stop()
sc = null
}
}
}
Loading

0 comments on commit d535340

Please sign in to comment.