ParallelRcpp
There are two primary methods for parallelisation in Rcpp: the first is OpenMP, and the second is the RcppParallel package. The RcppParallel package builds on existing threading libraries but provides neat functionality and simple usage, and so it will be the focus of this portfolio.
The reader is assumed to have a working knowledge of C++ and Rcpp. To begin, each Rcpp file must include
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
in the header. This ensures that the settings for RcppParallel are automatically included when the C++ code is compiled.
Naive parallel approaches in C++ can cause crashes due to the single-threaded nature of R: its data structures are not thread-safe, so multiple threads attempting to access and modify the same data structure can corrupt it. RcppParallel provides a straightforward way to account for this.
Basic Parallel Operations
Two inbuilt functions provide the bulk of the parallel operations: parallelFor and parallelReduce. These are interfaces to a parallel for loop and a parallel reduce operation (see ?Reduce in R).
RcppParallel also provides two accessor classes, RVector and RMatrix, which are thread-safe accessors for Rcpp vectors and matrices; these deal with the problem of accessing the same data structure across multiple threads.
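Although the examples below focus on parallelFor, it is worth seeing how parallelReduce and the accessor classes fit together. The following is a minimal sketch of a parallel vector sum, in the style of the worked examples in the RcppParallel documentation; the names Sum and VectorSumPar are illustrative choices, not part of the package.
#include <Rcpp.h>
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
#include <numeric>   // std::accumulate
using namespace Rcpp;

// Illustrative worker: sums the elements of a vector in parallel.
struct Sum : public RcppParallel::Worker
{
  const RcppParallel::RVector<double> input;  // thread-safe read access
  double value;                               // running total for this worker

  // main constructor: wrap the Rcpp vector in a thread-safe RVector
  Sum(const NumericVector input) : input(input), value(0) {}

  // split constructor: called when parallelReduce divides the work
  Sum(const Sum& sum, RcppParallel::Split) : input(sum.input), value(0) {}

  // accumulate the chunk of elements in [begin, end)
  void operator()(std::size_t begin, std::size_t end) {
    value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0);
  }

  // combine the totals from two workers
  void join(const Sum& rhs) { value += rhs.value; }
};

// [[Rcpp::export]]
double VectorSumPar(NumericVector x) {
  Sum sum(x);
  RcppParallel::parallelReduce(0, x.length(), sum);
  return sum.value;
}
The split constructor and join method are what distinguish a reduce worker from a plain parallelFor worker: parallelReduce copies the worker for each chunk of the input and then merges the partial results.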
Example: Matrix Transformations
Consider taking the log of every element in a large matrix. In R this process is simple: since log is a vectorised function, we can just run
log(A)
where A is a matrix. This is also easy to implement in Rcpp using the std::transform algorithm:
#include <Rcpp.h>
#include <algorithm>  // std::transform
#include <cmath>      // ::log
using namespace Rcpp;
// [[Rcpp::export]]
NumericMatrix MatrixLog(NumericMatrix A)
{
  int n = A.nrow();
  int d = A.ncol();
  NumericMatrix out(n, d);
  // apply log element-wise, writing the results into out
  std::transform(A.begin(), A.end(), out.begin(), ::log);
  return out;
}
The std::transform function applies a given function across a range of values; here the range is all the elements of the matrix A, with the start and end supplied by A.begin() and A.end(). The transformed values are saved in out, starting at out.begin(). We can compare the speed of this function to an R implementation by applying both the base R log function and MatrixLog to a large matrix (and checking that they give the same result).
library(Rcpp)
sourceCpp("MatrixLog.cpp")
d = 200
A = matrix(1:(d^2), d, d)
all.equal(log(A), MatrixLog(A))
## [1] TRUE
Okay, they are equal; this is a good first step.
library(microbenchmark)
microbenchmark(log(A), MatrixLog(A), times = 1000, unit="relative")
## Unit: relative
## expr min lq mean median uq max neval
## log(A) 1.008818 1.042968 0.980134 1.057816 0.979937 0.398702 1000
## MatrixLog(A) 1.000000 1.000000 1.000000 1.000000 1.000000 1.000000 1000
So on average, the C++ implementation is actually around the same speed as the base R implementation. We can speed up the Rcpp code through the use of parallel computing, using parallelFor.
Firstly, parallelFor has four arguments:
begin: the beginning of the for loop
end: the end of the for loop
worker: an object of type Worker, where the operations are specified
grainSize: the minimal chunk size for parallelisation, i.e. the minimum number of operations given to each thread (a sketch using this argument appears after the parallel example below)
Before defining the parallel code, parallelFor needs a Worker object to specify what operations to perform within the for loop. For this case, we need to create a worker that takes the log of each set of elements passed to each thread.
struct Log : public RcppParallel::Worker
{
  // thread-safe accessors for the input and output matrices
  const RcppParallel::RMatrix<double> input;
  RcppParallel::RMatrix<double> output;

  // constructor: convert the Rcpp matrices into RMatrix accessors
  Log(const NumericMatrix input, NumericMatrix output)
    : input(input), output(output) {}

  // operate on the chunk of elements in [begin, end)
  void operator()(std::size_t begin, std::size_t end) {
    std::transform(input.begin() + begin,
                   input.begin() + end,
                   output.begin() + begin,
                   ::log);
  }
};
Let’s break down this structure. Firstly, two objects of type RMatrix are declared, for the input and output (recall that an RMatrix is the thread-safe accessor provided by RcppParallel). Since different chunks of the matrix will be passed between threads, the matrices need to be converted to these safe RMatrix objects before they are interacted with. Secondly, the Log constructor is defined, so that the input and output are passed through.
Finally, the operator() is the main part: it is what parallelFor will call natively. It performs the same operation we saw before in MatrixLog, with a few key differences, namely the begin and end inputs, which restrict the range that std::transform is applied to based on the chunk of the matrix that parallelFor gives this worker.
Now that this is set up, we can rewrite MatrixLog in parallel:
#include <Rcpp.h>
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace Rcpp;

// (the Log worker struct from above is defined here, in the same file)

// [[Rcpp::export]]
NumericMatrix MatrixLogPar(NumericMatrix A) {
  int n = A.nrow();
  int d = A.ncol();
  NumericMatrix output(n, d);
  Log log_(A, output);                             // construct the worker
  RcppParallel::parallelFor(0, A.length(), log_);  // spread it over threads
  return output;
}
This function is similar to the original MatrixLog, but the std::transform section has been replaced by the definition of the worker log_ (of class Log), followed by the call to parallelFor. To reiterate: a worker is defined with a pre-built operator that takes the log of each element within the chunk specified to it. This worker is then spread out across multiple threads by the call to parallelFor, and the result is saved in output.
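For completeness, the grainSize argument listed earlier can be supplied as a fourth argument to parallelFor. A minimal sketch, assuming the Log worker above is in scope; the name MatrixLogParGrain and the value 10000 are illustrative choices, not tuned settings.
// Hypothetical variant exposing the grain size as an argument.
// [[Rcpp::export]]
NumericMatrix MatrixLogParGrain(NumericMatrix A, int grainSize = 10000) {
  NumericMatrix output(A.nrow(), A.ncol());
  Log log_(A, output);  // the Log worker defined earlier in the file
  RcppParallel::parallelFor(0, A.length(), log_, grainSize);
  return output;
}
A larger grain size reduces scheduling overhead at the cost of coarser load balancing, so the best value is problem- and machine-dependent.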
Now we can compare the speed!
sourceCpp("MatrixLogPar.cpp")
all.equal(log(A), MatrixLogPar(A))
## [1] TRUE
microbenchmark(log(A), MatrixLog(A), MatrixLogPar(A), unit="relative", times=1000)
## Unit: relative
## expr min lq mean median uq max neval
## log(A) 2.339494 2.318958 1.749330 2.048938 1.820713 0.7575495 1000
## MatrixLog(A) 2.232776 2.197518 1.731507 1.977766 1.801228 0.8874397 1000
## MatrixLogPar(A) 1.000000 1.000000 1.000000 1.000000 1.000000 1.0000000 1000
So where we originally had roughly the same processing time as the base R implementation, the parallel version is roughly twice as fast as base R on this machine (the exact speed-up will depend on the number of available cores)!