Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
# In applying this license CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

o2_add_library(Framework
SOURCES src/AnalysisHelpers.cxx
src/AlgorithmSpec.cxx
Expand Down Expand Up @@ -178,6 +177,7 @@ o2_add_library(Framework
RapidJSON::RapidJSON
Arrow::arrow_shared
ArrowDataset::arrow_dataset_shared
$<$<TARGET_EXISTS:ArrowCompute::arrow_compute_shared>:ArrowCompute::arrow_compute_shared>
Microsoft.GSL::GSL
O2::FrameworkLogger
Gandiva::gandiva_shared
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/IndexBuilderHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ enum struct IndexKind : int {

namespace o2::framework
{
void cannotBuildAnArray();
void cannotBuildAnArray(const char* reason);
void cannotCreateIndexBuilder();

struct ChunkedArrayIterator {
Expand Down
81 changes: 56 additions & 25 deletions Framework/Core/src/IndexBuilderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@
#include "Framework/IndexBuilderHelpers.h"
#include "Framework/CompilerBuiltins.h"
#include "Framework/VariantHelpers.h"
#include <arrow/compute/api_aggregate.h>
#include <arrow/util/config.h>
#if (ARROW_VERSION_MAJOR > 20)
#include <arrow/compute/initialize.h>
#endif
#include <arrow/compute/kernel.h>
#include <arrow/compute/api_aggregate.h>
#include <arrow/status.h>
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>

namespace o2::framework
{
// Raises a framework runtime error reporting that an array could not be finished.
// NOTE(review): this span comes from a rendered diff, so two variants are shown back to back:
// a zero-argument form with a fixed message, and a form taking `reason` that formats it into
// the message. Presumably the first signature/throw pair is the removed code and the second
// its replacement — confirm against the actual source file; only one pair can be kept.
void cannotBuildAnArray()
void cannotBuildAnArray(const char* reason)
{
throw framework::runtime_error("Cannot finish an array");
throw framework::runtime_error_f("Cannot finish an array: %s", reason);
}

void cannotCreateIndexBuilder()
Expand Down Expand Up @@ -62,10 +66,10 @@ SelfBuilder::SelfBuilder(arrow::MemoryPool* pool)
{
auto status = arrow::MakeBuilder(pool, arrow::int32(), &mBuilder);
if (!status.ok()) {
throw framework::runtime_error("Cannot create array builder for the self-index!");
throw framework::runtime_error_f("Cannot create array builder for the self-index: %s", status.ToString().c_str());
}
}
// static_cast<ChunkedArrayIterator*>(this)->reset(pool);

void SelfBuilder::reset(std::shared_ptr<arrow::ChunkedArray>)
{
mBuilder->Reset();
Expand All @@ -74,15 +78,18 @@ void SelfBuilder::reset(std::shared_ptr<arrow::ChunkedArray>)

// Appends `idx` to the Int32 builder held in mBuilder.
// NOTE(review): this span comes from a rendered diff, so two variants of the append appear:
// one that discards the returned status via (void), and one that keeps the status and throws
// a framework runtime error carrying the status text when it is not ok. Presumably the first
// is the removed line and the second its replacement — confirm against the actual source
// file; keeping both would append the value twice.
void SelfBuilder::fill(int idx)
{
(void)static_cast<arrow::Int32Builder*>(mBuilder.get())->Append(idx);
auto status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Append(idx);
if (!status.ok()) {
throw framework::runtime_error_f("Cannot append to self-index array: %s", status.ToString().c_str());
}
}

std::shared_ptr<arrow::ChunkedArray> SelfBuilder::result() const
{
std::shared_ptr<arrow::Array> array;
auto status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Finish(&array);
if (!status.ok()) {
cannotBuildAnArray();
cannotBuildAnArray(status.ToString().c_str());
}

return std::make_shared<arrow::ChunkedArray>(array);
Expand All @@ -93,7 +100,7 @@ SingleBuilder::SingleBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow:
{
auto status = arrow::MakeBuilder(pool, arrow::int32(), &mBuilder);
if (!status.ok()) {
throw framework::runtime_error("Cannot create array builder for the single-valued index!");
throw framework::runtime_error_f("Cannot create array builder for the single-valued index: %s", status.ToString().c_str());
}
}

Expand Down Expand Up @@ -126,10 +133,14 @@ bool SingleBuilder::find(int idx)

// Appends one entry to the Int32 builder held in mBuilder: the current position (mPosition)
// when it is inside the source and the value at that position equals `idx`, otherwise -1.
// NOTE(review): this span comes from a rendered diff, so each branch shows two variants of
// the append: one discarding the returned status via (void), and one storing it in `status`,
// which is checked after the branch and turned into a framework runtime error when not ok.
// Presumably the (void) lines are the removed code — confirm against the actual source file;
// keeping both would append twice per call.
void SingleBuilder::fill(int idx)
{
arrow::Status status;
if (mPosition < mSourceSize && valueAt(mPosition) == idx) {
(void)static_cast<arrow::Int32Builder*>(mBuilder.get())->Append((int)mPosition);
status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Append((int)mPosition);
} else {
(void)static_cast<arrow::Int32Builder*>(mBuilder.get())->Append(-1);
status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Append(-1);
}
if (!status.ok()) {
throw framework::runtime_error_f("Cannot append to array: %s", status.ToString().c_str());
}
}

Expand All @@ -138,22 +149,23 @@ std::shared_ptr<arrow::ChunkedArray> SingleBuilder::result() const
std::shared_ptr<arrow::Array> array;
auto status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Finish(&array);
if (!status.ok()) {
cannotBuildAnArray();
cannotBuildAnArray(status.ToString().c_str());
}
return std::make_shared<arrow::ChunkedArray>(array);
}

SliceBuilder::SliceBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::MemoryPool* pool)
: ChunkedArrayIterator{source}
{
if (!preSlice().ok()) {
throw framework::runtime_error("Cannot pre-slice the source for slice-index building");
auto status = preSlice();
if (!status.ok()) {
throw framework::runtime_error_f("Cannot pre-slice the source for slice-index building: %s", status.ToString().c_str());
}

std::unique_ptr<arrow::ArrayBuilder> builder;
auto status = arrow::MakeBuilder(pool, arrow::int32(), &builder);
status = arrow::MakeBuilder(pool, arrow::int32(), &builder);
if (!status.ok()) {
throw framework::runtime_error("Cannot create array for the slice-index builder!");
throw framework::runtime_error_f("Cannot create array for the slice-index builder: %s", status.ToString().c_str());
}
mListBuilder = std::make_unique<arrow::FixedSizeListBuilder>(pool, std::move(builder), 2);
mValueBuilder = static_cast<arrow::FixedSizeListBuilder*>(mListBuilder.get())->value_builder();
Expand All @@ -166,8 +178,9 @@ void SliceBuilder::reset(std::shared_ptr<arrow::ChunkedArray> source)
mListBuilder->Reset();
mValuePos = 0;
static_cast<ChunkedArrayIterator*>(this)->reset(source);
if (!preSlice().ok()) {
throw framework::runtime_error("Cannot pre-slice the source for slice-index building");
auto status = preSlice();
if (!status.ok()) {
throw framework::runtime_error_f("Cannot pre-slice the source for slice-index building: %s", status.ToString().c_str());
}
}

Expand Down Expand Up @@ -211,13 +224,21 @@ std::shared_ptr<arrow::ChunkedArray> SliceBuilder::result() const
std::shared_ptr<arrow::Array> array;
auto status = static_cast<arrow::FixedSizeListBuilder*>(mListBuilder.get())->Finish(&array);
if (!status.ok()) {
cannotBuildAnArray();
cannotBuildAnArray(status.ToString().c_str());
}
return std::make_shared<arrow::ChunkedArray>(array);
}

arrow::Status SliceBuilder::SliceBuilder::preSlice()
{
#if (ARROW_VERSION_MAJOR > 20)
auto status = arrow::compute::Initialize();
if (!status.ok()) {
throw framework::runtime_error_f("Cannot initialize arrow compute: %s", status.ToString().c_str());
}
#else
arrow::Status status;
#endif
arrow::Datum value_counts;
auto options = arrow::compute::ScalarAggregateOptions::Defaults();
ARROW_ASSIGN_OR_RAISE(value_counts, arrow::compute::CallFunction("value_counts", {mSource}, &options));
Expand All @@ -230,14 +251,15 @@ arrow::Status SliceBuilder::SliceBuilder::preSlice()
ArrayBuilder::ArrayBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::MemoryPool* pool)
: ChunkedArrayIterator{source}
{
if (!preFind().ok()) {
throw framework::runtime_error("Cannot pre-find in a source for array-index building");
auto&& status = preFind();
if (!status.ok()) {
throw framework::runtime_error_f("Cannot pre-find in a source for array-index building: %s", status.ToString().c_str());
}

std::unique_ptr<arrow::ArrayBuilder> builder;
auto status = arrow::MakeBuilder(pool, arrow::int32(), &builder);
status = arrow::MakeBuilder(pool, arrow::int32(), &builder);
if (!status.ok()) {
throw framework::runtime_error("Cannot create array for the array-index builder!");
throw framework::runtime_error_f("Cannot create array for the array-index builder: %s", status.ToString().c_str());
}
mListBuilder = std::make_unique<arrow::ListBuilder>(pool, std::move(builder));
mValueBuilder = static_cast<arrow::ListBuilder*>(mListBuilder.get())->value_builder();
Expand All @@ -246,8 +268,9 @@ ArrayBuilder::ArrayBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::M
void ArrayBuilder::reset(std::shared_ptr<arrow::ChunkedArray> source)
{
static_cast<ChunkedArrayIterator*>(this)->reset(source);
if (!preFind().ok()) {
throw framework::runtime_error("Cannot pre-find in a source for array-index building");
auto status = preFind();
if (!status.ok()) {
throw framework::runtime_error_f("Cannot pre-find in a source for array-index building: %s", status.ToString().c_str());
}
mValues.clear();
mIndices.clear();
Expand All @@ -274,13 +297,21 @@ std::shared_ptr<arrow::ChunkedArray> ArrayBuilder::result() const
std::shared_ptr<arrow::Array> array;
auto status = static_cast<arrow::ListBuilder*>(mListBuilder.get())->Finish(&array);
if (!status.ok()) {
cannotBuildAnArray();
cannotBuildAnArray(status.ToString().c_str());
}
return std::make_shared<arrow::ChunkedArray>(array);
}

arrow::Status ArrayBuilder::preFind()
{
#if (ARROW_VERSION_MAJOR > 20)
auto status = arrow::compute::Initialize();
if (!status.ok()) {
throw framework::runtime_error_f("Cannot initialize arrow compute: %s", status.ToString().c_str());
}
#else
arrow::Status status;
#endif
arrow::Datum max;
auto options = arrow::compute::ScalarAggregateOptions::Defaults();
ARROW_ASSIGN_OR_RAISE(max, arrow::compute::CallFunction("max", {mSource}, &options));
Expand Down
32 changes: 31 additions & 1 deletion dependencies/O2Dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,37 @@ if(NOT TARGET ArrowAcero::arrow_acero_shared)
)
endif()

if (NOT TARGET Gandiva::gandiva_shared)
# For Arrow major versions above 20, make sure an imported
# ArrowCompute::arrow_compute_shared target exists.
#
# Extract only the leading major component of ARROW_VERSION. The pattern is
# anchored and matches digits only, so ARROW_MAJOR really is the major number
# (e.g. "21") rather than the whole version string.
string(REGEX MATCH "^[0-9]+" ARROW_MAJOR "${ARROW_VERSION}")
# Pass the variable by name: an empty or unset ARROW_MAJOR then evaluates to
# false instead of producing a malformed if() expression.
if(ARROW_MAJOR GREATER 20)
  if(NOT TARGET ArrowCompute::arrow_compute_shared)
    # ArrowCompute::arrow_compute_shared is linked for no reason to parquet
    # so we cannot use it because we do not want to build parquet itself.
    # For that reason at the moment we need to do the lookup by hand.
    get_target_property(ARROW_SHARED_LOCATION Arrow::arrow_shared LOCATION)
    get_filename_component(ARROW_SHARED_DIR "${ARROW_SHARED_LOCATION}" DIRECTORY)

    # Look only next to the Arrow shared library, never in system paths.
    find_library(ARROW_COMPUTE_SHARED arrow_compute
      PATHS "${ARROW_SHARED_DIR}"
      NO_DEFAULT_PATH
    )

    if(ARROW_COMPUTE_SHARED)
      message(STATUS
        "Found arrow_compute_shared library at: ${ARROW_COMPUTE_SHARED}")
    else()
      message(FATAL_ERROR
        "arrow_compute_shared library not found in ${ARROW_SHARED_DIR}")
    endif()

    # Create an imported target for ArrowCompute::arrow_compute_shared
    add_library(ArrowCompute::arrow_compute_shared SHARED IMPORTED)
    set_target_properties(ArrowCompute::arrow_compute_shared PROPERTIES
      IMPORTED_LOCATION "${ARROW_COMPUTE_SHARED}"
    )
  endif()
endif()

# Expose the plain gandiva_shared target under the namespaced name
# Gandiva::gandiva_shared unless a target with that name already exists.
if(NOT TARGET Gandiva::gandiva_shared)
add_library(Gandiva::gandiva_shared ALIAS gandiva_shared)
endif()

Expand Down
Loading