provide cycle value metric

This commit is contained in:
Jonathan Shook
2023-09-08 20:07:47 -05:00
parent 675667eaa6
commit 56c687380e
6 changed files with 56 additions and 19 deletions

View File

@@ -16,6 +16,8 @@
package io.nosqlbench.engine.api.activityapi.cyclelog.inputs.cyclelog; package io.nosqlbench.engine.api.activityapi.cyclelog.inputs.cyclelog;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsSegment; import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsSegment;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment; import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegmentBuffer; import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegmentBuffer;
@@ -36,9 +38,10 @@ import java.nio.channels.FileChannel;
import java.util.Iterator; import java.util.Iterator;
import java.util.function.Predicate; import java.util.function.Predicate;
public class CycleLogInput implements Input, AutoCloseable, Iterable<CycleResultsSegment>, CanFilterResultValue { public class CycleLogInput implements Input, AutoCloseable, Iterable<CycleResultsSegment>, CanFilterResultValue, NBLabeledElement {
private final static Logger logger = LogManager.getLogger(CycleLogInput.class); private final static Logger logger = LogManager.getLogger(CycleLogInput.class);
private final Iterator<CycleResultsSegment> cycleResultSegmentIterator; private final Iterator<CycleResultsSegment> cycleResultSegmentIterator;
private final NBLabeledElement parent;
private RandomAccessFile raf; private RandomAccessFile raf;
private MappedByteBuffer mbb; private MappedByteBuffer mbb;
private Iterator<CycleResult> segmentIter; private Iterator<CycleResult> segmentIter;
@@ -49,6 +52,7 @@ public class CycleLogInput implements Input, AutoCloseable, Iterable<CycleResult
mbb = initMappedBuffer(conf.getString("file").orElse(activity.getAlias()) + ".cyclelog"); mbb = initMappedBuffer(conf.getString("file").orElse(activity.getAlias()) + ".cyclelog");
cycleResultSegmentIterator = iterator(); cycleResultSegmentIterator = iterator();
segmentIter = cycleResultSegmentIterator.next().iterator(); segmentIter = cycleResultSegmentIterator.next().iterator();
this.parent = activity;
} }
public CycleLogInput(String filename) { public CycleLogInput(String filename) {
@@ -67,6 +71,7 @@ public class CycleLogInput implements Input, AutoCloseable, Iterable<CycleResult
mbb = initMappedBuffer(cycleFile.getPath()); mbb = initMappedBuffer(cycleFile.getPath());
cycleResultSegmentIterator = new CycleResultsRLEBufferReadable(mbb).iterator(); cycleResultSegmentIterator = new CycleResultsRLEBufferReadable(mbb).iterator();
segmentIter = cycleResultSegmentIterator.next().iterator(); segmentIter = cycleResultSegmentIterator.next().iterator();
this.parent = NBLabeledElement.EMPTY;
} }
@Override @Override
@@ -152,4 +157,8 @@ public class CycleLogInput implements Input, AutoCloseable, Iterable<CycleResult
return cycleResultsSegments.iterator(); return cycleResultsSegments.iterator();
} }
@Override
public NBLabels getLabels() {
return parent.getLabels();
}
} }

View File

@@ -15,16 +15,20 @@
*/ */
package io.nosqlbench.engine.api.activityimpl.input; package io.nosqlbench.engine.api.activityimpl.input;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay; import com.codahale.metrics.Gauge;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment; import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.engine.util.Unit;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.progress.CycleMeter; import io.nosqlbench.engine.api.activityapi.core.progress.CycleMeter;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable; import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment;
import io.nosqlbench.engine.api.activityapi.input.Input; import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.util.Unit;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.security.InvalidParameterException; import java.security.InvalidParameterException;
import java.time.Instant; import java.time.Instant;
@@ -44,7 +48,7 @@ import java.util.concurrent.atomic.AtomicLong;
* after the max value. They simply expose it to callers. It is up to the * after the max value. They simply expose it to callers. It is up to the
* caller to check the value to determine when the input is deemed "used up."</p> * caller to check the value to determine when the input is deemed "used up."</p>
*/ */
public class AtomicInput implements Input, ActivityDefObserver, ProgressCapable { public class AtomicInput implements Input, ActivityDefObserver, ProgressCapable, Gauge<Long>, NBLabeledElement {
private final static Logger logger = LogManager.getLogger(AtomicInput.class); private final static Logger logger = LogManager.getLogger(AtomicInput.class);
private final AtomicLong cycleValue = new AtomicLong(0L); private final AtomicLong cycleValue = new AtomicLong(0L);
@@ -56,10 +60,13 @@ public class AtomicInput implements Input, ActivityDefObserver, ProgressCapable
private final long startedAt = System.currentTimeMillis(); private final long startedAt = System.currentTimeMillis();
private final ActivityDef activityDef; private final ActivityDef activityDef;
private final NBLabeledElement parent;
public AtomicInput(ActivityDef activityDef) { public AtomicInput(NBLabeledElement parent, ActivityDef activityDef) {
this.parent = parent;
this.activityDef = activityDef; this.activityDef = activityDef;
onActivityDefUpdate(activityDef); onActivityDefUpdate(activityDef);
ActivityMetrics.gauge(this,"cycle", this);
} }
@Override @Override
@@ -140,7 +147,7 @@ public class AtomicInput implements Input, ActivityDefObserver, ProgressCapable
this.recycleMax.set(recycles); this.recycleMax.set(recycles);
} }
public long getStarteAtMillis() { public long getStartedAtMillis() {
return this.startedAt; return this.startedAt;
} }
@@ -151,16 +158,29 @@ public class AtomicInput implements Input, ActivityDefObserver, ProgressCapable
@Override @Override
public ProgressMeterDisplay getProgressMeter() { public ProgressMeterDisplay getProgressMeter() {
return new AtomicInputProgress(activityDef.getAlias(), this); return new AtomicInputProgress(this, activityDef.getAlias(), this);
} }
private static class AtomicInputProgress implements ProgressMeterDisplay, CycleMeter { @Override
public NBLabels getLabels() {
return parent.getLabels();
}
@Override
public Long getValue() {
return this.cycleValue.get();
}
private static class AtomicInputProgress implements NBLabeledElement, ProgressMeterDisplay, CycleMeter {
private final AtomicInput input; private final AtomicInput input;
private final String name; private final String name;
public AtomicInputProgress(String name, AtomicInput input) { private final NBLabeledElement parent;
public AtomicInputProgress(NBLabeledElement parent, String name, AtomicInput input) {
this.name = name; this.name = name;
this.input = input; this.input = input;
this.parent = parent;
} }
@Override @Override
@@ -170,7 +190,7 @@ public class AtomicInput implements Input, ActivityDefObserver, ProgressCapable
@Override @Override
public Instant getStartTime() { public Instant getStartTime() {
return Instant.ofEpochMilli(input.getStarteAtMillis()); return Instant.ofEpochMilli(input.getStartedAtMillis());
} }
@Override @Override
@@ -208,5 +228,10 @@ public class AtomicInput implements Input, ActivityDefObserver, ProgressCapable
public long getRecyclesMax() { public long getRecyclesMax() {
return input.recycleMax.get(); return input.recycleMax.get();
} }
@Override
public NBLabels getLabels() {
return parent.getLabels();
}
} }
} }

View File

@@ -37,7 +37,7 @@ public class TargetRateInputType implements InputType {
public Dispenser(Activity activity) { public Dispenser(Activity activity) {
this.activity = activity; this.activity = activity;
this.input = new AtomicInput(activity.getActivityDef()); this.input = new AtomicInput(activity, activity.getActivityDef());
} }
@Override @Override

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.engine.api.activityimpl.input; package io.nosqlbench.engine.api.activityimpl.input;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment; import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -26,7 +27,7 @@ public class AtomicInputTest {
@Test @Test
public void testEmptyIntervalShouldNotProvideValues() { public void testEmptyIntervalShouldNotProvideValues() {
AtomicInput i = new AtomicInput(ActivityDef.parseActivityDef("alias=foo,cycles=23..23")); AtomicInput i = new AtomicInput(NBLabeledElement.EMPTY,ActivityDef.parseActivityDef("alias=foo,cycles=23..23"));
CycleSegment inputSegment = i.getInputSegment(1); CycleSegment inputSegment = i.getInputSegment(1);
assertThat(inputSegment).isNull(); assertThat(inputSegment).isNull();

View File

@@ -127,9 +127,10 @@ class ActivityExecutorTest {
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-dynamic-params;cycles=1000;initdelay=5000;"); final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-dynamic-params;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef,NBLabeledElement.EMPTY); new ActivityTypeLoader().load(activityDef,NBLabeledElement.EMPTY);
this.getActivityMotorFactory(this.motorActionDelay(999), new AtomicInput(activityDef));
Activity simpleActivity = new SimpleActivity(activityDef, NBLabeledElement.forMap(Map.of())); Activity simpleActivity = new SimpleActivity(activityDef, NBLabeledElement.forMap(Map.of()));
this.getActivityMotorFactory(this.motorActionDelay(999), new AtomicInput(simpleActivity,activityDef));
final InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity); final InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity);
final ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity); final ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity);
final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null); final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null);

View File

@@ -38,11 +38,11 @@ public class CoreMotorTest {
@Test @Test
public void testBasicActivityMotor() { public void testBasicActivityMotor() {
final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final Activity activity = new SimpleActivity( final Activity activity = new SimpleActivity(
ActivityDef.parseActivityDef("alias=foo"), ActivityDef.parseActivityDef("alias=foo"),
NBLabeledElement.forMap(Map.of("testing","coremotor")) NBLabeledElement.forMap(Map.of("testing","coremotor"))
); );
final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final Motor cm = new CoreMotor(activity, 5L, lockstepper); final Motor cm = new CoreMotor(activity, 5L, lockstepper);
final AtomicLong observableAction = new AtomicLong(-3L); final AtomicLong observableAction = new AtomicLong(-3L);
cm.setAction(this.getTestConsumer(observableAction)); cm.setAction(this.getTestConsumer(observableAction));
@@ -60,8 +60,9 @@ public class CoreMotorTest {
@Test @Test
public void testIteratorStride() { public void testIteratorStride() {
SimpleActivity activity = new SimpleActivity("stride=3", NBLabeledElement.EMPTY);
final BlockingSegmentInput lockstepper = new BlockingSegmentInput(); final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final Motor cm1 = new CoreMotor(new SimpleActivity("stride=3",NBLabeledElement.EMPTY),1L, lockstepper); final Motor cm1 = new CoreMotor(activity,1L, lockstepper);
final AtomicLongArray ary = new AtomicLongArray(10); final AtomicLongArray ary = new AtomicLongArray(10);
final Action a1 = this.getTestArrayConsumer(ary); final Action a1 = this.getTestArrayConsumer(ary);
cm1.setAction(a1); cm1.setAction(a1);