From 3f4a7564b6c2a043f7e2b97e2a3ed4b57fef8701 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Tue, 24 Mar 2026 14:37:50 +0100 Subject: [PATCH] DPL: move away from MessageSet::header / payload Abstract header / payload retrieval, with the idea that get_header / get_payload will work on any range of fair::mq::MessagePtrs. For now we only do the first header / payload pair only, to validate the trivial change. --- Framework/Core/include/Framework/DataModelViews.h | 4 ++-- Framework/Core/src/DataProcessingDevice.cxx | 2 +- Framework/Core/src/DataRelayer.cxx | 12 ++++++------ Framework/Core/test/test_DataRelayer.cxx | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/Framework/Core/include/Framework/DataModelViews.h b/Framework/Core/include/Framework/DataModelViews.h index f42ef85ec78e1..73faf7699834d 100644 --- a/Framework/Core/include/Framework/DataModelViews.h +++ b/Framework/Core/include/Framework/DataModelViews.h @@ -153,7 +153,7 @@ struct get_header { // ends the pipeline, returns the number of parts template requires std::ranges::random_access_range && std::ranges::sized_range - friend fair::mq::MessagePtr& operator|(R&& r, get_header self) + friend auto& operator|(R&& r, get_header self) { return r[(r | get_dataref_indices{self.id, 0}).headerIdx]; } @@ -165,7 +165,7 @@ struct get_payload { // ends the pipeline, returns the number of parts template requires std::ranges::random_access_range && std::ranges::sized_range - friend fair::mq::MessagePtr& operator|(R&& r, get_payload self) + friend auto& operator|(R&& r, get_payload self) { return r[(r | get_dataref_indices{self.part, self.subPart}).payloadIdx]; } diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index da04a23e81c0c..0fa70947bf18c 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -2153,7 +2153,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v return currentSetOfInputs[i].getNumberOfPairs(); }; auto refCountGetter = [¤tSetOfInputs](size_t idx) -> int { - auto& header = static_cast(*currentSetOfInputs[idx].header(0)); + auto& header = static_cast(*(currentSetOfInputs[idx].messages | get_header{0})); return header.GetRefCount(); }; return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()}; diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 5b85a63bf6c95..7eb851e2aadd8 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -184,11 +184,11 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector int { - auto& header = static_cast(*partial[idx].header(0)); + auto& header = static_cast(*(partial[idx].messages | get_header{0})); return header.GetRefCount(); }; InputSpan span{getter, nPartsGetter, refCountGetter, static_cast(partial.size())}; @@ -246,8 +246,8 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector& comp return partial[idx].messages | count_parts{}; }; auto refCountGetter = [&partial](size_t idx) -> int { - auto& header = static_cast(*partial[idx].header(0)); + auto& header = static_cast(*(partial[idx].messages | get_header{0})); return header.GetRefCount(); }; InputSpan span{getter, nPartsGetter, refCountGetter, static_cast(partial.size())}; diff --git a/Framework/Core/test/test_DataRelayer.cxx b/Framework/Core/test/test_DataRelayer.cxx index e5ca7c5d235e5..e4aa35286942d 100644 --- a/Framework/Core/test/test_DataRelayer.cxx +++ b/Framework/Core/test/test_DataRelayer.cxx @@ -798,11 +798,11 @@ TEST_CASE("DataRelayer") // one message set containing number of added sequences of messages REQUIRE((messageSet[0].messages | count_parts{}) == sequenceSize.size()); size_t counter = 0; - for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) { + for (size_t seqid = 0; seqid < sequenceSize.size(); ++seqid) { REQUIRE(messageSet[0].getNumberOfPayloads(seqid) == sequenceSize[seqid]); - for (auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) { - REQUIRE(messageSet[0].payload(seqid, pi)); - auto const* data = messageSet[0].payload(seqid, pi)->GetData(); + for (size_t pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) { + REQUIRE((messageSet[0].messages | get_payload{seqid, pi})); + auto const* data = (messageSet[0].messages | get_payload{seqid, pi})->GetData(); REQUIRE(*(reinterpret_cast(data)) == counter); ++counter; }