From 0b3c2504e3316537b29350ee7d4b51c6ac500cf8 Mon Sep 17 00:00:00 2001 From: Kevin Ushey Date: Wed, 14 Aug 2024 09:05:58 -0700 Subject: [PATCH 1/2] prepare for release --- DESCRIPTION | 2 +- NEWS.md | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 15125abc..dccfb25c 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: RcppParallel Type: Package Title: Parallel Programming Tools for 'Rcpp' -Version: 5.1.8.9000 +Version: 5.1.9 Authors@R: c( person("JJ", "Allaire", role = c("aut"), email = "jj@rstudio.com"), person("Romain", "Francois", role = c("aut", "cph")), diff --git a/NEWS.md b/NEWS.md index 290d8057..33f3db36 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,10 @@ +## RcppParallel 5.1.9 + +* RcppParallel no longer passes `-rpath` when building / linking on Windows. + This fixes build issues when building RcppParallel when using the LLVM + linker on Windows. (@kalibera) + ## RcppParallel 5.1.8 * RcppParallel now explicitly links to the bundled copy of TBB on macOS. (#206; @jeroen) From 8e3b5d5035ba92b132bebec6ec25114d0f1a8af8 Mon Sep 17 00:00:00 2001 From: Kevin Ushey Date: Wed, 27 Aug 2025 10:57:15 -0700 Subject: [PATCH 2/2] CRAN release 5.1.11 --- DESCRIPTION | 31 ++- NEWS.md | 10 + R/flags.R | 18 +- R/tbb.R | 72 ++++--- R/zzz.R | 2 +- inst/include/RcppParallel.h | 14 +- inst/include/RcppParallel/Common.h | 47 +++-- inst/include/RcppParallel/TBB.h | 307 +++++++++-------------------- inst/skeleton/vector-sum.cpp | 22 +-- inst/tests/cpp/innerproduct.cpp | 30 +-- inst/tests/cpp/sum.cpp | 26 +-- src/Makevars.in | 23 +-- src/options.cpp | 4 + src/tbb.cpp | 244 +++++++++++++++++++++++ src/tbb/include/tbb/task.h | 8 +- tools/config/configure.R | 17 +- 16 files changed, 531 insertions(+), 344 deletions(-) create mode 100644 src/tbb.cpp diff --git a/DESCRIPTION b/DESCRIPTION index dccfb25c..fae869d7 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: RcppParallel Type: Package Title: Parallel Programming Tools for 'Rcpp' -Version: 5.1.9 +Version: 5.1.11-1 Authors@R: c( person("JJ", "Allaire", role = c("aut"), email = "jj@rstudio.com"), person("Romain", "Francois", role = c("aut", "cph")), @@ -23,16 +23,29 @@ Description: High level functions for parallel programming with 'Rcpp'. a standard serial "for" loop into a parallel one and the 'parallelReduce()' function can be used for accumulating aggregate or other values. Depends: R (>= 3.0.2) -Suggests: - Rcpp, - RUnit, - knitr, - rmarkdown -Roxygen: list(markdown = TRUE) -SystemRequirements: GNU make, Intel TBB, Windows: cmd.exe and cscript.exe, Solaris: g++ is required +Suggests: Rcpp, RUnit, knitr, rmarkdown +SystemRequirements: GNU make, Intel TBB, Windows: cmd.exe and + cscript.exe, Solaris: g++ is required License: GPL (>= 3) -URL: https://rcppcore.github.io/RcppParallel/, https://github.com/RcppCore/RcppParallel +URL: https://rcppcore.github.io/RcppParallel/, + https://github.com/RcppCore/RcppParallel BugReports: https://github.com/RcppCore/RcppParallel/issues Biarch: TRUE RoxygenNote: 7.1.1 Encoding: UTF-8 +NeedsCompilation: yes +Packaged: 2025-01-23 23:26:25 UTC; kevin +Author: JJ Allaire [aut], + Romain Francois [aut, cph], + Kevin Ushey [aut, cre], + Gregory Vandenbrouck [aut], + Marcus Geelnard [aut, cph] (TinyThread library, + https://tinythreadpp.bitsnbites.eu/), + Hamada S. Badr [ctb] (), + Posit, PBC [cph], + Intel [aut, cph] (Intel TBB library, + https://www.threadingbuildingblocks.org/), + Microsoft [cph] +Maintainer: Kevin Ushey +Repository: CRAN +Date/Publication: 2025-01-24 02:00:02 UTC diff --git a/NEWS.md b/NEWS.md index 33f3db36..dfca7bda 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,14 @@ +## RcppParallel 5.1.11 + +* Compatibility with LLVM 21. + +## RcppParallel 5.1.10 + +* Fixed an issue where packages linking to RcppParallel could inadverently + depend on internals of the TBB library available during compilation, even + if the package did not explicitly use TBB itself. + ## RcppParallel 5.1.9 * RcppParallel no longer passes `-rpath` when building / linking on Windows. diff --git a/R/flags.R b/R/flags.R index 32f64184..6ae1f828 100644 --- a/R/flags.R +++ b/R/flags.R @@ -1,32 +1,32 @@ #' Compilation flags for RcppParallel -#' +#' #' Output the compiler or linker flags required to build against RcppParallel. -#' +#' #' These functions are typically called from `Makevars` as follows: -#' +#' #' ``` #' PKG_LIBS += $(shell "${R_HOME}/bin/Rscript" -e "RcppParallel::LdFlags()") #' ``` -#' +#' #' On Windows, the flags ensure that the package links with the built-in TBB #' library. On Linux and macOS, the output is empty, because TBB is loaded #' dynamically on load by `RcppParallel`. -#' +#' #' \R packages using RcppParallel should also add the following to their #' `NAMESPACE` file: -#' +#' #' ``` #' importFrom(RcppParallel, RcppParallelLibs) #' ``` -#' +#' #' This is necessary to ensure that \pkg{RcppParallel} (and so, TBB) is loaded #' and available. -#' +#' #' @name flags #' @rdname flags #' @aliases RcppParallelLibs LdFlags CxxFlags -#' +#' #' @return Returns \code{NULL}, invisibly. These functions are called for #' their side effects (writing the associated flags to stdout). #' diff --git a/R/tbb.R b/R/tbb.R index 4c8fc14c..87362db9 100644 --- a/R/tbb.R +++ b/R/tbb.R @@ -1,26 +1,26 @@ #' Get the Path to a TBB Library -#' +#' #' Retrieve the path to a TBB library. This can be useful for \R packages #' using RcppParallel that wish to use, or re-use, the version of TBB that #' RcppParallel has been configured to use. -#' +#' #' @param name #' The name of the TBB library to be resolved. Normally, this is one of #' `tbb`, `tbbmalloc`, or `tbbmalloc_proxy`. When `NULL`, the library #' path containing the TBB libraries is returned instead. -#' +#' #' @export tbbLibraryPath <- function(name = NULL) { - + # library paths for different OSes sysname <- Sys.info()[["sysname"]] - + # find root for TBB install tbbRoot <- Sys.getenv("TBB_LIB", unset = tbbRoot()) if (is.null(name)) return(tbbRoot) - + # form library names tbbLibNames <- list( "Darwin" = paste0("lib", name, ".dylib"), @@ -28,12 +28,12 @@ tbbLibraryPath <- function(name = NULL) { "SunOS" = paste0("lib", name, ".so"), "Linux" = paste0("lib", name, c(".so.2", ".so")) ) - + # skip systems that we know not to be compatible isCompatible <- !is_sparc() && !is.null(tbbLibNames[[sysname]]) if (!isCompatible) return(NULL) - + # find the request library (if any) libNames <- tbbLibNames[[sysname]] for (libName in libNames) { @@ -41,13 +41,13 @@ tbbLibraryPath <- function(name = NULL) { if (file.exists(tbbName)) return(tbbName) } - + } tbbCxxFlags <- function() { - + flags <- character() - + # opt-in to TBB on Windows if (is_windows()) { flags <- c(flags, "-DRCPP_PARALLEL_USE_TBB=1") @@ -57,37 +57,57 @@ tbbCxxFlags <- function() { flags <- c(flags, "-DTBB_USE_GCC_BUILTINS") } } - + # if TBB_INC is set, apply those library paths tbbInc <- Sys.getenv("TBB_INC", unset = TBB_INC) if (nzchar(tbbInc)) { - + # add include path flags <- c(flags, paste0("-I", asBuildPath(tbbInc))) - + # prefer new interface if version.h exists versionPath <- file.path(tbbInc, "tbb/version.h") if (file.exists(versionPath)) flags <- c(flags, "-DTBB_INTERFACE_NEW") - + } - + # return flags as string paste(flags, collapse = " ") - + } # Return the linker flags required for TBB on this platform tbbLdFlags <- function() { - + + tbbFlags <- tbbLdFlagsImpl() + + if (is_windows()) { + libDir <- system.file("libs", .Platform$r_arch, package = "RcppParallel") + libName <- paste0("RcppParallel", .Platform$dynlib.ext) + newFlags <- sprintf("-L%s -lRcppParallel", shQuote(libDir)) + tbbFlags <- paste(newFlags, tbbFlags) + } + + tbbFlags + +} + +tbbLdFlagsImpl <- function() { + # shortcut if TBB_LIB defined tbbLib <- Sys.getenv("TBB_LINK_LIB", Sys.getenv("TBB_LIB", unset = TBB_LIB)) if (nzchar(tbbLib)) { - fmt <- if (is_windows()) "-L%1$s -ltbb -ltbbmalloc" - else "-L%1$s -Wl,-rpath,%1$s -ltbb -ltbbmalloc" + + fmt <- if (is_windows()) { + "-L%1$s -ltbb -ltbbmalloc" + } else { + "-L%1$s -Wl,-rpath,%1$s -ltbb -ltbbmalloc" + } + return(sprintf(fmt, asBuildPath(tbbLib))) } - + # on Mac, Windows and Solaris, we need to explicitly link (#206) needsExplicitFlags <- is_mac() || is_windows() || (is_solaris() && !is_sparc()) if (needsExplicitFlags) { @@ -95,20 +115,20 @@ tbbLdFlags <- function() { libFlag <- paste0("-L", libPath) return(paste(libFlag, "-ltbb", "-ltbbmalloc")) } - + # nothing required on other platforms "" - + } tbbRoot <- function() { - + if (nzchar(TBB_LIB)) return(TBB_LIB) - + rArch <- .Platform$r_arch parts <- c("lib", if (nzchar(rArch)) rArch) libDir <- paste(parts, collapse = "/") system.file(libDir, package = "RcppParallel") - + } diff --git a/R/zzz.R b/R/zzz.R index 31c40a3e..3116939c 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -37,7 +37,7 @@ loadTbbLibrary <- function(name) { .tbbMallocProxyDllInfo <<- loadTbbLibrary("tbbmalloc_proxy") # load RcppParallel library if available - .dllInfo <<- library.dynam("RcppParallel", pkgname, libname) + .dllInfo <<- library.dynam("RcppParallel", pkgname, libname, local = FALSE) } diff --git a/inst/include/RcppParallel.h b/inst/include/RcppParallel.h index cbd5ef45..af26afc9 100644 --- a/inst/include/RcppParallel.h +++ b/inst/include/RcppParallel.h @@ -6,7 +6,7 @@ #include "RcppParallel/TinyThread.h" // Use TBB only where it's known to compile and work correctly -// (NOTE: Windows TBB is temporarily opt-in for packages for +// (NOTE: Windows TBB is temporarily opt-in for packages for // compatibility with CRAN packages not previously configured // to link to TBB in Makevars.win) #ifndef RCPP_PARALLEL_USE_TBB @@ -32,14 +32,14 @@ namespace RcppParallel { inline void parallelFor(std::size_t begin, - std::size_t end, + std::size_t end, Worker& worker, std::size_t grainSize = 1, int numThreads = -1) { - grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, 1u); + grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, std::size_t(1)); numThreads = resolveValue("RCPP_PARALLEL_NUM_THREADS", numThreads, -1); - + #if RCPP_PARALLEL_USE_TBB if (internal::backend() == internal::BACKEND_TBB) tbbParallelFor(begin, end, worker, grainSize, numThreads); @@ -52,14 +52,14 @@ inline void parallelFor(std::size_t begin, template inline void parallelReduce(std::size_t begin, - std::size_t end, + std::size_t end, Reducer& reducer, std::size_t grainSize = 1, int numThreads = -1) { - grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, 1); + grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, std::size_t(1)); numThreads = resolveValue("RCPP_PARALLEL_NUM_THREADS", numThreads, -1); - + #if RCPP_PARALLEL_USE_TBB if (internal::backend() == internal::BACKEND_TBB) tbbParallelReduce(begin, end, reducer, grainSize, numThreads); diff --git a/inst/include/RcppParallel/Common.h b/inst/include/RcppParallel/Common.h index 0e4c0e28..e4bd1e99 100644 --- a/inst/include/RcppParallel/Common.h +++ b/inst/include/RcppParallel/Common.h @@ -5,6 +5,11 @@ #include #include +#include +#include +#include +#include + namespace RcppParallel { template @@ -13,53 +18,65 @@ inline int resolveValue(const char* envvar, U defaultValue) { // if the requested value is non-zero and not the default, we can use it - if (requestedValue != defaultValue && requestedValue > 0) + if (requestedValue != static_cast(defaultValue) && requestedValue > 0) return requestedValue; - + // otherwise, try reading the default from associated envvar // if the environment variable is unset, use the default const char* var = getenv(envvar); if (var == NULL) return defaultValue; - + // try to convert the string to a number // if an error occurs during conversion, just use default errno = 0; char* end; long value = strtol(var, &end, 10); - + // check for conversion failure if (end == var || *end != '\0' || errno == ERANGE) return defaultValue; - - // okay, return the parsed environment variable value + + // okay, return the parsed environment variable value return value; } +// Tag type used for disambiguating splitting constructors +struct Split {}; + // Work executed within a background thread. We implement dynamic // dispatch using vtables so we can have a stable type to cast // to from the void* passed to the worker thread (required because // the tinythreads interface allows to pass only a void* to the // thread main rather than a generic type / template) - -struct Worker -{ +struct Worker +{ // construct and destruct (delete virtually) Worker() {} virtual ~Worker() {} - + // dispatch work over a range of values - virtual void operator()(std::size_t begin, std::size_t end) = 0; - - // disable copying and assignment + virtual void operator()(std::size_t begin, std::size_t end) = 0; + private: + // disable copying and assignment Worker(const Worker&); void operator=(const Worker&); }; -// Tag type used for disambiguating splitting constructors +// Used for controlling the stack size for threads / tasks within a scope. +class ThreadStackSizeControl +{ +public: + ThreadStackSizeControl(); + ~ThreadStackSizeControl(); + +private: + // COPYING: not copyable + ThreadStackSizeControl(const ThreadStackSizeControl&); + ThreadStackSizeControl& operator=(const ThreadStackSizeControl&); +}; -struct Split {}; } // namespace RcppParallel diff --git a/inst/include/RcppParallel/TBB.h b/inst/include/RcppParallel/TBB.h index f276f75e..3f63ac5d 100644 --- a/inst/include/RcppParallel/TBB.h +++ b/inst/include/RcppParallel/TBB.h @@ -7,252 +7,127 @@ # define TBB_PREVIEW_GLOBAL_CONTROL 1 #endif -#include -#include -#include +#include "tbb/blocked_range.h" +#include "tbb/concurrent_unordered_set.h" +#include "tbb/concurrent_unordered_map.h" +#include "tbb/global_control.h" +#include "tbb/mutex.h" +#include "tbb/parallel_for.h" +#include "tbb/parallel_for_each.h" +#include "tbb/parallel_reduce.h" +#include "tbb/parallel_sort.h" +#include "tbb/spin_mutex.h" namespace RcppParallel { -namespace { +// This class is primarily used to implement type erasure. The goals here were: +// +// 1. Hide the tbb symbols / implementation details from client R packages. +// That is, they should get the tools they need only via RcppParallel. +// +// 2. Do this in a way that preserves binary compatibility with pre-existing +// classes that make use of parallelReduce(). +// +// 3. Ensure that those packages, when re-compiled without source changes, +// can still function as expected. +// +// The downside here is that all the indirection through std::function<> +// and the requirement for RTTI is probably expensive, but I couldn't find +// a better way forward that could also preserve binary compatibility with +// existing pre-built pacakges. +// +// Hopefully, in a future release, we can do away with this wrapper, once +// packages have been rebuilt and no longer implicitly depend on TBB internals. +struct ReducerWrapper { + + template + ReducerWrapper(T* reducer) + { + self_ = reinterpret_cast(reducer); + owned_ = false; -struct TBBWorker -{ - explicit TBBWorker(Worker& worker) : worker_(worker) {} - - void operator()(const tbb::blocked_range& r) const { - worker_(r.begin(), r.end()); - } + work_ = [&](void* self, std::size_t begin, std::size_t end) + { + (*reinterpret_cast(self))(begin, end); + }; -private: - Worker& worker_; -}; + split_ = [&](void* object, Split split) + { + return new T(*reinterpret_cast(object), split); + }; -template -struct TBBReducer -{ - explicit TBBReducer(Reducer& reducer) - : pSplitReducer_(NULL), reducer_(reducer) - { - } - - TBBReducer(TBBReducer& tbbReducer, tbb::split) - : pSplitReducer_(new Reducer(tbbReducer.reducer_, RcppParallel::Split())), - reducer_(*pSplitReducer_) - { - } - - virtual ~TBBReducer() { delete pSplitReducer_; } + join_ = [&](void* self, void* other) + { + (*reinterpret_cast(self)).join(*reinterpret_cast(other)); + }; - void operator()(const tbb::blocked_range& r) { - reducer_(r.begin(), r.end()); - } - - void join(const TBBReducer& tbbReducer) { - reducer_.join(tbbReducer.reducer_); + deleter_ = [&](void* object) + { + delete (T*) object; + }; } - -private: - Reducer* pSplitReducer_; - Reducer& reducer_; -}; -class TBBParallelForExecutor -{ -public: - - TBBParallelForExecutor(Worker& worker, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : worker_(worker), - begin_(begin), - end_(end), - grainSize_(grainSize) + ~ReducerWrapper() { + if (owned_) + { + deleter_(self_); + self_ = nullptr; + } } - - void operator()() const - { - TBBWorker tbbWorker(worker_); - tbb::parallel_for( - tbb::blocked_range(begin_, end_, grainSize_), - tbbWorker - ); - } - -private: - Worker& worker_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; -}; -template -class TBBParallelReduceExecutor -{ -public: - - TBBParallelReduceExecutor(Reducer& reducer, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : reducer_(reducer), - begin_(begin), - end_(end), - grainSize_(grainSize) + void operator()(std::size_t begin, std::size_t end) const { + work_(self_, begin, end); } - - void operator()() const - { - TBBReducer tbbReducer(reducer_); - tbb::parallel_reduce( - tbb::blocked_range(begin_, end_, grainSize_), - tbbReducer - ); - } - -private: - Reducer& reducer_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; -}; -class TBBArenaParallelForExecutor -{ -public: - - TBBArenaParallelForExecutor(tbb::task_group& group, - Worker& worker, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : group_(group), - worker_(worker), - begin_(begin), - end_(end), - grainSize_(grainSize) + ReducerWrapper(const ReducerWrapper& rhs, Split split) { - } - - void operator()() const - { - TBBParallelForExecutor executor(worker_, begin_, end_, grainSize_); - group_.run_and_wait(executor); - } - -private: - - tbb::task_group& group_; - Worker& worker_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; -}; + self_ = rhs.split_(rhs.self_, split); + owned_ = true; -template -class TBBArenaParallelReduceExecutor -{ -public: - - TBBArenaParallelReduceExecutor(tbb::task_group& group, - Reducer& reducer, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : group_(group), - reducer_(reducer), - begin_(begin), - end_(end), - grainSize_(grainSize) - { - } - - void operator()() const - { - TBBParallelReduceExecutor executor(reducer_, begin_, end_, grainSize_); - group_.run_and_wait(executor); + work_ = rhs.work_; + split_ = rhs.split_; + join_ = rhs.join_; + deleter_ = rhs.deleter_; } - -private: - - tbb::task_group& group_; - Reducer& reducer_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; -}; -class ThreadStackSizeControl -{ -public: - - ThreadStackSizeControl() - : control_(nullptr) + void join(const ReducerWrapper& rhs) const { - int stackSize = resolveValue("RCPP_PARALLEL_STACK_SIZE", 0, 0); - if (stackSize > 0) - { - control_ = new tbb::global_control( - tbb::global_control::thread_stack_size, - stackSize - ); - } - } - - ~ThreadStackSizeControl() - { - if (control_ != nullptr) - { - delete control_; - control_ = nullptr; - } + join_(self_, rhs.self_); } private: - - // COPYING: not copyable - ThreadStackSizeControl(const ThreadStackSizeControl&); - ThreadStackSizeControl& operator=(const ThreadStackSizeControl&); - - // private members - tbb::global_control* control_; - + void* self_ = nullptr; + bool owned_ = false; + + std::function work_; + std::function split_; + std::function join_; + std::function deleter_; }; - -} // anonymous namespace +void tbbParallelFor(std::size_t begin, + std::size_t end, + Worker& worker, + std::size_t grainSize = 1, + int numThreads = -1); -inline void tbbParallelFor(std::size_t begin, - std::size_t end, - Worker& worker, +void tbbParallelReduceImpl(std::size_t begin, + std::size_t end, + ReducerWrapper& wrapper, std::size_t grainSize = 1, - int numThreads = tbb::task_arena::automatic) -{ - ThreadStackSizeControl control; - - tbb::task_arena arena(numThreads); - tbb::task_group group; - - TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize); - arena.execute(executor); -} + int numThreads = -1); template -inline void tbbParallelReduce(std::size_t begin, - std::size_t end, - Reducer& reducer, - std::size_t grainSize = 1, - int numThreads = tbb::task_arena::automatic) +void tbbParallelReduce(std::size_t begin, + std::size_t end, + Reducer& reducer, + std::size_t grainSize = 1, + int numThreads = -1) { - ThreadStackSizeControl control; - - tbb::task_arena arena(numThreads); - tbb::task_group group; - - TBBArenaParallelReduceExecutor executor(group, reducer, begin, end, grainSize); - arena.execute(executor); + ReducerWrapper wrapper(&reducer); + tbbParallelReduceImpl(begin, end, wrapper, grainSize, numThreads); } } // namespace RcppParallel diff --git a/inst/skeleton/vector-sum.cpp b/inst/skeleton/vector-sum.cpp index 30622e51..57b54165 100644 --- a/inst/skeleton/vector-sum.cpp +++ b/inst/skeleton/vector-sum.cpp @@ -19,37 +19,37 @@ using namespace Rcpp; using namespace RcppParallel; struct Sum : public Worker -{ +{ // source vector const RVector input; - + // accumulated value double value; - + // constructors Sum(const NumericVector input) : input(input), value(0) {} Sum(const Sum& sum, Split) : input(sum.input), value(0) {} - + // accumulate just the element of the range I've been asked to void operator()(std::size_t begin, std::size_t end) { value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0); } - + // join my value with that of another Sum - void join(const Sum& rhs) { - value += rhs.value; + void join(const Sum& rhs) { + value += rhs.value; } }; // [[Rcpp::export]] double parallelVectorSum(NumericVector x) { - - // declare the SumBody instance + + // declare the SumBody instance Sum sum(x); - + // call parallel_reduce to start the work parallelReduce(0, x.length(), sum); - + // return the computed sum return sum.value; } diff --git a/inst/tests/cpp/innerproduct.cpp b/inst/tests/cpp/innerproduct.cpp index 7a195720..fee3e41c 100644 --- a/inst/tests/cpp/innerproduct.cpp +++ b/inst/tests/cpp/innerproduct.cpp @@ -19,43 +19,43 @@ double innerProduct(NumericVector x, NumericVector y) { using namespace RcppParallel; struct InnerProduct : public Worker -{ +{ // source vectors const RVector x; const RVector y; - + // product that I have accumulated double product; - + // constructors - InnerProduct(const NumericVector x, const NumericVector y) + InnerProduct(const NumericVector x, const NumericVector y) : x(x), y(y), product(0) {} - InnerProduct(const InnerProduct& innerProduct, Split) + InnerProduct(const InnerProduct& innerProduct, Split) : x(innerProduct.x), y(innerProduct.y), product(0) {} - + // process just the elements of the range I have been asked to void operator()(std::size_t begin, std::size_t end) { - product += std::inner_product(x.begin() + begin, - x.begin() + end, - y.begin() + begin, + product += std::inner_product(x.begin() + begin, + x.begin() + end, + y.begin() + begin, 0.0); } - + // join my value with that of another InnerProduct - void join(const InnerProduct& rhs) { - product += rhs.product; + void join(const InnerProduct& rhs) { + product += rhs.product; } }; // [[Rcpp::export]] double parallelInnerProduct(NumericVector x, NumericVector y) { - + // declare the InnerProduct instance that takes a pointer to the vector data InnerProduct innerProduct(x, y); - + // call paralleReduce to start the work parallelReduce(0, x.length(), innerProduct); - + // return the computed product return innerProduct.product; } diff --git a/inst/tests/cpp/sum.cpp b/inst/tests/cpp/sum.cpp index aec4895f..db47c699 100644 --- a/inst/tests/cpp/sum.cpp +++ b/inst/tests/cpp/sum.cpp @@ -3,7 +3,7 @@ * @author JJ Allaire * @license GPL (>= 2) */ - + #include #include @@ -12,37 +12,37 @@ using namespace RcppParallel; using namespace Rcpp; struct Sum : public Worker -{ +{ // source vector const RVector input; - + // accumulated value double value; - + // constructors Sum(const NumericVector input) : input(input), value(0) {} Sum(const Sum& sum, Split) : input(sum.input), value(0) {} - + // accumulate just the element of the range I have been asked to void operator()(std::size_t begin, std::size_t end) { value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0); } - + // join my value with that of another Sum - void join(const Sum& rhs) { - value += rhs.value; - } + void join(const Sum& rhs) { + value += rhs.value; + } }; // [[Rcpp::export]] double parallelVectorSum(NumericVector x) { - - // declare the SumBody instance + + // declare the SumBody instance Sum sum(x); - + // call parallel_reduce to start the work parallelReduce(0, x.length(), sum); - + // return the computed sum return sum.value; } diff --git a/src/Makevars.in b/src/Makevars.in index c3171236..be8445f8 100644 --- a/src/Makevars.in +++ b/src/Makevars.in @@ -38,15 +38,15 @@ ifeq ($(OS), Windows_NT) MAKE = make MAKEFLAGS = -e -j1 - MAKE_CMD = \ - MSYS2_ARG_CONV_EXCL="*" \ - CYGWIN=nodosfilewarning \ - CONLY="@WINDOWS_CC@" \ - CPLUS="@WINDOWS_CXX11@" \ - CXXFLAGS="$(TBB_CXXFLAGS)" \ - PIC_KEY="@CXX11PICFLAGS@" \ - WARNING_SUPPRESS="" \ - WINARM64_CLANG="$(WINARM64_CLANG)" \ + MAKE_CMD = \ + MSYS2_ARG_CONV_EXCL="*" \ + CYGWIN=nodosfilewarning \ + CONLY="@WINDOWS_CC@" \ + CPLUS="@WINDOWS_CXX11@" \ + CXXFLAGS="$(TBB_CXXFLAGS)" \ + PIC_KEY="@CXX11PICFLAGS@" \ + WARNING_SUPPRESS="" \ + WINARM64_CLANG="$(WINARM64_CLANG)" \ $(MAKE) else @@ -71,7 +71,7 @@ else endif MAKEFLAGS += -e - MAKE_CMD = \ + MAKE_CMD = \ CONLY="@CC@ $(PKG_CPPFLAGS) @CPPFLAGS@" \ CPLUS="@CXX11@ $(PKG_CPPFLAGS) @CPPFLAGS@" \ CXXFLAGS="@CXX11FLAGS@ -DTBB_NO_LEGACY=1" \ @@ -93,6 +93,7 @@ ifeq ($(USE_TBB), Windows) # rtools: turn on hacks to compensate for make and shell differences rtools<=>MinGW # compiler: overwrite default (which is cl = MS compiler) MAKE_ARGS += rtools=true compiler=gcc + # TBB configure will detect mingw runtime with unknown arch on WINARM64_CLANG but not an # issue as we are using compiler built-ins instead of arch-specific code ifneq ($(WINARM64_CLANG), true) @@ -160,7 +161,7 @@ else @mkdir -p ../inst/include @cp -R tbb/include/* ../inst/include/ @(cd tbb/src && $(MAKE_CMD) $(MAKE_ARGS) info) - @(cd tbb/src && $(MAKE_CMD) @STDVER@ $(MAKE_ARGS) $(MAKE_TARGETS) $(MAKE_LOG)) + @(cd tbb/src && $(MAKE_CMD) $(MAKE_ARGS) $(MAKE_TARGETS) $(MAKE_LOG)) endif # NOTE: we do not want to clean ../inst/lib or ../inst/libs here, diff --git a/src/options.cpp b/src/options.cpp index 4f9de2e2..1d654cf7 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -8,6 +8,10 @@ #include #include +#include +#include +#include + extern "C" SEXP defaultNumThreads() { SEXP threadsSEXP = Rf_allocVector(INTSXP, 1); #ifdef __TBB_task_arena_H diff --git a/src/tbb.cpp b/src/tbb.cpp new file mode 100644 index 00000000..e617a692 --- /dev/null +++ b/src/tbb.cpp @@ -0,0 +1,244 @@ + +#if RCPP_PARALLEL_USE_TBB + +#include +#include + +#include +#include +#include + +namespace RcppParallel { + +tbb::global_control* s_globalControl = nullptr; + +// TBB Tools ---- + +struct TBBWorker +{ + explicit TBBWorker(Worker& worker) : worker_(worker) {} + + void operator()(const tbb::blocked_range& r) const { + worker_(r.begin(), r.end()); + } + +private: + Worker& worker_; +}; + +ThreadStackSizeControl::ThreadStackSizeControl() +{ + int stackSize = resolveValue("RCPP_PARALLEL_STACK_SIZE", 0, 0); + if (stackSize > 0) + { + s_globalControl = new tbb::global_control( + tbb::global_control::thread_stack_size, + stackSize + ); + } +} + +ThreadStackSizeControl::~ThreadStackSizeControl() +{ + if (s_globalControl != nullptr) + { + delete s_globalControl; + s_globalControl = nullptr; + } +} + + +// TBB Parallel For ---- + +class TBBParallelForExecutor +{ +public: + + TBBParallelForExecutor(Worker& worker, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : worker_(worker), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBWorker tbbWorker(worker_); + tbb::parallel_for( + tbb::blocked_range(begin_, end_, grainSize_), + tbbWorker + ); + } + +private: + Worker& worker_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; +}; + +class TBBArenaParallelForExecutor +{ +public: + + TBBArenaParallelForExecutor(tbb::task_group& group, + Worker& worker, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : group_(group), + worker_(worker), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBParallelForExecutor executor(worker_, begin_, end_, grainSize_); + group_.run_and_wait(executor); + } + +private: + + tbb::task_group& group_; + Worker& worker_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; +}; + +void tbbParallelFor(std::size_t begin, + std::size_t end, + Worker& worker, + std::size_t grainSize, + int numThreads) +{ + ThreadStackSizeControl control; + + tbb::task_group group; + TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize); + + tbb::task_arena arena(numThreads); + arena.execute(executor); +} + + +// TBB Parallel Reduce ---- + +struct TBBReducer +{ + explicit TBBReducer(ReducerWrapper& reducer) + : pSplitReducer_(NULL), reducer_(reducer) + { + } + + TBBReducer(TBBReducer& tbbReducer, tbb::split) + : pSplitReducer_(new ReducerWrapper(tbbReducer.reducer_, RcppParallel::Split())), + reducer_(*pSplitReducer_) + { + } + + virtual ~TBBReducer() { delete pSplitReducer_; } + + void operator()(const tbb::blocked_range& r) + { + reducer_(r.begin(), r.end()); + } + + void join(const TBBReducer& tbbReducer) + { + reducer_.join(tbbReducer.reducer_); + } + +private: + ReducerWrapper* pSplitReducer_; + ReducerWrapper& reducer_; +}; + +class TBBParallelReduceExecutor +{ +public: + + TBBParallelReduceExecutor(ReducerWrapper& reducer, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : reducer_(reducer), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBReducer tbbReducer(reducer_); + tbb::parallel_reduce( + tbb::blocked_range(begin_, end_, grainSize_), + tbbReducer + ); + } + +private: + ReducerWrapper& reducer_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; +}; + +class TBBArenaParallelReduceExecutor +{ +public: + + TBBArenaParallelReduceExecutor(tbb::task_group& group, + ReducerWrapper& reducer, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : group_(group), + reducer_(reducer), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBParallelReduceExecutor executor(reducer_, begin_, end_, grainSize_); + group_.run_and_wait(executor); + } + +private: + + tbb::task_group& group_; + ReducerWrapper& reducer_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; +}; + +void tbbParallelReduceImpl(std::size_t begin, + std::size_t end, + ReducerWrapper& reducer, + std::size_t grainSize, + int numThreads) +{ + ThreadStackSizeControl control; + + tbb::task_group group; + TBBArenaParallelReduceExecutor executor(group, reducer, begin, end, grainSize); + + tbb::task_arena arena(numThreads); + arena.execute(executor); +} + +} // end namespace RcppParallel + +#endif /* RCPP_PARALLEL_USE_TBB */ diff --git a/src/tbb/include/tbb/task.h b/src/tbb/include/tbb/task.h index a57200c1..4722d478 100644 --- a/src/tbb/include/tbb/task.h +++ b/src/tbb/include/tbb/task.h @@ -350,7 +350,10 @@ class task_group_context : internal::no_copy { public: enum kind_type { isolated, - bound + bound, + binding_completed, + detached, + dying }; enum traits_type { @@ -561,9 +564,6 @@ class task_group_context : internal::no_copy { friend class internal::allocate_root_with_context_proxy; static const kind_type binding_required = bound; - static const kind_type binding_completed = kind_type(bound+1); - static const kind_type detached = kind_type(binding_completed+1); - static const kind_type dying = kind_type(detached+1); //! Propagates any state change detected to *this, and as an optimisation possibly also upward along the heritage line. template diff --git a/tools/config/configure.R b/tools/config/configure.R index e057c466..1f4f62f7 100644 --- a/tools/config/configure.R +++ b/tools/config/configure.R @@ -50,6 +50,16 @@ broken <- if (broken) cxxflags <- gsub("-Werror=format-security", "-Wformat -Werror=format-security", cxxflags) +# add C++ standard if not set +if (!grepl("-std=", cxxflags, fixed = TRUE)) { + stdflag <- if (getRversion() < "4.0") { + "-std=c++0x" + } else { + "$(CXX11STD)" + } + cxxflags <- paste(stdflag, cxxflags) +} + # avoid including /usr/local/include, as this can cause # RcppParallel to find and use a version of libtbb installed # there as opposed to the bundled version @@ -115,13 +125,6 @@ if (info[["sysname"]] == "Windows") { } -# use c++0x for compatibility with older compilers -if (getRversion() < "4.0") { - define(STDVER = "stdver=c++0x") -} else { - define(STDVER = "") -} - # on Solaris, check if we're using gcc or g++ if (Sys.info()[["sysname"]] == "SunOS") { cxx <- r_cmd_config("CXX")