From 35e451973f68c8d31941854c7162a1effbd12dc8 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 27 Mar 2026 06:51:38 +0100 Subject: [PATCH 1/2] DPL: remove deprecated header / payload code Rest of the usecases removed. Abstract header / payload retrieval, with the idea that get_header / get_payload will work on any range of fair::mq::MessagePtrs. --- Framework/Core/include/Framework/MessageSet.h | 24 ------ Framework/Core/test/test_MessageSet.cxx | 78 +++++-------------- 2 files changed, 20 insertions(+), 82 deletions(-) diff --git a/Framework/Core/include/Framework/MessageSet.h b/Framework/Core/include/Framework/MessageSet.h index 166934238d647..323c0ad4608af 100644 --- a/Framework/Core/include/Framework/MessageSet.h +++ b/Framework/Core/include/Framework/MessageSet.h @@ -136,30 +136,6 @@ struct MessageSet { } } - fair::mq::MessagePtr& header(size_t partIndex) - { - return messages[messageMap[partIndex].position]; - } - - fair::mq::MessagePtr& payload(size_t partIndex, size_t payloadIndex = 0) - { - assert(partIndex < messageMap.size()); - assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size()); - return messages[messageMap[partIndex].position + payloadIndex + 1]; - } - - fair::mq::MessagePtr const& header(size_t partIndex) const - { - return messages[messageMap[partIndex].position]; - } - - fair::mq::MessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const - { - assert(partIndex < messageMap.size()); - assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size()); - return messages[messageMap[partIndex].position + payloadIndex + 1]; - } - fair::mq::MessagePtr const& associatedHeader(size_t pos) const { return messages[messageMap[pairMap[pos].partIndex].position]; diff --git a/Framework/Core/test/test_MessageSet.cxx b/Framework/Core/test/test_MessageSet.cxx index aa7b49c1d1d3c..c6d5030cf5e33 100644 --- a/Framework/Core/test/test_MessageSet.cxx +++ b/Framework/Core/test/test_MessageSet.cxx @@ -45,12 +45,8 @@ TEST_CASE("MessageSet") REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); CHECK_THROWS((msgSet.messages | get_pair{1})); - // Validate pipe operators match old API - REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); - REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0)); - REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size); - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); + REQUIRE((msgSet.messages | get_num_payloads{0}) == 1); + REQUIRE((msgSet.messages | count_parts{}) == 1); } TEST_CASE("MessageSetWithFunction") @@ -76,11 +72,8 @@ TEST_CASE("MessageSetWithFunction") REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); CHECK_THROWS((msgSet.messages | get_pair{1})); - REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); - REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0)); - REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size); - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); + REQUIRE((msgSet.messages | get_num_payloads{0}) == 1); + REQUIRE((msgSet.messages | count_parts{}) == 1); } TEST_CASE("MessageSetWithMultipart") @@ -112,13 +105,8 @@ TEST_CASE("MessageSetWithMultipart") REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 0); REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 2); CHECK_THROWS((msgSet.messages | get_pair{2})); - // Validate pipe operators match old API for multi-payload - REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); - REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0, 0)); - REQUIRE(&(msgSet.messages | get_payload{0, 1}) == &msgSet.payload(0, 1)); - REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size); - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); + REQUIRE((msgSet.messages | get_num_payloads{0}) == 2); + REQUIRE((msgSet.messages | count_parts{}) == 1); } TEST_CASE("MessageSetAddPartRef") @@ -190,18 +178,11 @@ TEST_CASE("MessageSetAddMultiple") REQUIRE((msgSet.messages | get_pair{2}).payloadIdx == 5); REQUIRE((msgSet.messages | get_pair{3}).headerIdx == 4); REQUIRE((msgSet.messages | get_pair{3}).payloadIdx == 6); - // Validate pipe operators match old API for mixed modes - for (size_t i = 0; i < 3; ++i) { - REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i)); - REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i, 0)); - } - // Part 2 has a second payload (multi-payload with splitPayloadParts=2, splitPayloadIndex=2) - REQUIRE(&(msgSet.messages | get_payload{2, 1}) == &msgSet.payload(2, 1)); - for (size_t i = 0; i < 3; ++i) { - REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size); - } - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); + REQUIRE((msgSet.messages | get_num_payloads{0}) == 1); + REQUIRE((msgSet.messages | get_num_payloads{1}) == 1); + REQUIRE((msgSet.messages | get_num_payloads{2}) == 2); + REQUIRE((msgSet.messages | count_parts{}) == 3); + REQUIRE((msgSet.messages | count_payloads{}) == 4); } TEST_CASE("GetHeaderPayloadOperators") @@ -251,13 +232,8 @@ TEST_CASE("GetHeaderPayloadOperators") REQUIRE(pl1.get() != nullptr); REQUIRE(pl1->GetSize() == 200); - // Validate pipe operators match old API - for (size_t i = 0; i < 2; ++i) { - REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i)); - REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i, 0)); - } - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); + REQUIRE((msgSet.messages | count_parts{}) == 2); + REQUIRE((msgSet.messages | count_payloads{}) == 2); } TEST_CASE("GetHeaderPayloadMultiPayload") @@ -343,18 +319,10 @@ TEST_CASE("GetHeaderPayloadMultiPayload") // get_num_payloads for part 1 should be 3 REQUIRE((msgSet.messages | get_num_payloads{1}) == 3); - // Validate pipe operators match old API for multi-payload (header, pl, pl, pl) - REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); - REQUIRE(&(msgSet.messages | get_header{1}) == &msgSet.header(1)); - REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0, 0)); - REQUIRE(&(msgSet.messages | get_payload{1, 0}) == &msgSet.payload(1, 0)); - REQUIRE(&(msgSet.messages | get_payload{1, 1}) == &msgSet.payload(1, 1)); - REQUIRE(&(msgSet.messages | get_payload{1, 2}) == &msgSet.payload(1, 2)); - for (size_t i = 0; i < 2; ++i) { - REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size); - } - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); + REQUIRE((msgSet.messages | get_num_payloads{0}) == 1); + REQUIRE((msgSet.messages | get_num_payloads{1}) == 3); + REQUIRE((msgSet.messages | count_parts{}) == 2); + REQUIRE((msgSet.messages | count_payloads{}) == 4); } TEST_CASE("TraditionalSplitParts") @@ -418,14 +386,8 @@ TEST_CASE("TraditionalSplitParts") // get_num_payloads: each traditional split pair has 1 payload for (size_t i = 0; i < 3; ++i) { - REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size); + REQUIRE((msgSet.messages | get_num_payloads{i}) == 1); } - - // Validate pipe operators match old MessageSet::header()/payload() API - for (size_t i = 0; i < 3; ++i) { - REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i)); - REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i)); - } - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); + REQUIRE((msgSet.messages | count_parts{}) == 3); + REQUIRE((msgSet.messages | count_payloads{}) == 3); } From a7da1a5bf41c0a9c66e31d0445c5114898262fb5 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 27 Mar 2026 06:51:38 +0100 Subject: [PATCH 2/2] DPL: replace MessageSet::associateHeader / associatePayload --- Framework/Core/src/DataProcessingDevice.cxx | 5 +- Framework/Core/test/test_MessageSet.cxx | 64 +++++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 0fa70947bf18c..bb6502758a95a 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -2140,8 +2140,9 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v // sequence is the header message // - each part has one or more payload messages // - InputRecord provides all payloads as header-payload pair - auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex); - auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex); + auto const indices = currentSetOfInputs[i].messages | get_pair{partindex}; + auto const& headerMsg = currentSetOfInputs[i].messages[indices.headerIdx]; + auto const& payloadMsg = currentSetOfInputs[i].messages[indices.payloadIdx]; headerptr = static_cast(headerMsg->GetData()); payloadptr = payloadMsg ? static_cast(payloadMsg->GetData()) : nullptr; payloadSize = payloadMsg ? payloadMsg->GetSize() : 0; diff --git a/Framework/Core/test/test_MessageSet.cxx b/Framework/Core/test/test_MessageSet.cxx index c6d5030cf5e33..8c9ed4a7cbf1c 100644 --- a/Framework/Core/test/test_MessageSet.cxx +++ b/Framework/Core/test/test_MessageSet.cxx @@ -47,6 +47,14 @@ TEST_CASE("MessageSet") CHECK_THROWS((msgSet.messages | get_pair{1})); REQUIRE((msgSet.messages | get_num_payloads{0}) == 1); REQUIRE((msgSet.messages | count_parts{}) == 1); + // messages: [hdr, pl] — one pair + REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); + } } TEST_CASE("MessageSetWithFunction") @@ -74,6 +82,11 @@ TEST_CASE("MessageSetWithFunction") CHECK_THROWS((msgSet.messages | get_pair{1})); REQUIRE((msgSet.messages | get_num_payloads{0}) == 1); REQUIRE((msgSet.messages | count_parts{}) == 1); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); + } } TEST_CASE("MessageSetWithMultipart") @@ -107,6 +120,16 @@ TEST_CASE("MessageSetWithMultipart") CHECK_THROWS((msgSet.messages | get_pair{2})); REQUIRE((msgSet.messages | get_num_payloads{0}) == 2); REQUIRE((msgSet.messages | count_parts{}) == 1); + // messages: [hdr, pl0, pl1] — one header, two payloads + REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); + REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 2); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); + } } TEST_CASE("MessageSetAddPartRef") @@ -183,6 +206,11 @@ TEST_CASE("MessageSetAddMultiple") REQUIRE((msgSet.messages | get_num_payloads{2}) == 2); REQUIRE((msgSet.messages | count_parts{}) == 3); REQUIRE((msgSet.messages | count_payloads{}) == 4); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); + } } TEST_CASE("GetHeaderPayloadOperators") @@ -234,6 +262,16 @@ TEST_CASE("GetHeaderPayloadOperators") REQUIRE((msgSet.messages | count_parts{}) == 2); REQUIRE((msgSet.messages | count_payloads{}) == 2); + // messages: [hdr0, pl0, hdr1, pl1] — two standard pairs + REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); + REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 2); + REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 3); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); + } } TEST_CASE("GetHeaderPayloadMultiPayload") @@ -323,6 +361,20 @@ TEST_CASE("GetHeaderPayloadMultiPayload") REQUIRE((msgSet.messages | get_num_payloads{1}) == 3); REQUIRE((msgSet.messages | count_parts{}) == 2); REQUIRE((msgSet.messages | count_payloads{}) == 4); + // messages: [hdr0, pl0, hdr1, pl1_0, pl1_1, pl1_2] + REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); + REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 2); + REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 3); + REQUIRE((msgSet.messages | get_pair{2}).headerIdx == 2); + REQUIRE((msgSet.messages | get_pair{2}).payloadIdx == 4); + REQUIRE((msgSet.messages | get_pair{3}).headerIdx == 2); + REQUIRE((msgSet.messages | get_pair{3}).payloadIdx == 5); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); + } } TEST_CASE("TraditionalSplitParts") @@ -390,4 +442,16 @@ TEST_CASE("TraditionalSplitParts") } REQUIRE((msgSet.messages | count_parts{}) == 3); REQUIRE((msgSet.messages | count_payloads{}) == 3); + // messages: [hdr0, pl0, hdr1, pl1, hdr2, pl2] — three traditional split pairs + REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); + REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 2); + REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 3); + REQUIRE((msgSet.messages | get_pair{2}).headerIdx == 4); + REQUIRE((msgSet.messages | get_pair{2}).payloadIdx == 5); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); + } }