Spark SQL quick intro

This short post describes what Spark SQL is and how to use it. The basic concepts are not difficult to grasp.

What is SparkSQL?

  • Spark’s interface for working with structured and semi-structured data.

How to use SparkSQL?

  • Use it inside a Spark application. In PySpark this means creating a SQLContext from an existing SparkContext (sc). This is the initialization step:
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)

What are the objects in Spark SQL?

  • DataFrame (previously SchemaRDD)

What is the difference between a DataFrame and a regular RDD?

  • A DataFrame is built on an RDD (its predecessor, SchemaRDD, literally was one), so RDD methods such as map and reduce are still available, e.g. through df.rdd.
  • The elements of a DataFrame are Rows.
    • A Row is essentially a fixed-length array wrapped with some extra methods.
    • A Row represents a record.
    • A Row knows its schema.
  • A DataFrame carries schema information: the structure of the data fields.
  • A DataFrame provides new methods, including running SQL queries on it.
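To make the Row description concrete, here is a rough analogy in plain Python. It is not PySpark code: pyspark.sql.Row is itself a subclass of tuple, and a collections.namedtuple behaves similarly, a fixed-length tuple plus field names (its "schema") and a few helper methods. The Tweet fields are hypothetical, chosen to match the tweets query in this post.

```python
from collections import namedtuple

# Analogy only: like a Spark Row, a namedtuple is a fixed-length tuple
# that also knows its field names (its "schema").
Tweet = namedtuple("Tweet", ["text", "retweetCount"])  # hypothetical schema

row = Tweet(text="hello spark", retweetCount=3)

by_position = row[1]            # access like an array
by_name = row.retweetCount      # access by field name
schema = row._fields            # the field names the row carries with it
as_dict = row._asdict()         # an extra helper method (compare Row.asDict() in PySpark)
```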

How to run query on SparkSQL?

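  • First initialize the SQL context.
  • Then create a DataFrame.
  • Then register the DataFrame as a temporary table and query it with SQL. Code in Python:
      tweets = sqlContext.jsonFile(inputFile)  # Spark 1.4+: sqlContext.read.json(inputFile)
      tweets.registerTempTable("tweets")
      # DESC so that the most retweeted tweets come first
      toptweets = sqlContext.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount DESC LIMIT 10""")

  • Registering a table has changed in Spark 2.0: SparkSession replaces SQLContext as the entry point, and registerTempTable is deprecated in favour of createOrReplaceTempView.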

Here is a good guide to working with Spark SQL using the 20 Newsgroups data: https://github.com/kundan-git/pyspark-text-classification/blob/master/Spark-Text-Classification.ipynb
 

 

Recommendation System: Matrix Factorization

This post explains how to build a recommendation system based on matrix factorization. In practice, the engineering aspects become much more challenging when the dataset is huge; these are not covered here.

Recommendation system:

The goal is to recommend items to a user based on the user’s previous ratings of other items. Such systems are now used everywhere.

The problem is formulated as a utility matrix, in which:

  • Each row represents a user
  • Each column represents an item
  • Each element can be 1) the user’s rating of the item or 2) how often the user visited the item (e.g. in the context of Spotify)

The problem is that such a matrix is usually sparse: not every user has rated every item. The goal is therefore to fill in the blanks.

After filling in the blanks, we can recommend to each user the items with high predicted ratings that the user has not yet rated.

Algorithms

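There are two families of algorithms:

  • Collaborative filtering: use clustering (or another similarity measure) to find similar rows/columns, then average their ratings.
  • Matrix factorization: as the name indicates, factor the utility matrix into a product of smaller matrices.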

We only talk about matrix factorization here.

The utility matrix can be decomposed as:

$$U = \Sigma \cdot V^{T}$$

If $U$ is an $m \times n$ matrix, then $\Sigma$ is an $m \times k$ matrix and $V$ is an $n \times k$ matrix, where $k$ is the number of latent factors.

We can interpret $\Sigma$ as:

  • each column represents a latent factor
  • each row represents a user
  • each element represents the user’s preference for that latent factor

We can interpret $V$ as:

  • each column represents a latent factor
  • each row represents an item
  • each element represents how much that latent factor contributes to the item’s rating

This looks complex but is in fact really simple. We interpret the rating of user i on item j as the sum, over all latent factors, of user i’s preference for each factor times that factor’s weight in item j: $U_{ij} = \sum_{f=1}^{k} \Sigma_{if} V_{jf}$.

An example:

  • I rate the book The Little Prince 3.6/5.
  • I give it this rating because The Little Prince is 20% about youth and 80% about romance.
  • I give the youth factor a score of 2.
  • I give the romance factor a score of 4.
  • My total score for The Little Prince is then 2 * 0.2 + 4 * 0.8 = 3.6.
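The example is just a dot product between my row of $\Sigma$ and the book’s row of $V$. Here is a small pure-Python sketch of the full product $\Sigma \cdot V^{T}$. The second book is a made-up addition that shows how the same preferences produce a different rating for a different item.

```python
def predict(Sigma, V):
    """Full utility matrix Sigma . V^T: entry (i, j) = sum over f of Sigma[i][f] * V[j][f]."""
    return [[sum(s * v for s, v in zip(user, item)) for item in V] for user in Sigma]

# Latent factors: index 0 = youth, index 1 = romance.
Sigma = [[2.0, 4.0]]     # one user (me): score 2 for youth, 4 for romance
V = [[0.2, 0.8],         # The Little Prince: 20% youth, 80% romance
     [0.9, 0.1]]         # hypothetical second book, mostly about youth
U = predict(Sigma, V)    # U[0][0] is 2 * 0.2 + 4 * 0.8, up to floating-point rounding
```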

Compute matrix factorization

The above is about a theoretical, fully filled utility matrix.

Now our goal is to find such a matrix by finding the corresponding $\Sigma$ and $V$.

We can view this as a supervised learning problem.

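The observed values are the known entries of the user-item utility matrix.

The predicted values are the entries of $\Sigma \cdot V^{T}$.

The loss for each observed entry $(i, j)$ is the squared error between $U_{ij}$ and the prediction $\sum_{f=1}^{k} \Sigma_{if} V_{jf}$.

Our goal is to minimize the empirical cost function, the sum of the squared errors over all observed entries:

$$\min_{\Sigma, V} \sum_{(i, j)\ \text{observed}} \left( U_{ij} - \sum_{f=1}^{k} \Sigma_{if} V_{jf} \right)^{2}$$

We can of course add regularization terms to this.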

How do we achieve this? By iteration:

  • First, randomly initialize $\Sigma$ and $V$.
  • Then choose one element (parameter) of $\Sigma$ or $V$ and set it to the value that minimizes the empirical cost function, keeping all other parameters fixed.
  • Move on to another parameter and repeat the process until convergence.
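The iteration above is coordinate descent. Below is a minimal numpy sketch of it in this post’s notation ($U \approx \Sigma V^{T}$, $\Sigma$ is m × k, $V$ is n × k). The small L2 penalty lam, the update order (all of $\Sigma$, then all of $V$), the fixed number of sweeps and the toy matrix are my own assumptions. Setting lam=0 gives exactly the plain argmin described above, but it can divide by zero if a factor column becomes all zeros.

```python
import numpy as np

def factorize(U, mask, k=2, lam=0.01, n_sweeps=50, seed=0):
    """Fit U ~ Sigma @ V.T on the observed entries by coordinate descent.

    U: m x n array (blank entries may hold any value, e.g. np.nan).
    mask: m x n boolean array, True where a rating is observed.
    lam: small L2 penalty (an assumption; lam=0 is the plain argmin).
    """
    rng = np.random.default_rng(seed)
    m, n = U.shape
    U = np.where(mask, U, 0.0)          # blanks get weight 0 below, so their value is irrelevant
    W = mask.astype(float)              # 1.0 = observed, 0.0 = blank
    Sigma = rng.random((m, k))          # random initialization
    V = rng.random((n, k))
    for _ in range(n_sweeps):
        # Set each element of Sigma, one at a time, to its exact argmin.
        for i in range(m):
            for f in range(k):
                r = U[i] - Sigma[i] @ V.T + Sigma[i, f] * V[:, f]  # residual without factor f
                Sigma[i, f] = np.sum(W[i] * r * V[:, f]) / (np.sum(W[i] * V[:, f] ** 2) + lam)
        # Same for each element of V.
        for j in range(n):
            for f in range(k):
                r = U[:, j] - Sigma @ V[j] + V[j, f] * Sigma[:, f]
                V[j, f] = np.sum(W[:, j] * r * Sigma[:, f]) / (np.sum(W[:, j] * Sigma[:, f] ** 2) + lam)
    return Sigma, V

def observed_loss(U, mask, Sigma, V):
    """Sum of squared errors over the observed entries only."""
    return float(np.sum(((U - Sigma @ V.T)[mask]) ** 2))

# Toy utility matrix with two blanks (np.nan).
U = np.array([[3.0, 2.0, np.nan],
              [4.0, np.nan, 2.5],
              [3.5, 2.5, 3.0],
              [1.5, 3.0, 3.0]])
mask = ~np.isnan(U)
S0, V0 = factorize(U, mask, n_sweeps=0)    # just the random starting point
S1, V1 = factorize(U, mask, n_sweeps=50)
loss_before = observed_loss(U, mask, S0, V0)
loss_after = observed_loss(U, mask, S1, V1)
filled = S1 @ V1.T                          # blanks are now filled with predictions
```

Each single-parameter update has a closed form because the cost is quadratic in that one parameter, which is what makes this "set one parameter to its argmin" scheme cheap.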

A few practical points:

  • Preprocessing the matrix: since different users use different rating scales, the utility matrix can be normalized by row and/or by column (e.g. subtracting each user’s average rating).
  • In what order should the parameters be updated?
    • Randomly!
    • Column by column!
  • Overfitting:
    • Stop early.
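For the preprocessing point, here is one possible row normalization, sketched with numpy under my own assumptions: blanks are stored as np.nan, a boolean mask marks observed ratings, and "normalize by row" is taken to mean subtracting each user’s mean observed rating. Column (item) centering would work the same way along axis=0.

```python
import numpy as np

def center_rows(U, mask):
    """Subtract each user's (row's) mean observed rating; blanks become 0.

    Returns the centered matrix and the row means, so a prediction p on the
    centered scale maps back to p + means[i] on the original scale.
    """
    counts = mask.sum(axis=1)
    sums = np.where(mask, U, 0.0).sum(axis=1)
    # Users with no ratings get mean 0 instead of a division by zero.
    means = np.divide(sums, counts, out=np.zeros(len(sums)), where=counts > 0)
    centered = np.where(mask, U - means[:, None], 0.0)
    return centered, means

U = np.array([[5.0, 3.0, np.nan],
              [1.0, np.nan, 2.0]])
mask = ~np.isnan(U)
centered, means = center_rows(U, mask)
```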

That is the whole algorithm, and it turns out to be pretty simple! It took me 15 minutes to understand and 18 minutes to write this post. Next time I learn a technique, I should aim for the same time span.

Reference:

The Stanford textbook : http://infolab.stanford.edu/~ullman/mmds/ch9.pdf

(Not really) Solving a memory limit problem

After a few months of study, I noticed that the challenges of big data are less intellectual than engineering ones. There are algorithms designed specifically for sampling huge data streams (e.g. this Stanford course), but in my practice the biggest challenges are usually long running times and out-of-resource errors, which make debugging difficult and prolong the development cycle.

Frustration often comes from a single minor problem, which can block days of effort.

The key is to break the process down step by step and identify the source of the problem.

This post describes one such case and my solution to it.

Problem

I want to train a hierarchical GRU model with attention on 20,000 medical summary documents, and use the model for classification on the test set.

My implementation of attention is a fixed linear layer. Because of this, the number of hidden states at each level of the GRU must be the same for the training set and the test set.

Since the number of hidden states corresponds to the maximum number of sentences in any document and the maximum number of tokens in any sentence, the easier way is to truncate and pad all documents to the same number of sentences and the same number of tokens per sentence.
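The truncate-and-pad step can be sketched in plain Python as follows. It assumes documents are already tokenized into nested lists of integer ids and that 0 is a free padding id; max_sents and max_tokens are hypothetical parameters standing in for whatever lengths are chosen, and the function name is my own.

```python
def truncate_and_pad(docs, max_sents, max_tokens, pad_id=0):
    """Force every document to shape max_sents x max_tokens.

    docs: list of documents; a document is a list of sentences;
    a sentence is a list of token ids.
    """
    out = []
    for doc in docs:
        sents = []
        for sent in doc[:max_sents]:                 # drop extra sentences
            sent = sent[:max_tokens]                 # drop extra tokens
            sents.append(sent + [pad_id] * (max_tokens - len(sent)))
        # add all-padding sentences to short documents
        sents += [[pad_id] * max_tokens for _ in range(max_sents - len(sents))]
        out.append(sents)
    return out

docs = [[[1, 2, 3], [4]],
        [[5, 6, 7, 8, 9], [1], [2], [3]]]
padded = truncate_and_pad(docs, max_sents=3, max_tokens=4)
```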

To ensure the training set and test set have the same shape, I need to read them all into memory and truncate them together.

My group mate had already broken the files down into smaller versions (Full, Med, and Tiny) for running and testing the program.

The problem is that since the files are huge, I cannot read them all into memory. The data are stored in pickle files.

Possible solutions

  1. Increase the memory limit on the HPC cluster.
  2. Read the pickle files incrementally. But eventually I still need all the data in memory at once.
  3. Switch to a different storage format.

Investigation:

  • Part of the problem lies in the format of the source file: it is a pickle file, which is difficult to read line by line.
  • The other part lies in running Python interactively on the server. The server seems to allocate a small memory limit to interactive sessions, whereas batch jobs submitted to the compute nodes get higher, adjustable memory limits. See here.

Possible solutions, phase 2:

  • Now I have two memory budgets: one on the login node and one on the cluster (?)
  • The original problem becomes: I cannot read big files interactively on the login node.
  • However,
    • I can read small files interactively on the login node, so I can debug and test there.
    • I can request more memory on the cluster, so I can read the big files there and break them down into smaller chunks.
    • I can then read the smaller chunks interactively from the login node.
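The "split on the cluster, read chunks on the login node" workflow can be sketched with the standard library. This is a minimal illustration under my own assumptions: the data is a Python list of records, split_into_chunks and iter_chunks are hypothetical helper names, and the chunk size is arbitrary. In the real job, records would come from loading the big pickle inside a batch job that has enough memory.

```python
import os
import pickle
import tempfile

def split_into_chunks(records, out_dir, chunk_size):
    """Write records into numbered pickle files of at most chunk_size items; return the paths."""
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for start in range(0, len(records), chunk_size):
        path = os.path.join(out_dir, "chunk_%04d.pkl" % (start // chunk_size))
        with open(path, "wb") as fh:
            pickle.dump(records[start:start + chunk_size], fh)
        paths.append(path)
    return paths

def iter_chunks(paths):
    """Load the chunks one at a time, so only one chunk is in memory at once."""
    for path in paths:
        with open(path, "rb") as fh:
            yield pickle.load(fh)

# Toy run in a temporary directory (stands in for the big file on the cluster).
with tempfile.TemporaryDirectory() as tmp:
    paths = split_into_chunks(list(range(10)), tmp, chunk_size=4)
    chunk_lengths = [len(chunk) for chunk in iter_chunks(paths)]
```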

The problem is then solved.

But this solution is specific to my case, in that:

  • I can break the source file down for later programs, so the big files are not really necessary for my program.
  • I do in fact have access to more memory.

I have not solved the problem for the case where there is a hard memory limit and a fixed file size. For that, I might need to store the data in a different format (a pickled object is not meant to be read line by line) or design different algorithms.
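One format change that keeps pickle but allows incremental reading is to pickle each record separately, back to back, in a single file: each pickle.load call then reads exactly one record, and EOFError signals the end of the file. A sketch with hypothetical helper names and toy records:

```python
import os
import pickle
import tempfile

def write_records(path, records):
    """Pickle each record separately, back to back, in a single file."""
    with open(path, "wb") as fh:
        for record in records:
            pickle.dump(record, fh)

def read_records(path):
    """Yield one record per pickle.load call; only one record is unpickled at a time."""
    with open(path, "rb") as fh:
        while True:
            try:
                yield pickle.load(fh)
            except EOFError:          # raised once the file is exhausted
                return

docs = [{"id": 1, "text": "first summary"}, {"id": 2, "text": "second summary"}]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "docs.pkl")
    write_records(path, docs)
    loaded = list(read_records(path))
```

This only helps if the downstream code can consume records one at a time; it does not remove the need to hold everything in memory if the algorithm itself requires that.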

pyspark combineByKey explained

This post explains how to use the function “combineByKey”, as the PySpark documentation is not very clear on this.

When to use combineByKey?

  • You have an RDD of (key, value) pairs, i.e. a pair RDD
    • In Python, such an RDD is simply one whose elements are (key, value) tuples
  • You want to aggregate the values by key

Why use combineByKey, instead of aggregateByKey or groupByKey, or reduceByKey?

  • groupByKey is computationally expensive: every value has to be shuffled across the network, and all values for a key are collected in memory. I once had a document of 50000+ elements, and the sheer resource usage made my program un-runnable.
  • aggregateByKey: the docs are not very clear, and I did not find resources on how to use it. (In PySpark it is also built on combineByKey.)
  • reduceByKey is implemented on top of combineByKey, so the latter is the more generic one.

How to use combineByKey?

The documentation gives this example:

# create an RDD
x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])

def to_list(a):
    return [a]

def append(a, b):
    a.append(b)
    return a

def extend(a, b):
    a.extend(b)
    return a

sorted(x.combineByKey(to_list, append, extend).collect())
# [('a', [1, 2]), ('b', [1])]

Here is the documentation for combineByKey:

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=)
Generic function to combine the elements for each key using a custom set of aggregation functions.

Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C.

Users provide three functions:

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)
  • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
  • mergeCombiners, to combine two C’s into a single one (e.g., merges the lists)

To avoid memory allocation, both mergeValue and mergeCombiners are allowed to modify and return their first argument instead of creating a new C.

In addition, users can control the partitioning of the output RDD.

What does that mean? combineByKey takes three arguments, each of which is a function (an operation) on the values of the RDD.

In this example, the keys are ‘a’ and ‘b’. The values are ints. The task is to combine the ints to a list, for each key.

  1. The first function is createCombiner, which turns a value from the original key-value pair into the combined type. In this example, the combined type is a list, so this function takes an int and returns a one-element list.
  2. The second function is mergeValue, which adds a new int to an existing list (merging a value into a combiner), hence append.
  3. The third function is mergeCombiners, which merges two combiners. In this example it merges two lists, hence extend.
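To see when each of the three functions is called, here is a pure-Python model of combineByKey semantics. It is not Spark: partitions are simulated as plain lists, and simulate_combine_by_key and the data are hypothetical. Within a partition, the first value for a key goes through createCombiner and later values through mergeValue; after the shuffle, the per-partition combiners for the same key are merged with mergeCombiners. The second usage, a per-key average with a (sum, count) combiner, is a common combineByKey pattern rather than part of the PySpark docs.

```python
def simulate_combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Pure-Python model of combineByKey (not Spark itself).

    partitions: a list of partitions, each a list of (key, value) pairs.
    """
    # Stage 1: within each partition, build one combiner per key.
    local = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        local.append(combiners)
    # Stage 2: after the shuffle, merge the combiners for each key across partitions.
    merged = {}
    for combiners in local:
        for key, comb in combiners.items():
            merged[key] = merge_combiners(merged[key], comb) if key in merged else comb
    return sorted(merged.items())

def to_list(a):
    return [a]

def append(a, b):
    a.append(b)
    return a

def extend(a, b):
    a.extend(b)
    return a

# The documentation example, split over two simulated partitions.
lists = simulate_combine_by_key([[("a", 1), ("b", 1)], [("a", 2)]], to_list, append, extend)

# Per-key average with a (sum, count) combiner.
sums_counts = simulate_combine_by_key(
    [[("a", 1), ("b", 4)], [("a", 3), ("b", 6), ("a", 5)]],
    lambda v: (v, 1),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    lambda x, y: (x[0] + y[0], x[1] + y[1]),
)
averages = {key: s / c for key, (s, c) in sums_counts}
```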

 

 

Reference:

http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/

https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

 

Machine learning: soft classifiers and ROC

This post explains the concept of soft classifiers (in its simple form) and offers examples in sklearn.

Soft classifiers

In classification problems, hard classifiers give the exact predicted class.

Soft classifiers, in contrast, give a probability estimate for each class. A prediction can then be made using a threshold. This also makes multi-label classification possible.

Code in sklearn:

This is a sample program in Python using a KNN classifier.

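import numpy as np
import pandas
from sklearn.model_selection import train_test_split

url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
dataset = pandas.read_csv(url, names=names)

# change to binary classification: replace the species with random 0/1 labels
new_class = np.random.randint(0, 2, len(dataset), dtype='l')
dataset['class'] = new_class

# hold out 20% (30 of the 150 rows) for validation
X = dataset.values[:, 0:4]
Y = dataset.values[:, 4]
X_train, X_validation, Y_train, Y_validation = train_test_split(X, Y, test_size=0.20)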

 

Now build the classifiers.


from sklearn.neighbors import KNeighborsClassifier

# Make predictions on validation dataset
knn = KNeighborsClassifier()
knn.fit(X_train, Y_train)
predictions = knn.predict(X_validation) # hard 
predictions_prob = knn.predict_proba(X_validation) # soft

 

Results of the hard classifier:


predictions

array([ 1.,  1.,  0.,  0.,  1.,  0.,  0.,  1.,  0.,  0.,  0.,  0.,  0.,
        1.,  1.,  1.,  1.,  1.,  0.,  1.,  0.,  1.,  1.,  1.,  0.,  0.,
        0.,  1.,  1.,  1.])

 

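Results of the soft classifier (predictions_prob). Each row holds the estimated probabilities of class 0 and class 1. With the default n_neighbors=5, each probability is the fraction of the 5 nearest neighbours in that class, hence the multiples of 0.2: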

array([[ 0.2,  0.8],
       [ 0.2,  0.8],
       [ 0.6,  0.4],
       [ 0.6,  0.4],
       [ 0.4,  0.6],
       [ 0.6,  0.4],
       [ 0.6,  0.4],
       [ 0.4,  0.6],
       [ 0.8,  0.2],
       [ 0.6,  0.4],
       [ 0.8,  0.2],
       [ 0.6,  0.4],
       [ 0.8,  0.2],
       [ 0.4,  0.6],
       [ 0.2,  0.8],
       [ 0.4,  0.6],
       [ 0.4,  0.6],
       [ 0.4,  0.6],
       [ 0.6,  0.4],
       [ 0.4,  0.6],
       [ 0.6,  0.4],
       [ 0.4,  0.6],
       [ 0.4,  0.6],
       [ 0.4,  0.6],
       [ 0.6,  0.4],
       [ 0.6,  0.4],
       [ 0.8,  0.2],
       [ 0.4,  0.6],
       [ 0.4,  0.6],
       [ 0.2,  0.8]])

 

This difference shows up in the ROC curves. For the hard classifier, the ROC curve is made of straight line segments through a single operating point. For the soft classifier, there is one point per distinct score threshold, so the curve is much closer to continuous. Note the input for the soft classifier: pred = predictions_prob[:, 1] takes the probability of the positive class and passes it to the ROC function.

 

import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.metrics import roc_curve, auc

plt.figure(figsize = (12, 8))

truth = Y_validation
pred = predictions
fpr, tpr, thresholds = roc_curve(truth, pred)
roc_auc = auc(fpr, tpr)
c = (np.random.rand(), np.random.rand(), np.random.rand())
plt.plot(fpr, tpr, color=c, label= 'HARD'+' (AUC = %0.2f)' % roc_auc)

truth = Y_validation
pred = predictions_prob[:, 1]
fpr, tpr, thresholds = roc_curve(truth, pred)
roc_auc = auc(fpr, tpr)
c = (np.random.rand(), np.random.rand(), np.random.rand())
plt.plot(fpr, tpr, color=c, label= 'SOFT'+' (AUC = %0.2f)' % roc_auc)

plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.title('ROC')
plt.legend(loc="lower right")

plt.show()

 

Why is this the case?

This has to do with the input parameters of sklearn’s roc_curve. See the doc:

sklearn.metrics.roc_curve(y_true, y_score, pos_label=None, sample_weight=None, drop_intermediate=True)

Parameters:

y_score : array, shape = [n_samples]
Target scores, can either be probability estimates of the positive class, confidence values, or non-thresholded measure of decisions (as returned by “decision_function” on some classifiers).

 

But why does the soft classifier give a (nearly) continuous ROC curve? This comes down to what an ROC curve is.

ROC = receiver operating characteristic

For an ROC graph, the x-axis is the false positive rate, the y-axis is the true positive rate.

Each point corresponds to a particular classification strategy.

(0, 0) = classify all instances as negative.

(1, 1) = classify all instances as positive.

A ranking (scoring) model produces a set of points in ROC space: each threshold on the score gives a different point. So a soft classifier in effect amounts to many hard classification strategies, one per threshold. A hard classifier’s 0/1 output has only one threshold that matters, so its curve has a single point between (0, 0) and (1, 1).
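To see why the two curves differ, here is a small pure-Python sketch of how ROC points come from thresholds: predict positive when the score is at least the threshold, and sweep the threshold over every distinct score. It is not sklearn’s roc_curve (which, for instance, may drop suboptimal intermediate points via drop_intermediate), and the six labels and scores are made up so that the soft scores agree with the hard labels at a 0.5 cutoff.

```python
def roc_points(y_true, scores):
    """(FPR, TPR) for every threshold t, predicting positive when score >= t."""
    pos = sum(1 for y in y_true if y == 1)
    neg = len(y_true) - pos
    points = [(0.0, 0.0)]                        # threshold above every score: all negative
    for t in sorted(set(scores), reverse=True):  # one threshold per distinct score
        tp = sum(1 for y, s in zip(y_true, scores) if s >= t and y == 1)
        fp = sum(1 for y, s in zip(y_true, scores) if s >= t and y == 0)
        points.append((fp / neg, tp / pos))
    return points                                # lowest threshold: all positive, (1, 1)

def auc(points):
    """Area under the piecewise-linear curve (trapezoidal rule)."""
    return sum((x2 - x1) * (y1 + y2) / 2 for (x1, y1), (x2, y2) in zip(points, points[1:]))

y_true = [1, 0, 1, 1, 0, 0]
hard = [1, 0, 0, 1, 1, 0]                  # 0/1 predictions
soft = [0.8, 0.2, 0.4, 0.6, 0.6, 0.4]      # probability of the positive class

hard_points = roc_points(y_true, hard)     # only 3 points: a single "bend"
soft_points = roc_points(y_true, soft)     # one extra point per distinct score
```

The hard predictions are just the soft scores cut at 0.5, so the hard curve passes through one of the soft curve’s points and simply joins it to the corners with straight lines.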

[Figure: final ROC plot comparing the HARD and SOFT classifiers]

My first independent implementation of an NLP research model…

A tiny tiny milestone… But still some lessons learnt for next time.

Key take-aways:

  • It is not that hard. After finding a code base, it probably takes about 20 hours of work to complete the model, including going back and forth to revisit basic concepts.
  • You cannot cheat at programming. If I have to stop and think, that usually means I don’t really understand the model.
  • When faced with large and difficult tasks (a thesis, big programs), I feel lost in the beginning. Two strategies:
    • Focus on getting started, so that you start early.
    • Work on what you do understand now, even if it seems irrelevant. Work on the exact next step. Don’t solve a more general problem than the specific case in front of you! (e.g. do not read the whole book or the whole documentation; that’s procrastination too!)

Research experiments consist of the following parts:

  • Preprocessing and data cleaning: my teammates have kindly done this for me.
    • Here the key questions are:
      • What should the final data structure be?
      • What should the input of the model be?
      • What should the output of the model be?
      • This needs to be very specific.
        • Store in a list, a dictionary, a pandas DataFrame, a numpy array or a PyTorch tensor?
        • Someone told me that for my current computational needs, these questions are not that significant.
    • The best strategy is to learn by doing. Up-front designs usually change later.
  • Implementing the models. This is the most intellectually challenging part.
    • First, make sure you really understand the model.
      • e.g. What’s the size of each matrix? What is passed to the next layer? How is attention calculated?
    • Second, when the model is really complex, it is difficult to digest all the parts at once!
      • Strategy: it’s frustrating when you don’t understand, but don’t be discouraged or distracted. Instead, work on what you can understand now, and slowly move forward.
    • The training and testing process usually consists of nested loops, e.g. in mini-batch gradient descent there are batches inside epochs. So there is a hierarchical structure.
      • Key question:
        • What is the smallest working unit? Take it out.
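As an illustration of "take out the smallest working unit", here is a hypothetical mini-batch gradient descent loop for a one-parameter model y ≈ w * x. The inner step is its own function, so it can be tested alone before the epoch and batch loops are wrapped around it. The model, data and learning rate are my own toy choices.

```python
def train_step(w, batch, lr):
    """Smallest working unit: one gradient step on one mini-batch (mean squared error)."""
    grad = sum(2 * (w * x - y) * x for x, y in batch) / len(batch)
    return w - lr * grad

def train(data, epochs, batch_size, lr):
    w = 0.0
    for _ in range(epochs):                              # outer loop: epochs
        for start in range(0, len(data), batch_size):    # inner loop: mini-batches
            w = train_step(w, data[start:start + batch_size], lr)
    return w

# Test the unit on its own first, then the full loop.
step_result = train_step(0.0, [(1.0, 3.0)], lr=0.1)
data = [(0.1 * i, 0.3 * i) for i in range(1, 11)]        # y = 3x (up to rounding)
w_final = train(data, epochs=200, batch_size=5, lr=0.1)
```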

Other tips:

  • Know what you don’t understand. It’s usually a missing piece of understanding that makes people stop and think. Write it down.
  • When debugging, duplicate the notebook. Create a new notebook for each unit you want to test! Do not drag along the whole notebook.
  • After cleaning up some code, duplicate the notebook and delete the test code.
  • For each class, write step-by-step code first, then wrap it together later.

What is a recurrent neural network (RNN)

This post explains what a recurrent neural network – or RNN model – is in machine learning.

An RNN is a model that works with input sequences. The difference between sequences and ordinary data is that in a sequence, order matters. It is similar to a time series, but indexed by discrete steps. This also means the inputs are not independent of each other.

An RNN takes a sequence (for example, a sequence of words) as input. At each step it produces a hidden state vector and an output.

The hidden state vector $s_t$ is a function of the previous hidden state $s_{t-1}$ and the current input $x_t$. It is the memory of the network. The formula is $s_t = f(U x_t + W s_{t-1})$, where $f$ is a nonlinearity such as tanh or ReLU.

The output is a probability distribution over the vocabulary: $o_t = \text{softmax}(V s_t)$.

The parameter sizes are:

  • U is an m * n matrix. m is the size of the hidden state vector, n is the size of the input vector.
  • W is an m * m matrix. m is the size of the hidden state vector.
  • V is an h * m matrix. h is the size of the output vector, m is the size of the hidden state vector.
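The two formulas and the parameter sizes can be checked with a short numpy forward pass. This is a sketch under assumptions not in the post: f is tanh, the initial state s_0 is zero, the inputs are one-hot word vectors (so n and h both equal the vocabulary size), and the weights are random rather than trained.

```python
import numpy as np

def softmax(z):
    e = np.exp(z - z.max())          # subtract max for numerical stability
    return e / e.sum()

def rnn_forward(xs, U, W, V):
    """Run s_t = tanh(U x_t + W s_{t-1}) and o_t = softmax(V s_t) over a sequence.

    xs: array of shape (T, n); U: (m, n); W: (m, m); V: (h, m).
    Returns hidden states (T, m) and outputs (T, h). s_0 is all zeros.
    """
    s_prev = np.zeros(W.shape[0])
    states, outputs = [], []
    for x_t in xs:
        s_t = np.tanh(U @ x_t + W @ s_prev)   # f = tanh (one common choice)
        states.append(s_t)
        outputs.append(softmax(V @ s_t))
        s_prev = s_t
    return np.array(states), np.array(outputs)

rng = np.random.default_rng(0)
n, m, h = 4, 3, 4                      # vocabulary of 4 words, hidden size 3
U = rng.normal(scale=0.5, size=(m, n))
W = rng.normal(scale=0.5, size=(m, m))
V = rng.normal(scale=0.5, size=(h, m))
xs = np.eye(n)[[0, 2, 1]]              # a 3-word sentence as one-hot rows
states, outputs = rnn_forward(xs, U, W, V)
```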

Depending on the context, the output of an RNN can also be a single vector called context. This will be discussed in the next post.

Reference:

Recurrent Neural Networks Tutorial, Part 1 – Introduction to RNNs

What is cross-validation?

This post explains the concept of cross-validation in machine learning.

Cross validation is a way to do model validation when only a limited amount of data is available.

Model validation is a step in model selection, whose goal is to select the best model that fits the data.

Model selection is a two-stage process.

  • 1) Select a family of hypotheses/models (the hypothesis space), e.g. a neural network with 2 hidden layers of fixed size vs. another with 5 hidden layers of fixed size vs. a decision tree …
  • 2) Then, for each hypothesis family, select the optimal set of parameters.

Training set and validation set take care of stage 2). That is, for every hypothesis family of interest –

  • First use the training set to iteratively search for parameter estimates that minimize the empirical loss over the training set.
  • Then (after a few iterations), use the current parameter estimates to compute the empirical loss over the validation set.
  • Stop when the validation loss stops decreasing (early stopping); the parameter estimates at that point are chosen as optimal.

The test set is used for stage 1). For every tuned hypothesis, use the test set to do a meta-comparison and select the one with the most desirable evaluation metrics.

A typical split of the whole dataset is 7:2:1 for training / validation / test.

But when the dataset is small, such a split leaves little data for training. With cross validation, there is no need for a separate validation set.

The steps of cross validation are the following:

  • Partition the data into a test set and a training set. The test set is left untouched until the very end.
  • Divide the training set into K folds.
    • For each fold i = 1, …, K, hold out fold i and train the model on the remaining K-1 folds.
    • Validate the model on the held-out fold i.
  • After completing the K validations, you have K accuracy scores. Their average is the final validation result.
  • Select models based on these validation results.
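The steps above can be sketched in plain Python. This sketch assumes the test set has already been set aside, uses contiguous folds without shuffling (in practice the data is usually shuffled first), and uses a toy majority-class "model"; the function names are my own.

```python
def k_fold_indices(n_samples, k):
    """Yield (train_idx, val_idx) for each of k contiguous folds over range(n_samples)."""
    base, extra = divmod(n_samples, k)
    start = 0
    for i in range(k):
        size = base + (1 if i < extra else 0)   # spread the remainder over the first folds
        val_idx = list(range(start, start + size))
        train_idx = list(range(0, start)) + list(range(start + size, n_samples))
        yield train_idx, val_idx
        start += size

def cross_validate(fit_and_score, n_samples, k=5):
    """Average the k validation scores returned by fit_and_score(train_idx, val_idx)."""
    scores = [fit_and_score(tr, va) for tr, va in k_fold_indices(n_samples, k)]
    return sum(scores) / len(scores)

# Toy usage: a "model" that always predicts the majority label of its training folds.
labels = [0, 1, 1, 1, 0, 1, 1, 0, 1, 1]

def majority_baseline(train_idx, val_idx):
    train_labels = [labels[i] for i in train_idx]
    guess = max(set(train_labels), key=train_labels.count)
    return sum(labels[i] == guess for i in val_idx) / len(val_idx)

cv_accuracy = cross_validate(majority_baseline, len(labels), k=5)
```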

Python *args and **kwargs

(A new post, in the spirit of always be jabbing, always be firing, always be shipping. )

This post deals with Python *args and **kwargs. Here args and kwargs are just naming conventions; the actual syntax is the * and **.

*args

*args lets a function accept a variable number of positional arguments. In plain English, the number (and type) of the arguments is not known beforehand; inside the function they arrive as a tuple.

Example:


def test_var_args(f_arg, *argv):
    print("first normal arg:", f_arg)
    for arg in argv:
        print("another arg through *argv :", arg)

test_var_args('Yasoob', 'python', 'eggs', 'test')

The output is


first normal arg: Yasoob
another arg through *argv : python
another arg through *argv : eggs
another arg through *argv : test


**kwargs

**kwargs is similar to *args in that it accepts a variable number of arguments, but the arguments are passed by name (keyword arguments) and arrive in the function as a dictionary.

Example:


def table_things(**kwargs):
    for name, value in kwargs.items():
        print('{0} = {1}'.format(name, value))

table_things(apple='fruit', cabbage='vegetable')

The output is


apple = fruit
cabbage = vegetable
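To back up the point that args and kwargs are only naming conventions, here is a small sketch (the function and argument names are made up) that combines both forms under different names and returns what each one collects.

```python
def describe(first, *extras, **options):
    """The names after * and ** are arbitrary; only the stars matter."""
    # extras is a tuple of the leftover positional arguments,
    # options is a dict of the leftover keyword arguments.
    return first, extras, options

result = describe("a", "b", "c", color="red", size=2)
```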

References:

https://stackoverflow.com/questions/3394835/args-and-kwargs

https://pythontips.com/author/yasoob008/

 

Pyspark: RDD join, intersection and cartesian

This post explains three transformations on Spark RDD: join, intersection and cartesian.

What is needed to replicate these examples:

  • Access to Pyspark

If you have not used Spark, here is a post for introduction and installation of Spark in local mode (in contrast to cluster).

What can be done using these three transformations? At a high level, they combine information from two RDDs so that we can then manipulate it with map. We need this because Spark does not allow one RDD to be accessed from inside a transformation on another RDD, so we cannot loop over one RDD while mapping over the other. Also, a Python lambda cannot contain a for statement.

Join

Operations are performed on the element pairs that have the same keys.

  • Each element in the first RDD is combined with every element in the second RDD that has the same key. Keys that appear in only one RDD are dropped (this is an inner join).
  • How to combine? The key is unchanged, and the two values are combined into a tuple, giving elements of the form (key, (value1, value2)).
  • Note the nesting: the elements of the resulting RDD have a different structure from the elements of the two original RDDs.

Example:


rdd = sc.parallelize([("red",20),("red",30),("blue", 100)])
rdd2 = sc.parallelize([("red",40),("red",50),("yellow", 10000)])
rdd.join(rdd2).collect()

# output (element order may vary)
[('red', (30, 50)), ('red', (30, 40)), ('red', (20, 50)), ('red', (20, 40))]
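To make the join semantics and the extra level of nesting explicit, here is a pure-Python model of what join computes on the example data. Lists stand in for RDDs, this is not how Spark executes the join, and pair_join is a hypothetical name.

```python
from collections import defaultdict

def pair_join(left, right):
    """Pure-Python model of RDD.join: an inner join of two lists of (key, value) pairs."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # every left value is paired with every right value sharing its key
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]

rdd_data = [("red", 20), ("red", 30), ("blue", 100)]
rdd2_data = [("red", 40), ("red", 50), ("yellow", 10000)]
joined = pair_join(rdd_data, rdd2_data)
```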

Intersection

Return the elements that the two RDDs have in common. Duplicates are removed from the output.


rdd1 = sc.parallelize([(1, 2), (2, 3), (3, 4), (2, 4), (1, 5)])
rdd = sc.parallelize([(1, 2), (2, 3), (3, 4), (1, 5), (2, 6)])
rdd.intersection(rdd1).collect()

# output (order may vary): (1, 2), (2, 3), (3, 4) and (1, 5)

Cartesian

Compute the “product” of two RDDs. Every element in the first RDD is combined with every element in the second RDD to form an element of the new RDD.

If the first RDD has 3 elements and the second has 3, the result has 3 * 3 = 9 elements.

Example: the cartesian product of rdd (from the join example) and rdd2.

rdd2 = sc.parallelize([("red",40),("red",50),("yellow", 10000)])
rdd.cartesian(rdd2).collect()

# output
[(('red', 20), ('red', 40)), (('red', 20), ('red', 50)), (('red', 20), ('yellow', 10000)), (('red', 30), ('red', 40)), (('blue', 100), ('red', 40)), (('red', 30), ('red', 50)), (('red', 30), ('yellow', 10000)), (('blue', 100), ('red', 50)), (('blue', 100), ('yellow', 10000))]
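The same kind of pure-Python model works for intersection and cartesian, which helps when checking expected sizes before running on a cluster. Again, lists stand in for RDDs, the helper names are hypothetical, and the sorting is only there to make the output order deterministic (Spark does not guarantee an order).

```python
from itertools import product

def pair_cartesian(a, b):
    """Every element of a paired with every element of b: len(a) * len(b) results."""
    return list(product(a, b))

def pair_intersection(a, b):
    """Distinct elements present in both lists (like RDD.intersection, duplicates removed)."""
    return sorted(set(a) & set(b))

rdd_data = [("red", 20), ("red", 30), ("blue", 100)]
rdd2_data = [("red", 40), ("red", 50), ("yellow", 10000)]
crossed = pair_cartesian(rdd_data, rdd2_data)
common = pair_intersection([(1, 2), (2, 3), (3, 4), (1, 5), (2, 6)],
                           [(1, 2), (2, 3), (3, 4), (2, 4), (1, 5)])
```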