mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Cqld4 improvements
This commit is contained in:
parent
0f8a05ab45
commit
2f09e836de
@ -0,0 +1,26 @@
|
||||
package io.nosqlbench.adapter.cqld4;
|
||||
|
||||
import io.nosqlbench.driver.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedCommand;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
@Service(value = DriverAdapter.class,selector = "cqld4")
|
||||
public class Cqld4DriverAdapter extends BaseDriverAdapter<Cqld4Op,Cqld4Space> {
|
||||
|
||||
private Cqld4SpaceCache sessionCache;
|
||||
|
||||
@Override
|
||||
public Function<ParsedCommand, OpDispenser<Cqld4Op>> getOpMapper() {
|
||||
return new Cqld4OpMapper(getSpaceCache());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Function<String, ? extends Cqld4Space> getSpaceInitializer() {
|
||||
return s -> new Cqld4Space(this);
|
||||
}
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
package io.nosqlbench.adapter.cqld4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.adapter.cqld4.opdispensers.CqlD4PreparedBatchOpDispenser;
|
||||
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4BatchStatementDispenser;
|
||||
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4PreparedOpDispenser;
|
||||
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4SimpleCqlStatementDispenser;
|
||||
import io.nosqlbench.driver.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||
import io.nosqlbench.engine.api.templating.ParsedCommand;
|
||||
import io.nosqlbench.nb.api.errors.BasicError;
|
||||
|
||||
public class Cqld4OpMapper implements OpMapper<Cqld4Op> {
|
||||
|
||||
|
||||
private final DriverSpaceCache<? extends Cqld4Space> cache;
|
||||
|
||||
public Cqld4OpMapper(DriverSpaceCache<? extends Cqld4Space> cache) {
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
public OpDispenser<Cqld4Op> apply(ParsedCommand cmd) {
|
||||
|
||||
// if session field = static string, else ...
|
||||
|
||||
boolean prepared = cmd.getStaticValueOr("prepared",false);
|
||||
boolean batch = cmd.getStaticValueOr("boolean",false);
|
||||
|
||||
if (cmd.isDefinedDynamic("session")) {
|
||||
throw new BasicError("This driver adapter does not support dynamic sessions.");
|
||||
}
|
||||
// If it did, we would use something like this instead...
|
||||
// LongFunction<String> session = cmd.getAsFunctionOr("session", "default");
|
||||
|
||||
Cqld4Space cqld4Space = cache.get(cmd.getStaticValueOr("session", "default"));
|
||||
CqlSession session = cqld4Space.getSession();
|
||||
|
||||
if (prepared && batch) {
|
||||
return new CqlD4PreparedBatchOpDispenser(session,cmd);
|
||||
} else if (prepared) {
|
||||
return new Cqld4PreparedOpDispenser(session,cmd);
|
||||
} else if (batch) {
|
||||
return new Cqld4BatchStatementDispenser(session, cmd);
|
||||
} else {
|
||||
return new Cqld4SimpleCqlStatementDispenser(session,cmd);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
package io.nosqlbench.adapter.cqld4.opdispensers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.driver.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.ParsedCommand;
|
||||
|
||||
public class CqlD4PreparedBatchOpDispenser implements OpDispenser<Cqld4Op> {
|
||||
|
||||
private final CqlSession session;
|
||||
private final ParsedCommand cmd;
|
||||
|
||||
public CqlD4PreparedBatchOpDispenser(CqlSession session, ParsedCommand cmd) {
|
||||
this.session = session;
|
||||
this.cmd = cmd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4Op apply(long value) {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package io.nosqlbench.driver.cqld4.opdispensers;
|
||||
package io.nosqlbench.adapter.cqld4.opdispensers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.driver.cqld4.Cqld4Op;
|
||||
@ -18,4 +18,5 @@ public class Cqld4BatchStatementDispenser implements OpDispenser<Cqld4Op> {
|
||||
public Cqld4Op apply(long value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package io.nosqlbench.adapter.cqld4.opdispensers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.driver.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.ParsedCommand;
|
||||
|
||||
public class Cqld4PreparedOpDispenser implements OpDispenser<Cqld4Op> {
|
||||
|
||||
private final CqlSession session;
|
||||
private final ParsedCommand cmd;
|
||||
|
||||
public Cqld4PreparedOpDispenser(CqlSession session, ParsedCommand cmd) {
|
||||
|
||||
this.session = session;
|
||||
this.cmd = cmd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4Op apply(long value) {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -1,13 +1,13 @@
|
||||
package io.nosqlbench.driver.cqld4.opdispensers;
|
||||
package io.nosqlbench.adapter.cqld4.opdispensers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||
import io.nosqlbench.driver.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.driver.cqld4.optypes.Cqld4SimpleCqlStatement;
|
||||
import io.nosqlbench.adapter.optypes.Cqld4SimpleCqlStatement;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.ParsedCommand;
|
||||
|
||||
public class Cqld4SimpleCqlStatementDispenser implements OpDispenser<Cqld4Op> {
|
||||
public class Cqld4SimpleCqlStatementDispenser implements OpDispenser<Cqld4Op> {
|
||||
|
||||
private final CqlSession session;
|
||||
private final ParsedCommand cmd;
|
@ -0,0 +1,10 @@
|
||||
package io.nosqlbench.adapter.optypes;
|
||||
|
||||
import io.nosqlbench.driver.cqld4.Cqld4Op;
|
||||
|
||||
public class Cqld4BatchStatement extends Cqld4Op {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,9 @@
|
||||
package io.nosqlbench.adapter.optypes;
|
||||
|
||||
import io.nosqlbench.driver.cqld4.Cqld4Op;
|
||||
|
||||
public class Cqld4PreparedStatement extends Cqld4Op {
|
||||
@Override
|
||||
public void run() {
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package io.nosqlbench.driver.cqld4.optypes;
|
||||
package io.nosqlbench.adapter.optypes;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
@ -1,88 +0,0 @@
|
||||
package io.nosqlbench.driver.cqld4;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
|
||||
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
||||
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class Cqld4Action implements SyncAction, ActivityDefObserver {
|
||||
|
||||
private final int slot;
|
||||
private final Cqld4Activity activity;
|
||||
|
||||
private Timer bindTimer;
|
||||
private Timer executeTimer;
|
||||
private Timer resultTimer;
|
||||
private Timer resultSuccessTimer;
|
||||
private Histogram triesHisto;
|
||||
private int maxTries;
|
||||
|
||||
|
||||
public Cqld4Action(int slot, Cqld4Activity activity) {
|
||||
this.slot = slot;
|
||||
this.activity = activity;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
this.bindTimer = activity.getInstrumentation().getOrCreateBindTimer();
|
||||
this.executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer();
|
||||
this.resultTimer = activity.getInstrumentation().getOrCreateResultTimer();
|
||||
this.resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer();
|
||||
this.triesHisto = activity.getInstrumentation().getOrCreateTriesHistogram();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int runCycle(long cycle) {
|
||||
|
||||
Cqld4Op cql4op;
|
||||
try (Timer.Context ctx = bindTimer.time()) {
|
||||
OpDispenser<Cqld4Op> opDispenser = activity.getSequence().apply(cycle);
|
||||
cql4op = opDispenser.apply(cycle);
|
||||
}
|
||||
|
||||
int tries = 0;
|
||||
int result = 0;
|
||||
Exception error=null;
|
||||
while (tries < maxTries) {
|
||||
tries++;
|
||||
long startat = System.nanoTime();
|
||||
try {
|
||||
try (Timer.Context ctx = executeTimer.time()) {
|
||||
cql4op.run();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
error=e;
|
||||
} finally {
|
||||
long nanos = System.nanoTime() - startat;
|
||||
resultTimer.update(nanos, TimeUnit.NANOSECONDS);
|
||||
|
||||
if (error==null) {
|
||||
resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS);
|
||||
break;
|
||||
} else {
|
||||
ErrorDetail detail = activity.getErrorhandler().handleError(error,cycle,nanos);
|
||||
result = detail.resultCode;
|
||||
if (!detail.isRetryable()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
triesHisto.update(tries);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onActivityDefUpdate(ActivityDef activityDef) {
|
||||
this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
|
||||
}
|
||||
}
|
@ -1,44 +0,0 @@
|
||||
package io.nosqlbench.driver.cqld4;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.core.Action;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
|
||||
/**
|
||||
* A new driver which use the Apache Cassandra CQL driver version 4.
|
||||
* To differentiate code in this module from the initial CQL driver,
|
||||
* all classes are prefixed with Cqld4. Full docs are in the cqld4.md file.
|
||||
*/
|
||||
@Service(value=ActivityType.class,selector = "cqld4")
|
||||
public class Cqld4ActivityType implements ActivityType<Cqld4Activity> {
|
||||
|
||||
@Override
|
||||
public Cqld4Activity getActivity(ActivityDef activityDef) {
|
||||
return new Cqld4Activity(activityDef);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionDispenser getActionDispenser(Cqld4Activity activity) {
|
||||
if (activity.getParams().getOptionalString("async").isPresent()) {
|
||||
throw new RuntimeException("This driver does not support async mode yet.");
|
||||
}
|
||||
return new Cqld4ActionDispenser(activity);
|
||||
}
|
||||
|
||||
private final static class Cqld4ActionDispenser implements ActionDispenser {
|
||||
|
||||
private final Cqld4Activity activity;
|
||||
|
||||
public Cqld4ActionDispenser(Cqld4Activity activity) {
|
||||
this.activity = activity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Action getAction(int slot) {
|
||||
return new Cqld4Action(slot, activity);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,39 +0,0 @@
|
||||
package io.nosqlbench.driver.cqld4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.driver.cqld4.opdispensers.Cqld4BatchStatementDispenser;
|
||||
import io.nosqlbench.driver.cqld4.opdispensers.Cqld4PreparedStatementDispenser;
|
||||
import io.nosqlbench.driver.cqld4.opdispensers.Cqld4SimpleCqlStatementDispenser;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.ParsedCommand;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
public class Cqld4OpMapper implements Function<OpTemplate, OpDispenser<Cqld4Op>> {
|
||||
|
||||
private final Function<OpTemplate, OpDispenser<Cqld4Op>> templateToDispenser;
|
||||
private final CqlSession session;
|
||||
|
||||
public Cqld4OpMapper(CqlSession session, OpTemplate optpl) {
|
||||
this.session = session;
|
||||
ParsedCommand cmd = new ParsedCommand(optpl);
|
||||
templateToDispenser = resolve(cmd);
|
||||
}
|
||||
|
||||
private Function<OpTemplate, OpDispenser<Cqld4Op>> resolve(ParsedCommand cmd) {
|
||||
if (cmd.isDefinedStatic("prepared") && cmd.getStaticValue("prepared", boolean.class)) {
|
||||
return new Cqld4PreparedStatementDispenser(cmd);
|
||||
} else if (cmd.isDefinedStatic("batch")) {
|
||||
return ot -> new Cqld4BatchStatementDispenser(session, cmd);
|
||||
} else {
|
||||
return ot -> new Cqld4SimpleCqlStatementDispenser(session, cmd);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpDispenser<Cqld4Op> apply(OpTemplate opTemplate) {
|
||||
return templateToDispenser.apply(opTemplate);
|
||||
|
||||
}
|
||||
}
|
@ -1,18 +0,0 @@
|
||||
package io.nosqlbench.driver.cqld4.opdispensers;
|
||||
|
||||
import io.nosqlbench.driver.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.ParsedCommand;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
public class Cqld4PreparedStatementDispenser implements Function<OpTemplate, OpDispenser<Cqld4Op>> {
|
||||
public Cqld4PreparedStatementDispenser(ParsedCommand cmd) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpDispenser<Cqld4Op> apply(OpTemplate opTemplate) {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -1,4 +0,0 @@
|
||||
package io.nosqlbench.driver.cqld4.optypes;
|
||||
|
||||
public class Cqld4BatchStatement {
|
||||
}
|
@ -1,4 +0,0 @@
|
||||
package io.nosqlbench.driver.cqld4.optypes;
|
||||
|
||||
public class Cqld4PreparedStatement {
|
||||
}
|
Loading…
Reference in New Issue
Block a user