The current version of Splash implements several classical stochastic algorithms for machine learning, including:

Stochastic Gradient Descent

The splash.optimization package implements the AdaGrad SGD algorithm. To use this package, the dataset should consist of label-feature pairs stored as data: RDD[(Double, Vector)]. For classification problems, the label should take values in {0, 1, 2, …}. The Vector type is the one defined in the MLlib package. Call the optimize method to start running the algorithm:

val weights = (new splash.optimization.StochasticGradientDescent())
  .setGradient(new LogisticGradient())
  .setNumIterations(20)
  .setStepSize(0.5)
  .optimize(data, initialWeights)
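
As a sketch of how the input might be prepared (the file path here is hypothetical), LIBSVM-formatted data can be loaded through MLlib and mapped into (label, features) pairs; initialWeights can be any MLlib Vector of matching dimension:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

// Load LIBSVM data and convert each LabeledPoint into a (label, features) pair.
val data = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")
  .map(point => (point.label, point.features))

// Start optimization from the zero vector of the right dimension.
val numFeatures = data.first()._2.size
val initialWeights = Vectors.zeros(numFeatures)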

The setGradient method requires a splash.optimization.Gradient object as input. You may use Splash’s pre-built Gradient classes: LogisticGradient, MultiClassLogisticGradient, HingeGradient or LeastSquaresGradient; or implement your own Gradient class in the following format:

abstract class Gradient extends Serializable {
  /**
   * Request the weight indices that are useful in computing the gradient
   * or return null if all indices are requested
   *
   * @param data features for one data point
   *
   * @return indices: Array[Int]
   */
  def requestWeightIndices(data: Vector) : Array[Int]
  
  /**
   * Compute the gradient and the loss given the features of a single data point.
   *
   * @param data features for one data point
   * @param label label for this data point
   * @param weights weights/coefficients corresponding to features
   *
   * @return gradient: Vector and loss : Double
   */
  def compute(data: Vector, label: Double, weights: Vector): (Vector, Double)
}
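
For illustration, a least-squares Gradient might be written as follows. This is only a sketch (Splash already provides LeastSquaresGradient); it treats the features as dense and returns null from requestWeightIndices to request all weight indices:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

class MyLeastSquaresGradient extends splash.optimization.Gradient {
  // All weight indices are needed to compute this gradient.
  def requestWeightIndices(data: Vector): Array[Int] = null

  def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
    val x = data.toArray
    val w = weights.toArray
    // residual = <w, x> - label
    var dot = 0.0
    var i = 0
    while (i < x.length) { dot += w(i) * x(i); i += 1 }
    val residual = dot - label
    // The gradient of the squared loss 0.5 * residual^2 is residual * x.
    (Vectors.dense(x.map(_ * residual)), 0.5 * residual * residual)
  }
}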

You can set the following parameters:

  • numIterations: the number of rounds that SGD runs and synchronizes.
  • stepSize: a scalar value denoting the stepsize of stochastic gradient descent. Although the stepsizes of individual iterates are adaptively chosen by AdaGrad, they will always be proportional to this parameter.
  • dataPerIteration: the proportion of local data processed in each iteration. The default value is 1.0. With a smaller proportion, the algorithm synchronizes more frequently or terminates more quickly.
  • maxThreadNum: the maximum number of threads to run. The default value is equal to the number of Parametrized RDD partitions.
  • autoThread: if the value is true, then the number of parallel threads will be chosen automatically by the system but is always bounded by maxThreadNum. Otherwise, the number of parallel threads will be equal to maxThreadNum.
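
The optional parameters above can presumably be set in the same builder style. The setter names below (setDataPerIteration, setMaxThreadNum, setAutoThread) are assumed by analogy with the setters shown earlier and are not confirmed by this document:

val weights = (new splash.optimization.StochasticGradientDescent())
  .setGradient(new LogisticGradient())
  .setNumIterations(20)
  .setStepSize(0.5)
  .setDataPerIteration(0.5)   // assumed setter: process half of the local data per iteration
  .setMaxThreadNum(8)         // assumed setter: cap the number of parallel threads
  .setAutoThread(false)       // assumed setter: use exactly maxThreadNum threads
  .optimize(data, initialWeights)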


Collaborative Filtering for Personalized Recommendation

The splash.recommendation package implements a stochastic collaborative filtering algorithm for personalized recommendation. The algorithm can be significantly faster than MLlib’s batch implementation. To use the package, the dataset should be an instance of RDD[UserRating]. The UserRating class takes the form:

class UserRating(initUser : Int, initRatings : Array[(Int,Double)]) extends Serializable {
  var userId = initUser
  var ratings : Array[(Int,Double)] = initRatings
}

In the UserRating data structure, userId is the ID of the user. The array ratings is a list of item-rating pairs. To train a personalized recommendation model, run the code:

val matrixFactorizationModel = (new splash.recommendation.CollaborativeFiltering())
  .setRank(10)
  .setRegParam(0.02)
  .setNumIterations(20)
  .setStepSize(0.1)
  .train(ratings)

The method train returns a MatrixFactorizationModel object. Given this object, the user can make recommendations; a sketch of preparing the input and querying the model appears after the parameter list below. You can set the following parameters:

  • rank: the algorithm approximates the user-rating matrix by a low-rank matrix whose rank is given by rank.
  • regParam: the L2-regularization parameter for matrix factorization.
  • numIterations: the number of rounds that the algorithm runs and synchronizes.
  • stepSize: a scalar value denoting the stepsize of the stochastic algorithm.
  • dataPerIteration: the proportion of local data processed in each iteration. The default value is 1.0. With a smaller proportion, the algorithm synchronizes more frequently or terminates more quickly.
  • maxThreadNum: the maximum number of threads to run. The default value is equal to the number of Parametrized RDD partitions.
  • autoThread: if the value is true, then the number of parallel threads will be chosen automatically by the system but is always bounded by maxThreadNum. Otherwise, the number of parallel threads will be equal to maxThreadNum.
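
As a sketch of how the input might be assembled and the result queried, raw (userId, itemId, rating) triples can be grouped into UserRating records. The rating values below are made up, the import assumes UserRating lives in splash.recommendation, and the prediction call assumes the returned model is MLlib’s MatrixFactorizationModel:

import splash.recommendation.UserRating

// Hypothetical raw ratings as (userId, itemId, rating) triples.
val triples = sc.parallelize(Seq((0, 1, 4.0), (0, 3, 2.5), (1, 1, 3.0), (1, 2, 5.0)))

// Group each user's item-rating pairs into one UserRating record.
val ratings = triples
  .map { case (user, item, rating) => (user, (item, rating)) }
  .groupByKey()
  .map { case (user, itemRatings) => new UserRating(user, itemRatings.toArray) }

// After training (see above), query the model, e.g. the predicted
// rating of user 0 for item 2.
val predicted = matrixFactorizationModel.predict(0, 2)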


Collapsed Gibbs Sampling for Topic Modelling

The splash.clustering package implements the Collapsed Gibbs Sampling algorithm for fitting the Latent Dirichlet Allocation (LDA) model. To use the package, the dataset should be an instance of RDD[(docId, Array[WordToken])]. The docId is the ID of the document; each WordToken represents a word token in the document and takes the form:

class WordToken(initWordId : Int, initWordCount : Int, initTopicId : Array[Int]) extends Serializable {
  var wordId = initWordId
  var wordCount = initWordCount
  var topicId = initTopicId 
}

In the WordToken data structure, wordId is the ID of the word. wordCount is the frequency of this word in the document. topicId is the list of topics assigned to this word. If the model has never been trained, these topics should be initialized with random indices (see the sketch at the end of this subsection). Otherwise, the topics may be initialized from an earlier training result. Then call the train method to start running the algorithm:

val model = new splash.clustering.LDA()
val corpusWithNewTopics = model
  .setNumTopics(100)
  .setAlphaBeta((0.1, 0.01))
  .setNumIterations(100)
  .train(corpusWithOldTopics)
val topicsMatrix = model.topicsMatrix

The train method returns an RDD[(docId, Array[WordToken])] object in which the topic of each word token has been resampled. The member model.topicsMatrix is the word-topic probability table: it maintains the probability that a particular topic generates a particular word. See the LDA Example for more details. You can set the following parameters:

  • numIterations: the number of times that Collapsed Gibbs Sampling goes through the dataset.
  • numTopics: the number of topics of the LDA model.
  • alphaBeta: the (alpha, beta) hyper-parameters of the LDA model.
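
As a sketch of the random initialization described above, a raw corpus of (docId, Array[(wordId, wordCount)]) records can be turned into word tokens with random topic assignments. This assumes WordToken lives in splash.clustering and that one topic is assigned per occurrence of a word (topicId has length wordCount); the tiny corpus below is made up:

import scala.util.Random
import splash.clustering.WordToken

val numTopics = 100

// Hypothetical raw corpus: one (docId, Array[(wordId, wordCount)]) record per document.
val rawCorpus = sc.parallelize(Seq(
  (0, Array((5, 2), (17, 1))),
  (1, Array((5, 1), (42, 3)))
))

// Assign a random topic to every occurrence of every word.
val corpusWithOldTopics = rawCorpus.map { case (docId, wordCounts) =>
  val tokens = wordCounts.map { case (wordId, count) =>
    new WordToken(wordId, count, Array.fill(count)(Random.nextInt(numTopics)))
  }
  (docId, tokens)
}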