Skip to content

Commit c16f5fe

Browse files
authored
Task based central trigger processor (#6758)
* New task for the central event filter processor * Task configurables from table descriptions
1 parent 80d980c commit c16f5fe

File tree

6 files changed

+269
-303
lines changed

6 files changed

+269
-303
lines changed

Analysis/EventFiltering/CMakeLists.txt

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,11 @@
99
# granted to it by virtue of its status as an Intergovernmental Organization
1010
# or submit itself to any jurisdiction.
1111

12-
o2_add_library(EventFiltering
13-
SOURCES centralEventFilterProcessor.cxx
14-
PUBLIC_LINK_LIBRARIES O2::Framework O2::DetectorsBase O2::AnalysisDataModel O2::AnalysisCore)
1512

16-
o2_add_dpl_workflow(central-event-filter-processor
17-
SOURCES cefp.cxx
13+
o2_add_dpl_workflow(central-event-filter-task
14+
SOURCES cefpTask.cxx
1815
COMPONENT_NAME Analysis
19-
PUBLIC_LINK_LIBRARIES O2::EventFiltering)
16+
PUBLIC_LINK_LIBRARIES O2::Framework O2::AnalysisDataModel O2::AnalysisCore)
2017

2118
o2_add_dpl_workflow(nuclei-filter
2219
SOURCES nucleiFilter.cxx

Analysis/EventFiltering/cefp.cxx

Lines changed: 0 additions & 43 deletions
This file was deleted.
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
// O2 includes
12+
13+
#include "Framework/AnalysisTask.h"
14+
#include "Framework/AnalysisDataModel.h"
15+
#include "Framework/ASoAHelpers.h"
16+
#include "AnalysisDataModel/TrackSelectionTables.h"
17+
18+
#include "filterTables.h"
19+
20+
#include "Framework/HistogramRegistry.h"
21+
22+
#include <iostream>
23+
#include <cstdio>
24+
#include <random>
25+
#include <fmt/format.h>
26+
#include <rapidjson/document.h>
27+
#include <rapidjson/filereadstream.h>
28+
29+
// we need to add workflow options before including Framework/runDataProcessing
30+
void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
31+
{
32+
// option allowing to set parameters
33+
std::vector<o2::framework::ConfigParamSpec> options{o2::framework::ConfigParamSpec{"train_config", o2::framework::VariantType::String, "full_config.json", {"Configuration of the filtering train"}}};
34+
35+
std::swap(workflowOptions, options);
36+
}
37+
38+
#include "Framework/runDataProcessing.h"
39+
40+
using namespace o2;
41+
using namespace o2::aod;
42+
using namespace o2::framework;
43+
using namespace o2::framework::expressions;
44+
using namespace rapidjson;
45+
46+
namespace
47+
{
48+
bool readJsonFile(std::string& config, Document& d)
49+
{
50+
FILE* fp = fopen(config.data(), "rb");
51+
if (!fp) {
52+
LOG(WARNING) << "Missing configuration json file: " << config;
53+
return false;
54+
}
55+
56+
char readBuffer[65536];
57+
FileReadStream is(fp, readBuffer, sizeof(readBuffer));
58+
59+
d.ParseStream(is);
60+
fclose(fp);
61+
return true;
62+
}
63+
64+
std::unordered_map<std::string, std::unordered_map<std::string, float>> mDownscaling;
65+
static const std::vector<std::string> downscalingName{"Downscaling"};
66+
static const float defaultDownscaling[32][1]{
67+
{1.f},
68+
{1.f},
69+
{1.f},
70+
{1.f},
71+
{1.f},
72+
{1.f},
73+
{1.f},
74+
{1.f},
75+
{1.f},
76+
{1.f},
77+
{1.f},
78+
{1.f},
79+
{1.f},
80+
{1.f},
81+
{1.f},
82+
{1.f},
83+
{1.f},
84+
{1.f},
85+
{1.f},
86+
{1.f},
87+
{1.f},
88+
{1.f},
89+
{1.f},
90+
{1.f},
91+
{1.f},
92+
{1.f},
93+
{1.f},
94+
{1.f},
95+
{1.f},
96+
{1.f},
97+
{1.f},
98+
{1.f}}; /// Max number of columns for triggers is 32 (extendible)
99+
100+
#define FILTER_CONFIGURABLE(_TYPE_) \
101+
Configurable<LabeledArray<float>> cfg##_TYPE_ { #_TYPE_, {defaultDownscaling[0], NumberOfColumns < _TYPE_>(), 1, ColumnsNames(typename _TYPE_::iterator::persistent_columns_t{}), downscalingName }, #_TYPE_ " downscalings" }
102+
103+
} // namespace
104+
105+
struct centralEventFilterTask {
106+
107+
HistogramRegistry scalers{"scalers", {}, OutputObjHandlingPolicy::AnalysisObject, true, true};
108+
109+
FILTER_CONFIGURABLE(NucleiFilters);
110+
FILTER_CONFIGURABLE(DiffractionFilters);
111+
112+
void init(o2::framework::InitContext& initc)
113+
{
114+
LOG(INFO) << "Start init";
115+
int nCols{0};
116+
for (auto& table : mDownscaling) {
117+
nCols += table.second.size();
118+
}
119+
LOG(INFO) << "Middle init, total number of columns " << nCols;
120+
121+
scalers.add("mScalers", "", HistType::kTH1F, {{nCols + 1, -0.5, 0.5 + nCols, ";;Number of events"}});
122+
scalers.add("mFiltered", "", HistType::kTH1F, {{nCols + 1, -0.5, 0.5 + nCols, ";;Number of filtered events"}});
123+
auto mScalers = scalers.get<TH1>(HIST("mScalers"));
124+
auto mFiltered = scalers.get<TH1>(HIST("mFiltered"));
125+
126+
mScalers->GetXaxis()->SetBinLabel(1, "Total number of events");
127+
mFiltered->GetXaxis()->SetBinLabel(1, "Total number of events");
128+
int bin{2};
129+
for (auto& table : mDownscaling) {
130+
for (auto& column : table.second) {
131+
mScalers->GetXaxis()->SetBinLabel(bin, column.first.data());
132+
mFiltered->GetXaxis()->SetBinLabel(bin++, column.first.data());
133+
}
134+
if (initc.options().isDefault(table.first.data()) || !initc.options().isSet(table.first.data())) {
135+
continue;
136+
}
137+
auto filterOpt = initc.mOptions.get<LabeledArray<float>>(table.first.data());
138+
for (auto& col : table.second) {
139+
col.second = filterOpt.get(col.first.data(), 0u);
140+
}
141+
}
142+
}
143+
144+
void run(ProcessingContext& pc)
145+
{
146+
147+
auto mScalers = scalers.get<TH1>(HIST("mScalers"));
148+
auto mFiltered = scalers.get<TH1>(HIST("mFiltered"));
149+
int64_t nEvents{-1};
150+
for (auto& tableName : mDownscaling) {
151+
auto tableConsumer = pc.inputs().get<TableConsumer>(tableName.first);
152+
153+
auto tablePtr{tableConsumer->asArrowTable()};
154+
int64_t nRows{tablePtr->num_rows()};
155+
nEvents = nEvents < 0 ? nRows : nEvents;
156+
if (nEvents != nRows) {
157+
LOG(FATAL) << "Inconsistent number of rows across trigger tables.";
158+
}
159+
160+
auto schema{tablePtr->schema()};
161+
for (auto& colName : tableName.second) {
162+
int bin{mScalers->GetXaxis()->FindBin(colName.first.data())};
163+
double binCenter{mScalers->GetXaxis()->GetBinCenter(bin)};
164+
auto column{tablePtr->GetColumnByName(colName.first)};
165+
double downscaling{colName.second};
166+
if (column) {
167+
for (int64_t iC{0}; iC < column->num_chunks(); ++iC) {
168+
auto chunk{column->chunk(iC)};
169+
auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(chunk);
170+
for (int64_t iS{0}; iS < chunk->length(); ++iS) {
171+
if (boolArray->Value(iS)) {
172+
mScalers->Fill(binCenter);
173+
if (mUniformGenerator(mGeneratorEngine) < downscaling) {
174+
mFiltered->Fill(binCenter);
175+
}
176+
}
177+
}
178+
}
179+
}
180+
}
181+
}
182+
mScalers->SetBinContent(1, mScalers->GetBinContent(1) + nEvents);
183+
mFiltered->SetBinContent(1, mFiltered->GetBinContent(1) + nEvents);
184+
}
185+
186+
std::mt19937_64 mGeneratorEngine;
187+
std::uniform_real_distribution<double> mUniformGenerator = std::uniform_real_distribution<double>(0., 1.);
188+
};
189+
190+
WorkflowSpec defineDataProcessing(ConfigContext const& cfg)
191+
{
192+
std::vector<InputSpec> inputs;
193+
194+
auto config = cfg.options().get<std::string>("train_config");
195+
Document d;
196+
std::unordered_map<std::string, std::unordered_map<std::string, float>> downscalings;
197+
FillFiltersMap(FiltersPack, downscalings);
198+
199+
std::array<bool, NumberOfFilters> enabledFilters = {false};
200+
if (readJsonFile(config, d)) {
201+
for (auto& workflow : d["workflows"].GetArray()) {
202+
for (uint32_t iFilter{0}; iFilter < NumberOfFilters; ++iFilter) {
203+
if (std::string_view(workflow["workflow_name"].GetString()) == std::string_view(FilteringTaskNames[iFilter])) {
204+
inputs.emplace_back(std::string(AvailableFilters[iFilter]), "AOD", FilterDescriptions[iFilter], 0, Lifetime::Timeframe);
205+
enabledFilters[iFilter] = true;
206+
break;
207+
}
208+
}
209+
}
210+
}
211+
212+
for (uint32_t iFilter{0}; iFilter < NumberOfFilters; ++iFilter) {
213+
if (!enabledFilters[iFilter]) {
214+
LOG(INFO) << std::string_view(AvailableFilters[iFilter]) << " not present in the configuration, removing it.";
215+
downscalings.erase(std::string(AvailableFilters[iFilter]));
216+
}
217+
}
218+
219+
DataProcessorSpec spec{adaptAnalysisTask<centralEventFilterTask>(cfg)};
220+
for (auto& input : inputs) {
221+
spec.inputs.emplace_back(input);
222+
}
223+
mDownscaling.swap(downscalings);
224+
225+
return WorkflowSpec{spec};
226+
}

0 commit comments

Comments
 (0)