Examples
First, download the Splash Example package and extract it to any directory. The source code is located at /src/main/scala/. The Splash library is included at /lib/, which puts Splash on your project classpath. To compile the code, cd into the directory where you extracted the package and type:
sbt package
This generates a jar file at ./target/scala-2.10/splashexample.jar. To run the code, submit the jar file as a Spark job:
YOUR_SPARK_HOME/bin/spark-submit --class ExampleName \
--driver-memory 4G \
--jars lib/splash-0.1.0.jar target/scala-2.10/splashexample.jar \
[data files] > output.txt
Here, YOUR_SPARK_HOME should be replaced by the directory where Spark is installed, and ExampleName by the name of the example (see the following sections). The file splash-0.1.0.jar is the Splash library and splashexample.jar is the compiled code to be executed. The arguments [data files] should be replaced by the paths of the data files (see the following sections). The result is written to output.txt.
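For instance, to run the Document Statistics example described below on the bundled Covtype file, the command would be:
YOUR_SPARK_HOME/bin/spark-submit --class DocumentStatistics \
--driver-memory 4G \
--jars lib/splash-0.1.0.jar target/scala-2.10/splashexample.jar \
data/covtype.txt > output.txt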
Here is a list of examples:
Document Statistics
The Document Statistics example computes the word statistics of a document. Please choose ExampleName = DocumentStatistics and [data files] = data/covtype.txt. The processing function reads a weighted line from the document and updates the number of lines, words and characters through a shared variable sharedVar. The weight argument tells the algorithm that this line appears weight times in the current observation.
import org.apache.spark.{SparkConf, SparkContext}
import splash.core.ParametrizedRDD

object DocumentStatistics {
  def main(args: Array[String]) {
    val path = args(0)
    val sc = new SparkContext(new SparkConf())
    val paramRdd = new ParametrizedRDD(sc.textFile(path))
    // each call processes one weighted line and accumulates the counts
    // in the shared variable set
    val sharedVar = paramRdd.setProcessFunction((line, weight, sharedVar, localVar) => {
      sharedVar.add("lines", weight)
      sharedVar.add("words", weight * line.split(" ").length)
      sharedVar.add("characters", weight * line.length)
    }).run().getSharedVariable()
    println("Lines: " + sharedVar.get("lines").toLong)
    println("Words: " + sharedVar.get("words").toLong)
    println("Characters: " + sharedVar.get("characters").toLong)
  }
}
SGD for Logistic Regression
We provide a bare-hands implementation of logistic regression using the Splash programming interface. This gives you a concrete idea of how the data processing function can be implemented. To run the code on the Covtype classification dataset, please choose ExampleName = LogisticRegression and [data files] = data/covtype.txt.
val process = (elem: LabeledPoint, weight: Double, sharedVar: SharedVariableSet, localVar: LocalVariableSet) => {
  val label = elem.label
  val features = elem.features.asInstanceOf[SparseVector]
  val xIndices = features.indices
  val xValues = features.values
  val n = xIndices.length

  sharedVar.add("t", weight)
  val t = sharedVar.get("t")
  val learningRate = sharedVar.get("learningRate")
  val regParam = sharedVar.get("regParam")

  // get weight vector w on the active coordinates
  val w = sharedVar.getArrayElements("w", xIndices)

  // compute the inner product x * w
  var innerProduct = 0.0
  for (i <- 0 until n) {
    innerProduct += xValues(i) * w(i)
  }
  val margin = -1.0 * innerProduct
  val multiplier = (1.0 / (1.0 + math.exp(margin))) - label

  // compute the update
  val delta = new Array[Double](n)
  val stepsize = weight * learningRate / math.sqrt(t)
  for (i <- 0 until n) {
    delta(i) = -stepsize * multiplier * xValues(i)
  }

  // update the weight vector: shrink it for L2 regularization,
  // then apply the gradient step on the active coordinates
  sharedVar.multiplyArray("w", 1 - stepsize * regParam)
  sharedVar.addArrayElements("w", xIndices, delta)
}
The above code uses multiple operators on the shared variable. The get and getArrayElements methods return the variable values. The multiplyArray method scales the whole array by a constant. The addArrayElements method adds a delta vector to a subset of coordinates. See the full code base for more details.
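To see where process plugs in, here is a minimal sketch of the surrounding pipeline, assuming the same ParametrizedRDD workflow as the Document Statistics example above. The parser toLabeledPoint is a hypothetical helper, and the initialization of the shared variables w, learningRate and regParam is elided; the example package contains the complete pipeline.
// a minimal sketch, not the example's actual code: toLabeledPoint is a
// hypothetical parser, and the shared variables "w", "learningRate" and
// "regParam" must be initialized before running (elided here)
val data = sc.textFile("data/covtype.txt").map(toLabeledPoint)
val paramRdd = new ParametrizedRDD(data)
paramRdd.setProcessFunction(process)
val sharedVar = paramRdd.run().getSharedVariable()
// read back the learned weights on the first dim coordinates
val dim = 54 // the Covtype dataset has 54 features
val w = sharedVar.getArrayElements("w", (0 until dim).toArray)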
SGD via ML Package
Splash has a collection of pre-built algorithms for machine learning. One of them is the SGD algorithm for optimization. It provides an API similar to that of MLlib's optimization package. MLlib implements a mini-batch version of SGD but doesn't support native sequential SGD. To run the code on the Covtype classification dataset, please choose ExampleName = SGDExample and [data files] = data/covtype.txt.
val weights = (new splash.optimization.StochasticGradientDescent())
  .setGradient(new splash.optimization.LogisticGradient())
  .setNumIterations(20)
  .optimize(rdd, initialWeights)
The above code creates a StochasticGradientDescent object, sets the gradient to the logistic gradient, and tells the algorithm to take 20 passes over the rdd dataset. The output of SGD is assigned to the vector weights. The full code in the Splash Example package illustrates how this piece of code can be integrated with the data analytics pipeline of Spark.
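For reference, here is one way the inputs could be prepared. This is a sketch that assumes the Covtype file is stored in LIBSVM format and that optimize, like its MLlib counterpart, expects an RDD of (label, features) pairs; the actual construction is in the example's source code.
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.linalg.Vectors

// load (label, features) pairs and start the optimization from the zero vector
// (sketch only; assumes the data file is in LIBSVM format)
val examples = MLUtils.loadLibSVMFile(sc, "data/covtype.txt")
val rdd = examples.map(p => (p.label, p.features))
val numFeatures = examples.first().features.size
val initialWeights = Vectors.dense(new Array[Double](numFeatures))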
LDA via ML Package
Another algorithm in the ML Package is collapsed Gibbs sampling for training the LDA model. Its functionality is similar to the Variational Inference algorithm of MLlib. Since collapsed Gibbs sampling is a stochastic algorithm, it converges faster than MLlib's implementation and sometimes converges to better solutions. To run the example on the NIPS article dataset, please choose ExampleName = LDAExample and [data files] = data/docword.nips.txt data/vocab.nips.txt.
import scala.collection.mutable.ListBuffer

// train LDA using collapsed Gibbs sampling
val model = new splash.clustering.LDA
model.setNumTopics(numTopics)
  .setAlphaBeta((alpha, beta))
  .setNumIterations(numIterations)
  .train(corpus)

// view topics and their top words
val topicsMatrix = model.topicsMatrix
for (topicId <- 0 until numTopics) {
  val wordProb = new ListBuffer[(Int, Double)]
  for (wordId <- 0 until vocSize) wordProb.append((wordId, topicsMatrix(wordId)(topicId)))
  val sortedWordProb = wordProb.sortWith((a, b) => a._2 > b._2)
  // output the top 20 words of this topic
  println()
  print("Topic " + (topicId + 1) + ": ")
  for (i <- 0 until 20) print(vocab(sortedWordProb(i)._1) + " ")
}
The above code trains an LDA model on the corpus, which is an RDD containing the word distributions of 1,500 documents (see the source code for the construction of this data structure). The member model.topicsMatrix is the word-topic probability table: it maintains the probability that a particular topic generates a particular word. The last few lines of the code print the top 20 words of each topic.
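As a hint for assembling these inputs, the sketch below loads the vocabulary and parses the word counts, assuming data/docword.nips.txt follows the standard UCI bag-of-words layout (three header lines, then docId wordId count triples with 1-based word ids); how the counts are turned into the corpus RDD expected by train is shown in the example's source code.
// load the vocabulary: one word per line, indexed by word id
val vocab = sc.textFile("data/vocab.nips.txt").collect()
val vocSize = vocab.length

// parse "docId wordId count" triples, skipping the three header lines
// (this sketch assumes the standard UCI bag-of-words layout)
val wordCounts = sc.textFile("data/docword.nips.txt")
  .zipWithIndex().filter(_._2 >= 3).map(_._1)
  .map(line => {
    val tokens = line.split(" ")
    (tokens(0).toInt, (tokens(1).toInt - 1, tokens(2).toInt)) // (docId, (wordId, count))
  })
  .groupByKey()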