refactor dryrun for type clarity

This commit is contained in:
Jonathan Shook
2024-10-23 13:40:09 -05:00
parent d5ec597152
commit 05c6b82bba
12 changed files with 339 additions and 34 deletions

View File

@@ -0,0 +1,73 @@
package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.RunnableOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryCycleOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryRunnableOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterCycleOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterRunnableOpDispenserWrapper;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.nb.api.errors.OpConfigError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class OpWrappers {
public final static Logger logger = LogManager.getLogger(OpWrappers.class);
public static <OP extends Op, SPACE extends Space> OpDispenser<OP> wrapOptionally(
DriverAdapter<OP, SPACE> adapter,
OpDispenser<OP> dispenser,
ParsedOp pop,
String dryrunSpec
) {
if (dryrunSpec.isEmpty() || "none".equals(dryrunSpec)) {
return dispenser;
}
if ("op".equalsIgnoreCase(dryrunSpec)) {
Op exampleOp = dispenser.getOp(0L);
if (exampleOp instanceof RunnableOp runnableOp) {
dispenser = new DryRunnableOpDispenserWrapper(adapter, pop, dispenser);
} else if (exampleOp instanceof CycleOp<?> cycleOp) {
dispenser = new DryCycleOpDispenserWrapper(adapter, pop, dispenser);
} else {
throw new OpConfigError("Unable to wrap op named '" + pop.getDefinedNames() + "' for dry run, since" +
"only RunnableOp and CycleOp<Result> types are supported");
}
logger.warn(
"initialized {} for dry run only. " +
"This op will be synthesized for each cycle, but will not be executed.",
pop.getName()
);
} else if ("emit".equalsIgnoreCase(dryrunSpec)) {
Op exampleOp = dispenser.getOp(0L);
if (exampleOp instanceof RunnableOp runnableOp) {
dispenser = new EmitterRunnableOpDispenserWrapper(adapter, pop, dispenser);
} else if (exampleOp instanceof CycleOp<?> cycleOp) {
dispenser = new EmitterCycleOpDispenserWrapper(adapter, pop, dispenser);
} else {
throw new OpConfigError("Unable to make op named '" + pop.getName() + "' emit a value, " +
"since only RunnableOp and CycleOp<Result> types are supported");
}
dispenser = new EmitterRunnableOpDispenserWrapper(
(DriverAdapter<Op, Space>) adapter,
pop,
(OpDispenser<? extends Op>) dispenser
);
logger.warn(
"initialized {} for to emit the result type to stdout. ",
pop.getName()
);
}
return dispenser;
}
}

View File

@@ -27,8 +27,10 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryRunOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.RunnableOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryRunnableOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterCycleOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterRunnableOpDispenserWrapper;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.core.progress.ActivityMetricProgressMeter;
@@ -415,7 +417,6 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
.orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends O>> planner = new SequencePlanner<>(sequencerType);
int dryrunCount = 0;
for (int i = 0; i < pops.size(); i++) {
long ratio = ratios.get(i);
ParsedOp pop = pops.get(i);
@@ -431,16 +432,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
LongFunction<Space> spaceFunc = adapter.getSpaceFunc(pop);
OpDispenser<Op> dispenser = opMapper.apply(pop, spaceFunc);
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
if ("op".equalsIgnoreCase(dryrunSpec)) {
dispenser = new DryRunOpDispenserWrapper((DriverAdapter<Op, Object>) adapter, pop, dispenser);
dryrunCount++;
} else if ("emit".equalsIgnoreCase(dryrunSpec)) {
dispenser = new EmitterOpDispenserWrapper(
(DriverAdapter<Op, Object>) adapter,
pop,
(OpDispenser<? extends CycleOp<?>>) dispenser
);
}
dispenser = OpWrappers.wrapOptionally(adapter, dispenser, pop, dryrunSpec);
// if (strict) {
// optemplate.assertConsumed();
@@ -450,9 +442,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
throw new OpConfigError("Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(), e);
}
}
if (0 < dryrunCount) {
logger.warn("initialized {} op templates for dry run only. These ops will be synthesized for each cycle, but will not be executed.", dryrunCount);
}
return planner.resolve();