MongoDB Adapter fix and updates for scenarios (#1196)

* Updates for mongodb adapter; fix connection usage on cli

* Reduce the cycles for test w/ sporadic failures

* Removed redundant '-> string'

* Removed state tracking and cleanup

* Space cache capture and connection management

* Mongo client management w/ connection

* Remove unused getData()

* Removed debug entry no longer required
This commit is contained in:
Jeff Banks
2023-04-04 16:09:00 -05:00
committed by GitHub
parent 1e27a93e8a
commit 388ece75f7
18 changed files with 308 additions and 298 deletions

View File

@@ -18,9 +18,9 @@ package io.nosqlbench.engine.api.activityimpl;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.metrics.ThreadLocalNamedTimers;
import io.nosqlbench.engine.api.templating.ParsedOp;
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
/**
* {@inheritDoc}
* See {@link OpDispenser} for details on how to use this type.
*
* <p>
* Some details are tracked per op template, which aligns to the life-cycle of the op dispenser.
* Thus, each op dispenser is where the stats for all related operations are kept.
*
@@ -37,51 +37,41 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class BaseOpDispenser<T extends Op, S> implements OpDispenser<T> {
private final String name;
private final String opName;
protected final DriverAdapter<T, S> adapter;
private boolean instrument;
private Histogram resultSizeHistogram;
private Timer successTimer;
private Timer errorTimer;
private String[] timerStarts = new String[0];
private String[] timerStops = new String[0];
private final String[] timerStarts;
private final String[] timerStops;
public BaseOpDispenser(DriverAdapter<T,S> adapter,ParsedOp op) {
this.name = op.getName();
protected BaseOpDispenser(DriverAdapter<T, S> adapter, ParsedOp op) {
this.opName = op.getName();
this.adapter = adapter;
timerStarts = op.takeOptionalStaticValue("start-timers", String.class)
.map(s -> s.split(", *"))
.orElse(null);
.map(s -> s.split(", *"))
.orElse(null);
timerStops = op.takeOptionalStaticValue("stop-timers", String.class)
.map(s -> s.split(", *"))
.orElse(null);
.map(s -> s.split(", *"))
.orElse(null);
if (timerStarts!=null) {
if (timerStarts != null) {
for (String timerStart : timerStarts) {
ThreadLocalNamedTimers.addTimer(op,timerStart);
ThreadLocalNamedTimers.addTimer(op, timerStart);
}
}
configureInstrumentation(op);
}
public DriverAdapter<T,S> getAdapter() {
return adapter;
String getOpName() {
return opName;
}
// public BaseOpDispenser(CommandTemplate cmdtpl) {
// this.name = cmdtpl.getName();
// }
/**
* {@inheritDoc}
*
* @param cycle The cycle number which serves as the seed for any
* generated op fields to be bound into an operation.
* @return
*/
@Override
public abstract T apply(long cycle);
public DriverAdapter<T, S> getAdapter() {
return adapter;
}
protected String getDefaultMetricsPrefix(ParsedOp pop) {
return pop.getStaticConfigOr("alias", "UNKNOWN") + "-" + pop.getName() + "--";
@@ -98,32 +88,31 @@ public abstract class BaseOpDispenser<T extends Op, S> implements OpDispenser<T>
@Override
public void onStart(long cycleValue) {
if (timerStarts!=null) {
if (timerStarts != null) {
ThreadLocalNamedTimers.TL_INSTANCE.get().start(timerStarts);
}
}
@Override
public void onSuccess(long cycleValue, long nanoTime, long resultsize) {
public void onSuccess(long cycleValue, long nanoTime, long resultSize) {
if (instrument) {
successTimer.update(nanoTime, TimeUnit.NANOSECONDS);
if (resultsize>-1) {
resultSizeHistogram.update(resultsize);
if (resultSize > -1) {
resultSizeHistogram.update(resultSize);
}
}
if (timerStops!=null) {
if (timerStops != null) {
ThreadLocalNamedTimers.TL_INSTANCE.get().stop(timerStops);
}
// ThreadLocalNamedTimers.TL_INSTANCE.get().stop(stopTimers);
}
@Override
public void onError(long cycleValue, long resultNanos, Throwable t) {
if (instrument) {
errorTimer.update(resultNanos, TimeUnit.NANOSECONDS);
}
if (timerStops!=null) {
if (timerStops != null) {
ThreadLocalNamedTimers.TL_INSTANCE.get().stop(timerStops);
}
}

View File

@@ -42,15 +42,16 @@ public abstract class BaseDriverAdapter<R extends Op, S> implements DriverAdapte
* BaseDriverAdapter will take any provided functions from {@link #getOpStmtRemappers()}
* and {@link #getOpFieldRemappers()} and construct a preprocessor list. These are applied
* successively to the op template fields so long as remapping occurs.
*
* @return a list of preprocessors for op template fields
*/
@Override
public final Function<Map<String, Object>, Map<String, Object>> getPreprocessor() {
List<Function<Map<String, Object>, Map<String, Object>>> mappers = new ArrayList<>();
List<Function<Map<String, Object>, Map<String, Object>>> stmtRemappers =
getOpStmtRemappers().stream()
.map(m -> new FieldDestructuringMapper("stmt", m))
.collect(Collectors.toList());
getOpStmtRemappers().stream()
.map(m -> new FieldDestructuringMapper("stmt", m))
.collect(Collectors.toList());
mappers.addAll(stmtRemappers);
mappers.addAll(getOpFieldRemappers());
@@ -77,13 +78,13 @@ public abstract class BaseDriverAdapter<R extends Op, S> implements DriverAdapte
* This allows users to specify String-form op templates which are automatically
* parsed and destructured into the canonical field-wise form for a given type of
* operation.</p>
*
* <p>
* <br/>
*
* <p>Each function in this list is applied in order. If the function returns a value,
* then the 'stmt' field is removed and the resulting map is added to the other
* fields in the op template.</p>
*
* <p>
* <br/>
*
* <p>If a driver adapter is meant to support the {@code stmt} field, then this
@@ -92,7 +93,7 @@ public abstract class BaseDriverAdapter<R extends Op, S> implements DriverAdapte
* implementation does nothing, as it must be decided per-driver whether or not
* the stmt field will be used directly or whether it is short-hand for a more
* canonical form.
*
* <p>
* <br/>
*
* <p>If you want to automatically destructure stmt values into a map and inject
@@ -112,6 +113,7 @@ public abstract class BaseDriverAdapter<R extends Op, S> implements DriverAdapte
/**
* <p>Provide a list of field remappers which operate on arbitrary fields.
* Each function is applied to the op template fields. </p>
*
* @return
*/
@Override
@@ -120,7 +122,7 @@ public abstract class BaseDriverAdapter<R extends Op, S> implements DriverAdapte
}
@Override
public synchronized final DriverSpaceCache<? extends S> getSpaceCache() {
public final synchronized DriverSpaceCache<? extends S> getSpaceCache() {
if (spaceCache == null) {
spaceCache = new DriverSpaceCache<>(getSpaceInitializer(getConfiguration()));
}
@@ -151,33 +153,33 @@ public abstract class BaseDriverAdapter<R extends Op, S> implements DriverAdapte
@Override
public NBConfigModel getConfigModel() {
return ConfigModel.of(BaseDriverAdapter.class)
.add(Param.optional("alias"))
.add(Param.defaultTo("strict", true, "strict op field mode, which requires that provided op fields are recognized and used"))
.add(Param.optional(List.of("op", "stmt", "statement"), String.class, "op template in statement form"))
.add(Param.optional("tags", String.class, "tags to be used to filter operations"))
.add(Param.defaultTo("errors", "stop", "error handler configuration"))
.add(Param.optional("threads").setRegex("\\d+|\\d+x|auto").setDescription("number of concurrent operations, controlled by threadpool"))
.add(Param.optional("stride").setRegex("\\d+"))
.add(Param.optional("striderate", String.class, "rate limit for strides per second"))
.add(Param.optional("cycles").setRegex("\\d+[KMBGTPE]?|\\d+[KMBGTPE]?\\.\\.\\d+[KMBGTPE]?").setDescription("cycle interval to use"))
.add(Param.optional("recycles").setDescription("allow cycles to be re-used this many times"))
.add(Param.optional(List.of("cyclerate", "targetrate", "rate"), String.class, "rate limit for cycles per second"))
.add(Param.optional("phaserate", String.class, "rate limit for phases per second"))
.add(Param.optional("seq", String.class, "sequencing algorithm"))
.add(Param.optional("instrument", Boolean.class))
.add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file"))
.add(Param.optional("driver", String.class))
.add(Param.defaultTo("dryrun","none").setRegex("(op|jsonnet|none)"))
.asReadOnly();
.add(Param.optional("alias"))
.add(Param.defaultTo("strict", true, "strict op field mode, which requires that provided op fields are recognized and used"))
.add(Param.optional(List.of("op", "stmt", "statement"), String.class, "op template in statement form"))
.add(Param.optional("tags", String.class, "tags to be used to filter operations"))
.add(Param.defaultTo("errors", "stop", "error handler configuration"))
.add(Param.optional("threads").setRegex("\\d+|\\d+x|auto").setDescription("number of concurrent operations, controlled by threadpool"))
.add(Param.optional("stride").setRegex("\\d+"))
.add(Param.optional("striderate", String.class, "rate limit for strides per second"))
.add(Param.optional("cycles").setRegex("\\d+[KMBGTPE]?|\\d+[KMBGTPE]?\\.\\.\\d+[KMBGTPE]?").setDescription("cycle interval to use"))
.add(Param.optional("recycles").setDescription("allow cycles to be re-used this many times"))
.add(Param.optional(List.of("cyclerate", "targetrate", "rate"), String.class, "rate limit for cycles per second"))
.add(Param.optional("phaserate", String.class, "rate limit for phases per second"))
.add(Param.optional("seq", String.class, "sequencing algorithm"))
.add(Param.optional("instrument", Boolean.class))
.add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file"))
.add(Param.optional("driver", String.class))
.add(Param.defaultTo("dryrun", "none").setRegex("(op|jsonnet|none)"))
.asReadOnly();
}
@Override
public NBConfigModel getReconfigModel() {
return ConfigModel.of(BaseDriverAdapter.class)
.add(Param.optional("threads").setRegex("\\d+|\\d+x|auto").setDescription("number of concurrent operations, controlled by threadpool"))
.add(Param.optional("striderate", String.class, "rate limit for strides per second"))
.add(Param.optional(List.of("cyclerate", "targetrate", "rate"), String.class, "rate limit for cycles per second"))
.asReadOnly();
.add(Param.optional("threads").setRegex("\\d+|\\d+x|auto").setDescription("number of concurrent operations, controlled by threadpool"))
.add(Param.optional("striderate", String.class, "rate limit for strides per second"))
.add(Param.optional(List.of("cyclerate", "targetrate", "rate"), String.class, "rate limit for cycles per second"))
.asReadOnly();
}
@Override