mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2024-12-23 15:40:44 -06:00
allow dryrun to be multi-modal
This commit is contained in:
parent
4f827c8031
commit
4e70eb7680
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -166,7 +166,7 @@ public abstract class BaseDriverAdapter<R extends Op, S> implements DriverAdapte
|
||||
.add(Param.optional("instrument", Boolean.class))
|
||||
.add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file"))
|
||||
.add(Param.optional("driver", String.class))
|
||||
.add(Param.defaultTo("dryrun",false))
|
||||
.add(Param.defaultTo("dryrun","none").setRegex("(op|jsonnet|none)"))
|
||||
.asReadOnly();
|
||||
}
|
||||
|
||||
|
@ -495,7 +495,8 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
|
||||
logger.info(() -> "skipped mapping op '" + pop.getName() + "'");
|
||||
continue;
|
||||
}
|
||||
boolean dryrun = pop.takeStaticConfigOr("dryrun", false);
|
||||
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
|
||||
boolean dryrun = dryrunSpec.equalsIgnoreCase("op");
|
||||
|
||||
DriverAdapter adapter = adapters.get(i);
|
||||
OpMapper opMapper = adapter.getOpMapper();
|
||||
|
@ -82,7 +82,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
||||
/**
|
||||
* Simply stop the motors
|
||||
*/
|
||||
public void stopActivity() {
|
||||
public void stopActivity() {
|
||||
logger.info(() -> "stopping activity in progress: " + this.getActivityDef().getAlias());
|
||||
|
||||
activity.setRunState(RunState.Stopping);
|
||||
@ -96,14 +96,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
||||
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
|
||||
|
||||
Annotators.recordAnnotation(Annotation.newBuilder()
|
||||
.session(sessionId)
|
||||
.interval(this.startedAt, this.stoppedAt)
|
||||
.layer(Layer.Activity)
|
||||
.label("alias", getActivityDef().getAlias())
|
||||
.label("driver", getActivityDef().getActivityType())
|
||||
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
||||
.detail("params", getActivityDef().toString())
|
||||
.build()
|
||||
.session(sessionId)
|
||||
.interval(this.startedAt, this.stoppedAt)
|
||||
.layer(Layer.Activity)
|
||||
.label("alias", getActivityDef().getAlias())
|
||||
.label("driver", getActivityDef().getActivityType())
|
||||
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
||||
.detail("params", getActivityDef().toString())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
@ -123,14 +123,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
||||
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
|
||||
|
||||
Annotators.recordAnnotation(Annotation.newBuilder()
|
||||
.session(sessionId)
|
||||
.interval(this.startedAt, this.stoppedAt)
|
||||
.layer(Layer.Activity)
|
||||
.label("alias", getActivityDef().getAlias())
|
||||
.label("driver", getActivityDef().getActivityType())
|
||||
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
||||
.detail("params", getActivityDef().toString())
|
||||
.build()
|
||||
.session(sessionId)
|
||||
.interval(this.startedAt, this.stoppedAt)
|
||||
.layer(Layer.Activity)
|
||||
.label("alias", getActivityDef().getAlias())
|
||||
.label("driver", getActivityDef().getActivityType())
|
||||
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
||||
.detail("params", getActivityDef().toString())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
@ -210,10 +210,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
||||
adjustMotorCountToThreadParam(activity.getActivityDef());
|
||||
}
|
||||
motors.stream()
|
||||
.filter(m -> (m instanceof ActivityDefObserver))
|
||||
.filter(m -> (m instanceof ActivityDefObserver))
|
||||
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized)
|
||||
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
|
||||
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
|
||||
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
|
||||
}
|
||||
}
|
||||
|
||||
@ -227,8 +227,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
||||
|
||||
private String getSlotStatus() {
|
||||
return motors.stream()
|
||||
.map(m -> m.getState().get().getCode())
|
||||
.collect(Collectors.joining(",", "[", "]"));
|
||||
.map(m -> m.getState().get().getCode())
|
||||
.collect(Collectors.joining(",", "[", "]"));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -285,17 +285,17 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
||||
case Running:
|
||||
case Starting:
|
||||
motors.stream()
|
||||
.filter(m -> m.getState().get() != RunState.Running)
|
||||
.filter(m -> m.getState().get() != RunState.Finished)
|
||||
.filter(m -> m.getState().get() != RunState.Starting)
|
||||
.forEach(m -> {
|
||||
executorService.execute(m);
|
||||
});
|
||||
.filter(m -> m.getState().get() != RunState.Running)
|
||||
.filter(m -> m.getState().get() != RunState.Finished)
|
||||
.filter(m -> m.getState().get() != RunState.Starting)
|
||||
.forEach(m -> {
|
||||
executorService.execute(m);
|
||||
});
|
||||
break;
|
||||
case Stopped:
|
||||
motors.stream()
|
||||
.filter(m -> m.getState().get() != RunState.Stopped)
|
||||
.forEach(Motor::requestStop);
|
||||
.filter(m -> m.getState().get() != RunState.Stopped)
|
||||
.forEach(Motor::requestStop);
|
||||
break;
|
||||
case Finished:
|
||||
case Stopping:
|
||||
@ -474,10 +474,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
||||
|
||||
private void startMotorExecutorService() {
|
||||
this.executorService = new ThreadPoolExecutor(
|
||||
0, Integer.MAX_VALUE,
|
||||
0L, TimeUnit.SECONDS,
|
||||
new SynchronousQueue<>(),
|
||||
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
|
||||
0, Integer.MAX_VALUE,
|
||||
0L, TimeUnit.SECONDS,
|
||||
new SynchronousQueue<>(),
|
||||
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
|
||||
);
|
||||
}
|
||||
|
||||
@ -494,14 +494,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
||||
|
||||
logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary());
|
||||
Annotators.recordAnnotation(Annotation.newBuilder()
|
||||
.session(sessionId)
|
||||
.now()
|
||||
.layer(Layer.Activity)
|
||||
.label("alias", getActivityDef().getAlias())
|
||||
.label("driver", getActivityDef().getActivityType())
|
||||
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
||||
.detail("params", getActivityDef().toString())
|
||||
.build()
|
||||
.session(sessionId)
|
||||
.now()
|
||||
.layer(Layer.Activity)
|
||||
.label("alias", getActivityDef().getAlias())
|
||||
.label("driver", getActivityDef().getActivityType())
|
||||
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
||||
.detail("params", getActivityDef().toString())
|
||||
.build()
|
||||
);
|
||||
|
||||
activitylogger.debug("START/before alias=(" + activity.getAlias() + ")");
|
||||
|
Loading…
Reference in New Issue
Block a user