review fixes

add oom warning on index cache

more fixes
This commit is contained in:
Jonathan Shook 2024-10-30 18:10:43 -05:00
parent f6cbd5e42b
commit 9c53b4fa7c
14 changed files with 184 additions and 71 deletions

View File

@ -40,6 +40,7 @@ import org.apache.logging.log4j.Logger;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
public abstract class Cqld4BaseOpDispenser<T extends Cqld4BaseOp> extends BaseOpDispenser<T, Cqld4Space> implements CqlOpMetrics {

View File

@ -57,7 +57,6 @@ public class MongoOpMapper<MC extends MongoDirectCommandOp> implements OpMapper<
if (connectionURL == null) {
throw new BasicError("Must provide a connection value for use by the MongoDB adapter.");
}
spaceCache.get(0).createMongoClient(connectionURL);
Optional<LongFunction<String>> oDatabaseF = op.getAsOptionalFunction("database");
if (oDatabaseF.isEmpty()) {

View File

@ -195,8 +195,8 @@ public abstract class BaseDriverAdapter<R extends Op, S extends Space> extends N
@Override
public LongFunction<S> getSpaceFunc(ParsedOp pop) {
Optional<LongFunction<String>> spaceFuncTest = pop.getAsOptionalFunction("space");
LongUnaryOperator cycleToSpaceF;
Optional<LongFunction<Object>> spaceFuncTest = pop.getAsOptionalFunction("space",Object.class);
LongToIntFunction cycleToSpaceF;
if (spaceFuncTest.isEmpty()) {
cycleToSpaceF = (long l) -> 0;
} else {
@ -204,7 +204,7 @@ public abstract class BaseDriverAdapter<R extends Op, S extends Space> extends N
if (example instanceof Number n) {
logger.trace("mapping space indirectly with Number type");
LongFunction<Number> numberF = pop.getAsRequiredFunction("space", Number.class);
cycleToSpaceF= l -> numberF.apply(l).longValue();
cycleToSpaceF= l -> numberF.apply(l).intValue();
} else {
logger.trace("mapping space indirectly through hash table to index pool");
LongFunction<?> sourceF = pop.getAsRequiredFunction("space", String.class);
@ -214,6 +214,6 @@ public abstract class BaseDriverAdapter<R extends Op, S extends Space> extends N
}
}
ConcurrentSpaceCache<S> spaceCache1 = getSpaceCache();
return l -> spaceCache1.get(cycleToSpaceF.applyAsLong(l));
return l -> spaceCache1.get(cycleToSpaceF.applyAsInt(l));
}
}

View File

@ -18,6 +18,8 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
*/
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
@ -35,4 +37,10 @@ public class BaseSpace<SelfT extends BaseSpace<SelfT> > implements Space {
public String getName() {
return spaceName;
}
public static class BasicSpace extends BaseSpace<BasicSpace> implements Space {
public BasicSpace(DriverAdapter<Op, BasicSpace> adapter, long idx) {
super(adapter, idx);
}
}
}

View File

@ -18,11 +18,8 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
*/
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.core.NBNamedElement;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.nb.api.errors.OpConfigError;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
@ -35,13 +32,10 @@ import java.util.function.LongFunction;
/**
* <P>This cache implementation packs referents into an atomic array, keeping things as compact as possible,
* allowing auto-resizing, size tracking, and supporting concurrent access with minimal locking. It also uses a bitset
* to track the
* referent indices for enumeration or traversal.</P>
* to track the referent indices for enumeration or traversal.</P>
*
* <P>TODO: The referent indices are intended to be drawn from a contiguous set of integer identifiers. If a referent
* index which is extremely large is accessed, this will cause the referent array to be resized, possibly
* causing OOM. Because of this, some sampling methods will likely be applied to this layer to pre-verify
* the likely bounds of provided indices prior to actually using them.</P>
* <P>In order to protect against unexpected OOM scenarios, the maximum index is defaulted to 1000000. If you want
* to have index caches bigger than this, pass ina higher limit.</P>
*
* @param <T>
*/
@ -53,26 +47,34 @@ public class ConcurrentIndexCache<T> implements Iterable<T> {
private final BitSet active = new BitSet();
private final String label;
private volatile int count = 0;
private int maxIndex = 1000000;
// Constructor with initial capacity
public ConcurrentIndexCache(String label, LongFunction<T> valueLoader) {
public ConcurrentIndexCache(String label, LongFunction<T> valueLoader, int maxIndex) {
this.label = label;
this.valueLoader = valueLoader;
this.cacheRef = new AtomicReference<>(new AtomicReferenceArray<>(1));
this.maxIndex = maxIndex;
}
public ConcurrentIndexCache(String label, LongFunction<T> valueLoader) {
this(label, valueLoader, 1000000);
}
public ConcurrentIndexCache(String label) {
this(label, null);
}
public T get(long longkey) {
return get(longkey, valueLoader);
public T get(long key) {
return get(key, valueLoader);
}
public T get(long longkey, LongFunction<T> defaultValueLoader) {
if (longkey > Integer.MAX_VALUE) {
throw new OpConfigError("space index must be between 0 and " + (Integer.MAX_VALUE - 1) + " inclusive");
if (longkey > maxIndex) {
if (longkey > maxIndex) {
throw new BasicError("index " + longkey + " too high (outside 0.." + maxIndex + ")");
}
}
int key = (int) longkey;
@ -107,6 +109,9 @@ public class ConcurrentIndexCache<T> implements Iterable<T> {
// Method to resize the array if key exceeds current capacity
private synchronized void resize(int key) {
if (key > maxIndex) {
throw new BasicError("index " + key + " too high (outside 0.." + maxIndex + ")");
}
AtomicReferenceArray<T> currentCache = cacheRef.get();
if (key < currentCache.length()) {
return; // Double-check locking to avoid multiple resizes
@ -142,7 +147,8 @@ public class ConcurrentIndexCache<T> implements Iterable<T> {
// Optional: Method to clear the entire cache
public void clear() {
cacheRef.set(new AtomicReferenceArray<>(1));
count=0;
active.clear();
count = 0;
}
@Override

View File

@ -19,6 +19,7 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
import io.nosqlbench.virtdata.library.basics.shared.functionadapters.ToLongFunction;
import scala.Int;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
@ -32,14 +33,14 @@ import java.util.function.Function;
*/
public class ConcurrentIndexCacheWrapper {
private ConcurrentHashMap<Object,Long> forwardMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<Object, Integer> forwardMap = new ConcurrentHashMap<>();
public long mapKeyToIndex(Object key) {
public int mapKeyToIndex(Object key) {
return forwardMap.computeIfAbsent(key, this::nextIndex);
}
private long idx=0;
private synchronized long nextIndex(Object any) {
private int idx=0;
private synchronized int nextIndex(Object any) {
return idx++;
}

View File

@ -161,7 +161,12 @@ public interface DriverAdapter<OPTYPE extends Op, SPACETYPE extends Space> exten
* object state related to retained objects for the lifetime of a native driver.
*/
default LongFunction<SPACETYPE> getSpaceInitializer(NBConfiguration cfg) {
return n -> null;
return n -> (SPACETYPE) new Space() {
@Override
public String getName() {
return "empty_space";
}
};
}
NBConfiguration getConfiguration();

View File

@ -35,6 +35,12 @@ public class DryCycleOpDispenserWrapper<S extends Space, RESULT> extends BaseOpD
) {
super(adapter, pop);
this.realDispenser = realDispenser;
logger.warn(
"initialized {} for dry run only. " +
"This op will be synthesized for each cycle, but will not be executed.",
pop.getName()
);
}
@Override

View File

@ -22,9 +22,10 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.RunnableOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class DryRunnableOpDispenserWrapper<S extends Space> extends BaseOpDispenser<RunnableOp, S> {
private final OpDispenser<RunnableOp> realDispenser;
public DryRunnableOpDispenserWrapper(
@ -34,6 +35,12 @@ public class DryRunnableOpDispenserWrapper<S extends Space> extends BaseOpDispen
) {
super(adapter, pop);
this.realDispenser = realDispenser;
logger.warn(
"initialized {} for dry run only. " +
"This op will be synthesized for each cycle, but will not be executed.",
pop.getName()
);
}
@Override

View File

@ -35,6 +35,11 @@ public class EmitterCycleOpDispenserWrapper<O,S extends Space,R> extends BaseOpD
) {
super(adapter, pop);
this.realDispenser = realDispenser;
logger.warn(
"initialized {} for to emit the result type to stdout. ",
pop.getName()
);
}
@Override

View File

@ -35,6 +35,11 @@ public class EmitterRunnableOpDispenserWrapper<O,S> extends BaseOpDispenser<Runn
) {
super(adapter, pop);
this.realDispenser = realDispenser;
logger.warn(
"initialized {} for to emit the result type to stdout. ",
pop.getName()
);
}
@Override

View File

@ -20,15 +20,68 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Iterator;
import java.util.stream.StreamSupport;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
class ConcurrentIndexCacheTest {
@Test
public void testBasicCache() {
ConcurrentIndexCache<String> sc = new ConcurrentIndexCache<>("testing",l -> String.valueOf(l));
ConcurrentIndexCache<String> sc = new ConcurrentIndexCache<>("testing1",l -> String.valueOf(l));
String s = sc.get(300);
assertThat(s).isEqualTo("300");
}
@Test
public void testCount() {
ConcurrentIndexCache<String> sc = new ConcurrentIndexCache<>("testing2",l -> String.valueOf(l));
for (int i = 0; i < 1000; i++) {
String name = sc.get(i);
}
assertThat(sc.size()).isEqualTo(1000);
for (int i = 999; i > 0; i-=2) {
sc.remove(i);
}
assertThat(sc.size()).isEqualTo(500);
assertThat(sc.remove(1001)).isNull();
}
@Test
public void TestTraversal() {
int[] indices = new int[1000];
for (int i = 0; i < 1000; i++) {
// generate an assortment of in-range indexes, but not all of them
indices[i] = (int) (Math.abs(Math.sin((double)i)*1000));
}
int[] distinct = Arrays.stream(indices).sorted().distinct().toArray();
ConcurrentIndexCache<String> sc = new ConcurrentIndexCache<>("testing3");
for (int i : distinct) {
sc.get(i,l -> String.valueOf(l));
}
Iterator<String> iter = sc.iterator();
for (int i = 0; i < distinct.length; i++) {
assertThat(iter.hasNext()).isTrue();
String nextValue = iter.next();
assertThat(nextValue.equals(String.valueOf(distinct[i])));
}
sc.clear();
assertThat(sc.size()==0);
}
@Test
public void testSafetyLimit() {
ConcurrentIndexCache<String> sc = new ConcurrentIndexCache<>("testing4", String::valueOf, 1000);
assertThat(sc.get(1000)).isNotNull();
assertThat(sc.remove(11000)).isNull();
assertThatThrownBy(() -> sc.get(1001)).hasMessageContaining("too high");
}
}

View File

@ -0,0 +1,36 @@
package io.nosqlbench.engine.api.activityimpl;
/*
* Copyright (c) nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
public enum Dryrun {
/**
* Ops are executed normally, no change to the dispenser behavior
*/
none,
/**
* Ops will be synthesized, but they will not be executed.
* This is done by wrapping the synthesized op in a no-op facade
*/
op,
/**
* Ops will print the toString version of their result to stdout.
* This is done by wrapping the synthesized op in a post-emit facade.
*/
emit
}

View File

@ -43,49 +43,30 @@ public class OpWrappers {
ParsedOp pop,
String dryrunSpec
) {
if (dryrunSpec.isEmpty() || "none".equals(dryrunSpec)) {
return dispenser;
}
if ("op".equalsIgnoreCase(dryrunSpec)) {
Op exampleOp = dispenser.getOp(0L);
if (exampleOp instanceof RunnableOp runnableOp) {
dispenser = new DryRunnableOpDispenserWrapper(adapter, pop, dispenser);
} else if (exampleOp instanceof CycleOp<?> cycleOp) {
dispenser = new DryCycleOpDispenserWrapper(adapter, pop, dispenser);
} else {
throw new OpConfigError("Unable to wrap op named '" + pop.getDefinedNames() + "' for dry run, since" +
"only RunnableOp and CycleOp<Result> types are supported");
Dryrun dryrun = Dryrun.valueOf(dryrunSpec);
return switch (dryrun) {
case none -> dispenser;
case op -> {
Op exampleOp = dispenser.getOp(0L);
yield switch (exampleOp) {
case RunnableOp runnableOp -> new DryRunnableOpDispenserWrapper(adapter, pop, dispenser);
case CycleOp<?> cycleOp -> new DryCycleOpDispenserWrapper(adapter, pop, dispenser);
default -> throw new OpConfigError(
"Unable to wrap op named '"
+ pop.getDefinedNames() + "' for dry run, since"
+ "only RunnableOp and CycleOp<Result> types are supported");
};
}
logger.warn(
"initialized {} for dry run only. " +
"This op will be synthesized for each cycle, but will not be executed.",
pop.getName()
);
} else if ("emit".equalsIgnoreCase(dryrunSpec)) {
Op exampleOp = dispenser.getOp(0L);
if (exampleOp instanceof RunnableOp runnableOp) {
dispenser = new EmitterRunnableOpDispenserWrapper(adapter, pop, dispenser);
} else if (exampleOp instanceof CycleOp<?> cycleOp) {
dispenser = new EmitterCycleOpDispenserWrapper(adapter, pop, dispenser);
} else {
throw new OpConfigError("Unable to make op named '" + pop.getName() + "' emit a value, " +
"since only RunnableOp and CycleOp<Result> types are supported");
case emit -> {
Op exampleOp = dispenser.getOp(0L);
yield switch (exampleOp) {
case RunnableOp runnableOp -> new EmitterRunnableOpDispenserWrapper(adapter, pop, dispenser);
case CycleOp<?> cycleOp -> new EmitterCycleOpDispenserWrapper(adapter, pop, dispenser);
default ->
throw new OpConfigError("Unable to make op named '" + pop.getName() + "' emit a value, " +
"since only RunnableOp and CycleOp<Result> types are supported");
};
}
dispenser = new EmitterRunnableOpDispenserWrapper(
(DriverAdapter<Op, Space>) adapter,
pop,
(OpDispenser<? extends Op>) dispenser
);
logger.warn(
"initialized {} for to emit the result type to stdout. ",
pop.getName()
);
}
return dispenser;
};
}
}