Monthly Archives: November 2017

Pyspark: RDD join, intersection and cartesian

This post explains three transformations on Spark RDD: join, intersection and cartesian.

What is needed to replicate these examples:

  • Access to Pyspark

If you have not used Spark, here is a post for introduction and installation of Spark in local mode (in contrast to cluster).

What can be done using these three actions? From a higher level, they combine information from two RDDs together so we can use the map function to manipulate it. We need this because Spark does not allow access to RDD from within RDD – meaning no for loop. Also, lambda expression does not support for loop too.

Join

Operations are performed on the element pairs that have the same keys.

  • Each element in the first RDD, is combined with every element in the second RDD that has the same key. 
  • How to combine? The key is not changed. But the two values are combined into a tuple. The key-tuple is then wrapped around by a container.
  • Note the level of containers here. The element structures in the third RDD, is different from the element structures in the two original RDDs.

Example:


rdd = sc.parallelize([("red",20),("red",30),("blue", 100)])
rdd2 = sc.parallelize([("red",40),("red",50),("yellow", 10000)])
rdd.join(rdd2).collect()

# output
[('red', (30, 50)), ('red', (30, 40)), ('red', (20, 50)), ('red', (20, 40))]

Intersection

For two RDDs, return the part that they are in common.


rdd1 = sc.parallelize([(1, 2), (2, 3), (3, 4), (2, 4), (1, 5)])
rdd = sc.parallelize([(1, 2), (2, 3), (3, 4), (1, 5), (2, 6)])
rdd.intersection(rdd1).collect()

Cartesian

Compute the “product” of two RDDs. Every element in the first RDD is combined with every element in the second RDD, to form a element in the new RDD.

If RDD1 is of length 3, RDD2 is of length 3, this will produce a new RDD of length 9.

Example: an RDD cartesian with itself.

rdd2 = sc.parallelize([("red",40),("red",50),("yellow", 10000)])
rdd.cartesian(rdd2).collect()

# output
[(('red', 20), ('red', 40)), (('red', 20), ('red', 50)), (('red', 20), ('yellow', 10000)), (('red', 30), ('red', 40)), (('blue', 100), ('red', 40)), (('red', 30), ('red', 50)), (('red', 30), ('yellow', 10000)), (('blue', 100), ('red', 50)), (('blue', 100), ('yellow', 10000))]

Thinking is counter-productive

I spent a lot of time on thinking and reflecting. But they are counter-productive.

Real productive behaviors are writing and coding.

When I caught myself thinking, something is usually wrong. Examples:

  • Career decisions. Actions should be:  collecting facts, ask for plans, compare pros and cons, decide, plan, execute. I thought a lot and take forever to decide. The real problems are a fear for failure, low self-esteem, unclear values and priorities.
  • Coding. Action should be writing code, test, and revise. When stuck, ask for help. When I stop and think how to write a code, I was unproductive. I once spent two weeks writing a k-means algorithm. Here I think because I got stuck and don’t know what to do. The reason behind that, are
    • 1) unfamiliarity with data structure and algorithms and
    • 2) didn’t fully understand minHash algorithm and
    • 3) didn’t know spark and
    • 4) unwise coding strategies – start from simple.
  • Blogging. I think about what I plan to write. I wrote drafts of posts explaining concepts. But I hadn’t posted a single piece yet. The real problem were:
    • I did not understand the concepts well to explain
    • fear of being judged
  • Writing paper. I think about methodologies but not actually write them. The real problem is I don’t know what to write. This is because my data cannot support quantitative conclusions, while I did not know how to do qualitative ones.
  • Research. I think about how to do this, ask people how to do this. But not actually reading papers. Reason – procrastination.
    • Lack of confidence. I fear I will not understand. Since work is related to self-esteem, I am hiding from shame from failure.

This post identifies solutions for programming including:

 

 

  • Understanding: ask someone, learn what will help you understand
  • Drawing: too many concepts together cannot hold in their mind. Solution is to draw something.
  • Starting: I don’t know what to write. Then just write whatever you can write right now. Start with what you do understand. Even if it’s just one function or an unimportant class.
  • Skipping to the next step and come back later.
  • Get some sleep or food. Might be low energy.
  • avoid distraction.
  • self-doubt: feel unsure about oneself. (true!) solution = start from what you are sure. (I can apply this to research) You will become better in the future.

 

 

That said, the most important thing is to actually sit and write. Take actions. As this author says, always be firing.

Yet another motivational post – now going back to some real work…

Three types of gradient descent

Reference – Notes from this blog: https://machinelearningmastery.com/gentle-introduction-mini-batch-gradient-descent-configure-batch-size/

Three types are : batch gradient descent, stochastic gradient descent, mini-batch gradient descent

Batch gradient descent

During one epoch, evaluate error for one sample at a time. But update model only after evaluating all errors in the training set

Pros:

  • Calculation of prediction errors and the model update are seperated. Thus the algorithm can use parallel processing based implementations.

Cons:

  • Need to have all data in memory
  • The more stable error gradient may result in premature convergence of the model to a less optimal set of parameters.

Algo:

  • Model.initialize()
  • For i in n_epoches:
    • training_data.shuffle
    • X, Y = split(training_data)
      • For x in X
      • Y_pred = model(X) # get a vector
      • error = get_error(Y_pred, Y)
      • error_sum += error
    • model.update(error_sum)

Stochastic gradient descent

During one epoch, evaluate error for one sample at a time, then update model immediate after that evaluation.

Pros:

  • immediate feedback of model performance and improvement rate
  • simple to implement
  • frequent update — faster learning rate
  • The noisy update process can allow the model to avoid local minima (e.g. premature convergence).

Cons;

  • frequent update – computationally extensive
  • add a noise parameter /  gradient signal, causing the parameters to jump around
  • Hard to settle to an error minimum

Algo

  • Model.initialize()
  • For i in n_epoches:
    • training_data.shuffle
    • X, Y = split(training_data)
      • for  each x in X
        • Y_pred = model(X)
        • error = get_error(Y_pred, Y)
        • model.update()

 

Mini batch gradient descent

During one epoch, split the data into batches (which adds a batch size parameter). Then for each batch, evaluate error for one sample at a time. Update the model after evaluating for all data in one batches. Repeat for different batches. Repeat for different epoch.

Pros:

  • more robust converge to local minima, compared to stochastic gradient descent
  • frequent update — faster learning rate, compared to batch gradient descent
  • efficiency: no need to have all data in memory

Cons;

  • configuration of an additional mini-batch parameter

Algo

  • Model.initialize()
  • For i in n_epoches:
    • training_data.shuffle
    • Data.split.batches
    • For j in n_batches
      • X, Y = split(batch_data)
      • For x in X:
        • Y_pred = model(X) # here a vector
        • error = get_error(Y_pred, Y)
        • error_sum += error
      • model.update()