StandardActivity improvements

This commit is contained in:
Jonathan Shook 2021-07-08 11:49:25 -05:00
parent 747ef53a16
commit 1f4961bb61
6 changed files with 87 additions and 23 deletions

View File

@ -424,7 +424,7 @@ public class SimpleActivity implements Activity, ProgressCapable {
return createOpSequence(opTemplateOFunction); return createOpSequence(opTemplateOFunction);
} }
protected <O> OpSequence<OpDispenser<O>> createOpSourceFromCommands( protected <O extends Runnable> OpSequence<OpDispenser<O>> createOpSourceFromCommands(
Function<ParsedCommand, OpDispenser<O>> opinit, Function<ParsedCommand, OpDispenser<O>> opinit,
List<Function<Map<String, Object>, Map<String, Object>>> parsers List<Function<Map<String, Object>, Map<String, Object>>> parsers
) { ) {

View File

@ -17,12 +17,12 @@ import java.util.concurrent.TimeUnit;
* of this work will be undertaken by the project maintainers. * of this work will be undertaken by the project maintainers.
* *
* @param <A> The type of activity * @param <A> The type of activity
* @param <O> The type of operation * @param <R> The type of operation
*/ */
public class StandardAction<A extends StandardActivity<O>, O extends Runnable> implements SyncAction, ActivityDefObserver { public class StandardAction<A extends StandardActivity, R extends Runnable> implements SyncAction, ActivityDefObserver {
private final A activity; private final A activity;
private final OpSource<O> opsource; private final OpSource<R> opsource;
private final int slot; private final int slot;
public StandardAction(A activity, int slot) { public StandardAction(A activity, int slot) {
@ -34,7 +34,7 @@ public class StandardAction<A extends StandardActivity<O>, O extends Runnable> i
@Override @Override
public int runCycle(long cycle) { public int runCycle(long cycle) {
O op = null; R op = null;
try (Timer.Context ct = activity.getInstrumentation().getOrCreateInputTimer().time()) { try (Timer.Context ct = activity.getInstrumentation().getOrCreateInputTimer().time()) {
op = opsource.apply(cycle); op = opsource.apply(cycle);
} }

View File

@ -4,9 +4,9 @@ import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityapi.core.Activity;
public class StandardActionDispenser implements ActionDispenser { public class StandardActionDispenser implements ActionDispenser {
private final StandardActivity<?> activity; private final StandardActivity<?,?> activity;
public <A extends Activity> StandardActionDispenser(StandardActivity<?> activity) { public <A extends Activity> StandardActionDispenser(StandardActivity<?,?> activity) {
this.activity = activity; this.activity = activity;
} }

View File

@ -1,5 +1,6 @@
package io.nosqlbench.engine.api.activityimpl.uniform; package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.OpSource; import io.nosqlbench.engine.api.activityapi.planning.OpSource;
import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityimpl.ActivityDef;
@ -17,21 +18,22 @@ import java.util.function.Function;
* core of all new activity types. Extant NB drivers should also migrate * core of all new activity types. Extant NB drivers should also migrate
* to this when possible. * to this when possible.
* *
* @param <O> A type of runnable which wraps the operations for this type of driver. * @param <R> A type of runnable which wraps the operations for this type of driver.
*/ */
public class StandardActivity<O extends Runnable> extends SimpleActivity { public class StandardActivity<R extends Runnable,S> extends SimpleActivity {
private final DriverAdapter<O> adapter; private final DriverAdapter<R,S> adapter;
private final OpSource<O> opsource; private final OpSource<R> opsource;
private NBErrorHandler errorHandler;
public StandardActivity(DriverAdapter<O> adapter, ActivityDef activityDef) { public StandardActivity(DriverAdapter<R,S> adapter, ActivityDef activityDef) {
super(activityDef); super(activityDef);
this.adapter = adapter; this.adapter = adapter;
try { try {
Function<ParsedCommand, OpDispenser<O>> opmapper = adapter.getOpMapper(); Function<ParsedCommand, OpDispenser<R>> opmapper = adapter.getOpMapper();
Function<Map<String, Object>, Map<String, Object>> preprocessor = adapter.getPreprocessor(); Function<Map<String, Object>, Map<String, Object>> preprocessor = adapter.getPreprocessor();
OpSequence<OpDispenser<O>> seq = createOpSourceFromCommands(opmapper,List.of(preprocessor)); OpSequence<OpDispenser<R>> seq = createOpSourceFromCommands(opmapper,List.of(preprocessor));
opsource= OpSource.of(seq); opsource= OpSource.of(seq);
} catch (Exception e) { } catch (Exception e) {
if (e instanceof OpConfigError) { if (e instanceof OpConfigError) {
@ -42,14 +44,19 @@ public class StandardActivity<O extends Runnable> extends SimpleActivity {
} }
} }
public OpSource<O> getOpSource() { public OpSource<R> getOpSource() {
return opsource; return opsource;
} }
// public Function<OpTemplate, OpDispenser<? extends Runnable>> getRunnableOpFunction() { /**
// DiagRunnableOpMapper mapper = new DiagRunnableOpMapper(); * When an adapter needs to identify an error uniquely for the purposes of
// return mapper::apply; * routing it to the correct error handler, or naming it in logs, or naming
// } * metrics, override this method in your activity.
* @return A function that can reliably and safely map an instance of Throwable to a stable name.
*/
@Override
public final Function<Throwable, String> getErrorNameMapper() {
return adapter.getErrorNameMapper();
}
} }

View File

@ -3,19 +3,28 @@ package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType; import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.nb.api.spi.SimpleServiceLoader; import io.nosqlbench.nb.api.spi.SimpleServiceLoader;
public class StandardActivityType<A extends StandardActivity<?>> implements ActivityType<A> { public class StandardActivityType<A extends StandardActivity<?,?>> extends SimpleActivity implements ActivityType<A> {
public static SimpleServiceLoader<DriverAdapter> FINDER = new SimpleServiceLoader<DriverAdapter>(DriverAdapter.class); public static SimpleServiceLoader<DriverAdapter> FINDER = new SimpleServiceLoader<DriverAdapter>(DriverAdapter.class);
private final DriverAdapter<?> adapter; private final DriverAdapter<?,?> adapter;
public StandardActivityType(DriverAdapter<?> adapter) { public StandardActivityType(DriverAdapter<?,?> adapter, ActivityDef activityDef) {
super(activityDef);
this.adapter = adapter; this.adapter = adapter;
if (adapter instanceof ActivityDefAware) {
((ActivityDefAware) adapter).setActivityDef(activityDef);
}
} }
@Override @Override
public A getActivity(ActivityDef activityDef) { public A getActivity(ActivityDef activityDef) {
if (activityDef.getParams().getOptionalString("async").isPresent()) {
throw new RuntimeException("This driver does not support async mode yet.");
}
return (A) new StandardActivity(adapter,activityDef); return (A) new StandardActivity(adapter,activityDef);
} }
@ -23,4 +32,6 @@ public class StandardActivityType<A extends StandardActivity<?>> implements Acti
public ActionDispenser getActionDispenser(A activity) { public ActionDispenser getActionDispenser(A activity) {
return new StandardActionDispenser(activity); return new StandardActionDispenser(activity);
} }
} }

View File

@ -0,0 +1,46 @@
package io.nosqlbench.engine.api.activityimpl.uniform.fieldmappers;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
public class FieldDestructuringMapper implements Function<Map<String, Object>, Map<String, Object>> {
private final String fieldname;
private final Function<String, Optional<Map<String, Object>>> thenfunc;
public FieldDestructuringMapper(String fieldName, Function<String, Optional<Map<String, Object>>> thenfunc) {
this.fieldname = fieldName;
this.thenfunc = thenfunc;
}
@Override
public Map<String, Object> apply(Map<String, Object> stringObjectMap) {
if (stringObjectMap.containsKey(fieldname)) {
Object o = stringObjectMap.get(fieldname);
if (o instanceof CharSequence) {
String rawfield = o.toString();
Optional<Map<String, Object>> optionalResult = thenfunc.apply(rawfield);
if (optionalResult.isPresent()) {
Map<String, Object> resultmap = optionalResult.get();
LinkedHashMap<String, Object> returnmap = new LinkedHashMap<>(stringObjectMap);
returnmap.remove(fieldname);
resultmap.forEach((k, v) -> {
if (returnmap.containsKey(k)) {
throw new RuntimeException("element '" + k + "' already exist during field remapping.");
}
returnmap.put(k, v);
});
return returnmap;
} else {
return stringObjectMap;
}
} else {
throw new RuntimeException("During op mapping, can't parse something that is not a CharSequence: '" + fieldname + "' (type is " + o.getClass().getCanonicalName() + ")");
}
} else {
return stringObjectMap;
}
}
}