diff --git a/Framework/Core/include/Framework/MessageSet.h b/Framework/Core/include/Framework/MessageSet.h index 281f9c42a0773..166934238d647 100644 --- a/Framework/Core/include/Framework/MessageSet.h +++ b/Framework/Core/include/Framework/MessageSet.h @@ -82,12 +82,6 @@ struct MessageSet { return *this; } - /// get number of in-flight O2 messages - [[nodiscard]] size_t size() const - { - return messages | count_parts{}; - } - /// get number of header-payload pairs [[nodiscard]] size_t getNumberOfPairs() const { diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index cece5b343659f..5b85a63bf6c95 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -184,11 +184,11 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector 0 && part.header(0) != nullptr) { + if (!part.messages.empty() && part.header(0) != nullptr) { headerPresent++; continue; } - if (part.size() > 0 && part.payload(0) != nullptr) { + if (!part.messages.empty() && part.payload(0) != nullptr) { payloadPresent++; continue; } @@ -213,7 +213,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector 0 && partial[idx].header(part).get()) { + if (!partial[idx].messages.empty() && partial[idx].header(part).get()) { auto header = partial[idx].header(part).get(); auto payload = partial[idx].payload(part).get(); return DataRef{nullptr, @@ -224,7 +224,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector int { auto& header = static_cast(*partial[idx].header(0)); @@ -327,7 +327,7 @@ void DataRelayer::setOldestPossibleInput(TimesliceId proposed, ChannelIndex chan for (size_t mi = 0; mi < mInputs.size(); ++mi) { auto& input = mInputs[mi]; auto& element = mCache[si * mInputs.size() + mi]; - if (element.size() != 0) { + if (!element.messages.empty()) { if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") { didDrop = true; auto& state = mContext.get(); @@ -353,7 +353,7 @@ void DataRelayer::setOldestPossibleInput(TimesliceId proposed, ChannelIndex chan continue; } auto& element = mCache[si * mInputs.size() + mi]; - if (element.size() == 0) { + if (element.messages.empty()) { auto& state = mContext.get(); if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) { if (state.allowedProcessing == DeviceState::CalibrationOnly) { @@ -411,11 +411,11 @@ void DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop) cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING; // TODO: in the original implementation of the cache, there have been only two messages per entry, // check if the 2 above corresponds to the number of messages. - if (cache[cacheId].size() > 0) { + if (!cache[cacheId].messages.empty()) { dropped[ai] = std::move(cache[cacheId]); } } - bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); }); + bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return !m.messages.empty(); }); if (anyDropped) { O2_SIGNPOST_ID_GENERATE(aid, data_relayer); O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value); @@ -786,7 +786,7 @@ void DataRelayer::getReadyToProcess(std::vector& comp auto partial = getPartialRecord(li); // TODO: get the data ref from message model auto getter = [&partial](size_t idx, size_t part) { - if (partial[idx].size() > 0 && partial[idx].header(part).get()) { + if (!partial[idx].messages.empty() && partial[idx].header(part).get()) { auto header = partial[idx].header(part).get(); auto payload = partial[idx].payload(part).get(); return DataRef{nullptr, @@ -797,7 +797,7 @@ void DataRelayer::getReadyToProcess(std::vector& comp return DataRef{}; }; auto nPartsGetter = [&partial](size_t idx) { - return partial[idx].size(); + return partial[idx].messages | count_parts{}; }; auto refCountGetter = [&partial](size_t idx) -> int { auto& header = static_cast(*partial[idx].header(0)); @@ -897,7 +897,7 @@ std::vector DataRelayer::consumeAllInputsForTimeslice cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING; // TODO: in the original implementation of the cache, there have been only two messages per entry, // check if the 2 above corresponds to the number of messages. - if (cache[cacheId].size() > 0) { + if (!cache[cacheId].messages.empty()) { messages[arg] = std::move(cache[cacheId]); } index.markAsInvalid(s); @@ -951,7 +951,7 @@ std::vector DataRelayer::consumeExistingInputsForTime cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING; // TODO: in the original implementation of the cache, there have been only two messages per entry, // check if the 2 above corresponds to the number of messages. - for (size_t pi = 0; pi < cache[cacheId].size(); pi++) { + for (size_t pi = 0; pi < (cache[cacheId].messages | count_parts{}); pi++) { auto& header = cache[cacheId].header(pi); auto&& newHeader = header->GetTransport()->CreateMessage(); newHeader->Copy(*header); diff --git a/Framework/Core/test/benchmark_DataRelayer.cxx b/Framework/Core/test/benchmark_DataRelayer.cxx index 3c3d2294fdd7e..e983f3604cfab 100644 --- a/Framework/Core/test/benchmark_DataRelayer.cxx +++ b/Framework/Core/test/benchmark_DataRelayer.cxx @@ -96,7 +96,7 @@ static void BM_RelaySingleSlot(benchmark::State& state) assert(ready[0].op == CompletionPolicy::CompletionOp::Consume); auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); assert(result.size() == 1); - assert(result.at(0).size() == 1); + assert((result.at(0).messages | count_parts{}) == 1); inflightMessages = std::move(result[0].messages); } } @@ -153,7 +153,7 @@ static void BM_RelayMultipleSlots(benchmark::State& state) assert(ready[0].op == CompletionPolicy::CompletionOp::Consume); auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); assert(result.size() == 1); - assert(result.at(0).size() == 1); + assert((result.at(0).messages | count_parts{}) == 1); inflightMessages = std::move(result[0].messages); } } @@ -228,8 +228,8 @@ static void BM_RelayMultipleRoutes(benchmark::State& state) assert(ready[0].op == CompletionPolicy::CompletionOp::Consume); auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); assert(result.size() == 2); - assert(result.at(0).size() == 1); - assert(result.at(1).size() == 1); + assert((result.at(0).messages | count_parts{}) == 1); + assert((result.at(1).messages | count_parts{}) == 1); inflightMessages = std::move(result[0].messages); inflightMessages.emplace_back(std::move(result[1].messages[0])); inflightMessages.emplace_back(std::move(result[1].messages[1])); diff --git a/Framework/Core/test/test_DataRelayer.cxx b/Framework/Core/test/test_DataRelayer.cxx index 8957e361cb8a2..e5ca7c5d235e5 100644 --- a/Framework/Core/test/test_DataRelayer.cxx +++ b/Framework/Core/test/test_DataRelayer.cxx @@ -115,7 +115,7 @@ TEST_CASE("DataRelayer") auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); // one MessageSet with one PartRef with header and payload REQUIRE(result.size() == 1); - REQUIRE(result.at(0).size() == 1); + REQUIRE((result.at(0).messages | count_parts{}) == 1); } // @@ -165,7 +165,7 @@ TEST_CASE("DataRelayer") auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); // one MessageSet with one PartRef with header and payload REQUIRE(result.size() == 1); - REQUIRE(result.at(0).size() == 1); + REQUIRE((result.at(0).messages | count_parts{}) == 1); } // This test a more complicated set of inputs, and verifies that data is @@ -245,8 +245,8 @@ TEST_CASE("DataRelayer") auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); // two MessageSets, each with one PartRef REQUIRE(result.size() == 2); - REQUIRE(result.at(0).size() == 1); - REQUIRE(result.at(1).size() == 1); + REQUIRE((result.at(0).messages | count_parts{}) == 1); + REQUIRE((result.at(1).messages | count_parts{}) == 1); } // This test a more complicated set of inputs, and verifies that data is @@ -733,7 +733,7 @@ TEST_CASE("DataRelayer") // we have one input route and thus one message set containing pairs for all // payloads REQUIRE(messageSet.size() == 1); - REQUIRE(messageSet[0].size() == nSplitParts); + REQUIRE((messageSet[0].messages | count_parts{}) == nSplitParts); REQUIRE(messageSet[0].getNumberOfPayloads(0) == 1); } @@ -796,7 +796,7 @@ TEST_CASE("DataRelayer") // we have one input route REQUIRE(messageSet.size() == 1); // one message set containing number of added sequences of messages - REQUIRE(messageSet[0].size() == sequenceSize.size()); + REQUIRE((messageSet[0].messages | count_parts{}) == sequenceSize.size()); size_t counter = 0; for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) { REQUIRE(messageSet[0].getNumberOfPayloads(seqid) == sequenceSize[seqid]); diff --git a/Framework/Core/test/test_ForwardInputs.cxx b/Framework/Core/test/test_ForwardInputs.cxx index 7081d600080b1..e3031b7e72a69 100644 --- a/Framework/Core/test/test_ForwardInputs.cxx +++ b/Framework/Core/test/test_ForwardInputs.cxx @@ -92,7 +92,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRoute") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -143,7 +143,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, true); @@ -198,7 +198,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS") auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, sih}); REQUIRE(o2::header::get(header->GetData())); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -256,7 +256,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible") auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, dih}); REQUIRE(o2::header::get(header->GetData())); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -321,7 +321,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutes") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -384,7 +384,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -455,12 +455,12 @@ TEST_CASE("ForwardInputsMultiMessageMultipleRoutes") auto header1 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh1, dph}); MessageSet messageSet1; messageSet1.add(PartRef{std::move(header1), std::move(payload1)}); - REQUIRE(messageSet1.size() == 1); + REQUIRE((messageSet1.messages | count_parts{}) == 1); auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph}); MessageSet messageSet2; messageSet2.add(PartRef{std::move(header2), std::move(payload2)}); - REQUIRE(messageSet2.size() == 1); + REQUIRE((messageSet2.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet1)); currentSetOfInputs.emplace_back(std::move(messageSet2)); REQUIRE(currentSetOfInputs.size() == 2); @@ -525,7 +525,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -607,7 +607,7 @@ TEST_CASE("ForwardInputsSplitPayload") PartRef part{std::move(header2), transport->CreateMessage()}; messageSet.add(std::move(part)); - REQUIRE(messageSet.size() == 2); + REQUIRE((messageSet.messages | count_parts{}) == 2); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -727,7 +727,7 @@ TEST_CASE("ForwardInputEOSSingleRoute") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); @@ -772,7 +772,7 @@ TEST_CASE("ForwardInputOldestPossibleSingleRoute") auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dih}); messageSet.add(PartRef{std::move(header), std::move(payload)}); - REQUIRE(messageSet.size() == 1); + REQUIRE((messageSet.messages | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);