Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions Framework/Core/include/Framework/AnalysisTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ struct AnalysisDataProcessorBuilder {
analysis_task_parsers::bindInternalIndicesPartition(element, &groupingTable);
return true;
},
task);
task);

if constexpr (sizeof...(Associated) == 0) {
// single argument to process
Expand All @@ -330,7 +330,7 @@ struct AnalysisDataProcessorBuilder {
analysis_task_parsers::setGroupedCombination(element, groupingTable);
return true;
},
task);
task);
if constexpr (soa::is_iterator<G>) {
for (auto& element : groupingTable) {
std::invoke(processingFunction, task, *element);
Expand Down Expand Up @@ -365,7 +365,7 @@ struct AnalysisDataProcessorBuilder {
analysis_task_parsers::bindExternalIndicesPartition(t, &x);
return true;
},
task);
task);
};
groupingTable.bindExternalIndices(&std::get<std::decay_t<Associated>>(associatedTables)...);

Expand All @@ -381,7 +381,7 @@ struct AnalysisDataProcessorBuilder {
analysis_task_parsers::setGroupedCombination(t, groupingTable, associatedTables);
return true;
},
task);
task);
overwriteInternalIndices(associatedTables, associatedTables);
if constexpr (soa::is_iterator<std::decay_t<G>>) {
auto slicer = GroupSlicer(groupingTable, associatedTables, slices);
Expand All @@ -399,7 +399,7 @@ struct AnalysisDataProcessorBuilder {
analysis_task_parsers::bindExternalIndicesPartition(x, &groupingTable);
return true;
},
task);
task);

invokeProcessWithArgs(task, processingFunction, slice.groupingElement(), associatedSlices);
}
Expand All @@ -409,7 +409,7 @@ struct AnalysisDataProcessorBuilder {
analysis_task_parsers::bindExternalIndicesPartition(x, &groupingTable);
return true;
},
task);
task);

invokeProcessWithArgs(task, processingFunction, groupingTable, associatedTables);
}
Expand Down Expand Up @@ -547,7 +547,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
homogeneous_apply_refs_sized<numElements>([&inputs](auto& element) {
return analysis_task_parsers::requestInputs(inputs, element);
},
*task.get());
*task.get());

// no static way to check if the task defines any processing, we can only make sure it subscribes to at least something
if (inputs.empty() == true) {
Expand Down Expand Up @@ -576,7 +576,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
analysis_task_parsers::postRunService(eosContext, element);
analysis_task_parsers::postRunOutput(eosContext, element);
return true; },
*task.get());
*task.get());
eosContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
};

Expand All @@ -595,7 +595,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
homogeneous_apply_refs_sized<numElements>([&expressionInfos](auto& element) {
return analysis_task_parsers::createExpressionTrees(expressionInfos, element);
},
*task.get());
*task.get());

/// parse process functions to enable requested grouping caches - note that at this state process configurables have their final values
if constexpr (requires { &T::process; }) {
Expand All @@ -616,13 +616,13 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
// reset partitions once per dataframe
homogeneous_apply_refs_sized<numElements>([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get());
// reset selections for the next dataframe
std::ranges::for_each(expressionInfos, [](auto& info){ info.resetSelection = true; });
std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; });
// reset pre-slice for the next dataframe
auto slices = pc.services().get<ArrowTableSlicingCache>();
homogeneous_apply_refs_sized<numElements>([&pc, &slices](auto& element) {
return analysis_task_parsers::updateSliceInfo(element, slices);
},
*(task.get()));
*(task.get()));
// initialize local caches
homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get()));
// prepare outputs
Expand Down