Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
auto maxRate = options.get<float>("aod-max-io-rate");

// create a DataInputDirector
auto didir = std::make_shared<DataInputDirector>(filename, &monitoring, parentAccessLevel, parentFileReplacement);
auto didir = std::make_shared<DataInputDirector>(std::vector<std::string>{filename}, DataInputDirectorContext{&monitoring, parentAccessLevel, parentFileReplacement});
if (options.isSet("aod-reader-json")) {
auto jsonFile = options.get<std::string>("aod-reader-json");
if (!didir->readJson(jsonFile)) {
Expand Down
58 changes: 22 additions & 36 deletions Framework/AnalysisSupport/src/DataInputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,10 @@ FileNameHolder* makeFileNameHolder(std::string fileName)
return fileNameHolder;
}

DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement)
DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, DataInputDirectorContext& context)
: mAlienSupport(alienSupport),
mMonitoring(monitoring),
mAllowedParentLevel(allowedParentLevel),
mParentFileReplacement(std::move(parentFileReplacement)),
mLevel(level)
mLevel(level),
mContext(context)
{
std::vector<char const*> capabilitiesSpecs = {
"O2Framework:RNTupleObjectReadingCapability",
Expand Down Expand Up @@ -157,13 +155,13 @@ bool DataInputDescriptor::setFile(int counter, std::string_view origin)

// get the parent file map if exists
mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
if (mParentFileMap && !mParentFileReplacement.empty()) {
auto pos = mParentFileReplacement.find(';');
if (mParentFileMap && !mContext.parentFileReplacement.empty()) {
auto pos = mContext.parentFileReplacement.find(';');
if (pos == std::string::npos) {
throw std::runtime_error(fmt::format("Invalid syntax in aod-parent-base-path-replacement: \"{}\"", mParentFileReplacement.c_str()));
throw std::runtime_error(fmt::format("Invalid syntax in aod-parent-base-path-replacement: \"{}\"", mContext.parentFileReplacement.c_str()));
}
auto from = mParentFileReplacement.substr(0, pos);
auto to = mParentFileReplacement.substr(pos + 1);
auto from = mContext.parentFileReplacement.substr(0, pos);
auto to = mContext.parentFileReplacement.substr(pos + 1);

auto it = mParentFileMap->MakeIterator();
while (auto obj = it->Next()) {
Expand Down Expand Up @@ -280,13 +278,13 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
}
}

if (mLevel == mAllowedParentLevel) {
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(),
if (mLevel == mContext.allowedParentLevel) {
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mContext.allowedParentLevel, folderName.c_str(),
rootFS->GetFile()->GetName()));
}

LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
mParentFile = new DataInputDescriptor(mAlienSupport, mLevel + 1, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
mParentFile = new DataInputDescriptor(mAlienSupport, mLevel + 1, mContext);
mParentFile->mdefaultFilenamesPtr = new std::vector<FileNameHolder*>;
mParentFile->mdefaultFilenamesPtr->emplace_back(makeFileNameHolder(parentFileName->GetString().Data()));
mParentFile->fillInputfiles();
Expand Down Expand Up @@ -316,7 +314,7 @@ void DataInputDescriptor::printFileOpening()
monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
}
#endif
mMonitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-open-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
mContext.monitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-open-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
LOGP(info, "Opening file: {}", monitoringInfo);
}

Expand All @@ -337,7 +335,7 @@ void DataInputDescriptor::printFileStatistics()
monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
}
#endif
mMonitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
mContext.monitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
LOGP(info, "Read info: {}", monitoringInfo);
}

Expand Down Expand Up @@ -524,27 +522,15 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
return true;
}

DataInputDirector::DataInputDirector()
DataInputDirector::DataInputDirector(std::vector<std::string> inputFiles, DataInputDirectorContext&& context)
: mContext{context}
{
createDefaultDataInputDescriptor();
}

DataInputDirector::DataInputDirector(std::string inputFile, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(std::move(parentFileReplacement))
{
if (inputFile.size() && inputFile[0] == '@') {
inputFile.erase(0, 1);
setInputfilesFile(inputFile);
if (inputFiles.size() == 1 && !inputFiles[0].empty() && inputFiles[0][0] == '@') {
setInputfilesFile(inputFiles.back().substr(1, -1));
} else {
mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
}

createDefaultDataInputDescriptor();
}

DataInputDirector::DataInputDirector(std::vector<std::string> inputFiles, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(std::move(parentFileReplacement))
{
for (auto inputFile : inputFiles) {
mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
for (auto inputFile : inputFiles) {
mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
}
}

createDefaultDataInputDescriptor();
Expand Down Expand Up @@ -576,7 +562,7 @@ void DataInputDirector::createDefaultDataInputDescriptor()
if (mdefaultDataInputDescriptor) {
delete mdefaultDataInputDescriptor;
}
mdefaultDataInputDescriptor = new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
mdefaultDataInputDescriptor = new DataInputDescriptor(mAlienSupport, 0, mContext);

mdefaultDataInputDescriptor->setInputfilesFile(minputfilesFile);
mdefaultDataInputDescriptor->setFilenamesRegex(mFilenameRegex);
Expand Down Expand Up @@ -700,7 +686,7 @@ bool DataInputDirector::readJsonDocument(Document* jsonDoc)
return false;
}
// create a new dataInputDescriptor
auto didesc = new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
auto didesc = new DataInputDescriptor(mAlienSupport, 0, mContext);
didesc->setDefaultInputfiles(&mdefaultInputFiles);

itemName = "table";
Expand Down
23 changes: 11 additions & 12 deletions Framework/AnalysisSupport/src/DataInputDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,15 @@ struct FileNameHolder {
std::vector<uint64_t> listOfTimeFrameNumbers;
std::vector<bool> alreadyRead;
};

FileNameHolder* makeFileNameHolder(std::string fileName);

struct DataInputDirectorContext {
o2::monitoring::Monitoring* monitoring = nullptr;
int allowedParentLevel = 0;
std::string parentFileReplacement = "";
};

class DataInputDescriptor
{
/// Holds information concerning the reading of an aod table.
Expand All @@ -50,7 +57,7 @@ class DataInputDescriptor
std::string treename = "";
std::unique_ptr<data_matcher::DataDescriptorMatcher> matcher;

DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = "");
DataInputDescriptor(bool alienSupport, int level, DataInputDirectorContext& context);

void printOut();

Expand Down Expand Up @@ -93,16 +100,13 @@ class DataInputDescriptor
std::string* minputfilesFilePtr = nullptr;
std::string mFilenameRegex = "";
std::string* mFilenameRegexPtr = nullptr;
int mAllowedParentLevel = 0;
std::string mParentFileReplacement;
std::vector<FileNameHolder*> mfilenames;
std::vector<FileNameHolder*>* mdefaultFilenamesPtr = nullptr;
std::shared_ptr<arrow::fs::FileSystem> mCurrentFilesystem;
int mCurrentFileID = -1;
bool mAlienSupport = false;

o2::monitoring::Monitoring* mMonitoring = nullptr;

DataInputDirectorContext& mContext;
TMap* mParentFileMap = nullptr;
DataInputDescriptor* mParentFile = nullptr;
int mLevel = 0; // level of parent files
Expand All @@ -120,9 +124,7 @@ class DataInputDirector
/// and the related input files

public:
DataInputDirector();
DataInputDirector(std::string inputFile, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = "");
DataInputDirector(std::vector<std::string> inputFiles, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = "");
DataInputDirector(std::vector<std::string> inputFiles, DataInputDirectorContext&& context);
~DataInputDirector();

void reset();
Expand All @@ -149,18 +151,15 @@ class DataInputDirector
uint64_t getTotalSizeUncompressed();

private:
DataInputDirectorContext mContext;
std::string minputfilesFile;
std::string* const minputfilesFilePtr = &minputfilesFile;
std::string mFilenameRegex;
int mAllowedParentLevel = 0;
std::string mParentFileReplacement;
std::string* const mFilenameRegexPtr = &mFilenameRegex;
DataInputDescriptor* mdefaultDataInputDescriptor = nullptr;
std::vector<FileNameHolder*> mdefaultInputFiles;
std::vector<DataInputDescriptor*> mdataInputDescriptors;

o2::monitoring::Monitoring* mMonitoring = nullptr;

bool mDebugMode = false;
bool mAlienSupport = false;

Expand Down
12 changes: 6 additions & 6 deletions Framework/AnalysisSupport/test/test_DataInputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ BOOST_AUTO_TEST_CASE(TestDatainputDirector)
jf << R"(})" << std::endl;
jf.close();

DataInputDirector didir1;
DataInputDirector didir1({}, {});
BOOST_CHECK(didir1.readJson(jsonFile));
didir1.printOut();
printf("\n\n");
Expand All @@ -60,8 +60,8 @@ BOOST_AUTO_TEST_CASE(TestDatainputDirector)
auto dh = DataHeader(DataDescription{"DUE"},
DataOrigin{"AOD"},
DataHeader::SubSpecificationType{0});
//auto [file1, directory1] = didir1.getFileFolder(dh, 1, 0);
//BOOST_CHECK_EQUAL(file1->GetName(), "Bresults_1.root");
// auto [file1, directory1] = didir1.getFileFolder(dh, 1, 0);
// BOOST_CHECK_EQUAL(file1->GetName(), "Bresults_1.root");

auto didesc = didir1.getDataInputDescriptor(dh);
BOOST_CHECK(didesc);
Expand Down Expand Up @@ -96,13 +96,13 @@ BOOST_AUTO_TEST_CASE(TestDatainputDirector)
"Aresults_2.root",
"Bresults_1.root",
"Bresults_2.root"};
DataInputDirector didir2(inputFiles);
DataInputDirector didir2(inputFiles, {});
didir2.printOut();
printf("\n\n");
BOOST_CHECK(didir2.readJson(jsonFile));

//auto [file2, directory2] = didir2.getFileFolder(dh, 1, 0);
//BOOST_CHECK_EQUAL(file2->GetName(), "Bresults_1.root");
// auto [file2, directory2] = didir2.getFileFolder(dh, 1, 0);
// BOOST_CHECK_EQUAL(file2->GetName(), "Bresults_1.root");

didesc = didir2.getDataInputDescriptor(dh);
BOOST_CHECK(didesc);
Expand Down
Loading