Monthly Archives: December 2018

Distributed Stochastic Gradient Descent with MPI and PyTorch

This post describes how to implement stochastic gradient in a distributed fashion with MPI. It will cover the following topics in a high-level fashion, as it is challenging to cover every details in a single post. I will point to other resources helpful in understanding key concepts.

  • What is MPI
  • How to do distributed SGD
  • PyTorch modules necessary for writing distributed SGD and how to design the program
  • Engineering caveats for building PyTorch with MPI backend 

What is MPI

I understand MPI as a programming framework that handles communication between computers. 

It’s like the lower-level software underneath MapReduce, another programming framework for distributed computing. In MapReduce you need to specify a map operation and a reduce operation and that’s all. The system take care of the allocation of workers and server and memory, which can of course taken care of by meta-information input into a shell script. The main challenge is designing the parallel algorithms at the algorithmic level, for example how to parse the two matrices when doing matrix multiplication, or how to design the mapper and reducer for min-hash and shingling documents. 

But in MPI, the programmer needs to specify the worker and server by their ID, and actively coordinate the computers, which probably requires another set of algorithms. For example, if many workers need to communicate with the server at the same time but the server cannot serve them all, there needs to be a scheduling algorithm e.g. round-robin. In MPI, the programmer needs to code the scheduling by themselves so that messages from different workers won’t be mixed. We’ll see an example later in the asynchronous SGD. 

MPI has many concrete implementations, for example OpenMPI and MPICH. 

MPI has the following basic operations:

  • point-to-point communications: blocking and non-blocking. This enables a single computer to send message to another, and another computer to receive message. blocking means the process is blocked until the send / receive operation is finished, while block means the process is returned immediately, not wait till the process is finished.
  • collective communications : examples are reduce and gather.

Here is a good place to learn basic operations of MPI. You need to go through all tutorials on that website to be able to understand and write a project that does distributed SGD with PyTorch.

How to do distributed SGD

There are two types of distributed SGD, depending on when to synchronize gradients computed on individual workers.

  •  synchronous SGD:
    • all workers have a copy of the model. they are all the same
    • at every mini-batch, all workers compute their share of gradient, and then compute average
    • the model update on every worker
    • then move on to the next batch
    • if there is a server,
      • the server has a copy of the model
      • the workers also have copies of the model
      • workers are assigned data to calculate forward and backward
      • gradients are sent to the server to take care of the averaging
      • workers receive updated gradients from the server
    • if there is no server, the gradient is calculated in all_reduce. all_reduce can (but not necessarily) be implemented in the ring algorithm: every model send the results to its neighbor ?
  • asynchronous SGD:
    • all workers have a copy of the model, but they are not the same. the model is inconsistent across workers
    • at every mini-batch:
      • workers get parameters from the server
      • workers get data from its own data loader, or randomly selected dataset (is there an epoch concept anymore?)
      • workers calculate forward and gradient
      • once the calculation is done, gradient is sent to the server,
      • the server updates the parameters

Now we need to conceptually understand how this workflow can be implemented using data structure and APIs provided by PyTorch. To do this, let’s first analyze how a single machine SGD is implemented in PyTorch.

In particular, in a single machine SGD implemented with PyTorch, 1) the output of a mini-batch is calculated by calling the forward function of the model, 2) the loss is calculated by sending the output and the target to a loss function, and 3) the gradient is calculated by calling loss.backward, and 4) the update to parameters is done by calling optimizer.step(), and we would pass the model parameters to the optimizer beforehand.

Now for this single-machine workflow, key functions are:

  • We calculate the gradients in the loss.backward() step
  • The gradients are stored in model.parameters()
  • The model parameters are updated when we call optimizer.step()

So, in a multi-machine scenario, the following questions need to be considered:

  1. How many copies of the model should be created?
  2. What should the server store?
  3. What should the slaves store?
  4. What should they communicate with each other?

To see why 1) is important, note the way we deploy MPI is by having the same script sending to every machine, but use MPI_rank to identify servers and slaves, and use if-else condition to decide which block of code should be run on which machine. So, we can theoretically create some objects when we encounter a server, but do not create these objects when we encounter a slave. To answer 1), an obvious answer is everybody has its own copy of a whole model (i.e. the whole computation graph created by model = myModelClass()), but is this necessary?

It turned out it is. Why? Although server only need the parameter value and gradient values to perform updates, and theoretically we only need to put the optimizer on server, and model parameters on slaves, this is not doable in operation because in PyTorch optimizer is tied to a set of model parameters. More, the whole computation graph contains more than data, but also relations. So, we must initialize a model and a optimizer on both server and slaves, and use communication to make sure their values are on the same page.

To answer question 4, here is the logic flow:

  • a slave establishes connection with the server
  • a slave fetches initial parameter values from the server
  • a slave breaks connection with the server
  • a slave calculate the loss on its own batch of data
  • a slave calculates the gradients by calling loss.backward()
  • a slave establishes connection with the server
  • a slave send the gradients and loss to the server
  • a slave breaks connection with the server
  • the server updates its parameter value.

Note here we have multiple steps concerning how to set up connections. In effect, parameter values have many layers corresponding to different layers of a neural network, and we have multiple salves. So if multiple salves trying to send the same set of parameters to the server, data from different sets might be messed up. In other words, MPI needs to know who sends data and programmers need to use a protocol to ensure that during the transmission, only one pair of connection is in effect.

Note MPI does not allow one slave to “block” other slaves, so we need to code this “block” up by using “handshake” technique in TCP. The idea is a slave first send a “hello” message to the server, and when a server is busy this message will wait in a queue until being received. And when the server is idle, and it receives the “hello” message, it will realize someone else is calling it and will wait for this particular person by only receiving message from the same ID for this round. It will also send a message to this ID, telling it “I’m ready for you and no one else is able to mess around”. When a slave receives this confirmation from the server, it can go on to send the important gradient information to the server. After it finishes, it will receive the information send from the server. And when the server finished sending update information to the slave, it will be idle again and able to receive “hello” from another slave. This is a very interesting process to establish connections!

Engineering caveats for building PyTorch with MPI backend 

Some caveats to watch out for:

  • To use PyTorch with MPI, it cannot be installed from Conda. You have to build it yourself. But PyTorch is a large package so check your disk space and act accordingly. It took me 4 days to build it successfully….

Some links:

  • PyTorch tutorial for distributed SGD. Note this guy uses a synchronized SGD so the process is easier – you can just run an all-reduce and you do not have to worry about slaves mess up with each other. https://pytorch.org/tutorials/intermediate/dist_tuto.html#advanced-topics
  • My implementation of the distributed SGD process… https://github.com/yuqli/hpc/blob/master/lab4/src/lab4_multiplenode.py

Finally, this blog post is not very clearly written. If someone really reads this and has questions, please email me at greenstone1564@gmail.com.

重读 hierarchical attention network for document classification

这篇文章的 key idea 是,把关于文档结构的层级结构信息加入模型,有助于生成更好的文本表征。

这里的文档结构主要是说文章由句子组成,是层级结构。之前的方法是把所有句子连成一起输入一个 RNN 模型,这样其实丢失了段落这样的层级结构。

相应地,另一种方法是把每个句子里的词先分步输入一个 RNN 模型,生成句子表征;再将所有的句子表征输入另一个RNN模型,生成文本表征;最后再用文本表征进行分类。这样的方法,神经网络之间并不共享参数,而且两次输入RNN,故可以捕捉到层级结构。

将文本分层级处理,还涉及到更为灵活的注意力机制的应用。句子和词层面分别实现注意力机制,可以使表征从特定的“重要”元素里获取更多信息。

具体的网络实现有几个特点:

关于注意力机制的实现;

  • 将每个句子里的词向量送入 GRU (see last post on explanation on what is a GRU),收集每一步输出的 hidden state (因而不能直接调用 pytorch nn.GRU 函数而需要稍作改变写个 for-loop 并把结果存起来)
  • 把所有的 hidden state 送入MLP,生成对词向量的表征
  • 随机初始化一个 context vector,这个 context vector 的意义是句子的含义。用它和每个向量的表征求点积,代表 attention score。score 越高,说明两个向量越相似,也就说明这个词在这个句子里有更显著的意义。因此给它的 attention weight 也就应该比较高。
  • 将 attention score 送入 softmax 函数求得权重。用这个权重和原始的 hidden states sequence 求 weighted sum 得到整个句子的表征。

关于层级结构的实现:

  • 我们一共只训练两套 GRU 单元,一个负责总结句子,一个负责总结段落。因此所有的句子必须一样长,所有的段落必须有一样长度的句子。因此在预处理时,过长的句子被剪掉,过短的句子被补足。具体的长度选取可以看训练数据中长度的分布,用 qunatile 选择。
  • 将数据划整齐后,首先用上述方法得到每个句子的表征。
  • 其次,针对每个段落,再把所有的句子表征送入GRU,得到段落表征。
  • 最后就可以用这个表征做分类了。

论文地址:https://www.cs.cmu.edu/~hovy/papers/16HLT-hierarchical-attention-networks.pdf

2D convolution with tiling technique on GPU

This post describes how to write CUDA C code to perform 2D convolution on GPU with tiling technique. It is very brief, only covers basic concepts but with links to code examples. We are not going to use cuDNN, only the bare bones of CUDA. But here is an excellent post if you want to use cuDNN – for this particular task, cuDNN is slower than the code I will present below.

You can find the source code for this post here.

This post will consist of the following sections:

  • Installation CUDA
  • Basic GPU programming model
  • Vanilla convolution on GPU
  • Constant memory in GPU
  • Tiling technique and indexing
  • Link to code example

Install CUDA

First, make sure if you have a NVIDIA GPU on your machine. Here is how. Or just search the model online and ask on reddit 🙂

Next, follow the official NVIDIA guide here to download CUDA Toolkit. Note not every card support every version of CUDA kit.

Note also the download path, as you will need to specify that to the compiler later. If you are using Linux you can type whereis nvcc to find out. nvcc is the NVIDIA C compiler.

It’s easy to get wrong. Just follow carefully every step in the official installation guide. In particular don’t forget to finish the post-installation set-ups! Wrong PATH environment variables can be a real pain.

Basic CUDA Programming model

The most prominent characteristic of CUDA programming is parallization. When you write one line of code in a CUDA kernel (explained later), it will be executed by thousands of threads on different data. This is called the SIMD (same instruction multiple data) parallel paradigm.

With this idea in mind, here is what a typical CUDA C code look like:

  • Setting up the program on the host (CPU) by initializing variables and populate them. Write non-parallel sequential code.
  • Preparing for GPU work by initializing device (GPU) variables that reside in device memory, and copy contents from the corresponding host variables to it.
  • Write a GPU kernel function to deal with the parallel part of computation (e.g. convolution).
  • Set up the parallel configurations by specifying how many blocks and grids are used for the kernel. These two variables will decide how many threads we started for parallel execution.
  • Call the kernel from host code. Store the results in a device variable.
  • Copy contents from device to the host.

Some concepts to clear:

  • How many threads we use in the GPU is controlled by Grid size and Block size. A grid consists of many blocks. Grid and Block are both 3D structures. For example, a grid of dimension <2, 2, 2> will have 8 blocks in total, and a block of dimension <2, 2, 2> will have 8 threads in total. A block cannot have more than 1024 threads.

Here is a basic matrix addition example:

#include<stdio.h>
#include<cuda.h>

// function declarations 
__global__ void vecAddKernel(float * a, float * b, float * c, unsigned int N);

// main function 
int main()
{   
    int N = 10;    // length of vector 
    float * a, * b, * c;  // a and b are vectors. c is the result
    unsigned int size = N * sizeof(float);  // number of bytes to allocate 
    a = (float *)calloc(N, sizeof(float));
    b = (float *)calloc(N, sizeof(float));

    int i = 0;
    float sum = 0;
    for (i = 0; i < N; i++){
        a[i] = (float)i / 0.23 + 1;
        b[i] = (float)i / 5.89 + 9;
        sum += a[i] + b[i];
    }

    c = (float*) malloc(size);
 
    // 1. allocate memory on CUDA
    float * d_a, * d_b, * d_c;   // device memory 
    cudaError_t err1 =  cudaMalloc((void **) & d_a, size);
    cudaError_t err2 = cudaMalloc((void **) & d_b, size);
    cudaError_t err3 = cudaMalloc((void **) & d_c, size);
    if (err1 != cudaSuccess){
        printf("%s in %s at line %d\n", cudaGetErrorString(err1), __FILE__, __LINE__);
        exit(EXIT_FAILURE);
    }
    if (err2 != cudaSuccess){
        printf("%s in %s at line %d\n", cudaGetErrorString(err2), __FILE__, __LINE__);
        exit(EXIT_FAILURE);
    }
    if (err3 != cudaSuccess){
        printf("%s in %s at line %d\n", cudaGetErrorString(err3), __FILE__, __LINE__);
        exit(EXIT_FAILURE);
    }
     
     
     
    // copy memory 
    cudaMemcpy(d_a, a, size, cudaMemcpyHostToDevice);
    cudaMemcpy(d_b, b, size, cudaMemcpyHostToDevice);

    // 2. operate on kernels 
    vecAddKernel<<<ceil(N/256.0), 256>>>(d_a, d_b, d_c, N);

    // 3. copy the results back to host
    cudaMemcpy(c, d_c, size, cudaMemcpyDeviceToHost);

    cudaDeviceSynchronize();
    cudaError_t error = cudaGetLastError();
    if(error!=cudaSuccess)
    {
        fprintf(stderr,"ERROR: %s\n", cudaGetErrorString(error) );
        exit(-1);
    }
 
    float cuda_res = 0;
    for(i = 0; i < N; i++){
        printf("%.2f\t", c[i]);
        cuda_res += c[i];
    }
 
    printf("Results from host :%.2f\n", sum);
    printf("Results from device:%.2f\n", cuda_res);

    cudaFree(d_a); 
    cudaFree(d_b);
    cudaFree(d_c);

    return 0;
}

__global__
void vecAddKernel(float * a, float * b, float * c, unsigned int N){
    int i = threadIdx.x + blockDim.x * blockIdx.x;
    if (i<N)  c[i] = a[i] + b[i];
}

 

Vanilla convolution on GPU

Where to parallelize for convolution? We use every thread to correspond to one single element in the output, and all threads do the same convolution operation but with different data chunks.

Constant memory in GPU

Constant memory variables are shared by all threads in all blocks. This memory is small but fast. So we should put the filter in it because everyone needs it, and it’s small.

Tiling technique and indexing

Tiling is making use of the shared memory on GPU. Shared memory is shared by all threads in the same block, it is small but fast. So the idea is for every thread in a block, they will operate on data regions that have some overlaps. And we make use of that to copy the small region of the larger picture that these threads need into the shared memory and dealing with them.

We use all threads to copy these data elements. Here is a further illustration of this tiling idea.

Code example

Here.

 

Here is the tiling organization :

Understanding GRUs

This post follows this post last year about vanilla Recurrent Neural Network structures.

One of the ultimate goal of a recurrent neural network is to summarize a sequence of vectors into a single vector representing all the information.

In the simplest case, imagine for the past month, every day  you and your friend did some push-ups. Let’s denote your number as x_i at day i, and your friend’s number as y_i at day i. Now a month has passed and you two want to compare who is more keen on working out.

So what you do you? Directly comparing two vectors is hard, because there is no obvious “greater-than” relations across dimensions? One way is to calculate the simple average of the two sequences, i.e. on average you did \bar{x} and your friend did \bar{y} per day. Whichever number is larger, you can bet that person is a better athlete!

Now in this example, time order does not really matter, because we are treating numbers from 30 days ago as the same as number yesterday. Another example where time does matter is interests from bank savings. Suppose every year you save b_i amount of Yuan into the bank, and the bank pays out interests at the end of year, once per year. How much will your total savings be at the end of the fifth year? You can image the oldest money will receive the highest interests, because of confounding.

With the same logic, we can apply this idea to summarizing sentences. Suppose every word can be represented as a n-D vector in a semantic space. How do we produce one single vector that represent the meaning of this sentence?

Taking average is a way, but note every word will have some influence on the words after it. e.g. “this burger is delicious.” Note how the “burger” constrains word choices after “is” … And that’s why we need recurrent neural network : at every step of the model, the hidden vector (which contains information about previous words) will concatenate with the current word, and becomes the input into producing the next hidden vector. So every word will have a lasting influence in the final output.

However, simple RNN has a vanishing (explosive) gradient problem. Mathematically, the older a word is, the higher order it will has on the multiplication factor of weight matrix. When taking first-order gradients, the weight matrix will have an order of n-1. Now if one term is larger than 1, as n approaches infinity this will will approach infinity too. If one term is smaller than 1, as n approaches infinity this will will approach zero and thus the model will “stop learning” (i.e. weights will not update).

Let’s formulate the intuition above more formally. For a simple RNN, every update we have

h_t =g (W x_t + U h_{t-1} + b)

Here g can be any non-linear activation function, e.g. a RELU or a sigmoid. Now we consider the gradient of h_t with regard to W.

Write

h_t = g (O_t)

and

O_t =W x_t + U h_{t-1} + b

Using the chain rule we have:

\frac{\partial h_t}{\partial W} =\frac{\partial h_t}{\partial O_t} \frac{\partial O_t}{\partial W}

We also know:

\frac{\partial O_t}{\partial W} = X_t + U_h \frac{\partial h_{t-1}}{\partial W}

So plug in the second equation above into the first equation, we have:

\frac{\partial h_t}{\partial W} = {\partial g} \cdot (X_t + U_h \frac{\partial h_{t-1}}{\partial W})

We can already see a recurrent relation between \frac{\partial h_t}{\partial W} and \frac{\partial h_{t-1}}{\partial W}. To make things clear, write \frac{\partial h_t}{\partial W} = \alpha_t, and expand the equation a few more steps:

\alpha_t = {\partial g} \cdot (X_t + U_h \alpha_{t-1})

\alpha_t = {\partial g} \cdot (X_t + U_h \cdot \partial g \cdot (X_{t-1} + \alpha_{t-2}) )

\ldots

\alpha_t = C + (\partial g U_h)^{n-1} \alpha_0

Here $C$ represent the other terms (with lower order of \alpha_t-s) in the formula that we omitted for simplicity. So we can see, as n increases, if norm of \partial g U_h is greater than 1 this term will approach infinity, or if is less than 1 it will approach zero.

The main reason is the same term is multiplied n-1 times. i.e. information always flow at the same rate every step. If you think about a sentence, this does not really make sense as words meaning / importance does not really decrease (increase) exponentially w.r.t. it’s distance to the end of the sentence. The first word might very well be very important (e.g. a female name which will influence later pronoun choices “he” or “her”), but this vanilla RNN structure is too simple to capture that. We need more dynamic structures to allow information flow freely between time stamps.

Gated recurrent unit solves the problem by adapting different terms in every update step. There are two key ideas:

  • Introduces an external “memory” vector to capture long distance dependencies.
  • Allow error messages to flow at different strengths depending on inputs.

It achieves this by first computing two “gates”:

update gate : z_t = \sigma (W_z x_t + U_z h_{t-1} + b_z)

reset gate: r_t = \sigma (W_r x_t + U_r h_{t-1} + b_r)

They are both continuous vectors of the same length as the hidden state, constructed by passing the current word x_t and the last hidden vector h_{t-1} through an MLP. The sigmoid function makes every element between 0 and 1, so when used to perform element-wise multiplications \o, these two gates essentially controls how much information “flows” through.

There is also a candidate vector representing “new memory content”:

\tilde{h_t} = \text{tanh} (W_h X_t + r_t \circ (U_h h_{t-1} + b_h)

The usage of tanh and W_h X_t is pretty standard as in all other unites. But note the meaning of reset gate r_t here. If r_t is close to zero, then we “forget” all the previous information in the h_{t-1} vector, and just use the current word information.

An example where the reset gate is useful is sentiment analysis on a movie review, where the author spend many sentences describing and summarizing the movie plot, but conclude that “But the movie is really boring”. With this sentence, all the previous information become useless. This reset gate can help the model “forget” the previous summaries.

Finally, we update the new hidden vector as :

h_t =  z_t \circ h_{t-1} + (1 - z_t) \circ \tilde{h_t}

You can see the update controls how much information is remembered from old state, and how much information is acquired from the new memory content.

A caveat is why do we need the reset gate at all now that we have the update gate? Doesn’t we already have a gate to control how much old information is retained by using an update gate? Doesn’t the update gate also have the ability to eliminate all historical information embedded in h_{t-1}? I did not find satisfactory information about this point online.

Now think about why GRU can solve the vanish gradient problem. The update gate allows us to retain all previous information by setting all elements of z_t to 1, and h_t is exactly the same as h_{t-1}. In this case, we do not have the vanishing gradient problem because no weight matrix is multiplied.

For example, in the same movie review suppose the author says “I love this movie so much” and then goes on to explain the movie plot. Now the first sentence is really important, but with recurrent neural network, we update the gradient at every step and the content will be washed out as time passes. But now the update gate allows the model to retain the first sentence without exponentially decay its content.

Units with short term memories usually have reset gate very activate (i.e. numbers close to 1), and thus forget most about the past…

If reset gate is entirely 1 and update gate is entirely zero, then we just have a standard RNN.

Why tanh and sigmoid?

  • In theory tanh can also be RElu, but in practice it just performs better.
  • Sigmoid will control value between 0 and 1, but again no formal justification.

Finally, here is an illustration that pictures what’s going on in a GRU unit:

Reference: