Machine Learning in Spark

This post describes Spark’s architecture for building and running machine learning algos.

Machine learning algorithms can be found by Spark’s MLlib API.

The data structure used are DataFrame. With regard the the last post, this means we need to import spark SQL context to use as well.

There are five key concepts:

  • DataFrame: the data structure. Each column can be text, labels, features, or numbers…
  • Transformer: an object to transform a DataFrame to another one. A machine learning algo is represented as a transformer that takes an input DataFrame with features and outputs a DataFrame with the predictions. A transformer is called by a transform() method on a DataFrame.
  • Estimator: an algorithm can be fit to a DataFrame and produces a transformer.  e.g. a ML algo can be fit to the training set and produce a model. An estimator is called by a fit() method on a DataFrame.
  • Pipeline: a collection of transformers and estimators that eventually consists of a trained model.
  • Parameters: parameters of the transformer and the estimator.

Now go over the example on Spark Website for understanding of how to use machine learning Pipeline.

Step one: read in data 

First create the spark SQL context, and use that to create a SparkDataFrame. This assumes a spark context is already created as sc.

sqlContext = SQLContext(sc)

training = sqlContext.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

The results of this step:
| id|            text|label|
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|


Step two: preprocess + training = ML pipeline 

Now, preprocess data by constructing a ML pipeline.

  • Tokenizer here tokenize every text into words ….
  • hashingTF: this one is more tricky. As per the document,

Our implementation of term frequency utilizes the hashing trick. A raw feature is mapped into an index (term) by applying a hash function. Then term frequencies are calculated based on the mapped indices. This approach avoids the need to compute a global term-to-index map, which can be expensive for a large corpus, but it suffers from potential hash collisions, where different raw features may become the same term after hashing. To reduce the chance of collision, we can increase the target feature dimension, i.e., the number of buckets of the hash table. The default feature dimension is 220=1,048,576

My understanding on hashTF (hash term frequency?) is that :

  • Hash functions map data of arbitrary size to a fixed size. It maps every word to a unique position, hopefully. But there will be collisions, which we ignore…
  • Next we just count the number of items at each position.
  • This saves the step of storing a map of word – index. Instead we just need to store a function.

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model =

Now let’s go over the output of each step. First, check the results of the tokenizer….

p1 = Pipeline(stages = [tokenizer])

| id|            text|label|               words|
|  0| a b c d e spark|  1.0|[a, b, c, d, e, s...|
|  1|             b d|  0.0|              [b, d]|
|  2|     spark f g h|  1.0|    [spark, f, g, h]|
|  3|hadoop mapreduce|  0.0| [hadoop, mapreduce]|

Then, check the results of the HashingTF….

hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
p2 = Pipeline(stages = [tokenizer, hashingTF])

| id|            text|label|               words|            features|
|  0| a b c d e spark|  1.0|[a, b, c, d, e, s...|(262144,[97,98,99...|
|  1|             b d|  0.0|              [b, d]|(262144,[98,100],...|
|  2|     spark f g h|  1.0|    [spark, f, g, h]|(262144,[102,103,...|
|  3|hadoop mapreduce|  0.0| [hadoop, mapreduce]|(262144,[134181,1...|

Third, check the output of the logistic regression model:

lr = LogisticRegression(maxIter=10, regParam=0.001)
p3 = Pipeline(stages=[tokenizer, hashingTF, lr])
['id', 'text', 'label', 'words', 'features', 'rawPrediction', 'probability', 'prediction']

In our original data, we only have id, text and label.

  • ‘words’ are added by the tokenizer,
  • ‘features’ are added by the hashingTF,
  • ‘rawPrediction’ is created by logistic regression. It is the “score” of each class labels. It varies algo by algo. For example, in a neural network it can be interpreted as the last layer passed to the softmax.
  • prediction: the predicted class label. the argmax of raw prediction.
  • probability : the “real” probability given raw prediction… Looks like passing rawprediction to a softmax for multi-class classification problems.

Step three: making predictions

After building the model now we can start to make predictions by building and testing on a toy test set.

First build test set — it is similar to the training set, except without labels

test = sqlContext.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])
| id|              text|
|  4|       spark i j k|
|  5|             l m n|
|  6|spark hadoop spark|
|  7|     apache hadoop|

Make predictions…

prediction = model.transform(test)

DataFrame[id: bigint, text: string, words: array<string>, features: vector, rawPrediction: vector, probability: vector, prediction: double]

Here prediction is of the same format as the transformed train dataset. The reason is, fitting machine learning algorithm is equivalent to transform the test data to add other columns…

Finally, printing out the results we care: prediction

selected ="id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

(4, spark i j k) --> prob=[0.159640773879,0.840359226121], prediction=1.000000  
(5, l m n) --> prob=[0.837832568548,0.162167431452], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.0692663313298,0.93073366867], prediction=1.000000
(7, apache hadoop) --> prob=[0.982157533344,0.0178424666556], prediction=0.000000

Question: what will the output be if some of the classes are not in the training set?


Share this post

Leave a Reply

Your email address will not be published. Required fields are marked *