Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Framework/AnalysisSupport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ o2_add_test(DataInputDirector NAME test_Framework_test_DataInputDirector
LABELS framework
PUBLIC_LINK_LIBRARIES O2::FrameworkAnalysisSupport)

# Standalone test executable built from test/test_NavigateToLevel.cxx and linked
# against the analysis support library and Catch2.
add_executable(o2-test-framework-analysis-support
test/test_NavigateToLevel.cxx)
target_link_libraries(o2-test-framework-analysis-support PRIVATE O2::FrameworkAnalysisSupport O2::Catch2)

# Place the binary in the "tests" directory that sits next to the runtime output directory.
get_filename_component(outdir ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/../tests ABSOLUTE)
set_property(TARGET o2-test-framework-analysis-support PROPERTY RUNTIME_OUTPUT_DIRECTORY ${outdir})

# Register the executable with CTest under the name framework:analysis-support.
add_test(NAME framework:analysis-support COMMAND o2-test-framework-analysis-support)

o2_add_test(TableToTree NAME benchmark_TableToTree
SOURCES test/benchmark_TableToTree.cxx
COMPONENT_NAME Framework
Expand Down
34 changes: 29 additions & 5 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
// or submit itself to any jurisdiction.

#include "AODJAlienReaderHelpers.h"
#include <charconv>
#include <memory>
#include <ranges>
#include <vector>
#include "Framework/TableTreeHelpers.h"
#include "Framework/AnalysisHelpers.h"
#include "Framework/DataProcessingStats.h"
Expand Down Expand Up @@ -111,10 +114,31 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
if (ctx.options().isSet("aod-parent-access-level")) {
parentAccessLevel = ctx.options().get<int>("aod-parent-access-level");
}
auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel](ConfigParamRegistry const& options,
DeviceSpec const& spec,
Monitoring& monitoring,
DataProcessingStats& stats) {
// Parse the optional "aod-origin-level-mapping" option: a comma separated list of
// "<origin>:<level>" entries. Entries are stored in the order in which they are given.
std::vector<std::pair<std::string, int>> originLevelMapping;
if (ctx.options().isSet("aod-origin-level-mapping")) {
  auto originLevelMappingStr = ctx.options().get<std::string>("aod-origin-level-mapping");
  for (auto pairRange : originLevelMappingStr | std::views::split(',')) {
    std::string_view pair{pairRange.begin(), pairRange.end()};
    auto colonPos = pair.find(':');
    if (colonPos == std::string_view::npos) {
      LOGP(fatal, "Badly formatted aod-origin-level-mapping entry: \"{}\"", pair);
      continue;
    }
    std::string key(pair.substr(0, colonPos));
    std::string_view valueStr = pair.substr(colonPos + 1);
    const char* const valueEnd = valueStr.data() + valueStr.size();
    int value{};
    auto [ptr, ec] = std::from_chars(valueStr.data(), valueEnd, value);
    // std::from_chars stops at the first character it cannot consume and still reports
    // success, so also require that the whole value was used: "AOD:1x" must not be
    // silently accepted as level 1.
    if (ec == std::errc{} && ptr == valueEnd) {
      originLevelMapping.emplace_back(std::move(key), value);
    } else {
      LOGP(fatal, "Unable to parse level in aod-origin-level-mapping entry: \"{}\"", pair);
    }
  }
}
auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel, originLevelMapping](ConfigParamRegistry const& options,
DeviceSpec const& spec,
Monitoring& monitoring,
DataProcessingStats& stats) {
// FIXME: not actually needed, since data processing stats can specify that we should
// send the initial value.
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED), DataProcessingStats::Op::Set, 0});
Expand All @@ -134,7 +158,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
auto maxRate = options.get<float>("aod-max-io-rate");

// create a DataInputDirector
auto didir = std::make_shared<DataInputDirector>(std::vector<std::string>{filename}, DataInputDirectorContext{&monitoring, parentAccessLevel, parentFileReplacement});
auto didir = std::make_shared<DataInputDirector>(std::vector<std::string>{filename}, DataInputDirectorContext{&monitoring, parentAccessLevel, parentFileReplacement, originLevelMapping});
if (options.isSet("aod-reader-json")) {
auto jsonFile = options.get<std::string>("aod-reader-json");
if (!didir->readJson(jsonFile)) {
Expand Down
92 changes: 76 additions & 16 deletions Framework/AnalysisSupport/src/DataInputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void DataInputDescriptor::addFileNameHolder(FileNameHolder* fn)
mfilenames.emplace_back(fn);
}

bool DataInputDescriptor::setFile(int counter, std::string_view origin)
bool DataInputDescriptor::setFile(int counter, int wantedParentLevel, std::string_view origin)
{
// no files left
if (counter >= getNumberInputfiles()) {
Expand All @@ -133,7 +133,9 @@ bool DataInputDescriptor::setFile(int counter, std::string_view origin)
// of the filename. In the future we might expand this for proper rewriting of the
// filename based on the origin and the original file information.
std::string filename = mfilenames[counter]->fileName;
if (!origin.starts_with("AOD")) {
// In case we do not need to remap parent levels, the requested origin is what
// drives the filename.
if (wantedParentLevel == -1 && !origin.starts_with("AOD")) {
filename = std::regex_replace(filename, std::regex("[.]root$"), fmt::format("_{}.root", origin));
}

Expand All @@ -146,7 +148,19 @@ bool DataInputDescriptor::setFile(int counter, std::string_view origin)
closeInputFile();
}

mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory);
TFile* tfile = nullptr;
bool externalFile = false;
for (auto& [name, f] : mContext.openFiles) {
if (name == filename) {
tfile = f;
externalFile = true;
break;
}
}
if (tfile == nullptr) {
tfile = TFile::Open(filename.c_str());
}
mCurrentFilesystem = std::make_shared<TFileFileSystem>(tfile, 50 * 1024 * 1024, mFactory, !externalFile);
if (!mCurrentFilesystem.get()) {
throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
}
Expand Down Expand Up @@ -218,11 +232,11 @@ bool DataInputDescriptor::setFile(int counter, std::string_view origin)
return true;
}

uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, std::string_view origin)
uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
{

// open file
if (!setFile(counter, origin)) {
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
return 0ul;
}

Expand All @@ -234,10 +248,32 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, std::st
return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
}

arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF, std::string_view origin)
std::pair<DataInputDescriptor*, int> DataInputDescriptor::navigateToLevel(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
{
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
return {nullptr, -1};
}
auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
auto parentFile = getParentFile(counter, numTF, "", wantedParentLevel, wantedOrigin);
if (parentFile == nullptr) {
return {nullptr, -1};
}
return {parentFile, parentFile->findDFNumber(0, folderName)};
}

arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
{
// If mapped to a parent level deeper than current, skip directly to the right level.
if (wantedParentLevel != -1 && mLevel < wantedParentLevel) {
auto [parentFile, parentNumTF] = navigateToLevel(counter, numTF, wantedParentLevel, wantedOrigin);
if (parentFile == nullptr || parentNumTF == -1) {
return {};
}
return parentFile->getFileFolder(0, parentNumTF, wantedParentLevel, wantedOrigin);
}

// open file
if (!setFile(counter, origin)) {
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
return {};
}

Expand All @@ -251,7 +287,7 @@ arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int n
return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
}

DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename, std::string_view origin)
DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename, int wantedParentLevel, std::string_view wantedOrigin)
{
if (!mParentFileMap) {
// This file has no parent map
Expand Down Expand Up @@ -288,7 +324,7 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
mParentFile->mdefaultFilenamesPtr = new std::vector<FileNameHolder*>;
mParentFile->mdefaultFilenamesPtr->emplace_back(makeFileNameHolder(parentFileName->GetString().Data()));
mParentFile->fillInputfiles();
mParentFile->setFile(0, origin);
mParentFile->setFile(0, wantedParentLevel, wantedOrigin);
return mParentFile;
}

Expand All @@ -314,7 +350,9 @@ void DataInputDescriptor::printFileOpening()
monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
}
#endif
mContext.monitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-open-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
if (mContext.monitoring) {
mContext.monitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-open-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
}
LOGP(info, "Opening file: {}", monitoringInfo);
}

Expand All @@ -335,7 +373,9 @@ void DataInputDescriptor::printFileStatistics()
monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
}
#endif
mContext.monitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
if (mContext.monitoring) {
mContext.monitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
}
LOGP(info, "Read info: {}", monitoringInfo);
}

Expand Down Expand Up @@ -446,8 +486,26 @@ struct CalculateDelta {
bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
{
CalculateDelta t(mIOTime);
std::string origin = dh.dataOrigin.as<std::string>();
auto folder = getFileFolder(counter, numTF, origin);
std::string wantedOrigin = dh.dataOrigin.as<std::string>();
int wantedLevel = mContext.levelForOrigin(wantedOrigin);

// If this origin is mapped to a parent level deeper than current, skip directly without
// attempting to read from this level.
if (wantedLevel != -1 && mLevel < wantedLevel) {
auto [parentFile, parentNumTF] = navigateToLevel(counter, numTF, wantedLevel, wantedOrigin);
if (parentFile == nullptr) {
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
throw std::runtime_error(fmt::format(R"(No parent file found for "{}" while looking for level {} in "{}")", treename, wantedLevel, rootFS->GetFile()->GetName()));
}
if (parentNumTF == -1) {
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
throw std::runtime_error(fmt::format(R"(DF not found in parent file "{}")", parentRootFS->GetFile()->GetName()));
}
t.deactivate();
return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
}

auto folder = getFileFolder(counter, numTF, wantedLevel, wantedOrigin);
if (!folder.filesystem()) {
t.deactivate();
return false;
Expand Down Expand Up @@ -480,7 +538,7 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
if (!format) {
t.deactivate();
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
auto parentFile = getParentFile(counter, numTF, treename, origin);
auto parentFile = getParentFile(counter, numTF, treename, wantedLevel, wantedOrigin);
if (parentFile != nullptr) {
int parentNumTF = parentFile->findDFNumber(0, folder.path());
if (parentNumTF == -1) {
Expand Down Expand Up @@ -813,8 +871,9 @@ arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader d
didesc = mdefaultDataInputDescriptor;
}
std::string origin = dh.dataOrigin.as<std::string>();
int wantedLevel = mContext.levelForOrigin(origin);

return didesc->getFileFolder(counter, numTF, origin);
return didesc->getFileFolder(counter, numTF, wantedLevel, origin);
}

int DataInputDirector::getTimeFramesInFile(header::DataHeader dh, int counter)
Expand All @@ -836,8 +895,9 @@ uint64_t DataInputDirector::getTimeFrameNumber(header::DataHeader dh, int counte
didesc = mdefaultDataInputDescriptor;
}
std::string origin = dh.dataOrigin.as<std::string>();
int wantedLevel = mContext.levelForOrigin(origin);

return didesc->getTimeFrameNumber(counter, numTF, origin);
return didesc->getTimeFrameNumber(counter, numTF, wantedLevel, origin);
}

bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
Expand Down
26 changes: 22 additions & 4 deletions Framework/AnalysisSupport/src/DataInputDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <arrow/dataset/dataset.h>

#include <regex>
#include <vector>
#include "rapidjson/fwd.h"

namespace o2::monitoring
Expand All @@ -44,6 +45,20 @@ struct DataInputDirectorContext {
o2::monitoring::Monitoring* monitoring = nullptr;
int allowedParentLevel = 0;
std::string parentFileReplacement = "";
std::vector<std::pair<std::string, int>> parentLevelToOrigin = {};
// Optional registry of pre-opened TFiles (keyed by name) used to bypass
// TFile::Open for testing with in-memory TMemFile instances.
std::vector<std::pair<std::string, TFile*>> openFiles = {};

// Return the level stored in parentLevelToOrigin for the first entry whose origin
// equals `origin`, or -1 when no entry matches.
int levelForOrigin(std::string_view origin) const
{
  int mappedLevel = -1;
  for (auto const& entry : parentLevelToOrigin) {
    if (entry.first != origin) {
      continue;
    }
    mappedLevel = entry.second;
    break;
  }
  return mappedLevel;
}
};

class DataInputDescriptor
Expand Down Expand Up @@ -71,7 +86,7 @@ class DataInputDescriptor

void addFileNameHolder(FileNameHolder* fn);
int fillInputfiles();
bool setFile(int counter, std::string_view origin);
bool setFile(int counter, int wantedParentLevel, std::string_view wantedOrigin);

// getters
std::string getInputfilesFilename();
Expand All @@ -81,9 +96,12 @@ class DataInputDescriptor
int getNumberTimeFrames() { return mtotalNumberTimeFrames; }
int findDFNumber(int file, std::string dfName);

uint64_t getTimeFrameNumber(int counter, int numTF, std::string_view origin);
arrow::dataset::FileSource getFileFolder(int counter, int numTF, std::string_view origin);
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename, std::string_view origin);
uint64_t getTimeFrameNumber(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin);
arrow::dataset::FileSource getFileFolder(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin);
// Open the current file to populate the parent map, then return the parent descriptor and
// the TF index within it that corresponds to numTF at this level. Returns {nullptr, -1} on failure.
std::pair<DataInputDescriptor*, int> navigateToLevel(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin);
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename, int wantedParentLevel, std::string_view wantedOrigin);
int getTimeFramesInFile(int counter);
int getReadTimeFramesInFile(int counter);

Expand Down
Loading
Loading