Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Framework/Core/include/Framework/DataProcessingHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "Framework/TimesliceIndex.h"
#include <fairmq/FwdDecls.h>
#include <vector>
#include <span>

namespace o2::framework
{
Expand Down Expand Up @@ -53,9 +54,11 @@ struct DataProcessingHelpers {
/// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
/// Helper to route messages for forwarding
static std::vector<fair::mq::Parts> routeForwardedMessages(FairMQDeviceProxy& proxy,
std::vector<MessageSet>& currentSetOfInputs,
const bool copyByDefault, bool consume);
static std::vector<fair::mq::Parts> routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector<MessageSet>& currentSetOfInputs,
bool copy, bool consume);
/// Helper to route messages for forwarding
static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& currentSetOfInputs, std::vector<fair::mq::Parts>& forwardedParts,
bool copy, bool consume);
};
} // namespace o2::framework
#endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/ServiceSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct DeviceSpec;
struct ServiceRegistry;
struct ServiceRegistryRef;
struct DeviceState;
struct ProcessingContext;
class ProcessingContext;
class EndOfStreamContext;
struct ConfigContext;
struct WorkflowSpecNode;
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/StringHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ constexpr auto get_str(const char (&str)[N])
}

template <int N>
constexpr auto get_size(const char (&str)[N])
constexpr auto get_size(const char (&)[N])
{
return N;
}
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copy, consume);
auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copy, consume);

for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
if (forwardedParts[fi].Size() == 0) {
Expand Down
188 changes: 107 additions & 81 deletions Framework/Core/src/DataProcessingHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -228,102 +228,128 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
}
}

auto DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy,
std::vector<MessageSet>& currentSetOfInputs,
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
void DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& messages, std::vector<fair::mq::Parts>& forwardedParts,
const bool copyByDefault, bool consume)
{
// we collect all messages per forward in a map and send them together
std::vector<fair::mq::Parts> forwardedParts;
forwardedParts.resize(proxy.getNumForwards());
std::vector<ChannelIndex> forwardingChoices{};
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
std::vector<ChannelIndex> forwardingChoices{};
size_t pi = 0;
while (pi < messages.size()) {
auto& header = messages[pi];

for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
auto& messageSet = currentSetOfInputs[ii];
// If is now possible that the record is not complete when
// we forward it, because of a custom completion policy.
// this means that we need to skip the empty entries in the
// record for being forwarded.
if (header->GetData() == nullptr) {
pi += 2;
continue;
}
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
if (dih) {
pi += 2;
continue;
}
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
if (sih) {
pi += 2;
continue;
}

for (size_t pi = 0; pi < messageSet.size(); ++pi) {
auto& header = messageSet.header(pi);
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());

// If is now possible that the record is not complete when
// we forward it, because of a custom completion policy.
// this means that we need to skip the empty entries in the
// record for being forwarded.
if (header->GetData() == nullptr) {
continue;
}
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
if (dih) {
continue;
}
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
if (sih) {
continue;
}
if (dph == nullptr || dh == nullptr) {
// Complain only if this is not an out-of-band message
LOGP(error, "Data is missing {}{}{}",
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
pi += 2;
continue;
}

auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
// At least one payload.
auto& payload = messages[pi + 1];
// Calculate the number of messages which should be handled together
// all in one go.
size_t numberOfMessages = 0;
if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
// Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together.
numberOfMessages = dh->splitPayloadParts + 1; // one is for the header
} else {
// Sequence of splitPayloadParts (header, payload) pairs belonging together.
// In case splitPayloadParts = 0, we consider this as a single message pair
numberOfMessages = (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
}

if (dph == nullptr || dh == nullptr) {
// Complain only if this is not an out-of-band message
LOGP(error, "Data is missing {}{}{}",
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
continue;
}
if (payload.get() == nullptr && consume == true) {
// If the payload is not there, it means we already
// processed it with ConsumeExisiting. Therefore we
// need to do something only if this is the last consume.
header.reset(nullptr);
pi += numberOfMessages;
continue;
}

auto& payload = messageSet.payload(pi);
// We need to find the forward route only for the first
// part of a split payload. All the others will use the same.
// Therefore, we reset and recompute the forwarding choice:
//
// - If this is the first payload of a [header0][payload0][header0][payload1]... sequence,
// which is actually always created and handled together. Notice that in this
// case we have splitPayloadParts == splitPayloadIndex
// - If this is the first payload of a [header0][payload0][header1][payload1]... sequence
// belonging to the same multipart message (and therefore we are guaranteed that they
// need to be routed together).
// - If the message is not a multipart (splitPayloadParts 0) or has only one part
// - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
// we will already use the same choice in the for loop below.
//

if (payload.get() == nullptr && consume == true) {
// If the payload is not there, it means we already
// processed it with ConsumeExisiting. Therefore we
// need to do something only if this is the last consume.
header.reset(nullptr);
continue;
}
forwardingChoices.clear();
proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);

// We need to find the forward route only for the first
// part of a split payload. All the others will use the same.
// Therefore, we reset and recompute the forwarding choice:
//
// - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
// which is actually always created and handled together
// - If the message is not a multipart (splitPayloadParts 0) or has only one part
// - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
// we will already use the same choice in the for loop below.
if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) {
forwardingChoices.clear();
proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
}
if (forwardingChoices.empty()) {
// Nothing to forward go to the next messageset
pi += numberOfMessages;
continue;
}

if (forwardingChoices.empty()) {
// Nothing to forward go to the next messageset
continue;
}
// In case of more than one forward route, we need to copy the message.
// This will eventually use the same memory if running with the same backend.
if (copyByDefault || forwardingChoices.size() > 1) {
for (auto& choice : forwardingChoices) {
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);

// In case of more than one forward route, we need to copy the message.
// This will eventually use the same memory if running with the same backend.
if (copyByDefault || forwardingChoices.size() > 1) {
for (auto& choice : forwardingChoices) {
auto&& newHeader = header->GetTransport()->CreateMessage();
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
newHeader->Copy(*header);
forwardedParts[choice.value].AddPart(std::move(newHeader));

for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
auto&& newPayload = header->GetTransport()->CreateMessage();
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
forwardedParts[choice.value].AddPart(std::move(newPayload));
}
}
} else {
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
auto&& newMsg = header->GetTransport()->CreateMessage();
newMsg->Copy(*messages[ppi]);
forwardedParts[choice.value].AddPart(std::move(newMsg));
}
}
} else {
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messages[ppi]));
}
}
pi += numberOfMessages;
}
}

auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy,
std::vector<MessageSet>& currentSetOfInputs,
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
{
// we collect all messages per forward in a map and send them together
std::vector<fair::mq::Parts> forwardedParts;
forwardedParts.resize(proxy.getNumForwards());
std::vector<ChannelIndex> forwardingChoices{};

for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
routeForwardedMessages(proxy, span, forwardedParts, copyByDefault, consume);
}
return forwardedParts;
};
Expand Down
Loading