use standard metrics API in cql drivers

This commit is contained in:
Jonathan Shook 2021-04-01 10:51:35 -05:00
parent a4d5110c8d
commit 300deee618
6 changed files with 96 additions and 72 deletions

View File

@ -1,7 +1,9 @@
package io.nosqlbench.activitytype.cql.core;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.*;
import com.google.common.util.concurrent.ListenableFuture;
import io.nosqlbench.activitytype.cql.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cql.api.RowCycleOperator;
import io.nosqlbench.activitytype.cql.api.StatementFilter;
@ -12,15 +14,14 @@ import io.nosqlbench.activitytype.cql.errorhandling.exceptions.ChangeUnappliedCy
import io.nosqlbench.activitytype.cql.errorhandling.exceptions.MaxTriesExhaustedException;
import io.nosqlbench.activitytype.cql.errorhandling.exceptions.UnexpectedPagingException;
import io.nosqlbench.activitytype.cql.statements.core.ReadyCQLStatement;
import com.google.common.util.concurrent.ListenableFuture;
import io.nosqlbench.activitytype.cql.statements.modifiers.StatementModifier;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.MultiPhaseAction;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -51,6 +52,11 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
private long retryDelay;
private long maxRetryDelay;
private boolean retryReplace;
private Timer bindTimer;
private Timer executeTimer;
private Timer resultTimer;
private Timer resultSuccessTimer;
private Histogram triesHisto;
public CqlAction(ActivityDef activityDef, int slot, CqlActivity cqlActivity) {
this.activityDef = activityDef;
@ -63,6 +69,12 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
public void init() {
onActivityDefUpdate(activityDef);
this.sequencer = cqlActivity.getOpSequencer();
this.bindTimer = cqlActivity.getInstrumentation().getOrCreateBindTimer();
this.executeTimer = cqlActivity.getInstrumentation().getOrCreateExecuteTimer();
this.resultTimer = cqlActivity.getInstrumentation().getOrCreateResultTimer();
this.resultSuccessTimer = cqlActivity.getInstrumentation().getOrCreateResultSuccessTimer();
this.triesHisto = cqlActivity.getInstrumentation().getOrCreateTriesHistogram();
}
@Override
@ -87,7 +99,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
int tries = 0;
try (Timer.Context bindTime = cqlActivity.bindTimer.time()) {
try (Timer.Context bindTime = bindTimer.time()) {
readyCQLStatement = sequencer.get(cycleValue);
readyCQLStatement.onStart();
@ -127,11 +139,11 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
}
}
try (Timer.Context executeTime = cqlActivity.executeTimer.time()) {
try (Timer.Context executeTime = executeTimer.time()) {
resultSetFuture = cqlActivity.getSession().executeAsync(statement);
}
Timer.Context resultTime = cqlActivity.resultTimer.time();
Timer.Context resultTime = resultTimer.time();
try {
ResultSet resultSet = resultSetFuture.getUninterruptibly();
@ -200,7 +212,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
if (resultSet.isFullyFetched()) {
long resultNanos = System.nanoTime() - nanoStartTime;
cqlActivity.resultSuccessTimer.update(resultNanos, TimeUnit.NANOSECONDS);
resultSuccessTimer.update(resultNanos, TimeUnit.NANOSECONDS);
cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery);
readyCQLStatement.onSuccess(cycleValue, resultNanos, totalRowsFetchedForQuery);
} else {
@ -228,7 +240,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
CQLCycleWithStatementException cqlCycleException = new CQLCycleWithStatementException(cycleValue, resultNanos, e, readyCQLStatement);
ErrorStatus errorStatus = ebdseErrorHandler.handleError(cycleValue, cqlCycleException);
if (!errorStatus.isRetryable()) {
cqlActivity.triesHisto.update(tries);
triesHisto.update(tries);
return errorStatus.getResultCode();
}
} finally {
@ -237,7 +249,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
}
}
}
cqlActivity.triesHisto.update(tries);
triesHisto.update(tries);
} else {
@ -252,11 +264,11 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
ListenableFuture<ResultSet> pagingFuture;
try (Timer.Context pagingTime = cqlActivity.pagesTimer.time()) {
try (Timer.Context executeTime = cqlActivity.executeTimer.time()) {
try (Timer.Context executeTime = executeTimer.time()) {
pagingFuture = pagingResultSet.fetchMoreResults();
}
Timer.Context resultTime = cqlActivity.resultTimer.time();
Timer.Context resultTime = resultTimer.time();
try {
ResultSet resultSet = pagingFuture.get();
@ -293,7 +305,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
if (resultSet.isFullyFetched()) {
long nanoTime = System.nanoTime() - nanoStartTime;
cqlActivity.resultSuccessTimer.update(nanoTime, TimeUnit.NANOSECONDS);
resultSuccessTimer.update(nanoTime, TimeUnit.NANOSECONDS);
cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery);
pagingReadyStatement.onSuccess(cycleValue, nanoTime, totalRowsFetchedForQuery);
pagingResultSet = null;
@ -320,7 +332,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
CQLCycleWithStatementException cqlCycleException = new CQLCycleWithStatementException(cycleValue, resultNanos, e, pagingReadyStatement);
ErrorStatus errorStatus = ebdseErrorHandler.handleError(cycleValue, cqlCycleException);
if (!errorStatus.isRetryable()) {
cqlActivity.triesHisto.update(tries);
triesHisto.update(tries);
return errorStatus.getResultCode();
}
} finally {
@ -330,7 +342,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
}
}
}
cqlActivity.triesHisto.update(tries);
triesHisto.update(tries);
}
return 0;
}

View File

@ -72,16 +72,11 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
private final Map<String, Writer> namedWriters = new HashMap<>();
protected List<OpTemplate> stmts;
Timer retryDelayTimer;
Timer bindTimer;
Timer executeTimer;
Timer resultTimer;
Timer resultSuccessTimer;
Timer pagesTimer;
Histogram triesHisto;
Histogram skippedTokensHisto;
Histogram resultSetSizeHisto;
int maxpages;
Meter rowsCounter;
int maxpages;
private HashedCQLErrorHandler errorHandler;
private OpSequence<ReadyCQLStatement> opsequence;
private Session session;
@ -124,14 +119,9 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
setDefaultsFromOpSequence(this.opsequence);
retryDelayTimer = ActivityMetrics.timer(activityDef, "retry-delay");
bindTimer = ActivityMetrics.timer(activityDef, "bind");
executeTimer = ActivityMetrics.timer(activityDef, "execute");
resultTimer = ActivityMetrics.timer(activityDef, "result");
triesHisto = ActivityMetrics.histogram(activityDef, "tries");
pagesTimer = ActivityMetrics.timer(activityDef, "pages");
rowsCounter = ActivityMetrics.meter(activityDef, "rows");
skippedTokensHisto = ActivityMetrics.histogram(activityDef, "skipped-tokens");
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success");
resultSetSizeHisto = ActivityMetrics.histogram(activityDef, "resultset-size");
onActivityDefUpdate(activityDef);
logger.debug("activity fully initialized: " + this.activityDef.getAlias());

View File

@ -1,8 +1,11 @@
package io.nosqlbench.activitytype.cql.core;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.nosqlbench.activitytype.cql.api.ErrorResponse;
import io.nosqlbench.activitytype.cql.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cql.api.RowCycleOperator;
@ -13,8 +16,6 @@ import io.nosqlbench.activitytype.cql.errorhandling.exceptions.CQLCycleWithState
import io.nosqlbench.activitytype.cql.errorhandling.exceptions.ChangeUnappliedCycleException;
import io.nosqlbench.activitytype.cql.errorhandling.exceptions.UnexpectedPagingException;
import io.nosqlbench.activitytype.cql.statements.core.ReadyCQLStatement;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.nosqlbench.activitytype.cql.statements.modifiers.StatementModifier;
import io.nosqlbench.engine.api.activityapi.core.BaseAsyncAction;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.FailedOp;
@ -23,8 +24,8 @@ import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.SucceededOp
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.TrackedOp;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -53,6 +54,11 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
// private Statement pagingStatement;
// private ReadyCQLStatement pagingReadyStatement;
private boolean showcql;
private Timer bindTimer;
private Timer executeTimer;
private Timer resultTimer;
private Timer resultSuccessTimer;
private Histogram triesHisto;
// private long opsInFlight = 0L;
// private long maxOpsInFlight = 1L;
// private long pendingResults = 0;
@ -68,6 +74,11 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
public void init() {
onActivityDefUpdate(activityDef);
this.sequencer = activity.getOpSequencer();
this.bindTimer = activity.getInstrumentation().getOrCreateBindTimer();
this.executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer();
this.resultTimer = activity.getInstrumentation().getOrCreateResultTimer();
this.resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer();
this.triesHisto = activity.getInstrumentation().getOrCreateTriesHistogram();
}
@Override
@ -83,7 +94,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
long cycle = opc.getCycle();
// bind timer covers all statement selection and binding, skipping, transforming logic
try (Timer.Context bindTime = activity.bindTimer.time()) {
try (Timer.Context bindTime = bindTimer.time()) {
cqlop.readyCQLStatement = sequencer.get(cycle);
cqlop.statement = cqlop.readyCQLStatement.bind(cycle);
@ -115,7 +126,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
cqlop.startedOp = startedOp;
// The execute timer covers only the point at which EB hands the op to the driver to be executed
try (Timer.Context executeTime = activity.executeTimer.time()) {
try (Timer.Context executeTime = executeTimer.time()) {
cqlop.future = activity.getSession().executeAsync(cqlop.statement);
Futures.addCallback(cqlop.future, cqlop);
}
@ -185,11 +196,11 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
SucceededOp<CqlOpData> success = sop.succeed(0);
cqlop.readyCQLStatement.onSuccess(cqlop.cycle, success.getServiceTimeNanos(), cqlop.totalRowsFetchedForQuery);
activity.triesHisto.update(cqlop.triesAttempted);
resultTimer.update(success.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
resultSuccessTimer.update(success.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
triesHisto.update(cqlop.triesAttempted);
activity.rowsCounter.mark(cqlop.totalRowsFetchedForQuery);
activity.resultSuccessTimer.update(success.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
activity.resultSetSizeHisto.update(cqlop.totalRowsFetchedForQuery);
activity.resultTimer.update(success.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
} catch (Exception e) {
long currentServiceTime = sop.getCurrentServiceTimeNanos();
@ -234,7 +245,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
if (errorStatus.isRetryable() && cqlop.triesAttempted < maxTries) {
startedOp.retry();
try (Timer.Context executeTime = activity.executeTimer.time()) {
try (Timer.Context executeTime = executeTimer.time()) {
cqlop.future = activity.getSession().executeAsync(cqlop.statement);
Futures.addCallback(cqlop.future, cqlop);
return;
@ -242,8 +253,8 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
}
FailedOp<CqlOpData> failed = startedOp.fail(errorStatus.getResultCode());
activity.resultTimer.update(failed.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
activity.triesHisto.update(cqlop.triesAttempted);
resultTimer.update(failed.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
triesHisto.update(cqlop.triesAttempted);
}

View File

@ -1,7 +1,9 @@
package io.nosqlbench.activitytype.cql.core;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.*;
import com.google.common.util.concurrent.ListenableFuture;
import io.nosqlbench.activitytype.cql.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cql.api.RowCycleOperator;
import io.nosqlbench.activitytype.cql.api.StatementFilter;
@ -12,15 +14,14 @@ import io.nosqlbench.activitytype.cql.errorhandling.exceptions.ChangeUnappliedCy
import io.nosqlbench.activitytype.cql.errorhandling.exceptions.MaxTriesExhaustedException;
import io.nosqlbench.activitytype.cql.errorhandling.exceptions.UnexpectedPagingException;
import io.nosqlbench.activitytype.cql.statements.core.ReadyCQLStatement;
import com.google.common.util.concurrent.ListenableFuture;
import io.nosqlbench.activitytype.cql.statements.modifiers.StatementModifier;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.MultiPhaseAction;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -51,6 +52,11 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
private long retryDelay;
private long maxRetryDelay;
private boolean retryReplace;
private Timer bindTimer;
private Timer executeTimer;
private Timer resultTimer;
private Timer resultSuccessTimer;
private Histogram triesHisto;
public CqlAction(ActivityDef activityDef, int slot, CqlActivity cqlActivity) {
this.activityDef = activityDef;
@ -63,6 +69,11 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
public void init() {
onActivityDefUpdate(activityDef);
this.sequencer = cqlActivity.getOpSequencer();
this.bindTimer = cqlActivity.getInstrumentation().getOrCreateBindTimer();
this.executeTimer = cqlActivity.getInstrumentation().getOrCreateExecuteTimer();
this.resultTimer = cqlActivity.getInstrumentation().getOrCreateResultTimer();
this.resultSuccessTimer = cqlActivity.getInstrumentation().getOrCreateResultSuccessTimer();
this.triesHisto = cqlActivity.getInstrumentation().getOrCreateTriesHistogram();
}
@Override
@ -87,7 +98,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
int tries = 0;
try (Timer.Context bindTime = cqlActivity.bindTimer.time()) {
try (Timer.Context bindTime = bindTimer.time()) {
readyCQLStatement = sequencer.get(cycleValue);
readyCQLStatement.onStart();
@ -127,11 +138,11 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
}
}
try (Timer.Context executeTime = cqlActivity.executeTimer.time()) {
try (Timer.Context executeTime = executeTimer.time()) {
resultSetFuture = cqlActivity.getSession().executeAsync(statement);
}
Timer.Context resultTime = cqlActivity.resultTimer.time();
Timer.Context resultTime = resultTimer.time();
try {
ResultSet resultSet = resultSetFuture.getUninterruptibly();
@ -200,7 +211,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
if (resultSet.isFullyFetched()) {
long resultNanos = System.nanoTime() - nanoStartTime;
cqlActivity.resultSuccessTimer.update(resultNanos, TimeUnit.NANOSECONDS);
resultSuccessTimer.update(resultNanos, TimeUnit.NANOSECONDS);
cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery);
readyCQLStatement.onSuccess(cycleValue, resultNanos, totalRowsFetchedForQuery);
} else {
@ -228,7 +239,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
CQLCycleWithStatementException cqlCycleException = new CQLCycleWithStatementException(cycleValue, resultNanos, e, readyCQLStatement);
ErrorStatus errorStatus = ebdseErrorHandler.handleError(cycleValue, cqlCycleException);
if (!errorStatus.isRetryable()) {
cqlActivity.triesHisto.update(tries);
triesHisto.update(tries);
return errorStatus.getResultCode();
}
} finally {
@ -237,7 +248,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
}
}
}
cqlActivity.triesHisto.update(tries);
triesHisto.update(tries);
} else {
@ -252,11 +263,11 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
ListenableFuture<ResultSet> pagingFuture;
try (Timer.Context pagingTime = cqlActivity.pagesTimer.time()) {
try (Timer.Context executeTime = cqlActivity.executeTimer.time()) {
try (Timer.Context executeTime = executeTimer.time()) {
pagingFuture = pagingResultSet.fetchMoreResults();
}
Timer.Context resultTime = cqlActivity.resultTimer.time();
Timer.Context resultTime = resultTimer.time();
try {
ResultSet resultSet = pagingFuture.get();
@ -293,7 +304,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
if (resultSet.isFullyFetched()) {
long nanoTime = System.nanoTime() - nanoStartTime;
cqlActivity.resultSuccessTimer.update(nanoTime, TimeUnit.NANOSECONDS);
resultSuccessTimer.update(nanoTime, TimeUnit.NANOSECONDS);
cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery);
pagingReadyStatement.onSuccess(cycleValue, nanoTime, totalRowsFetchedForQuery);
pagingResultSet = null;
@ -320,7 +331,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
CQLCycleWithStatementException cqlCycleException = new CQLCycleWithStatementException(cycleValue, resultNanos, e, pagingReadyStatement);
ErrorStatus errorStatus = ebdseErrorHandler.handleError(cycleValue, cqlCycleException);
if (!errorStatus.isRetryable()) {
cqlActivity.triesHisto.update(tries);
triesHisto.update(tries);
return errorStatus.getResultCode();
}
} finally {
@ -330,7 +341,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
}
}
}
cqlActivity.triesHisto.update(tries);
triesHisto.update(tries);
}
return 0;
}

View File

@ -72,16 +72,12 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
private final Map<String, Writer> namedWriters = new HashMap<>();
protected List<OpTemplate> stmts;
Timer retryDelayTimer;
Timer bindTimer;
Timer executeTimer;
Timer resultTimer;
Timer resultSuccessTimer;
Timer pagesTimer;
Histogram triesHisto;
private Histogram triesHisto;
Histogram skippedTokensHisto;
Histogram resultSetSizeHisto;
int maxpages;
Meter rowsCounter;
int maxpages;
private HashedCQLErrorHandler errorHandler;
private OpSequence<ReadyCQLStatement> opsequence;
private Session session;
@ -111,7 +107,6 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
injector.injectUserProvidedCodecs(session, true);
}
@Override
public synchronized void initActivity() {
logger.debug("initializing activity: " + this.activityDef.getAlias());
@ -122,16 +117,10 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
}
initSequencer();
setDefaultsFromOpSequence(this.opsequence);
retryDelayTimer = ActivityMetrics.timer(activityDef, "retry-delay");
bindTimer = ActivityMetrics.timer(activityDef, "bind");
executeTimer = ActivityMetrics.timer(activityDef, "execute");
resultTimer = ActivityMetrics.timer(activityDef, "result");
triesHisto = ActivityMetrics.histogram(activityDef, "tries");
pagesTimer = ActivityMetrics.timer(activityDef, "pages");
rowsCounter = ActivityMetrics.meter(activityDef, "rows");
skippedTokensHisto = ActivityMetrics.histogram(activityDef, "skipped-tokens");
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success");
resultSetSizeHisto = ActivityMetrics.histogram(activityDef, "resultset-size");
onActivityDefUpdate(activityDef);
logger.debug("activity fully initialized: " + this.activityDef.getAlias());

View File

@ -1,8 +1,11 @@
package io.nosqlbench.activitytype.cql.core;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.nosqlbench.activitytype.cql.api.ErrorResponse;
import io.nosqlbench.activitytype.cql.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cql.api.RowCycleOperator;
@ -13,8 +16,6 @@ import io.nosqlbench.activitytype.cql.errorhandling.exceptions.CQLCycleWithState
import io.nosqlbench.activitytype.cql.errorhandling.exceptions.ChangeUnappliedCycleException;
import io.nosqlbench.activitytype.cql.errorhandling.exceptions.UnexpectedPagingException;
import io.nosqlbench.activitytype.cql.statements.core.ReadyCQLStatement;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.nosqlbench.activitytype.cql.statements.modifiers.StatementModifier;
import io.nosqlbench.engine.api.activityapi.core.BaseAsyncAction;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.FailedOp;
@ -23,8 +24,8 @@ import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.SucceededOp
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.TrackedOp;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -53,6 +54,11 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
// private Statement pagingStatement;
// private ReadyCQLStatement pagingReadyStatement;
private boolean showcql;
private Timer bindTimer;
private Timer executeTimer;
private Timer resultSuccessTimer;
private Timer resultTimer;
private Histogram triesHisto;
// private long opsInFlight = 0L;
// private long maxOpsInFlight = 1L;
// private long pendingResults = 0;
@ -68,6 +74,11 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
public void init() {
onActivityDefUpdate(activityDef);
this.sequencer = activity.getOpSequencer();
this.bindTimer = activity.getInstrumentation().getOrCreateBindTimer();
this.executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer();
this.resultTimer = activity.getInstrumentation().getOrCreateResultTimer();
this.resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer();
this.triesHisto = activity.getInstrumentation().getOrCreateTriesHistogram();
}
@Override
@ -83,7 +94,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
long cycle = opc.getCycle();
// bind timer covers all statement selection and binding, skipping, transforming logic
try (Timer.Context bindTime = activity.bindTimer.time()) {
try (Timer.Context bindTime = bindTimer.time()) {
cqlop.readyCQLStatement = sequencer.get(cycle);
cqlop.statement = cqlop.readyCQLStatement.bind(cycle);
@ -115,7 +126,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
cqlop.startedOp = startedOp;
// The execute timer covers only the point at which EB hands the op to the driver to be executed
try (Timer.Context executeTime = activity.executeTimer.time()) {
try (Timer.Context executeTime = executeTimer.time()) {
cqlop.future = activity.getSession().executeAsync(cqlop.statement);
Futures.addCallback(cqlop.future, cqlop);
}
@ -185,11 +196,11 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
SucceededOp<CqlOpData> success = sop.succeed(0);
cqlop.readyCQLStatement.onSuccess(cqlop.cycle, success.getServiceTimeNanos(), cqlop.totalRowsFetchedForQuery);
activity.triesHisto.update(cqlop.triesAttempted);
triesHisto.update(cqlop.triesAttempted);
activity.rowsCounter.mark(cqlop.totalRowsFetchedForQuery);
activity.resultSuccessTimer.update(success.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
resultSuccessTimer.update(success.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
activity.resultSetSizeHisto.update(cqlop.totalRowsFetchedForQuery);
activity.resultTimer.update(success.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
resultTimer.update(success.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
} catch (Exception e) {
long currentServiceTime = sop.getCurrentServiceTimeNanos();
@ -234,7 +245,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
if (errorStatus.isRetryable() && cqlop.triesAttempted < maxTries) {
startedOp.retry();
try (Timer.Context executeTime = activity.executeTimer.time()) {
try (Timer.Context executeTime = executeTimer.time()) {
cqlop.future = activity.getSession().executeAsync(cqlop.statement);
Futures.addCallback(cqlop.future, cqlop);
return;
@ -242,8 +253,8 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
}
FailedOp<CqlOpData> failed = startedOp.fail(errorStatus.getResultCode());
activity.resultTimer.update(failed.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
activity.triesHisto.update(cqlop.triesAttempted);
resultTimer.update(failed.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
triesHisto.update(cqlop.triesAttempted);
}