From 0ee51a044a0397c99fdf29bdf656a7c302c7c5e5 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 14 Jan 2026 13:50:50 +0100 Subject: [PATCH 1/3] DPL: add test for routing messages --- Framework/Core/test/test_ForwardInputs.cxx | 74 ++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/Framework/Core/test/test_ForwardInputs.cxx b/Framework/Core/test/test_ForwardInputs.cxx index fe9f70d1daadb..7081d600080b1 100644 --- a/Framework/Core/test/test_ForwardInputs.cxx +++ b/Framework/Core/test/test_ForwardInputs.cxx @@ -616,6 +616,80 @@ TEST_CASE("ForwardInputsSplitPayload") CHECK(result[1].Size() == 3); } +TEST_CASE("ForwardInputsSplitPayloadNoMessageSet") +{ + o2::header::DataHeader dh; + dh.dataOrigin = "TST"; + dh.dataDescription = "A"; + dh.subSpecification = 0; + dh.splitPayloadIndex = 2; + dh.splitPayloadParts = 2; + + o2::header::DataHeader dh2; + dh2.dataOrigin = "TST"; + dh2.dataDescription = "B"; + dh2.subSpecification = 0; + dh2.splitPayloadIndex = 0; + dh2.splitPayloadParts = 1; + + o2::framework::DataProcessingHeader dph{0, 1}; + + std::vector channels{ + fair::mq::Channel("from_A_to_B"), + fair::mq::Channel("from_A_to_C"), + }; + + bool consume = true; + bool copyByDefault = true; + FairMQDeviceProxy proxy; + std::vector routes{ + ForwardRoute{ + .timeslice = 0, + .maxTimeslices = 1, + .matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}}, + .channel = "from_A_to_B", + .policy = nullptr, + }, + ForwardRoute{ + .timeslice = 0, + .maxTimeslices = 1, + .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}}, + .channel = "from_A_to_C", + .policy = nullptr, + }}; + + auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& { + for (auto& channel : channels) { + if (channel.GetName() == channelName) { + return channel; + } + } + throw std::runtime_error("Channel not found"); + }; + + proxy.bind({}, {}, routes, findChannelByName, nullptr); + + auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq"); + fair::mq::MessagePtr payload1(transport->CreateMessage()); + fair::mq::MessagePtr payload2(transport->CreateMessage()); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); + std::vector> messages; + messages.push_back(std::move(header)); + messages.push_back(std::move(payload1)); + messages.push_back(std::move(payload2)); + auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph}); + messages.push_back(std::move(header2)); + messages.push_back(transport->CreateMessage()); + + std::vector result(2); + auto span = std::span(messages); + o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, span, result, copyByDefault, consume); + REQUIRE(result.size() == 2); // Two routes + CHECK(result[0].Size() == 2); // No messages on this route + CHECK(result[1].Size() == 3); +} + TEST_CASE("ForwardInputEOSSingleRoute") { o2::framework::SourceInfoHeader sih{}; From e2f35883fffbe0ffad088b876a9df79a077470fc Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 14 Jan 2026 13:56:36 +0100 Subject: [PATCH 2/3] DPL: add callback when inserting in the slot --- Framework/Core/include/Framework/DataRelayer.h | 4 ++++ Framework/Core/src/DataProcessingDevice.cxx | 1 + Framework/Core/src/DataRelayer.cxx | 10 ++++++++-- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/Framework/Core/include/Framework/DataRelayer.h b/Framework/Core/include/Framework/DataRelayer.h index 012b909096317..1e010fc12f3d4 100644 --- a/Framework/Core/include/Framework/DataRelayer.h +++ b/Framework/Core/include/Framework/DataRelayer.h @@ -114,6 +114,9 @@ class DataRelayer using OnDropCallback = std::function&, TimesliceIndex::OldestOutputInfo info)>; + // Callback for when some messages are about to be owned by the the DataRelayer + using OnInsertionCallback = std::function&)>; + /// Prune all the pending entries in the cache. void prunePending(OnDropCallback); /// Prune the cache for a given slot @@ -135,6 +138,7 @@ class DataRelayer InputInfo const& info, size_t nMessages, size_t nPayloads = 1, + OnInsertionCallback onInsertion = nullptr, OnDropCallback onDrop = nullptr); /// This is to set the oldest possible @a timeslice this relayer can diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 3925359b056b2..343b567d8b852 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1859,6 +1859,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& input, nMessages, nPayloadsPerHeader, + nullptr, onDrop); switch (relayed.type) { case DataRelayer::RelayChoice::Type::Backpressured: diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 01e7a2b29fd35..ea2c4c0b73316 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -436,7 +436,8 @@ DataRelayer::RelayChoice InputInfo const& info, size_t nMessages, size_t nPayloads, - std::function&, TimesliceIndex::OldestOutputInfo)> onDrop) + OnInsertionCallback onInsertion, + OnDropCallback onDrop) { std::scoped_lock lock(mMutex); DataProcessingHeader const* dph = o2::header::get(rawHeader); @@ -482,6 +483,7 @@ DataRelayer::RelayChoice &messages, &nMessages, &nPayloads, + &onInsertion, &cache = mCache, &services = mContext, numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t { @@ -512,7 +514,11 @@ DataRelayer::RelayChoice mi += nPayloads; continue; } - target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1); + auto span = std::span(messages + mi, messages + mi + nPayloads + 1); + if (onInsertion) { + onInsertion(services, span); + } + target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1); mi += nPayloads; saved += nPayloads; } From 88491fd82cd245e27d57633c1b2e01238835d651 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 14 Jan 2026 13:56:37 +0100 Subject: [PATCH 3/3] DPL: fix how many forwarded parts are needed In principle this is not fatal because the number of routes is always larger / equal than the number of channels by construction. Better safe than sorry. --- Framework/Core/src/DataProcessingHelpers.cxx | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Framework/Core/src/DataProcessingHelpers.cxx b/Framework/Core/src/DataProcessingHelpers.cxx index 2f7a1f65f3bd3..87e7c9bf8962f 100644 --- a/Framework/Core/src/DataProcessingHelpers.cxx +++ b/Framework/Core/src/DataProcessingHelpers.cxx @@ -343,9 +343,7 @@ auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy, const bool copyByDefault, bool consume) -> std::vector { // we collect all messages per forward in a map and send them together - std::vector forwardedParts; - forwardedParts.resize(proxy.getNumForwards()); - std::vector forwardingChoices{}; + std::vector forwardedParts(proxy.getNumForwardChannels()); for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) { auto span = std::span(currentSetOfInputs[ii].messages);