ParallelRcpp

There are two primary methods for parallelisation in Rcpp, the first being OpenMP and the second being the RcppParallel package for Rcpp. The RcppParallel package builds on existing methods and uses OpenMP, but provides neat functionality and simple usage, and so will be focused on in this portfolio.

The reader must have a working knowledge of C++ and Rcpp. To begin, each Rcpp file must include

// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>

in the header. This ensures that the settings for RcppParallel are automatically included in the compilation of the C++ code.

Regular parallel approaches to Cpp can cause crashes due to the single-threaded nature of R. This is due to multiple threads attempting to access and interact with the same data structure. RcppParallel provides a straightforward way to account for this.

Basic Parallel Operations

There are two functions inbuilt which can provide the bulk of the parallel operations; parallelFor and parallelReduce. These are interfaces to a parallel for loop and reduce function (see ?Reduce in R).

RcppParallel also provides two accessor classes, RVector and RMatrix, which are thread-safe accessors for an Rcpp vector and matrix, helping to deal with the problem of accessing the same data structure across multiple threads.

Example: Matrix Transformations

Consider taking the log of every element in a large matrix. In R, this process is simple, since log is a vectorised function, we can just run

log(A)

where A is a matrix. This would also be easy to implement in Rcpp using the std::transform operator:

#include <Rcpp.h>
using namespace Rcpp;

// [[Rcpp:export]]
NumericMatrix MatrixLog(NumericMatrix A)
{
  int n = A.nrow();
  int d = A.ncol();
  NumericMatrix out(n, d);
  std::transform(A.begin(), A.end(), out.begin(), ::log);
  return(out);
}

The std::transform function applies a given function across a range of values, here these are specified as all the elements in the matrix A, where the starting and ending point are supplied by A.begin() and A.end(). These transformed values are saved in out, starting at out.begin(). We can compare the speed of this function to an R implementation by applying both the base R log function and MatrixLog to a large matrix (and check that they give the same result.

library(Rcpp)
sourceCpp("MatrixLog.cpp")
d = 200
A = matrix(1:(d^2), d, d)
all.equal(log(A), MatrixLog(A))
## [1] TRUE

Okay, they are equal, this is a good first step.

library(microbenchmark)
microbenchmark(log(A), MatrixLog(A), times = 1000, unit="relative")
## Unit: relative
##          expr      min       lq     mean   median       uq      max neval
##        log(A) 1.008818 1.042968 0.980134 1.057816 0.979937 0.398702  1000
##  MatrixLog(A) 1.000000 1.000000 1.000000 1.000000 1.000000 1.000000  1000

So on average, the C++ implementation is actually around the same speed as the base R implementation. We can speed up the Rcpp code through the use of parallel computing, using parallelFor.

Firstly, parallelFor has four arguments:

  • begin: the beginning of the for loop
  • end: the end of the for loop
  • worker: an object of type Worker, where the operations are specified
  • grainSize: minimal chunk size for parallelisation, minimum number of operations for each thread

Before defining the parallel code, parallelFor needs a Worker object to specify what processes to perform within the for loop. For this case, we need to create a worker that takes the log of each set of elements that are passed to each thread.

struct Log : public RcppParallel::Worker
{
   const RcppParallel::RMatrix<double> input;
   RcppParallel::RMatrix<double> output;
  
   Log(const NumericMatrix input, NumericMatrix output) 
      : input(input), output(output) {}
   
   void operator()(std::size_t begin, std::size_t end) {
      std::transform(input.begin() + begin, 
                     input.begin() + end, 
                     output.begin() + begin, 
                     ::log);
   }
};

Let’s break down this structure. Firstly, two objects of type RMatrix are specified, for the input and output (recall that an RMatrix is a thread-safe object given by RcppParallel). Since different chunks of the matrix will be passed between threads, they need to be converted to this safe RMatrix object before they are interacted with. Secondly, the Log function is defined, so that these inputs and outputs are passed through.

Finally, the operator() is the main part, which is what will natively be called by parallelFor. This performs the same operation as what we saw before in MatrixLog, with a few key differences. Namely the begin and end function inputs, which change the range that std::transform is applied to based on the chunk of the matrix that parallelFor will be giving this worker.

Now that this is set up, we can rewrite MatrixLog in parallel:

#include <Rcpp.h>
#include <RcppParallel.h>
using namespace Rcpp;
// [[Rcpp::depends(RcppParallel)]]

// [[Rcpp::export]]
NumericMatrix MatrixLogPar(NumericMatrix A) {
  
  int n = A.nrow();
  int d = A.ncol();
  NumericMatrix output(n, d);
  
  Log log_(A, output);
  parallelFor(0, A.length(), log_);
  
  return output;
}

This function is similar to the original MatrixLog, however the std::transform section has been replaced by the definition of the worker log_ (with class Log), and then the call to parallelFor. To reiterate, a worker is defined which has the pre-built operator that it will take the log of each element within the chunk that is specified to it. This worker is then spread out across multiple threads by the call to parallelFor, and the output is saved in output.

Now we can compare the speed!

sourceCpp("MatrixLogPar.cpp")
all.equal(log(A), MatrixLog(A), MatrixLogPar(A))
## [1] TRUE
microbenchmark(log(A), MatrixLog(A), MatrixLogPar(A), unit="relative", times=1000)
## Unit: relative
##             expr      min       lq     mean   median       uq       max neval
##           log(A) 2.339494 2.318958 1.749330 2.048938 1.820713 0.7575495  1000
##     MatrixLog(A) 2.232776 2.197518 1.731507 1.977766 1.801228 0.8874397  1000
##  MatrixLogPar(A) 1.000000 1.000000 1.000000 1.000000 1.000000 1.0000000  1000

So originally we had roughly the same processing time as the base R implementation. Now this is roughly twice as fast as base R!

Previous
Next