mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-20 11:38:28 -06:00
readability improvements
This commit is contained in:
parent
3e7c6840b0
commit
459587e379
@ -12,9 +12,6 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class Cqld4Action implements SyncAction, ActivityDefObserver {
|
||||
|
||||
/**
|
||||
* The thread number within the activity instance
|
||||
**/
|
||||
private final int slot;
|
||||
private final Cqld4Activity activity;
|
||||
|
||||
@ -39,7 +36,6 @@ public class Cqld4Action implements SyncAction, ActivityDefObserver {
|
||||
this.resultTimer = activity.getInstrumentation().getOrCreateResultTimer();
|
||||
this.resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer();
|
||||
this.triesHisto = activity.getInstrumentation().getOrCreateTriesHistogram();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -5,15 +5,37 @@ import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.TrackedOp;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
/**
|
||||
* The Op Tracker is the keeper of concurrency and op states. It serves a couple
|
||||
* key functions during the execution of an activity.
|
||||
* <OL>
|
||||
* <LI>It provides a single control point for tracking the state of all operations
|
||||
* for an activity.</LI>
|
||||
* <LI>It provides a synchronization object for parameter updates which might affect
|
||||
* whether new operations should block callers.</LI>
|
||||
* </OL>
|
||||
* @param <D>
|
||||
*/
|
||||
public interface OpTracker<D> extends OpEvents<D> {
|
||||
|
||||
void setMaxPendingOps(int maxPendingOps);
|
||||
/**
|
||||
* The cycle op function is the function which can map a cycle number into
|
||||
* an operation of some sort.
|
||||
* @param newOpFunction
|
||||
*/
|
||||
void setCycleOpFunction(LongFunction<D> newOpFunction);
|
||||
|
||||
/**
|
||||
* The maximum pending ops determines how many ops an activity is allowed to have in
|
||||
* flight at any one time. When
|
||||
* @return
|
||||
*/
|
||||
int getMaxPendingOps();
|
||||
void setMaxPendingOps(int maxPendingOps);
|
||||
|
||||
boolean isFull();
|
||||
int getPendingOps();
|
||||
|
||||
void setCycleOpFunction(LongFunction<D> newOpFunction);
|
||||
int getPendingOps();
|
||||
|
||||
// By making the op tracker the factory for ops, we allow it to hook their event streams
|
||||
TrackedOp<D> newOp(long cycle, OpEvents<D> strideTracker);
|
||||
|
@ -21,7 +21,7 @@ import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResult
|
||||
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsSegment;
|
||||
|
||||
/**
|
||||
* A cycle marker is simply a type that knows how to do something
|
||||
* A cycle output is simply a type that knows how to do something
|
||||
* useful with the result of a particular cycle.
|
||||
* Outputs are required to be thread-safe.
|
||||
*/
|
||||
|
@ -240,9 +240,9 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
|
||||
opTracker = new OpTrackerImpl<>(activity, slotId);
|
||||
opTracker.setCycleOpFunction(async.getOpInitFunction());
|
||||
|
||||
StrideOutputConsumer<D> outputreader = null;
|
||||
StrideOutputConsumer<D> strideconsumer = null;
|
||||
if (action instanceof StrideOutputConsumer) {
|
||||
outputreader = (StrideOutputConsumer<D>) async;
|
||||
strideconsumer = (StrideOutputConsumer<D>) async;
|
||||
}
|
||||
|
||||
while (slotState.get() == Running) {
|
||||
@ -271,7 +271,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
|
||||
cycleSegment.peekNextCycle(),
|
||||
stride,
|
||||
output,
|
||||
outputreader);
|
||||
strideconsumer);
|
||||
strideTracker.start();
|
||||
|
||||
long strideStart = System.nanoTime();
|
||||
@ -303,7 +303,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
|
||||
synchronized (opTracker) {
|
||||
while (opTracker.isFull()) {
|
||||
try {
|
||||
logger.trace("Blocking for enqueue with (" + opTracker.getPendingOps() + "/" + opTracker.getMaxPendingOps() + ") queued ops");
|
||||
logger.trace(() -> "Blocking for enqueue with (" + opTracker.getPendingOps() + "/" + opTracker.getMaxPendingOps() + ") queued ops");
|
||||
optrackerBlockCounter.inc();
|
||||
opTracker.wait(10000);
|
||||
} catch (InterruptedException ignored) {
|
||||
@ -313,14 +313,6 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
|
||||
|
||||
async.enqueue(op);
|
||||
|
||||
// T opc = async.newOpContext();
|
||||
// opc.addSink(strideTracker);
|
||||
// async.enqueue(opc);
|
||||
// boolean canAcceptMore = async.enqueue(opc);
|
||||
// if (!canAcceptMore) {
|
||||
// logger.trace("Action queue full at cycle=" + cyclenum);
|
||||
// }
|
||||
|
||||
} catch (Exception t) {
|
||||
logger.error("Error while processing async cycle " + cyclenum + ", error:" + t);
|
||||
throw t;
|
||||
|
Loading…
Reference in New Issue
Block a user