add customizable timers to CQL driver

This commit is contained in:
Jonathan Shook 2021-01-13 02:27:57 -06:00
parent cb23fd92b9
commit 998439c8a1
7 changed files with 254 additions and 67 deletions

View File

@ -89,8 +89,11 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
try (Timer.Context bindTime = cqlActivity.bindTimer.time()) {
readyCQLStatement = sequencer.get(cycleValue);
readyCQLStatement.onStart();
statement = readyCQLStatement.bind(cycleValue);
if (statementFilter != null) {
if (!statementFilter.matches(statement)) {
cqlActivity.skippedTokensHisto.update(cycleValue);

View File

@ -4,14 +4,13 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.*;
import io.nosqlbench.activitytype.cql.codecsupport.UDTCodecInjector;
import com.datastax.driver.core.TokenRangeStmtFilter;
import io.nosqlbench.activitytype.cql.api.ErrorResponse;
import io.nosqlbench.activitytype.cql.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cql.api.RowCycleOperator;
import io.nosqlbench.activitytype.cql.api.StatementFilter;
import io.nosqlbench.activitytype.cql.errorhandling.NBCycleErrorHandler;
import io.nosqlbench.activitytype.cql.codecsupport.UDTCodecInjector;
import io.nosqlbench.activitytype.cql.errorhandling.HashedCQLErrorHandler;
import io.nosqlbench.activitytype.cql.errorhandling.NBCycleErrorHandler;
import io.nosqlbench.activitytype.cql.statements.binders.CqlBinderTypes;
import io.nosqlbench.activitytype.cql.statements.core.*;
import io.nosqlbench.activitytype.cql.statements.modifiers.StatementModifier;
@ -38,14 +37,16 @@ import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.metrics.ExceptionCountMetrics;
import io.nosqlbench.engine.api.metrics.ExceptionHistoMetrics;
import io.nosqlbench.engine.api.util.SimpleConfig;
import io.nosqlbench.engine.api.metrics.ThreadLocalNamedTimers;
import io.nosqlbench.engine.api.templating.StrInterpolator;
import io.nosqlbench.engine.api.util.SimpleConfig;
import io.nosqlbench.engine.api.util.TagFilter;
import io.nosqlbench.engine.api.util.Unit;
import io.nosqlbench.nb.api.config.params.Element;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.FileWriter;
import java.io.IOException;
@ -157,6 +158,9 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
throw new RuntimeException("There were no unfiltered statements found for this activity.");
}
Set<String> timerStarts = new HashSet<>();
Set<String> timerStops = new HashSet<>();
for (OpTemplate stmtDef : stmts) {
ParsedStmt parsed = stmtDef.getParsed().orError();
@ -182,18 +186,20 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
String logresultcsv_act = getParams().getOptionalString("logresultcsv").orElse("");
if (!logresultcsv_act.isEmpty() && !logresultcsv_act.toLowerCase().equals("true")) {
if (!logresultcsv_act.isEmpty() && !logresultcsv_act.equalsIgnoreCase("true")) {
throw new RuntimeException("At the activity level, only logresultcsv=true is allowed, no other values.");
}
logresultcsv = !logresultcsv.isEmpty() ? logresultcsv : logresultcsv_act;
logresultcsv = !logresultcsv.toLowerCase().equals("true") ? logresultcsv : stmtDef.getName() + "--results.csv";
logresultcsv = !logresultcsv.equalsIgnoreCase("true") ? logresultcsv : stmtDef.getName() + "--results.csv";
logger.debug("readying statement[" + (prepared ? "" : "un") + "prepared]:" + parsed.getStmt());
ReadyCQLStatementTemplate template;
String stmtForDriver = parsed.getPositionalStatement(s -> "?");
if (prepared) {
psummary.append(" prepared=>").append(prepared);
psummary.append(" prepared=>true");
PreparedStatement prepare = getSession().prepare(stmtForDriver);
cl.ifPresent((conlvl) -> {
psummary.append(" consistency_level=>").append(conlvl);
@ -229,16 +235,40 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
simpleStatement.setIdempotent(i);
});
template = new ReadyCQLStatementTemplate(fconfig, getSession(), simpleStatement, ratio,
parsed.getName(), parameterized);
parsed.getName(), parameterized, null, null);
}
Element params = parsed.getParamReader();
params.get("start-timers", String.class)
.map(s -> s.split(", *"))
.map(Arrays::asList)
.orElse(List.of())
.stream()
.forEach(name -> {
ThreadLocalNamedTimers.addTimer(activityDef, name);
template.addTimerStart(name);
timerStarts.add(name);
});
params.get("stop-timers", String.class)
.map(s -> s.split(", *"))
.map(Arrays::asList)
.orElse(List.of())
.stream()
.forEach(name -> {
template.addTimerStop(name);
timerStops.add(name);
});
stmtDef.getOptionalStringParam("save")
.map(s -> s.split("[,: ]"))
.map(Save::new)
.ifPresent(save_op -> {
psummary.append(" save=>").append(save_op.toString());
template.addRowCycleOperators(save_op);
});
.map(s -> s.split("[,: ]"))
.map(Save::new)
.ifPresent(save_op -> {
psummary.append(" save=>").append(save_op.toString());
template.addRowCycleOperators(save_op);
});
stmtDef.getOptionalStringParam("rsoperators")
.map(s -> s.split(","))
@ -261,7 +291,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
if (instrument) {
logger.info("Adding per-statement success and error and resultset-size timers to statement '" + parsed.getName() + "'");
template.instrument(this);
psummary.append(" instrument=>").append(instrument);
psummary.append(" instrument=>true");
}
if (!logresultcsv.isEmpty()) {
@ -279,6 +309,11 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
planner.addOp(template.resolve(), ratio);
}
if (!timerStarts.equals(timerStops)) {
throw new BasicError("The names for timer-starts and timer-stops must be matched up. " +
"timer-starts:" + timerStarts + ", timer-stops:" + timerStops);
}
opsequence = planner.resolve();
}

View File

@ -7,10 +7,12 @@ import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import io.nosqlbench.activitytype.cql.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cql.api.RowCycleOperator;
import io.nosqlbench.engine.api.metrics.ThreadLocalNamedTimers;
import io.nosqlbench.virtdata.core.bindings.ContextualArrayBindings;
import java.io.IOException;
import java.io.Writer;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@ -19,8 +21,8 @@ import java.util.concurrent.TimeUnit;
*/
public class ReadyCQLStatement {
private String name;
private ContextualArrayBindings<?, Statement> contextualBindings;
private final String name;
private final ContextualArrayBindings<?, Statement> contextualBindings;
private long ratio;
private ResultSetCycleOperator[] resultSetOperators = null;
private RowCycleOperator[] rowCycleOperators = null;
@ -29,6 +31,8 @@ public class ReadyCQLStatement {
private Timer errorTimer;
private Histogram rowsFetchedHisto;
private Writer resultCsvWriter;
private List<String> startTimers;
private List<String> stopTimers;
public ReadyCQLStatement(ContextualArrayBindings<?, Statement> contextualBindings, long ratio, String name) {
this.contextualBindings = contextualBindings;
@ -98,26 +102,36 @@ public class ReadyCQLStatement {
this.ratio = ratio;
}
public void onStart() {
if (startTimers != null) {
ThreadLocalNamedTimers.TL_INSTANCE.get().start(startTimers);
}
}
/**
* This method should be called when an associated statement is executed successfully.
* @param cycleValue The cycle associated with the execution.
* @param nanoTime The nanoTime duration of the execution.
*
* @param cycleValue The cycle associated with the execution.
* @param nanoTime The nanoTime duration of the execution.
* @param rowsFetched The number of rows fetched for this cycle
*/
public void onSuccess(long cycleValue, long nanoTime, long rowsFetched) {
if (successTimer!=null) {
if (successTimer != null) {
successTimer.update(nanoTime, TimeUnit.NANOSECONDS);
}
if (rowsFetchedHisto!=null) {
if (stopTimers != null) {
ThreadLocalNamedTimers.TL_INSTANCE.get().stop(stopTimers);
}
if (rowsFetchedHisto != null) {
rowsFetchedHisto.update(rowsFetched);
}
if (resultCsvWriter!=null) {
if (resultCsvWriter != null) {
try {
synchronized(resultCsvWriter) {
synchronized (resultCsvWriter) {
// <cycle>,(SUCCESS|FAILURE),<nanos>,<rowsfetched>,<errorname>\n
resultCsvWriter
.append(String.valueOf(cycleValue)).append(",")
.append("SUCCESS,")
.append(String.valueOf(cycleValue)).append(",")
.append("SUCCESS,")
.append(String.valueOf(nanoTime)).append(",")
.append(String.valueOf(rowsFetched))
.append(",NONE")
@ -138,19 +152,22 @@ public class ReadyCQLStatement {
* @param t The associated throwable
*/
public void onError(long cycleValue, long resultNanos, Throwable t) {
if (errorTimer!=null) {
if (errorTimer != null) {
errorTimer.update(resultNanos, TimeUnit.NANOSECONDS);
}
if (resultCsvWriter!=null) {
if (stopTimers != null) {
ThreadLocalNamedTimers.TL_INSTANCE.get().stop(stopTimers);
}
if (resultCsvWriter != null) {
try {
synchronized(resultCsvWriter) {
synchronized (resultCsvWriter) {
// <cycle>,(SUCCESS|FAILURE),<nanos>,<rowsfetched>,<errorname>\n
resultCsvWriter
.append(String.valueOf(cycleValue)).append(",")
.append("FAILURE,")
.append(String.valueOf(resultNanos)).append(",")
.append("0,")
.append(t.getClass().getSimpleName()).append(",")
.append(String.valueOf(cycleValue)).append(",")
.append("FAILURE,")
.append(String.valueOf(resultNanos)).append(",")
.append("0,")
.append(t.getClass().getSimpleName()).append(",")
.append("\n");
}
} catch (IOException e) {
@ -179,4 +196,18 @@ public class ReadyCQLStatement {
this.resultCsvWriter = resultCsvWriter;
return this;
}
public ReadyCQLStatement withStartTimers(List<String> startTimers) {
this.startTimers = startTimers;
return this;
}
public ReadyCQLStatement withStopTimers(List<String> stopTimers) {
this.stopTimers = stopTimers;
return this;
}
public String toString() {
return "ReadyCQLStatement: " + contextualBindings.toString();
}
}

View File

@ -15,10 +15,12 @@ import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
import io.nosqlbench.virtdata.core.bindings.ContextualBindingsArrayTemplate;
import io.nosqlbench.virtdata.core.bindings.ValuesArrayBinder;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class ReadyCQLStatementTemplate {
@ -36,6 +38,8 @@ public class ReadyCQLStatementTemplate {
private Timer errorTimer;
private Histogram rowsFetchedHisto;
private Writer resultCsvWriter;
private List<String> startTimers;
private List<String> stopTimers;
public ReadyCQLStatementTemplate(Map<String, Object> fconfig, CqlBinderTypes binderType, Session session,
PreparedStatement preparedStmt, long ratio, String name) {
@ -45,14 +49,35 @@ public class ReadyCQLStatementTemplate {
logger.trace("Using binder_type=>" + binder.toString());
template = new ContextualBindingsArrayTemplate<>(
preparedStmt,
new BindingsTemplate(fconfig),
binder
preparedStmt,
new BindingsTemplate(fconfig),
binder
);
this.ratio = ratio;
}
public ReadyCQLStatementTemplate(Map<String, Object> fconfig, Session session, SimpleStatement simpleStatement, long ratio, String name, boolean parameterized) {
public void addTimerStart(String name) {
if (startTimers == null) {
startTimers = new ArrayList<>();
}
startTimers.add(name);
}
public void addTimerStop(String name) {
if (stopTimers == null) {
stopTimers = new ArrayList<>();
}
stopTimers.add(name);
}
public ReadyCQLStatementTemplate(
Map<String, Object> fconfig,
Session session,
SimpleStatement simpleStatement,
long ratio, String name,
boolean parameterized,
List<String> startTimers,
List<String> stopTimers) {
this.session = session;
this.name = name;
template = new ContextualBindingsArrayTemplate<>(
@ -65,10 +90,12 @@ public class ReadyCQLStatementTemplate {
public ReadyCQLStatement resolve() {
return new ReadyCQLStatement(template.resolveBindings(), ratio, name)
.withMetrics(this.successTimer, this.errorTimer, this.rowsFetchedHisto)
.withResultSetCycleOperators(resultSetCycleOperators)
.withRowCycleOperators(rowCycleOperators)
.withResultCsvWriter(resultCsvWriter);
.withMetrics(this.successTimer, this.errorTimer, this.rowsFetchedHisto)
.withResultSetCycleOperators(resultSetCycleOperators)
.withRowCycleOperators(rowCycleOperators)
.withResultCsvWriter(resultCsvWriter)
.withStartTimers(startTimers)
.withStopTimers(stopTimers);
}
public ContextualBindingsArrayTemplate<?, Statement> getContextualBindings() {

View File

@ -200,28 +200,29 @@ activity types.
default: 'driver.*clusterid*.'
The clusterid specified is included so that separate cluster and session
contexts can be reported independently for advanced tests.
- **usercodecs** - enable the loading of user codec libraries
for more details see: com.datastax.codecs.framework.UDTCodecInjector in the nosqlbench
code base. This is for dynamic codec loading with user-provided codecs mapped
via the internal UDT APIs.
default: false
- **secureconnectbundle** - used to connect to CaaS, accepts a path to the secure connect bundle
that is downloaded from the CaaS UI.
Examples:
- **usercodecs** - enable the loading of user codec libraries for more
details see: com.datastax.codecs.framework.UDTCodecInjector in the
nosqlbench code base. This is for dynamic codec loading with
user-provided codecs mapped via the internal UDT APIs. default: false
- **secureconnectbundle** - used to connect to CaaS, accepts a path to the
secure connect bundle that is downloaded from the CaaS UI. Examples:
- `secureconnectbundle=/tmp/secure-connect-my_db.zip`
- `secureconnectbundle="/home/automaton/secure-connect-my_db.zip"`
Check out [Astra Documentation](https://docs.astra.datastax.com/docs/test-loading-data-with-nosqlbench) for samples
- **insights** - Set to false to disable the driver from sending insights monitoring information
Check
out [Astra Documentation](https://docs.astra.datastax.com/docs/test-loading-data-with-nosqlbench)
for samples
- **insights** - Set to false to disable the driver from sending insights
monitoring information
- `insights=false`
- **tickduration** - sets the tickDuration (milliseconds) of HashedWheelTimer of the
java driver. This timer is used to schedule speculative requests.
Examples:
- **tickduration** - sets the tickDuration (milliseconds) of
HashedWheelTimer of the java driver. This timer is used to schedule
speculative requests. Examples:
- `tickduration=10`
- `tickduration=100` (driver default value)
- **compression** - sets the transport compression to use for this
activity. Valid values are 'LZ4' and 'SNAPPY'. Both types are bundled
activity. Valid values are 'LZ4' and 'SNAPPY'. Both types are bundled
with EBDSE.
- **showcql** - logs cql statements as INFO (to see INFO messages in stdout use -v or greater) Note: this is expensive
and should only be done to troubleshoot workloads. Do not use `showcql` for your tests.
@ -299,14 +300,48 @@ now **they are limited to a YAML params block**:
# applies to all statements, and the only value values
# there are true and false.
start-timers: timername1, timername2, ...
#
# If a statement has start-timers value set, then the named
# timers are started in the local thread before the
# statement is executed
#
# Together, with the stop-timers modifier, you can measure
# sequences of statements with specific named boundaries.
#
# The name of the timer is qualified with the activity alias
# just as all other metric names.
#
# This is generally only useful when the async= parameter is
# NOT used, since the scope of the timer is thread-local. When
# async is used, many operations may overlap each other in the
# same thread, breaking linearization guarantees which make
# thread local scoping helpful for tracking linearized operations.
#
# When a timer is started, a timer context is created and stored
# under this name in the thread. You must ensure that an
# associated stop-timers setting is applied to another statement
# in order to trigger the tally of these metrics.
stop-timers: timername1, timername2, ...
#
# If a statement has a stop-timers value set, then after the
# statement is finished, whether by error or by successful
# completion, the named timers are stopped and the resulting
# measurement is added to metrics.
#
# If you add stop-timers with names that do not have a matching
# start-timers name, or vice-versa then an error is thrown.
### Metrics
- alias.result - A timer which tracks the performance of an op result only.
This is the async get on the future, broken out as a separate step.
- alias.result-success - A timer that records rate and histograms of the time
it takes from submitting a query to completely reading the result
set that it returns, across all pages. This metric is only counted
for non-exceptional results, while the result metric above includes
- alias.result - A timer which tracks the performance of an op result
only. This is the async get on the future, broken out as a separate
step.
- alias.result-success - A timer that records rate and histograms of the
time it takes from submitting a query to completely reading the result
set that it returns, across all pages. This metric is only counted for
non-exceptional results, while the result metric above includes
all operations.
- alias.bind - A timer which tracks the performance of the statement
binding logic, including the generation of data immediately prior

View File

@ -0,0 +1,53 @@
package io.nosqlbench.engine.api.metrics;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Auxiliary thread-local metrics for an activity which are tracked by name.
*/
public class ThreadLocalNamedTimers {
private final static Logger logger = LogManager.getLogger(ThreadLocalNamedTimers.class);
public final static ThreadLocal<ThreadLocalNamedTimers> TL_INSTANCE = ThreadLocal.withInitial(ThreadLocalNamedTimers::new);
private final static Map<String, Timer> timers = new HashMap<>();
private final Map<String, Timer.Context> contexts = new HashMap<>();
public static void addTimer(ActivityDef def, String name) {
if (timers.containsKey("name")) {
logger.warn("A timer named '" + name + "' was already defined and initialized.");
}
Timer timer = ActivityMetrics.timer(def, name);
timers.put(name, timer);
}
public void start(String name) {
Timer.Context context = timers.get(name).time();
contexts.put(name, context);
}
public void stop(String name) {
Timer.Context context = contexts.get(name);
context.stop();
}
public void start(List<String> timerName) {
for (String startTimer : timerName) {
start(startTimer);
}
}
public void stop(List<String> timerName) {
for (String stopTimer : timerName) {
stop(stopTimer);
}
}
}

View File

@ -15,8 +15,8 @@ package io.nosqlbench.virtdata.core.bindings;
public class ContextualArrayBindings<C, R> implements Binder<R> {
private final C context;
private Bindings bindings;
private ValuesArrayBinder<C, R> valuesArrayBinder;
private final Bindings bindings;
private final ValuesArrayBinder<C, R> valuesArrayBinder;
public ContextualArrayBindings(Bindings bindings, C context, ValuesArrayBinder<C, R> valuesArrayBinder) {
this.bindings = bindings;
@ -40,6 +40,9 @@ public class ContextualArrayBindings<C, R> implements Binder<R> {
} catch (Exception e) {
throw new RuntimeException("Binding error:" + bindings.getTemplate().toString(allGeneratedValues), e);
}
}
public String toString() {
return "context: '" + this.context.toString() + "' bindings:'" + this.bindings.toString() + "'";
}
}