Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions Framework/Core/include/Framework/MessageSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,30 +136,6 @@ struct MessageSet {
}
}

fair::mq::MessagePtr& header(size_t partIndex)
{
return messages[messageMap[partIndex].position];
}

fair::mq::MessagePtr& payload(size_t partIndex, size_t payloadIndex = 0)
{
assert(partIndex < messageMap.size());
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
return messages[messageMap[partIndex].position + payloadIndex + 1];
}

fair::mq::MessagePtr const& header(size_t partIndex) const
{
return messages[messageMap[partIndex].position];
}

fair::mq::MessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const
{
assert(partIndex < messageMap.size());
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
return messages[messageMap[partIndex].position + payloadIndex + 1];
}

fair::mq::MessagePtr const& associatedHeader(size_t pos) const
{
return messages[messageMap[pairMap[pos].partIndex].position];
Expand Down
5 changes: 3 additions & 2 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2140,8 +2140,9 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
// sequence is the header message
// - each part has one or more payload messages
// - InputRecord provides all payloads as header-payload pair
auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
auto const indices = currentSetOfInputs[i].messages | get_pair{partindex};
auto const& headerMsg = currentSetOfInputs[i].messages[indices.headerIdx];
auto const& payloadMsg = currentSetOfInputs[i].messages[indices.payloadIdx];
headerptr = static_cast<char const*>(headerMsg->GetData());
payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
Expand Down
134 changes: 80 additions & 54 deletions Framework/Core/test/test_MessageSet.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ TEST_CASE("MessageSet")
REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0);
REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1);
CHECK_THROWS((msgSet.messages | get_pair{1}));
// Validate pipe operators match old API
REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0));
REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0));
REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size);
REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size());
REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size());
REQUIRE((msgSet.messages | get_num_payloads{0}) == 1);
REQUIRE((msgSet.messages | count_parts{}) == 1);
// messages: [hdr, pl] — one pair
REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0);
REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1);
for (size_t i = 0; i < msgSet.pairMap.size(); ++i) {
auto indices = msgSet.messages | get_pair{i};
REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i));
REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i));
}
}

TEST_CASE("MessageSetWithFunction")
Expand All @@ -76,11 +80,13 @@ TEST_CASE("MessageSetWithFunction")
REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0);
REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1);
CHECK_THROWS((msgSet.messages | get_pair{1}));
REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0));
REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0));
REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size);
REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size());
REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size());
REQUIRE((msgSet.messages | get_num_payloads{0}) == 1);
REQUIRE((msgSet.messages | count_parts{}) == 1);
for (size_t i = 0; i < msgSet.pairMap.size(); ++i) {
auto indices = msgSet.messages | get_pair{i};
REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i));
REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i));
}
}

TEST_CASE("MessageSetWithMultipart")
Expand Down Expand Up @@ -112,13 +118,18 @@ TEST_CASE("MessageSetWithMultipart")
REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 0);
REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 2);
CHECK_THROWS((msgSet.messages | get_pair{2}));
// Validate pipe operators match old API for multi-payload
REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0));
REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0, 0));
REQUIRE(&(msgSet.messages | get_payload{0, 1}) == &msgSet.payload(0, 1));
REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size);
REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size());
REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size());
REQUIRE((msgSet.messages | get_num_payloads{0}) == 2);
REQUIRE((msgSet.messages | count_parts{}) == 1);
// messages: [hdr, pl0, pl1] — one header, two payloads
REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0);
REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1);
REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 0);
REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 2);
for (size_t i = 0; i < msgSet.pairMap.size(); ++i) {
auto indices = msgSet.messages | get_pair{i};
REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i));
REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i));
}
}

TEST_CASE("MessageSetAddPartRef")
Expand Down Expand Up @@ -190,18 +201,16 @@ TEST_CASE("MessageSetAddMultiple")
REQUIRE((msgSet.messages | get_pair{2}).payloadIdx == 5);
REQUIRE((msgSet.messages | get_pair{3}).headerIdx == 4);
REQUIRE((msgSet.messages | get_pair{3}).payloadIdx == 6);
// Validate pipe operators match old API for mixed modes
for (size_t i = 0; i < 3; ++i) {
REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i));
REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i, 0));
}
// Part 2 has a second payload (multi-payload with splitPayloadParts=2, splitPayloadIndex=2)
REQUIRE(&(msgSet.messages | get_payload{2, 1}) == &msgSet.payload(2, 1));
for (size_t i = 0; i < 3; ++i) {
REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size);
REQUIRE((msgSet.messages | get_num_payloads{0}) == 1);
REQUIRE((msgSet.messages | get_num_payloads{1}) == 1);
REQUIRE((msgSet.messages | get_num_payloads{2}) == 2);
REQUIRE((msgSet.messages | count_parts{}) == 3);
REQUIRE((msgSet.messages | count_payloads{}) == 4);
for (size_t i = 0; i < msgSet.pairMap.size(); ++i) {
auto indices = msgSet.messages | get_pair{i};
REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i));
REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i));
}
REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size());
REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size());
}

TEST_CASE("GetHeaderPayloadOperators")
Expand Down Expand Up @@ -251,13 +260,18 @@ TEST_CASE("GetHeaderPayloadOperators")
REQUIRE(pl1.get() != nullptr);
REQUIRE(pl1->GetSize() == 200);

// Validate pipe operators match old API
for (size_t i = 0; i < 2; ++i) {
REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i));
REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i, 0));
REQUIRE((msgSet.messages | count_parts{}) == 2);
REQUIRE((msgSet.messages | count_payloads{}) == 2);
// messages: [hdr0, pl0, hdr1, pl1] — two standard pairs
REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0);
REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1);
REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 2);
REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 3);
for (size_t i = 0; i < msgSet.pairMap.size(); ++i) {
auto indices = msgSet.messages | get_pair{i};
REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i));
REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i));
}
REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size());
REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size());
}

TEST_CASE("GetHeaderPayloadMultiPayload")
Expand Down Expand Up @@ -343,18 +357,24 @@ TEST_CASE("GetHeaderPayloadMultiPayload")
// get_num_payloads for part 1 should be 3
REQUIRE((msgSet.messages | get_num_payloads{1}) == 3);

// Validate pipe operators match old API for multi-payload (header, pl, pl, pl)
REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0));
REQUIRE(&(msgSet.messages | get_header{1}) == &msgSet.header(1));
REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0, 0));
REQUIRE(&(msgSet.messages | get_payload{1, 0}) == &msgSet.payload(1, 0));
REQUIRE(&(msgSet.messages | get_payload{1, 1}) == &msgSet.payload(1, 1));
REQUIRE(&(msgSet.messages | get_payload{1, 2}) == &msgSet.payload(1, 2));
for (size_t i = 0; i < 2; ++i) {
REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size);
REQUIRE((msgSet.messages | get_num_payloads{0}) == 1);
REQUIRE((msgSet.messages | get_num_payloads{1}) == 3);
REQUIRE((msgSet.messages | count_parts{}) == 2);
REQUIRE((msgSet.messages | count_payloads{}) == 4);
// messages: [hdr0, pl0, hdr1, pl1_0, pl1_1, pl1_2]
REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0);
REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1);
REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 2);
REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 3);
REQUIRE((msgSet.messages | get_pair{2}).headerIdx == 2);
REQUIRE((msgSet.messages | get_pair{2}).payloadIdx == 4);
REQUIRE((msgSet.messages | get_pair{3}).headerIdx == 2);
REQUIRE((msgSet.messages | get_pair{3}).payloadIdx == 5);
for (size_t i = 0; i < msgSet.pairMap.size(); ++i) {
auto indices = msgSet.messages | get_pair{i};
REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i));
REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i));
}
REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size());
REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size());
}

TEST_CASE("TraditionalSplitParts")
Expand Down Expand Up @@ -418,14 +438,20 @@ TEST_CASE("TraditionalSplitParts")

// get_num_payloads: each traditional split pair has 1 payload
for (size_t i = 0; i < 3; ++i) {
REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size);
REQUIRE((msgSet.messages | get_num_payloads{i}) == 1);
}

// Validate pipe operators match old MessageSet::header()/payload() API
for (size_t i = 0; i < 3; ++i) {
REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i));
REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i));
REQUIRE((msgSet.messages | count_parts{}) == 3);
REQUIRE((msgSet.messages | count_payloads{}) == 3);
// messages: [hdr0, pl0, hdr1, pl1, hdr2, pl2] — three traditional split pairs
REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0);
REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1);
REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 2);
REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 3);
REQUIRE((msgSet.messages | get_pair{2}).headerIdx == 4);
REQUIRE((msgSet.messages | get_pair{2}).payloadIdx == 5);
for (size_t i = 0; i < msgSet.pairMap.size(); ++i) {
auto indices = msgSet.messages | get_pair{i};
REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i));
REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i));
}
REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size());
REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size());
}
Loading