jshook/nosqlbench-2057-flattenop (#2067)

* remove ChainingOp

* typos and other minor fixes

* remove Op, ChainingOp, flatten to CycleOp, derive RunnableOp

* refactor qdrant for aligned op types

* refactor mongodb for aligned op types

* refactor cqld4 for aligned op types

* minor API alignment

refactor gcpspanner for aligned op types

* minor API alignment

refactor gcpspanner for aligned op types

add license

* fix cqld4 batch statement mapper

* fix cql base op mapper

* remove var inference

* remove emitter for runnable op
This commit is contained in:
Jonathan Shook
2024-11-04 16:42:05 -06:00
committed by GitHub
parent 62ad93cb59
commit ea7fceae49
87 changed files with 331 additions and 539 deletions

View File

@@ -17,7 +17,6 @@
package io.nosqlbench.engine.api.activityapi.planning;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import java.util.function.LongFunction;
@@ -29,7 +28,7 @@ import java.util.function.LongFunction;
*/
public interface OpSource<T> extends LongFunction<T> {
static <O extends Op> OpSource<O> of(OpSequence<OpDispenser<? extends O>> seq) {
static <O extends LongFunction<?>> OpSource<O> of(OpSequence<OpDispenser<? extends O>> seq) {
return (long l) -> seq.apply(l).getOp(l);
}

View File

@@ -22,14 +22,9 @@ import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.RunnableOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryCycleOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryRunnableOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterCycleOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterRunnableOpDispenserWrapper;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.nb.api.errors.OpConfigError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -37,7 +32,7 @@ public class OpWrappers {
public final static Logger logger = LogManager.getLogger(OpWrappers.class);
public static <OP extends Op, SPACE extends Space> OpDispenser<OP> wrapOptionally(
public static <OP extends CycleOp<?>, SPACE extends Space> OpDispenser<OP> wrapOptionally(
DriverAdapter<OP, SPACE> adapter,
OpDispenser<OP> dispenser,
ParsedOp pop,
@@ -46,27 +41,8 @@ public class OpWrappers {
Dryrun dryrun = Dryrun.valueOf(dryrunSpec);
return switch (dryrun) {
case none -> dispenser;
case op -> {
Op exampleOp = dispenser.getOp(0L);
yield switch (exampleOp) {
case RunnableOp runnableOp -> new DryRunnableOpDispenserWrapper(adapter, pop, dispenser);
case CycleOp<?> cycleOp -> new DryCycleOpDispenserWrapper(adapter, pop, dispenser);
default -> throw new OpConfigError(
"Unable to wrap op named '"
+ pop.getDefinedNames() + "' for dry run, since"
+ "only RunnableOp and CycleOp<Result> types are supported");
};
}
case emit -> {
Op exampleOp = dispenser.getOp(0L);
yield switch (exampleOp) {
case RunnableOp runnableOp -> new EmitterRunnableOpDispenserWrapper(adapter, pop, dispenser);
case CycleOp<?> cycleOp -> new EmitterCycleOpDispenserWrapper(adapter, pop, dispenser);
default ->
throw new OpConfigError("Unable to make op named '" + pop.getName() + "' emit a value, " +
"since only RunnableOp and CycleOp<Result> types are supported");
};
}
case op -> new DryCycleOpDispenserWrapper(adapter, pop, dispenser);
case emit -> new EmitterCycleOpDispenserWrapper(adapter, pop, dispenser);
};
}
}

View File

@@ -26,11 +26,6 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.RunnableOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryRunnableOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterCycleOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterRunnableOpDispenserWrapper;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.core.progress.ActivityMetricProgressMeter;
@@ -396,10 +391,10 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
}
protected <O extends Op> OpSequence<OpDispenser<? extends O>> createOpSourceFromParsedOps(
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps(
// Map<String, DriverAdapter<?,?>> adapterCache,
// Map<String, OpMapper<? extends Op>> mapperCache,
List<DriverAdapter<Op, Space>> adapters,
List<DriverAdapter<CycleOp<?>, Space>> adapters,
List<ParsedOp> pops
) {
try {
@@ -415,7 +410,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends O>> planner = new SequencePlanner<>(sequencerType);
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner = new SequencePlanner<>(sequencerType);
for (int i = 0; i < pops.size(); i++) {
long ratio = ratios.get(i);
@@ -427,17 +422,17 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
continue;
}
DriverAdapter<Op, Space> adapter = adapters.get(i);
OpMapper<Op, Space> opMapper = adapter.getOpMapper();
DriverAdapter<CycleOp<?>, Space> adapter = adapters.get(i);
OpMapper<CycleOp<?>, Space> opMapper = adapter.getOpMapper();
LongFunction<Space> spaceFunc = adapter.getSpaceFunc(pop);
OpDispenser<Op> dispenser = opMapper.apply(pop, spaceFunc);
OpDispenser<CycleOp<?>> dispenser = opMapper.apply(pop, spaceFunc);
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
dispenser = OpWrappers.wrapOptionally(adapter, dispenser, pop, dryrunSpec);
// if (strict) {
// optemplate.assertConsumed();
// }
planner.addOp((OpDispenser<? extends O>) dispenser, ratio);
planner.addOp((OpDispenser<? extends CycleOp<?>>) dispenser, ratio);
} catch (Exception e) {
throw new OpConfigError("Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(), e);
}

View File

@@ -25,7 +25,7 @@ import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.lifecycle.Shutdownable;
@@ -60,10 +60,10 @@ import java.util.concurrent.ConcurrentHashMap;
* @param <S>
* The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph
*/
public class StandardActivity<R extends Op, S> extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver {
public class StandardActivity<R extends java.util.function.LongFunction, S> extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final OpSequence<OpDispenser<? extends Op>> sequence;
private final ConcurrentHashMap<String, DriverAdapter<Op,Space>> adapters = new ConcurrentHashMap<>();
private final OpSequence<OpDispenser<? extends CycleOp<?>>> sequence;
private final ConcurrentHashMap<String, DriverAdapter<CycleOp<?>,Space>> adapters = new ConcurrentHashMap<>();
public StandardActivity(NBComponent parent, ActivityDef activityDef) {
super(parent, activityDef);
@@ -93,11 +93,11 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
List<ParsedOp> pops = new ArrayList<>();
List<DriverAdapter<Op, Space>> adapterlist = new ArrayList<>();
List<DriverAdapter<CycleOp<?>, Space>> adapterlist = new ArrayList<>();
NBConfigModel supersetConfig = ConfigModel.of(StandardActivity.class).add(yamlmodel);
Optional<String> defaultDriverOption = defaultDriverName;
ConcurrentHashMap<String, OpMapper<? extends Op, ? extends Space>> mappers = new ConcurrentHashMap<>();
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers = new ConcurrentHashMap<>();
for (OpTemplate ot : opTemplates) {
// ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this);
String driverName = ot.getOptionalStringParam("driver", String.class)
@@ -113,7 +113,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
// HERE
if (!adapters.containsKey(driverName)) {
DriverAdapter<Op,Space> adapter = Optional.of(driverName)
DriverAdapter<CycleOp<?>,Space> adapter = Optional.of(driverName)
.flatMap(
name -> ServiceSelector.of(
name,
@@ -145,7 +145,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
DriverAdapter<Op, Space> adapter = adapters.get(driverName);
DriverAdapter<CycleOp<?>, Space> adapter = adapters.get(driverName);
adapterlist.add(adapter);
ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
@@ -195,7 +195,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
}
public OpSequence<OpDispenser<? extends Op>> getOpSequence() {
public OpSequence<OpDispenser<? extends CycleOp<?>>> getOpSequence() {
return sequence;
}
@@ -262,7 +262,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
*/
@Override
public void shutdownActivity() {
for (Map.Entry<String, DriverAdapter<Op,Space>> entry : adapters.entrySet()) {
for (Map.Entry<String, DriverAdapter<CycleOp<?>,Space>> entry : adapters.entrySet()) {
String adapterName = entry.getKey();
DriverAdapter<?, ?> adapter = entry.getValue();
for (Space space : adapter.getSpaceCache()) {

View File

@@ -46,7 +46,7 @@ import java.util.concurrent.TimeUnit;
* @param <R>
* The type of operation
*/
public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> implements SyncAction, ActivityDefObserver {
public class StandardAction<A extends StandardActivity<R, ?>, R extends java.util.function.LongFunction> implements SyncAction, ActivityDefObserver {
private final static Logger logger = LogManager.getLogger("ACTION");
private final Timer executeTimer;
private final Histogram triesHistogram;
@@ -54,7 +54,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
private final Timer resultTimer;
private final Timer bindTimer;
private final NBErrorHandler errorHandler;
private final OpSequence<OpDispenser<? extends Op>> opsequence;
private final OpSequence<OpDispenser<? extends CycleOp<?>>> opsequence;
private final int maxTries;
private final Timer verifierTimer;
@@ -73,8 +73,8 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
@Override
public int runCycle(long cycle) {
OpDispenser<? extends Op> dispenser=null;
Op op = null;
OpDispenser<? extends CycleOp<?>> dispenser=null;
CycleOp op = null;
try (Timer.Context ct = bindTimer.time()) {
dispenser = opsequence.apply(cycle);
@@ -96,16 +96,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
dispenser.onStart(cycle);
try (Timer.Context ct = executeTimer.time()) {
if (op instanceof RunnableOp runnableOp) {
runnableOp.run();
} else if (op instanceof CycleOp<?> cycleOp) {
result = cycleOp.apply(cycle);
} else if (op instanceof ChainingOp chainingOp) {
result = chainingOp.apply(result);
} else {
throw new RuntimeException("The op implementation did not implement any active logic. Implement " +
"one of [RunnableOp, CycleOp, or ChainingOp]");
}
result = op.apply(cycle);
// TODO: break out validation timer from execute
try (Timer.Context ignored = verifierTimer.time()) {
CycleFunction<Boolean> verifier = dispenser.getVerifier();