From 9c53b4fa7cafdc2065333e9c34ef426c17501cce Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Wed, 30 Oct 2024 18:10:43 -0500 Subject: [PATCH] review fixes add oom warning on index cache more fixes --- .../opdispensers/Cqld4BaseOpDispenser.java | 1 + .../adapter/mongodb/core/MongoOpMapper.java | 1 - .../uniform/BaseDriverAdapter.java | 8 +-- .../api/activityimpl/uniform/BaseSpace.java | 8 +++ .../uniform/ConcurrentIndexCache.java | 38 ++++++----- .../uniform/ConcurrentIndexCacheWrapper.java | 9 +-- .../activityimpl/uniform/DriverAdapter.java | 7 +- .../DryCycleOpDispenserWrapper.java | 6 ++ .../DryRunnableOpDispenserWrapper.java | 9 ++- .../EmitterCycleOpDispenserWrapper.java | 5 ++ .../EmitterRunnableOpDispenserWrapper.java | 5 ++ .../uniform/ConcurrentIndexCacheTest.java | 57 +++++++++++++++- .../engine/api/activityimpl/Dryrun.java | 36 ++++++++++ .../engine/api/activityimpl/OpWrappers.java | 65 +++++++------------ 14 files changed, 184 insertions(+), 71 deletions(-) create mode 100644 nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/Dryrun.java diff --git a/nb-adapters/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4BaseOpDispenser.java b/nb-adapters/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4BaseOpDispenser.java index 3c1dbc2e5..7fdcf2706 100644 --- a/nb-adapters/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4BaseOpDispenser.java +++ b/nb-adapters/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4BaseOpDispenser.java @@ -40,6 +40,7 @@ import org.apache.logging.log4j.Logger; import java.nio.ByteBuffer; import java.time.Duration; import java.util.Map; +import java.util.function.IntFunction; import java.util.function.LongFunction; public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser implements CqlOpMetrics { diff --git a/nb-adapters/adapter-mongodb/src/main/java/io/nosqlbench/adapter/mongodb/core/MongoOpMapper.java b/nb-adapters/adapter-mongodb/src/main/java/io/nosqlbench/adapter/mongodb/core/MongoOpMapper.java index 676bc8a4a..7526e9d58 100644 --- a/nb-adapters/adapter-mongodb/src/main/java/io/nosqlbench/adapter/mongodb/core/MongoOpMapper.java +++ b/nb-adapters/adapter-mongodb/src/main/java/io/nosqlbench/adapter/mongodb/core/MongoOpMapper.java @@ -57,7 +57,6 @@ public class MongoOpMapper implements OpMapper< if (connectionURL == null) { throw new BasicError("Must provide a connection value for use by the MongoDB adapter."); } - spaceCache.get(0).createMongoClient(connectionURL); Optional> oDatabaseF = op.getAsOptionalFunction("database"); if (oDatabaseF.isEmpty()) { diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/BaseDriverAdapter.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/BaseDriverAdapter.java index d526c4a70..ad95d6f60 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/BaseDriverAdapter.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/BaseDriverAdapter.java @@ -195,8 +195,8 @@ public abstract class BaseDriverAdapter extends N @Override public LongFunction getSpaceFunc(ParsedOp pop) { - Optional> spaceFuncTest = pop.getAsOptionalFunction("space"); - LongUnaryOperator cycleToSpaceF; + Optional> spaceFuncTest = pop.getAsOptionalFunction("space",Object.class); + LongToIntFunction cycleToSpaceF; if (spaceFuncTest.isEmpty()) { cycleToSpaceF = (long l) -> 0; } else { @@ -204,7 +204,7 @@ public abstract class BaseDriverAdapter extends N if (example instanceof Number n) { logger.trace("mapping space indirectly with Number type"); LongFunction numberF = pop.getAsRequiredFunction("space", Number.class); - cycleToSpaceF= l -> numberF.apply(l).longValue(); + cycleToSpaceF= l -> numberF.apply(l).intValue(); } else { logger.trace("mapping space indirectly through hash table to index pool"); LongFunction sourceF = pop.getAsRequiredFunction("space", String.class); @@ -214,6 +214,6 @@ public abstract class BaseDriverAdapter extends N } } ConcurrentSpaceCache spaceCache1 = getSpaceCache(); - return l -> spaceCache1.get(cycleToSpaceF.applyAsLong(l)); + return l -> spaceCache1.get(cycleToSpaceF.applyAsInt(l)); } } diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/BaseSpace.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/BaseSpace.java index 5b7cb1255..21e3a1fd8 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/BaseSpace.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/BaseSpace.java @@ -18,6 +18,8 @@ package io.nosqlbench.adapters.api.activityimpl.uniform; */ +import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op; + import java.util.function.IntFunction; import java.util.function.LongFunction; @@ -35,4 +37,10 @@ public class BaseSpace > implements Space { public String getName() { return spaceName; } + + public static class BasicSpace extends BaseSpace implements Space { + public BasicSpace(DriverAdapter adapter, long idx) { + super(adapter, idx); + } + } } diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCache.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCache.java index 4bcb05e11..564a89412 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCache.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCache.java @@ -18,11 +18,8 @@ package io.nosqlbench.adapters.api.activityimpl.uniform; */ -import io.nosqlbench.nb.api.components.core.NBBaseComponent; -import io.nosqlbench.nb.api.components.core.NBComponent; -import io.nosqlbench.nb.api.components.core.NBNamedElement; +import io.nosqlbench.nb.api.errors.BasicError; import io.nosqlbench.nb.api.errors.OpConfigError; -import io.nosqlbench.nb.api.labels.NBLabels; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; @@ -35,13 +32,10 @@ import java.util.function.LongFunction; /** *

This cache implementation packs referents into an atomic array, keeping things as compact as possible, * allowing auto-resizing, size tracking, and supporting concurrent access with minimal locking. It also uses a bitset - * to track the - * referent indices for enumeration or traversal.

+ * to track the referent indices for enumeration or traversal.

* - *

TODO: The referent indices are intended to be drawn from a contiguous set of integer identifiers. If a referent - * index which is extremely large is accessed, this will cause the referent array to be resized, possibly - * causing OOM. Because of this, some sampling methods will likely be applied to this layer to pre-verify - * the likely bounds of provided indices prior to actually using them.

+ *

In order to protect against unexpected OOM scenarios, the maximum index is defaulted to 1000000. If you want + * to have index caches bigger than this, pass ina higher limit.

* * @param */ @@ -53,26 +47,34 @@ public class ConcurrentIndexCache implements Iterable { private final BitSet active = new BitSet(); private final String label; private volatile int count = 0; + private int maxIndex = 1000000; // Constructor with initial capacity - public ConcurrentIndexCache(String label, LongFunction valueLoader) { + public ConcurrentIndexCache(String label, LongFunction valueLoader, int maxIndex) { this.label = label; this.valueLoader = valueLoader; this.cacheRef = new AtomicReference<>(new AtomicReferenceArray<>(1)); + this.maxIndex = maxIndex; + } + + public ConcurrentIndexCache(String label, LongFunction valueLoader) { + this(label, valueLoader, 1000000); } public ConcurrentIndexCache(String label) { this(label, null); } - public T get(long longkey) { - return get(longkey, valueLoader); + public T get(long key) { + return get(key, valueLoader); } public T get(long longkey, LongFunction defaultValueLoader) { - if (longkey > Integer.MAX_VALUE) { - throw new OpConfigError("space index must be between 0 and " + (Integer.MAX_VALUE - 1) + " inclusive"); + if (longkey > maxIndex) { + if (longkey > maxIndex) { + throw new BasicError("index " + longkey + " too high (outside 0.." + maxIndex + ")"); + } } int key = (int) longkey; @@ -107,6 +109,9 @@ public class ConcurrentIndexCache implements Iterable { // Method to resize the array if key exceeds current capacity private synchronized void resize(int key) { + if (key > maxIndex) { + throw new BasicError("index " + key + " too high (outside 0.." + maxIndex + ")"); + } AtomicReferenceArray currentCache = cacheRef.get(); if (key < currentCache.length()) { return; // Double-check locking to avoid multiple resizes @@ -142,7 +147,8 @@ public class ConcurrentIndexCache implements Iterable { // Optional: Method to clear the entire cache public void clear() { cacheRef.set(new AtomicReferenceArray<>(1)); - count=0; + active.clear(); + count = 0; } @Override diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCacheWrapper.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCacheWrapper.java index 593fdc260..4a1413f53 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCacheWrapper.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCacheWrapper.java @@ -19,6 +19,7 @@ package io.nosqlbench.adapters.api.activityimpl.uniform; import io.nosqlbench.virtdata.library.basics.shared.functionadapters.ToLongFunction; +import scala.Int; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -32,14 +33,14 @@ import java.util.function.Function; */ public class ConcurrentIndexCacheWrapper { - private ConcurrentHashMap forwardMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap forwardMap = new ConcurrentHashMap<>(); - public long mapKeyToIndex(Object key) { + public int mapKeyToIndex(Object key) { return forwardMap.computeIfAbsent(key, this::nextIndex); } - private long idx=0; - private synchronized long nextIndex(Object any) { + private int idx=0; + private synchronized int nextIndex(Object any) { return idx++; } diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/DriverAdapter.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/DriverAdapter.java index 2f7d1a66a..629ea1fb6 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/DriverAdapter.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/DriverAdapter.java @@ -161,7 +161,12 @@ public interface DriverAdapter exten * object state related to retained objects for the lifetime of a native driver. */ default LongFunction getSpaceInitializer(NBConfiguration cfg) { - return n -> null; + return n -> (SPACETYPE) new Space() { + @Override + public String getName() { + return "empty_space"; + } + }; } NBConfiguration getConfiguration(); diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/DryCycleOpDispenserWrapper.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/DryCycleOpDispenserWrapper.java index 894126f7e..d8a7b0129 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/DryCycleOpDispenserWrapper.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/DryCycleOpDispenserWrapper.java @@ -35,6 +35,12 @@ public class DryCycleOpDispenserWrapper extends BaseOpD ) { super(adapter, pop); this.realDispenser = realDispenser; + logger.warn( + "initialized {} for dry run only. " + + "This op will be synthesized for each cycle, but will not be executed.", + pop.getName() + ); + } @Override diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/DryRunnableOpDispenserWrapper.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/DryRunnableOpDispenserWrapper.java index d4388ff08..7faf56add 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/DryRunnableOpDispenserWrapper.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/DryRunnableOpDispenserWrapper.java @@ -22,9 +22,10 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.adapters.api.activityimpl.uniform.Space; import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.RunnableOp; import io.nosqlbench.adapters.api.templating.ParsedOp; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class DryRunnableOpDispenserWrapper extends BaseOpDispenser { - private final OpDispenser realDispenser; public DryRunnableOpDispenserWrapper( @@ -34,6 +35,12 @@ public class DryRunnableOpDispenserWrapper extends BaseOpDispen ) { super(adapter, pop); this.realDispenser = realDispenser; + logger.warn( + "initialized {} for dry run only. " + + "This op will be synthesized for each cycle, but will not be executed.", + pop.getName() + ); + } @Override diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/EmitterCycleOpDispenserWrapper.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/EmitterCycleOpDispenserWrapper.java index 390909965..dc587a6d6 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/EmitterCycleOpDispenserWrapper.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/EmitterCycleOpDispenserWrapper.java @@ -35,6 +35,11 @@ public class EmitterCycleOpDispenserWrapper extends BaseOpD ) { super(adapter, pop); this.realDispenser = realDispenser; + logger.warn( + "initialized {} for to emit the result type to stdout. ", + pop.getName() + ); + } @Override diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/EmitterRunnableOpDispenserWrapper.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/EmitterRunnableOpDispenserWrapper.java index 647a9cc62..2d90774c7 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/EmitterRunnableOpDispenserWrapper.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/opwrappers/EmitterRunnableOpDispenserWrapper.java @@ -35,6 +35,11 @@ public class EmitterRunnableOpDispenserWrapper extends BaseOpDispenser sc = new ConcurrentIndexCache<>("testing",l -> String.valueOf(l)); + ConcurrentIndexCache sc = new ConcurrentIndexCache<>("testing1",l -> String.valueOf(l)); String s = sc.get(300); assertThat(s).isEqualTo("300"); } + @Test + public void testCount() { + ConcurrentIndexCache sc = new ConcurrentIndexCache<>("testing2",l -> String.valueOf(l)); + for (int i = 0; i < 1000; i++) { + String name = sc.get(i); + } + assertThat(sc.size()).isEqualTo(1000); + for (int i = 999; i > 0; i-=2) { + sc.remove(i); + } + assertThat(sc.size()).isEqualTo(500); + assertThat(sc.remove(1001)).isNull(); + } + + @Test + public void TestTraversal() { + + int[] indices = new int[1000]; + for (int i = 0; i < 1000; i++) { + // generate an assortment of in-range indexes, but not all of them + indices[i] = (int) (Math.abs(Math.sin((double)i)*1000)); + } + int[] distinct = Arrays.stream(indices).sorted().distinct().toArray(); + + ConcurrentIndexCache sc = new ConcurrentIndexCache<>("testing3"); + for (int i : distinct) { + sc.get(i,l -> String.valueOf(l)); + } + + Iterator iter = sc.iterator(); + for (int i = 0; i < distinct.length; i++) { + assertThat(iter.hasNext()).isTrue(); + String nextValue = iter.next(); + assertThat(nextValue.equals(String.valueOf(distinct[i]))); + } + + sc.clear(); + assertThat(sc.size()==0); + } + + @Test + public void testSafetyLimit() { + ConcurrentIndexCache sc = new ConcurrentIndexCache<>("testing4", String::valueOf, 1000); + assertThat(sc.get(1000)).isNotNull(); + assertThat(sc.remove(11000)).isNull(); + assertThatThrownBy(() -> sc.get(1001)).hasMessageContaining("too high"); + } + } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/Dryrun.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/Dryrun.java new file mode 100644 index 000000000..7293a07e8 --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/Dryrun.java @@ -0,0 +1,36 @@ +package io.nosqlbench.engine.api.activityimpl; + +/* + * Copyright (c) nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +public enum Dryrun { + /** + * Ops are executed normally, no change to the dispenser behavior + */ + none, + /** + * Ops will be synthesized, but they will not be executed. + * This is done by wrapping the synthesized op in a no-op facade + */ + op, + /** + * Ops will print the toString version of their result to stdout. + * This is done by wrapping the synthesized op in a post-emit facade. + */ + emit +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/OpWrappers.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/OpWrappers.java index 4e770e914..30d45924f 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/OpWrappers.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/OpWrappers.java @@ -43,49 +43,30 @@ public class OpWrappers { ParsedOp pop, String dryrunSpec ) { - if (dryrunSpec.isEmpty() || "none".equals(dryrunSpec)) { - return dispenser; - } - - - if ("op".equalsIgnoreCase(dryrunSpec)) { - Op exampleOp = dispenser.getOp(0L); - - if (exampleOp instanceof RunnableOp runnableOp) { - dispenser = new DryRunnableOpDispenserWrapper(adapter, pop, dispenser); - } else if (exampleOp instanceof CycleOp cycleOp) { - dispenser = new DryCycleOpDispenserWrapper(adapter, pop, dispenser); - } else { - throw new OpConfigError("Unable to wrap op named '" + pop.getDefinedNames() + "' for dry run, since" + - "only RunnableOp and CycleOp types are supported"); + Dryrun dryrun = Dryrun.valueOf(dryrunSpec); + return switch (dryrun) { + case none -> dispenser; + case op -> { + Op exampleOp = dispenser.getOp(0L); + yield switch (exampleOp) { + case RunnableOp runnableOp -> new DryRunnableOpDispenserWrapper(adapter, pop, dispenser); + case CycleOp cycleOp -> new DryCycleOpDispenserWrapper(adapter, pop, dispenser); + default -> throw new OpConfigError( + "Unable to wrap op named '" + + pop.getDefinedNames() + "' for dry run, since" + + "only RunnableOp and CycleOp types are supported"); + }; } - logger.warn( - "initialized {} for dry run only. " + - "This op will be synthesized for each cycle, but will not be executed.", - pop.getName() - ); - - } else if ("emit".equalsIgnoreCase(dryrunSpec)) { - Op exampleOp = dispenser.getOp(0L); - if (exampleOp instanceof RunnableOp runnableOp) { - dispenser = new EmitterRunnableOpDispenserWrapper(adapter, pop, dispenser); - } else if (exampleOp instanceof CycleOp cycleOp) { - dispenser = new EmitterCycleOpDispenserWrapper(adapter, pop, dispenser); - } else { - throw new OpConfigError("Unable to make op named '" + pop.getName() + "' emit a value, " + - "since only RunnableOp and CycleOp types are supported"); + case emit -> { + Op exampleOp = dispenser.getOp(0L); + yield switch (exampleOp) { + case RunnableOp runnableOp -> new EmitterRunnableOpDispenserWrapper(adapter, pop, dispenser); + case CycleOp cycleOp -> new EmitterCycleOpDispenserWrapper(adapter, pop, dispenser); + default -> + throw new OpConfigError("Unable to make op named '" + pop.getName() + "' emit a value, " + + "since only RunnableOp and CycleOp types are supported"); + }; } - dispenser = new EmitterRunnableOpDispenserWrapper( - (DriverAdapter) adapter, - pop, - (OpDispenser) dispenser - ); - logger.warn( - "initialized {} for to emit the result type to stdout. ", - pop.getName() - ); - - } - return dispenser; + }; } }