mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
introduce DriverAdapter
This commit is contained in:
@@ -0,0 +1,86 @@
|
|||||||
|
package io.nosqlbench.engine.api.activityimpl.uniform;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public abstract class BaseDriverAdapter<R extends Runnable> implements DriverAdapter<R> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final Function<Map<String, Object>, Map<String, Object>> getPreprocessor() {
|
||||||
|
List<Function<Map<String,Object>,Map<String,Object>>> mappers = new ArrayList<>();
|
||||||
|
List<Function<Map<String,Object>,Map<String,Object>>> stmtRemappers =
|
||||||
|
getStmtRemappers().stream()
|
||||||
|
.map(m -> new FieldDestructuringMapper("stmt",m))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
mappers.addAll(stmtRemappers);
|
||||||
|
mappers.addAll(getFieldRemappers());
|
||||||
|
|
||||||
|
if (mappers.size()==0) {
|
||||||
|
return (i) -> i;
|
||||||
|
}
|
||||||
|
|
||||||
|
Function<Map<String,Object>,Map<String,Object>> remapper = null;
|
||||||
|
for (int i = 0; i < mappers.size(); i++) {
|
||||||
|
if (i==0) {
|
||||||
|
remapper=mappers.get(i);
|
||||||
|
} else {
|
||||||
|
remapper = remapper.andThen(mappers.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return remapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
protected final static class FieldDestructuringMapper implements Function<Map<String,Object>,Map<String,Object>> {
|
||||||
|
|
||||||
|
private final String fieldname;
|
||||||
|
private final Function<String, Optional<Map<String, Object>>> thenfunc;
|
||||||
|
|
||||||
|
public FieldDestructuringMapper(String fieldName, Function<String,Optional<Map<String,Object>>> thenfunc) {
|
||||||
|
this.fieldname = fieldName;
|
||||||
|
this.thenfunc = thenfunc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Object> apply(Map<String, Object> stringObjectMap) {
|
||||||
|
if (stringObjectMap.containsKey(fieldname)) {
|
||||||
|
Object o = stringObjectMap.get(fieldname);
|
||||||
|
if (o instanceof CharSequence) {
|
||||||
|
String rawfield = o.toString();
|
||||||
|
Optional<Map<String, Object>> optionalResult = thenfunc.apply(rawfield);
|
||||||
|
if (optionalResult.isPresent()) {
|
||||||
|
Map<String, Object> resultmap = optionalResult.get();
|
||||||
|
LinkedHashMap<String, Object> returnmap = new LinkedHashMap<>(stringObjectMap);
|
||||||
|
returnmap.remove(fieldname);
|
||||||
|
resultmap.forEach((k,v)->{
|
||||||
|
if (returnmap.containsKey(k)) {
|
||||||
|
throw new RuntimeException("element '" + k + "' already exist during field remapping.");
|
||||||
|
}
|
||||||
|
returnmap.put(k,v);
|
||||||
|
});
|
||||||
|
return returnmap;
|
||||||
|
} else {
|
||||||
|
return stringObjectMap;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("During op mapping, can't parse something that is not a CharSequence: '" + fieldname + "' (type is " + o.getClass().getCanonicalName() + ")");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return stringObjectMap;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Function<String, Optional<Map<String,Object>>>> getStmtRemappers() {
|
||||||
|
return List.of();
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Function<Map<String,Object>,Map<String,Object>>> getFieldRemappers() {
|
||||||
|
return List.of();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,56 @@
|
|||||||
|
package io.nosqlbench.engine.api.activityimpl.uniform;
|
||||||
|
|
||||||
|
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||||
|
import io.nosqlbench.engine.api.templating.ParsedCommand;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <P>The DriverAdapter interface is expected to be the replacement
|
||||||
|
* for the current {@link ActivityType}. This interface takes a simpler
|
||||||
|
* approach than the historic NoSQLBench approach. Specifically,
|
||||||
|
* all of the core logic which was being pasted into each
|
||||||
|
* driver type is centralized, and only the necessary interfaces
|
||||||
|
* needed for construction new operations are exposed.
|
||||||
|
* </P>
|
||||||
|
*
|
||||||
|
* This
|
||||||
|
*
|
||||||
|
* @param <R>
|
||||||
|
*/
|
||||||
|
public interface DriverAdapter<R extends Runnable> {
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An Op Mapper is a function which can look at the parsed
|
||||||
|
* fields in a {@link ParsedCommand} and create an OpDispenser.
|
||||||
|
* An OpDispenser is a function that will produce an special
|
||||||
|
* type {@link R} that this DriverAdapter implements as its
|
||||||
|
* op implementation.
|
||||||
|
*
|
||||||
|
* @return a synthesizer function for {@link R} op generation
|
||||||
|
*/
|
||||||
|
Function<ParsedCommand, OpDispenser<R>> getOpMapper();
|
||||||
|
|
||||||
|
Function<Map<String,Object>,Map<String,Object>> getPreprocessor();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The op parsers do additional semantic parsing work on behalf
|
||||||
|
* of the user, according to the rules set by the driver adapter
|
||||||
|
* dev. These rules can, for example, look at a single template
|
||||||
|
* formatted field and break it apart into multiple other fields
|
||||||
|
* which are used to directly drive the construction of an operation.
|
||||||
|
* By doing this, it is possible to mark static text segments
|
||||||
|
* and dynamic insertion points separately, and thus allow the
|
||||||
|
* developer to create optimal construction patterns depending
|
||||||
|
* how much is known. For details on this, see TBD: A section
|
||||||
|
* yet to be written on how to do this the easy way.
|
||||||
|
*
|
||||||
|
* If any of these returns a non-null result, then the map is added
|
||||||
|
* to the fields in the op template, and the stmt field is removed.
|
||||||
|
*
|
||||||
|
* @return optional rewrite rules for op stmt field
|
||||||
|
*/
|
||||||
|
}
|
||||||
@@ -19,6 +19,8 @@ import io.nosqlbench.engine.api.activityapi.core.ActivityType;
|
|||||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||||
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
|
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
|
||||||
import io.nosqlbench.engine.api.activityimpl.ProgressAndStateMeter;
|
import io.nosqlbench.engine.api.activityimpl.ProgressAndStateMeter;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType;
|
||||||
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
||||||
import io.nosqlbench.engine.core.annotation.Annotators;
|
import io.nosqlbench.engine.core.annotation.Annotators;
|
||||||
import io.nosqlbench.nb.api.annotations.Layer;
|
import io.nosqlbench.nb.api.annotations.Layer;
|
||||||
@@ -55,13 +57,13 @@ public class ScenarioController {
|
|||||||
*/
|
*/
|
||||||
public synchronized void start(ActivityDef activityDef) {
|
public synchronized void start(ActivityDef activityDef) {
|
||||||
Annotators.recordAnnotation(Annotation.newBuilder()
|
Annotators.recordAnnotation(Annotation.newBuilder()
|
||||||
.session(sessionId)
|
.session(sessionId)
|
||||||
.now()
|
.now()
|
||||||
.layer(Layer.Activity)
|
.layer(Layer.Activity)
|
||||||
.label("alias", activityDef.getAlias())
|
.label("alias", activityDef.getAlias())
|
||||||
.detail("command", "start")
|
.detail("command", "start")
|
||||||
.detail("params", activityDef.toString())
|
.detail("params", activityDef.toString())
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
|
|
||||||
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true);
|
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true);
|
||||||
@@ -104,13 +106,13 @@ public class ScenarioController {
|
|||||||
*/
|
*/
|
||||||
public synchronized void run(int timeout, ActivityDef activityDef) {
|
public synchronized void run(int timeout, ActivityDef activityDef) {
|
||||||
Annotators.recordAnnotation(Annotation.newBuilder()
|
Annotators.recordAnnotation(Annotation.newBuilder()
|
||||||
.session(sessionId)
|
.session(sessionId)
|
||||||
.now()
|
.now()
|
||||||
.layer(Layer.Activity)
|
.layer(Layer.Activity)
|
||||||
.label("alias", activityDef.getAlias())
|
.label("alias", activityDef.getAlias())
|
||||||
.detail("command", "run")
|
.detail("command", "run")
|
||||||
.detail("params", activityDef.toString())
|
.detail("params", activityDef.toString())
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true);
|
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true);
|
||||||
scenariologger.debug("RUN alias=" + activityDef.getAlias());
|
scenariologger.debug("RUN alias=" + activityDef.getAlias());
|
||||||
@@ -164,13 +166,13 @@ public class ScenarioController {
|
|||||||
*/
|
*/
|
||||||
public synchronized void stop(ActivityDef activityDef) {
|
public synchronized void stop(ActivityDef activityDef) {
|
||||||
Annotators.recordAnnotation(Annotation.newBuilder()
|
Annotators.recordAnnotation(Annotation.newBuilder()
|
||||||
.session(sessionId)
|
.session(sessionId)
|
||||||
.now()
|
.now()
|
||||||
.layer(Layer.Activity)
|
.layer(Layer.Activity)
|
||||||
.label("alias", activityDef.getAlias())
|
.label("alias", activityDef.getAlias())
|
||||||
.detail("command", "stop")
|
.detail("command", "stop")
|
||||||
.detail("params", activityDef.toString())
|
.detail("params", activityDef.toString())
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false);
|
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false);
|
||||||
if (activityExecutor == null) {
|
if (activityExecutor == null) {
|
||||||
@@ -266,9 +268,9 @@ public class ScenarioController {
|
|||||||
*/
|
*/
|
||||||
private ActivityExecutor getActivityExecutor(String activityAlias) {
|
private ActivityExecutor getActivityExecutor(String activityAlias) {
|
||||||
Optional<ActivityExecutor> executor =
|
Optional<ActivityExecutor> executor =
|
||||||
Optional.ofNullable(activityExecutors.get(activityAlias));
|
Optional.ofNullable(activityExecutors.get(activityAlias));
|
||||||
return executor.orElseThrow(
|
return executor.orElseThrow(
|
||||||
() -> new RuntimeException("ActivityExecutor for alias " + activityAlias + " not found.")
|
() -> new RuntimeException("ActivityExecutor for alias " + activityAlias + " not found.")
|
||||||
);
|
);
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -298,13 +300,30 @@ public class ScenarioController {
|
|||||||
|
|
||||||
if (activityTypeName == null) {
|
if (activityTypeName == null) {
|
||||||
String errmsg = "You must provide a driver=<driver> parameter. Valid examples are:\n" +
|
String errmsg = "You must provide a driver=<driver> parameter. Valid examples are:\n" +
|
||||||
knownTypes.stream().map(t -> " driver=" + t + "\n").collect(Collectors.joining());
|
knownTypes.stream().map(t -> " driver=" + t + "\n").collect(Collectors.joining());
|
||||||
throw new BasicError(errmsg);
|
throw new BasicError(errmsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
ActivityType<?> activityType = ActivityType.FINDER.getOrThrow(activityTypeName);
|
// ActivityType<?> activityType = ActivityType.FINDER.getOrThrow(activityTypeName);
|
||||||
|
|
||||||
|
ActivityType<?> activityType = null;
|
||||||
|
|
||||||
|
Optional<ActivityType> ato = ActivityType.FINDER.getOptionally(activityTypeName);
|
||||||
|
if (ato.isPresent()) {
|
||||||
|
activityType = ato.get();
|
||||||
|
} else {
|
||||||
|
Optional<DriverAdapter> oda = StandardActivityType.FINDER.getOptionally(activityTypeName);
|
||||||
|
if (oda.isPresent()) {
|
||||||
|
DriverAdapter driverAdapter = oda.get();
|
||||||
|
activityType = new StandardActivityType<>(driverAdapter);
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("Found neither ActivityType named '" + activityTypeName + "' nor DriverAdapter named '" + activityTypeName + "'.");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
executor = new ActivityExecutor(activityType.getAssembledActivity(activityDef, getActivityMap()),
|
executor = new ActivityExecutor(activityType.getAssembledActivity(activityDef, getActivityMap()),
|
||||||
this.sessionId);
|
this.sessionId);
|
||||||
activityExecutors.put(activityDef.getAlias(), executor);
|
activityExecutors.put(activityDef.getAlias(), executor);
|
||||||
}
|
}
|
||||||
return executor;
|
return executor;
|
||||||
@@ -349,8 +368,8 @@ public class ScenarioController {
|
|||||||
*/
|
*/
|
||||||
public List<ActivityDef> getActivityDefs() {
|
public List<ActivityDef> getActivityDefs() {
|
||||||
return activityExecutors.values().stream()
|
return activityExecutors.values().stream()
|
||||||
.map(ActivityExecutor::getActivityDef)
|
.map(ActivityExecutor::getActivityDef)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user