From 684aa2bd135ce53c111dd94d44b0e4bfbb4f7e21 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 23 Mar 2026 13:46:56 +0100 Subject: [PATCH] DPL: get rid of the size method of the MessageSet One more step in getting rid of the artificial container "MessageSet". By removing the size method, we imply that any sequence of messages can have their number of parts computed, regardless of how we store them and how the ownership of such parts works. --- Framework/Core/include/Framework/MessageSet.h | 6 ----- Framework/Core/src/DataRelayer.cxx | 24 +++++++++---------- Framework/Core/test/benchmark_DataRelayer.cxx | 8 +++---- Framework/Core/test/test_DataRelayer.cxx | 12 +++++----- Framework/Core/test/test_ForwardInputs.cxx | 24 +++++++++---------- 5 files changed, 34 insertions(+), 40 deletions(-) diff --git a/Framework/Core/include/Framework/MessageSet.h b/Framework/Core/include/Framework/MessageSet.h index 281f9c42a0773..166934238d647 100644 --- a/Framework/Core/include/Framework/MessageSet.h +++ b/Framework/Core/include/Framework/MessageSet.h @@ -82,12 +82,6 @@ struct MessageSet { return *this; } - /// get number of in-flight O2 messages - [[nodiscard]] size_t size() const - { - return messages | count_parts{}; - } - /// get number of header-payload pairs [[nodiscard]] size_t getNumberOfPairs() const { diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index cece5b343659f..5b85a63bf6c95 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -184,11 +184,11 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector 0 && part.header(0) != nullptr) { + if (!part.messages.empty() && part.header(0) != nullptr) { headerPresent++; continue; } - if (part.size() > 0 && part.payload(0) != nullptr) { + if (!part.messages.empty() && part.payload(0) != nullptr) { payloadPresent++; continue; } @@ -213,7 +213,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector 0 && partial[idx].header(part).get()) { + if (!partial[idx].messages.empty() && partial[idx].header(part).get()) { auto header = partial[idx].header(part).get(); auto payload = partial[idx].payload(part).get(); return DataRef{nullptr, @@ -224,7 +224,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector int { auto& header = static_cast(*partial[idx].header(0)); @@ -327,7 +327,7 @@ void DataRelayer::setOldestPossibleInput(TimesliceId proposed, ChannelIndex chan for (size_t mi = 0; mi < mInputs.size(); ++mi) { auto& input = mInputs[mi]; auto& element = mCache[si * mInputs.size() + mi]; - if (element.size() != 0) { + if (!element.messages.empty()) { if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") { didDrop = true; auto& state = mContext.get(); @@ -353,7 +353,7 @@ void DataRelayer::setOldestPossibleInput(TimesliceId proposed, ChannelIndex chan continue; } auto& element = mCache[si * mInputs.size() + mi]; - if (element.size() == 0) { + if (element.messages.empty()) { auto& state = mContext.get(); if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) { if (state.allowedProcessing == DeviceState::CalibrationOnly) { @@ -411,11 +411,11 @@ void DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop) cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING; // TODO: in the original implementation of the cache, there have been only two messages per entry, // check if the 2 above corresponds to the number of messages. - if (cache[cacheId].size() > 0) { + if (!cache[cacheId].messages.empty()) { dropped[ai] = std::move(cache[cacheId]); } } - bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); }); + bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return !m.messages.empty(); }); if (anyDropped) { O2_SIGNPOST_ID_GENERATE(aid, data_relayer); O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value); @@ -786,7 +786,7 @@ void DataRelayer::getReadyToProcess(std::vector& comp auto partial = getPartialRecord(li); // TODO: get the data ref from message model auto getter = [&partial](size_t idx, size_t part) { - if (partial[idx].size() > 0 && partial[idx].header(part).get()) { + if (!partial[idx].messages.empty() && partial[idx].header(part).get()) { auto header = partial[idx].header(part).get(); auto payload = partial[idx].payload(part).get(); return DataRef{nullptr, @@ -797,7 +797,7 @@ void DataRelayer::getReadyToProcess(std::vector& comp return DataRef{}; }; auto nPartsGetter = [&partial](size_t idx) { - return partial[idx].size(); + return partial[idx].messages | count_parts{}; }; auto refCountGetter = [&partial](size_t idx) -> int { auto& header = static_cast(*partial[idx].header(0)); @@ -897,7 +897,7 @@ std::vector DataRelayer::consumeAllInputsForTimeslice cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING; // TODO: in the original implementation of the cache, there have been only two messages per entry, // check if the 2 above corresponds to the number of messages. - if (cache[cacheId].size() > 0) { + if (!cache[cacheId].messages.empty()) { messages[arg] = std::move(cache[cacheId]); } index.markAsInvalid(s); @@ -951,7 +951,7 @@ std::vector DataRelayer::consumeExistingInputsForTime cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING; // TODO: in the original implementation of the cache, there have been only two messages per entry, // check if the 2 above corresponds to the number of messages. - for (size_t pi = 0; pi < cache[cacheId].size(); pi++) { + for (size_t pi = 0; pi < (cache[cacheId].messages | count_parts{}); pi++) { auto& header = cache[cacheId].header(pi); auto&& newHeader = header->GetTransport()->CreateMessage(); newHeader->Copy(*header); diff --git a/Framework/Core/test/benchmark_DataRelayer.cxx b/Framework/Core/test/benchmark_DataRelayer.cxx index 3c3d2294fdd7e..e983f3604cfab 100644 --- a/Framework/Core/test/benchmark_DataRelayer.cxx +++ b/Framework/Core/test/benchmark_DataRelayer.cxx @@ -96,7 +96,7 @@ static void BM_RelaySingleSlot(benchmark::State& state) assert(ready[0].op == CompletionPolicy::CompletionOp::Consume); auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); assert(result.size() == 1); - assert(result.at(0).size() == 1); + assert((result.at(0).messages | count_parts{}) == 1); inflightMessages = std::move(result[0].messages); } } @@ -153,7 +153,7 @@ static void BM_RelayMultipleSlots(benchmark::State& state) assert(ready[0].op == CompletionPolicy::CompletionOp::Consume); auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); assert(result.size() == 1); - assert(result.at(0).size() == 1); + assert((result.at(0).messages | count_parts{}) == 1); inflightMessages = std::move(result[0].messages); } } @@ -228,8 +228,8 @@ static void BM_RelayMultipleRoutes(benchmark::State& state) assert(ready[0].op == CompletionPolicy::CompletionOp::Consume); auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); assert(result.size() == 2); - assert(result.at(0).size() == 1); - assert(result.at(1).size() == 1); + assert((result.at(0).messages | count_parts{}) == 1); + assert((result.at(1).messages | count_parts{}) == 1); inflightMessages = std::move(result[0].messages); inflightMessages.emplace_back(std::move(result[1].messages[0])); inflightMessages.emplace_back(std::move(result[1].messages[1])); diff --git a/Framework/Core/test/test_DataRelayer.cxx b/Framework/Core/test/test_DataRelayer.cxx index 8957e361cb8a2..e5ca7c5d235e5 100644 --- a/Framework/Core/test/test_DataRelayer.cxx +++ b/Framework/Core/test/test_DataRelayer.cxx @@ -115,7 +115,7 @@ TEST_CASE("DataRelayer") auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); // one MessageSet with one PartRef with header and payload REQUIRE(result.size() == 1); - REQUIRE(result.at(0).size() == 1); + REQUIRE((result.at(0).messages | count_parts{}) == 1); } // @@ -165,7 +165,7 @@ TEST_CASE("DataRelayer") auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); // one MessageSet with one PartRef with header and payload REQUIRE(result.size() == 1); - REQUIRE(result.at(0).size() == 1); + REQUIRE((result.at(0).messages | count_parts{}) == 1); } // This test a more complicated set of inputs, and verifies that data is @@ -245,8 +245,8 @@ TEST_CASE("DataRelayer") auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); // two MessageSets, each with one PartRef REQUIRE(result.size() == 2); - REQUIRE(result.at(0).size() == 1); - REQUIRE(result.at(1).size() == 1); + REQUIRE((result.at(0).messages | count_parts{}) == 1); + REQUIRE((result.at(1).messages | count_parts{}) == 1); } // This test a more complicated set of inputs, and verifies that data is @@ -733,7 +733,7 @@ TEST_CASE("DataRelayer") // we have one input route and thus one message set containing pairs for all // payloads REQUIRE(messageSet.size() == 1); - REQUIRE(messageSet[0].size() == nSplitParts); + REQUIRE((messageSet[0].messages | count_parts{}) == nSplitParts); REQUIRE(messageSet[0].getNumberOfPayloads(0) == 1); } @@ -796,7 +796,7 @@ TEST_CASE("DataRelayer") // we have one input route REQUIRE(messageSet.size() == 1); // one message set containing number of added sequences of messages - REQUIRE(messageSet[0].size() == sequenceSize.size()); + REQUIRE((messageSet[0].messages | count_parts{}) == sequenceSize.size()); size_t counter = 0; for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) { REQUIRE(messageSet[0].getNumberOfPayloads(seqid) == sequenceSize[seqid]); diff --git a/Framework/Core/test/test_ForwardInputs.cxx b/Framework/Core/test/test_ForwardInputs.cxx index 7081d600080b1..e3031b7e72a69 100644 --- a/Framework/Core/test/test_ForwardInputs.cxx +++ b/Framework/Core/test/test_ForwardInputs.cxx @@ -92,7 +92,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRoute") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -143,7 +143,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, true); @@ -198,7 +198,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS") auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, sih}); REQUIRE(o2::header::get(header->GetData())); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -256,7 +256,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible") auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, dih}); REQUIRE(o2::header::get(header->GetData())); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -321,7 +321,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutes") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -384,7 +384,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -455,12 +455,12 @@ TEST_CASE("ForwardInputsMultiMessageMultipleRoutes") auto header1 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh1, dph}); MessageSet messageSet1; messageSet1.add(PartRef{std::move(header1), std::move(payload1)}); - REQUIRE(messageSet1.size() == 1); + REQUIRE((messageSet1.messages | count_parts{}) == 1); auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph}); MessageSet messageSet2; messageSet2.add(PartRef{std::move(header2), std::move(payload2)}); - REQUIRE(messageSet2.size() == 1); + REQUIRE((messageSet2.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet1)); currentSetOfInputs.emplace_back(std::move(messageSet2)); REQUIRE(currentSetOfInputs.size() == 2); @@ -525,7 +525,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -607,7 +607,7 @@ TEST_CASE("ForwardInputsSplitPayload") PartRef part{std::move(header2), transport->CreateMessage()}; messageSet.add(std::move(part)); - REQUIRE(messageSet.size() == 2); + REQUIRE((messageSet.messages | count_parts{}) == 2); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -727,7 +727,7 @@ TEST_CASE("ForwardInputEOSSingleRoute") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -772,7 +772,7 @@ TEST_CASE("ForwardInputOldestPossibleSingleRoute") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dih}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);