From 3243b9b6e50409e7d2dda27d2fde3ec1409904f1 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 24 Oct 2025 17:56:48 +1000 Subject: [PATCH 01/15] introduce concept of DispatchStrategy --- src/main/java/org/dataloader/DataLoader.java | 12 +- .../org/dataloader/DataLoaderOptions.java | 51 +++-- .../org/dataloader/DataLoaderRegistry.java | 53 +++-- .../java/org/dataloader/DispatchStrategy.java | 16 ++ .../dataloader/NotBusyDispatchStrategy.java | 185 ++++++++++++++++++ 5 files changed, 287 insertions(+), 30 deletions(-) create mode 100644 src/main/java/org/dataloader/DispatchStrategy.java create mode 100644 src/main/java/org/dataloader/NotBusyDispatchStrategy.java diff --git a/src/main/java/org/dataloader/DataLoader.java b/src/main/java/org/dataloader/DataLoader.java index d80823f9..31e6a2c6 100644 --- a/src/main/java/org/dataloader/DataLoader.java +++ b/src/main/java/org/dataloader/DataLoader.java @@ -229,6 +229,12 @@ public Optional> getIfCompleted(K key) { * @return the future of the value */ public CompletableFuture load(@NonNull K key, @Nullable Object keyContext) { + CompletableFuture result = loadImpl(key, keyContext); + options.getDispatchStrategy().loadCalled(this); + return result; + } + + private CompletableFuture loadImpl(@NonNull K key, @Nullable Object keyContext) { return helper.load(nonNull(key), keyContext); } @@ -275,8 +281,9 @@ public CompletableFuture> loadMany(List keys, List keyContext if (i < keyContexts.size()) { keyContext = keyContexts.get(i); } - collect.add(load(key, keyContext)); + collect.add(loadImpl(key, keyContext)); } + options.getDispatchStrategy().loadCalled(this); return CompletableFutureKit.allOf(collect); } @@ -302,8 +309,9 @@ public CompletableFuture> loadMany(Map keysAndContexts) { for (Map.Entry entry : keysAndContexts.entrySet()) { K key = entry.getKey(); Object keyContext = entry.getValue(); - collect.put(key, load(key, keyContext)); + collect.put(key, loadImpl(key, keyContext)); } + options.getDispatchStrategy().loadCalled(this); return CompletableFutureKit.allOf(collect); } diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index f7c006fa..e8628ab6 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -55,6 +55,7 @@ public class DataLoaderOptions { private final ValueCacheOptions valueCacheOptions; private final BatchLoaderScheduler batchLoaderScheduler; private final DataLoaderInstrumentation instrumentation; + private final DispatchStrategy dispatchStrategy; /** * Creates a new data loader options with default settings. @@ -72,6 +73,7 @@ public DataLoaderOptions() { valueCacheOptions = DEFAULT_VALUE_CACHE_OPTIONS; batchLoaderScheduler = null; instrumentation = DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION; + dispatchStrategy = DispatchStrategy.NO_OP; } private DataLoaderOptions(Builder builder) { @@ -87,6 +89,7 @@ private DataLoaderOptions(Builder builder) { this.valueCacheOptions = builder.valueCacheOptions; this.batchLoaderScheduler = builder.batchLoaderScheduler; this.instrumentation = builder.instrumentation; + this.dispatchStrategy = builder.dispatchStrategy; } /** @@ -116,6 +119,7 @@ public static DataLoaderOptions.Builder newOptions(DataLoaderOptions otherOption * Will transform the current options in to a builder ands allow you to build a new set of options * * @param builderConsumer the consumer of a builder that has this objects starting values + * * @return a new {@link DataLoaderOptions} object */ public DataLoaderOptions transform(Consumer builderConsumer) { @@ -126,19 +130,21 @@ public DataLoaderOptions transform(Consumer builderConsumer) { @Override public boolean equals(Object o) { - if (o == null || getClass() != o.getClass()) return false; + if (o == null || getClass() != o.getClass()) { + return false; + } DataLoaderOptions that = (DataLoaderOptions) o; return batchingEnabled == that.batchingEnabled - && cachingEnabled == that.cachingEnabled - && cachingExceptionsEnabled == that.cachingExceptionsEnabled - && maxBatchSize == that.maxBatchSize - && Objects.equals(cacheKeyFunction, that.cacheKeyFunction) && - Objects.equals(cacheMap, that.cacheMap) && - Objects.equals(valueCache, that.valueCache) && - Objects.equals(statisticsCollector, that.statisticsCollector) && - Objects.equals(environmentProvider, that.environmentProvider) && - Objects.equals(valueCacheOptions, that.valueCacheOptions) && - Objects.equals(batchLoaderScheduler, that.batchLoaderScheduler); + && cachingEnabled == that.cachingEnabled + && cachingExceptionsEnabled == that.cachingExceptionsEnabled + && maxBatchSize == that.maxBatchSize + && Objects.equals(cacheKeyFunction, that.cacheKeyFunction) && + Objects.equals(cacheMap, that.cacheMap) && + Objects.equals(valueCache, that.valueCache) && + Objects.equals(statisticsCollector, that.statisticsCollector) && + Objects.equals(environmentProvider, that.environmentProvider) && + Objects.equals(valueCacheOptions, that.valueCacheOptions) && + Objects.equals(batchLoaderScheduler, that.batchLoaderScheduler); } @@ -254,7 +260,12 @@ public DataLoaderInstrumentation getInstrumentation() { return instrumentation; } + public DispatchStrategy getDispatchStrategy() { + return dispatchStrategy; + } + public static class Builder { + private DispatchStrategy dispatchStrategy = DispatchStrategy.NO_OP; private boolean batchingEnabled; private boolean cachingEnabled; private boolean cachingExceptionsEnabled; @@ -285,12 +296,14 @@ public Builder() { this.valueCacheOptions = other.valueCacheOptions; this.batchLoaderScheduler = other.batchLoaderScheduler; this.instrumentation = other.instrumentation; + this.dispatchStrategy = other.dispatchStrategy; } /** * Sets the option that determines whether batch loading is enabled. * * @param batchingEnabled {@code true} to enable batch loading, {@code false} otherwise + * * @return this builder for fluent coding */ public Builder setBatchingEnabled(boolean batchingEnabled) { @@ -302,6 +315,7 @@ public Builder setBatchingEnabled(boolean batchingEnabled) { * Sets the option that determines whether caching is enabled. * * @param cachingEnabled {@code true} to enable caching, {@code false} otherwise + * * @return this builder for fluent coding */ public Builder setCachingEnabled(boolean cachingEnabled) { @@ -313,6 +327,7 @@ public Builder setCachingEnabled(boolean cachingEnabled) { * Sets the option that determines whether exceptional values are cache enabled. * * @param cachingExceptionsEnabled {@code true} to enable caching exceptional values, {@code false} otherwise + * * @return this builder for fluent coding */ public Builder setCachingExceptionsEnabled(boolean cachingExceptionsEnabled) { @@ -324,6 +339,7 @@ public Builder setCachingExceptionsEnabled(boolean cachingExceptionsEnabled) { * Sets the function to use for creating the cache key, if caching is enabled. * * @param cacheKeyFunction the cache key function to use + * * @return this builder for fluent coding */ public Builder setCacheKeyFunction(CacheKey cacheKeyFunction) { @@ -335,6 +351,7 @@ public Builder setCacheKeyFunction(CacheKey cacheKeyFunction) { * Sets the cache map implementation to use for caching, if caching is enabled. * * @param cacheMap the cache map instance + * * @return this builder for fluent coding */ public Builder setCacheMap(CacheMap cacheMap) { @@ -346,6 +363,7 @@ public Builder setCacheMap(CacheMap cacheMap) { * Sets the value cache implementation to use for caching values, if caching is enabled. * * @param valueCache the value cache instance + * * @return this builder for fluent coding */ public Builder setValueCache(ValueCache valueCache) { @@ -358,6 +376,7 @@ public Builder setValueCache(ValueCache valueCache) { * before they are split into multiple class * * @param maxBatchSize the maximum batch size + * * @return this builder for fluent coding */ public Builder setMaxBatchSize(int maxBatchSize) { @@ -371,6 +390,7 @@ public Builder setMaxBatchSize(int maxBatchSize) { * a common value * * @param statisticsCollector the statistics collector to use + * * @return this builder for fluent coding */ public Builder setStatisticsCollector(Supplier statisticsCollector) { @@ -382,6 +402,7 @@ public Builder setStatisticsCollector(Supplier statisticsCo * Sets the batch loader environment provider that will be used to give context to batch load functions * * @param environmentProvider the batch loader context provider + * * @return this builder for fluent coding */ public Builder setBatchLoaderContextProvider(BatchLoaderContextProvider environmentProvider) { @@ -393,6 +414,7 @@ public Builder setBatchLoaderContextProvider(BatchLoaderContextProvider environm * Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used * * @param valueCacheOptions the value cache options + * * @return this builder for fluent coding */ public Builder setValueCacheOptions(ValueCacheOptions valueCacheOptions) { @@ -405,6 +427,7 @@ public Builder setValueCacheOptions(ValueCacheOptions valueCacheOptions) { * to some future time. * * @param batchLoaderScheduler the scheduler + * * @return this builder for fluent coding */ public Builder setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) { @@ -416,6 +439,7 @@ public Builder setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler * Sets in a new {@link DataLoaderInstrumentation} * * @param instrumentation the new {@link DataLoaderInstrumentation} + * * @return this builder for fluent coding */ public Builder setInstrumentation(DataLoaderInstrumentation instrumentation) { @@ -423,6 +447,11 @@ public Builder setInstrumentation(DataLoaderInstrumentation instrumentation) { return this; } + public Builder setDispatchStrategy(DispatchStrategy dispatchStrategy) { + this.dispatchStrategy = dispatchStrategy; + return this; + } + public DataLoaderOptions build() { return new DataLoaderOptions(this); } diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 6bc79f64..028966ee 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -46,24 +46,27 @@ public class DataLoaderRegistry { protected final Map> dataLoaders; protected final @Nullable DataLoaderInstrumentation instrumentation; + private final DispatchStrategy dispatchStrategy; + public DataLoaderRegistry() { - this(new ConcurrentHashMap<>(), null); + this(new ConcurrentHashMap<>(), null, DispatchStrategy.NO_OP); } private DataLoaderRegistry(Builder builder) { - this(builder.dataLoaders, builder.instrumentation); + this(builder.dataLoaders, builder.instrumentation, builder.dispatchStrategy); } - protected DataLoaderRegistry(Map> dataLoaders, @Nullable DataLoaderInstrumentation instrumentation) { + protected DataLoaderRegistry(Map> dataLoaders, @Nullable DataLoaderInstrumentation instrumentation, DispatchStrategy dispatchStrategy) { this.dataLoaders = instrumentDLs(dataLoaders, instrumentation); this.instrumentation = instrumentation; + this.dispatchStrategy = dispatchStrategy; } private Map> instrumentDLs(Map> incomingDataLoaders, @Nullable DataLoaderInstrumentation registryInstrumentation) { Map> dataLoaders = new ConcurrentHashMap<>(incomingDataLoaders); if (registryInstrumentation != null) { - dataLoaders.replaceAll((k, existingDL) -> nameAndInstrumentDL(k, registryInstrumentation, existingDL)); + dataLoaders.replaceAll((k, existingDL) -> nameAndInstrumentDL(k, registryInstrumentation, existingDL, dispatchStrategy)); } return dataLoaders; } @@ -74,9 +77,10 @@ protected DataLoaderRegistry(Map> dataLoaders, @Nullabl * @param key the key used to register the data loader * @param registryInstrumentation the common registry {@link DataLoaderInstrumentation} * @param existingDL the existing data loader + * * @return a new {@link DataLoader} or the same one if there is nothing to change */ - private static DataLoader nameAndInstrumentDL(String key, @Nullable DataLoaderInstrumentation registryInstrumentation, DataLoader existingDL) { + private static DataLoader nameAndInstrumentDL(String key, @Nullable DataLoaderInstrumentation registryInstrumentation, DataLoader existingDL, DispatchStrategy dispatchStrategy) { existingDL = checkAndSetName(key, existingDL); if (registryInstrumentation == null) { @@ -92,18 +96,18 @@ protected DataLoaderRegistry(Map> dataLoaders, @Nullabl } if (existingInstrumentation == DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION) { // replace it with the registry one - return mkInstrumentedDataLoader(existingDL, options, registryInstrumentation); + return mkInstrumentedDataLoader(existingDL, options, registryInstrumentation, dispatchStrategy); } if (existingInstrumentation instanceof ChainedDataLoaderInstrumentation) { // avoids calling a chained inside a chained DataLoaderInstrumentation newInstrumentation = ((ChainedDataLoaderInstrumentation) existingInstrumentation).prepend(registryInstrumentation); - return mkInstrumentedDataLoader(existingDL, options, newInstrumentation); + return mkInstrumentedDataLoader(existingDL, options, newInstrumentation, dispatchStrategy); } else { DataLoaderInstrumentation newInstrumentation = new ChainedDataLoaderInstrumentation().add(registryInstrumentation).add(existingInstrumentation); - return mkInstrumentedDataLoader(existingDL, options, newInstrumentation); + return mkInstrumentedDataLoader(existingDL, options, newInstrumentation, dispatchStrategy); } } else { - return mkInstrumentedDataLoader(existingDL, options, registryInstrumentation); + return mkInstrumentedDataLoader(existingDL, options, registryInstrumentation, dispatchStrategy); } } @@ -116,12 +120,12 @@ protected DataLoaderRegistry(Map> dataLoaders, @Nullabl return dataLoader; } - private static DataLoader mkInstrumentedDataLoader(DataLoader existingDL, DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation) { - return existingDL.transform(builder -> builder.options(setInInstrumentation(options, newInstrumentation))); + private static DataLoader mkInstrumentedDataLoader(DataLoader existingDL, DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) { + return existingDL.transform(builder -> builder.options(setInInstrumentation(options, newInstrumentation, dispatchStrategy))); } - private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation) { - return options.transform(optionsBuilder -> optionsBuilder.setInstrumentation(newInstrumentation)); + private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) { + return options.transform(optionsBuilder -> optionsBuilder.setInstrumentation(newInstrumentation).setDispatchStrategy(dispatchStrategy)); } /** @@ -140,11 +144,12 @@ private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, * object that was registered. * * @param dataLoader the named data loader to register + * * @return this registry */ public DataLoaderRegistry register(DataLoader dataLoader) { String name = Assertions.nonNull(dataLoader.getName(), () -> "The DataLoader must have a non null name"); - dataLoaders.put(name, nameAndInstrumentDL(name, instrumentation, dataLoader)); + dataLoaders.put(name, nameAndInstrumentDL(name, instrumentation, dataLoader, dispatchStrategy)); return this; } @@ -157,10 +162,11 @@ public DataLoaderRegistry register(DataLoader dataLoader) { * * @param key the key to put the data loader under * @param dataLoader the data loader to register + * * @return this registry */ public DataLoaderRegistry register(String key, DataLoader dataLoader) { - dataLoaders.put(key, nameAndInstrumentDL(key, instrumentation, dataLoader)); + dataLoaders.put(key, nameAndInstrumentDL(key, instrumentation, dataLoader, dispatchStrategy)); return this; } @@ -173,10 +179,11 @@ public DataLoaderRegistry register(String key, DataLoader dataLoader) { * * @param key the key to put the data loader under * @param dataLoader the data loader to register + * * @return the data loader instance that was registered */ public DataLoader registerAndGet(String key, DataLoader dataLoader) { - dataLoaders.put(key, nameAndInstrumentDL(key, instrumentation, dataLoader)); + dataLoaders.put(key, nameAndInstrumentDL(key, instrumentation, dataLoader, dispatchStrategy)); return Objects.requireNonNull(getDataLoader(key)); } @@ -195,6 +202,7 @@ public DataLoader registerAndGet(String key, DataLoader dataL * @param mappingFunction the function to compute a data loader * @param the type of keys * @param the type of values + * * @return a data loader */ @SuppressWarnings("unchecked") @@ -202,7 +210,7 @@ public DataLoader computeIfAbsent(final String key, final Function> mappingFunction) { return (DataLoader) dataLoaders.computeIfAbsent(key, (k) -> { DataLoader dl = mappingFunction.apply(k); - return nameAndInstrumentDL(key, instrumentation, dl); + return nameAndInstrumentDL(key, instrumentation, dl, dispatchStrategy); }); } @@ -211,6 +219,7 @@ public DataLoader computeIfAbsent(final String key, * and return a new combined registry * * @param registry the registry to combine into this registry + * * @return a new combined registry */ public DataLoaderRegistry combine(DataLoaderRegistry registry) { @@ -239,6 +248,7 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { * This will unregister a new dataloader * * @param key the key of the data loader to unregister + * * @return this registry */ public DataLoaderRegistry unregister(String key) { @@ -252,6 +262,7 @@ public DataLoaderRegistry unregister(String key) { * @param key the key of the data loader * @param the type of keys * @param the type of values + * * @return a data loader or null if it's not present */ @SuppressWarnings("unchecked") @@ -322,6 +333,7 @@ public static Builder newRegistry() { public static class Builder { private final Map> dataLoaders = new HashMap<>(); + public DispatchStrategy dispatchStrategy = DispatchStrategy.NO_OP; private @Nullable DataLoaderInstrumentation instrumentation; /** @@ -329,6 +341,7 @@ public static class Builder { * * @param key the key to put the data loader under * @param dataLoader the data loader to register + * * @return this builder for a fluent pattern */ public Builder register(String key, DataLoader dataLoader) { @@ -341,6 +354,7 @@ public Builder register(String key, DataLoader dataLoader) { * from a previous {@link DataLoaderRegistry} * * @param otherRegistry the previous {@link DataLoaderRegistry} + * * @return this builder for a fluent pattern */ public Builder registerAll(DataLoaderRegistry otherRegistry) { @@ -353,6 +367,11 @@ public Builder instrumentation(DataLoaderInstrumentation instrumentation) { return this; } + public Builder dispatchStrategy(DispatchStrategy dispatchStrategy) { + this.dispatchStrategy = dispatchStrategy; + return this; + } + /** * @return the newly built {@link DataLoaderRegistry} */ diff --git a/src/main/java/org/dataloader/DispatchStrategy.java b/src/main/java/org/dataloader/DispatchStrategy.java new file mode 100644 index 00000000..8efed668 --- /dev/null +++ b/src/main/java/org/dataloader/DispatchStrategy.java @@ -0,0 +1,16 @@ +package org.dataloader; + +import org.dataloader.annotations.PublicApi; +import org.jspecify.annotations.NullMarked; + +@NullMarked +@PublicApi +public interface DispatchStrategy { + + DispatchStrategy NO_OP = new DispatchStrategy() { + }; + + default void loadCalled(DataLoader dataLoader) { + + } +} diff --git a/src/main/java/org/dataloader/NotBusyDispatchStrategy.java b/src/main/java/org/dataloader/NotBusyDispatchStrategy.java new file mode 100644 index 00000000..f43ec908 --- /dev/null +++ b/src/main/java/org/dataloader/NotBusyDispatchStrategy.java @@ -0,0 +1,185 @@ +package org.dataloader; + +import org.dataloader.annotations.PublicApi; +import org.jspecify.annotations.NullMarked; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A dispatch strategy that dispatches immediately if it is not busy and not currently dispatching. + *

+ * Busy is determined by a busy counter, which is > 0 if busy, or zero when not busy. + * The two methods to increase and decrease the busy counter are {@link #incrementBusyCount()} and {@link #decrementBusyCount()} + *

+ * This Strategy must be configured as part of {@link DispatchStrategy} + */ +@PublicApi +@NullMarked +public class NotBusyDispatchStrategy implements DispatchStrategy { + + + // 30 bits for busy counting + // 1 bit for dataLoaderToDispatch + // 1 bit for currentlyDispatching + + // Bit positions (from right to left) + static final int currentlyDispatchingShift = 0; + static final int dataLoaderToDispatchShift = 1; + static final int busyCountShift = 2; + + // mask + static final int booleanMask = 1; + static final int busyCountMask = (1 << 30) - 1; + + public static int getBusyCount(int state) { + return (state >> busyCountShift) & busyCountMask; + } + + public static int setBusyCount(int state, int busyCount) { + return (state & ~(busyCountMask << busyCountShift)) | + (busyCount << busyCountShift); + } + + public static int setDataLoaderToDispatch(int state, boolean dataLoaderToDispatch) { + return (state & ~(booleanMask << dataLoaderToDispatchShift)) | + ((dataLoaderToDispatch ? 1 : 0) << dataLoaderToDispatchShift); + } + + public static int setCurrentlyDispatching(int state, boolean currentlyDispatching) { + return (state & ~(booleanMask << currentlyDispatchingShift)) | + ((currentlyDispatching ? 1 : 0) << currentlyDispatchingShift); + } + + + public static boolean getDataLoaderToDispatch(int state) { + return ((state >> dataLoaderToDispatchShift) & booleanMask) != 0; + } + + public static boolean getCurrentlyDispatching(int state) { + return ((state >> currentlyDispatchingShift) & booleanMask) != 0; + } + + + private final AtomicInteger state = new AtomicInteger(); + private final DataLoaderRegistry dataLoaderRegistry; + + public NotBusyDispatchStrategy(DataLoaderRegistry dataLoaderRegistry) { + this.dataLoaderRegistry = dataLoaderRegistry; + } + + + private int incrementBusyCountImpl() { + while (true) { + int oldState = getState(); + int busyCount = getBusyCount(oldState); + int newState = setBusyCount(oldState, busyCount + 1); + if (tryUpdateState(oldState, newState)) { + return newState; + } + } + } + + private int decrementBusyCountImpl() { + while (true) { + int oldState = getState(); + int busyCount = getBusyCount(oldState); + int newState = setBusyCount(oldState, busyCount - 1); + if (tryUpdateState(oldState, newState)) { + return newState; + } + } + } + + private int getState() { + return state.get(); + } + + + private boolean tryUpdateState(int oldState, int newState) { + return state.compareAndSet(oldState, newState); + } + + + public void decrementBusyCount() { + int newState = decrementBusyCountImpl(); + if (getBusyCount(newState) == 0 && getDataLoaderToDispatch(newState) && !getCurrentlyDispatching(newState)) { + dispatchImpl(); + } + } + + public void incrementBusyCount() { + incrementBusyCountImpl(); + } + + + private void newDataLoaderInvocationMaybeDispatch() { + int currentState; + while (true) { + int oldState = getState(); + if (getDataLoaderToDispatch(oldState)) { + return; + } + int newState = setDataLoaderToDispatch(oldState, true); + if (tryUpdateState(oldState, newState)) { + currentState = newState; + break; + } + } + + if (getBusyCount(currentState) == 0 && !getCurrentlyDispatching(currentState)) { + dispatchImpl(); + } + } + + + private void dispatchImpl() { + while (true) { + int oldState = getState(); + if (!getDataLoaderToDispatch(oldState)) { + int newState = setCurrentlyDispatching(oldState, false); + if (tryUpdateState(oldState, newState)) { + return; + } + } + int newState = setCurrentlyDispatching(oldState, true); + newState = setDataLoaderToDispatch(newState, false); + if (tryUpdateState(oldState, newState)) { + break; + } + } + + List> dataLoaders = dataLoaderRegistry.getDataLoaders(); + List>> allDispatchedCFs = new ArrayList<>(); + for (DataLoader dataLoader : dataLoaders) { + CompletableFuture> dispatch = dataLoader.dispatch(); + allDispatchedCFs.add(dispatch); + } + CompletableFuture.allOf(allDispatchedCFs.toArray(new CompletableFuture[0])) + .whenComplete((unused, throwable) -> { + dispatchImpl(); + }); + + } + + @Override + public void loadCalled(DataLoader dataLoader) { + newDataLoaderInvocationMaybeDispatch(); + } + + private static String printState(int state) { + return "busyCount= " + getBusyCount(state) + + ",dataLoaderToDispatch= " + getDataLoaderToDispatch(state) + + ",currentlyDispatching= " + getCurrentlyDispatching(state); + } + + @Override + public String toString() { + return "NotBusyDispatchStrategy{" + + "state=" + printState(getState()) + + ", dataLoaderRegistry=" + dataLoaderRegistry + + '}'; + } +} From e81ea3cde6f2a8c2087162751a53dc6c86a09ad8 Mon Sep 17 00:00:00 2001 From: zsmoore Date: Tue, 6 Jan 2026 15:34:11 -0500 Subject: [PATCH 02/15] Update dispatch strategy and impl bfs strategy --- src/main/java/org/dataloader/DataLoader.java | 6 +- .../java/org/dataloader/DataLoaderHelper.java | 8 + .../org/dataloader/DataLoaderRegistry.java | 20 +- .../java/org/dataloader/DispatchStrategy.java | 12 +- .../dataloader/NotBusyDispatchStrategy.java | 185 -------------- .../ScheduledDataLoaderRegistry.java | 3 +- .../BreadthFirstChainedDispatchStrategy.java | 131 ++++++++++ ...eadthFirstChainedDispatchStrategyTest.java | 229 ++++++++++++++++++ 8 files changed, 395 insertions(+), 199 deletions(-) delete mode 100644 src/main/java/org/dataloader/NotBusyDispatchStrategy.java create mode 100644 src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java create mode 100644 src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyTest.java diff --git a/src/main/java/org/dataloader/DataLoader.java b/src/main/java/org/dataloader/DataLoader.java index 31e6a2c6..66cf7984 100644 --- a/src/main/java/org/dataloader/DataLoader.java +++ b/src/main/java/org/dataloader/DataLoader.java @@ -229,9 +229,7 @@ public Optional> getIfCompleted(K key) { * @return the future of the value */ public CompletableFuture load(@NonNull K key, @Nullable Object keyContext) { - CompletableFuture result = loadImpl(key, keyContext); - options.getDispatchStrategy().loadCalled(this); - return result; + return loadImpl(key, keyContext); } private CompletableFuture loadImpl(@NonNull K key, @Nullable Object keyContext) { @@ -283,7 +281,6 @@ public CompletableFuture> loadMany(List keys, List keyContext } collect.add(loadImpl(key, keyContext)); } - options.getDispatchStrategy().loadCalled(this); return CompletableFutureKit.allOf(collect); } @@ -311,7 +308,6 @@ public CompletableFuture> loadMany(Map keysAndContexts) { Object keyContext = entry.getValue(); collect.put(key, loadImpl(key, keyContext)); } - options.getDispatchStrategy().loadCalled(this); return CompletableFutureKit.allOf(collect); } diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 249c1f25..c2913a9f 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -158,7 +158,9 @@ CompletableFuture load(K key, Object loadContext) { // We already have a promise for this key, no need to check value cache or queue up load stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); ctx.onDispatched(); + loaderOptions.getDispatchStrategy().loadCalled(); cachedFuture.whenComplete(ctx::onCompleted); + cachedFuture.whenComplete((result, error) -> loaderOptions.getDispatchStrategy().loadCompleted()); return cachedFuture; } } catch (Exception ignored) { @@ -173,7 +175,9 @@ CompletableFuture load(K key, Object loadContext) { // another thread was faster and created a matching CF ... hence this is really a cachehit and we are done stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); ctx.onDispatched(); + loaderOptions.getDispatchStrategy().loadCalled(); cachedFuture.whenComplete(ctx::onCompleted); + cachedFuture.whenComplete((result, error) -> loaderOptions.getDispatchStrategy().loadCompleted()); return cachedFuture; } } @@ -190,14 +194,18 @@ CompletableFuture load(K key, Object loadContext) { // meaning this is a cache hit and we are done stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); ctx.onDispatched(); + loaderOptions.getDispatchStrategy().loadCalled(); cachedFuture.whenComplete(ctx::onCompleted); + cachedFuture.whenComplete((result, error) -> loaderOptions.getDispatchStrategy().loadCompleted()); return cachedFuture; } } } ctx.onDispatched(); + loaderOptions.getDispatchStrategy().loadCalled(); loadCallFuture.whenComplete(ctx::onCompleted); + loadCallFuture.whenComplete((result, error) -> loaderOptions.getDispatchStrategy().loadCompleted()); return loadCallFuture; } diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 028966ee..d41eda5a 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -58,14 +58,15 @@ private DataLoaderRegistry(Builder builder) { } protected DataLoaderRegistry(Map> dataLoaders, @Nullable DataLoaderInstrumentation instrumentation, DispatchStrategy dispatchStrategy) { - this.dataLoaders = instrumentDLs(dataLoaders, instrumentation); + this.dataLoaders = instrumentDLs(dataLoaders, instrumentation, dispatchStrategy); this.instrumentation = instrumentation; this.dispatchStrategy = dispatchStrategy; + dispatchStrategy.onRegistryCreation(this); } - private Map> instrumentDLs(Map> incomingDataLoaders, @Nullable DataLoaderInstrumentation registryInstrumentation) { + private Map> instrumentDLs(Map> incomingDataLoaders, @Nullable DataLoaderInstrumentation registryInstrumentation, DispatchStrategy dispatchStrategy) { Map> dataLoaders = new ConcurrentHashMap<>(incomingDataLoaders); - if (registryInstrumentation != null) { + if (registryInstrumentation != null || dispatchStrategy != DispatchStrategy.NO_OP) { dataLoaders.replaceAll((k, existingDL) -> nameAndInstrumentDL(k, registryInstrumentation, existingDL, dispatchStrategy)); } return dataLoaders; @@ -83,7 +84,7 @@ protected DataLoaderRegistry(Map> dataLoaders, @Nullabl private static DataLoader nameAndInstrumentDL(String key, @Nullable DataLoaderInstrumentation registryInstrumentation, DataLoader existingDL, DispatchStrategy dispatchStrategy) { existingDL = checkAndSetName(key, existingDL); - if (registryInstrumentation == null) { + if (registryInstrumentation == null && dispatchStrategy == DispatchStrategy.NO_OP) { return existingDL; } DataLoaderOptions options = existingDL.getOptions(); @@ -120,12 +121,17 @@ protected DataLoaderRegistry(Map> dataLoaders, @Nullabl return dataLoader; } - private static DataLoader mkInstrumentedDataLoader(DataLoader existingDL, DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) { + private static DataLoader mkInstrumentedDataLoader(DataLoader existingDL, DataLoaderOptions options, @Nullable DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) { return existingDL.transform(builder -> builder.options(setInInstrumentation(options, newInstrumentation, dispatchStrategy))); } - private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) { - return options.transform(optionsBuilder -> optionsBuilder.setInstrumentation(newInstrumentation).setDispatchStrategy(dispatchStrategy)); + private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, @Nullable DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) { + return options.transform(optionsBuilder -> { + optionsBuilder.setDispatchStrategy(dispatchStrategy); + if (newInstrumentation != null) { + optionsBuilder.setInstrumentation(newInstrumentation); + } + }); } /** diff --git a/src/main/java/org/dataloader/DispatchStrategy.java b/src/main/java/org/dataloader/DispatchStrategy.java index 8efed668..8e0865cd 100644 --- a/src/main/java/org/dataloader/DispatchStrategy.java +++ b/src/main/java/org/dataloader/DispatchStrategy.java @@ -3,6 +3,8 @@ import org.dataloader.annotations.PublicApi; import org.jspecify.annotations.NullMarked; +import java.util.concurrent.atomic.AtomicBoolean; + @NullMarked @PublicApi public interface DispatchStrategy { @@ -10,7 +12,15 @@ public interface DispatchStrategy { DispatchStrategy NO_OP = new DispatchStrategy() { }; - default void loadCalled(DataLoader dataLoader) { + default void onRegistryCreation(DataLoaderRegistry registry) { + + } + + default void loadCalled() { + + } + + default void loadCompleted() { } } diff --git a/src/main/java/org/dataloader/NotBusyDispatchStrategy.java b/src/main/java/org/dataloader/NotBusyDispatchStrategy.java deleted file mode 100644 index f43ec908..00000000 --- a/src/main/java/org/dataloader/NotBusyDispatchStrategy.java +++ /dev/null @@ -1,185 +0,0 @@ -package org.dataloader; - -import org.dataloader.annotations.PublicApi; -import org.jspecify.annotations.NullMarked; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * A dispatch strategy that dispatches immediately if it is not busy and not currently dispatching. - *

- * Busy is determined by a busy counter, which is > 0 if busy, or zero when not busy. - * The two methods to increase and decrease the busy counter are {@link #incrementBusyCount()} and {@link #decrementBusyCount()} - *

- * This Strategy must be configured as part of {@link DispatchStrategy} - */ -@PublicApi -@NullMarked -public class NotBusyDispatchStrategy implements DispatchStrategy { - - - // 30 bits for busy counting - // 1 bit for dataLoaderToDispatch - // 1 bit for currentlyDispatching - - // Bit positions (from right to left) - static final int currentlyDispatchingShift = 0; - static final int dataLoaderToDispatchShift = 1; - static final int busyCountShift = 2; - - // mask - static final int booleanMask = 1; - static final int busyCountMask = (1 << 30) - 1; - - public static int getBusyCount(int state) { - return (state >> busyCountShift) & busyCountMask; - } - - public static int setBusyCount(int state, int busyCount) { - return (state & ~(busyCountMask << busyCountShift)) | - (busyCount << busyCountShift); - } - - public static int setDataLoaderToDispatch(int state, boolean dataLoaderToDispatch) { - return (state & ~(booleanMask << dataLoaderToDispatchShift)) | - ((dataLoaderToDispatch ? 1 : 0) << dataLoaderToDispatchShift); - } - - public static int setCurrentlyDispatching(int state, boolean currentlyDispatching) { - return (state & ~(booleanMask << currentlyDispatchingShift)) | - ((currentlyDispatching ? 1 : 0) << currentlyDispatchingShift); - } - - - public static boolean getDataLoaderToDispatch(int state) { - return ((state >> dataLoaderToDispatchShift) & booleanMask) != 0; - } - - public static boolean getCurrentlyDispatching(int state) { - return ((state >> currentlyDispatchingShift) & booleanMask) != 0; - } - - - private final AtomicInteger state = new AtomicInteger(); - private final DataLoaderRegistry dataLoaderRegistry; - - public NotBusyDispatchStrategy(DataLoaderRegistry dataLoaderRegistry) { - this.dataLoaderRegistry = dataLoaderRegistry; - } - - - private int incrementBusyCountImpl() { - while (true) { - int oldState = getState(); - int busyCount = getBusyCount(oldState); - int newState = setBusyCount(oldState, busyCount + 1); - if (tryUpdateState(oldState, newState)) { - return newState; - } - } - } - - private int decrementBusyCountImpl() { - while (true) { - int oldState = getState(); - int busyCount = getBusyCount(oldState); - int newState = setBusyCount(oldState, busyCount - 1); - if (tryUpdateState(oldState, newState)) { - return newState; - } - } - } - - private int getState() { - return state.get(); - } - - - private boolean tryUpdateState(int oldState, int newState) { - return state.compareAndSet(oldState, newState); - } - - - public void decrementBusyCount() { - int newState = decrementBusyCountImpl(); - if (getBusyCount(newState) == 0 && getDataLoaderToDispatch(newState) && !getCurrentlyDispatching(newState)) { - dispatchImpl(); - } - } - - public void incrementBusyCount() { - incrementBusyCountImpl(); - } - - - private void newDataLoaderInvocationMaybeDispatch() { - int currentState; - while (true) { - int oldState = getState(); - if (getDataLoaderToDispatch(oldState)) { - return; - } - int newState = setDataLoaderToDispatch(oldState, true); - if (tryUpdateState(oldState, newState)) { - currentState = newState; - break; - } - } - - if (getBusyCount(currentState) == 0 && !getCurrentlyDispatching(currentState)) { - dispatchImpl(); - } - } - - - private void dispatchImpl() { - while (true) { - int oldState = getState(); - if (!getDataLoaderToDispatch(oldState)) { - int newState = setCurrentlyDispatching(oldState, false); - if (tryUpdateState(oldState, newState)) { - return; - } - } - int newState = setCurrentlyDispatching(oldState, true); - newState = setDataLoaderToDispatch(newState, false); - if (tryUpdateState(oldState, newState)) { - break; - } - } - - List> dataLoaders = dataLoaderRegistry.getDataLoaders(); - List>> allDispatchedCFs = new ArrayList<>(); - for (DataLoader dataLoader : dataLoaders) { - CompletableFuture> dispatch = dataLoader.dispatch(); - allDispatchedCFs.add(dispatch); - } - CompletableFuture.allOf(allDispatchedCFs.toArray(new CompletableFuture[0])) - .whenComplete((unused, throwable) -> { - dispatchImpl(); - }); - - } - - @Override - public void loadCalled(DataLoader dataLoader) { - newDataLoaderInvocationMaybeDispatch(); - } - - private static String printState(int state) { - return "busyCount= " + getBusyCount(state) + - ",dataLoaderToDispatch= " + getDataLoaderToDispatch(state) + - ",currentlyDispatching= " + getCurrentlyDispatching(state); - } - - @Override - public String toString() { - return "NotBusyDispatchStrategy{" + - "state=" + printState(getState()) + - ", dataLoaderRegistry=" + dataLoaderRegistry + - '}'; - } -} diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 4f62378d..33498b5c 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -2,6 +2,7 @@ import org.dataloader.DataLoader; import org.dataloader.DataLoaderRegistry; +import org.dataloader.DispatchStrategy; import org.dataloader.annotations.ExperimentalApi; import org.dataloader.impl.Assertions; import org.dataloader.instrumentation.DataLoaderInstrumentation; @@ -69,7 +70,7 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A private volatile boolean closed; private ScheduledDataLoaderRegistry(Builder builder) { - super(builder.dataLoaders, builder.instrumentation); + super(builder.dataLoaders, builder.instrumentation, DispatchStrategy.NO_OP); this.scheduledExecutorService = Assertions.nonNull(builder.scheduledExecutorService); this.defaultExecutorUsed = builder.defaultExecutorUsed; this.schedule = builder.schedule; diff --git a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java new file mode 100644 index 00000000..f828dff0 --- /dev/null +++ b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java @@ -0,0 +1,131 @@ +package org.dataloader.strategy; + +import org.dataloader.DataLoaderRegistry; +import org.dataloader.DispatchStrategy; +import org.jspecify.annotations.Nullable; + +import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class BreadthFirstChainedDispatchStrategy implements DispatchStrategy { + + private static final Duration DEFAULT_FALLBACK_TIMEOUT = Duration.ofMillis(30); + + private final ScheduledExecutorService scheduledExecutorService; + private final AtomicInteger pendingLoadCount = new AtomicInteger(0); + private final AtomicInteger totalWorkCount = new AtomicInteger(0); + private final Object dispatchLock = new Object(); + + private final Duration fallbackTimeout; + @Nullable private ScheduledFuture fallbackDispatchFuture = null; + + @Nullable private Runnable dispatchCallback; + + private BreadthFirstChainedDispatchStrategy(Builder builder) { + this.scheduledExecutorService = builder.scheduledExecutorService; + this.fallbackTimeout = builder.fallbackTimeout; + } + + @Override + public void onRegistryCreation(DataLoaderRegistry registry) { + dispatchCallback = registry::dispatchAll; + } + + @Override + public void loadCalled() { + // initial load called + pendingLoadCount.incrementAndGet(); + totalWorkCount.incrementAndGet(); + if (totalWorkCount.get() == 1) { + triggerDeterministicDispatch(); + } + } + + @Override + public void loadCompleted() { + pendingLoadCount.decrementAndGet(); + } + + private void triggerDeterministicDispatch() { + synchronized (dispatchLock) { + if (dispatchCallback == null) { + throw new IllegalStateException("Dispatch strategy started without being registered to registry"); + } + + // sanity check + if (pendingLoadCount.get() == 0) { + return; + } + + while (pendingLoadCount.get() > 0) { + int workBefore = totalWorkCount.get(); + + dispatchCallback.run(); + + int workAfter = totalWorkCount.get(); + int pendingAfter = pendingLoadCount.get(); + + // no progress but not done - trigger async check + if (workAfter == workBefore && pendingAfter > 0) { + scheduleFallbackDispatch(); + break; + } + + // completed + if (pendingAfter == 0) { + resetState(); + } + } + } + } + + private synchronized void scheduleFallbackDispatch() { + // fallback already scheduled, don't reschedule + if (fallbackDispatchFuture != null && !fallbackDispatchFuture.isDone()) { + return; + } + + fallbackDispatchFuture = + scheduledExecutorService.schedule( + () -> { + // clear the future so we can start scheduling again + synchronized (this) { + fallbackDispatchFuture = null; + } + triggerDeterministicDispatch(); + }, + fallbackTimeout.toMillis(), + TimeUnit.MILLISECONDS + ); + } + + private void resetState() { + pendingLoadCount.set(0); + totalWorkCount.set(0); + if (fallbackDispatchFuture != null) { + fallbackDispatchFuture.cancel(false); + } + } + + public static class Builder { + private Duration fallbackTimeout = DEFAULT_FALLBACK_TIMEOUT; + private final ScheduledExecutorService scheduledExecutorService; + + public Builder(ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = scheduledExecutorService; + } + + public Builder setFallbackTimeout(Duration fallbackTimeout) { + this.fallbackTimeout = fallbackTimeout; + return this; + } + + public BreadthFirstChainedDispatchStrategy build() { + return new BreadthFirstChainedDispatchStrategy(this); + } + + } +} diff --git a/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyTest.java b/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyTest.java new file mode 100644 index 00000000..a48487b0 --- /dev/null +++ b/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyTest.java @@ -0,0 +1,229 @@ +package org.dataloader.strategy; + +import org.dataloader.BatchLoader; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderFactory; +import org.dataloader.DataLoaderRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class BreadthFirstChainedDispatchStrategyTest { + + private ScheduledExecutorService scheduledExecutorService; + + @BeforeEach + public void setUp() { + this.scheduledExecutorService = Executors.newScheduledThreadPool(2); + } + + @AfterEach + public void cleanUp() { + this.scheduledExecutorService.shutdown(); + } + + @Test + void singleDepthLoadSucceeds() throws Exception { + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())) + .dispatchStrategy(new BreadthFirstChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .build(); + DataLoader loader = registry.getDataLoader(SimpleLoader.class.getSimpleName()); + CompletableFuture result = loader.load(1); + assertThat(result.isDone(), equalTo(true)); + assertThat(result.get(), equalTo(1)); + } + + @Test + void singleDepthLoadSucceedsMultipleTimes() throws Exception { + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())) + .dispatchStrategy(new BreadthFirstChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .build(); + DataLoader loader = registry.getDataLoader(SimpleLoader.class.getSimpleName()); + CompletableFuture result = loader.load(1); + assertThat(result.isDone(), equalTo(true)); + assertThat(result.get(), equalTo(1)); + + // state reset, kick off another load + CompletableFuture result2 = loader.load(1); + assertThat(result2.isDone(), equalTo(true)); + assertThat(result2.get(), equalTo(1)); + } + + @Test + void chainedLoaderSucceeds() throws Exception { + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .dispatchStrategy(new BreadthFirstChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .build(); + registry.register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())); + registry.register(ChainedLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new ChainedLoader(registry))); + DataLoader loader = registry.getDataLoader(ChainedLoader.class.getSimpleName()); + CompletableFuture result = loader.load(1); + assertThat(result.isDone(), equalTo(true)); + assertThat(result.get(), equalTo(1)); + } + + @Test + void chainedAsyncLoaderSucceeds() { + CountDownLatch latch = new CountDownLatch(1); + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .dispatchStrategy(new BreadthFirstChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .build(); + registry.register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())); + registry.register(ChainedAsyncLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new ChainedAsyncLoader(registry, latch))); + DataLoader loader = registry.getDataLoader(ChainedAsyncLoader.class.getSimpleName()); + CompletableFuture result = loader.load(1); + // future not done, fallback triggered + assertThat(result.isDone(), equalTo(false)); + // allow loader to continue, simulating async behavior + latch.countDown(); + + // blocking wait for fallback dispatch to trigger + Integer resultInteger = result.join(); + + assertThat(resultInteger, equalTo(1)); + } + + @Test + void dispatchGoesByLevel() throws Exception { + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .dispatchStrategy(new BreadthFirstChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .build(); + List> leafLevelSeenKeys = new ArrayList<>(); + BatchLoader leaf = keys -> { + leafLevelSeenKeys.add(keys); + return CompletableFuture.completedFuture(keys); + }; + + List> secondLevelSeenKeys = new ArrayList<>(); + BatchLoader secondLevel = keys -> { + secondLevelSeenKeys.add(keys); + List> futures = keys.stream().map(key -> registry.getDataLoader("leaf").load(key)).collect(Collectors.toList()); + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenApply(unused -> { + List results = new ArrayList<>(); + for (CompletableFuture future: futures) { + results.add(future.join()); + } + return results; + }); + }; + + // Call trigger 2 loads on root to secondLevel + BatchLoader root = keys -> { + DataLoader second = registry.getDataLoader("secondLevel"); + List> firstCall = keys.stream().map(second::load).collect(Collectors.toList()); + + // used to verify batching + List> secondCall = keys.stream().map(second::load).collect(Collectors.toList()); + + // used to verify multiple keys + List> thirdCall = keys.stream().map(key -> second.load(key + 1)).collect(Collectors.toList()); + CompletableFuture firstFinished = CompletableFuture.allOf(firstCall.toArray(new CompletableFuture[0])); + CompletableFuture secondFinished = CompletableFuture.allOf(secondCall.toArray(new CompletableFuture[0])); + CompletableFuture thirdFinished = CompletableFuture.allOf(thirdCall.toArray(new CompletableFuture[0])); + CompletableFuture allFinished = CompletableFuture.allOf(firstFinished, secondFinished, thirdFinished); + return allFinished.thenApply(unused -> { + List result = new ArrayList<>(); + for (int i = 0; i < firstCall.size(); i++) { + int firstResult = firstCall.get(i).join(); + int secondResult = secondCall.get(i).join(); + int thirdResult = thirdCall.get(i).join(); + result.add(firstResult + secondResult + thirdResult); + } + return result; + }); + }; + + registry.register("root", DataLoaderFactory.newDataLoader(root)); + registry.register("secondLevel", DataLoaderFactory.newDataLoader(secondLevel)); + registry.register("leaf", DataLoaderFactory.newDataLoader(leaf)); + + CompletableFuture result = registry.getDataLoader("root").load(1); + + assertThat(result.isDone(), equalTo(true)); + // 1 + 1 + 2 (first, second, third call) + assertThat(result.get(), equalTo(4)); + + // verify levels only called once even though multiple loads called with different arguments (level by level) + assertThat(secondLevelSeenKeys.size(), equalTo(1)); + assertThat(leafLevelSeenKeys.size(), equalTo(1)); + + // verify keys sent to levels are proper + assertThat(secondLevelSeenKeys.get(0), equalTo(List.of(1, 2))); + assertThat(leafLevelSeenKeys.get(0), equalTo(List.of(1, 2))); + } + + private static final class ChainedAsyncLoader implements BatchLoader { + private final DataLoaderRegistry dataLoaderRegistry; + private final CountDownLatch latch; + public ChainedAsyncLoader(DataLoaderRegistry dataLoaderRegistry, CountDownLatch latch) { + this.dataLoaderRegistry = dataLoaderRegistry; + this.latch = latch; + } + + @Override + public CompletionStage> load(List keys) { + return CompletableFuture.supplyAsync(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + List> futures = keys.stream().map(key -> + dataLoaderRegistry.getDataLoader(SimpleLoader.class.getSimpleName()).load(key) + ).collect(Collectors.toList()); + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenApply(unused -> { + List result = new ArrayList<>(); + for (CompletableFuture future: futures) { + result.add(future.join()); + } + return result; + }).join(); + }); + } + } + + private static final class ChainedLoader implements BatchLoader { + + private final DataLoaderRegistry dataLoaderRegistry; + public ChainedLoader(DataLoaderRegistry dataLoaderRegistry) { + this.dataLoaderRegistry = dataLoaderRegistry; + } + + @Override + public CompletionStage> load(List keys) { + List> futures = keys.stream().map(key -> + dataLoaderRegistry.getDataLoader(SimpleLoader.class.getSimpleName()).load(key) + ).collect(Collectors.toList()); + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenApply(unused -> { + List result = new ArrayList<>(); + for (CompletableFuture future: futures) { + result.add(future.join()); + } + return result; + }); + } + } + + private static final class SimpleLoader implements BatchLoader { + + @Override + public CompletionStage> load(List keys) { + return CompletableFuture.completedFuture(keys); + } + } +} From 6efaf3b53e375cf95eb26c548ac7b77360b27202 Mon Sep 17 00:00:00 2001 From: zsmoore Date: Tue, 6 Jan 2026 15:55:19 -0500 Subject: [PATCH 03/15] Fix test to update with new dispatch whenCompletes --- src/test/java/org/dataloader/DataLoaderCacheMapTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/dataloader/DataLoaderCacheMapTest.java b/src/test/java/org/dataloader/DataLoaderCacheMapTest.java index d3de4aad..f24c6bd4 100644 --- a/src/test/java/org/dataloader/DataLoaderCacheMapTest.java +++ b/src/test/java/org/dataloader/DataLoaderCacheMapTest.java @@ -65,7 +65,8 @@ public void should_access_to_future_dependants() { Collection> futures = dataLoader.getCacheMap().getAll(); List> futuresList = new ArrayList<>(futures); - assertThat(futuresList.get(0).getNumberOfDependents(), equalTo(4)); // instrumentation is depending on the CF completing - assertThat(futuresList.get(1).getNumberOfDependents(), equalTo(2)); + // instrumentation is depending on the CF completing + dispatch strategy + assertThat(futuresList.get(0).getNumberOfDependents(), equalTo(6)); + assertThat(futuresList.get(1).getNumberOfDependents(), equalTo(3)); } } From 5e423449c301a69ad3e09f7b28cab67ad6a7574c Mon Sep 17 00:00:00 2001 From: Zachary Moore Date: Thu, 8 Jan 2026 13:07:48 -0500 Subject: [PATCH 04/15] Update src/main/java/org/dataloader/DataLoaderOptions.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/main/java/org/dataloader/DataLoaderOptions.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index e8628ab6..0212650c 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -144,7 +144,8 @@ public boolean equals(Object o) { Objects.equals(statisticsCollector, that.statisticsCollector) && Objects.equals(environmentProvider, that.environmentProvider) && Objects.equals(valueCacheOptions, that.valueCacheOptions) && - Objects.equals(batchLoaderScheduler, that.batchLoaderScheduler); + Objects.equals(batchLoaderScheduler, that.batchLoaderScheduler) && + Objects.equals(dispatchStrategy, that.dispatchStrategy); } From 7492d2a614538b2e694ee302621c3b359a2b1dda Mon Sep 17 00:00:00 2001 From: zsmoore Date: Thu, 8 Jan 2026 13:15:16 -0500 Subject: [PATCH 05/15] Minor review comments --- .../strategy/BreadthFirstChainedDispatchStrategy.java | 5 +++-- .../strategy/BreadthFirstChainedDispatchStrategyTest.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java index f828dff0..e1f0cabb 100644 --- a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java +++ b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java @@ -2,6 +2,7 @@ import org.dataloader.DataLoaderRegistry; import org.dataloader.DispatchStrategy; +import org.dataloader.impl.Assertions; import org.jspecify.annotations.Nullable; import java.time.Duration; @@ -115,11 +116,11 @@ public static class Builder { private final ScheduledExecutorService scheduledExecutorService; public Builder(ScheduledExecutorService scheduledExecutorService) { - this.scheduledExecutorService = scheduledExecutorService; + this.scheduledExecutorService = Assertions.nonNull(scheduledExecutorService); } public Builder setFallbackTimeout(Duration fallbackTimeout) { - this.fallbackTimeout = fallbackTimeout; + this.fallbackTimeout = Assertions.nonNull(fallbackTimeout); return this; } diff --git a/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyTest.java b/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyTest.java index a48487b0..498d7174 100644 --- a/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyTest.java +++ b/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyTest.java @@ -31,7 +31,7 @@ public void setUp() { @AfterEach public void cleanUp() { - this.scheduledExecutorService.shutdown(); + this.scheduledExecutorService.shutdownNow(); } @Test From ab0ca1336bbe2294243fed96099c08dcafb76333 Mon Sep 17 00:00:00 2001 From: Zachary Moore Date: Thu, 8 Jan 2026 13:20:10 -0500 Subject: [PATCH 06/15] Update src/main/java/org/dataloader/DataLoaderOptions.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/main/java/org/dataloader/DataLoaderOptions.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index 0212650c..b031e836 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -261,6 +261,9 @@ public DataLoaderInstrumentation getInstrumentation() { return instrumentation; } + /** + * @return the {@link DispatchStrategy} to use for dispatching batch loads + */ public DispatchStrategy getDispatchStrategy() { return dispatchStrategy; } From dc9874973e5b236ae74eb73114fb5332ca9f5bb8 Mon Sep 17 00:00:00 2001 From: Zachary Moore Date: Thu, 8 Jan 2026 13:38:10 -0500 Subject: [PATCH 07/15] Update src/main/java/org/dataloader/DispatchStrategy.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/main/java/org/dataloader/DispatchStrategy.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/org/dataloader/DispatchStrategy.java b/src/main/java/org/dataloader/DispatchStrategy.java index 8e0865cd..5a8cc745 100644 --- a/src/main/java/org/dataloader/DispatchStrategy.java +++ b/src/main/java/org/dataloader/DispatchStrategy.java @@ -3,8 +3,6 @@ import org.dataloader.annotations.PublicApi; import org.jspecify.annotations.NullMarked; -import java.util.concurrent.atomic.AtomicBoolean; - @NullMarked @PublicApi public interface DispatchStrategy { From d9a514f771e625cc1ff0da8fbfa61f2d852ae8e5 Mon Sep 17 00:00:00 2001 From: Zachary Moore Date: Thu, 8 Jan 2026 13:38:27 -0500 Subject: [PATCH 08/15] Update src/main/java/org/dataloader/DataLoaderRegistry.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/main/java/org/dataloader/DataLoaderRegistry.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index d41eda5a..3906666c 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -339,7 +339,7 @@ public static Builder newRegistry() { public static class Builder { private final Map> dataLoaders = new HashMap<>(); - public DispatchStrategy dispatchStrategy = DispatchStrategy.NO_OP; + private DispatchStrategy dispatchStrategy = DispatchStrategy.NO_OP; private @Nullable DataLoaderInstrumentation instrumentation; /** From 01a72d63fc6c2509afb56fe01dcc25d21dacc09b Mon Sep 17 00:00:00 2001 From: zsmoore Date: Thu, 8 Jan 2026 13:40:38 -0500 Subject: [PATCH 09/15] Review comments --- .../java/org/dataloader/DataLoaderRegistry.java | 14 ++++++++++++++ .../java/org/dataloader/DispatchStrategy.java | 16 ++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index d41eda5a..c9da0a4a 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -368,11 +368,25 @@ public Builder registerAll(DataLoaderRegistry otherRegistry) { return this; } + /** + * The {@link DataLoaderInstrumentation} to use for this registry + * + * @param instrumentation instrumentation to use + * + * @return the builder for a fluent pattern + */ public Builder instrumentation(DataLoaderInstrumentation instrumentation) { this.instrumentation = instrumentation; return this; } + /** + * The {@link DispatchStrategy} to use for this registry + * + * @param dispatchStrategy strategy to use + * + * @return this builder for a fluent pattern + */ public Builder dispatchStrategy(DispatchStrategy dispatchStrategy) { this.dispatchStrategy = dispatchStrategy; return this; diff --git a/src/main/java/org/dataloader/DispatchStrategy.java b/src/main/java/org/dataloader/DispatchStrategy.java index 8e0865cd..931f20cc 100644 --- a/src/main/java/org/dataloader/DispatchStrategy.java +++ b/src/main/java/org/dataloader/DispatchStrategy.java @@ -5,21 +5,37 @@ import java.util.concurrent.atomic.AtomicBoolean; +/** + * An interface to implement to allow for custom dispatch strategies when executing {@link DataLoader}s + */ @NullMarked @PublicApi public interface DispatchStrategy { + /** + * A {@link DispatchStrategy} that does nothing + */ DispatchStrategy NO_OP = new DispatchStrategy() { }; + /** + * Lifecycle method called when the registry is created that this dispatch strategy is attached to + * @param registry the {@link DataLoaderRegistry} this dispatch strategy is attached to + */ default void onRegistryCreation(DataLoaderRegistry registry) { } + /** + * Called when a {@link DataLoader#load(Object)} is called on a dataloader + */ default void loadCalled() { } + /** + * Called when a {@link DataLoader#load(Object)} is executed and completed on a dataloader + */ default void loadCompleted() { } From 26e683554ad2dd4e2b9ae7da5f43105f0b2677b2 Mon Sep 17 00:00:00 2001 From: Zachary Moore Date: Thu, 8 Jan 2026 13:44:42 -0500 Subject: [PATCH 10/15] Update src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../strategy/BreadthFirstChainedDispatchStrategy.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java index e1f0cabb..1a6be94a 100644 --- a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java +++ b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java @@ -120,6 +120,12 @@ public Builder(ScheduledExecutorService scheduledExecutorService) { } public Builder setFallbackTimeout(Duration fallbackTimeout) { + if (fallbackTimeout == null) { + throw new IllegalArgumentException("fallbackTimeout must not be null"); + } + if (fallbackTimeout.isZero() || fallbackTimeout.isNegative()) { + throw new IllegalArgumentException("fallbackTimeout must be a positive duration"); + } this.fallbackTimeout = Assertions.nonNull(fallbackTimeout); return this; } From 6d2db6085c326c19b21556faac93fc12a1070992 Mon Sep 17 00:00:00 2001 From: zsmoore Date: Thu, 8 Jan 2026 13:44:50 -0500 Subject: [PATCH 11/15] Javadoc --- src/main/java/org/dataloader/DataLoaderOptions.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index b031e836..c374075c 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -451,6 +451,13 @@ public Builder setInstrumentation(DataLoaderInstrumentation instrumentation) { return this; } + /** + * Sets in a new {@link DispatchStrategy} + * + * @param dispatchStrategy the new {@link DispatchStrategy} + * + * @return the builder for fluent coding + */ public Builder setDispatchStrategy(DispatchStrategy dispatchStrategy) { this.dispatchStrategy = dispatchStrategy; return this; From ab222f251f68477437ae44788a3896e80b4cd892 Mon Sep 17 00:00:00 2001 From: Zachary Moore Date: Thu, 8 Jan 2026 13:46:20 -0500 Subject: [PATCH 12/15] Update src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../strategy/BreadthFirstChainedDispatchStrategy.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java index 1a6be94a..88346be8 100644 --- a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java +++ b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java @@ -39,8 +39,8 @@ public void onRegistryCreation(DataLoaderRegistry registry) { public void loadCalled() { // initial load called pendingLoadCount.incrementAndGet(); - totalWorkCount.incrementAndGet(); - if (totalWorkCount.get() == 1) { + int previousTotal = totalWorkCount.getAndIncrement(); + if (previousTotal == 0) { triggerDeterministicDispatch(); } } From 2420b82e7ddf2dd74b3ef1694cdb0f99aaf2d707 Mon Sep 17 00:00:00 2001 From: zsmoore Date: Thu, 8 Jan 2026 13:50:29 -0500 Subject: [PATCH 13/15] Synchronize resetState --- .../strategy/BreadthFirstChainedDispatchStrategy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java index 1a6be94a..6569c856 100644 --- a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java +++ b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java @@ -103,7 +103,7 @@ private synchronized void scheduleFallbackDispatch() { ); } - private void resetState() { + private synchronized void resetState() { pendingLoadCount.set(0); totalWorkCount.set(0); if (fallbackDispatchFuture != null) { From 4c922c57397e7d88346f8456efbaf89831076474 Mon Sep 17 00:00:00 2001 From: zsmoore Date: Thu, 8 Jan 2026 16:58:37 -0500 Subject: [PATCH 14/15] Add complicated test case --- .../BreadthFirstChainedDispatchStrategy.java | 11 + ...irstChainedDispatchStrategyStressTest.java | 287 ++++++++++++++++++ 2 files changed, 298 insertions(+) create mode 100644 src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyStressTest.java diff --git a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java index 20448b3d..ddd2d12b 100644 --- a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java +++ b/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java @@ -2,6 +2,7 @@ import org.dataloader.DataLoaderRegistry; import org.dataloader.DispatchStrategy; +import org.dataloader.annotations.VisibleForTesting; import org.dataloader.impl.Assertions; import org.jspecify.annotations.Nullable; @@ -20,6 +21,9 @@ public class BreadthFirstChainedDispatchStrategy implements DispatchStrategy { private final AtomicInteger totalWorkCount = new AtomicInteger(0); private final Object dispatchLock = new Object(); + // only used for tests + private Runnable onIteration; + private final Duration fallbackTimeout; @Nullable private ScheduledFuture fallbackDispatchFuture = null; @@ -62,6 +66,8 @@ private void triggerDeterministicDispatch() { } while (pendingLoadCount.get() > 0) { + onIteration.run(); + int workBefore = totalWorkCount.get(); dispatchCallback.run(); @@ -111,6 +117,11 @@ private synchronized void resetState() { } } + @VisibleForTesting + void onIteration(Runnable onIteration) { + this.onIteration = onIteration; + } + public static class Builder { private Duration fallbackTimeout = DEFAULT_FALLBACK_TIMEOUT; private final ScheduledExecutorService scheduledExecutorService; diff --git a/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyStressTest.java b/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyStressTest.java new file mode 100644 index 00000000..6266f6a3 --- /dev/null +++ b/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyStressTest.java @@ -0,0 +1,287 @@ +package org.dataloader.strategy; + +import org.dataloader.BatchLoader; +import org.dataloader.DataLoaderFactory; +import org.dataloader.DataLoaderRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class BreadthFirstChainedDispatchStrategyStressTest { + + private int iterationCount; + private List> dispatchOrder; + private List> queueOrder; + private List> completionOrder; + private DataLoaderRegistry registry; + private CountDownLatch bLatch; + private CountDownLatch gLatch; + private CountDownLatch aStarted; + private CountDownLatch gCompleted; + private CountDownLatch iCompleted; + + /* + Simulating tree with async conditions + + A + B (async) completed before G + E + F + C + G (async) completes last + H + D + I + J + */ + @BeforeEach + public void setup() { + dispatchOrder = new ArrayList<>(); + queueOrder = new ArrayList<>(); + completionOrder = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + dispatchOrder.add(new ArrayList<>()); + queueOrder.add(new ArrayList<>()); + completionOrder.add(new ArrayList<>()); + } + addAtIteration(queueOrder, "A"); + bLatch = new CountDownLatch(1); + gLatch = new CountDownLatch(1); + aStarted = new CountDownLatch(1); + gCompleted = new CountDownLatch(1); + iCompleted = new CountDownLatch(1); + iterationCount = 1; + BreadthFirstChainedDispatchStrategy breadthFirstChainedDispatchStrategy = + new BreadthFirstChainedDispatchStrategy.Builder(Executors.newSingleThreadScheduledExecutor()) + .setFallbackTimeout(Duration.ofMillis(300)).build(); + breadthFirstChainedDispatchStrategy.onIteration(() -> iterationCount += 1); + registry = DataLoaderRegistry.newRegistry() + .dispatchStrategy(breadthFirstChainedDispatchStrategy) + .build(); + + + // Loaders named after diagram above + BatchLoader eLoader = keys -> { + addAtIteration(dispatchOrder, "E"); + return CompletableFuture.completedFuture(keys); + }; + BatchLoader fLoader = keys -> { + addAtIteration(dispatchOrder, "F"); + return CompletableFuture.completedFuture(keys); + }; + BatchLoader hLoader = keys -> { + addAtIteration(dispatchOrder, "H"); + return CompletableFuture.completedFuture(keys); + }; + BatchLoader iLoader = keys -> { + addAtIteration(dispatchOrder, "I"); + return CompletableFuture.completedFuture(keys); + }; + BatchLoader jLoader = keys -> { + addAtIteration(dispatchOrder, "J"); + return CompletableFuture.completedFuture(keys); + }; + + BatchLoader gLoader = keys -> { + addAtIteration(dispatchOrder, "G"); + CompletableFuture> gFuture = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + try { + gLatch.await(); + gFuture.complete(keys); + } catch (InterruptedException e) { + // do nothing + } + }); + return gFuture; + }; + + BatchLoader bLoader = keys -> { + addAtIteration(dispatchOrder, "B"); + CompletableFuture> bFuture = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + try { + bLatch.await(); + CompletableFuture eResult = registry.getDataLoader("eLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "E")); + addAtIteration(queueOrder, "E"); + CompletableFuture fResult = registry.getDataLoader("fLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "F")); + addAtIteration(queueOrder, "F"); + eResult.thenCombine(fResult, (eNum, fNum) -> List.of(eNum + fNum)) + .thenAccept(bFuture::complete); + } catch (InterruptedException e) { + // do nothing + } + }); + return bFuture; + }; + + BatchLoader cLoader = keys -> { + addAtIteration(dispatchOrder, "C"); + CompletableFuture gResult = registry.getDataLoader("gLoader").load(keys.get(0)) + .whenComplete((result, error) -> { + addAtIteration(completionOrder, "G"); + gCompleted.countDown(); + }); + addAtIteration(queueOrder, "G"); + CompletableFuture hResult = registry.getDataLoader("hLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "H")); + addAtIteration(queueOrder, "H"); + + return gResult.thenCombine(hResult, (gNum, hNum) -> List.of(gNum + hNum)); + }; + + BatchLoader dLoader = keys -> { + addAtIteration(dispatchOrder, "D"); + CompletableFuture iResult = registry.getDataLoader("iLoader").load(keys.get(0)) + .whenComplete((result, error) -> { + addAtIteration(completionOrder, "I"); + iCompleted.countDown(); + }); + addAtIteration(queueOrder, "I"); + CompletableFuture jResult = registry.getDataLoader("jLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "J")); + addAtIteration(queueOrder, "J"); + + return iResult.thenCombine(jResult, (iNum, jNum) -> List.of(iNum + jNum)); + }; + + BatchLoader aLoader = keys -> { + aStarted.countDown(); + addAtIteration(dispatchOrder, "A"); + CompletableFuture bResult = registry.getDataLoader("bLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "B")); + addAtIteration(queueOrder, "B"); + CompletableFuture cResult = registry.getDataLoader("cLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "C")); + addAtIteration(queueOrder, "C"); + CompletableFuture dResult = registry.getDataLoader("dLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "D")); + addAtIteration(queueOrder, "D"); + + return CompletableFuture.allOf(bResult, cResult, dResult).thenApply(unused -> { + int bNum = bResult.join(); + int cNum = cResult.join(); + int dNum = dResult.join(); + + return List.of(bNum + cNum + dNum); + }).whenComplete((result, error) -> addAtIteration(completionOrder, "A")); + }; + + registry.register("aLoader", DataLoaderFactory.newDataLoader(aLoader)); + registry.register("bLoader", DataLoaderFactory.newDataLoader(bLoader)); + registry.register("cLoader", DataLoaderFactory.newDataLoader(cLoader)); + registry.register("dLoader", DataLoaderFactory.newDataLoader(dLoader)); + registry.register("eLoader", DataLoaderFactory.newDataLoader(eLoader)); + registry.register("fLoader", DataLoaderFactory.newDataLoader(fLoader)); + registry.register("gLoader", DataLoaderFactory.newDataLoader(gLoader)); + registry.register("hLoader", DataLoaderFactory.newDataLoader(hLoader)); + registry.register("iLoader", DataLoaderFactory.newDataLoader(iLoader)); + registry.register("jLoader", DataLoaderFactory.newDataLoader(jLoader)); + } + + + /* + Explanation of assertions. + + G and B are async + G unlocked once leaf nodes have started + + B unlocked once G completes + + Dispatch order + Iteration 1: - Due to dataloader order in the registry C is dispatched and H is dispatched greedily + A, B, C, D H + Iteration 2: + G, I, J - E and F are blocked by B as they are async chained + Iteration 3: + E, F - B has unlocked and allowed dispatching of E and F + + Queue Order + Iteration 1: - + A + Iteration 2: + B, C, D, G, H, I, J - All but E and F queued as we get as much work as possible + Iteration 3 + E, F - B unlocks E and F once async call completes + + Completion Order + H, J, I, D, G, C, E, F, B, A + + Walk the tree up from roots greedily as calls finish. + D finishes first as no blocks + C finishes second as G is async + B finishes last as well as E and F leafs as they are blocked by async B finishing + */ + @Test + void verifyExecutionOrder() throws Exception { + CompletableFuture result = CompletableFuture.supplyAsync(() -> registry.getDataLoader("aLoader").load(1).join(), + Executors.newSingleThreadExecutor()); + + aStarted.await(); + + // do not release g until leaf level started + iCompleted.await(); + + // g call finished + gLatch.countDown(); + + // do not release b until leafs completed + gCompleted.await(); + + // b call finished + bLatch.countDown(); + + int resultNum = result.join(); + + // 6 leaf nodes added together + assertThat(resultNum, equalTo(6)); + + // clean up padded lists + dispatchOrder = dispatchOrder.stream().filter(list -> !list.isEmpty()).collect(Collectors.toList()); + queueOrder = queueOrder.stream().filter(list -> !list.isEmpty()).collect(Collectors.toList()); + List flatCompletionOrder = completionOrder.stream().flatMap(List::stream).collect(Collectors.toList()); + + // Due to DataLoaders queueing other dataloaders during dispatch more work is done than level by level + assertThat(dispatchOrder, equalTo(List.of( + List.of("A", "C", "B", "H", "D"), + List.of("G", "J", "I"), + List.of("E", "F") + ))); + // Greedily queues all known work capable + assertThat(queueOrder, equalTo(List.of( + List.of("A"), + List.of("B", "C", "D", "G", "H", "I", "J"), + List.of("E", "F") + ))); + + assertThat(completionOrder, equalTo(List.of( + "H", + "J", + "I", + "D", + "G", + "C", + "E", + "F", + "B", + "A" + ))); + } + + private void addAtIteration(List> aList, String toAdd) { + aList.get(iterationCount).add(toAdd); + } +} From fa1e7207bd0ecabf98c813adb852f598c7a156cc Mon Sep 17 00:00:00 2001 From: zsmoore Date: Thu, 8 Jan 2026 17:06:45 -0500 Subject: [PATCH 15/15] Rename and doc --- ...yLevelByLevelChainedDispatchStrategy.java} | 21 +++++++++++++++---- ...velChainedDispatchStrategyStressTest.java} | 11 +++++----- ...elByLevelChainedDispatchStrategyTest.java} | 12 +++++------ 3 files changed, 28 insertions(+), 16 deletions(-) rename src/main/java/org/dataloader/strategy/{BreadthFirstChainedDispatchStrategy.java => GreedyLevelByLevelChainedDispatchStrategy.java} (81%) rename src/test/java/org/dataloader/strategy/{BreadthFirstChainedDispatchStrategyStressTest.java => GreedyLevelByLevelChainedDispatchStrategyStressTest.java} (96%) rename src/test/java/org/dataloader/strategy/{BreadthFirstChainedDispatchStrategyTest.java => GreedyLevelByLevelChainedDispatchStrategyTest.java} (93%) diff --git a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java b/src/main/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategy.java similarity index 81% rename from src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java rename to src/main/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategy.java index ddd2d12b..aa72c406 100644 --- a/src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java +++ b/src/main/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategy.java @@ -12,7 +12,20 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -public class BreadthFirstChainedDispatchStrategy implements DispatchStrategy { +/** + * A {@link DispatchStrategy} which balances batching and performance by dispatching level by level with minimal waiting. + *

+ * We use a fallback {@link ScheduledExecutorService} to handle when work is stuck due to async calls in the chain for + * chained dataloaders. This minimizes the amount of threads spawned to only be used when there is no known work to be + * done and the chain is not finished. + *

+ * Due to the concept of 'known' work we greedily walk the chain instead of waiting for async calls to finish before + * kicking off the next level. + *

+ * In practice this will greedily fill up DataLoader keys while walking the chain to provide a nice balance of + * batching/dedupe/caching while not needing to worry about manually dispatching the tree. + */ +public class GreedyLevelByLevelChainedDispatchStrategy implements DispatchStrategy { private static final Duration DEFAULT_FALLBACK_TIMEOUT = Duration.ofMillis(30); @@ -29,7 +42,7 @@ public class BreadthFirstChainedDispatchStrategy implements DispatchStrategy { @Nullable private Runnable dispatchCallback; - private BreadthFirstChainedDispatchStrategy(Builder builder) { + private GreedyLevelByLevelChainedDispatchStrategy(Builder builder) { this.scheduledExecutorService = builder.scheduledExecutorService; this.fallbackTimeout = builder.fallbackTimeout; } @@ -141,8 +154,8 @@ public Builder setFallbackTimeout(Duration fallbackTimeout) { return this; } - public BreadthFirstChainedDispatchStrategy build() { - return new BreadthFirstChainedDispatchStrategy(this); + public GreedyLevelByLevelChainedDispatchStrategy build() { + return new GreedyLevelByLevelChainedDispatchStrategy(this); } } diff --git a/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyStressTest.java b/src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyStressTest.java similarity index 96% rename from src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyStressTest.java rename to src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyStressTest.java index 6266f6a3..7507b272 100644 --- a/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyStressTest.java +++ b/src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyStressTest.java @@ -12,13 +12,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; -import java.util.function.Function; import java.util.stream.Collectors; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -public class BreadthFirstChainedDispatchStrategyStressTest { +public class GreedyLevelByLevelChainedDispatchStrategyStressTest { private int iterationCount; private List> dispatchOrder; @@ -62,12 +61,12 @@ public void setup() { gCompleted = new CountDownLatch(1); iCompleted = new CountDownLatch(1); iterationCount = 1; - BreadthFirstChainedDispatchStrategy breadthFirstChainedDispatchStrategy = - new BreadthFirstChainedDispatchStrategy.Builder(Executors.newSingleThreadScheduledExecutor()) + GreedyLevelByLevelChainedDispatchStrategy greedyLevelByLevelChainedDispatchStrategy = + new GreedyLevelByLevelChainedDispatchStrategy.Builder(Executors.newSingleThreadScheduledExecutor()) .setFallbackTimeout(Duration.ofMillis(300)).build(); - breadthFirstChainedDispatchStrategy.onIteration(() -> iterationCount += 1); + greedyLevelByLevelChainedDispatchStrategy.onIteration(() -> iterationCount += 1); registry = DataLoaderRegistry.newRegistry() - .dispatchStrategy(breadthFirstChainedDispatchStrategy) + .dispatchStrategy(greedyLevelByLevelChainedDispatchStrategy) .build(); diff --git a/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyTest.java b/src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyTest.java similarity index 93% rename from src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyTest.java rename to src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyTest.java index 498d7174..4cb0ee9d 100644 --- a/src/test/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategyTest.java +++ b/src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyTest.java @@ -20,7 +20,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -public class BreadthFirstChainedDispatchStrategyTest { +public class GreedyLevelByLevelChainedDispatchStrategyTest { private ScheduledExecutorService scheduledExecutorService; @@ -38,7 +38,7 @@ public void cleanUp() { void singleDepthLoadSucceeds() throws Exception { DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() .register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())) - .dispatchStrategy(new BreadthFirstChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .dispatchStrategy(new GreedyLevelByLevelChainedDispatchStrategy.Builder(scheduledExecutorService).build()) .build(); DataLoader loader = registry.getDataLoader(SimpleLoader.class.getSimpleName()); CompletableFuture result = loader.load(1); @@ -50,7 +50,7 @@ void singleDepthLoadSucceeds() throws Exception { void singleDepthLoadSucceedsMultipleTimes() throws Exception { DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() .register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())) - .dispatchStrategy(new BreadthFirstChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .dispatchStrategy(new GreedyLevelByLevelChainedDispatchStrategy.Builder(scheduledExecutorService).build()) .build(); DataLoader loader = registry.getDataLoader(SimpleLoader.class.getSimpleName()); CompletableFuture result = loader.load(1); @@ -66,7 +66,7 @@ void singleDepthLoadSucceedsMultipleTimes() throws Exception { @Test void chainedLoaderSucceeds() throws Exception { DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() - .dispatchStrategy(new BreadthFirstChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .dispatchStrategy(new GreedyLevelByLevelChainedDispatchStrategy.Builder(scheduledExecutorService).build()) .build(); registry.register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())); registry.register(ChainedLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new ChainedLoader(registry))); @@ -80,7 +80,7 @@ void chainedLoaderSucceeds() throws Exception { void chainedAsyncLoaderSucceeds() { CountDownLatch latch = new CountDownLatch(1); DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() - .dispatchStrategy(new BreadthFirstChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .dispatchStrategy(new GreedyLevelByLevelChainedDispatchStrategy.Builder(scheduledExecutorService).build()) .build(); registry.register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())); registry.register(ChainedAsyncLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new ChainedAsyncLoader(registry, latch))); @@ -100,7 +100,7 @@ void chainedAsyncLoaderSucceeds() { @Test void dispatchGoesByLevel() throws Exception { DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() - .dispatchStrategy(new BreadthFirstChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .dispatchStrategy(new GreedyLevelByLevelChainedDispatchStrategy.Builder(scheduledExecutorService).build()) .build(); List> leafLevelSeenKeys = new ArrayList<>(); BatchLoader leaf = keys -> {