fixes for passing basic tests

This commit is contained in:
Jonathan Shook 2023-09-29 19:31:42 -05:00
parent 7e2ddc745a
commit 7f960778b4
10 changed files with 63 additions and 77 deletions

View File

@ -16,12 +16,12 @@
package io.nosqlbench.engine.api.activityimpl.input;
import com.codahale.metrics.Gauge;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.CyclesSpec;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment;
import io.nosqlbench.engine.api.activityapi.input.Input;
@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
* after the max value. They simply expose it to callers. It is up to the
* caller to check the value to determine when the input is deemed "used up."</p>
*/
public class AtomicInput implements Input, ActivityDefObserver, Gauge<Long>, NBLabeledElement {
public class AtomicInput extends NBBaseComponent implements Input, ActivityDefObserver, Gauge<Long> {
private final static Logger logger = LogManager.getLogger(AtomicInput.class);
private final AtomicLong cycle_value = new AtomicLong(0L);
@ -57,20 +57,19 @@ public class AtomicInput implements Input, ActivityDefObserver, Gauge<Long>, NBL
private final long startedAt = System.currentTimeMillis();
private final ActivityDef activityDef;
private final NBLabeledElement parent;
public AtomicInput(NBLabeledElement parent, ActivityDef activityDef) {
this.parent = parent;
public AtomicInput(NBComponent parent, ActivityDef activityDef) {
super(parent);
this.activityDef = activityDef;
onActivityDefUpdate(activityDef);
ActivityMetrics.gauge(this, "input_cycles_first", new NBFunctionGauge(this, () -> (double)this.cycles_min.get()));
ActivityMetrics.gauge(this, "input_cycles_last", new NBFunctionGauge(this, () -> (double)this.cycles_max.get()));
ActivityMetrics.gauge(this, "input_cycle", new NBFunctionGauge(this, () -> (double)this.cycle_value.get()));
ActivityMetrics.gauge(this, "input_cycles_total", new NBFunctionGauge(this, this::getTotalCycles));
ActivityMetrics.gauge(this, "input_recycles_first", new NBFunctionGauge(this, () -> (double)this.recycles_min.get()));
ActivityMetrics.gauge(this, "input_recycles_last", new NBFunctionGauge(this, () -> (double)this.recycles_max.get()));
ActivityMetrics.gauge(this, "input_recycle", new NBFunctionGauge(this, () -> (double) this.recycle_value.get()));
ActivityMetrics.gauge(this, "input_recycles_total", new NBFunctionGauge(this, this::getTotalRecycles));
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.cycles_min.get(), "input_cycles_first"));
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.cycles_max.get(), "input_cycles_last"));
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.cycle_value.get(), "input_cycle"));
ActivityMetrics.gauge(new NBFunctionGauge(this, this::getTotalCycles, "input_cycles_total"));
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.recycles_min.get(), "input_recycles_first"));
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.recycles_max.get(), "input_recycles_last"));
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.recycle_value.get(), "input_recycle"));
ActivityMetrics.gauge(new NBFunctionGauge(this, this::getTotalRecycles, "input_recycles_total"));
}
private double getTotalRecycles() {
@ -90,7 +89,7 @@ public class AtomicInput implements Input, ActivityDefObserver, Gauge<Long>, NBL
recycle_value.getAndIncrement();
if (recycle_value.get() >= recycles_max.get()) {
logger.trace(() -> "Exhausted input for " + activityDef.getAlias() + " at " + currentStrideStart + ", recycle " +
"count " + recycle_value.get());
"count " + recycle_value.get());
return null;
} else {
cycle_value.set(cycles_min.get());
@ -107,11 +106,11 @@ public class AtomicInput implements Input, ActivityDefObserver, Gauge<Long>, NBL
@Override
public String toString() {
return "AtomicInput{" +
"cycleValue=" + cycle_value +
", min=" + cycles_min +
", max=" + cycles_max +
", activity=" + activityDef.getAlias() +
'}';
"cycleValue=" + cycle_value +
", min=" + cycles_min +
", max=" + cycles_max +
", activity=" + activityDef.getAlias() +
'}';
}
@Override
@ -121,16 +120,16 @@ public class AtomicInput implements Input, ActivityDefObserver, Gauge<Long>, NBL
cycles_max.set(cyclesSpec.last_exclusive());
if (cycles_min.get() != cyclesSpec.first_inclusive()) {
logger.info(() -> "resetting cycle value to new start: cycle[" + cycles_min.get() + "->" + cyclesSpec.first_inclusive()+"] " +
" start["+cycle_value.get()+"->"+ cycles_min.get()+"]");
logger.info(() -> "resetting cycle value to new start: cycle[" + cycles_min.get() + "->" + cyclesSpec.first_inclusive() + "] " +
" start[" + cycle_value.get() + "->" + cycles_min.get() + "]");
cycles_min.set(cyclesSpec.first_inclusive());
cycle_value.set(cycles_min.get());
}
recycles_max.set(recyclesSpec.last_exclusive());
if (recycles_min.get() != recyclesSpec.first_inclusive()) {
logger.info(() -> "resetting recycle value to new start: recycle[" + recycles_min.get() + "->" + recyclesSpec.first_inclusive()+"] " +
" start["+recycle_value.get()+"->"+ recycles_min.get()+"]");
logger.info(() -> "resetting recycle value to new start: recycle[" + recycles_min.get() + "->" + recyclesSpec.first_inclusive() + "] " +
" start[" + recycle_value.get() + "->" + recycles_min.get() + "]");
recycles_min.set(recyclesSpec.first_inclusive());
recycle_value.set(recyclesSpec.first_inclusive());
}
@ -145,11 +144,6 @@ public class AtomicInput implements Input, ActivityDefObserver, Gauge<Long>, NBL
return true;
}
@Override
public NBLabels getLabels() {
return parent.getLabels();
}
@Override
public Long getValue() {
return this.cycle_value.get();

View File

@ -152,13 +152,13 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
}
this.pendingOpsGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> this.getProgressMeter().getSummary().pending(), "ops_pending")
this.pendingOpsGauge = ActivityMetrics.gauge(
new NBFunctionGauge(this,() -> this.getProgressMeter().getSummary().pending(), "ops_pending")
);
this.activeOpsGauge = ActivityMetrics.register(
this.activeOpsGauge = ActivityMetrics.gauge(
new NBFunctionGauge(this, () -> this.getProgressMeter().getSummary().current(),"ops_active")
);
this.completeOpsGauge = ActivityMetrics.register(
this.completeOpsGauge = ActivityMetrics.gauge(
new NBFunctionGauge(this, () -> this.getProgressMeter().getSummary().complete(),"ops_complete"));
}

View File

@ -16,8 +16,9 @@
package io.nosqlbench.engine.api.activityimpl.input;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment;
import org.junit.jupiter.api.Test;
@ -25,16 +26,17 @@ import static org.assertj.core.api.Assertions.assertThat;
public class AtomicInputTest {
private final static NBComponent root = new TestComponent("testing","atomicinput");
@Test
public void testThatNoCyclesAndNoRecyclesMeansZero() {
AtomicInput input = new AtomicInput(NBLabeledElement.EMPTY, ActivityDef.parseActivityDef("alias=foo;cycles=0;recycles=0"));
AtomicInput input = new AtomicInput(root, ActivityDef.parseActivityDef("alias=foo;cycles=0;recycles=0"));
CycleSegment inputSegment = input.getInputSegment(1);
assertThat(inputSegment).isNull();
}
@Test
public void testThatNoCyclesAndDefaultRecyclesMeans1xCycles() {
AtomicInput input = new AtomicInput(NBLabeledElement.EMPTY, ActivityDef.parseActivityDef("alias=foo;cycles=10"));
AtomicInput input = new AtomicInput(root, ActivityDef.parseActivityDef("alias=foo;cycles=10"));
CycleSegment inputSegment =null;
inputSegment= input.getInputSegment(10);
@ -51,7 +53,7 @@ public class AtomicInputTest {
int intendedRecycles=4;
int stride=10;
AtomicInput input = new AtomicInput(NBLabeledElement.EMPTY, ActivityDef.parseActivityDef("alias=foo;cycles="+intendedCycles+";recycles="+intendedRecycles));
AtomicInput input = new AtomicInput(root, ActivityDef.parseActivityDef("alias=foo;cycles="+intendedCycles+";recycles="+intendedRecycles));
CycleSegment segment =null;
for (int nextRecycle = 0; nextRecycle < intendedRecycles; nextRecycle++) {
for (int nextCycle = 0; nextCycle < intendedCycles; nextCycle+=stride) {
@ -66,7 +68,7 @@ public class AtomicInputTest {
@Test
public void testThatCycleAndRecycleOffsetsWork() {
AtomicInput input = new AtomicInput(NBLabeledElement.EMPTY, ActivityDef.parseActivityDef("alias=foo;cycles=310..330;recycles=37..39"));
AtomicInput input = new AtomicInput(root, ActivityDef.parseActivityDef("alias=foo;cycles=310..330;recycles=37..39"));
CycleSegment segment = null;
int stride=10;
segment = input.getInputSegment(stride);
@ -88,7 +90,7 @@ public class AtomicInputTest {
@Test
public void testEmptyIntervalShouldNotProvideValues() {
AtomicInput i = new AtomicInput(NBLabeledElement.EMPTY,ActivityDef.parseActivityDef("alias=foo;cycles=23..23"));
AtomicInput i = new AtomicInput(root,ActivityDef.parseActivityDef("alias=foo;cycles=23..23"));
CycleSegment inputSegment = i.getInputSegment(1);
assertThat(inputSegment).isNull();
}

View File

@ -22,6 +22,7 @@ import io.nosqlbench.adapters.api.activityconfig.rawyaml.RawOpsLoader;
import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.api.apps.BundledApp;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.api.content.Content;
import io.nosqlbench.api.content.NBIO;
@ -73,7 +74,7 @@ import java.util.ServiceLoader.Provider;
import java.util.function.Function;
import java.util.stream.Collectors;
public class NBCLI implements Function<String[], Integer>, NBComponent {
public class NBCLI extends NBBaseComponent implements Function<String[], Integer> {
private static Logger logger;
private static final LoggerConfig loggerConfig;
@ -92,10 +93,12 @@ public class NBCLI implements Function<String[], Integer>, NBComponent {
private String sessionName;
private String sessionCode;
private long sessionTime;
private NBLabels extraLabels = NBLabels.forKV();
private ClientSystemMetricChecker clientMetricChecker;
public NBCLI(final String commandName) {
super(null, NBLabels.forKV("appname","nosqlbench"));
this.commandName = commandName;
}
@ -163,6 +166,7 @@ public class NBCLI implements Function<String[], Integer>, NBComponent {
NBCLI.loggerConfig.setConsoleLevel(NBLogLevel.ERROR);
this.sessionTime = System.currentTimeMillis();
final NBCLIOptions globalOptions = new NBCLIOptions(args, Mode.ParseGlobalsOnly);
this.extraLabels=globalOptions.getLabelMap();
this.sessionCode = SystemId.genSessionCode(sessionTime);
this.sessionName = SessionNamer.format(globalOptions.getSessionName(), sessionTime).replaceAll("SESSIONCODE", sessionCode);
@ -362,14 +366,6 @@ public class NBCLI implements Function<String[], Integer>, NBComponent {
return NBCLI.EXIT_OK;
}
// disabled for now
// if (null != options.wantsMetricsForActivity()) {
// final String metricsHelp = this.getMetricsHelpFor(options.wantsMetricsForActivity());
// System.out.println("Available metric names for activity:" + options.wantsMetricsForActivity() + ':');
// System.out.println(metricsHelp);
// return NBCLI.EXIT_OK;
// }
NBCLI.logger.debug("initializing annotators with config:'{}'", annotatorsConfig);
Annotators.init(annotatorsConfig, options.getAnnotateLabelSpec());
Annotators.recordAnnotation(
@ -599,11 +595,7 @@ public class NBCLI implements Function<String[], Integer>, NBComponent {
@Override
public NBLabels getLabels() {
return labels;
return (extraLabels==null) ? super.getLabels() : super.getLabels().and(extraLabels);
}
@Override
public NBComponent getParent() {
return this;
}
}

View File

@ -18,6 +18,7 @@ package io.nosqlbench.engine.core.lifecycle.activity;
import com.codahale.metrics.Gauge;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricGauge;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.engine.api.activityapi.core.*;
@ -75,7 +76,7 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
private long stoppedAt = 0L;
private ActivityExecutorShutdownHook shutdownHook = null;
private NBFunctionGauge threadsGauge;
private NBMetricGauge threadsGauge;
public ActivityExecutor(Activity activity, String sessionId) {
this.activity = activity;
@ -459,7 +460,8 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
// }
private void registerMetrics() {
this.threadsGauge = ActivityMetrics.register(new NBFunctionGauge(activity, () -> (double) this.motors.size(), "threads"));
NBMetricGauge gauge = new NBFunctionGauge(activity, () -> (double) this.motors.size(), "threads");
this.threadsGauge = ActivityMetrics.gauge(gauge);
}
private void unregisterMetrics() {
if (this.threadsGauge != null) {

View File

@ -16,8 +16,8 @@
package io.nosqlbench.engine.core;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
@ -25,7 +25,6 @@ import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityimpl.CoreServices;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser;
import io.nosqlbench.engine.api.activityimpl.input.AtomicInput;
import io.nosqlbench.engine.api.activityimpl.input.CoreInputDispenser;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser;
@ -36,23 +35,9 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.MotorDispenser;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
class ActivityExecutorTest {
@ -118,10 +103,8 @@ class ActivityExecutorTest {
ExecutorService testExecutor = Executors.newCachedThreadPool();
Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
try {
activityDef.setThreads(1);
activityExecutor.startActivity();
future.get();
testExecutor.shutdownNow();

View File

@ -19,6 +19,7 @@ package io.nosqlbench.api.engine.metrics;
import com.codahale.metrics.*;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.NBNamedElement;
import io.nosqlbench.api.labels.NBLabelsFilter;
import io.nosqlbench.api.engine.activityapi.core.MetricRegistryService;
@ -265,6 +266,12 @@ public class ActivityMetrics {
}
public static NBMetricGauge gauge(NBMetricGauge gauge) {
final NBLabels labels = gauge.getLabels();
return (NBMetricGauge) register(labels, () -> new NBMetricGaugeWrapper<>(labels, gauge));
}
@SuppressWarnings("unchecked")
public static <T> Gauge<T> gauge(NBLabeledElement parent, String metricFamilyName, Gauge<T> gauge) {
final NBLabels labels = parent.getLabels().and("name", sanitize(metricFamilyName));

View File

@ -18,6 +18,7 @@ package io.nosqlbench.api.engine.metrics.instruments;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.NBComponent;
import java.util.Map;
import java.util.function.Supplier;
@ -28,12 +29,12 @@ public class NBFunctionGauge implements NBMetricGauge<Double> {
private final NBLabeledElement parent;
private final NBLabels labels;
public NBFunctionGauge(NBLabeledElement parent, Supplier<Double> source, String metricFamilyName, Map<String,String> additionalLabels) {
public NBFunctionGauge(NBComponent parent, Supplier<Double> source, String metricFamilyName, Map<String,String> additionalLabels) {
this.parent = parent;
this.labels = NBLabels.forMap(additionalLabels).and("name",metricFamilyName);
this.source = source;
}
public NBFunctionGauge(NBLabeledElement parent, Supplier<Double> source, String metricFamilyName) {
public NBFunctionGauge(NBComponent parent, Supplier<Double> source, String metricFamilyName) {
this(parent, source, metricFamilyName,Map.of());
}
@Override

View File

@ -29,6 +29,9 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
private final List<NBComponent> children = new ArrayList<>();
private final NBLabels labels;
public NBBaseComponent(NBComponent parentComponent) {
this(parentComponent,NBLabels.forKV());
}
public NBBaseComponent(NBComponent parentComponent, NBLabels componentSpecificLabelsOnly) {
this.labels = componentSpecificLabelsOnly;
if (parentComponent!=null) {

View File

@ -35,6 +35,8 @@ import java.util.List;
*/
public interface NBComponent extends NBLabeledElement, NBComponentMetrics, NBMetricsQuery {
NBComponent EMPTY_COMPONENT = new NBBaseComponent(null);
NBComponent getParent();
NBComponent attach(NBComponent... children);