# pyspark combineByKey explained

This post explains how to use the PySpark function “combineByKey”, since the official documentation is not very clear on it.

When to use combineByKey?

• You have an RDD of (key, value) pairs – a paired RDD
• In Python, such an RDD is built from elements that are 2-tuples
• You want to aggregate the values by key

Why use combineByKey, instead of aggregateByKey or groupByKey, or reduceByKey?

• groupByKey is computationally expensive, since it shuffles all the values for every key. I once ran it on a dataset of 50,000+ elements, and the sheer resource cost made my program un-runnable.
• aggregateByKey – the docs are not very clear, and I did not find good resources on how to use it.
• reduceByKey is implemented on top of combineByKey, so the latter is the more generic one. (Roughly, reduceByKey(f) is combineByKey(lambda v: v, f, f).)

How to use combineByKey?

The official documentation gives this example:

# create a paired RDD
x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])

def to_list(a):
    return [a]

def append(a, b):
    a.append(b)
    return a

def extend(a, b):
    a.extend(b)
    return a

sorted(x.combineByKey(to_list, append, extend).collect())

>>> [('a', [1, 2]), ('b', [1])]



Here is how the document goes for combineByKey:

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash>)
Generic function to combine the elements for each key using a custom set of aggregation functions.

Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C.

Users provide three functions:

• createCombiner, which turns a V into a C (e.g., creates a one-element list)
• mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
• mergeCombiners, to combine two C’s into a single one (e.g., merges the lists)

To avoid memory allocation, both mergeValue and mergeCombiners are allowed to modify and return their first argument instead of creating a new C.

In addition, users can control the partitioning of the output RDD.

What does that mean? Basically, combineByKey takes three arguments, each of which is a function – an operation on the elements of an RDD.

In this example, the keys are ‘a’ and ‘b’. The values are ints. The task is to combine the ints to a list, for each key.

1. The first function is createCombiner – it turns a value from the original key-value pair into the combined type. In this example the combined type is a list, so the function takes an int and returns a one-element list.
2. The second function is mergeValue – it merges a new value into an existing combiner. Here that means adding a new int into the list – thus append.
3. The third function is mergeCombiners – it merges two combiners. In this example that means merging two lists – thus extend.
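To see how the three functions interact, here is a pure-Python sketch (no Spark needed) that mimics combineByKey’s two-phase, per-partition behavior, used to compute a per-key average. The `combine_by_key` helper and its round-robin partitioning are my own illustration, not Spark internals:

```python
def combine_by_key(pairs, create_combiner, merge_value, merge_combiners, n_partitions=2):
    # Phase 1: combine values within each (simulated) partition
    partitions = [pairs[i::n_partitions] for i in range(n_partitions)]
    partials = []
    for part in partitions:
        acc = {}
        for k, v in part:
            acc[k] = merge_value(acc[k], v) if k in acc else create_combiner(v)
        partials.append(acc)
    # Phase 2: merge the per-partition combiners
    result = {}
    for acc in partials:
        for k, c in acc.items():
            result[k] = merge_combiners(result[k], c) if k in result else c
    return result

# per-key average: the combiner is a (sum, count) pair
data = [("a", 1), ("b", 1), ("a", 2)]
sums = combine_by_key(
    data,
    create_combiner=lambda v: (v, 1),
    merge_value=lambda c, v: (c[0] + v, c[1] + 1),
    merge_combiners=lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]),
)
averages = {k: s / n for k, (s, n) in sums.items()}  # {'a': 1.5, 'b': 1.0}
```

The same three lambdas, passed to the real `rdd.combineByKey`, would compute the per-key (sum, count) pairs in Spark.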

Reference:

http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/

https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

# Machine learning: soft classifiers and ROC

This post explains the concept of soft classifiers (in its simple form) and offers examples in sklearn.

Soft classifiers

In classification problems, a hard classifier gives the exact predicted class.

A soft classifier instead gives a probability estimate over all classes. A prediction can then be made by applying a threshold. This also opens up the possibility of multi-label classification.

Code in sklearn:

This is a sample program in Python using a KNN classifier.

import numpy as np
import pandas

# load the classic iris dataset
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
dataset = pandas.read_csv(url, names=names)

# change to binary classification (random labels, just for illustration)
new_class = np.random.randint(0, 2, len(dataset), dtype='l')
dataset['class'] = new_class


Now build the classifiers.


from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier

# split features / labels and hold out a validation set
array = dataset.values
X, Y = array[:, 0:4], array[:, 4]
X_train, X_validation, Y_train, Y_validation = train_test_split(X, Y, test_size=0.2)

# Make predictions on validation dataset
knn = KNeighborsClassifier()
knn.fit(X_train, Y_train)
predictions = knn.predict(X_validation)             # hard
predictions_prob = knn.predict_proba(X_validation)  # soft



Results of the hard classifier:


predictions

array([ 1.,  1.,  0.,  0.,  1.,  0.,  0.,  1.,  0.,  0.,  0.,  0.,  0.,
1.,  1.,  1.,  1.,  1.,  0.,  1.,  0.,  1.,  1.,  1.,  0.,  0.,
0.,  1.,  1.,  1.])



Results of the soft classifier:

array([[ 0.2,  0.8],
[ 0.2,  0.8],
[ 0.6,  0.4],
[ 0.6,  0.4],
[ 0.4,  0.6],
[ 0.6,  0.4],
[ 0.6,  0.4],
[ 0.4,  0.6],
[ 0.8,  0.2],
[ 0.6,  0.4],
[ 0.8,  0.2],
[ 0.6,  0.4],
[ 0.8,  0.2],
[ 0.4,  0.6],
[ 0.2,  0.8],
[ 0.4,  0.6],
[ 0.4,  0.6],
[ 0.4,  0.6],
[ 0.6,  0.4],
[ 0.4,  0.6],
[ 0.6,  0.4],
[ 0.4,  0.6],
[ 0.4,  0.6],
[ 0.4,  0.6],
[ 0.6,  0.4],
[ 0.6,  0.4],
[ 0.8,  0.2],
[ 0.4,  0.6],
[ 0.4,  0.6],
[ 0.2,  0.8]])


This affects the ROC curve produced. For the hard classifier, the ROC is piecewise linear. For the soft classifier, the ROC is (nearly) continuous. Note the input for the soft classifier: pred = predictions_prob[:, 1] – this takes the probability of the positive class and feeds it to the ROC function.

import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.metrics import roc_curve, auc

plt.figure(figsize = (12, 8))

truth = Y_validation
pred = predictions
fpr, tpr, thresholds = roc_curve(truth, pred)
roc_auc = auc(fpr, tpr)
c = (np.random.rand(), np.random.rand(), np.random.rand())
plt.plot(fpr, tpr, color=c, label= 'HARD'+' (AUC = %0.2f)' % roc_auc)

truth = Y_validation
pred = predictions_prob[:, 1]
fpr, tpr, thresholds = roc_curve(truth, pred)
roc_auc = auc(fpr, tpr)
c = (np.random.rand(), np.random.rand(), np.random.rand())
plt.plot(fpr, tpr, color=c, label= 'SOFT'+' (AUC = %0.2f)' % roc_auc)

plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.title('ROC')
plt.legend(loc="lower right")

plt.show()


Why is this the case?

This has to do with the input parameters of sklearn. See doc:

sklearn.metrics.roc_curve(y_true, y_score, pos_label=None, sample_weight=None, drop_intermediate=True)

Parameters:

y_score : array, shape = [n_samples]
Target scores, can either be probability estimates of the positive class, confidence values, or non-thresholded measure of decisions (as returned by “decision_function” on some classifiers).

But still, why is the ROC curve continuous? This has to do with what an ROC curve actually is.

For an ROC graph, the x-axis is the false positive rate, the y-axis is the true positive rate.

Each point corresponds to a particular classification strategy.

(0, 0) = classify all instances as negative.

(1, 1) = classify all instances as positive.

A ranking model produces a set of points in ROC space: each threshold produces a different point. Thus this soft classifier is, in effect, equivalent to (many) classification strategies at once.
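To make the threshold-sweep idea concrete, here is a minimal hand-rolled sketch that traces out ROC points one threshold (one strategy) at a time. It is my own simplified version of what sklearn's roc_curve computes (sklearn adds extras such as drop_intermediate):

```python
def roc_points(y_true, scores):
    # each distinct threshold is one classification strategy -> one ROC point
    P = sum(y_true)               # number of positives
    N = len(y_true) - P           # number of negatives
    points = [(0.0, 0.0)]         # threshold above max score: all predicted negative
    for t in sorted(set(scores), reverse=True):
        pred = [1 if s >= t else 0 for s in scores]
        tp = sum(1 for p, y in zip(pred, y_true) if p == 1 and y == 1)
        fp = sum(1 for p, y in zip(pred, y_true) if p == 1 and y == 0)
        points.append((fp / N, tp / P))
    return points                 # ends at (1.0, 1.0): all predicted positive

print(roc_points([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))
# [(0.0, 0.0), (0.0, 0.5), (0.5, 0.5), (0.5, 1.0), (1.0, 1.0)]
```

A hard classifier only has two distinct "scores" (0 and 1), so it contributes a single interior point; a soft classifier with many distinct probabilities contributes many points, which is why its curve looks continuous.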

Final plot:

# My first independent implementation of an NLP research model…

A tiny, tiny milestone… but still some lessons learned for next time.

Key take-aways:

• It is not that hard. After finding a code base, it took probably 20 hours of work to complete the model, including going back and forth revisiting basic concepts.
• You cannot cheat at programming. If I stop to think, that usually means I don’t really understand the model.
• When faced with large and difficult tasks (a thesis, big programs), you will feel lost in the beginning. Two strategies:
• Focus on starting, so you can start early.
• Work on what you do understand now, even if it seems irrelevant. Work on the exact next step. Don’t solve a more general problem for a specific case! (e.g. do not read the whole book or the whole documentation… that’s procrastination too!)

Experiments for a research consist of the following parts:

• Preprocessing and data cleaning : my teammates have kindly done this for me.
• Here the key questions are,
• what should the final data structure be?
• what should be the input of the model,
• what should be output of the model?
• This needs to be very specific.
• Store in a list or dictionary or pandas dataframe or numpy array or pytorch tensor?
• Someone told me that for my current computational needs, these questions are not that significant.
• The best strategy is to learn by doing. Pre-designs usually change later anyway.
• Implementing the models. This is the most intellectually challenging part.
• First, need to make sure you really understand the model.
• e.g. What’s the size of the matrix? What is passed to the next layer? How is attention calculated?
• Second, when the model is really complex, it is difficult to digest all the parts at once!
• Strategy: It’s frustrating when you don’t understand, but don’t be discouraged and distracted. Instead, work on what you can understand now, and slowly move forward.
• The training process usually consists of nested loops, e.g. in batch gradient descent there are batches, and there are epochs. Basically there is a hierarchical structure.
• Key question:
• What is the smallest working unit? Take it out.

Other tips:

• Know what you don’t understand. It’s usually a missing piece of understanding that makes people stop and stall. Write it down.
• When debugging, duplicate the notebook. Create a new notebook for each unit you want to test! Do not drag along the whole notebook.
• After cleaning up some code, duplicate the notebook and delete the test-process code.
• For each class, write step-by-step code first, then wrap it together later.

# What is a recurrent neural network (RNN)?

This post explains what a recurrent neural network – or RNN model – is in machine learning.

An RNN is a model that works with input sequences. The difference between sequences and ordinary data is that for sequences, order matters – like a time series, but in discrete time. This also means the inputs are not independent of each other.

An RNN takes as input a sequence (e.g. of words). At each step, it produces a hidden state vector and an output.

The hidden state vector is a function of the previous hidden state $s_{t-1}$ and the current input $x_t$: $s_t = f(Ux_t + Ws_{t-1})$. It is the memory of the network.

The output is a probability distribution across the vocabulary: $o_t = \text{softmax}(Vs_t)$.

Here are the sizes of the parameters:

• $U$ is an $m \times n$ matrix, where $m$ is the size of the hidden state vector and $n$ is the size of the input vector.
• $W$ is an $m \times m$ matrix, where $m$ is the size of the hidden state vector.
• $V$ is an $h \times m$ matrix, where $h$ is the size of the output vector and $m$ is the size of the hidden state vector.
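One step of these formulas can be sketched in numpy. Here tanh stands in for the nonlinearity $f$ (a common choice), and the sizes are toy values I picked for illustration:

```python
import numpy as np

m, n, h = 4, 3, 5                      # hidden, input, and output (vocab) sizes
rng = np.random.default_rng(0)
U = rng.standard_normal((m, n))        # input -> hidden, m x n
W = rng.standard_normal((m, m))        # hidden -> hidden, m x m
V = rng.standard_normal((h, m))        # hidden -> output, h x m

def softmax(z):
    e = np.exp(z - z.max())
    return e / e.sum()

def rnn_step(x_t, s_prev):
    s_t = np.tanh(U @ x_t + W @ s_prev)   # s_t = f(U x_t + W s_{t-1})
    o_t = softmax(V @ s_t)                # o_t = softmax(V s_t)
    return s_t, o_t

s = np.zeros(m)                        # initial hidden state
for x_t in np.eye(n):                  # a toy "sequence" of one-hot inputs
    s, o = rnn_step(x_t, s)
```

Note that the same U, W, V are reused at every step – the parameters are shared across time.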

Depending on the context, the output of an RNN can also be a single vector called context. This will be discussed in the next post.

Reference:

Recurrent Neural Networks Tutorial, Part 1 – Introduction to RNNs

# What is cross-validation?

This post explains the concept of cross-validation in machine learning.

Cross-validation is a way to do model validation when only a limited amount of data is available.

Model validation is a step in model selection, whose goal is to select the best model that fits the data.

Model selection is a two-stage process.

• 1) Select a family of hypotheses/models (a hypothesis space), e.g. a neural network with 2 fixed-size hidden layers vs. another neural network with 5 fixed-size hidden layers vs. a decision tree…
• 2) Then, for each hypothesis family, select the optimal set of parameters.

The training set and validation set take care of stage 2). That is, for every hypothesis family of interest:

• First use the training set to iteratively search for parameter estimates that minimize the empirical loss over the training set.
• Then (every few iterations), use the current parameter estimates to calculate the empirical loss over the validation set.
• The optimal estimates are the ones in hand when the empirical loss over the validation set stops decreasing – early stopping.

The test set is used for stage 1). For every tuned hypothesis family, use the test set to do a meta-comparison and select the one with the most desirable evaluation metrics.

Usually the whole dataset is split 7:2:1 into training / validation / test sets.

But when the dataset is small, such a split leaves too little data for training. With cross-validation, there is no need for a separate validation set.

The steps of cross-validation are the following:

• Partition the data into a test set and a training set. The test set is left untouched until the very end.
• Divide the training set into K folds.
• For each fold i, train the model using the other K-1 folds, then validate it on fold i – the held-out fold.
• After completing the K validations, you have K accuracy numbers. Take their average as the final validation result.
• Select models based on these validation results.
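The steps above can be sketched in plain numpy. The fold bookkeeping is the point here; `train_and_score` is a stand-in for any model’s fit-and-evaluate routine, and the majority-class baseline below is just a toy model to make it runnable:

```python
import numpy as np

def cross_validate(X, y, k, train_and_score, seed=0):
    idx = np.arange(len(X))
    np.random.default_rng(seed).shuffle(idx)      # shuffle once, then split into K folds
    folds = np.array_split(idx, k)
    scores = []
    for i in range(k):
        val = folds[i]                            # held-out fold
        train = np.concatenate([f for j, f in enumerate(folds) if j != i])
        scores.append(train_and_score(X[train], y[train], X[val], y[val]))
    return float(np.mean(scores))                 # average of the K accuracies

# toy "model": always predict the majority class of the training labels
def majority_baseline(X_tr, y_tr, X_val, y_val):
    pred = np.bincount(y_tr).argmax()
    return float((y_val == pred).mean())

X = np.arange(20).reshape(10, 2)
y = np.array([0, 0, 0, 0, 0, 0, 0, 1, 1, 1])
score = cross_validate(X, y, k=5, train_and_score=majority_baseline)
```

In practice, libraries such as sklearn provide this fold logic (e.g. KFold), but the mechanics are exactly these.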

# Pyspark: RDD join, intersection and cartesian

This post explains three transformations on Spark RDD: join, intersection and cartesian.

What is needed to replicate these examples:

If you have not used Spark, here is a post with an introduction and installation of Spark in local mode (as opposed to cluster mode).

What can be done using these three transformations? At a high level, they combine the information from two RDDs so we can use the map function to manipulate it. We need this because Spark does not allow access to one RDD from inside operations on another RDD – meaning no nested for loops over RDDs. Lambda expressions do not support for loops either.

Join

Operations are performed on the element pairs that have the same keys.

• Each element in the first RDD is combined with every element in the second RDD that has the same key.
• How are they combined? The key is not changed, but the two values are combined into a tuple, and the key and the tuple form a new pair.
• Note the level of nesting here: the element structure in the resulting RDD is different from the element structure in the two original RDDs.

Example:


rdd = sc.parallelize([("red",20),("red",30),("blue", 100)])
rdd2 = sc.parallelize([("red",40),("red",50),("yellow", 10000)])
rdd.join(rdd2).collect()

# output
[('red', (30, 50)), ('red', (30, 40)), ('red', (20, 50)), ('red', (20, 40))]
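The pairing rule is easy to see in plain Python. This is a sketch of the join semantics only, not of Spark’s shuffle-based implementation; `pair_join` is my own helper name:

```python
from collections import defaultdict

def pair_join(left, right):
    # inner join: every value under key k on the left pairs with
    # every value under k on the right
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key[k]]

left = [("red", 20), ("red", 30), ("blue", 100)]
right = [("red", 40), ("red", 50), ("yellow", 10000)]
result = pair_join(left, right)
# 2 'red' values on each side -> 2 x 2 = 4 pairs; 'blue' and 'yellow' drop out
```

Keys that appear on only one side ('blue', 'yellow') produce nothing, which is why the Spark output above contains only 'red' pairs.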


Intersection

For two RDDs, return the elements they have in common.


rdd1 = sc.parallelize([(1, 2), (2, 3), (3, 4), (2, 4), (1, 5)])
rdd = sc.parallelize([(1, 2), (2, 3), (3, 4), (1, 5), (2, 6)])
rdd.intersection(rdd1).collect()

# output (order may vary)
[(1, 2), (2, 3), (3, 4), (1, 5)]



Cartesian

Compute the “product” of two RDDs: every element in the first RDD is combined with every element in the second RDD to form an element of the new RDD.

If RDD1 has length 3 and RDD2 has length 3, this produces a new RDD of length 9.

Example (reusing rdd and rdd2 from the join example above):

rdd2 = sc.parallelize([("red",40),("red",50),("yellow", 10000)])
rdd.cartesian(rdd2).collect()

# output
[(('red', 20), ('red', 40)), (('red', 20), ('red', 50)), (('red', 20), ('yellow', 10000)), (('red', 30), ('red', 40)), (('blue', 100), ('red', 40)), (('red', 30), ('red', 50)), (('red', 30), ('yellow', 10000)), (('blue', 100), ('red', 50)), (('blue', 100), ('yellow', 10000))]


# Three types of gradient descent

Reference – Notes from this blog: https://machinelearningmastery.com/gentle-introduction-mini-batch-gradient-descent-configure-batch-size/

Batch gradient descent

During one epoch, evaluate the error for one sample at a time, but update the model only after evaluating all the errors in the training set.

Pros:

• The calculation of prediction errors and the model update are separated, so the algorithm can use parallel-processing-based implementations.

Cons:

• Need to have all data in memory
• The more stable error gradient may result in premature convergence of the model to a less optimal set of parameters.

Algo:

model.initialize()
for i in range(n_epochs):
    training_data.shuffle()
    X, Y = split(training_data)
    error_sum = 0
    for x, y in zip(X, Y):
        y_pred = model(x)
        error_sum += get_error(y_pred, y)
    model.update(error_sum)   # one update per epoch

Stochastic gradient descent

During one epoch, evaluate the error for one sample at a time, then update the model immediately after each evaluation.

Pros:

• immediate feedback on model performance and improvement rate
• simple to implement
• frequent updates – faster learning
• the noisy update process can allow the model to avoid local minima (i.e. avoid premature convergence)

Cons:

• frequent updates are computationally expensive
• the updates add noise to the gradient signal, causing the parameters to jump around
• hard to settle on an error minimum

Algo:

model.initialize()
for i in range(n_epochs):
    training_data.shuffle()
    X, Y = split(training_data)
    for x, y in zip(X, Y):
        y_pred = model(x)
        error = get_error(y_pred, y)
        model.update(error)   # update after every sample

Mini-batch gradient descent

During one epoch, split the data into batches (which adds a batch-size parameter). Then, for each batch, evaluate the error for one sample at a time, and update the model after evaluating all the samples in that batch. Repeat for each batch, then for each epoch.

Pros:

• converges more robustly to a local minimum, compared to stochastic gradient descent
• frequent updates – faster learning, compared to batch gradient descent
• efficiency: no need to have all the data in memory

Cons:

• configuration of an additional mini-batch parameter

Algo:

model.initialize()
for i in range(n_epochs):
    training_data.shuffle()
    batches = training_data.split(batch_size)
    for batch in batches:
        X, Y = split(batch)
        error_sum = 0
        for x, y in zip(X, Y):
            y_pred = model(x)
            error_sum += get_error(y_pred, y)
        model.update(error_sum)   # one update per batch
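The mini-batch loop above can be made concrete with a runnable numpy sketch for linear regression. The data, learning rate, and batch size are arbitrary choices for illustration:

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.standard_normal((200, 3))
true_w = np.array([1.0, -2.0, 0.5])
y = X @ true_w + 0.01 * rng.standard_normal(200)   # noisy linear data

w = np.zeros(3)
lr, batch_size, n_epochs = 0.1, 32, 50
for epoch in range(n_epochs):
    perm = rng.permutation(len(X))                 # shuffle each epoch
    for start in range(0, len(X), batch_size):
        batch = perm[start:start + batch_size]
        err = X[batch] @ w - y[batch]              # per-sample errors for this batch
        grad = X[batch].T @ err / len(batch)       # average gradient over the batch
        w -= lr * grad                             # one update per batch
```

Setting batch_size = len(X) recovers batch gradient descent, and batch_size = 1 recovers stochastic gradient descent, which is why mini-batch sits between the two.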

# First Try of AWS

First try of Amazon AWS, as part of the course Natural Language Processing. I should have reviewed this tutorial a week ago – with all the time I spent on Facebook and Douban and talking to people…

The step-by-step guidance is from the course tutor. I will note down my understanding and relevant materials along the way.

AWS

• Under Services tab on the upper left corner, click EC2 under Compute section
• On the upper right corner, switch your region to Oregon.
• Under IMAGES section click AMIs
• On the dropdown box in the search bar, change from “Owned by me” to “Public images”
• To launch a CPU EC2 Instance with PyTorch environment
• Search for NYU-DSGA1011-PyTorch-CPU-0
• Right click the AMI and click Launch
• Select your instance type (t2.micro for instance)
• Click Review and Launch
• In the pop-up window, select Choose an existing key pair and select your key pair below
• I needed to create my own key pair before this step. What is a key pair? http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#having-ec2-create-your-key-pair
• So it seems to just be a key, like a username/password. My key pair for this is “nlp2017” and the file is “nlp2017.pem.txt”
• Click View Instance
• An instance needs to be in a running state before it can be connected to! How to connect: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/AccessingInstances.html?icmpid=docs_ec2_console
• Open a terminal window, type “ssh -i [my-key-pair.pem] ec2-user@[dns]”. Please replace [my-key-pair.pem] with a directory to your key pair and [dns] with the string under Public DNS (IPv4) column.
• chmod 600 nlp2017.pem.txt [This changes the file permissions; see this thread: https://stackoverflow.com/questions/9270734/ssh-permissions-are-too-open-error]
• ssh -i nlp2017.pem.txt ec2-user@ec2-52-37-234-187.us-west-2.compute.amazonaws.com
• source ~/.bashrc
• cd pytorch_test/src/
• python pytorch_test_lr_cpu.py
• You can confirm it’s working by observing messages like “Epoch: [1/5], Step: [100/600], Loss: 2.2161”
• Example client code from https://github.com/yunjey/pytorch-tutorial/blob/master/tutorials/01-basics/logistic_regression/main.py
• In the future, you can replace the example client code with your .py file.
• To launch a GPU EC2 Instance with PyTorch environment.
• Remember to stop an instance ! http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Stop_Start.html

• You will be charged for any EC2 Instance with GPU

• Search for nyu-nlp-pytorch-gpu-0
• Right click the AMI and click Launch
• Select g2.2xlarge as your instance type
• Click Review and Launch
• In the pop-up window, select Choose an existing key pair and select your key pair below
• Click View Instance
• Open a terminal window, type “ssh -i [my-key-pair.pem] ec2-user@[dns]”. Please replace [my-key-pair.pem] with a directory to your key pair and [dns] with the string under Public DNS (IPv4) column.
• source ~/.bashrc
• cd nlp_client_code/src/
• python pytorch_cnn_tutorial_gpu.py
• You can confirm it’s working by observing messages like “Epoch [1/5], Iter [100/600] Loss: 0.2209”
• Example client code from https://github.com/yunjey/pytorch-tutorial/blob/master/tutorials/02-intermediate/convolutional_neural_network/main-gpu.py
• In the future, you can replace the example client code with your .py file.

• Please don’t forget to stop (or terminate) your instance. Otherwise, you may be charged for AWS usage.

# Keep away from attention drainers

Last night I set off to return to my apartment at around 10:30 pm. This morning, when I set off for school, it was 10:30 am or so. So I had 12 hours between daily commutes without doing any work.

How did I spend the time? Mostly on my mobile phone. I did not even start my computer.

I remember going to bed at 12:50 am and being woken up by the alarm clock at 7:30 am, so quality sleep time was less than seven hours. I do not feel like working now because of the lack of rest.

So there were roughly 6 hours wasted – a huge harm to productivity, and this cannot continue.

Activities:

• Posted a WeChat moment and kept being drawn back to it
• Browsed through other people’s WeChat moments
• Weibo probably took 30 minutes or so
• Searched Yelp for where to buy a bike in NYC
• Surfed the internet last night – I don’t even remember the contents now

These activities need to be eliminated. I also missed a morning class. Since human attention and time are limited resources, companies build apps and make money out of them (e.g. by selling ads)! Every minute I spend on Weibo, the company makes money without really taking my interests into consideration. I am being exploited by a company.

However, it is impossible not to use an iPhone at all, because the tradeoff is losing connection with friends. There is also the convenience of apps, despite the waste of energy.

Steps and plans:

At night, after a day’s work, willpower is weak. So I need to take concrete steps to avoid being drawn into distractions.

• When heading off from school, turn on the SelfControl app.
• Limit daily social media use to 8:00–8:10 am, 12:00–12:10 pm, 6:00–6:10 pm, and 9:50–10:00 pm. That is already 40 minutes in total!
• This includes: Douban, Weibo, Facebook, WeChat.
• Between 10 pm and 8 am, do not use the mobile phone.
• What to do with the extra time?
• Actually focus on my tasks.
• If I feel bored, do not reach for the phone or social media. Instead, do a five-minute meditation.
• Walk around or move about outside.
• Do not listen to songs, as they lead to unstable emotions and weak self-control. Do not take in new information (a story, a podcast, etc.). Take a real rest.

I will log my daily progress on this in a Google Sheet. The goal is to form a habit between Sep 14 and Oct 31. I might fall back, but the rule is: never miss two days in a row.

# Machine Learning Re-cap – keep updating

Generative Learning:

Supervised learning – input labeled pairs (x, y); then, given a new x, calculate the probability p(y|x).

Generative learning – basically the Bayesian idea (I forgot about sample vs. population…):

$p(y|x) = p(y, x) / p(x) = p(x|y)\,p(y) / \sum_y p(x, y) \propto p(x|y)\,p(y)$

p(y) is the prior distribution of y.

p(x|y) is the probability of x given y.

So if we assume a prior for y and a form for p(x|y) – that is, assume the distribution of the labels and the distribution of the input given a label – we can compute the probability of the current observation under each label, then choose the label that maximizes it. (This relates to MLE too: the parameters of the assumed distributions are typically fit by maximizing the likelihood.)
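A tiny numeric sketch of the formula above, with made-up discrete distributions (the numbers are arbitrary, just to trace the arithmetic):

```python
# toy binary problem: label y in {0, 1}, single binary feature x in {0, 1}
p_y = {0: 0.6, 1: 0.4}                       # assumed prior p(y)
p_x_given_y = {0: {0: 0.8, 1: 0.2},          # assumed p(x | y=0)
               1: {0: 0.3, 1: 0.7}}          # assumed p(x | y=1)

def posterior(x):
    joint = {y: p_x_given_y[y][x] * p_y[y] for y in p_y}   # p(x|y) p(y) = p(x, y)
    p_x = sum(joint.values())                              # p(x) = sum_y p(x, y)
    return {y: joint[y] / p_x for y in joint}

post = posterior(1)
# p(y=1 | x=1) = 0.7*0.4 / (0.2*0.6 + 0.7*0.4) = 0.28 / 0.40 = 0.7
prediction = max(post, key=post.get)   # pick the label maximizing the posterior
```

Since the denominator p(x) is the same for every label, comparing the joints p(x|y) p(y) is enough for classification, which is the proportionality in the formula.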

Gaussian Discriminant Analysis Model

x is a continuous real-valued vector

The prior for y is Bernoulli(phi)

x|y=0 ~ multivariate Gaussian with mean mu0 and covariance sigma

x|y=1 ~ multivariate Gaussian with mean mu1 and covariance sigma – the same sigma!!

GDA and logistic regression:

If p(x|y) is multivariate Gaussian (with a shared covariance), then p(y|x) follows a logistic function.

Q: GDA is a generative algorithm, while logistic regression is a discriminative algorithm. When should we use one over the other?

• GDA makes stronger assumptions about p(x|y). i.e. if p(x|y) ~ Gaussian and y ~ Bernoulli, then p(y|x) follows logistic regression, but the converse is not true: if p(y|x) is logistic, it is possible that p(x|y) ~ Poisson. Thus the assumption behind logistic regression is weaker.
• When p(x|y) is indeed Gaussian, GDA is asymptotically efficient, i.e. if the assumption is met, GDA is better than logistic regression.
• Otherwise, logistic regression is more robust and less sensitive to incorrect modeling assumptions.

Naive Bayes

x is a discrete vector

Sep 13, 2017

A list of models to review:

• Neural networks
• GAN
• Energy-based models
• CNN
• LSTM
• Support Vector Machine
• LASSO
• PCA
• KNN

Oct 11, 2017

• today learned KNN – select the k nearest neighbors and classify a point as the most prevalent class among those neighbors
• supervised model: given input, predict output – from The Elements of Statistical Learning
• k-means – ??

Oct 12, 2017

Today learnt:

• decision trees
• CART – classification and regression trees – at each step, choose one feature and a threshold (from a machine learning book with TensorFlow)
• feature importance – calculated using Gini
• Gini
• MLE vs KL divergence vs cross entropy
• KL divergence measures the dissimilarity between an empirical distribution and a theoretical distribution
• MLE tunes the parameters s.t. the likelihood of the seen data is maximized
• cross entropy ?
• PCA – eigenvalue decomposition of the covariance matrix ??
• RNN – used when the inputs have different lengths but the same model has to be trained!