# ML Package

The current version of Splash implements several classical stochastic algorithms for machine learning, including:

- Stochastic Gradient Descent
- Collaborative Filtering for Personalized Recommendation
- Collapsed Gibbs Sampling for Topic Modelling

## Stochastic Gradient Descent

The **splash.optimization** package implements the AdaGrad SGD algorithm. To use this package, the dataset should consist of label-feature pairs stored as `data: RDD[(Double, Vector)]`. The label should be {0,1,2,…} for classification problems. The `Vector` type is defined in the MLlib package. Call the `optimize` method to start running the algorithm:

```
val weights = (new splash.optimization.StochasticGradientDescent())
.setGradient(new LogisticGradient())
.setNumIterations(20)
.setStepSize(0.5)
.optimize(data, initialWeights)
```

The `setGradient` method requires a **splash.optimization.Gradient** object as input. You may use Splash’s pre-built Gradient classes: `LogisticGradient`, `MultiClassLogisticGradient`, `HingeGradient` or `LeastSquaresGradient`; or implement your own Gradient class in the following format:

```
abstract class Gradient extends Serializable {
  /**
   * Request the weight indices that are useful in computing the gradient,
   * or return null if all indices are requested.
   *
   * @param data features for one data point
   *
   * @return indices: Array[Int]
   */
  def requestWeightIndices(data: Vector): Array[Int]

  /**
   * Compute the gradient and the loss given the features of a single data point.
   *
   * @param data features for one data point
   * @param label label for this data point
   * @param weights weights/coefficients corresponding to features
   *
   * @return gradient: Vector and loss: Double
   */
  def compute(data: Vector, label: Double, weights: Vector): (Vector, Double)
}
```
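As an illustration of what a custom `compute` implementation does, here is a self-contained sketch of the binary logistic gradient, using plain `Array[Double]` in place of MLlib's `Vector` (a real Gradient subclass would operate on `Vector` and also implement `requestWeightIndices`; the object and method names below are illustrative, not part of Splash):

```scala
object LogisticGradientSketch {
  // logistic function
  private def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // gradient and loss of the logistic loss at one data point, labels in {0, 1}
  def compute(data: Array[Double], label: Double,
              weights: Array[Double]): (Array[Double], Double) = {
    val margin = data.zip(weights).map { case (x, w) => x * w }.sum // w^T x
    val p = sigmoid(margin)                       // predicted P(label = 1)
    val gradient = data.map(x => (p - label) * x) // d(loss) / d(weights)
    val loss = if (label > 0.5) -math.log(p) else -math.log(1.0 - p)
    (gradient, loss)
  }
}
```

AdaGrad then rescales each coordinate of this per-point gradient by the accumulated squared gradients, so the gradient and loss are all a custom class needs to supply.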

You can set the following parameters:

- **numIterations**: the number of rounds that SGD runs and synchronizes.
- **stepSize**: a scalar denoting the step size of stochastic gradient descent. Although the step sizes of individual iterates are adaptively chosen by AdaGrad, they are always proportional to this parameter.
- **dataPerIteration**: the proportion of local data processed in each iteration. The default value is `1.0`. By choosing a smaller proportion, the algorithm will synchronize more frequently or terminate more quickly.
- **maxThreadNum**: the maximum number of threads to run. The default value is equal to the number of Parametrized RDD partitions.
- **autoThread**: if `true`, the number of parallel threads is chosen automatically by the system but is always bounded by **maxThreadNum**. Otherwise, the number of parallel threads equals **maxThreadNum**.

## Collaborative Filtering for Personalized Recommendation

The **splash.recommendation** package implements a stochastic collaborative filtering algorithm for personalized recommendation. The algorithm can be significantly faster than MLlib’s batch implementation. To use the package, the dataset should be an instance of `RDD[UserRating]`. The `UserRating` class takes the form:

```
class UserRating(initUser: Int, initRatings: Array[(Int, Double)]) extends Serializable {
  var userId = initUser
  var ratings: Array[(Int, Double)] = initRatings
}
```
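Raw ratings data often arrives as `(user, item, rating)` triples; grouping them by user yields one `UserRating` per user. A minimal sketch on plain Scala collections (with an RDD, the same shape works via `map` and `groupByKey`; the `fromTriples` helper is illustrative, not part of Splash):

```scala
class UserRating(initUser: Int, initRatings: Array[(Int, Double)]) extends Serializable {
  var userId = initUser
  var ratings: Array[(Int, Double)] = initRatings
}

object BuildUserRatings {
  // collect every (item, rating) pair of a user into a single UserRating record
  def fromTriples(triples: Seq[(Int, Int, Double)]): Seq[UserRating] =
    triples.groupBy(_._1).toSeq.sortBy(_._1).map { case (user, rows) =>
      new UserRating(user, rows.map(r => (r._2, r._3)).toArray)
    }
}
```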

In the `UserRating` data structure, `userId` is the ID of the user, and the array `ratings` is a list of item-rating pairs. To train a personalized recommendation model, run the code:

```
val matrixFactorizationModel = (new splash.recommendation.CollaborativeFiltering())
.setRank(10)
.setRegParam(0.02)
.setNumIterations(20)
.setStepSize(0.1)
.train(ratings)
```

The `train` method returns a `MatrixFactorizationModel` object, which can then be used to make recommendations. You can set the following parameters:

- **rank**: the algorithm approximates the user-rating matrix by a low-rank matrix whose rank is given by `rank`.
- **regParam**: the L2-regularization parameter for matrix factorization.
- **numIterations**: the number of rounds that the algorithm runs and synchronizes.
- **stepSize**: a scalar denoting the step size of the stochastic algorithm.
- **dataPerIteration**: the proportion of local data processed in each iteration. The default value is `1.0`. By choosing a smaller proportion, the algorithm will synchronize more frequently or terminate more quickly.
- **maxThreadNum**: the maximum number of threads to run. The default value is equal to the number of Parametrized RDD partitions.
- **autoThread**: if `true`, the number of parallel threads is chosen automatically by the system but is always bounded by **maxThreadNum**. Otherwise, the number of parallel threads equals **maxThreadNum**.

## Collapsed Gibbs Sampling for Topic Modelling

The **splash.clustering** package implements the Collapsed Gibbs Sampling algorithm for fitting the Latent Dirichlet Allocation (LDA) model. To use the package, the dataset should be an instance of `RDD[(docId, Array[wordToken])]`. The `docId` is the ID of the document; each `wordToken` represents a word token in the document and takes the form:

```
class WordToken(initWordId: Int, initWordCount: Int, initTopicId: Array[Int]) extends Serializable {
  var wordId = initWordId
  var wordCount = initWordCount
  var topicId = initTopicId
}
```

In the `WordToken` data structure, `wordId` is the ID of the word, `wordCount` is the frequency of this word in the document, and `topicId` is the list of topics assigned to this word. If the model has never been trained, these topics should be initialized with random indices; otherwise they may be initialized from an earlier training result. Then call the `train` method to start running the algorithm:

```
val model = new splash.clustering.LDA()
val corpusWithNewTopics = model
  .setNumTopics(100)
  .setAlphaBeta((0.1, 0.01))
  .setNumIterations(100)
  .train(corpusWithOldTopics)
val topicsMatrix = model.topicsMatrix
```
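For a corpus that has not been trained before, each word token needs one random initial topic per occurrence of the word. A self-contained sketch (the `randomToken` helper is illustrative, not part of Splash):

```scala
class WordToken(initWordId: Int, initWordCount: Int, initTopicId: Array[Int]) extends Serializable {
  var wordId = initWordId
  var wordCount = initWordCount
  var topicId = initTopicId
}

object InitTopics {
  private val rand = new scala.util.Random(42)

  // draw one uniformly random topic for every occurrence of the word
  def randomToken(wordId: Int, wordCount: Int, numTopics: Int): WordToken =
    new WordToken(wordId, wordCount, Array.fill(wordCount)(rand.nextInt(numTopics)))
}
```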

The `train` method returns an `RDD[(docId, Array[wordToken])]` object in which the topic of each word token has been resampled. The member `model.topicsMatrix` is the word-topic probability table: it maintains the probability that a particular topic generates a particular word. See the LDA Example for more details. You can set the following parameters:

- **numIterations**: the number of times that Collapsed Gibbs Sampling passes through the dataset.
- **numTopics**: the number of topics in the LDA model.
- **alphaBeta**: the (alpha, beta) hyperparameters of the LDA model.
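For intuition, each resampling step of Collapsed Gibbs Sampling draws a new topic for a token with probability proportional to (n_dk + alpha)(n_kw + beta)/(n_k + V·beta), where n_dk, n_kw and n_k are the current document-topic, topic-word and topic counts and V is the vocabulary size. A self-contained sketch of this conditional, illustrating the standard update rather than Splash's actual implementation:

```scala
object GibbsStep {
  // unnormalized conditional probability of topic k for one token:
  //   p(z = k) ∝ (nDocTopic + alpha) * (nTopicWord + beta) / (nTopic + vocabSize * beta)
  def topicWeight(nDocTopic: Int, nTopicWord: Int, nTopic: Int,
                  alpha: Double, beta: Double, vocabSize: Int): Double =
    (nDocTopic + alpha) * (nTopicWord + beta) / (nTopic + vocabSize * beta)

  // draw an index proportionally to the given weights; u is uniform in [0, 1)
  def sample(weights: Array[Double], u: Double): Int = {
    val threshold = u * weights.sum
    var acc = 0.0
    var k = 0
    while (k < weights.length - 1 && acc + weights(k) < threshold) {
      acc += weights(k)
      k += 1
    }
    k
  }
}
```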